PhoneWork/agent/audit.py
Yuyao Huang (Sam) 6307deb701 feat: 实现用户权限控制、会话管理和审计日志功能
- 添加用户权限检查功能,支持配置允许使用的用户列表
- 实现会话管理功能,包括会话创建、关闭、列表和切换
- 新增审计日志模块,记录所有交互信息
- 改进WebSocket连接,增加自动重连机制
- 添加健康检查端点,包含Claude服务可用性测试
- 实现会话持久化功能,重启后恢复会话状态
- 增加命令行功能支持,包括/new、/list、/close等命令
- 优化消息处理流程,支持直接传递模式
2026-03-28 08:39:32 +08:00

77 lines
2.0 KiB
Python

"""Audit logging for Claude Code sessions."""
from __future__ import annotations
import json
import logging
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
logger = logging.getLogger(__name__)
AUDIT_DIR = Path(__file__).parent.parent / "audit"
def _ensure_audit_dir() -> None:
AUDIT_DIR.mkdir(parents=True, exist_ok=True)
def log_interaction(
conv_id: str,
prompt: str,
response: str,
cwd: Optional[str] = None,
user_id: Optional[str] = None,
) -> None:
"""
Log an interaction to a JSONL file per session.
Args:
conv_id: Conversation/session ID
prompt: User's message/prompt
response: Claude Code's response
cwd: Working directory (optional)
user_id: User identifier (optional)
"""
try:
_ensure_audit_dir()
log_file = AUDIT_DIR / f"{conv_id}.jsonl"
entry = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"conv_id": conv_id,
"prompt": prompt,
"response": response,
}
if cwd:
entry["cwd"] = cwd
if user_id:
entry["user_id"] = user_id[-8:] if len(user_id) > 8 else user_id
with open(log_file, "a", encoding="utf-8") as f:
f.write(json.dumps(entry, ensure_ascii=False) + "\n")
logger.debug("Logged interaction for session %s", conv_id)
except Exception:
logger.exception("Failed to log audit entry for session %s", conv_id)
def get_audit_log(conv_id: str, limit: int = 100) -> list[dict]:
"""Read the audit log for a session."""
log_file = AUDIT_DIR / f"{conv_id}.jsonl"
if not log_file.exists():
return []
entries = []
try:
with open(log_file, "r", encoding="utf-8") as f:
for line in f:
if line.strip():
entries.append(json.loads(line))
except Exception:
logger.exception("Failed to read audit log for session %s", conv_id)
return entries[-limit:]