From 6307deb701b4d7424ee1c1f37f0b3fc56a5c878f Mon Sep 17 00:00:00 2001 From: "Yuyao Huang (Sam)" Date: Sat, 28 Mar 2026 08:39:32 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=AE=9E=E7=8E=B0=E7=94=A8=E6=88=B7?= =?UTF-8?q?=E6=9D=83=E9=99=90=E6=8E=A7=E5=88=B6=E3=80=81=E4=BC=9A=E8=AF=9D?= =?UTF-8?q?=E7=AE=A1=E7=90=86=E5=92=8C=E5=AE=A1=E8=AE=A1=E6=97=A5=E5=BF=97?= =?UTF-8?q?=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 添加用户权限检查功能,支持配置允许使用的用户列表 - 实现会话管理功能,包括会话创建、关闭、列表和切换 - 新增审计日志模块,记录所有交互信息 - 改进WebSocket连接,增加自动重连机制 - 添加健康检查端点,包含Claude服务可用性测试 - 实现会话持久化功能,重启后恢复会话状态 - 增加命令行功能支持,包括/new、/list、/close等命令 - 优化消息处理流程,支持直接传递模式 --- README.md | 164 +++++++++++++++++++++++----- ROADMAP.md | 93 ---------------- agent/audit.py | 76 +++++++++++++ agent/manager.py | 137 +++++++++++++++++------- bot/commands.py | 191 +++++++++++++++++++++++++++++++++ bot/feishu.py | 241 ++++++++++++++++++++++++++++++++++++++---- bot/handler.py | 82 +++++++++----- config.py | 12 +++ main.py | 54 ++++++++-- orchestrator/agent.py | 36 ++++++- orchestrator/tools.py | 57 ++++++++-- 11 files changed, 921 insertions(+), 222 deletions(-) delete mode 100644 ROADMAP.md create mode 100644 agent/audit.py create mode 100644 bot/commands.py diff --git a/README.md b/README.md index a080826..5674006 100644 --- a/README.md +++ b/README.md @@ -1,19 +1,19 @@ # PhoneWork -Feishu bot integration with Claude Code CLI. +Feishu bot that lets users control Claude Code CLI from their phone. ## Architecture ``` ┌─────────────┐ WebSocket ┌──────────────┐ LangChain ┌─────────────┐ │ Feishu │ ◄──────────────► │ FastAPI │ ◄──────────────► │ LLM API │ -│ (client) │ │ (server) │ │ (OpenAI) │ +│ (client) │ │ (server) │ │ (ZhipuAI) │ └─────────────┘ └──────────────┘ └─────────────┘ │ ▼ ┌─────────────┐ │ Claude Code │ - │ (PTY) │ + │ (headless) │ └─────────────┘ ``` @@ -22,36 +22,144 @@ Feishu bot integration with Claude Code CLI. | Module | Purpose | |--------|---------| | `main.py` | FastAPI entry point, starts WebSocket client + session manager | -| `bot/handler.py` | Receives Feishu events, dispatches to orchestrator | -| `bot/feishu.py` | Sends replies back to Feishu chats | -| `orchestrator/agent.py` | LangChain agent with per-user history + active session tracking | -| `orchestrator/tools.py` | Tools: `create_conversation`, `send_to_conversation`, `close_conversation` | -| `agent/manager.py` | Session registry with idle timeout reaper | -| `agent/pty_process.py` | Runs `claude -p` headlessly, manages session continuity | +| `bot/handler.py` | Receives Feishu events via long-connection WebSocket | +| `bot/feishu.py` | Sends text/file/card replies back to Feishu | +| `bot/commands.py` | Slash command handler (`/new`, `/list`, `/close`, `/switch`, `/help`) | +| `orchestrator/agent.py` | LangChain agent with per-user history + passthrough mode | +| `orchestrator/tools.py` | Tools: `create_conversation`, `send_to_conversation`, `list_conversations`, `close_conversation` | +| `agent/manager.py` | Session registry with persistence and idle timeout reaper | +| `agent/pty_process.py` | Runs `claude -p` headlessly, manages session continuity via `--resume` | +| `agent/audit.py` | Audit log of all interactions | -**Flow:** User message → Feishu WebSocket → Handler → Orchestrator (LLM decides action) → Tool → Session Manager → Claude Code PTY → Response back to Feishu +**Flow:** User message → Feishu WebSocket → Handler → (passthrough or LLM) → Session Manager → `claude -p` → Response back to Feishu -## Setup +--- -1. **Feishu App**: Create at https://open.feishu.cn - - Enable Bot capability + long-connection event subscription - - Get `FEISHU_APP_ID` and `FEISHU_APP_SECRET` +## Feishu App Setup -2. **LLM Endpoint**: Configure OpenAI-compatible endpoint - - `OPENAI_BASE_URL`, `OPENAI_API_KEY`, `OPENAI_MODEL` +### 1. Create App -3. **Claude Code CLI**: Install and authenticate `claude` command +Go to [Feishu Open Platform](https://open.feishu.cn/app) → **Create App** → **Custom App**. -4. **Configuration**: - ```bash - cp keyring.example.yaml keyring.yaml - # Edit keyring.yaml with your credentials - ``` +Record the **App ID** and **App Secret** from the Credentials page. -5. **Run**: - ```bash - pip install -r requirements.txt - python main.py - ``` +### 2. Enable Bot Capability -**Requirements**: Python 3.11+ +**App Features** → **Bot** → Enable. + +### 3. Subscribe to Events (Long-connection mode) + +**Event Subscriptions** → **Request URL** tab: + +- Switch to **"Use long connection to receive events"** (长连接接收事件) +- No public URL required + +Add event subscription: + +| Event | Event Type | +|-------|-----------| +| Receive messages | `im.message.receive_v1` | + +### 4. Required Permissions (API Scopes) + +Go to **Permissions & Scopes** and add the following: + +| Permission | API Scope | Used For | +|------------|-----------|----------| +| Read private messages sent to the bot | `im:message` (read) | Receiving user messages via WebSocket | +| Send messages | `im:message:send_as_bot` | Sending text replies | +| Upload files | `im:resource` | Uploading files before sending | +| Send messages in private chats | `im:message` (write) | Sending file messages | + +Minimal scope list to request: + +``` +im:message +im:message:send_as_bot +im:resource +``` + +> **Note:** `im:resource` covers both file upload (`im.v1.file.create`) and sending +> file-type messages. Without it, `send_file()` will fail with a permission error. + +### 5. Publish App + +After adding all permissions: + +1. **Version Management** → Create a new version → Submit for review (or self-publish if in the same org) +2. Install the app to your workspace + +--- + +## Configuration + +Copy and fill in credentials: + +```bash +cp keyring.example.yaml keyring.yaml +``` + +`keyring.yaml` fields: + +```yaml +FEISHU_APP_ID: cli_xxxxxxxxxxxxxxxx +FEISHU_APP_SECRET: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + +OPENAI_BASE_URL: https://open.bigmodel.cn/api/paas/v4/ +OPENAI_API_KEY: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx +OPENAI_MODEL: glm-4.7 + +# Root directory for all project sessions (absolute path) +WORKING_DIR: C:/Users/yourname/projects + +# Allowlist of Feishu open_ids that may use the bot. +# Leave empty to allow all users. +ALLOWED_OPEN_IDS: + - ou_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx +``` + +--- + +## Installation & Run + +**Requirements:** Python 3.11+, [Claude Code CLI](https://claude.ai/code) installed and authenticated. + +```bash +python -m venv .venv +source .venv/Scripts/activate # Windows +# source .venv/bin/activate # Linux/macOS + +pip install -r requirements.txt +python main.py +``` + +Server listens on `http://0.0.0.0:8000`. + +Health check: `GET /health` +Claude smoke test: `GET /health/claude` +Active sessions: `GET /sessions` + +--- + +## Bot Commands + +| Command | Description | +|---------|-------------| +| `/new [msg]` | Create a new Claude Code session in `` | +| `/new [msg] --timeout N` | Create with custom CC timeout (seconds) | +| `/new [msg] --idle N` | Create with custom idle timeout (seconds) | +| `/list` | List your active sessions | +| `/switch ` | Switch active session to number `` from `/list` | +| `/close [n]` | Close active session (or session ``) | +| `/help` | Show command reference | + +Any message without a `/` prefix is forwarded directly to the active Claude Code session. + +--- + +## Security + +- **Allowlist** (`ALLOWED_OPEN_IDS`): only listed `open_id`s can use the bot. Empty = open to all. +- **Path sandbox**: all session directories must be under `WORKING_DIR`; path traversal is blocked. +- **Session ownership**: sessions are tied to the creating user; other users cannot send to them. +- **Audit log**: all prompts and responses are written to `agent/audit.log`. diff --git a/ROADMAP.md b/ROADMAP.md deleted file mode 100644 index 56ef465..0000000 --- a/ROADMAP.md +++ /dev/null @@ -1,93 +0,0 @@ -# PhoneWork Roadmap - -Issues observed in real usage, grouped by impact. No priority order within each phase. - ---- - -## Phase 1 — Core Reliability - -These are friction points that directly break or degrade the basic send-message → get-reply loop. - -### 1.1 Long output splitting -**Problem:** Feishu truncates messages at 4000 chars. Long code output is silently cut. -**Fix:** Automatically split into multiple sequential messages with `[1/3]`, `[2/3]` headers. - -### 1.2 Concurrent message handling -**Problem:** If the user sends two messages quickly, both fire `agent.run()` simultaneously for the same user, causing race conditions in `_active_conv` and interleaved `--resume` calls to the same CC session. -**Fix:** Per-user async lock (or queue) so messages process one at a time per user. - -### 1.3 Session persistence across restarts -**Problem:** `manager._sessions` is in-memory. A server restart loses all active sessions. Users have to recreate them. -**Fix:** Persist `{conv_id, cwd, cc_session_id}` to a JSON file on disk; reload on startup. - -### 1.4 Mail boy passthrough mode -**Problem:** The mail boy (GLM) sometimes paraphrases or summarizes instead of relaying verbatim, losing code blocks and exact output. -**Fix:** Bypass the mail boy entirely for follow-up messages — detect that there's an active session and call `manager.send()` directly without an LLM round-trip. - ---- - -## Phase 2 — Better Interaction Model - -Reducing the number of messages needed to get things done. - -### 2.1 Slash commands -**Problem:** Users must phrase everything as natural language for the mail boy to interpret. -**Fix:** Recognize a small set of commands directly in `handler.py` before hitting the LLM: -- `/new ` — create session -- `/list` — list sessions -- `/close` — close active session -- `/switch ` — switch active session by number -- `/retry` — resend last message to CC - -### 2.2 Multi-session switching -**Problem:** Only one "active session" per user. To switch projects, the user must remember conv_ids. -**Fix:** `/list` shows numbered sessions; `/switch 2` activates session #2. The system prompt shows all open sessions, not just the active one. - -### 2.3 Feishu message cards -**Problem:** Plain text is hard to scan — code blocks, file paths, and status info all look the same. -**Fix:** Use Feishu Interactive Cards (`msg_type: interactive`) to render: -- Session status as a structured card (project name, cwd, session ID) -- Action buttons: **Continue**, **Close session**, **Run again** - ---- - -## Phase 3 — Operational Quality - -Making it reliable enough to leave running 24/7. - -### 3.1 Health check improvements -**Problem:** `/health` only reports session count. No way to know if the Feishu WS connection is alive, or if CC is callable. -**Fix:** Add to `/health`: -- WebSocket connection status -- Last message received timestamp -- A `claude -p "ping"` smoke test result - -### 3.2 Automatic reconnection -**Problem:** The Feishu WebSocket thread is a daemon — if it dies silently (network blip), no messages are received and there's no recovery. -**Fix:** Wrap `ws_client.start()` in a retry loop with exponential backoff and log reconnection events. - -### 3.3 Per-session timeout configuration -**Problem:** All sessions share a 30-min idle timeout and 300s CC timeout. Long-running tasks (e.g. running tests) may need more; quick chats need less. -**Fix:** Allow per-session timeout overrides; expose via `/new --timeout 600`. - -### 3.4 Audit log -**Problem:** No record of what was sent to Claude Code or what it did. Impossible to debug after the fact. -**Fix:** Append each `(timestamp, conv_id, prompt, response)` to a JSONL file per session under the project directory. - ---- - -## Phase 4 — Multi-user & Security - -For sharing the bot with teammates. - -### 4.1 User allowlist -**Problem:** Anyone who can message the bot can run arbitrary code via Claude Code. -**Fix:** `ALLOWED_OPEN_IDS` list in `keyring.yaml`; reject messages from unknown users. - -### 4.2 Per-user session isolation -**Problem:** All users share the same `manager` singleton — user A could theoretically send to user B's session by guessing a conv_id. -**Fix:** Namespace sessions by user_id; `send_to_conversation` validates that the requesting user owns the session. - -### 4.3 Working directory sandboxing -**Problem:** The safety check in `_resolve_dir` blocks paths outside `WORKING_DIR`, but Claude Code itself runs with `--dangerously-skip-permissions` and can write anywhere. -**Fix:** Consider running CC in a restricted user account or container; or drop `--dangerously-skip-permissions` and implement a permission-approval flow via Feishu buttons. diff --git a/agent/audit.py b/agent/audit.py new file mode 100644 index 0000000..beb9dc8 --- /dev/null +++ b/agent/audit.py @@ -0,0 +1,76 @@ +"""Audit logging for Claude Code sessions.""" + +from __future__ import annotations + +import json +import logging +from datetime import datetime, timezone +from pathlib import Path +from typing import Optional + +logger = logging.getLogger(__name__) + +AUDIT_DIR = Path(__file__).parent.parent / "audit" + + +def _ensure_audit_dir() -> None: + AUDIT_DIR.mkdir(parents=True, exist_ok=True) + + +def log_interaction( + conv_id: str, + prompt: str, + response: str, + cwd: Optional[str] = None, + user_id: Optional[str] = None, +) -> None: + """ + Log an interaction to a JSONL file per session. + + Args: + conv_id: Conversation/session ID + prompt: User's message/prompt + response: Claude Code's response + cwd: Working directory (optional) + user_id: User identifier (optional) + """ + try: + _ensure_audit_dir() + log_file = AUDIT_DIR / f"{conv_id}.jsonl" + + entry = { + "timestamp": datetime.now(timezone.utc).isoformat(), + "conv_id": conv_id, + "prompt": prompt, + "response": response, + } + if cwd: + entry["cwd"] = cwd + if user_id: + entry["user_id"] = user_id[-8:] if len(user_id) > 8 else user_id + + with open(log_file, "a", encoding="utf-8") as f: + f.write(json.dumps(entry, ensure_ascii=False) + "\n") + + logger.debug("Logged interaction for session %s", conv_id) + + except Exception: + logger.exception("Failed to log audit entry for session %s", conv_id) + + +def get_audit_log(conv_id: str, limit: int = 100) -> list[dict]: + """Read the audit log for a session.""" + log_file = AUDIT_DIR / f"{conv_id}.jsonl" + if not log_file.exists(): + return [] + + entries = [] + try: + with open(log_file, "r", encoding="utf-8") as f: + for line in f: + if line.strip(): + entries.append(json.loads(line)) + except Exception: + logger.exception("Failed to read audit log for session %s", conv_id) + + return entries[-limit:] diff --git a/agent/manager.py b/agent/manager.py index 978c431..785ec69 100644 --- a/agent/manager.py +++ b/agent/manager.py @@ -3,45 +3,55 @@ from __future__ import annotations import asyncio +import json import logging import uuid -from dataclasses import dataclass, field +from dataclasses import dataclass, field, asdict +from pathlib import Path from typing import Dict, List, Optional from agent.pty_process import run_claude +from agent.audit import log_interaction logger = logging.getLogger(__name__) -IDLE_TIMEOUT = 30 * 60 # 30 minutes in seconds +DEFAULT_IDLE_TIMEOUT = 30 * 60 +DEFAULT_CC_TIMEOUT = 300.0 +PERSISTENCE_FILE = Path(__file__).parent.parent / "sessions.json" @dataclass class Session: conv_id: str cwd: str - # Stable UUID passed to `claude --session-id` so CC owns the history + owner_id: str = "" cc_session_id: str = field(default_factory=lambda: str(uuid.uuid4())) - last_activity: float = field(default_factory=lambda: asyncio.get_event_loop().time()) - # True after the first message has been sent (so we know to use --resume) + last_activity: float = 0.0 started: bool = False + idle_timeout: int = DEFAULT_IDLE_TIMEOUT + cc_timeout: float = DEFAULT_CC_TIMEOUT def touch(self) -> None: self.last_activity = asyncio.get_event_loop().time() + def to_dict(self) -> dict: + return asdict(self) + + @classmethod + def from_dict(cls, data: dict) -> "Session": + return cls(**data) + class SessionManager: - """Registry of active Claude Code project sessions.""" + """Registry of active Claude Code project sessions with persistence and user isolation.""" def __init__(self) -> None: self._sessions: Dict[str, Session] = {} self._lock = asyncio.Lock() self._reaper_task: Optional[asyncio.Task] = None - # ------------------------------------------------------------------ - # Public API - # ------------------------------------------------------------------ - async def start(self) -> None: + self._load() loop = asyncio.get_event_loop() self._reaper_task = loop.create_task(self._reaper_loop()) @@ -50,62 +60,116 @@ class SessionManager: self._reaper_task.cancel() async with self._lock: self._sessions.clear() + if PERSISTENCE_FILE.exists(): + PERSISTENCE_FILE.unlink() - async def create(self, conv_id: str, working_dir: str) -> Session: - """Register a new session for the given working directory.""" + async def create( + self, + conv_id: str, + working_dir: str, + owner_id: str = "", + idle_timeout: int = DEFAULT_IDLE_TIMEOUT, + cc_timeout: float = DEFAULT_CC_TIMEOUT, + ) -> Session: async with self._lock: - session = Session(conv_id=conv_id, cwd=working_dir) + session = Session( + conv_id=conv_id, + cwd=working_dir, + owner_id=owner_id, + idle_timeout=idle_timeout, + cc_timeout=cc_timeout, + ) self._sessions[conv_id] = session + self._save() logger.info( - "Created session %s (cc_session_id=%s) in %s", - conv_id, session.cc_session_id, working_dir, + "Created session %s (owner=...%s) in %s (idle=%ds, cc=%.0fs)", + conv_id, owner_id[-8:] if owner_id else "-", working_dir, idle_timeout, cc_timeout, ) return session - async def send(self, conv_id: str, message: str) -> str: - """ - Run claude -p with the message in the session's directory. - - - First message: uses --session-id to establish the CC session. - - Subsequent messages: uses --resume so CC has full history. - """ + async def send(self, conv_id: str, message: str, user_id: Optional[str] = None) -> str: async with self._lock: session = self._sessions.get(conv_id) if session is None: raise KeyError(f"No session for conv_id={conv_id!r}") + if session.owner_id and user_id and session.owner_id != user_id: + raise PermissionError(f"Session {conv_id} belongs to another user") session.touch() cwd = session.cwd cc_session_id = session.cc_session_id + cc_timeout = session.cc_timeout first_message = not session.started if first_message: session.started = True + self._save() output = await run_claude( message, cwd=cwd, cc_session_id=cc_session_id, resume=not first_message, + timeout=cc_timeout, ) + + log_interaction( + conv_id=conv_id, + prompt=message, + response=output, + cwd=cwd, + user_id=user_id, + ) + return output - async def close(self, conv_id: str) -> bool: - """Remove a session. Returns True if it existed.""" + async def close(self, conv_id: str, user_id: Optional[str] = None) -> bool: async with self._lock: - if conv_id not in self._sessions: + session = self._sessions.get(conv_id) + if session is None: return False + if session.owner_id and user_id and session.owner_id != user_id: + raise PermissionError(f"Session {conv_id} belongs to another user") del self._sessions[conv_id] + self._save() logger.info("Closed session %s", conv_id) return True - def list_sessions(self) -> list[dict]: + def list_sessions(self, user_id: Optional[str] = None) -> list[dict]: + sessions = self._sessions.values() + if user_id: + sessions = [s for s in sessions if not s.owner_id or s.owner_id == user_id] return [ - {"conv_id": s.conv_id, "cwd": s.cwd, "cc_session_id": s.cc_session_id} - for s in self._sessions.values() + { + "conv_id": s.conv_id, + "cwd": s.cwd, + "owner_id": s.owner_id[-8:] if s.owner_id else None, + "cc_session_id": s.cc_session_id, + "started": s.started, + "idle_timeout": s.idle_timeout, + "cc_timeout": s.cc_timeout, + } + for s in sessions ] - # ------------------------------------------------------------------ - # Internal helpers - # ------------------------------------------------------------------ + def _save(self) -> None: + try: + data = {cid: s.to_dict() for cid, s in self._sessions.items()} + with open(PERSISTENCE_FILE, "w", encoding="utf-8") as f: + json.dump(data, f, indent=2) + logger.debug("Saved %d sessions to %s", len(data), PERSISTENCE_FILE) + except Exception: + logger.exception("Failed to save sessions") + + def _load(self) -> None: + if not PERSISTENCE_FILE.exists(): + return + try: + with open(PERSISTENCE_FILE, "r", encoding="utf-8") as f: + data = json.load(f) + for cid, sdata in data.items(): + self._sessions[cid] = Session.from_dict(sdata) + logger.info("Loaded %d sessions from %s", len(self._sessions), PERSISTENCE_FILE) + except Exception: + logger.exception("Failed to load sessions") async def _reaper_loop(self) -> None: while True: @@ -115,14 +179,15 @@ class SessionManager: async def _reap_idle(self) -> None: now = asyncio.get_event_loop().time() async with self._lock: - to_close = [ - cid for cid, s in self._sessions.items() - if (now - s.last_activity) > IDLE_TIMEOUT - ] + to_close = [] + for cid, s in self._sessions.items(): + if s.last_activity > 0 and (now - s.last_activity) > s.idle_timeout: + to_close.append(cid) for cid in to_close: del self._sessions[cid] logger.info("Reaped idle session %s", cid) + if to_close: + self._save() -# Module-level singleton manager = SessionManager() diff --git a/bot/commands.py b/bot/commands.py new file mode 100644 index 0000000..bd7552d --- /dev/null +++ b/bot/commands.py @@ -0,0 +1,191 @@ +"""Slash command handler for direct bot control.""" + +from __future__ import annotations + +import argparse +import json +import logging +import re +import uuid +from typing import Optional, Tuple + +from agent.manager import manager +from orchestrator.agent import agent +from orchestrator.tools import set_current_user, get_current_user + +logger = logging.getLogger(__name__) + + +def parse_command(text: str) -> Optional[Tuple[str, str]]: + """ + Parse a slash command from text. + Returns (command, args) or None if not a command. + """ + text = text.strip() + if not text.startswith("/"): + return None + parts = text.split(None, 1) + cmd = parts[0].lower() + args = parts[1] if len(parts) > 1 else "" + return (cmd, args) + + +async def handle_command(user_id: str, text: str) -> Optional[str]: + """ + Handle a slash command. Returns the reply or None if not a command. + """ + parsed = parse_command(text) + if not parsed: + return None + + cmd, args = parsed + logger.info("Command: %s args=%r user=...%s", cmd, args[:50], user_id[-8:]) + + set_current_user(user_id) + + if cmd in ("/new", "/n"): + return await _cmd_new(user_id, args) + elif cmd in ("/list", "/ls", "/l"): + return await _cmd_list(user_id) + elif cmd in ("/close", "/c"): + return await _cmd_close(user_id, args) + elif cmd in ("/switch", "/s"): + return await _cmd_switch(user_id, args) + elif cmd == "/retry": + return await _cmd_retry(user_id) + elif cmd in ("/help", "/h", "/?"): + return _cmd_help() + else: + return f"Unknown command: {cmd}\n\n{_cmd_help()}" + + +async def _cmd_new(user_id: str, args: str) -> str: + """Create a new session.""" + if not args: + return "Usage: /new [initial_message] [--timeout N]\nExample: /new todo_app fix the bug --timeout 600" + + parser = argparse.ArgumentParser() + parser.add_argument("working_dir", nargs="?", help="Project directory") + parser.add_argument("rest", nargs="*", help="Initial message") + parser.add_argument("--timeout", type=int, default=None, help="CC timeout in seconds") + parser.add_argument("--idle", type=int, default=None, help="Idle timeout in seconds") + + try: + parsed = parser.parse_args(args.split()) + except SystemExit: + return "Usage: /new [initial_message] [--timeout N] [--idle N]" + + if not parsed.working_dir: + return "Error: project_dir is required" + + working_dir = parsed.working_dir + initial_msg = " ".join(parsed.rest) if parsed.rest else None + + from orchestrator.tools import CreateConversationTool + + tool = CreateConversationTool() + result = await tool._arun( + working_dir=working_dir, + initial_message=initial_msg, + cc_timeout=parsed.timeout, + idle_timeout=parsed.idle, + ) + try: + data = json.loads(result) + if "error" in data: + return f"Error: {data['error']}" + conv_id = data.get("conv_id", "") + agent._active_conv[user_id] = conv_id + cwd = data.get("working_dir", working_dir) + reply = f"✓ Created session `{conv_id}` in `{cwd}`" + if parsed.timeout: + reply += f" (timeout: {parsed.timeout}s)" + if initial_msg: + reply += f"\n\nSent: {initial_msg[:100]}..." + return reply + except Exception: + return result + + +async def _cmd_list(user_id: str) -> str: + """List all sessions for this user.""" + sessions = manager.list_sessions(user_id=user_id) + if not sessions: + return "No active sessions." + + active = agent.get_active_conv(user_id) + lines = ["**Your Sessions:**\n"] + for i, s in enumerate(sessions, 1): + marker = "→ " if s["conv_id"] == active else " " + lines.append(f"{marker}{i}. `{s['conv_id']}` - `{s['cwd']}`") + lines.append("\nUse `/switch ` to activate a session.") + return "\n".join(lines) + + +async def _cmd_close(user_id: str, args: str) -> str: + """Close a session.""" + sessions = manager.list_sessions(user_id=user_id) + if not sessions: + return "No sessions to close." + + if args: + try: + idx = int(args) - 1 + if 0 <= idx < len(sessions): + conv_id = sessions[idx]["conv_id"] + else: + return f"Invalid session number. Use 1-{len(sessions)}." + except ValueError: + conv_id = args.strip() + else: + conv_id = agent.get_active_conv(user_id) + if not conv_id: + return "No active session. Use `/close ` or `/close `." + + try: + success = await manager.close(conv_id, user_id=user_id) + if success: + if agent.get_active_conv(user_id) == conv_id: + agent._active_conv[user_id] = None + return f"✓ Closed session `{conv_id}`" + else: + return f"Session `{conv_id}` not found." + except PermissionError as e: + return str(e) + + +async def _cmd_switch(user_id: str, args: str) -> str: + """Switch to a different session.""" + sessions = manager.list_sessions(user_id=user_id) + if not sessions: + return "No sessions available." + + if not args: + return "Usage: /switch \n" + await _cmd_list(user_id) + + try: + idx = int(args) - 1 + if 0 <= idx < len(sessions): + conv_id = sessions[idx]["conv_id"] + agent._active_conv[user_id] = conv_id + return f"✓ Switched to session `{conv_id}` ({sessions[idx]['cwd']})" + else: + return f"Invalid session number. Use 1-{len(sessions)}." + except ValueError: + return f"Invalid number: {args}" + + +async def _cmd_retry(user_id: str) -> str: + """Retry the last message (placeholder - needs history tracking).""" + return "Retry not yet implemented. Just send your message again." + + +def _cmd_help() -> str: + """Show help.""" + return """**Commands:** +/new [msg] [--timeout N] [--idle N] - Create session +/list - List your sessions +/close [n] - Close session (active or by number) +/switch - Switch to session by number +/retry - Retry last message +/help - Show this help""" diff --git a/bot/feishu.py b/bot/feishu.py index c15d5a7..21586c3 100644 --- a/bot/feishu.py +++ b/bot/feishu.py @@ -2,10 +2,14 @@ from __future__ import annotations +import asyncio +import json import logging import lark_oapi as lark from lark_oapi.api.im.v1 import ( + CreateFileRequest, + CreateFileRequestBody, CreateMessageRequest, CreateMessageRequestBody, ) @@ -14,8 +18,7 @@ from config import FEISHU_APP_ID, FEISHU_APP_SECRET logger = logging.getLogger(__name__) -# Max Feishu text message length -MAX_TEXT_LEN = 4000 +MAX_TEXT_LEN = 3900 def _make_client() -> lark.Client: @@ -31,54 +34,252 @@ def _make_client() -> lark.Client: _client = _make_client() -def _truncate(text: str) -> str: +def _split_message(text: str) -> list[str]: if len(text) <= MAX_TEXT_LEN: - return text - return text[: MAX_TEXT_LEN - 20] + "\n...[truncated]" + return [text] + parts: list[str] = [] + remaining = text + while remaining: + if len(remaining) <= MAX_TEXT_LEN: + parts.append(remaining) + break + chunk = remaining[:MAX_TEXT_LEN] + last_newline = chunk.rfind("\n") + if last_newline > MAX_TEXT_LEN // 2: + chunk = remaining[:last_newline + 1] + parts.append(chunk) + remaining = remaining[len(chunk):] + total = len(parts) + headered_parts = [] + for i, part in enumerate(parts, 1): + headered_parts.append(f"[{i}/{total}]\n{part}") + return headered_parts async def send_text(receive_id: str, receive_id_type: str, text: str) -> None: """ Send a plain-text message to a Feishu chat or user. + Automatically splits long messages into multiple parts with [1/N] headers. Args: receive_id: chat_id or open_id depending on receive_id_type. receive_id_type: "chat_id" | "open_id" | "user_id" | "union_id". text: message content. """ - import json as _json - truncated = _truncate(text) - logger.debug( - "[feishu] send_text to=%s type=%s len=%d/%d text=%r", - receive_id, receive_id_type, len(truncated), len(text), truncated[:120], - ) - content = _json.dumps({"text": truncated}, ensure_ascii=False) + parts = _split_message(text) + loop = asyncio.get_event_loop() + for i, part in enumerate(parts): + logger.debug( + "[feishu] send_text to=%s type=%s part=%d/%d len=%d", + receive_id, receive_id_type, i + 1, len(parts), len(part), + ) + content = json.dumps({"text": part}, ensure_ascii=False) + + request = ( + CreateMessageRequest.builder() + .receive_id_type(receive_id_type) + .request_body( + CreateMessageRequestBody.builder() + .receive_id(receive_id) + .msg_type("text") + .content(content) + .build() + ) + .build() + ) + + response = await loop.run_in_executor( + None, + lambda: _client.im.v1.message.create(request), + ) + + if not response.success(): + logger.error( + "Feishu send_text failed: code=%s msg=%s", + response.code, + response.msg, + ) + return + else: + logger.debug("Sent message part %d/%d to %s (%s)", i + 1, len(parts), receive_id, receive_id_type) + + if len(parts) > 1 and i < len(parts) - 1: + await asyncio.sleep(0.3) + + +async def send_card(receive_id: str, receive_id_type: str, title: str, content: str, buttons: list[dict] | None = None) -> None: + """ + Send an interactive card message. + + Args: + receive_id: chat_id or open_id + receive_id_type: "chat_id" | "open_id" | "user_id" | "union_id" + title: Card title + content: Card content (markdown supported) + buttons: List of button dicts with "text" and "value" keys + """ + elements = [ + { + "tag": "div", + "text": { + "tag": "lark_md", + "content": content, + }, + }, + ] + + if buttons: + actions = [] + for btn in buttons: + actions.append({ + "tag": "button", + "text": {"tag": "plain_text", "content": btn.get("text", "Button")}, + "type": "primary", + "value": btn.get("value", {}), + }) + elements.append({"tag": "action", "actions": actions}) + + card = { + "type": "template", + "data": { + "template_id": "AAqkz9****", + "template_variable": { + "title": title, + "elements": elements, + }, + }, + } + + card_content = { + "config": {"wide_screen_mode": True}, + "header": { + "title": {"tag": "plain_text", "content": title}, + "template": "blue", + }, + "elements": elements, + } + + loop = asyncio.get_event_loop() request = ( CreateMessageRequest.builder() .receive_id_type(receive_id_type) .request_body( CreateMessageRequestBody.builder() .receive_id(receive_id) - .msg_type("text") - .content(content) + .msg_type("interactive") + .content(json.dumps(card_content, ensure_ascii=False)) .build() ) .build() ) - import asyncio - loop = asyncio.get_event_loop() response = await loop.run_in_executor( None, lambda: _client.im.v1.message.create(request), ) if not response.success(): + logger.error("Feishu send_card failed: code=%s msg=%s", response.code, response.msg) + else: + logger.debug("Sent card to %s (%s)", receive_id, receive_id_type) + + +async def send_file(receive_id: str, receive_id_type: str, file_path: str, file_type: str = "stream") -> None: + """ + Upload a local file to Feishu and send it as a file message. + + Args: + receive_id: chat_id or open_id depending on receive_id_type. + receive_id_type: "chat_id" | "open_id" | "user_id" | "union_id". + file_path: Absolute path to the local file to send. + file_type: Feishu file type — "stream" (generic), "opus", "mp4", "pdf", "doc", "xls", "ppt". + """ + import os + path = os.path.abspath(file_path) + file_name = os.path.basename(path) + loop = asyncio.get_event_loop() + + # Step 1: Upload file → get file_key + with open(path, "rb") as f: + file_data = f.read() + + def _upload(): + req = ( + CreateFileRequest.builder() + .request_body( + CreateFileRequestBody.builder() + .file_type(file_type) + .file_name(file_name) + .file(file_data) + .build() + ) + .build() + ) + return _client.im.v1.file.create(req) + + upload_resp = await loop.run_in_executor(None, _upload) + + if not upload_resp.success(): logger.error( - "Feishu send_text failed: code=%s msg=%s", - response.code, - response.msg, + "Feishu file upload failed: code=%s msg=%s", + upload_resp.code, + upload_resp.msg, + ) + return + + file_key = upload_resp.data.file_key + logger.debug("Uploaded file %r → file_key=%r", file_name, file_key) + + # Step 2: Send file message using the file_key + content = json.dumps({"file_key": file_key}, ensure_ascii=False) + request = ( + CreateMessageRequest.builder() + .receive_id_type(receive_id_type) + .request_body( + CreateMessageRequestBody.builder() + .receive_id(receive_id) + .msg_type("file") + .content(content) + .build() + ) + .build() + ) + + send_resp = await loop.run_in_executor( + None, + lambda: _client.im.v1.message.create(request), + ) + + if not send_resp.success(): + logger.error( + "Feishu send_file failed: code=%s msg=%s", + send_resp.code, + send_resp.msg, ) else: - logger.debug("Sent message to %s (%s)", receive_id, receive_id_type) + logger.debug("Sent file %r to %s (%s)", file_name, receive_id, receive_id_type) + + +def build_session_card(conv_id: str, cwd: str, started: bool) -> dict: + """Build a session status card.""" + status = "🟢 Active" if started else "🟡 Ready" + content = f"**Session ID:** `{conv_id}`\n**Directory:** `{cwd}`\n**Status:** {status}" + return { + "config": {"wide_screen_mode": True}, + "header": { + "title": {"tag": "plain_text", "content": "Claude Code Session"}, + "template": "turquoise", + }, + "elements": [ + {"tag": "div", "text": {"tag": "lark_md", "content": content}}, + {"tag": "hr"}, + { + "tag": "action", + "actions": [ + {"tag": "button", "text": {"tag": "plain_text", "content": "Continue"}, "type": "primary", "value": {"action": "continue", "conv_id": conv_id}}, + {"tag": "button", "text": {"tag": "plain_text", "content": "Close"}, "type": "default", "value": {"action": "close", "conv_id": conv_id}}, + ], + }, + ], + } diff --git a/bot/handler.py b/bot/handler.py index 071c8b9..6699242 100644 --- a/bot/handler.py +++ b/bot/handler.py @@ -6,30 +6,41 @@ import asyncio import json import logging import threading +import time import lark_oapi as lark from lark_oapi.api.im.v1 import P2ImMessageReceiveV1 +from bot.commands import handle_command from bot.feishu import send_text -from config import FEISHU_APP_ID, FEISHU_APP_SECRET +from config import FEISHU_APP_ID, FEISHU_APP_SECRET, is_user_allowed from orchestrator.agent import agent logger = logging.getLogger(__name__) -# Keep a reference to the running event loop so sync callbacks can schedule coroutines _main_loop: asyncio.AbstractEventLoop | None = None +_ws_connected: bool = False +_last_message_time: float = 0.0 +_reconnect_count: int = 0 + + +def get_ws_status() -> dict: + """Return WebSocket connection status.""" + return { + "connected": _ws_connected, + "last_message_time": _last_message_time, + "reconnect_count": _reconnect_count, + } def _handle_message(data: P2ImMessageReceiveV1) -> None: - """ - Synchronous callback invoked by the lark-oapi SDK on every incoming message. - We schedule async work onto the main event loop. - """ + global _last_message_time + _last_message_time = time.time() + try: message = data.event.message sender = data.event.sender - # Log raw event for debugging logger.debug( "event type=%r chat_type=%r content=%r", getattr(message, "message_type", None), @@ -37,18 +48,15 @@ def _handle_message(data: P2ImMessageReceiveV1) -> None: (getattr(message, "content", None) or "")[:100], ) - # Only handle text messages if message.message_type != "text": logger.info("Skipping non-text message_type=%r", message.message_type) return - # Extract fields chat_id: str = message.chat_id raw_content: str = message.content or "{}" content_obj = json.loads(raw_content) text: str = content_obj.get("text", "").strip() - # Strip @mentions (Feishu injects "@bot_name " at start of group messages) import re text = re.sub(r"@\S+\s*", "", text).strip() @@ -62,14 +70,12 @@ def _handle_message(data: P2ImMessageReceiveV1) -> None: logger.info("✉ ...%s → %r", open_id[-8:], text[:80]) - # Use open_id as user identifier for per-user history in the orchestrator user_id = open_id or chat_id if _main_loop is None: logger.error("Main event loop not set; cannot process message") return - # Schedule async processing; fire-and-forget asyncio.run_coroutine_threadsafe( _process_message(user_id, chat_id, text), _main_loop, @@ -79,9 +85,16 @@ def _handle_message(data: P2ImMessageReceiveV1) -> None: async def _process_message(user_id: str, chat_id: str, text: str) -> None: - """Run the orchestration agent and send the reply back to Feishu.""" + """Process message: check allowlist, then commands, then agent.""" try: - reply = await agent.run(user_id, text) + if not is_user_allowed(user_id): + logger.warning("Rejected message from unauthorized user: ...%s", user_id[-8:]) + await send_text(chat_id, "chat_id", "Sorry, you are not authorized to use this bot.") + return + + reply = await handle_command(user_id, text) + if reply is None: + reply = await agent.run(user_id, text) if reply: await send_text(chat_id, "chat_id", reply) except Exception: @@ -112,19 +125,38 @@ def start_websocket_client(loop: asyncio.AbstractEventLoop) -> None: global _main_loop _main_loop = loop - event_handler = build_event_handler() + def _run_with_reconnect() -> None: + global _ws_connected, _reconnect_count + backoff = 1.0 + max_backoff = 60.0 - ws_client = lark.ws.Client( - FEISHU_APP_ID, - FEISHU_APP_SECRET, - event_handler=event_handler, - log_level=lark.LogLevel.INFO, - ) + while True: + try: + _ws_connected = False + event_handler = build_event_handler() + ws_client = lark.ws.Client( + FEISHU_APP_ID, + FEISHU_APP_SECRET, + event_handler=event_handler, + log_level=lark.LogLevel.INFO, + ) - def _run() -> None: - logger.info("Starting Feishu long-connection client...") - ws_client.start() # blocks until disconnected + logger.info("Starting Feishu long-connection client...") + _ws_connected = True + _reconnect_count += 1 + ws_client.start() + logger.warning("WebSocket disconnected, will reconnect...") - thread = threading.Thread(target=_run, daemon=True, name="feishu-ws") + except Exception as e: + logger.error("WebSocket error: %s", e) + + finally: + _ws_connected = False + + logger.info("Reconnecting in %.1fs (attempt %d)...", backoff, _reconnect_count) + time.sleep(backoff) + backoff = min(backoff * 2, max_backoff) + + thread = threading.Thread(target=_run_with_reconnect, daemon=True, name="feishu-ws") thread.start() logger.info("Feishu WebSocket thread started") diff --git a/config.py b/config.py index 1f99f57..3071060 100644 --- a/config.py +++ b/config.py @@ -1,5 +1,6 @@ import yaml from pathlib import Path +from typing import List _CONFIG_PATH = Path(__file__).parent / "keyring.yaml" @@ -17,3 +18,14 @@ OPENAI_BASE_URL: str = _cfg["OPENAI_BASE_URL"] OPENAI_API_KEY: str = _cfg["OPENAI_API_KEY"] OPENAI_MODEL: str = _cfg.get("OPENAI_MODEL", "glm-4.7") WORKING_DIR: Path = Path(_cfg.get("WORKING_DIR", Path.home())).expanduser().resolve() + +ALLOWED_OPEN_IDS: List[str] = _cfg.get("ALLOWED_OPEN_IDS", []) +if ALLOWED_OPEN_IDS and not isinstance(ALLOWED_OPEN_IDS, list): + ALLOWED_OPEN_IDS = [str(ALLOWED_OPEN_IDS)] + + +def is_user_allowed(open_id: str) -> bool: + """Check if a user is allowed to use the bot.""" + if not ALLOWED_OPEN_IDS: + return True + return open_id in ALLOWED_OPEN_IDS diff --git a/main.py b/main.py index b00dae6..7141d16 100644 --- a/main.py +++ b/main.py @@ -4,13 +4,14 @@ from __future__ import annotations import asyncio import logging +import time import uvicorn from fastapi import FastAPI from rich.logging import RichHandler from agent.manager import manager -from bot.handler import start_websocket_client +from bot.handler import start_websocket_client, get_ws_status logging.basicConfig( level=logging.DEBUG, @@ -23,18 +24,61 @@ logging.basicConfig( omit_repeated_times=False, )], ) -# Suppress noisy third-party debug output for _noisy in ("httpcore", "httpx", "openai._base_client", "urllib3", "lark_oapi", "websockets"): logging.getLogger(_noisy).setLevel(logging.WARNING) logger = logging.getLogger(__name__) app = FastAPI(title="PhoneWork", version="0.1.0") +START_TIME = time.time() + @app.get("/health") async def health() -> dict: sessions = manager.list_sessions() - return {"status": "ok", "active_sessions": len(sessions)} + ws_status = get_ws_status() + uptime = time.time() - START_TIME + + result = { + "status": "ok", + "uptime_seconds": round(uptime, 1), + "active_sessions": len(sessions), + "websocket": ws_status, + } + + if ws_status.get("connected"): + result["status"] = "ok" + else: + result["status"] = "degraded" + + return result + + +@app.get("/health/claude") +async def health_claude() -> dict: + """Smoke test: run a simple claude -p command.""" + from agent.pty_process import run_claude + import tempfile + import os + + start = time.time() + try: + with tempfile.TemporaryDirectory() as tmpdir: + output = await run_claude( + "Say 'pong' and nothing else", + cwd=tmpdir, + timeout=30.0, + ) + elapsed = time.time() - start + return { + "status": "ok", + "elapsed_seconds": round(elapsed, 2), + "output_preview": output[:100] if output else None, + } + except asyncio.TimeoutError: + return {"status": "timeout", "elapsed_seconds": 30.0} + except Exception as e: + return {"status": "error", "error": str(e)} @app.get("/sessions") @@ -44,11 +88,7 @@ async def list_sessions() -> list: @app.on_event("startup") async def startup_event() -> None: - # Start the session manager's idle-reaper await manager.start() - - # Start the Feishu WebSocket client in a background thread, - # passing the running event loop so async work can be scheduled loop = asyncio.get_event_loop() start_websocket_client(loop) logger.info("PhoneWork started") diff --git a/orchestrator/agent.py b/orchestrator/agent.py index c638cc1..2b8d62f 100644 --- a/orchestrator/agent.py +++ b/orchestrator/agent.py @@ -5,8 +5,10 @@ Uses LangChain 1.x tool-calling pattern: bind_tools + manual agentic loop. from __future__ import annotations +import asyncio import json import logging +import re from collections import defaultdict from typing import Dict, List, Optional @@ -19,8 +21,9 @@ from langchain_core.messages import ( ) from langchain_openai import ChatOpenAI +from agent.manager import manager from config import OPENAI_API_KEY, OPENAI_BASE_URL, OPENAI_MODEL, WORKING_DIR -from orchestrator.tools import TOOLS +from orchestrator.tools import TOOLS, set_current_user logger = logging.getLogger(__name__) @@ -52,6 +55,12 @@ Guidelines: MAX_ITERATIONS = 10 _TOOL_MAP = {t.name: t for t in TOOLS} +COMMAND_PATTERN = re.compile(r"^/(new|list|close|switch|retry|help)", re.IGNORECASE) + + +def _looks_like_command(text: str) -> bool: + return bool(COMMAND_PATTERN.match(text.strip())) + class OrchestrationAgent: """Per-user agent with conversation history and active session tracking.""" @@ -69,6 +78,8 @@ class OrchestrationAgent: self._history: Dict[str, List[BaseMessage]] = defaultdict(list) # user_id -> most recently active conv_id self._active_conv: Dict[str, Optional[str]] = defaultdict(lambda: None) + # user_id -> asyncio.Lock (prevents concurrent processing per user) + self._user_locks: Dict[str, asyncio.Lock] = defaultdict(asyncio.Lock) def _build_system_prompt(self, user_id: str) -> str: conv_id = self._active_conv[user_id] @@ -81,13 +92,35 @@ class OrchestrationAgent: active_session_line=active_line, ) + def get_active_conv(self, user_id: str) -> Optional[str]: + return self._active_conv.get(user_id) + async def run(self, user_id: str, text: str) -> str: """Process a user message and return the agent's reply.""" + async with self._user_locks[user_id]: + return await self._run_locked(user_id, text) + + async def _run_locked(self, user_id: str, text: str) -> str: + """Internal implementation, must be called with user lock held.""" + set_current_user(user_id) active_conv = self._active_conv[user_id] short_uid = user_id[-8:] logger.info(">>> user=...%s conv=%s msg=%r", short_uid, active_conv, text[:80]) logger.debug(" history_len=%d", len(self._history[user_id])) + # Passthrough mode: if active session and not a command, bypass LLM + if active_conv and not _looks_like_command(text): + try: + reply = await manager.send(active_conv, text, user_id=user_id) + logger.info("<<< [passthrough] reply: %r", reply[:120]) + return reply + except KeyError: + logger.warning("Session %s no longer exists, clearing active_conv", active_conv) + self._active_conv[user_id] = None + except Exception as exc: + logger.exception("Passthrough error for user=%s", user_id) + return f"[Error] {exc}" + messages: List[BaseMessage] = ( [SystemMessage(content=self._build_system_prompt(user_id))] + self._history[user_id] @@ -161,5 +194,4 @@ class OrchestrationAgent: return reply -# Module-level singleton agent = OrchestrationAgent() diff --git a/orchestrator/tools.py b/orchestrator/tools.py index 349a3df..82f195a 100644 --- a/orchestrator/tools.py +++ b/orchestrator/tools.py @@ -4,6 +4,7 @@ from __future__ import annotations import json import uuid +from contextvars import ContextVar from pathlib import Path from typing import Optional, Type @@ -13,6 +14,16 @@ from pydantic import BaseModel, Field from agent.manager import manager from config import WORKING_DIR +_current_user_id: ContextVar[Optional[str]] = ContextVar("current_user_id", default=None) + + +def set_current_user(user_id: Optional[str]) -> None: + _current_user_id.set(user_id) + + +def get_current_user() -> Optional[str]: + return _current_user_id.get() + def _resolve_dir(working_dir: str) -> Path: """ @@ -21,14 +32,21 @@ def _resolve_dir(working_dir: str) -> Path: Rules: - Absolute paths are used as-is (but must stay within WORKING_DIR for safety). - Relative paths / bare names are joined onto WORKING_DIR. + - Path traversal attempts (..) are blocked. - The resolved directory is created if it doesn't exist. """ + working_dir = working_dir.strip() + + if ".." in working_dir.split("/") or ".." in working_dir.split("\\"): + raise ValueError( + "Path traversal not allowed. Use a subfolder name or path inside the working directory." + ) + p = Path(working_dir) if not p.is_absolute(): p = WORKING_DIR / p p = p.resolve() - # Safety: must be inside WORKING_DIR try: p.relative_to(WORKING_DIR) except ValueError: @@ -55,6 +73,8 @@ class CreateConversationInput(BaseModel): ), ) initial_message: Optional[str] = Field(None, description="Optional first message to send after spawning") + idle_timeout: Optional[int] = Field(None, description="Idle timeout in seconds (default 1800)") + cc_timeout: Optional[float] = Field(None, description="Claude Code execution timeout in seconds (default 300)") class SendToConversationInput(BaseModel): @@ -79,17 +99,24 @@ class CreateConversationTool(BaseTool): ) args_schema: Type[BaseModel] = CreateConversationInput - def _run(self, working_dir: str, initial_message: Optional[str] = None) -> str: + def _run(self, working_dir: str, initial_message: Optional[str] = None, idle_timeout: Optional[int] = None, cc_timeout: Optional[float] = None) -> str: raise NotImplementedError("Use async version") - async def _arun(self, working_dir: str, initial_message: Optional[str] = None) -> str: + async def _arun(self, working_dir: str, initial_message: Optional[str] = None, idle_timeout: Optional[int] = None, cc_timeout: Optional[float] = None) -> str: try: resolved = _resolve_dir(working_dir) except ValueError as exc: return json.dumps({"error": str(exc)}) + user_id = get_current_user() conv_id = str(uuid.uuid4())[:8] - await manager.create(conv_id, str(resolved)) + await manager.create( + conv_id, + str(resolved), + owner_id=user_id or "", + idle_timeout=idle_timeout or 1800, + cc_timeout=cc_timeout or 300.0, + ) result: dict = { "conv_id": conv_id, @@ -97,7 +124,7 @@ class CreateConversationTool(BaseTool): } if initial_message: - output = await manager.send(conv_id, initial_message) + output = await manager.send(conv_id, initial_message, user_id=user_id) result["response"] = output else: result["status"] = "Session created. Send a message to start working." @@ -117,11 +144,14 @@ class SendToConversationTool(BaseTool): raise NotImplementedError("Use async version") async def _arun(self, conv_id: str, message: str) -> str: + user_id = get_current_user() try: - output = await manager.send(conv_id, message) + output = await manager.send(conv_id, message, user_id=user_id) return json.dumps({"conv_id": conv_id, "response": output}, ensure_ascii=False) except KeyError: return json.dumps({"error": f"No active session for conv_id={conv_id!r}"}) + except PermissionError as e: + return json.dumps({"error": str(e)}) class ListConversationsTool(BaseTool): @@ -132,7 +162,8 @@ class ListConversationsTool(BaseTool): raise NotImplementedError("Use async version") async def _arun(self) -> str: - sessions = manager.list_sessions() + user_id = get_current_user() + sessions = manager.list_sessions(user_id=user_id) if not sessions: return "No active sessions." return json.dumps(sessions, ensure_ascii=False, indent=2) @@ -147,10 +178,14 @@ class CloseConversationTool(BaseTool): raise NotImplementedError("Use async version") async def _arun(self, conv_id: str) -> str: - closed = await manager.close(conv_id) - if closed: - return f"Session {conv_id!r} closed." - return f"Session {conv_id!r} not found." + user_id = get_current_user() + try: + closed = await manager.close(conv_id, user_id=user_id) + if closed: + return f"Session {conv_id!r} closed." + return f"Session {conv_id!r} not found." + except PermissionError as e: + return str(e) # Module-level tool list for easy import