feat: 实现用户权限控制、会话管理和审计日志功能

- 添加用户权限检查功能,支持配置允许使用的用户列表
- 实现会话管理功能,包括会话创建、关闭、列表和切换
- 新增审计日志模块,记录所有交互信息
- 改进WebSocket连接,增加自动重连机制
- 添加健康检查端点,包含Claude服务可用性测试
- 实现会话持久化功能,重启后恢复会话状态
- 增加命令行功能支持,包括/new、/list、/close等命令
- 优化消息处理流程,支持直接传递模式
This commit is contained in:
Yuyao Huang (Sam) 2026-03-28 08:39:32 +08:00
parent 29c0f2e403
commit 6307deb701
11 changed files with 921 additions and 222 deletions

164
README.md
View File

@ -1,19 +1,19 @@
# PhoneWork
Feishu bot integration with Claude Code CLI.
Feishu bot that lets users control Claude Code CLI from their phone.
## Architecture
```
┌─────────────┐ WebSocket ┌──────────────┐ LangChain ┌─────────────┐
│ Feishu │ ◄──────────────► │ FastAPI │ ◄──────────────► │ LLM API │
│ (client) │ │ (server) │ │ (OpenAI)
│ (client) │ │ (server) │ │ (ZhipuAI)
└─────────────┘ └──────────────┘ └─────────────┘
┌─────────────┐
│ Claude Code │
(PTY)
(headless)
└─────────────┘
```
@ -22,36 +22,144 @@ Feishu bot integration with Claude Code CLI.
| Module | Purpose |
|--------|---------|
| `main.py` | FastAPI entry point, starts WebSocket client + session manager |
| `bot/handler.py` | Receives Feishu events, dispatches to orchestrator |
| `bot/feishu.py` | Sends replies back to Feishu chats |
| `orchestrator/agent.py` | LangChain agent with per-user history + active session tracking |
| `orchestrator/tools.py` | Tools: `create_conversation`, `send_to_conversation`, `close_conversation` |
| `agent/manager.py` | Session registry with idle timeout reaper |
| `agent/pty_process.py` | Runs `claude -p` headlessly, manages session continuity |
| `bot/handler.py` | Receives Feishu events via long-connection WebSocket |
| `bot/feishu.py` | Sends text/file/card replies back to Feishu |
| `bot/commands.py` | Slash command handler (`/new`, `/list`, `/close`, `/switch`, `/help`) |
| `orchestrator/agent.py` | LangChain agent with per-user history + passthrough mode |
| `orchestrator/tools.py` | Tools: `create_conversation`, `send_to_conversation`, `list_conversations`, `close_conversation` |
| `agent/manager.py` | Session registry with persistence and idle timeout reaper |
| `agent/pty_process.py` | Runs `claude -p` headlessly, manages session continuity via `--resume` |
| `agent/audit.py` | Audit log of all interactions |
**Flow:** User message → Feishu WebSocket → Handler → Orchestrator (LLM decides action) → Tool → Session Manager → Claude Code PTY → Response back to Feishu
**Flow:** User message → Feishu WebSocket → Handler → (passthrough or LLM) → Session Manager → `claude -p` → Response back to Feishu
## Setup
---
1. **Feishu App**: Create at https://open.feishu.cn
- Enable Bot capability + long-connection event subscription
- Get `FEISHU_APP_ID` and `FEISHU_APP_SECRET`
## Feishu App Setup
2. **LLM Endpoint**: Configure OpenAI-compatible endpoint
- `OPENAI_BASE_URL`, `OPENAI_API_KEY`, `OPENAI_MODEL`
### 1. Create App
3. **Claude Code CLI**: Install and authenticate `claude` command
Go to [Feishu Open Platform](https://open.feishu.cn/app) → **Create App****Custom App**.
4. **Configuration**:
```bash
cp keyring.example.yaml keyring.yaml
# Edit keyring.yaml with your credentials
```
Record the **App ID** and **App Secret** from the Credentials page.
5. **Run**:
```bash
pip install -r requirements.txt
python main.py
```
### 2. Enable Bot Capability
**Requirements**: Python 3.11+
**App Features** → **Bot** → Enable.
### 3. Subscribe to Events (Long-connection mode)
**Event Subscriptions** → **Request URL** tab:
- Switch to **"Use long connection to receive events"** (长连接接收事件)
- No public URL required
Add event subscription:
| Event | Event Type |
|-------|-----------|
| Receive messages | `im.message.receive_v1` |
### 4. Required Permissions (API Scopes)
Go to **Permissions & Scopes** and add the following:
| Permission | API Scope | Used For |
|------------|-----------|----------|
| Read private messages sent to the bot | `im:message` (read) | Receiving user messages via WebSocket |
| Send messages | `im:message:send_as_bot` | Sending text replies |
| Upload files | `im:resource` | Uploading files before sending |
| Send messages in private chats | `im:message` (write) | Sending file messages |
Minimal scope list to request:
```
im:message
im:message:send_as_bot
im:resource
```
> **Note:** `im:resource` covers both file upload (`im.v1.file.create`) and sending
> file-type messages. Without it, `send_file()` will fail with a permission error.
### 5. Publish App
After adding all permissions:
1. **Version Management** → Create a new version → Submit for review (or self-publish if in the same org)
2. Install the app to your workspace
---
## Configuration
Copy and fill in credentials:
```bash
cp keyring.example.yaml keyring.yaml
```
`keyring.yaml` fields:
```yaml
FEISHU_APP_ID: cli_xxxxxxxxxxxxxxxx
FEISHU_APP_SECRET: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
OPENAI_BASE_URL: https://open.bigmodel.cn/api/paas/v4/
OPENAI_API_KEY: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
OPENAI_MODEL: glm-4.7
# Root directory for all project sessions (absolute path)
WORKING_DIR: C:/Users/yourname/projects
# Allowlist of Feishu open_ids that may use the bot.
# Leave empty to allow all users.
ALLOWED_OPEN_IDS:
- ou_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
```
---
## Installation & Run
**Requirements:** Python 3.11+, [Claude Code CLI](https://claude.ai/code) installed and authenticated.
```bash
python -m venv .venv
source .venv/Scripts/activate # Windows
# source .venv/bin/activate # Linux/macOS
pip install -r requirements.txt
python main.py
```
Server listens on `http://0.0.0.0:8000`.
Health check: `GET /health`
Claude smoke test: `GET /health/claude`
Active sessions: `GET /sessions`
---
## Bot Commands
| Command | Description |
|---------|-------------|
| `/new <dir> [msg]` | Create a new Claude Code session in `<dir>` |
| `/new <dir> [msg] --timeout N` | Create with custom CC timeout (seconds) |
| `/new <dir> [msg] --idle N` | Create with custom idle timeout (seconds) |
| `/list` | List your active sessions |
| `/switch <n>` | Switch active session to number `<n>` from `/list` |
| `/close [n]` | Close active session (or session `<n>`) |
| `/help` | Show command reference |
Any message without a `/` prefix is forwarded directly to the active Claude Code session.
---
## Security
- **Allowlist** (`ALLOWED_OPEN_IDS`): only listed `open_id`s can use the bot. Empty = open to all.
- **Path sandbox**: all session directories must be under `WORKING_DIR`; path traversal is blocked.
- **Session ownership**: sessions are tied to the creating user; other users cannot send to them.
- **Audit log**: all prompts and responses are written to `agent/audit.log`.

View File

@ -1,93 +0,0 @@
# PhoneWork Roadmap
Issues observed in real usage, grouped by impact. No priority order within each phase.
---
## Phase 1 — Core Reliability
These are friction points that directly break or degrade the basic send-message → get-reply loop.
### 1.1 Long output splitting
**Problem:** Feishu truncates messages at 4000 chars. Long code output is silently cut.
**Fix:** Automatically split into multiple sequential messages with `[1/3]`, `[2/3]` headers.
### 1.2 Concurrent message handling
**Problem:** If the user sends two messages quickly, both fire `agent.run()` simultaneously for the same user, causing race conditions in `_active_conv` and interleaved `--resume` calls to the same CC session.
**Fix:** Per-user async lock (or queue) so messages process one at a time per user.
### 1.3 Session persistence across restarts
**Problem:** `manager._sessions` is in-memory. A server restart loses all active sessions. Users have to recreate them.
**Fix:** Persist `{conv_id, cwd, cc_session_id}` to a JSON file on disk; reload on startup.
### 1.4 Mail boy passthrough mode
**Problem:** The mail boy (GLM) sometimes paraphrases or summarizes instead of relaying verbatim, losing code blocks and exact output.
**Fix:** Bypass the mail boy entirely for follow-up messages — detect that there's an active session and call `manager.send()` directly without an LLM round-trip.
---
## Phase 2 — Better Interaction Model
Reducing the number of messages needed to get things done.
### 2.1 Slash commands
**Problem:** Users must phrase everything as natural language for the mail boy to interpret.
**Fix:** Recognize a small set of commands directly in `handler.py` before hitting the LLM:
- `/new <dir>` — create session
- `/list` — list sessions
- `/close` — close active session
- `/switch <n>` — switch active session by number
- `/retry` — resend last message to CC
### 2.2 Multi-session switching
**Problem:** Only one "active session" per user. To switch projects, the user must remember conv_ids.
**Fix:** `/list` shows numbered sessions; `/switch 2` activates session #2. The system prompt shows all open sessions, not just the active one.
### 2.3 Feishu message cards
**Problem:** Plain text is hard to scan — code blocks, file paths, and status info all look the same.
**Fix:** Use Feishu Interactive Cards (`msg_type: interactive`) to render:
- Session status as a structured card (project name, cwd, session ID)
- Action buttons: **Continue**, **Close session**, **Run again**
---
## Phase 3 — Operational Quality
Making it reliable enough to leave running 24/7.
### 3.1 Health check improvements
**Problem:** `/health` only reports session count. No way to know if the Feishu WS connection is alive, or if CC is callable.
**Fix:** Add to `/health`:
- WebSocket connection status
- Last message received timestamp
- A `claude -p "ping"` smoke test result
### 3.2 Automatic reconnection
**Problem:** The Feishu WebSocket thread is a daemon — if it dies silently (network blip), no messages are received and there's no recovery.
**Fix:** Wrap `ws_client.start()` in a retry loop with exponential backoff and log reconnection events.
### 3.3 Per-session timeout configuration
**Problem:** All sessions share a 30-min idle timeout and 300s CC timeout. Long-running tasks (e.g. running tests) may need more; quick chats need less.
**Fix:** Allow per-session timeout overrides; expose via `/new <dir> --timeout 600`.
### 3.4 Audit log
**Problem:** No record of what was sent to Claude Code or what it did. Impossible to debug after the fact.
**Fix:** Append each `(timestamp, conv_id, prompt, response)` to a JSONL file per session under the project directory.
---
## Phase 4 — Multi-user & Security
For sharing the bot with teammates.
### 4.1 User allowlist
**Problem:** Anyone who can message the bot can run arbitrary code via Claude Code.
**Fix:** `ALLOWED_OPEN_IDS` list in `keyring.yaml`; reject messages from unknown users.
### 4.2 Per-user session isolation
**Problem:** All users share the same `manager` singleton — user A could theoretically send to user B's session by guessing a conv_id.
**Fix:** Namespace sessions by user_id; `send_to_conversation` validates that the requesting user owns the session.
### 4.3 Working directory sandboxing
**Problem:** The safety check in `_resolve_dir` blocks paths outside `WORKING_DIR`, but Claude Code itself runs with `--dangerously-skip-permissions` and can write anywhere.
**Fix:** Consider running CC in a restricted user account or container; or drop `--dangerously-skip-permissions` and implement a permission-approval flow via Feishu buttons.

76
agent/audit.py Normal file
View File

@ -0,0 +1,76 @@
"""Audit logging for Claude Code sessions."""
from __future__ import annotations
import json
import logging
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
logger = logging.getLogger(__name__)
AUDIT_DIR = Path(__file__).parent.parent / "audit"
def _ensure_audit_dir() -> None:
AUDIT_DIR.mkdir(parents=True, exist_ok=True)
def log_interaction(
conv_id: str,
prompt: str,
response: str,
cwd: Optional[str] = None,
user_id: Optional[str] = None,
) -> None:
"""
Log an interaction to a JSONL file per session.
Args:
conv_id: Conversation/session ID
prompt: User's message/prompt
response: Claude Code's response
cwd: Working directory (optional)
user_id: User identifier (optional)
"""
try:
_ensure_audit_dir()
log_file = AUDIT_DIR / f"{conv_id}.jsonl"
entry = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"conv_id": conv_id,
"prompt": prompt,
"response": response,
}
if cwd:
entry["cwd"] = cwd
if user_id:
entry["user_id"] = user_id[-8:] if len(user_id) > 8 else user_id
with open(log_file, "a", encoding="utf-8") as f:
f.write(json.dumps(entry, ensure_ascii=False) + "\n")
logger.debug("Logged interaction for session %s", conv_id)
except Exception:
logger.exception("Failed to log audit entry for session %s", conv_id)
def get_audit_log(conv_id: str, limit: int = 100) -> list[dict]:
"""Read the audit log for a session."""
log_file = AUDIT_DIR / f"{conv_id}.jsonl"
if not log_file.exists():
return []
entries = []
try:
with open(log_file, "r", encoding="utf-8") as f:
for line in f:
if line.strip():
entries.append(json.loads(line))
except Exception:
logger.exception("Failed to read audit log for session %s", conv_id)
return entries[-limit:]

View File

@ -3,45 +3,55 @@
from __future__ import annotations
import asyncio
import json
import logging
import uuid
from dataclasses import dataclass, field
from dataclasses import dataclass, field, asdict
from pathlib import Path
from typing import Dict, List, Optional
from agent.pty_process import run_claude
from agent.audit import log_interaction
logger = logging.getLogger(__name__)
IDLE_TIMEOUT = 30 * 60 # 30 minutes in seconds
DEFAULT_IDLE_TIMEOUT = 30 * 60
DEFAULT_CC_TIMEOUT = 300.0
PERSISTENCE_FILE = Path(__file__).parent.parent / "sessions.json"
@dataclass
class Session:
conv_id: str
cwd: str
# Stable UUID passed to `claude --session-id` so CC owns the history
owner_id: str = ""
cc_session_id: str = field(default_factory=lambda: str(uuid.uuid4()))
last_activity: float = field(default_factory=lambda: asyncio.get_event_loop().time())
# True after the first message has been sent (so we know to use --resume)
last_activity: float = 0.0
started: bool = False
idle_timeout: int = DEFAULT_IDLE_TIMEOUT
cc_timeout: float = DEFAULT_CC_TIMEOUT
def touch(self) -> None:
self.last_activity = asyncio.get_event_loop().time()
def to_dict(self) -> dict:
return asdict(self)
@classmethod
def from_dict(cls, data: dict) -> "Session":
return cls(**data)
class SessionManager:
"""Registry of active Claude Code project sessions."""
"""Registry of active Claude Code project sessions with persistence and user isolation."""
def __init__(self) -> None:
self._sessions: Dict[str, Session] = {}
self._lock = asyncio.Lock()
self._reaper_task: Optional[asyncio.Task] = None
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
async def start(self) -> None:
self._load()
loop = asyncio.get_event_loop()
self._reaper_task = loop.create_task(self._reaper_loop())
@ -50,62 +60,116 @@ class SessionManager:
self._reaper_task.cancel()
async with self._lock:
self._sessions.clear()
if PERSISTENCE_FILE.exists():
PERSISTENCE_FILE.unlink()
async def create(self, conv_id: str, working_dir: str) -> Session:
"""Register a new session for the given working directory."""
async def create(
self,
conv_id: str,
working_dir: str,
owner_id: str = "",
idle_timeout: int = DEFAULT_IDLE_TIMEOUT,
cc_timeout: float = DEFAULT_CC_TIMEOUT,
) -> Session:
async with self._lock:
session = Session(conv_id=conv_id, cwd=working_dir)
session = Session(
conv_id=conv_id,
cwd=working_dir,
owner_id=owner_id,
idle_timeout=idle_timeout,
cc_timeout=cc_timeout,
)
self._sessions[conv_id] = session
self._save()
logger.info(
"Created session %s (cc_session_id=%s) in %s",
conv_id, session.cc_session_id, working_dir,
"Created session %s (owner=...%s) in %s (idle=%ds, cc=%.0fs)",
conv_id, owner_id[-8:] if owner_id else "-", working_dir, idle_timeout, cc_timeout,
)
return session
async def send(self, conv_id: str, message: str) -> str:
"""
Run claude -p with the message in the session's directory.
- First message: uses --session-id <uuid> to establish the CC session.
- Subsequent messages: uses --resume <uuid> so CC has full history.
"""
async def send(self, conv_id: str, message: str, user_id: Optional[str] = None) -> str:
async with self._lock:
session = self._sessions.get(conv_id)
if session is None:
raise KeyError(f"No session for conv_id={conv_id!r}")
if session.owner_id and user_id and session.owner_id != user_id:
raise PermissionError(f"Session {conv_id} belongs to another user")
session.touch()
cwd = session.cwd
cc_session_id = session.cc_session_id
cc_timeout = session.cc_timeout
first_message = not session.started
if first_message:
session.started = True
self._save()
output = await run_claude(
message,
cwd=cwd,
cc_session_id=cc_session_id,
resume=not first_message,
timeout=cc_timeout,
)
log_interaction(
conv_id=conv_id,
prompt=message,
response=output,
cwd=cwd,
user_id=user_id,
)
return output
async def close(self, conv_id: str) -> bool:
"""Remove a session. Returns True if it existed."""
async def close(self, conv_id: str, user_id: Optional[str] = None) -> bool:
async with self._lock:
if conv_id not in self._sessions:
session = self._sessions.get(conv_id)
if session is None:
return False
if session.owner_id and user_id and session.owner_id != user_id:
raise PermissionError(f"Session {conv_id} belongs to another user")
del self._sessions[conv_id]
self._save()
logger.info("Closed session %s", conv_id)
return True
def list_sessions(self) -> list[dict]:
def list_sessions(self, user_id: Optional[str] = None) -> list[dict]:
sessions = self._sessions.values()
if user_id:
sessions = [s for s in sessions if not s.owner_id or s.owner_id == user_id]
return [
{"conv_id": s.conv_id, "cwd": s.cwd, "cc_session_id": s.cc_session_id}
for s in self._sessions.values()
{
"conv_id": s.conv_id,
"cwd": s.cwd,
"owner_id": s.owner_id[-8:] if s.owner_id else None,
"cc_session_id": s.cc_session_id,
"started": s.started,
"idle_timeout": s.idle_timeout,
"cc_timeout": s.cc_timeout,
}
for s in sessions
]
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _save(self) -> None:
try:
data = {cid: s.to_dict() for cid, s in self._sessions.items()}
with open(PERSISTENCE_FILE, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2)
logger.debug("Saved %d sessions to %s", len(data), PERSISTENCE_FILE)
except Exception:
logger.exception("Failed to save sessions")
def _load(self) -> None:
if not PERSISTENCE_FILE.exists():
return
try:
with open(PERSISTENCE_FILE, "r", encoding="utf-8") as f:
data = json.load(f)
for cid, sdata in data.items():
self._sessions[cid] = Session.from_dict(sdata)
logger.info("Loaded %d sessions from %s", len(self._sessions), PERSISTENCE_FILE)
except Exception:
logger.exception("Failed to load sessions")
async def _reaper_loop(self) -> None:
while True:
@ -115,14 +179,15 @@ class SessionManager:
async def _reap_idle(self) -> None:
now = asyncio.get_event_loop().time()
async with self._lock:
to_close = [
cid for cid, s in self._sessions.items()
if (now - s.last_activity) > IDLE_TIMEOUT
]
to_close = []
for cid, s in self._sessions.items():
if s.last_activity > 0 and (now - s.last_activity) > s.idle_timeout:
to_close.append(cid)
for cid in to_close:
del self._sessions[cid]
logger.info("Reaped idle session %s", cid)
if to_close:
self._save()
# Module-level singleton
manager = SessionManager()

191
bot/commands.py Normal file
View File

@ -0,0 +1,191 @@
"""Slash command handler for direct bot control."""
from __future__ import annotations
import argparse
import json
import logging
import re
import uuid
from typing import Optional, Tuple
from agent.manager import manager
from orchestrator.agent import agent
from orchestrator.tools import set_current_user, get_current_user
logger = logging.getLogger(__name__)
def parse_command(text: str) -> Optional[Tuple[str, str]]:
"""
Parse a slash command from text.
Returns (command, args) or None if not a command.
"""
text = text.strip()
if not text.startswith("/"):
return None
parts = text.split(None, 1)
cmd = parts[0].lower()
args = parts[1] if len(parts) > 1 else ""
return (cmd, args)
async def handle_command(user_id: str, text: str) -> Optional[str]:
"""
Handle a slash command. Returns the reply or None if not a command.
"""
parsed = parse_command(text)
if not parsed:
return None
cmd, args = parsed
logger.info("Command: %s args=%r user=...%s", cmd, args[:50], user_id[-8:])
set_current_user(user_id)
if cmd in ("/new", "/n"):
return await _cmd_new(user_id, args)
elif cmd in ("/list", "/ls", "/l"):
return await _cmd_list(user_id)
elif cmd in ("/close", "/c"):
return await _cmd_close(user_id, args)
elif cmd in ("/switch", "/s"):
return await _cmd_switch(user_id, args)
elif cmd == "/retry":
return await _cmd_retry(user_id)
elif cmd in ("/help", "/h", "/?"):
return _cmd_help()
else:
return f"Unknown command: {cmd}\n\n{_cmd_help()}"
async def _cmd_new(user_id: str, args: str) -> str:
"""Create a new session."""
if not args:
return "Usage: /new <project_dir> [initial_message] [--timeout N]\nExample: /new todo_app fix the bug --timeout 600"
parser = argparse.ArgumentParser()
parser.add_argument("working_dir", nargs="?", help="Project directory")
parser.add_argument("rest", nargs="*", help="Initial message")
parser.add_argument("--timeout", type=int, default=None, help="CC timeout in seconds")
parser.add_argument("--idle", type=int, default=None, help="Idle timeout in seconds")
try:
parsed = parser.parse_args(args.split())
except SystemExit:
return "Usage: /new <project_dir> [initial_message] [--timeout N] [--idle N]"
if not parsed.working_dir:
return "Error: project_dir is required"
working_dir = parsed.working_dir
initial_msg = " ".join(parsed.rest) if parsed.rest else None
from orchestrator.tools import CreateConversationTool
tool = CreateConversationTool()
result = await tool._arun(
working_dir=working_dir,
initial_message=initial_msg,
cc_timeout=parsed.timeout,
idle_timeout=parsed.idle,
)
try:
data = json.loads(result)
if "error" in data:
return f"Error: {data['error']}"
conv_id = data.get("conv_id", "")
agent._active_conv[user_id] = conv_id
cwd = data.get("working_dir", working_dir)
reply = f"✓ Created session `{conv_id}` in `{cwd}`"
if parsed.timeout:
reply += f" (timeout: {parsed.timeout}s)"
if initial_msg:
reply += f"\n\nSent: {initial_msg[:100]}..."
return reply
except Exception:
return result
async def _cmd_list(user_id: str) -> str:
"""List all sessions for this user."""
sessions = manager.list_sessions(user_id=user_id)
if not sessions:
return "No active sessions."
active = agent.get_active_conv(user_id)
lines = ["**Your Sessions:**\n"]
for i, s in enumerate(sessions, 1):
marker = "" if s["conv_id"] == active else " "
lines.append(f"{marker}{i}. `{s['conv_id']}` - `{s['cwd']}`")
lines.append("\nUse `/switch <n>` to activate a session.")
return "\n".join(lines)
async def _cmd_close(user_id: str, args: str) -> str:
"""Close a session."""
sessions = manager.list_sessions(user_id=user_id)
if not sessions:
return "No sessions to close."
if args:
try:
idx = int(args) - 1
if 0 <= idx < len(sessions):
conv_id = sessions[idx]["conv_id"]
else:
return f"Invalid session number. Use 1-{len(sessions)}."
except ValueError:
conv_id = args.strip()
else:
conv_id = agent.get_active_conv(user_id)
if not conv_id:
return "No active session. Use `/close <conv_id>` or `/close <n>`."
try:
success = await manager.close(conv_id, user_id=user_id)
if success:
if agent.get_active_conv(user_id) == conv_id:
agent._active_conv[user_id] = None
return f"✓ Closed session `{conv_id}`"
else:
return f"Session `{conv_id}` not found."
except PermissionError as e:
return str(e)
async def _cmd_switch(user_id: str, args: str) -> str:
"""Switch to a different session."""
sessions = manager.list_sessions(user_id=user_id)
if not sessions:
return "No sessions available."
if not args:
return "Usage: /switch <n>\n" + await _cmd_list(user_id)
try:
idx = int(args) - 1
if 0 <= idx < len(sessions):
conv_id = sessions[idx]["conv_id"]
agent._active_conv[user_id] = conv_id
return f"✓ Switched to session `{conv_id}` ({sessions[idx]['cwd']})"
else:
return f"Invalid session number. Use 1-{len(sessions)}."
except ValueError:
return f"Invalid number: {args}"
async def _cmd_retry(user_id: str) -> str:
"""Retry the last message (placeholder - needs history tracking)."""
return "Retry not yet implemented. Just send your message again."
def _cmd_help() -> str:
"""Show help."""
return """**Commands:**
/new <dir> [msg] [--timeout N] [--idle N] - Create session
/list - List your sessions
/close [n] - Close session (active or by number)
/switch <n> - Switch to session by number
/retry - Retry last message
/help - Show this help"""

View File

@ -2,10 +2,14 @@
from __future__ import annotations
import asyncio
import json
import logging
import lark_oapi as lark
from lark_oapi.api.im.v1 import (
CreateFileRequest,
CreateFileRequestBody,
CreateMessageRequest,
CreateMessageRequestBody,
)
@ -14,8 +18,7 @@ from config import FEISHU_APP_ID, FEISHU_APP_SECRET
logger = logging.getLogger(__name__)
# Max Feishu text message length
MAX_TEXT_LEN = 4000
MAX_TEXT_LEN = 3900
def _make_client() -> lark.Client:
@ -31,54 +34,252 @@ def _make_client() -> lark.Client:
_client = _make_client()
def _truncate(text: str) -> str:
def _split_message(text: str) -> list[str]:
if len(text) <= MAX_TEXT_LEN:
return text
return text[: MAX_TEXT_LEN - 20] + "\n...[truncated]"
return [text]
parts: list[str] = []
remaining = text
while remaining:
if len(remaining) <= MAX_TEXT_LEN:
parts.append(remaining)
break
chunk = remaining[:MAX_TEXT_LEN]
last_newline = chunk.rfind("\n")
if last_newline > MAX_TEXT_LEN // 2:
chunk = remaining[:last_newline + 1]
parts.append(chunk)
remaining = remaining[len(chunk):]
total = len(parts)
headered_parts = []
for i, part in enumerate(parts, 1):
headered_parts.append(f"[{i}/{total}]\n{part}")
return headered_parts
async def send_text(receive_id: str, receive_id_type: str, text: str) -> None:
"""
Send a plain-text message to a Feishu chat or user.
Automatically splits long messages into multiple parts with [1/N] headers.
Args:
receive_id: chat_id or open_id depending on receive_id_type.
receive_id_type: "chat_id" | "open_id" | "user_id" | "union_id".
text: message content.
"""
import json as _json
truncated = _truncate(text)
logger.debug(
"[feishu] send_text to=%s type=%s len=%d/%d text=%r",
receive_id, receive_id_type, len(truncated), len(text), truncated[:120],
)
content = _json.dumps({"text": truncated}, ensure_ascii=False)
parts = _split_message(text)
loop = asyncio.get_event_loop()
for i, part in enumerate(parts):
logger.debug(
"[feishu] send_text to=%s type=%s part=%d/%d len=%d",
receive_id, receive_id_type, i + 1, len(parts), len(part),
)
content = json.dumps({"text": part}, ensure_ascii=False)
request = (
CreateMessageRequest.builder()
.receive_id_type(receive_id_type)
.request_body(
CreateMessageRequestBody.builder()
.receive_id(receive_id)
.msg_type("text")
.content(content)
.build()
)
.build()
)
response = await loop.run_in_executor(
None,
lambda: _client.im.v1.message.create(request),
)
if not response.success():
logger.error(
"Feishu send_text failed: code=%s msg=%s",
response.code,
response.msg,
)
return
else:
logger.debug("Sent message part %d/%d to %s (%s)", i + 1, len(parts), receive_id, receive_id_type)
if len(parts) > 1 and i < len(parts) - 1:
await asyncio.sleep(0.3)
async def send_card(receive_id: str, receive_id_type: str, title: str, content: str, buttons: list[dict] | None = None) -> None:
"""
Send an interactive card message.
Args:
receive_id: chat_id or open_id
receive_id_type: "chat_id" | "open_id" | "user_id" | "union_id"
title: Card title
content: Card content (markdown supported)
buttons: List of button dicts with "text" and "value" keys
"""
elements = [
{
"tag": "div",
"text": {
"tag": "lark_md",
"content": content,
},
},
]
if buttons:
actions = []
for btn in buttons:
actions.append({
"tag": "button",
"text": {"tag": "plain_text", "content": btn.get("text", "Button")},
"type": "primary",
"value": btn.get("value", {}),
})
elements.append({"tag": "action", "actions": actions})
card = {
"type": "template",
"data": {
"template_id": "AAqkz9****",
"template_variable": {
"title": title,
"elements": elements,
},
},
}
card_content = {
"config": {"wide_screen_mode": True},
"header": {
"title": {"tag": "plain_text", "content": title},
"template": "blue",
},
"elements": elements,
}
loop = asyncio.get_event_loop()
request = (
CreateMessageRequest.builder()
.receive_id_type(receive_id_type)
.request_body(
CreateMessageRequestBody.builder()
.receive_id(receive_id)
.msg_type("text")
.content(content)
.msg_type("interactive")
.content(json.dumps(card_content, ensure_ascii=False))
.build()
)
.build()
)
import asyncio
loop = asyncio.get_event_loop()
response = await loop.run_in_executor(
None,
lambda: _client.im.v1.message.create(request),
)
if not response.success():
logger.error("Feishu send_card failed: code=%s msg=%s", response.code, response.msg)
else:
logger.debug("Sent card to %s (%s)", receive_id, receive_id_type)
async def send_file(receive_id: str, receive_id_type: str, file_path: str, file_type: str = "stream") -> None:
"""
Upload a local file to Feishu and send it as a file message.
Args:
receive_id: chat_id or open_id depending on receive_id_type.
receive_id_type: "chat_id" | "open_id" | "user_id" | "union_id".
file_path: Absolute path to the local file to send.
file_type: Feishu file type "stream" (generic), "opus", "mp4", "pdf", "doc", "xls", "ppt".
"""
import os
path = os.path.abspath(file_path)
file_name = os.path.basename(path)
loop = asyncio.get_event_loop()
# Step 1: Upload file → get file_key
with open(path, "rb") as f:
file_data = f.read()
def _upload():
req = (
CreateFileRequest.builder()
.request_body(
CreateFileRequestBody.builder()
.file_type(file_type)
.file_name(file_name)
.file(file_data)
.build()
)
.build()
)
return _client.im.v1.file.create(req)
upload_resp = await loop.run_in_executor(None, _upload)
if not upload_resp.success():
logger.error(
"Feishu send_text failed: code=%s msg=%s",
response.code,
response.msg,
"Feishu file upload failed: code=%s msg=%s",
upload_resp.code,
upload_resp.msg,
)
return
file_key = upload_resp.data.file_key
logger.debug("Uploaded file %r → file_key=%r", file_name, file_key)
# Step 2: Send file message using the file_key
content = json.dumps({"file_key": file_key}, ensure_ascii=False)
request = (
CreateMessageRequest.builder()
.receive_id_type(receive_id_type)
.request_body(
CreateMessageRequestBody.builder()
.receive_id(receive_id)
.msg_type("file")
.content(content)
.build()
)
.build()
)
send_resp = await loop.run_in_executor(
None,
lambda: _client.im.v1.message.create(request),
)
if not send_resp.success():
logger.error(
"Feishu send_file failed: code=%s msg=%s",
send_resp.code,
send_resp.msg,
)
else:
logger.debug("Sent message to %s (%s)", receive_id, receive_id_type)
logger.debug("Sent file %r to %s (%s)", file_name, receive_id, receive_id_type)
def build_session_card(conv_id: str, cwd: str, started: bool) -> dict:
"""Build a session status card."""
status = "🟢 Active" if started else "🟡 Ready"
content = f"**Session ID:** `{conv_id}`\n**Directory:** `{cwd}`\n**Status:** {status}"
return {
"config": {"wide_screen_mode": True},
"header": {
"title": {"tag": "plain_text", "content": "Claude Code Session"},
"template": "turquoise",
},
"elements": [
{"tag": "div", "text": {"tag": "lark_md", "content": content}},
{"tag": "hr"},
{
"tag": "action",
"actions": [
{"tag": "button", "text": {"tag": "plain_text", "content": "Continue"}, "type": "primary", "value": {"action": "continue", "conv_id": conv_id}},
{"tag": "button", "text": {"tag": "plain_text", "content": "Close"}, "type": "default", "value": {"action": "close", "conv_id": conv_id}},
],
},
],
}

View File

@ -6,30 +6,41 @@ import asyncio
import json
import logging
import threading
import time
import lark_oapi as lark
from lark_oapi.api.im.v1 import P2ImMessageReceiveV1
from bot.commands import handle_command
from bot.feishu import send_text
from config import FEISHU_APP_ID, FEISHU_APP_SECRET
from config import FEISHU_APP_ID, FEISHU_APP_SECRET, is_user_allowed
from orchestrator.agent import agent
logger = logging.getLogger(__name__)
# Keep a reference to the running event loop so sync callbacks can schedule coroutines
_main_loop: asyncio.AbstractEventLoop | None = None
_ws_connected: bool = False
_last_message_time: float = 0.0
_reconnect_count: int = 0
def get_ws_status() -> dict:
"""Return WebSocket connection status."""
return {
"connected": _ws_connected,
"last_message_time": _last_message_time,
"reconnect_count": _reconnect_count,
}
def _handle_message(data: P2ImMessageReceiveV1) -> None:
"""
Synchronous callback invoked by the lark-oapi SDK on every incoming message.
We schedule async work onto the main event loop.
"""
global _last_message_time
_last_message_time = time.time()
try:
message = data.event.message
sender = data.event.sender
# Log raw event for debugging
logger.debug(
"event type=%r chat_type=%r content=%r",
getattr(message, "message_type", None),
@ -37,18 +48,15 @@ def _handle_message(data: P2ImMessageReceiveV1) -> None:
(getattr(message, "content", None) or "")[:100],
)
# Only handle text messages
if message.message_type != "text":
logger.info("Skipping non-text message_type=%r", message.message_type)
return
# Extract fields
chat_id: str = message.chat_id
raw_content: str = message.content or "{}"
content_obj = json.loads(raw_content)
text: str = content_obj.get("text", "").strip()
# Strip @mentions (Feishu injects "@bot_name " at start of group messages)
import re
text = re.sub(r"@\S+\s*", "", text).strip()
@ -62,14 +70,12 @@ def _handle_message(data: P2ImMessageReceiveV1) -> None:
logger.info("✉ ...%s%r", open_id[-8:], text[:80])
# Use open_id as user identifier for per-user history in the orchestrator
user_id = open_id or chat_id
if _main_loop is None:
logger.error("Main event loop not set; cannot process message")
return
# Schedule async processing; fire-and-forget
asyncio.run_coroutine_threadsafe(
_process_message(user_id, chat_id, text),
_main_loop,
@ -79,9 +85,16 @@ def _handle_message(data: P2ImMessageReceiveV1) -> None:
async def _process_message(user_id: str, chat_id: str, text: str) -> None:
"""Run the orchestration agent and send the reply back to Feishu."""
"""Process message: check allowlist, then commands, then agent."""
try:
reply = await agent.run(user_id, text)
if not is_user_allowed(user_id):
logger.warning("Rejected message from unauthorized user: ...%s", user_id[-8:])
await send_text(chat_id, "chat_id", "Sorry, you are not authorized to use this bot.")
return
reply = await handle_command(user_id, text)
if reply is None:
reply = await agent.run(user_id, text)
if reply:
await send_text(chat_id, "chat_id", reply)
except Exception:
@ -112,19 +125,38 @@ def start_websocket_client(loop: asyncio.AbstractEventLoop) -> None:
global _main_loop
_main_loop = loop
event_handler = build_event_handler()
def _run_with_reconnect() -> None:
global _ws_connected, _reconnect_count
backoff = 1.0
max_backoff = 60.0
ws_client = lark.ws.Client(
FEISHU_APP_ID,
FEISHU_APP_SECRET,
event_handler=event_handler,
log_level=lark.LogLevel.INFO,
)
while True:
try:
_ws_connected = False
event_handler = build_event_handler()
ws_client = lark.ws.Client(
FEISHU_APP_ID,
FEISHU_APP_SECRET,
event_handler=event_handler,
log_level=lark.LogLevel.INFO,
)
def _run() -> None:
logger.info("Starting Feishu long-connection client...")
ws_client.start() # blocks until disconnected
logger.info("Starting Feishu long-connection client...")
_ws_connected = True
_reconnect_count += 1
ws_client.start()
logger.warning("WebSocket disconnected, will reconnect...")
thread = threading.Thread(target=_run, daemon=True, name="feishu-ws")
except Exception as e:
logger.error("WebSocket error: %s", e)
finally:
_ws_connected = False
logger.info("Reconnecting in %.1fs (attempt %d)...", backoff, _reconnect_count)
time.sleep(backoff)
backoff = min(backoff * 2, max_backoff)
thread = threading.Thread(target=_run_with_reconnect, daemon=True, name="feishu-ws")
thread.start()
logger.info("Feishu WebSocket thread started")

View File

@ -1,5 +1,6 @@
import yaml
from pathlib import Path
from typing import List
_CONFIG_PATH = Path(__file__).parent / "keyring.yaml"
@ -17,3 +18,14 @@ OPENAI_BASE_URL: str = _cfg["OPENAI_BASE_URL"]
OPENAI_API_KEY: str = _cfg["OPENAI_API_KEY"]
OPENAI_MODEL: str = _cfg.get("OPENAI_MODEL", "glm-4.7")
WORKING_DIR: Path = Path(_cfg.get("WORKING_DIR", Path.home())).expanduser().resolve()
ALLOWED_OPEN_IDS: List[str] = _cfg.get("ALLOWED_OPEN_IDS", [])
if ALLOWED_OPEN_IDS and not isinstance(ALLOWED_OPEN_IDS, list):
ALLOWED_OPEN_IDS = [str(ALLOWED_OPEN_IDS)]
def is_user_allowed(open_id: str) -> bool:
"""Check if a user is allowed to use the bot."""
if not ALLOWED_OPEN_IDS:
return True
return open_id in ALLOWED_OPEN_IDS

54
main.py
View File

@ -4,13 +4,14 @@ from __future__ import annotations
import asyncio
import logging
import time
import uvicorn
from fastapi import FastAPI
from rich.logging import RichHandler
from agent.manager import manager
from bot.handler import start_websocket_client
from bot.handler import start_websocket_client, get_ws_status
logging.basicConfig(
level=logging.DEBUG,
@ -23,18 +24,61 @@ logging.basicConfig(
omit_repeated_times=False,
)],
)
# Suppress noisy third-party debug output
for _noisy in ("httpcore", "httpx", "openai._base_client", "urllib3", "lark_oapi", "websockets"):
logging.getLogger(_noisy).setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
app = FastAPI(title="PhoneWork", version="0.1.0")
START_TIME = time.time()
@app.get("/health")
async def health() -> dict:
sessions = manager.list_sessions()
return {"status": "ok", "active_sessions": len(sessions)}
ws_status = get_ws_status()
uptime = time.time() - START_TIME
result = {
"status": "ok",
"uptime_seconds": round(uptime, 1),
"active_sessions": len(sessions),
"websocket": ws_status,
}
if ws_status.get("connected"):
result["status"] = "ok"
else:
result["status"] = "degraded"
return result
@app.get("/health/claude")
async def health_claude() -> dict:
"""Smoke test: run a simple claude -p command."""
from agent.pty_process import run_claude
import tempfile
import os
start = time.time()
try:
with tempfile.TemporaryDirectory() as tmpdir:
output = await run_claude(
"Say 'pong' and nothing else",
cwd=tmpdir,
timeout=30.0,
)
elapsed = time.time() - start
return {
"status": "ok",
"elapsed_seconds": round(elapsed, 2),
"output_preview": output[:100] if output else None,
}
except asyncio.TimeoutError:
return {"status": "timeout", "elapsed_seconds": 30.0}
except Exception as e:
return {"status": "error", "error": str(e)}
@app.get("/sessions")
@ -44,11 +88,7 @@ async def list_sessions() -> list:
@app.on_event("startup")
async def startup_event() -> None:
# Start the session manager's idle-reaper
await manager.start()
# Start the Feishu WebSocket client in a background thread,
# passing the running event loop so async work can be scheduled
loop = asyncio.get_event_loop()
start_websocket_client(loop)
logger.info("PhoneWork started")

View File

@ -5,8 +5,10 @@ Uses LangChain 1.x tool-calling pattern: bind_tools + manual agentic loop.
from __future__ import annotations
import asyncio
import json
import logging
import re
from collections import defaultdict
from typing import Dict, List, Optional
@ -19,8 +21,9 @@ from langchain_core.messages import (
)
from langchain_openai import ChatOpenAI
from agent.manager import manager
from config import OPENAI_API_KEY, OPENAI_BASE_URL, OPENAI_MODEL, WORKING_DIR
from orchestrator.tools import TOOLS
from orchestrator.tools import TOOLS, set_current_user
logger = logging.getLogger(__name__)
@ -52,6 +55,12 @@ Guidelines:
MAX_ITERATIONS = 10
_TOOL_MAP = {t.name: t for t in TOOLS}
COMMAND_PATTERN = re.compile(r"^/(new|list|close|switch|retry|help)", re.IGNORECASE)
def _looks_like_command(text: str) -> bool:
return bool(COMMAND_PATTERN.match(text.strip()))
class OrchestrationAgent:
"""Per-user agent with conversation history and active session tracking."""
@ -69,6 +78,8 @@ class OrchestrationAgent:
self._history: Dict[str, List[BaseMessage]] = defaultdict(list)
# user_id -> most recently active conv_id
self._active_conv: Dict[str, Optional[str]] = defaultdict(lambda: None)
# user_id -> asyncio.Lock (prevents concurrent processing per user)
self._user_locks: Dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
def _build_system_prompt(self, user_id: str) -> str:
conv_id = self._active_conv[user_id]
@ -81,13 +92,35 @@ class OrchestrationAgent:
active_session_line=active_line,
)
def get_active_conv(self, user_id: str) -> Optional[str]:
return self._active_conv.get(user_id)
async def run(self, user_id: str, text: str) -> str:
"""Process a user message and return the agent's reply."""
async with self._user_locks[user_id]:
return await self._run_locked(user_id, text)
async def _run_locked(self, user_id: str, text: str) -> str:
"""Internal implementation, must be called with user lock held."""
set_current_user(user_id)
active_conv = self._active_conv[user_id]
short_uid = user_id[-8:]
logger.info(">>> user=...%s conv=%s msg=%r", short_uid, active_conv, text[:80])
logger.debug(" history_len=%d", len(self._history[user_id]))
# Passthrough mode: if active session and not a command, bypass LLM
if active_conv and not _looks_like_command(text):
try:
reply = await manager.send(active_conv, text, user_id=user_id)
logger.info("<<< [passthrough] reply: %r", reply[:120])
return reply
except KeyError:
logger.warning("Session %s no longer exists, clearing active_conv", active_conv)
self._active_conv[user_id] = None
except Exception as exc:
logger.exception("Passthrough error for user=%s", user_id)
return f"[Error] {exc}"
messages: List[BaseMessage] = (
[SystemMessage(content=self._build_system_prompt(user_id))]
+ self._history[user_id]
@ -161,5 +194,4 @@ class OrchestrationAgent:
return reply
# Module-level singleton
agent = OrchestrationAgent()

View File

@ -4,6 +4,7 @@ from __future__ import annotations
import json
import uuid
from contextvars import ContextVar
from pathlib import Path
from typing import Optional, Type
@ -13,6 +14,16 @@ from pydantic import BaseModel, Field
from agent.manager import manager
from config import WORKING_DIR
_current_user_id: ContextVar[Optional[str]] = ContextVar("current_user_id", default=None)
def set_current_user(user_id: Optional[str]) -> None:
_current_user_id.set(user_id)
def get_current_user() -> Optional[str]:
return _current_user_id.get()
def _resolve_dir(working_dir: str) -> Path:
"""
@ -21,14 +32,21 @@ def _resolve_dir(working_dir: str) -> Path:
Rules:
- Absolute paths are used as-is (but must stay within WORKING_DIR for safety).
- Relative paths / bare names are joined onto WORKING_DIR.
- Path traversal attempts (..) are blocked.
- The resolved directory is created if it doesn't exist.
"""
working_dir = working_dir.strip()
if ".." in working_dir.split("/") or ".." in working_dir.split("\\"):
raise ValueError(
"Path traversal not allowed. Use a subfolder name or path inside the working directory."
)
p = Path(working_dir)
if not p.is_absolute():
p = WORKING_DIR / p
p = p.resolve()
# Safety: must be inside WORKING_DIR
try:
p.relative_to(WORKING_DIR)
except ValueError:
@ -55,6 +73,8 @@ class CreateConversationInput(BaseModel):
),
)
initial_message: Optional[str] = Field(None, description="Optional first message to send after spawning")
idle_timeout: Optional[int] = Field(None, description="Idle timeout in seconds (default 1800)")
cc_timeout: Optional[float] = Field(None, description="Claude Code execution timeout in seconds (default 300)")
class SendToConversationInput(BaseModel):
@ -79,17 +99,24 @@ class CreateConversationTool(BaseTool):
)
args_schema: Type[BaseModel] = CreateConversationInput
def _run(self, working_dir: str, initial_message: Optional[str] = None) -> str:
def _run(self, working_dir: str, initial_message: Optional[str] = None, idle_timeout: Optional[int] = None, cc_timeout: Optional[float] = None) -> str:
raise NotImplementedError("Use async version")
async def _arun(self, working_dir: str, initial_message: Optional[str] = None) -> str:
async def _arun(self, working_dir: str, initial_message: Optional[str] = None, idle_timeout: Optional[int] = None, cc_timeout: Optional[float] = None) -> str:
try:
resolved = _resolve_dir(working_dir)
except ValueError as exc:
return json.dumps({"error": str(exc)})
user_id = get_current_user()
conv_id = str(uuid.uuid4())[:8]
await manager.create(conv_id, str(resolved))
await manager.create(
conv_id,
str(resolved),
owner_id=user_id or "",
idle_timeout=idle_timeout or 1800,
cc_timeout=cc_timeout or 300.0,
)
result: dict = {
"conv_id": conv_id,
@ -97,7 +124,7 @@ class CreateConversationTool(BaseTool):
}
if initial_message:
output = await manager.send(conv_id, initial_message)
output = await manager.send(conv_id, initial_message, user_id=user_id)
result["response"] = output
else:
result["status"] = "Session created. Send a message to start working."
@ -117,11 +144,14 @@ class SendToConversationTool(BaseTool):
raise NotImplementedError("Use async version")
async def _arun(self, conv_id: str, message: str) -> str:
user_id = get_current_user()
try:
output = await manager.send(conv_id, message)
output = await manager.send(conv_id, message, user_id=user_id)
return json.dumps({"conv_id": conv_id, "response": output}, ensure_ascii=False)
except KeyError:
return json.dumps({"error": f"No active session for conv_id={conv_id!r}"})
except PermissionError as e:
return json.dumps({"error": str(e)})
class ListConversationsTool(BaseTool):
@ -132,7 +162,8 @@ class ListConversationsTool(BaseTool):
raise NotImplementedError("Use async version")
async def _arun(self) -> str:
sessions = manager.list_sessions()
user_id = get_current_user()
sessions = manager.list_sessions(user_id=user_id)
if not sessions:
return "No active sessions."
return json.dumps(sessions, ensure_ascii=False, indent=2)
@ -147,10 +178,14 @@ class CloseConversationTool(BaseTool):
raise NotImplementedError("Use async version")
async def _arun(self, conv_id: str) -> str:
closed = await manager.close(conv_id)
if closed:
return f"Session {conv_id!r} closed."
return f"Session {conv_id!r} not found."
user_id = get_current_user()
try:
closed = await manager.close(conv_id, user_id=user_id)
if closed:
return f"Session {conv_id!r} closed."
return f"Session {conv_id!r} not found."
except PermissionError as e:
return str(e)
# Module-level tool list for easy import