diff --git a/README.md b/README.md
index a080826..5674006 100644
--- a/README.md
+++ b/README.md
@@ -1,19 +1,19 @@
# PhoneWork
-Feishu bot integration with Claude Code CLI.
+Feishu bot that lets users control Claude Code CLI from their phone.
## Architecture
```
┌─────────────┐ WebSocket ┌──────────────┐ LangChain ┌─────────────┐
│ Feishu │ ◄──────────────► │ FastAPI │ ◄──────────────► │ LLM API │
-│ (client) │ │ (server) │ │ (OpenAI) │
+│ (client) │ │ (server) │ │ (ZhipuAI) │
└─────────────┘ └──────────────┘ └─────────────┘
│
▼
┌─────────────┐
│ Claude Code │
- │ (PTY) │
+ │ (headless) │
└─────────────┘
```
@@ -22,36 +22,144 @@ Feishu bot integration with Claude Code CLI.
| Module | Purpose |
|--------|---------|
| `main.py` | FastAPI entry point, starts WebSocket client + session manager |
-| `bot/handler.py` | Receives Feishu events, dispatches to orchestrator |
-| `bot/feishu.py` | Sends replies back to Feishu chats |
-| `orchestrator/agent.py` | LangChain agent with per-user history + active session tracking |
-| `orchestrator/tools.py` | Tools: `create_conversation`, `send_to_conversation`, `close_conversation` |
-| `agent/manager.py` | Session registry with idle timeout reaper |
-| `agent/pty_process.py` | Runs `claude -p` headlessly, manages session continuity |
+| `bot/handler.py` | Receives Feishu events via long-connection WebSocket |
+| `bot/feishu.py` | Sends text/file/card replies back to Feishu |
+| `bot/commands.py` | Slash command handler (`/new`, `/list`, `/close`, `/switch`, `/help`) |
+| `orchestrator/agent.py` | LangChain agent with per-user history + passthrough mode |
+| `orchestrator/tools.py` | Tools: `create_conversation`, `send_to_conversation`, `list_conversations`, `close_conversation` |
+| `agent/manager.py` | Session registry with persistence and idle timeout reaper |
+| `agent/pty_process.py` | Runs `claude -p` headlessly, manages session continuity via `--resume` |
+| `agent/audit.py` | Audit log of all interactions |
-**Flow:** User message → Feishu WebSocket → Handler → Orchestrator (LLM decides action) → Tool → Session Manager → Claude Code PTY → Response back to Feishu
+**Flow:** User message → Feishu WebSocket → Handler → (passthrough or LLM) → Session Manager → `claude -p` → Response back to Feishu
-## Setup
+---
-1. **Feishu App**: Create at https://open.feishu.cn
- - Enable Bot capability + long-connection event subscription
- - Get `FEISHU_APP_ID` and `FEISHU_APP_SECRET`
+## Feishu App Setup
-2. **LLM Endpoint**: Configure OpenAI-compatible endpoint
- - `OPENAI_BASE_URL`, `OPENAI_API_KEY`, `OPENAI_MODEL`
+### 1. Create App
-3. **Claude Code CLI**: Install and authenticate `claude` command
+Go to [Feishu Open Platform](https://open.feishu.cn/app) → **Create App** → **Custom App**.
-4. **Configuration**:
- ```bash
- cp keyring.example.yaml keyring.yaml
- # Edit keyring.yaml with your credentials
- ```
+Record the **App ID** and **App Secret** from the Credentials page.
-5. **Run**:
- ```bash
- pip install -r requirements.txt
- python main.py
- ```
+### 2. Enable Bot Capability
-**Requirements**: Python 3.11+
+**App Features** → **Bot** → Enable.
+
+### 3. Subscribe to Events (Long-connection mode)
+
+**Event Subscriptions** → **Request URL** tab:
+
+- Switch to **"Use long connection to receive events"** (长连接接收事件)
+- No public URL required
+
+Add event subscription:
+
+| Event | Event Type |
+|-------|-----------|
+| Receive messages | `im.message.receive_v1` |
+
+### 4. Required Permissions (API Scopes)
+
+Go to **Permissions & Scopes** and add the following:
+
+| Permission | API Scope | Used For |
+|------------|-----------|----------|
+| Read private messages sent to the bot | `im:message` (read) | Receiving user messages via WebSocket |
+| Send messages | `im:message:send_as_bot` | Sending text replies |
+| Upload files | `im:resource` | Uploading files before sending |
+| Send messages in private chats | `im:message` (write) | Sending file messages |
+
+Minimal scope list to request:
+
+```
+im:message
+im:message:send_as_bot
+im:resource
+```
+
+> **Note:** `im:resource` covers both file upload (`im.v1.file.create`) and sending
+> file-type messages. Without it, `send_file()` will fail with a permission error.
+
+### 5. Publish App
+
+After adding all permissions:
+
+1. **Version Management** → Create a new version → Submit for review (or self-publish if in the same org)
+2. Install the app to your workspace
+
+---
+
+## Configuration
+
+Copy and fill in credentials:
+
+```bash
+cp keyring.example.yaml keyring.yaml
+```
+
+`keyring.yaml` fields:
+
+```yaml
+FEISHU_APP_ID: cli_xxxxxxxxxxxxxxxx
+FEISHU_APP_SECRET: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
+
+OPENAI_BASE_URL: https://open.bigmodel.cn/api/paas/v4/
+OPENAI_API_KEY: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
+OPENAI_MODEL: glm-4.7
+
+# Root directory for all project sessions (absolute path)
+WORKING_DIR: C:/Users/yourname/projects
+
+# Allowlist of Feishu open_ids that may use the bot.
+# Leave empty to allow all users.
+ALLOWED_OPEN_IDS:
+ - ou_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
+```
+
+---
+
+## Installation & Run
+
+**Requirements:** Python 3.11+, [Claude Code CLI](https://claude.ai/code) installed and authenticated.
+
+```bash
+python -m venv .venv
+source .venv/Scripts/activate # Windows
+# source .venv/bin/activate # Linux/macOS
+
+pip install -r requirements.txt
+python main.py
+```
+
+Server listens on `http://0.0.0.0:8000`.
+
+Health check: `GET /health`
+Claude smoke test: `GET /health/claude`
+Active sessions: `GET /sessions`
+
+---
+
+## Bot Commands
+
+| Command | Description |
+|---------|-------------|
+| `/new
[msg]` | Create a new Claude Code session in `` |
+| `/new [msg] --timeout N` | Create with custom CC timeout (seconds) |
+| `/new [msg] --idle N` | Create with custom idle timeout (seconds) |
+| `/list` | List your active sessions |
+| `/switch ` | Switch active session to number `` from `/list` |
+| `/close [n]` | Close active session (or session ``) |
+| `/help` | Show command reference |
+
+Any message without a `/` prefix is forwarded directly to the active Claude Code session.
+
+---
+
+## Security
+
+- **Allowlist** (`ALLOWED_OPEN_IDS`): only listed `open_id`s can use the bot. Empty = open to all.
+- **Path sandbox**: all session directories must be under `WORKING_DIR`; path traversal is blocked.
+- **Session ownership**: sessions are tied to the creating user; other users cannot send to them.
+- **Audit log**: all prompts and responses are written to `agent/audit.log`.
diff --git a/ROADMAP.md b/ROADMAP.md
deleted file mode 100644
index 56ef465..0000000
--- a/ROADMAP.md
+++ /dev/null
@@ -1,93 +0,0 @@
-# PhoneWork Roadmap
-
-Issues observed in real usage, grouped by impact. No priority order within each phase.
-
----
-
-## Phase 1 — Core Reliability
-
-These are friction points that directly break or degrade the basic send-message → get-reply loop.
-
-### 1.1 Long output splitting
-**Problem:** Feishu truncates messages at 4000 chars. Long code output is silently cut.
-**Fix:** Automatically split into multiple sequential messages with `[1/3]`, `[2/3]` headers.
-
-### 1.2 Concurrent message handling
-**Problem:** If the user sends two messages quickly, both fire `agent.run()` simultaneously for the same user, causing race conditions in `_active_conv` and interleaved `--resume` calls to the same CC session.
-**Fix:** Per-user async lock (or queue) so messages process one at a time per user.
-
-### 1.3 Session persistence across restarts
-**Problem:** `manager._sessions` is in-memory. A server restart loses all active sessions. Users have to recreate them.
-**Fix:** Persist `{conv_id, cwd, cc_session_id}` to a JSON file on disk; reload on startup.
-
-### 1.4 Mail boy passthrough mode
-**Problem:** The mail boy (GLM) sometimes paraphrases or summarizes instead of relaying verbatim, losing code blocks and exact output.
-**Fix:** Bypass the mail boy entirely for follow-up messages — detect that there's an active session and call `manager.send()` directly without an LLM round-trip.
-
----
-
-## Phase 2 — Better Interaction Model
-
-Reducing the number of messages needed to get things done.
-
-### 2.1 Slash commands
-**Problem:** Users must phrase everything as natural language for the mail boy to interpret.
-**Fix:** Recognize a small set of commands directly in `handler.py` before hitting the LLM:
-- `/new ` — create session
-- `/list` — list sessions
-- `/close` — close active session
-- `/switch ` — switch active session by number
-- `/retry` — resend last message to CC
-
-### 2.2 Multi-session switching
-**Problem:** Only one "active session" per user. To switch projects, the user must remember conv_ids.
-**Fix:** `/list` shows numbered sessions; `/switch 2` activates session #2. The system prompt shows all open sessions, not just the active one.
-
-### 2.3 Feishu message cards
-**Problem:** Plain text is hard to scan — code blocks, file paths, and status info all look the same.
-**Fix:** Use Feishu Interactive Cards (`msg_type: interactive`) to render:
-- Session status as a structured card (project name, cwd, session ID)
-- Action buttons: **Continue**, **Close session**, **Run again**
-
----
-
-## Phase 3 — Operational Quality
-
-Making it reliable enough to leave running 24/7.
-
-### 3.1 Health check improvements
-**Problem:** `/health` only reports session count. No way to know if the Feishu WS connection is alive, or if CC is callable.
-**Fix:** Add to `/health`:
-- WebSocket connection status
-- Last message received timestamp
-- A `claude -p "ping"` smoke test result
-
-### 3.2 Automatic reconnection
-**Problem:** The Feishu WebSocket thread is a daemon — if it dies silently (network blip), no messages are received and there's no recovery.
-**Fix:** Wrap `ws_client.start()` in a retry loop with exponential backoff and log reconnection events.
-
-### 3.3 Per-session timeout configuration
-**Problem:** All sessions share a 30-min idle timeout and 300s CC timeout. Long-running tasks (e.g. running tests) may need more; quick chats need less.
-**Fix:** Allow per-session timeout overrides; expose via `/new --timeout 600`.
-
-### 3.4 Audit log
-**Problem:** No record of what was sent to Claude Code or what it did. Impossible to debug after the fact.
-**Fix:** Append each `(timestamp, conv_id, prompt, response)` to a JSONL file per session under the project directory.
-
----
-
-## Phase 4 — Multi-user & Security
-
-For sharing the bot with teammates.
-
-### 4.1 User allowlist
-**Problem:** Anyone who can message the bot can run arbitrary code via Claude Code.
-**Fix:** `ALLOWED_OPEN_IDS` list in `keyring.yaml`; reject messages from unknown users.
-
-### 4.2 Per-user session isolation
-**Problem:** All users share the same `manager` singleton — user A could theoretically send to user B's session by guessing a conv_id.
-**Fix:** Namespace sessions by user_id; `send_to_conversation` validates that the requesting user owns the session.
-
-### 4.3 Working directory sandboxing
-**Problem:** The safety check in `_resolve_dir` blocks paths outside `WORKING_DIR`, but Claude Code itself runs with `--dangerously-skip-permissions` and can write anywhere.
-**Fix:** Consider running CC in a restricted user account or container; or drop `--dangerously-skip-permissions` and implement a permission-approval flow via Feishu buttons.
diff --git a/agent/audit.py b/agent/audit.py
new file mode 100644
index 0000000..beb9dc8
--- /dev/null
+++ b/agent/audit.py
@@ -0,0 +1,76 @@
+"""Audit logging for Claude Code sessions."""
+
+from __future__ import annotations
+
+import json
+import logging
+from datetime import datetime, timezone
+from pathlib import Path
+from typing import Optional
+
+logger = logging.getLogger(__name__)
+
+AUDIT_DIR = Path(__file__).parent.parent / "audit"
+
+
+def _ensure_audit_dir() -> None:
+ AUDIT_DIR.mkdir(parents=True, exist_ok=True)
+
+
+def log_interaction(
+ conv_id: str,
+ prompt: str,
+ response: str,
+ cwd: Optional[str] = None,
+ user_id: Optional[str] = None,
+) -> None:
+ """
+ Log an interaction to a JSONL file per session.
+
+ Args:
+ conv_id: Conversation/session ID
+ prompt: User's message/prompt
+ response: Claude Code's response
+ cwd: Working directory (optional)
+ user_id: User identifier (optional)
+ """
+ try:
+ _ensure_audit_dir()
+ log_file = AUDIT_DIR / f"{conv_id}.jsonl"
+
+ entry = {
+ "timestamp": datetime.now(timezone.utc).isoformat(),
+ "conv_id": conv_id,
+ "prompt": prompt,
+ "response": response,
+ }
+ if cwd:
+ entry["cwd"] = cwd
+ if user_id:
+ entry["user_id"] = user_id[-8:] if len(user_id) > 8 else user_id
+
+ with open(log_file, "a", encoding="utf-8") as f:
+ f.write(json.dumps(entry, ensure_ascii=False) + "\n")
+
+ logger.debug("Logged interaction for session %s", conv_id)
+
+ except Exception:
+ logger.exception("Failed to log audit entry for session %s", conv_id)
+
+
+def get_audit_log(conv_id: str, limit: int = 100) -> list[dict]:
+ """Read the audit log for a session."""
+ log_file = AUDIT_DIR / f"{conv_id}.jsonl"
+ if not log_file.exists():
+ return []
+
+ entries = []
+ try:
+ with open(log_file, "r", encoding="utf-8") as f:
+ for line in f:
+ if line.strip():
+ entries.append(json.loads(line))
+ except Exception:
+ logger.exception("Failed to read audit log for session %s", conv_id)
+
+ return entries[-limit:]
diff --git a/agent/manager.py b/agent/manager.py
index 978c431..785ec69 100644
--- a/agent/manager.py
+++ b/agent/manager.py
@@ -3,45 +3,55 @@
from __future__ import annotations
import asyncio
+import json
import logging
import uuid
-from dataclasses import dataclass, field
+from dataclasses import dataclass, field, asdict
+from pathlib import Path
from typing import Dict, List, Optional
from agent.pty_process import run_claude
+from agent.audit import log_interaction
logger = logging.getLogger(__name__)
-IDLE_TIMEOUT = 30 * 60 # 30 minutes in seconds
+DEFAULT_IDLE_TIMEOUT = 30 * 60
+DEFAULT_CC_TIMEOUT = 300.0
+PERSISTENCE_FILE = Path(__file__).parent.parent / "sessions.json"
@dataclass
class Session:
conv_id: str
cwd: str
- # Stable UUID passed to `claude --session-id` so CC owns the history
+ owner_id: str = ""
cc_session_id: str = field(default_factory=lambda: str(uuid.uuid4()))
- last_activity: float = field(default_factory=lambda: asyncio.get_event_loop().time())
- # True after the first message has been sent (so we know to use --resume)
+ last_activity: float = 0.0
started: bool = False
+ idle_timeout: int = DEFAULT_IDLE_TIMEOUT
+ cc_timeout: float = DEFAULT_CC_TIMEOUT
def touch(self) -> None:
self.last_activity = asyncio.get_event_loop().time()
+ def to_dict(self) -> dict:
+ return asdict(self)
+
+ @classmethod
+ def from_dict(cls, data: dict) -> "Session":
+ return cls(**data)
+
class SessionManager:
- """Registry of active Claude Code project sessions."""
+ """Registry of active Claude Code project sessions with persistence and user isolation."""
def __init__(self) -> None:
self._sessions: Dict[str, Session] = {}
self._lock = asyncio.Lock()
self._reaper_task: Optional[asyncio.Task] = None
- # ------------------------------------------------------------------
- # Public API
- # ------------------------------------------------------------------
-
async def start(self) -> None:
+ self._load()
loop = asyncio.get_event_loop()
self._reaper_task = loop.create_task(self._reaper_loop())
@@ -50,62 +60,116 @@ class SessionManager:
self._reaper_task.cancel()
async with self._lock:
self._sessions.clear()
+ if PERSISTENCE_FILE.exists():
+ PERSISTENCE_FILE.unlink()
- async def create(self, conv_id: str, working_dir: str) -> Session:
- """Register a new session for the given working directory."""
+ async def create(
+ self,
+ conv_id: str,
+ working_dir: str,
+ owner_id: str = "",
+ idle_timeout: int = DEFAULT_IDLE_TIMEOUT,
+ cc_timeout: float = DEFAULT_CC_TIMEOUT,
+ ) -> Session:
async with self._lock:
- session = Session(conv_id=conv_id, cwd=working_dir)
+ session = Session(
+ conv_id=conv_id,
+ cwd=working_dir,
+ owner_id=owner_id,
+ idle_timeout=idle_timeout,
+ cc_timeout=cc_timeout,
+ )
self._sessions[conv_id] = session
+ self._save()
logger.info(
- "Created session %s (cc_session_id=%s) in %s",
- conv_id, session.cc_session_id, working_dir,
+ "Created session %s (owner=...%s) in %s (idle=%ds, cc=%.0fs)",
+ conv_id, owner_id[-8:] if owner_id else "-", working_dir, idle_timeout, cc_timeout,
)
return session
- async def send(self, conv_id: str, message: str) -> str:
- """
- Run claude -p with the message in the session's directory.
-
- - First message: uses --session-id to establish the CC session.
- - Subsequent messages: uses --resume so CC has full history.
- """
+ async def send(self, conv_id: str, message: str, user_id: Optional[str] = None) -> str:
async with self._lock:
session = self._sessions.get(conv_id)
if session is None:
raise KeyError(f"No session for conv_id={conv_id!r}")
+ if session.owner_id and user_id and session.owner_id != user_id:
+ raise PermissionError(f"Session {conv_id} belongs to another user")
session.touch()
cwd = session.cwd
cc_session_id = session.cc_session_id
+ cc_timeout = session.cc_timeout
first_message = not session.started
if first_message:
session.started = True
+ self._save()
output = await run_claude(
message,
cwd=cwd,
cc_session_id=cc_session_id,
resume=not first_message,
+ timeout=cc_timeout,
)
+
+ log_interaction(
+ conv_id=conv_id,
+ prompt=message,
+ response=output,
+ cwd=cwd,
+ user_id=user_id,
+ )
+
return output
- async def close(self, conv_id: str) -> bool:
- """Remove a session. Returns True if it existed."""
+ async def close(self, conv_id: str, user_id: Optional[str] = None) -> bool:
async with self._lock:
- if conv_id not in self._sessions:
+ session = self._sessions.get(conv_id)
+ if session is None:
return False
+ if session.owner_id and user_id and session.owner_id != user_id:
+ raise PermissionError(f"Session {conv_id} belongs to another user")
del self._sessions[conv_id]
+ self._save()
logger.info("Closed session %s", conv_id)
return True
- def list_sessions(self) -> list[dict]:
+ def list_sessions(self, user_id: Optional[str] = None) -> list[dict]:
+ sessions = self._sessions.values()
+ if user_id:
+ sessions = [s for s in sessions if not s.owner_id or s.owner_id == user_id]
return [
- {"conv_id": s.conv_id, "cwd": s.cwd, "cc_session_id": s.cc_session_id}
- for s in self._sessions.values()
+ {
+ "conv_id": s.conv_id,
+ "cwd": s.cwd,
+ "owner_id": s.owner_id[-8:] if s.owner_id else None,
+ "cc_session_id": s.cc_session_id,
+ "started": s.started,
+ "idle_timeout": s.idle_timeout,
+ "cc_timeout": s.cc_timeout,
+ }
+ for s in sessions
]
- # ------------------------------------------------------------------
- # Internal helpers
- # ------------------------------------------------------------------
+ def _save(self) -> None:
+ try:
+ data = {cid: s.to_dict() for cid, s in self._sessions.items()}
+ with open(PERSISTENCE_FILE, "w", encoding="utf-8") as f:
+ json.dump(data, f, indent=2)
+ logger.debug("Saved %d sessions to %s", len(data), PERSISTENCE_FILE)
+ except Exception:
+ logger.exception("Failed to save sessions")
+
+ def _load(self) -> None:
+ if not PERSISTENCE_FILE.exists():
+ return
+ try:
+ with open(PERSISTENCE_FILE, "r", encoding="utf-8") as f:
+ data = json.load(f)
+ for cid, sdata in data.items():
+ self._sessions[cid] = Session.from_dict(sdata)
+ logger.info("Loaded %d sessions from %s", len(self._sessions), PERSISTENCE_FILE)
+ except Exception:
+ logger.exception("Failed to load sessions")
async def _reaper_loop(self) -> None:
while True:
@@ -115,14 +179,15 @@ class SessionManager:
async def _reap_idle(self) -> None:
now = asyncio.get_event_loop().time()
async with self._lock:
- to_close = [
- cid for cid, s in self._sessions.items()
- if (now - s.last_activity) > IDLE_TIMEOUT
- ]
+ to_close = []
+ for cid, s in self._sessions.items():
+ if s.last_activity > 0 and (now - s.last_activity) > s.idle_timeout:
+ to_close.append(cid)
for cid in to_close:
del self._sessions[cid]
logger.info("Reaped idle session %s", cid)
+ if to_close:
+ self._save()
-# Module-level singleton
manager = SessionManager()
diff --git a/bot/commands.py b/bot/commands.py
new file mode 100644
index 0000000..bd7552d
--- /dev/null
+++ b/bot/commands.py
@@ -0,0 +1,191 @@
+"""Slash command handler for direct bot control."""
+
+from __future__ import annotations
+
+import argparse
+import json
+import logging
+import re
+import uuid
+from typing import Optional, Tuple
+
+from agent.manager import manager
+from orchestrator.agent import agent
+from orchestrator.tools import set_current_user, get_current_user
+
+logger = logging.getLogger(__name__)
+
+
+def parse_command(text: str) -> Optional[Tuple[str, str]]:
+ """
+ Parse a slash command from text.
+ Returns (command, args) or None if not a command.
+ """
+ text = text.strip()
+ if not text.startswith("/"):
+ return None
+ parts = text.split(None, 1)
+ cmd = parts[0].lower()
+ args = parts[1] if len(parts) > 1 else ""
+ return (cmd, args)
+
+
+async def handle_command(user_id: str, text: str) -> Optional[str]:
+ """
+ Handle a slash command. Returns the reply or None if not a command.
+ """
+ parsed = parse_command(text)
+ if not parsed:
+ return None
+
+ cmd, args = parsed
+ logger.info("Command: %s args=%r user=...%s", cmd, args[:50], user_id[-8:])
+
+ set_current_user(user_id)
+
+ if cmd in ("/new", "/n"):
+ return await _cmd_new(user_id, args)
+ elif cmd in ("/list", "/ls", "/l"):
+ return await _cmd_list(user_id)
+ elif cmd in ("/close", "/c"):
+ return await _cmd_close(user_id, args)
+ elif cmd in ("/switch", "/s"):
+ return await _cmd_switch(user_id, args)
+ elif cmd == "/retry":
+ return await _cmd_retry(user_id)
+ elif cmd in ("/help", "/h", "/?"):
+ return _cmd_help()
+ else:
+ return f"Unknown command: {cmd}\n\n{_cmd_help()}"
+
+
+async def _cmd_new(user_id: str, args: str) -> str:
+ """Create a new session."""
+ if not args:
+ return "Usage: /new [initial_message] [--timeout N]\nExample: /new todo_app fix the bug --timeout 600"
+
+ parser = argparse.ArgumentParser()
+ parser.add_argument("working_dir", nargs="?", help="Project directory")
+ parser.add_argument("rest", nargs="*", help="Initial message")
+ parser.add_argument("--timeout", type=int, default=None, help="CC timeout in seconds")
+ parser.add_argument("--idle", type=int, default=None, help="Idle timeout in seconds")
+
+ try:
+ parsed = parser.parse_args(args.split())
+ except SystemExit:
+ return "Usage: /new [initial_message] [--timeout N] [--idle N]"
+
+ if not parsed.working_dir:
+ return "Error: project_dir is required"
+
+ working_dir = parsed.working_dir
+ initial_msg = " ".join(parsed.rest) if parsed.rest else None
+
+ from orchestrator.tools import CreateConversationTool
+
+ tool = CreateConversationTool()
+ result = await tool._arun(
+ working_dir=working_dir,
+ initial_message=initial_msg,
+ cc_timeout=parsed.timeout,
+ idle_timeout=parsed.idle,
+ )
+ try:
+ data = json.loads(result)
+ if "error" in data:
+ return f"Error: {data['error']}"
+ conv_id = data.get("conv_id", "")
+ agent._active_conv[user_id] = conv_id
+ cwd = data.get("working_dir", working_dir)
+ reply = f"✓ Created session `{conv_id}` in `{cwd}`"
+ if parsed.timeout:
+ reply += f" (timeout: {parsed.timeout}s)"
+ if initial_msg:
+ reply += f"\n\nSent: {initial_msg[:100]}..."
+ return reply
+ except Exception:
+ return result
+
+
+async def _cmd_list(user_id: str) -> str:
+ """List all sessions for this user."""
+ sessions = manager.list_sessions(user_id=user_id)
+ if not sessions:
+ return "No active sessions."
+
+ active = agent.get_active_conv(user_id)
+ lines = ["**Your Sessions:**\n"]
+ for i, s in enumerate(sessions, 1):
+ marker = "→ " if s["conv_id"] == active else " "
+ lines.append(f"{marker}{i}. `{s['conv_id']}` - `{s['cwd']}`")
+ lines.append("\nUse `/switch ` to activate a session.")
+ return "\n".join(lines)
+
+
+async def _cmd_close(user_id: str, args: str) -> str:
+ """Close a session."""
+ sessions = manager.list_sessions(user_id=user_id)
+ if not sessions:
+ return "No sessions to close."
+
+ if args:
+ try:
+ idx = int(args) - 1
+ if 0 <= idx < len(sessions):
+ conv_id = sessions[idx]["conv_id"]
+ else:
+ return f"Invalid session number. Use 1-{len(sessions)}."
+ except ValueError:
+ conv_id = args.strip()
+ else:
+ conv_id = agent.get_active_conv(user_id)
+ if not conv_id:
+ return "No active session. Use `/close ` or `/close `."
+
+ try:
+ success = await manager.close(conv_id, user_id=user_id)
+ if success:
+ if agent.get_active_conv(user_id) == conv_id:
+ agent._active_conv[user_id] = None
+ return f"✓ Closed session `{conv_id}`"
+ else:
+ return f"Session `{conv_id}` not found."
+ except PermissionError as e:
+ return str(e)
+
+
+async def _cmd_switch(user_id: str, args: str) -> str:
+ """Switch to a different session."""
+ sessions = manager.list_sessions(user_id=user_id)
+ if not sessions:
+ return "No sessions available."
+
+ if not args:
+ return "Usage: /switch \n" + await _cmd_list(user_id)
+
+ try:
+ idx = int(args) - 1
+ if 0 <= idx < len(sessions):
+ conv_id = sessions[idx]["conv_id"]
+ agent._active_conv[user_id] = conv_id
+ return f"✓ Switched to session `{conv_id}` ({sessions[idx]['cwd']})"
+ else:
+ return f"Invalid session number. Use 1-{len(sessions)}."
+ except ValueError:
+ return f"Invalid number: {args}"
+
+
+async def _cmd_retry(user_id: str) -> str:
+ """Retry the last message (placeholder - needs history tracking)."""
+ return "Retry not yet implemented. Just send your message again."
+
+
+def _cmd_help() -> str:
+ """Show help."""
+ return """**Commands:**
+/new [msg] [--timeout N] [--idle N] - Create session
+/list - List your sessions
+/close [n] - Close session (active or by number)
+/switch - Switch to session by number
+/retry - Retry last message
+/help - Show this help"""
diff --git a/bot/feishu.py b/bot/feishu.py
index c15d5a7..21586c3 100644
--- a/bot/feishu.py
+++ b/bot/feishu.py
@@ -2,10 +2,14 @@
from __future__ import annotations
+import asyncio
+import json
import logging
import lark_oapi as lark
from lark_oapi.api.im.v1 import (
+ CreateFileRequest,
+ CreateFileRequestBody,
CreateMessageRequest,
CreateMessageRequestBody,
)
@@ -14,8 +18,7 @@ from config import FEISHU_APP_ID, FEISHU_APP_SECRET
logger = logging.getLogger(__name__)
-# Max Feishu text message length
-MAX_TEXT_LEN = 4000
+MAX_TEXT_LEN = 3900
def _make_client() -> lark.Client:
@@ -31,54 +34,252 @@ def _make_client() -> lark.Client:
_client = _make_client()
-def _truncate(text: str) -> str:
+def _split_message(text: str) -> list[str]:
if len(text) <= MAX_TEXT_LEN:
- return text
- return text[: MAX_TEXT_LEN - 20] + "\n...[truncated]"
+ return [text]
+ parts: list[str] = []
+ remaining = text
+ while remaining:
+ if len(remaining) <= MAX_TEXT_LEN:
+ parts.append(remaining)
+ break
+ chunk = remaining[:MAX_TEXT_LEN]
+ last_newline = chunk.rfind("\n")
+ if last_newline > MAX_TEXT_LEN // 2:
+ chunk = remaining[:last_newline + 1]
+ parts.append(chunk)
+ remaining = remaining[len(chunk):]
+ total = len(parts)
+ headered_parts = []
+ for i, part in enumerate(parts, 1):
+ headered_parts.append(f"[{i}/{total}]\n{part}")
+ return headered_parts
async def send_text(receive_id: str, receive_id_type: str, text: str) -> None:
"""
Send a plain-text message to a Feishu chat or user.
+ Automatically splits long messages into multiple parts with [1/N] headers.
Args:
receive_id: chat_id or open_id depending on receive_id_type.
receive_id_type: "chat_id" | "open_id" | "user_id" | "union_id".
text: message content.
"""
- import json as _json
- truncated = _truncate(text)
- logger.debug(
- "[feishu] send_text to=%s type=%s len=%d/%d text=%r",
- receive_id, receive_id_type, len(truncated), len(text), truncated[:120],
- )
- content = _json.dumps({"text": truncated}, ensure_ascii=False)
+ parts = _split_message(text)
+ loop = asyncio.get_event_loop()
+ for i, part in enumerate(parts):
+ logger.debug(
+ "[feishu] send_text to=%s type=%s part=%d/%d len=%d",
+ receive_id, receive_id_type, i + 1, len(parts), len(part),
+ )
+ content = json.dumps({"text": part}, ensure_ascii=False)
+
+ request = (
+ CreateMessageRequest.builder()
+ .receive_id_type(receive_id_type)
+ .request_body(
+ CreateMessageRequestBody.builder()
+ .receive_id(receive_id)
+ .msg_type("text")
+ .content(content)
+ .build()
+ )
+ .build()
+ )
+
+ response = await loop.run_in_executor(
+ None,
+ lambda: _client.im.v1.message.create(request),
+ )
+
+ if not response.success():
+ logger.error(
+ "Feishu send_text failed: code=%s msg=%s",
+ response.code,
+ response.msg,
+ )
+ return
+ else:
+ logger.debug("Sent message part %d/%d to %s (%s)", i + 1, len(parts), receive_id, receive_id_type)
+
+ if len(parts) > 1 and i < len(parts) - 1:
+ await asyncio.sleep(0.3)
+
+
+async def send_card(receive_id: str, receive_id_type: str, title: str, content: str, buttons: list[dict] | None = None) -> None:
+ """
+ Send an interactive card message.
+
+ Args:
+ receive_id: chat_id or open_id
+ receive_id_type: "chat_id" | "open_id" | "user_id" | "union_id"
+ title: Card title
+ content: Card content (markdown supported)
+ buttons: List of button dicts with "text" and "value" keys
+ """
+ elements = [
+ {
+ "tag": "div",
+ "text": {
+ "tag": "lark_md",
+ "content": content,
+ },
+ },
+ ]
+
+ if buttons:
+ actions = []
+ for btn in buttons:
+ actions.append({
+ "tag": "button",
+ "text": {"tag": "plain_text", "content": btn.get("text", "Button")},
+ "type": "primary",
+ "value": btn.get("value", {}),
+ })
+ elements.append({"tag": "action", "actions": actions})
+
+ card = {
+ "type": "template",
+ "data": {
+ "template_id": "AAqkz9****",
+ "template_variable": {
+ "title": title,
+ "elements": elements,
+ },
+ },
+ }
+
+ card_content = {
+ "config": {"wide_screen_mode": True},
+ "header": {
+ "title": {"tag": "plain_text", "content": title},
+ "template": "blue",
+ },
+ "elements": elements,
+ }
+
+ loop = asyncio.get_event_loop()
request = (
CreateMessageRequest.builder()
.receive_id_type(receive_id_type)
.request_body(
CreateMessageRequestBody.builder()
.receive_id(receive_id)
- .msg_type("text")
- .content(content)
+ .msg_type("interactive")
+ .content(json.dumps(card_content, ensure_ascii=False))
.build()
)
.build()
)
- import asyncio
- loop = asyncio.get_event_loop()
response = await loop.run_in_executor(
None,
lambda: _client.im.v1.message.create(request),
)
if not response.success():
+ logger.error("Feishu send_card failed: code=%s msg=%s", response.code, response.msg)
+ else:
+ logger.debug("Sent card to %s (%s)", receive_id, receive_id_type)
+
+
+async def send_file(receive_id: str, receive_id_type: str, file_path: str, file_type: str = "stream") -> None:
+ """
+ Upload a local file to Feishu and send it as a file message.
+
+ Args:
+ receive_id: chat_id or open_id depending on receive_id_type.
+ receive_id_type: "chat_id" | "open_id" | "user_id" | "union_id".
+ file_path: Absolute path to the local file to send.
+ file_type: Feishu file type — "stream" (generic), "opus", "mp4", "pdf", "doc", "xls", "ppt".
+ """
+ import os
+ path = os.path.abspath(file_path)
+ file_name = os.path.basename(path)
+ loop = asyncio.get_event_loop()
+
+ # Step 1: Upload file → get file_key
+ with open(path, "rb") as f:
+ file_data = f.read()
+
+ def _upload():
+ req = (
+ CreateFileRequest.builder()
+ .request_body(
+ CreateFileRequestBody.builder()
+ .file_type(file_type)
+ .file_name(file_name)
+ .file(file_data)
+ .build()
+ )
+ .build()
+ )
+ return _client.im.v1.file.create(req)
+
+ upload_resp = await loop.run_in_executor(None, _upload)
+
+ if not upload_resp.success():
logger.error(
- "Feishu send_text failed: code=%s msg=%s",
- response.code,
- response.msg,
+ "Feishu file upload failed: code=%s msg=%s",
+ upload_resp.code,
+ upload_resp.msg,
+ )
+ return
+
+ file_key = upload_resp.data.file_key
+ logger.debug("Uploaded file %r → file_key=%r", file_name, file_key)
+
+ # Step 2: Send file message using the file_key
+ content = json.dumps({"file_key": file_key}, ensure_ascii=False)
+ request = (
+ CreateMessageRequest.builder()
+ .receive_id_type(receive_id_type)
+ .request_body(
+ CreateMessageRequestBody.builder()
+ .receive_id(receive_id)
+ .msg_type("file")
+ .content(content)
+ .build()
+ )
+ .build()
+ )
+
+ send_resp = await loop.run_in_executor(
+ None,
+ lambda: _client.im.v1.message.create(request),
+ )
+
+ if not send_resp.success():
+ logger.error(
+ "Feishu send_file failed: code=%s msg=%s",
+ send_resp.code,
+ send_resp.msg,
)
else:
- logger.debug("Sent message to %s (%s)", receive_id, receive_id_type)
+ logger.debug("Sent file %r to %s (%s)", file_name, receive_id, receive_id_type)
+
+
+def build_session_card(conv_id: str, cwd: str, started: bool) -> dict:
+ """Build a session status card."""
+ status = "🟢 Active" if started else "🟡 Ready"
+ content = f"**Session ID:** `{conv_id}`\n**Directory:** `{cwd}`\n**Status:** {status}"
+ return {
+ "config": {"wide_screen_mode": True},
+ "header": {
+ "title": {"tag": "plain_text", "content": "Claude Code Session"},
+ "template": "turquoise",
+ },
+ "elements": [
+ {"tag": "div", "text": {"tag": "lark_md", "content": content}},
+ {"tag": "hr"},
+ {
+ "tag": "action",
+ "actions": [
+ {"tag": "button", "text": {"tag": "plain_text", "content": "Continue"}, "type": "primary", "value": {"action": "continue", "conv_id": conv_id}},
+ {"tag": "button", "text": {"tag": "plain_text", "content": "Close"}, "type": "default", "value": {"action": "close", "conv_id": conv_id}},
+ ],
+ },
+ ],
+ }
diff --git a/bot/handler.py b/bot/handler.py
index 071c8b9..6699242 100644
--- a/bot/handler.py
+++ b/bot/handler.py
@@ -6,30 +6,41 @@ import asyncio
import json
import logging
import threading
+import time
import lark_oapi as lark
from lark_oapi.api.im.v1 import P2ImMessageReceiveV1
+from bot.commands import handle_command
from bot.feishu import send_text
-from config import FEISHU_APP_ID, FEISHU_APP_SECRET
+from config import FEISHU_APP_ID, FEISHU_APP_SECRET, is_user_allowed
from orchestrator.agent import agent
logger = logging.getLogger(__name__)
-# Keep a reference to the running event loop so sync callbacks can schedule coroutines
_main_loop: asyncio.AbstractEventLoop | None = None
+_ws_connected: bool = False
+_last_message_time: float = 0.0
+_reconnect_count: int = 0
+
+
+def get_ws_status() -> dict:
+ """Return WebSocket connection status."""
+ return {
+ "connected": _ws_connected,
+ "last_message_time": _last_message_time,
+ "reconnect_count": _reconnect_count,
+ }
def _handle_message(data: P2ImMessageReceiveV1) -> None:
- """
- Synchronous callback invoked by the lark-oapi SDK on every incoming message.
- We schedule async work onto the main event loop.
- """
+ global _last_message_time
+ _last_message_time = time.time()
+
try:
message = data.event.message
sender = data.event.sender
- # Log raw event for debugging
logger.debug(
"event type=%r chat_type=%r content=%r",
getattr(message, "message_type", None),
@@ -37,18 +48,15 @@ def _handle_message(data: P2ImMessageReceiveV1) -> None:
(getattr(message, "content", None) or "")[:100],
)
- # Only handle text messages
if message.message_type != "text":
logger.info("Skipping non-text message_type=%r", message.message_type)
return
- # Extract fields
chat_id: str = message.chat_id
raw_content: str = message.content or "{}"
content_obj = json.loads(raw_content)
text: str = content_obj.get("text", "").strip()
- # Strip @mentions (Feishu injects "@bot_name " at start of group messages)
import re
text = re.sub(r"@\S+\s*", "", text).strip()
@@ -62,14 +70,12 @@ def _handle_message(data: P2ImMessageReceiveV1) -> None:
logger.info("✉ ...%s → %r", open_id[-8:], text[:80])
- # Use open_id as user identifier for per-user history in the orchestrator
user_id = open_id or chat_id
if _main_loop is None:
logger.error("Main event loop not set; cannot process message")
return
- # Schedule async processing; fire-and-forget
asyncio.run_coroutine_threadsafe(
_process_message(user_id, chat_id, text),
_main_loop,
@@ -79,9 +85,16 @@ def _handle_message(data: P2ImMessageReceiveV1) -> None:
async def _process_message(user_id: str, chat_id: str, text: str) -> None:
- """Run the orchestration agent and send the reply back to Feishu."""
+ """Process message: check allowlist, then commands, then agent."""
try:
- reply = await agent.run(user_id, text)
+ if not is_user_allowed(user_id):
+ logger.warning("Rejected message from unauthorized user: ...%s", user_id[-8:])
+ await send_text(chat_id, "chat_id", "Sorry, you are not authorized to use this bot.")
+ return
+
+ reply = await handle_command(user_id, text)
+ if reply is None:
+ reply = await agent.run(user_id, text)
if reply:
await send_text(chat_id, "chat_id", reply)
except Exception:
@@ -112,19 +125,38 @@ def start_websocket_client(loop: asyncio.AbstractEventLoop) -> None:
global _main_loop
_main_loop = loop
- event_handler = build_event_handler()
+ def _run_with_reconnect() -> None:
+ global _ws_connected, _reconnect_count
+ backoff = 1.0
+ max_backoff = 60.0
- ws_client = lark.ws.Client(
- FEISHU_APP_ID,
- FEISHU_APP_SECRET,
- event_handler=event_handler,
- log_level=lark.LogLevel.INFO,
- )
+ while True:
+ try:
+ _ws_connected = False
+ event_handler = build_event_handler()
+ ws_client = lark.ws.Client(
+ FEISHU_APP_ID,
+ FEISHU_APP_SECRET,
+ event_handler=event_handler,
+ log_level=lark.LogLevel.INFO,
+ )
- def _run() -> None:
- logger.info("Starting Feishu long-connection client...")
- ws_client.start() # blocks until disconnected
+ logger.info("Starting Feishu long-connection client...")
+ _ws_connected = True
+ _reconnect_count += 1
+ ws_client.start()
+ logger.warning("WebSocket disconnected, will reconnect...")
- thread = threading.Thread(target=_run, daemon=True, name="feishu-ws")
+ except Exception as e:
+ logger.error("WebSocket error: %s", e)
+
+ finally:
+ _ws_connected = False
+
+ logger.info("Reconnecting in %.1fs (attempt %d)...", backoff, _reconnect_count)
+ time.sleep(backoff)
+ backoff = min(backoff * 2, max_backoff)
+
+ thread = threading.Thread(target=_run_with_reconnect, daemon=True, name="feishu-ws")
thread.start()
logger.info("Feishu WebSocket thread started")
diff --git a/config.py b/config.py
index 1f99f57..3071060 100644
--- a/config.py
+++ b/config.py
@@ -1,5 +1,6 @@
import yaml
from pathlib import Path
+from typing import List
_CONFIG_PATH = Path(__file__).parent / "keyring.yaml"
@@ -17,3 +18,14 @@ OPENAI_BASE_URL: str = _cfg["OPENAI_BASE_URL"]
OPENAI_API_KEY: str = _cfg["OPENAI_API_KEY"]
OPENAI_MODEL: str = _cfg.get("OPENAI_MODEL", "glm-4.7")
WORKING_DIR: Path = Path(_cfg.get("WORKING_DIR", Path.home())).expanduser().resolve()
+
+ALLOWED_OPEN_IDS: List[str] = _cfg.get("ALLOWED_OPEN_IDS", [])
+if ALLOWED_OPEN_IDS and not isinstance(ALLOWED_OPEN_IDS, list):
+ ALLOWED_OPEN_IDS = [str(ALLOWED_OPEN_IDS)]
+
+
+def is_user_allowed(open_id: str) -> bool:
+ """Check if a user is allowed to use the bot."""
+ if not ALLOWED_OPEN_IDS:
+ return True
+ return open_id in ALLOWED_OPEN_IDS
diff --git a/main.py b/main.py
index b00dae6..7141d16 100644
--- a/main.py
+++ b/main.py
@@ -4,13 +4,14 @@ from __future__ import annotations
import asyncio
import logging
+import time
import uvicorn
from fastapi import FastAPI
from rich.logging import RichHandler
from agent.manager import manager
-from bot.handler import start_websocket_client
+from bot.handler import start_websocket_client, get_ws_status
logging.basicConfig(
level=logging.DEBUG,
@@ -23,18 +24,61 @@ logging.basicConfig(
omit_repeated_times=False,
)],
)
-# Suppress noisy third-party debug output
for _noisy in ("httpcore", "httpx", "openai._base_client", "urllib3", "lark_oapi", "websockets"):
logging.getLogger(_noisy).setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
app = FastAPI(title="PhoneWork", version="0.1.0")
+START_TIME = time.time()
+
@app.get("/health")
async def health() -> dict:
sessions = manager.list_sessions()
- return {"status": "ok", "active_sessions": len(sessions)}
+ ws_status = get_ws_status()
+ uptime = time.time() - START_TIME
+
+ result = {
+ "status": "ok",
+ "uptime_seconds": round(uptime, 1),
+ "active_sessions": len(sessions),
+ "websocket": ws_status,
+ }
+
+ if ws_status.get("connected"):
+ result["status"] = "ok"
+ else:
+ result["status"] = "degraded"
+
+ return result
+
+
+@app.get("/health/claude")
+async def health_claude() -> dict:
+ """Smoke test: run a simple claude -p command."""
+ from agent.pty_process import run_claude
+ import tempfile
+ import os
+
+ start = time.time()
+ try:
+ with tempfile.TemporaryDirectory() as tmpdir:
+ output = await run_claude(
+ "Say 'pong' and nothing else",
+ cwd=tmpdir,
+ timeout=30.0,
+ )
+ elapsed = time.time() - start
+ return {
+ "status": "ok",
+ "elapsed_seconds": round(elapsed, 2),
+ "output_preview": output[:100] if output else None,
+ }
+ except asyncio.TimeoutError:
+ return {"status": "timeout", "elapsed_seconds": 30.0}
+ except Exception as e:
+ return {"status": "error", "error": str(e)}
@app.get("/sessions")
@@ -44,11 +88,7 @@ async def list_sessions() -> list:
@app.on_event("startup")
async def startup_event() -> None:
- # Start the session manager's idle-reaper
await manager.start()
-
- # Start the Feishu WebSocket client in a background thread,
- # passing the running event loop so async work can be scheduled
loop = asyncio.get_event_loop()
start_websocket_client(loop)
logger.info("PhoneWork started")
diff --git a/orchestrator/agent.py b/orchestrator/agent.py
index c638cc1..2b8d62f 100644
--- a/orchestrator/agent.py
+++ b/orchestrator/agent.py
@@ -5,8 +5,10 @@ Uses LangChain 1.x tool-calling pattern: bind_tools + manual agentic loop.
from __future__ import annotations
+import asyncio
import json
import logging
+import re
from collections import defaultdict
from typing import Dict, List, Optional
@@ -19,8 +21,9 @@ from langchain_core.messages import (
)
from langchain_openai import ChatOpenAI
+from agent.manager import manager
from config import OPENAI_API_KEY, OPENAI_BASE_URL, OPENAI_MODEL, WORKING_DIR
-from orchestrator.tools import TOOLS
+from orchestrator.tools import TOOLS, set_current_user
logger = logging.getLogger(__name__)
@@ -52,6 +55,12 @@ Guidelines:
MAX_ITERATIONS = 10
_TOOL_MAP = {t.name: t for t in TOOLS}
+COMMAND_PATTERN = re.compile(r"^/(new|list|close|switch|retry|help)", re.IGNORECASE)
+
+
+def _looks_like_command(text: str) -> bool:
+ return bool(COMMAND_PATTERN.match(text.strip()))
+
class OrchestrationAgent:
"""Per-user agent with conversation history and active session tracking."""
@@ -69,6 +78,8 @@ class OrchestrationAgent:
self._history: Dict[str, List[BaseMessage]] = defaultdict(list)
# user_id -> most recently active conv_id
self._active_conv: Dict[str, Optional[str]] = defaultdict(lambda: None)
+ # user_id -> asyncio.Lock (prevents concurrent processing per user)
+ self._user_locks: Dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
def _build_system_prompt(self, user_id: str) -> str:
conv_id = self._active_conv[user_id]
@@ -81,13 +92,35 @@ class OrchestrationAgent:
active_session_line=active_line,
)
+ def get_active_conv(self, user_id: str) -> Optional[str]:
+ return self._active_conv.get(user_id)
+
async def run(self, user_id: str, text: str) -> str:
"""Process a user message and return the agent's reply."""
+ async with self._user_locks[user_id]:
+ return await self._run_locked(user_id, text)
+
+ async def _run_locked(self, user_id: str, text: str) -> str:
+ """Internal implementation, must be called with user lock held."""
+ set_current_user(user_id)
active_conv = self._active_conv[user_id]
short_uid = user_id[-8:]
logger.info(">>> user=...%s conv=%s msg=%r", short_uid, active_conv, text[:80])
logger.debug(" history_len=%d", len(self._history[user_id]))
+ # Passthrough mode: if active session and not a command, bypass LLM
+ if active_conv and not _looks_like_command(text):
+ try:
+ reply = await manager.send(active_conv, text, user_id=user_id)
+ logger.info("<<< [passthrough] reply: %r", reply[:120])
+ return reply
+ except KeyError:
+ logger.warning("Session %s no longer exists, clearing active_conv", active_conv)
+ self._active_conv[user_id] = None
+ except Exception as exc:
+ logger.exception("Passthrough error for user=%s", user_id)
+ return f"[Error] {exc}"
+
messages: List[BaseMessage] = (
[SystemMessage(content=self._build_system_prompt(user_id))]
+ self._history[user_id]
@@ -161,5 +194,4 @@ class OrchestrationAgent:
return reply
-# Module-level singleton
agent = OrchestrationAgent()
diff --git a/orchestrator/tools.py b/orchestrator/tools.py
index 349a3df..82f195a 100644
--- a/orchestrator/tools.py
+++ b/orchestrator/tools.py
@@ -4,6 +4,7 @@ from __future__ import annotations
import json
import uuid
+from contextvars import ContextVar
from pathlib import Path
from typing import Optional, Type
@@ -13,6 +14,16 @@ from pydantic import BaseModel, Field
from agent.manager import manager
from config import WORKING_DIR
+_current_user_id: ContextVar[Optional[str]] = ContextVar("current_user_id", default=None)
+
+
+def set_current_user(user_id: Optional[str]) -> None:
+ _current_user_id.set(user_id)
+
+
+def get_current_user() -> Optional[str]:
+ return _current_user_id.get()
+
def _resolve_dir(working_dir: str) -> Path:
"""
@@ -21,14 +32,21 @@ def _resolve_dir(working_dir: str) -> Path:
Rules:
- Absolute paths are used as-is (but must stay within WORKING_DIR for safety).
- Relative paths / bare names are joined onto WORKING_DIR.
+ - Path traversal attempts (..) are blocked.
- The resolved directory is created if it doesn't exist.
"""
+ working_dir = working_dir.strip()
+
+ if ".." in working_dir.split("/") or ".." in working_dir.split("\\"):
+ raise ValueError(
+ "Path traversal not allowed. Use a subfolder name or path inside the working directory."
+ )
+
p = Path(working_dir)
if not p.is_absolute():
p = WORKING_DIR / p
p = p.resolve()
- # Safety: must be inside WORKING_DIR
try:
p.relative_to(WORKING_DIR)
except ValueError:
@@ -55,6 +73,8 @@ class CreateConversationInput(BaseModel):
),
)
initial_message: Optional[str] = Field(None, description="Optional first message to send after spawning")
+ idle_timeout: Optional[int] = Field(None, description="Idle timeout in seconds (default 1800)")
+ cc_timeout: Optional[float] = Field(None, description="Claude Code execution timeout in seconds (default 300)")
class SendToConversationInput(BaseModel):
@@ -79,17 +99,24 @@ class CreateConversationTool(BaseTool):
)
args_schema: Type[BaseModel] = CreateConversationInput
- def _run(self, working_dir: str, initial_message: Optional[str] = None) -> str:
+ def _run(self, working_dir: str, initial_message: Optional[str] = None, idle_timeout: Optional[int] = None, cc_timeout: Optional[float] = None) -> str:
raise NotImplementedError("Use async version")
- async def _arun(self, working_dir: str, initial_message: Optional[str] = None) -> str:
+ async def _arun(self, working_dir: str, initial_message: Optional[str] = None, idle_timeout: Optional[int] = None, cc_timeout: Optional[float] = None) -> str:
try:
resolved = _resolve_dir(working_dir)
except ValueError as exc:
return json.dumps({"error": str(exc)})
+ user_id = get_current_user()
conv_id = str(uuid.uuid4())[:8]
- await manager.create(conv_id, str(resolved))
+ await manager.create(
+ conv_id,
+ str(resolved),
+ owner_id=user_id or "",
+ idle_timeout=idle_timeout or 1800,
+ cc_timeout=cc_timeout or 300.0,
+ )
result: dict = {
"conv_id": conv_id,
@@ -97,7 +124,7 @@ class CreateConversationTool(BaseTool):
}
if initial_message:
- output = await manager.send(conv_id, initial_message)
+ output = await manager.send(conv_id, initial_message, user_id=user_id)
result["response"] = output
else:
result["status"] = "Session created. Send a message to start working."
@@ -117,11 +144,14 @@ class SendToConversationTool(BaseTool):
raise NotImplementedError("Use async version")
async def _arun(self, conv_id: str, message: str) -> str:
+ user_id = get_current_user()
try:
- output = await manager.send(conv_id, message)
+ output = await manager.send(conv_id, message, user_id=user_id)
return json.dumps({"conv_id": conv_id, "response": output}, ensure_ascii=False)
except KeyError:
return json.dumps({"error": f"No active session for conv_id={conv_id!r}"})
+ except PermissionError as e:
+ return json.dumps({"error": str(e)})
class ListConversationsTool(BaseTool):
@@ -132,7 +162,8 @@ class ListConversationsTool(BaseTool):
raise NotImplementedError("Use async version")
async def _arun(self) -> str:
- sessions = manager.list_sessions()
+ user_id = get_current_user()
+ sessions = manager.list_sessions(user_id=user_id)
if not sessions:
return "No active sessions."
return json.dumps(sessions, ensure_ascii=False, indent=2)
@@ -147,10 +178,14 @@ class CloseConversationTool(BaseTool):
raise NotImplementedError("Use async version")
async def _arun(self, conv_id: str) -> str:
- closed = await manager.close(conv_id)
- if closed:
- return f"Session {conv_id!r} closed."
- return f"Session {conv_id!r} not found."
+ user_id = get_current_user()
+ try:
+ closed = await manager.close(conv_id, user_id=user_id)
+ if closed:
+ return f"Session {conv_id!r} closed."
+ return f"Session {conv_id!r} not found."
+ except PermissionError as e:
+ return str(e)
# Module-level tool list for easy import