PhoneWork/agent/manager.py
Yuyao Huang (Sam) 6307deb701 feat: 实现用户权限控制、会话管理和审计日志功能
- 添加用户权限检查功能,支持配置允许使用的用户列表
- 实现会话管理功能,包括会话创建、关闭、列表和切换
- 新增审计日志模块,记录所有交互信息
- 改进WebSocket连接,增加自动重连机制
- 添加健康检查端点,包含Claude服务可用性测试
- 实现会话持久化功能,重启后恢复会话状态
- 增加命令行功能支持,包括/new、/list、/close等命令
- 优化消息处理流程,支持直接传递模式
2026-03-28 08:39:32 +08:00

194 lines
6.3 KiB
Python

"""Session registry: tracks active project sessions by conversation ID."""
from __future__ import annotations
import asyncio
import json
import logging
import uuid
from dataclasses import dataclass, field, asdict
from pathlib import Path
from typing import Dict, List, Optional
from agent.pty_process import run_claude
from agent.audit import log_interaction
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Default idle period, in seconds (30 minutes), after which the reaper drops a
# session; it is compared against differences of event-loop time values.
DEFAULT_IDLE_TIMEOUT = 30 * 60
# Default value forwarded to run_claude as its `timeout` argument
# (presumably seconds -- TODO confirm against agent.pty_process.run_claude).
DEFAULT_CC_TIMEOUT = 300.0
# JSON file used to persist sessions; it lives in the parent of the directory
# that contains this module.
PERSISTENCE_FILE = Path(__file__).parent.parent / "sessions.json"
@dataclass
class Session:
    """State of one Claude Code project session bound to a conversation.

    Instances are plain data: they are serialised with :meth:`to_dict` and
    rebuilt with :meth:`from_dict` when the registry is persisted to JSON.
    """

    # Conversation ID; the session manager uses it as the registry key.
    conv_id: str
    # Working directory forwarded to run_claude as ``cwd``.
    cwd: str
    # ID of the owning user; an empty string means the session has no owner.
    owner_id: str = ""
    # Random UUID4 string forwarded to run_claude as ``cc_session_id``.
    cc_session_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    # Event-loop time of the last touch(); 0.0 means "never touched".
    last_activity: float = 0.0
    # True once the first message has been sent (later sends use resume=True).
    started: bool = False
    # Seconds of inactivity after which the reaper removes the session.
    idle_timeout: int = DEFAULT_IDLE_TIMEOUT
    # Value forwarded to run_claude as ``timeout``.
    cc_timeout: float = DEFAULT_CC_TIMEOUT

    def touch(self) -> None:
        """Record the current event-loop time as the last activity time."""
        self.last_activity = asyncio.get_event_loop().time()

    def to_dict(self) -> dict:
        """Return all fields as a plain dict suitable for JSON serialisation."""
        return asdict(self)

    @classmethod
    def from_dict(cls, data: dict) -> "Session":
        """Build a session from a dict produced by :meth:`to_dict`.

        Keys that are not fields of this dataclass are ignored (and logged)
        instead of raising, so a persistence file written by a different
        version of this class can still be restored.

        Raises:
            TypeError: if *data* is not a dict or a required field
                (``conv_id``, ``cwd``) is missing.
        """
        # Local import: the module header only imports dataclass/field/asdict.
        from dataclasses import fields

        if not isinstance(data, dict):
            # Same exception type that ``cls(**data)`` raises for non-mappings.
            raise TypeError(
                f"Session data must be a dict, got {type(data).__name__}"
            )

        known = {f.name for f in fields(cls)}
        unknown = sorted(key for key in data if key not in known)
        if unknown:
            logger.warning("Ignoring unknown session fields: %s", ", ".join(map(str, unknown)))
        return cls(**{key: value for key, value in data.items() if key in known})
