"""Session registry: tracks active project sessions by conversation ID.""" from __future__ import annotations import asyncio import json import logging import uuid from dataclasses import dataclass, field, asdict from pathlib import Path from typing import Dict, List, Optional from agent.cc_runner import run_claude, DEFAULT_PERMISSION_MODE, VALID_PERMISSION_MODES from agent.audit import log_interaction logger = logging.getLogger(__name__) DEFAULT_IDLE_TIMEOUT = 30 * 60 DEFAULT_CC_TIMEOUT = 300.0 PERSISTENCE_FILE = Path(__file__).parent.parent / "data" / "sessions.json" @dataclass class Session: conv_id: str cwd: str owner_id: str = "" cc_session_id: str = field(default_factory=lambda: str(uuid.uuid4())) last_activity: float = 0.0 started: bool = False idle_timeout: int = DEFAULT_IDLE_TIMEOUT cc_timeout: float = DEFAULT_CC_TIMEOUT permission_mode: str = field(default_factory=lambda: DEFAULT_PERMISSION_MODE) def touch(self) -> None: self.last_activity = asyncio.get_event_loop().time() def to_dict(self) -> dict: return asdict(self) @classmethod def from_dict(cls, data: dict) -> "Session": return cls(**data) class SessionManager: """Registry of active Claude Code project sessions with persistence and user isolation.""" def __init__(self) -> None: self._sessions: dict[str, Session] = {} self._lock = asyncio.Lock() self._reaper_task: Optional[asyncio.Task] = None async def start(self) -> None: self._load() loop = asyncio.get_event_loop() self._reaper_task = loop.create_task(self._reaper_loop()) async def stop(self) -> None: if self._reaper_task: self._reaper_task.cancel() async with self._lock: self._sessions.clear() if PERSISTENCE_FILE.exists(): PERSISTENCE_FILE.unlink() async def create( self, conv_id: str, working_dir: str, owner_id: str = "", idle_timeout: int = DEFAULT_IDLE_TIMEOUT, cc_timeout: float = DEFAULT_CC_TIMEOUT, permission_mode: str = DEFAULT_PERMISSION_MODE, ) -> Session: async with self._lock: session = Session( conv_id=conv_id, cwd=working_dir, owner_id=owner_id, idle_timeout=idle_timeout, cc_timeout=cc_timeout, permission_mode=permission_mode, ) self._sessions[conv_id] = session self._save() logger.info( "Created session %s (owner=...%s) in %s (idle=%ds, cc=%.0fs, perm=%s)", conv_id, owner_id[-8:] if owner_id else "-", working_dir, idle_timeout, cc_timeout, permission_mode, ) return session async def send(self, conv_id: str, message: str, user_id: Optional[str] = None) -> str: async with self._lock: session = self._sessions.get(conv_id) if session is None: raise KeyError(f"No session for conv_id={conv_id!r}") if session.owner_id and user_id and session.owner_id != user_id: raise PermissionError(f"Session {conv_id} belongs to another user") session.touch() cwd = session.cwd cc_session_id = session.cc_session_id cc_timeout = session.cc_timeout permission_mode = session.permission_mode first_message = not session.started if first_message: session.started = True self._save() if cc_timeout > 60: from agent.task_runner import task_runner from orchestrator.tools import get_current_chat, set_current_chat, set_current_user chat_id = get_current_chat() async def run_task(): output = await run_claude( message, cwd=cwd, cc_session_id=cc_session_id, resume=not first_message, timeout=cc_timeout, permission_mode=permission_mode, ) log_interaction( conv_id=conv_id, prompt=message, response=output, cwd=cwd, user_id=user_id, ) return output async def on_task_complete(task) -> None: if not chat_id or not user_id or not task.result: return set_current_user(user_id) set_current_chat(chat_id) from orchestrator.agent import agent follow_up = ( f"CC task completed. Output:\n{task.result}\n\n" f"Original request was: {message}\n\n" "If the user asked you to send a file, use send_file now. " "Otherwise just acknowledge completion." ) reply = await agent.run(user_id, follow_up) if reply: from bot.feishu import send_text await send_text(chat_id, "chat_id", reply) task_id = await task_runner.submit( run_task(), description=f"CC session {conv_id}: {message[:50]}", notify_chat_id=chat_id, user_id=user_id, on_complete=on_task_complete, ) return f"⏳ Task #{task_id} started (timeout: {int(cc_timeout)}s). I'll notify you when it's done." output = await run_claude( message, cwd=cwd, cc_session_id=cc_session_id, resume=not first_message, timeout=cc_timeout, permission_mode=permission_mode, ) log_interaction( conv_id=conv_id, prompt=message, response=output, cwd=cwd, user_id=user_id, ) return output async def close(self, conv_id: str, user_id: Optional[str] = None) -> bool: async with self._lock: session = self._sessions.get(conv_id) if session is None: return False if session.owner_id and user_id and session.owner_id != user_id: raise PermissionError(f"Session {conv_id} belongs to another user") del self._sessions[conv_id] self._save() logger.info("Closed session %s", conv_id) return True def list_sessions(self, user_id: Optional[str] = None) -> list[dict]: sessions = self._sessions.values() if user_id: sessions = [s for s in sessions if not s.owner_id or s.owner_id == user_id] return [ { "conv_id": s.conv_id, "cwd": s.cwd, "owner_id": s.owner_id[-8:] if s.owner_id else None, "cc_session_id": s.cc_session_id, "started": s.started, "idle_timeout": s.idle_timeout, "cc_timeout": s.cc_timeout, "permission_mode": s.permission_mode, } for s in sessions ] def set_permission_mode(self, conv_id: str, mode: str, user_id: Optional[str] = None) -> None: """Change the permission mode for an existing session.""" session = self._sessions.get(conv_id) if session is None: raise KeyError(f"No session for conv_id={conv_id!r}") if session.owner_id and user_id and session.owner_id != user_id: raise PermissionError(f"Session {conv_id} belongs to another user") if mode not in VALID_PERMISSION_MODES: raise ValueError(f"Invalid permission mode {mode!r}. Valid: {VALID_PERMISSION_MODES}") session.permission_mode = mode self._save() logger.info("Set permission_mode=%s for session %s", mode, conv_id) def _save(self) -> None: try: data = {cid: s.to_dict() for cid, s in self._sessions.items()} PERSISTENCE_FILE.parent.mkdir(parents=True, exist_ok=True) with open(PERSISTENCE_FILE, "w", encoding="utf-8") as f: json.dump(data, f, indent=2) logger.debug("Saved %d sessions to %s", len(data), PERSISTENCE_FILE) except Exception: logger.exception("Failed to save sessions") def _load(self) -> None: if not PERSISTENCE_FILE.exists(): return try: with open(PERSISTENCE_FILE, "r", encoding="utf-8") as f: data = json.load(f) for cid, sdata in data.items(): self._sessions[cid] = Session.from_dict(sdata) logger.info("Loaded %d sessions from %s", len(self._sessions), PERSISTENCE_FILE) except Exception: logger.exception("Failed to load sessions") async def _reaper_loop(self) -> None: while True: await asyncio.sleep(60) await self._reap_idle() async def _reap_idle(self) -> None: now = asyncio.get_event_loop().time() async with self._lock: to_close = [] for cid, s in self._sessions.items(): if s.last_activity > 0 and (now - s.last_activity) > s.idle_timeout: to_close.append(cid) for cid in to_close: del self._sessions[cid] logger.info("Reaped idle session %s", cid) if to_close: self._save() manager = SessionManager()