"""Audit logging for Claude Code sessions.""" from __future__ import annotations import json import logging from datetime import datetime, timezone from pathlib import Path from typing import Optional logger = logging.getLogger(__name__) AUDIT_DIR = Path(__file__).parent.parent / "data" / "audit" def _ensure_audit_dir() -> None: AUDIT_DIR.mkdir(parents=True, exist_ok=True) def log_interaction( conv_id: str, prompt: str, response: str, cwd: Optional[str] = None, user_id: Optional[str] = None, ) -> None: """ Log an interaction to a JSONL file per session. Args: conv_id: Conversation/session ID prompt: User's message/prompt response: Claude Code's response cwd: Working directory (optional) user_id: User identifier (optional) """ try: _ensure_audit_dir() log_file = AUDIT_DIR / f"{conv_id}.jsonl" entry = { "timestamp": datetime.now(timezone.utc).isoformat(), "conv_id": conv_id, "prompt": prompt, "response": response, } if cwd: entry["cwd"] = cwd if user_id: entry["user_id"] = user_id[-8:] if len(user_id) > 8 else user_id with open(log_file, "a", encoding="utf-8") as f: f.write(json.dumps(entry, ensure_ascii=False) + "\n") logger.debug("Logged interaction for session %s", conv_id) except Exception: logger.exception("Failed to log audit entry for session %s", conv_id) def log_tool_use( session_id: str, tool_name: str, tool_input: dict, tool_response: Optional[object] = None, ) -> None: """Log a tool call to the audit JSONL file.""" try: _ensure_audit_dir() log_file = AUDIT_DIR / f"{session_id}.jsonl" entry = { "timestamp": datetime.now(timezone.utc).isoformat(), "type": "tool_use", "session_id": session_id, "tool_name": tool_name, "tool_input": str(tool_input)[:500], } if tool_response is not None: entry["tool_response"] = str(tool_response)[:500] with open(log_file, "a", encoding="utf-8") as f: f.write(json.dumps(entry, ensure_ascii=False) + "\n") except Exception: logger.exception("Failed to log tool use for session %s", session_id) def log_permission_decision( conv_id: str, tool_name: str, tool_input: dict, approved: bool, ) -> None: """Log a permission approval/denial decision.""" try: _ensure_audit_dir() log_file = AUDIT_DIR / f"{conv_id}.jsonl" entry = { "timestamp": datetime.now(timezone.utc).isoformat(), "type": "permission_decision", "conv_id": conv_id, "tool_name": tool_name, "tool_input": str(tool_input)[:300], "approved": approved, } with open(log_file, "a", encoding="utf-8") as f: f.write(json.dumps(entry, ensure_ascii=False) + "\n") except Exception: logger.exception("Failed to log permission decision for session %s", conv_id) def get_audit_log(conv_id: str, limit: int = 100) -> list[dict]: """Read the audit log for a session.""" log_file = AUDIT_DIR / f"{conv_id}.jsonl" if not log_file.exists(): return [] entries = [] try: with open(log_file, "r", encoding="utf-8") as f: for line in f: if line.strip(): entries.append(json.loads(line)) except Exception: logger.exception("Failed to read audit log for session %s", conv_id) return entries[-limit:]