class SessionManager:
    """Registry of active Claude Code project sessions with persistence and user isolation.

    Sessions are keyed by conversation ID. Every mutation is mirrored to
    ``PERSISTENCE_FILE`` as JSON, and a background task removes sessions whose
    last activity is older than their own ``idle_timeout``.
    """

    def __init__(self) -> None:
        """Create an empty registry; no I/O happens until :meth:`start`."""
        self._sessions: Dict[str, Session] = {}
        # Serialises coroutine access to ``_sessions`` and the persistence file.
        self._lock = asyncio.Lock()
        self._reaper_task: Optional[asyncio.Task] = None

    async def start(self) -> None:
        """Restore persisted sessions and launch the idle-session reaper."""
        self._load()
        loop = asyncio.get_running_loop()

        # ``last_activity`` is taken from the event loop's monotonic clock,
        # whose reference point is not guaranteed to be the same after a
        # reboot. A restored timestamp that lies in the future would make the
        # idle check negative forever, so clamp it to "now".
        now = loop.time()
        for session in self._sessions.values():
            if session.last_activity > now:
                session.last_activity = now

        self._reaper_task = loop.create_task(self._reaper_loop())

    async def stop(self) -> None:
        """Stop the reaper, drop all sessions and delete the persistence file.

        NOTE(review): because the persistence file is removed here, sessions
        are only restored after a restart that did not go through ``stop()``
        -- confirm this is intended.
        """
        task = self._reaper_task
        self._reaper_task = None
        if task is not None:
            task.cancel()
            # Wait for the cancellation to be processed. ``gather`` with
            # return_exceptions=True collects the task's CancelledError
            # instead of raising it here.
            await asyncio.gather(task, return_exceptions=True)

        async with self._lock:
            self._sessions.clear()
            try:
                PERSISTENCE_FILE.unlink()
            except FileNotFoundError:
                pass

    async def create(
        self,
        conv_id: str,
        working_dir: str,
        owner_id: str = "",
        idle_timeout: int = DEFAULT_IDLE_TIMEOUT,
        cc_timeout: float = DEFAULT_CC_TIMEOUT,
    ) -> Session:
        """Register a new session under *conv_id* and persist the registry.

        An existing session with the same *conv_id* is replaced.

        Args:
            conv_id: Conversation ID used as the registry key.
            working_dir: Directory stored as the session's ``cwd``.
            owner_id: Owning user ID; empty means the session has no owner.
            idle_timeout: Seconds of inactivity before the reaper removes it.
            cc_timeout: Timeout forwarded to ``run_claude`` on each send.

        Returns:
            The newly created session.
        """
        async with self._lock:
            session = Session(
                conv_id=conv_id,
                cwd=working_dir,
                owner_id=owner_id,
                idle_timeout=idle_timeout,
                cc_timeout=cc_timeout,
            )
            self._sessions[conv_id] = session
            self._save()
        # Only the last 8 characters of the owner ID are written to the log.
        logger.info(
            "Created session %s (owner=...%s) in %s (idle=%ds, cc=%.0fs)",
            conv_id, owner_id[-8:] if owner_id else "-", working_dir, idle_timeout, cc_timeout,
        )
        return session

    async def send(self, conv_id: str, message: str, user_id: Optional[str] = None) -> str:
        """Send *message* to the session's Claude process and return its output.

        The first message of a session is sent with ``resume=False``; every
        later one with ``resume=True``. Each exchange is passed to
        ``log_interaction``.

        Args:
            conv_id: Conversation ID of the target session.
            message: Prompt text forwarded to ``run_claude``.
            user_id: Caller's user ID. The ownership check is skipped when
                this is empty/None or when the session has no owner.

        Raises:
            KeyError: if no session is registered for *conv_id*.
            PermissionError: if the session belongs to a different user.
        """
        async with self._lock:
            session = self._sessions.get(conv_id)
            if session is None:
                raise KeyError(f"No session for conv_id={conv_id!r}")
            if session.owner_id and user_id and session.owner_id != user_id:
                raise PermissionError(f"Session {conv_id} belongs to another user")
            session.touch()
            # Snapshot what the call needs so the lock is not held while the
            # Claude process runs.
            cwd = session.cwd
            cc_session_id = session.cc_session_id
            cc_timeout = session.cc_timeout
            first_message = not session.started
            if first_message:
                session.started = True
                self._save()

        output = await run_claude(
            message,
            cwd=cwd,
            cc_session_id=cc_session_id,
            resume=not first_message,
            timeout=cc_timeout,
        )
        log_interaction(
            conv_id=conv_id,
            prompt=message,
            response=output,
            cwd=cwd,
            user_id=user_id,
        )
        return output

    async def close(self, conv_id: str, user_id: Optional[str] = None) -> bool:
        """Remove the session for *conv_id* and persist the registry.

        Args:
            conv_id: Conversation ID of the session to remove.
            user_id: Caller's user ID; same ownership rule as :meth:`send`.

        Returns:
            True if a session was removed, False if none existed.

        Raises:
            PermissionError: if the session belongs to a different user.
        """
        async with self._lock:
            session = self._sessions.get(conv_id)
            if session is None:
                return False
            if session.owner_id and user_id and session.owner_id != user_id:
                raise PermissionError(f"Session {conv_id} belongs to another user")
            del self._sessions[conv_id]
            self._save()
        logger.info("Closed session %s", conv_id)
        return True

    def list_sessions(self, user_id: Optional[str] = None) -> list[dict]:
        """Return a summary dict for each session visible to *user_id*.

        With a *user_id*, only sessions owned by that user or with no owner
        are returned; without one, every session is returned. ``owner_id`` in
        the result is truncated to its last 8 characters (None if unowned).
        """
        sessions = self._sessions.values()
        if user_id:
            sessions = [s for s in sessions if not s.owner_id or s.owner_id == user_id]
        return [
            {
                "conv_id": s.conv_id,
                "cwd": s.cwd,
                "owner_id": s.owner_id[-8:] if s.owner_id else None,
                "cc_session_id": s.cc_session_id,
                "started": s.started,
                "idle_timeout": s.idle_timeout,
                "cc_timeout": s.cc_timeout,
            }
            for s in sessions
        ]

    def _save(self) -> None:
        """Write all sessions to ``PERSISTENCE_FILE``; failures are logged, not raised."""
        try:
            data = {cid: s.to_dict() for cid, s in self._sessions.items()}
            # Write to a sibling temp file and swap it in, so an interrupted
            # write cannot leave a truncated JSON file behind.
            tmp_path = PERSISTENCE_FILE.with_name(PERSISTENCE_FILE.name + ".tmp")
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2)
            tmp_path.replace(PERSISTENCE_FILE)
            logger.debug("Saved %d sessions to %s", len(data), PERSISTENCE_FILE)
        except Exception:
            logger.exception("Failed to save sessions")

    def _load(self) -> None:
        """Populate the registry from ``PERSISTENCE_FILE`` if it exists.

        Failures are logged, not raised. A malformed entry is skipped on its
        own so that it cannot prevent the remaining sessions from loading.
        """
        if not PERSISTENCE_FILE.exists():
            return
        try:
            with open(PERSISTENCE_FILE, "r", encoding="utf-8") as f:
                data = json.load(f)
        except Exception:
            logger.exception("Failed to load sessions")
            return

        if not isinstance(data, dict):
            logger.error(
                "Failed to load sessions: expected a JSON object in %s, got %s",
                PERSISTENCE_FILE, type(data).__name__,
            )
            return

        for cid, sdata in data.items():
            try:
                self._sessions[cid] = Session.from_dict(sdata)
            except Exception:
                logger.exception("Skipping malformed persisted session %s", cid)
        logger.info("Loaded %d sessions from %s", len(self._sessions), PERSISTENCE_FILE)

    async def _reaper_loop(self) -> None:
        """Run :meth:`_reap_idle` every 60 seconds until cancelled."""
        while True:
            await asyncio.sleep(60)
            try:
                await self._reap_idle()
            except asyncio.CancelledError:
                # Never swallow cancellation, whatever its base class is.
                raise
            except Exception:
                # One failed pass must not stop idle reaping for good.
                logger.exception("Idle session reaper pass failed")

    async def _reap_idle(self) -> None:
        """Remove every session idle for longer than its ``idle_timeout``.

        Sessions whose ``last_activity`` is 0 (never touched) are left alone.
        """
        now = asyncio.get_running_loop().time()
        async with self._lock:
            to_close = [
                cid
                for cid, s in self._sessions.items()
                if s.last_activity > 0 and (now - s.last_activity) > s.idle_timeout
            ]
            for cid in to_close:
                del self._sessions[cid]
                logger.info("Reaped idle session %s", cid)
            if to_close:
                self._save()
# Module-level SessionManager instance, created when this module is imported.
manager = SessionManager()