Compare commits


2 Commits

Author SHA1 Message Date
Yuyao Huang (Sam)
52a9d085f7 feat: restructure data storage paths and improve the task notification flow
Move audit logs, session data, and scheduled-job files into a unified data/ directory.
Add a completion callback for background tasks and streamline notifications after CC tasks finish.
Update the README and ROADMAP docs to mark completed feature items.
2026-03-29 02:32:48 +08:00
Yuyao Huang (Sam)
80e4953cf9 feat: improve the WebSocket connection and heartbeat mechanism
- Add ws_ping_interval and ws_ping_timeout settings in main.py and standalone.py
- Adjust the heartbeat logic in ws.py to send the ping before waiting
- Rework message handling in host_client to process forward requests as tracked tasks
- Update WebTool for the new API format and cap the number of search results
- Add the current date and a web-call limit in agent.py
- Fix the event-loop issue in bot/handler.py
2026-03-28 15:53:44 +08:00
15 changed files with 292 additions and 89 deletions

.gitignore (vendored): 8 changes

@@ -70,3 +70,11 @@ dmypy.json
# Ruff
.ruff_cache/
# Runtime data (sessions, audit logs, scheduled jobs)
data/
# Legacy paths (pre-consolidation)
sessions.json
scheduled_jobs.json
audit/

README.md: 119 changes

@@ -4,27 +4,55 @@ Feishu bot that lets users control Claude Code CLI from their phone.
## Architecture

PhoneWork uses a **Router + Host Client** architecture that supports both single-machine and multi-host deployments:

```
-┌─────────────┐    WebSocket    ┌──────────────┐    LangChain    ┌─────────────┐
-│   Feishu    │ ◄─────────────► │   FastAPI    │ ◄─────────────► │   LLM API   │
-│  (client)   │                 │   (server)   │                 │  (ZhipuAI)  │
-└─────────────┘                 └──────────────┘                 └─────────────┘
-                                        │
-                                 ┌─────────────┐
-                                 │ Claude Code │
-                                 │ (headless)  │
-                                 └─────────────┘
┌─────────────────┐          ┌──────────┐  WebSocket   ┌────────────────────────────────────┐
│   Feishu App    │◄────────►│  Feishu  │◄────────────►│ Router (public VPS)                │
│ (User's Phone)  │          │  Cloud   │              │ - Feishu event handler             │
└─────────────────┘          └──────────┘              │ - Router LLM (routing only)        │
                                                       │ - Node registry + active node map  │
                                                       └───────────┬────────────────────────┘
                                                                   │ WebSocket (host clients connect in)
                                                       ┌───────────┴────────────────────────┐
                                                       │                                    │
                                            ┌──────────▼──────────┐       ┌────────────▼────────┐
                                            │   Host Client A     │       │   Host Client B     │
                                            │   (home-pc)         │       │   (work-server)     │
                                            │   - Mailboy LLM     │       │   - Mailboy LLM     │
                                            │   - CC sessions     │       │   - CC sessions     │
                                            │   - Shell / files   │       │   - Shell / files   │
                                            └─────────────────────┘       └─────────────────────┘
```
**Key design decisions:**
- Host clients connect TO the router (outbound WebSocket) — NAT-transparent
- A user can be registered on multiple nodes simultaneously
- The **router LLM** decides *which node* to route each message to
- The **node mailboy LLM** handles the full orchestration loop
- Each node maintains its own conversation history per user
**Deployment modes:**
- **Standalone (`python standalone.py`):** Runs router + host client at localhost. Same architecture, simpler setup for single-machine use.
- **Multi-host:** Router on a public VPS, host clients behind NAT on different machines.
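The active-node fallback implied above can be sketched in a few lines; `pick_node` and its arguments are illustrative names, not the actual `router/nodes.py` API:

```python
from typing import Dict, List, Optional


def pick_node(user_id: str,
              online_nodes: List[str],
              active_map: Dict[str, str]) -> Optional[str]:
    """Choose the node for a user's message (sketch).

    Fallback order: the user's explicitly chosen active node if it is
    still online, then the only online node if there is exactly one,
    otherwise None, meaning the routing LLM must decide.
    """
    active = active_map.get(user_id)
    if active in online_nodes:
        return active
    if len(online_nodes) == 1:
        return online_nodes[0]
    return None  # defer to the router LLM
```

The LLM is only consulted when the cheap lookups are ambiguous, which keeps routing latency low for the common single-node case.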
**Components:**
| Module | Purpose |
|--------|---------|
-| `main.py` | FastAPI entry point, starts WebSocket client + session manager + scheduler |
+| `standalone.py` | Single-process entry point: runs router + host client together |
+| `main.py` | FastAPI entry point for router-only mode |
| `shared/protocol.py` | Wire protocol for router-host communication |
| `router/main.py` | FastAPI app factory, mounts `/ws/node` endpoint |
| `router/nodes.py` | Node registry, connection management, user-to-node mapping |
| `router/ws.py` | WebSocket endpoint for host clients, heartbeat, message routing |
| `router/rpc.py` | Request correlation with asyncio.Future, timeout handling |
| `router/routing_agent.py` | Single-shot routing LLM to decide which node handles each message |
| `host_client/main.py` | WebSocket client connecting to router, message handling, reconnection |
| `host_client/config.py` | Host client configuration loader |
| `bot/handler.py` | Receives Feishu events via long-connection WebSocket |
-| `bot/feishu.py` | Sends text/file/card replies back to Feishu |
+| `bot/feishu.py` | Sends text/file replies back to Feishu |
-| `bot/commands.py` | Slash command handler (`/new`, `/status`, `/shell`, `/remind`, `/tasks`, etc.) |
+| `bot/commands.py` | Slash command handler (`/new`, `/status`, `/shell`, `/remind`, `/tasks`, `/nodes`, `/node`) |
| `orchestrator/agent.py` | LangChain agent with per-user history + direct/smart mode + direct Q&A |
| `orchestrator/tools.py` | Tools: session management, shell, file ops, web search, scheduler, task status |
| `agent/manager.py` | Session registry with persistence, idle timeout, and auto-background tasks |
@@ -122,6 +150,33 @@ ALLOWED_OPEN_IDS:
# Optional: Metaso (秘塔) AI Search API key for web search functionality
# Get your key at: https://metaso.cn/search-api/api-keys
METASO_API_KEY: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
# Optional: Multi-host mode configuration
# Set ROUTER_MODE to true to enable router mode (deploy on public VPS)
ROUTER_MODE: false
ROUTER_SECRET: your-shared-secret-for-router-host-auth
```
### Host Client Configuration (for multi-host mode)
Create `host_config.yaml` for each host client:
```yaml
NODE_ID: home-pc
DISPLAY_NAME: Home PC
ROUTER_URL: wss://router.example.com/ws/node
ROUTER_SECRET: <shared_secret>
OPENAI_BASE_URL: https://open.bigmodel.cn/api/paas/v4/
OPENAI_API_KEY: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
OPENAI_MODEL: glm-4.7
WORKING_DIR: C:/Users/me/projects
METASO_API_KEY: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
# Which Feishu open_ids this node serves
SERVES_USERS:
- ou_abc123def456
```
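For illustration, the host client could validate these keys roughly as below. This is a sketch that assumes the YAML has already been parsed into a dict; `HostConfig`'s fields and `from_mapping` are hypothetical, not the real `host_client/config.py` interface:

```python
from dataclasses import dataclass, field
from typing import List

# Keys the client cannot run without (illustrative subset)
REQUIRED_KEYS = ("NODE_ID", "ROUTER_URL", "ROUTER_SECRET")


@dataclass
class HostConfig:
    node_id: str
    router_url: str
    router_secret: str
    serves_users: List[str] = field(default_factory=list)


def from_mapping(raw: dict) -> HostConfig:
    """Build a HostConfig from a parsed host_config.yaml mapping."""
    missing = [k for k in REQUIRED_KEYS if k not in raw]
    if missing:
        raise ValueError(f"host_config.yaml missing keys: {missing}")
    return HostConfig(
        node_id=raw["NODE_ID"],
        router_url=raw["ROUTER_URL"],
        router_secret=raw["ROUTER_SECRET"],
        serves_users=list(raw.get("SERVES_USERS", [])),
    )
```

Failing fast on missing keys at startup beats a cryptic connection error later.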
---
@@ -162,6 +217,8 @@ Active sessions: `GET /sessions`
| `/shell <cmd>` | Run a shell command directly (bypasses LLM) |
| `/remind <time> <msg>` | Set a reminder (e.g., `/remind 10m check build`) |
| `/tasks` | List background tasks with status |
| `/nodes` | List connected host nodes (multi-host mode) |
| `/node <name>` | Switch active node (multi-host mode) |
| `/help` | Show command reference |

### Message Routing Modes

@@ -240,3 +297,39 @@ Claude Code slash commands (like `/help`, `/clear`, `/compact`, `/cost`) are pas
- Schedule recurring reminders
- Notifications delivered to Feishu
- Persistent across server restarts
### Multi-Host Architecture (Milestone 3)
#### Deployment Options
**Single-Machine Mode:**
```bash
python standalone.py
```
Runs both router and host client in one process. Identical UX to pre-M3 setup.
**Router Mode (Public VPS):**
```bash
# Set ROUTER_MODE: true in keyring.yaml
python main.py
```
Runs only the router: Feishu handler + routing LLM + node registry.
**Host Client Mode (Behind NAT):**
```bash
# Create host_config.yaml with ROUTER_URL and ROUTER_SECRET
python -m host_client.main
```
Connects to router via WebSocket, runs full mailboy stack locally.
#### Node Management
- `/nodes` — View all connected host nodes with status
- `/node <name>` — Switch active node for your user
- Automatic routing: LLM decides which node handles each message
- Health monitoring: Router tracks node heartbeats
- Reconnection: Host clients auto-reconnect on disconnect
#### Security
- Shared secret authentication between router and host clients
- User isolation: Each node only serves configured users
- Path sandboxing: Sessions restricted to WORKING_DIR
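A minimal sketch of the shared-secret check, assuming the host client presents the secret in an `Authorization: Bearer ...` header (the header name is an assumption, not confirmed by this diff); `hmac.compare_digest` keeps the comparison constant-time:

```python
import hmac


def check_node_auth(headers: dict, router_secret: str) -> bool:
    """Validate the shared secret presented by a connecting host client.

    Assumes a `Bearer <secret>` Authorization header (illustrative choice).
    hmac.compare_digest avoids leaking the secret through timing.
    """
    auth = headers.get("Authorization", "")
    if not auth.startswith("Bearer "):
        return False
    presented = auth[len("Bearer "):]
    return hmac.compare_digest(presented, router_secret)
```

On failure the router would close the handshake with 401, matching the verification checklist item below.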


@@ -1,9 +1,9 @@
# PhoneWork — Roadmap
## Milestone 2: Mailboy as a Versatile Assistant (COMPLETED)
**Goal:** Elevate the mailboy (GLM-4.7 orchestrator) from a mere Claude Code relay into a
fully capable phone assistant. Users can control their machine, manage files,
search the web, get direct answers, and track long-running tasks — all without necessarily
opening a Claude Code session.
@@ -177,21 +177,21 @@ args: action ("remind" | "repeat"), delay_seconds (int), interval_seconds (int),
## Verification Checklist
- [x] M2.1: Ask "what is a Python generator?" — mailboy replies directly, no tool call
- [x] M2.2: Send "check git status in todo_app" — `ShellTool` runs, output returned
- [x] M2.2: Send "rm -rf /" — blocked by safety guard
- [x] M2.3: Send "show me the last 50 lines of audit/abc123.jsonl" — file content returned
- [x] M2.3: Send "send me the sessions.json file" — file arrives in Feishu chat
- [x] M2.4: Start a long CC task (e.g. `--timeout 120`) — bot replies immediately, notifies on finish
- [x] M2.4: `/tasks` — lists running task with elapsed time
- [x] M2.5: "What's new in Python 3.13?" (asked in Chinese) — `web ask` returns a RAG answer from metaso
- [x] M2.5: "Read this URL for me: https://example.com" — page content extracted as markdown
- [x] M2.6: `/remind 10m deploy check` — 10 min later, message arrives in Feishu
---
---
## Milestone 3: Multi-Host Architecture (Router / Host Client Split) (COMPLETED)
**Goal:** Split PhoneWork into two deployable components — a public-facing **Router** and
one or more **Host Clients** behind NAT. A user can be served by multiple nodes simultaneously.
@@ -519,16 +519,16 @@ PhoneWork/
## M3 Verification Checklist
- [x] `python standalone.py` — works identically to current `python main.py`
- [x] Router starts, host client connects, registration logged
- [x] Feishu message → routing LLM selects node → forwarded → reply returned
- [x] `/nodes` shows all connected nodes with active marker
- [x] `/node work-server` — switches active node, confirmed in next message
- [x] Two nodes serving same user — message routed to active node
- [x] Kill host client → router marks offline, user sees "Node home-pc is offline"
- [x] Host client reconnects → re-registered, messages flow again
- [x] Long CC task on node finishes → router forwards completion notification to Feishu
- [x] Wrong `ROUTER_SECRET` → connection rejected with 401
---


@@ -10,7 +10,7 @@ from typing import Optional
logger = logging.getLogger(__name__)

-AUDIT_DIR = Path(__file__).parent.parent / "audit"
+AUDIT_DIR = Path(__file__).parent.parent / "data" / "audit"

def _ensure_audit_dir() -> None:


@@ -17,7 +17,7 @@ logger = logging.getLogger(__name__)
DEFAULT_IDLE_TIMEOUT = 30 * 60
DEFAULT_CC_TIMEOUT = 300.0

-PERSISTENCE_FILE = Path(__file__).parent.parent / "sessions.json"
+PERSISTENCE_FILE = Path(__file__).parent.parent / "data" / "sessions.json"

@dataclass
@@ -105,7 +105,7 @@ class SessionManager:
        if cc_timeout > 60:
            from agent.task_runner import task_runner
-           from orchestrator.tools import get_current_chat
+           from orchestrator.tools import get_current_chat, set_current_chat, set_current_user

            chat_id = get_current_chat()
@@ -126,10 +126,29 @@ class SessionManager:
                )
                return output

            async def on_task_complete(task) -> None:
                if not chat_id or not user_id or not task.result:
                    return
                set_current_user(user_id)
                set_current_chat(chat_id)
                from orchestrator.agent import agent
                follow_up = (
                    f"CC task completed. Output:\n{task.result}\n\n"
                    f"Original request was: {message}\n\n"
                    "If the user asked you to send a file, use send_file now. "
                    "Otherwise just acknowledge completion."
                )
                reply = await agent.run(user_id, follow_up)
                if reply:
                    from bot.feishu import send_text
                    await send_text(chat_id, "chat_id", reply)

            task_id = await task_runner.submit(
-               run_task,
+               run_task(),
                description=f"CC session {conv_id}: {message[:50]}",
                notify_chat_id=chat_id,
                user_id=user_id,
                on_complete=on_task_complete,
            )
            return f"⏳ Task #{task_id} started (timeout: {int(cc_timeout)}s). I'll notify you when it's done."
@@ -183,6 +202,7 @@ class SessionManager:
    def _save(self) -> None:
        try:
            data = {cid: s.to_dict() for cid, s in self._sessions.items()}
            PERSISTENCE_FILE.parent.mkdir(parents=True, exist_ok=True)
            with open(PERSISTENCE_FILE, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2)
            logger.debug("Saved %d sessions to %s", len(data), PERSISTENCE_FILE)
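The `mkdir(parents=True, exist_ok=True)` call added before the write is what keeps the first save after the move to `data/` from failing with `FileNotFoundError` on a fresh checkout. The pattern, as a minimal standalone sketch:

```python
import json
from pathlib import Path


def save_json(path: Path, data: dict) -> None:
    """Write data as JSON, creating parent directories first.

    Without the mkdir, the first save into a not-yet-created data/
    directory raises FileNotFoundError.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2, ensure_ascii=False)
```

The same one-line guard appears in the scheduler's save path below.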


@@ -14,7 +14,7 @@ from typing import Any, Callable, Dict, Optional
logger = logging.getLogger(__name__)

-PERSISTENCE_FILE = Path(__file__).parent.parent / "scheduled_jobs.json"
+PERSISTENCE_FILE = Path(__file__).parent.parent / "data" / "scheduled_jobs.json"

class JobStatus(str, Enum):
@@ -98,6 +98,7 @@ class Scheduler:
        """Save jobs to persistence file."""
        try:
            data = {jid: job.to_dict() for jid, job in self._jobs.items()}
            PERSISTENCE_FILE.parent.mkdir(parents=True, exist_ok=True)
            with open(PERSISTENCE_FILE, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2, ensure_ascii=False)
        except Exception:


@@ -57,6 +57,7 @@ class TaskRunner:
        description: str,
        notify_chat_id: Optional[str] = None,
        user_id: Optional[str] = None,
        on_complete: Optional[Callable[[BackgroundTask], Awaitable[None]]] = None,
    ) -> str:
        """Submit a coroutine as a background task."""
        task_id = str(uuid.uuid4())[:8]
@@ -72,11 +73,11 @@ class TaskRunner:
        async with self._lock:
            self._tasks[task_id] = task
-       asyncio.create_task(self._run_task(task_id, coro))
+       asyncio.create_task(self._run_task(task_id, coro, on_complete))
        logger.info("Submitted background task %s: %s", task_id, description)
        return task_id

-   async def _run_task(self, task_id: str, coro: Awaitable[Any]) -> None:
+   async def _run_task(self, task_id: str, coro: Awaitable[Any], on_complete: Optional[Callable[[BackgroundTask], Awaitable[None]]] = None) -> None:
        """Execute a task and send notification on completion."""
        async with self._lock:
            task = self._tasks.get(task_id)
@@ -107,6 +108,12 @@ class TaskRunner:
        else:
            await self._send_notification(task)

        if on_complete and task.status == TaskStatus.COMPLETED:
            try:
                await on_complete(task)
            except Exception:
                logger.exception("on_complete callback failed for task %s", task_id)

    async def _send_notification(self, task: BackgroundTask) -> None:
        """Send Feishu notification about task completion."""
        from bot.feishu import send_text
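The `on_complete` hook added here reduces to a small pattern: run the work, fire the callback only on success, and never let a callback failure propagate into the runner. A simplified sketch without the task registry:

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def run_with_callback(
    coro: Awaitable[str],
    on_complete: Optional[Callable[[str], Awaitable[None]]] = None,
) -> str:
    """Await a task, then fire an optional completion callback.

    Callback exceptions are swallowed (the real code logs them with
    logger.exception) so a bad callback cannot crash the runner.
    """
    result = await coro
    if on_complete is not None:
        try:
            await on_complete(result)
        except Exception:
            pass  # real code: logger.exception(...)
    return result
```

This is what lets a finished CC task re-enter the mailboy agent for a follow-up turn (e.g. to call send_file) instead of just posting a raw notification.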


@@ -184,17 +184,15 @@ async def send_file(receive_id: str, receive_id_type: str, file_path: str, file_
    loop = asyncio.get_running_loop()

    # Step 1: Upload file → get file_key
-   with open(path, "rb") as f:
-       file_data = f.read()
    def _upload():
+       with open(path, "rb") as f:
        req = (
            CreateFileRequest.builder()
            .request_body(
                CreateFileRequestBody.builder()
                .file_type(file_type)
                .file_name(file_name)
-               .file(file_data)
+               .file(f)
                .build()
            )
            .build()


@@ -184,6 +184,14 @@ def start_websocket_client(loop: asyncio.AbstractEventLoop) -> None:
    backoff = 1.0
    max_backoff = 60.0

    # lark_oapi.ws.client captures the event loop at module import time.
    # In standalone mode uvicorn already owns the main loop, so we create
    # a fresh loop for this thread and redirect the lark module to use it.
    thread_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(thread_loop)
    import lark_oapi.ws.client as _lark_ws_client
    _lark_ws_client.loop = thread_loop

    while True:
        try:
            _ws_connected = False
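The same fix, generalized: give a worker thread its own event loop instead of borrowing the one uvicorn owns. A self-contained sketch (not lark-specific):

```python
import asyncio
import threading


def start_loop_thread() -> "tuple[asyncio.AbstractEventLoop, threading.Thread]":
    """Create a fresh event loop owned by a dedicated thread.

    Mirrors the handler.py fix: a library that captured "the" loop at
    import time must be pointed at a loop that is actually running in
    the thread that uses it.
    """
    loop = asyncio.new_event_loop()

    def _run() -> None:
        asyncio.set_event_loop(loop)  # this thread now owns the loop
        loop.run_forever()

    t = threading.Thread(target=_run, daemon=True)
    t.start()
    return loop, t
```

Other threads can then hand coroutines to it via `asyncio.run_coroutine_threadsafe(coro, loop)`.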


@@ -43,6 +43,7 @@ class NodeClient:
        self._running = False
        self._last_heartbeat = time.time()
        self._reconnect_delay = 1.0
        self._forward_tasks: set[asyncio.Task] = set()

    async def connect(self) -> bool:
        """Connect to the router WebSocket."""
@@ -53,9 +54,9 @@ class NodeClient:
        try:
            self.ws = await websockets.connect(
                self.config.router_url,
-               extra_headers=headers,
-               ping_interval=30,
-               ping_timeout=10,
+               additional_headers=headers,
+               ping_interval=20,
+               ping_timeout=60,
            )
            logger.info("Connected to router: %s", self.config.router_url)
            self._reconnect_delay = 1.0
@@ -145,17 +146,9 @@ class NodeClient:
        except Exception as e:
            logger.error("Failed to send status: %s", e)

-   async def handle_message(self, data: str) -> None:
-       """Handle an incoming message from the router."""
-       try:
-           msg = decode(data)
-       except Exception as e:
-           logger.error("Failed to decode message: %s", e)
-           return
-       if isinstance(msg, ForwardRequest):
-           await self.handle_forward(msg)
-       elif isinstance(msg, Heartbeat):
+   async def handle_message_decoded(self, msg: Any) -> None:
+       """Handle an already-decoded message from the router."""
+       if isinstance(msg, Heartbeat):
            if msg.type == "ping":
                if self.ws:
                    try:
@@ -165,7 +158,7 @@ class NodeClient:
            elif msg.type == "pong":
                self._last_heartbeat = time.time()
        else:
-           logger.debug("Received message type: %s", msg.type)
+           logger.debug("Received message type: %s", type(msg).__name__)

    async def receive_loop(self) -> None:
        """Main receive loop for incoming messages."""
@@ -174,7 +167,20 @@ class NodeClient:
        try:
            async for data in self.ws:
-               await self.handle_message(data)
+               try:
+                   msg = decode(data)
+               except Exception as e:
+                   logger.error("Failed to decode message: %s", e)
+                   continue
+               if isinstance(msg, ForwardRequest):
+                   # Dispatch as a task so pings are handled without waiting
+                   # for the full agent run to complete.
+                   task = asyncio.create_task(self.handle_forward(msg))
+                   self._forward_tasks.add(task)
+                   task.add_done_callback(self._forward_tasks.discard)
+               else:
+                   await self.handle_message_decoded(msg)
        except websockets.ConnectionClosed as e:
            logger.warning("Connection closed: %s", e)
        except Exception as e:
@@ -184,14 +190,14 @@ class NodeClient:
        """Periodic heartbeat loop."""
        while self._running:
            await asyncio.sleep(30)
-           if self.ws and self.ws.open:
+           if self.ws:
                await self.send_heartbeat()

    async def status_loop(self) -> None:
        """Periodic status update loop."""
        while self._running:
            await asyncio.sleep(60)
-           if self.ws and self.ws.open:
+           if self.ws:
                await self.send_status()

    async def run(self) -> None:
@@ -243,6 +249,10 @@ class NodeClient:
    async def stop(self) -> None:
        """Stop the client."""
        self._running = False
        for task in list(self._forward_tasks):
            task.cancel()
        if self._forward_tasks:
            await asyncio.gather(*self._forward_tasks, return_exceptions=True)
        if self.ws:
            await self.ws.close()
        await manager.stop()
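The shape of the new receive loop can be shown in isolation: slow forwards become tracked tasks, so control returns to the loop immediately and heartbeats are answered on time. A sketch with stand-in types (`messages` is any async iterable and the string "ping" stands in for a Heartbeat):

```python
import asyncio
from typing import Any, AsyncIterable, Awaitable, Callable, List, Set


async def receive_loop(
    messages: "AsyncIterable[Any]",
    handle_forward: "Callable[[Any], Awaitable[Any]]",
) -> "List[Any]":
    """Drain messages, dispatching slow forwards as background tasks.

    Heartbeats never wait behind a long agent run; forwards still in
    flight are awaited before returning (as NodeClient.stop() does).
    """
    forward_tasks: Set[asyncio.Task] = set()
    async for msg in messages:
        if msg == "ping":
            continue  # real client answers pong inline here
        task = asyncio.create_task(handle_forward(msg))
        forward_tasks.add(task)
        task.add_done_callback(forward_tasks.discard)
    if not forward_tasks:
        return []
    return list(await asyncio.gather(*forward_tasks))
```

The done-callback `discard` keeps the set from growing without bound across a long-lived connection.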


@@ -111,4 +111,6 @@ if __name__ == "__main__":
        port=8000,
        reload=False,
        log_level="info",
        ws_ping_interval=20,
        ws_ping_timeout=60,
    )


@@ -30,6 +30,8 @@ logger = logging.getLogger(__name__)
SYSTEM_PROMPT_TEMPLATE = """You are PhoneWork, an AI assistant that helps users control Claude Code \
from their phone via Feishu (飞书).

Today's date: {today}

You manage Claude Code sessions. Each session has a conv_id and runs in a project directory.
Base working directory: {working_dir}
@@ -46,12 +48,20 @@ Your responsibilities:
4. Close session: call `close_conversation`.
5. GENERAL QUESTIONS: If the user asks a general question (not about a specific project or file), \
answer directly using your own knowledge. Do NOT create a session for simple Q&A.
6. WEB / SEARCH: Use the `web` tool when the user needs current information. \
Call it ONCE (or at most twice with a refined query). Then synthesize and reply; \
do NOT keep searching in a loop. If the first search returns results, use them.
7. BACKGROUND TASKS: When `create_conversation` or `send_to_conversation` returns a \
"Task #... started" message, the task is running in the background. \
Immediately reply to the user that the task has started and they will be notified. \
Do NOT call `task_status` in a loop waiting for it; the system sends a notification when done.

Guidelines:
- Relay Claude Code's output verbatim.
- If no active session and the user sends a task without naming a directory, ask them which project.
- For general knowledge questions (e.g., "what is a Python generator?", "explain async/await"), \
answer directly without creating a session.
- After using any tool, always produce a final text reply to the user. Never end a turn on a tool call.
- Keep your own words brief; let Claude Code's output speak.
- Reply in the same language the user uses (Chinese or English).
"""
@@ -111,6 +121,8 @@ class OrchestrationAgent:
        self._passthrough: dict[str, bool] = defaultdict(lambda: False)

    def _build_system_prompt(self, user_id: str) -> str:
        from datetime import date
        today = date.today().strftime("%Y-%m-%d")
        conv_id = self._active_conv[user_id]
        if conv_id:
            active_line = f"ACTIVE SESSION: conv_id={conv_id!r} ← use this for all follow-up messages"
@@ -119,6 +131,7 @@ class OrchestrationAgent:
        return SYSTEM_PROMPT_TEMPLATE.format(
            working_dir=WORKING_DIR,
            active_session_line=active_line,
            today=today,
        )

    def get_active_conv(self, user_id: str) -> Optional[str]:
@@ -181,6 +194,8 @@ class OrchestrationAgent:
        reply = ""
        try:
            web_calls = 0
            task_status_calls = 0
            for iteration in range(MAX_ITERATIONS):
                logger.debug(" LLM call #%d", iteration)
                ai_msg: AIMessage = await self._llm_with_tools.ainvoke(messages)
@@ -201,6 +216,26 @@ class OrchestrationAgent:
                    )
                    logger.info("%s(%s)", tool_name, args_summary)

                    if tool_name == "web":
                        web_calls += 1
                        if web_calls > 2:
                            result = "Web search limit reached. Synthesize from results already obtained."
                            logger.warning(" web call limit exceeded, blocking")
                            messages.append(
                                ToolMessage(content=str(result), tool_call_id=tool_id)
                            )
                            continue

                    if tool_name == "task_status":
                        task_status_calls += 1
                        if task_status_calls > 1:
                            result = "Task is still running in the background. Stop polling and tell the user they will be notified when it completes."
                            logger.warning(" task_status poll limit exceeded, blocking")
                            messages.append(
                                ToolMessage(content=str(result), tool_call_id=tool_id)
                            )
                            continue

                    tool_obj = _TOOL_MAP.get(tool_name)
                    if tool_obj is None:
                        result = f"Unknown tool: {tool_name}"
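The per-run tool budgets above follow a generic pattern: count calls per tool name and substitute a canned tool result once a limit is hit, so the model is steered back to answering instead of looping. A standalone sketch (names are illustrative):

```python
from typing import Callable, Dict


def make_budgeted_dispatch(
    limits: Dict[str, int],
    call_tool: Callable[[str, dict], str],
) -> Callable[[str, dict], str]:
    """Wrap a tool dispatcher with per-tool call budgets for one agent run.

    Past the limit, the real tool is NOT invoked; a canned message is
    returned and fed back to the model as the tool result.
    """
    counts: Dict[str, int] = {}

    def dispatch(name: str, args: dict) -> str:
        counts[name] = counts.get(name, 0) + 1
        limit = limits.get(name)
        if limit is not None and counts[name] > limit:
            return f"{name} call limit reached; answer with what you already have."
        return call_tool(name, args)

    return dispatch
```

A fresh dispatcher per `run()` resets the counters, matching how `web_calls` and `task_status_calls` are scoped above.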


@@ -553,18 +553,31 @@ class WebTool(BaseTool):
            payload = {
                "jsonrpc": "2.0",
                "id": 1,
-               "method": "metaso_web_search",
-               "params": {"query": query, "scope": scope or "webpage"},
+               "method": "tools/call",
+               "params": {
+                   "name": "metaso_web_search",
+                   "arguments": {"q": query, "scope": scope or "webpage", "size": 5, "includeSummary": True},
+               },
            }
            resp = await client.post(base_url, json=payload, headers=headers)
            data = resp.json()
            if "error" in data:
                return json.dumps({"error": data["error"]}, ensure_ascii=False)
-           results = data.get("result", {}).get("results", [])[:5]
+           content_text = data.get("result", {}).get("content", [{}])[0].get("text", "")
+           result_data = json.loads(content_text) if content_text else {}
+           webpages = result_data.get("webpages", [])[:5]
            output = []
-           for r in results:
-               output.append(f"**{r.get('title', 'No title')}**\n{r.get('snippet', '')}\n{r.get('url', '')}")
-           return json.dumps({"results": "\n\n".join(output)[:max_chars]}, ensure_ascii=False)
+           for r in webpages:
+               date = r.get("date", "")
+               title = r.get("title", "No title")
+               snippet = r.get("snippet", "")[:300]
+               link = r.get("link", "")
+               output.append(f"[{date}] **{title}**\n{snippet}\n{link}")
+           total = result_data.get("total", 0)
+           return json.dumps({
+               "total": total,
+               "results": "\n\n".join(output)[:max_chars],
+           }, ensure_ascii=False)

        elif action == "fetch":
            if not url:
@@ -572,15 +585,18 @@ class WebTool(BaseTool):
            payload = {
                "jsonrpc": "2.0",
                "id": 1,
-               "method": "metaso_web_reader",
-               "params": {"url": url, "format": "markdown"},
+               "method": "tools/call",
+               "params": {
+                   "name": "metaso_web_reader",
+                   "arguments": {"url": url, "format": "markdown"},
+               },
            }
            resp = await client.post(base_url, json=payload, headers=headers)
            data = resp.json()
            if "error" in data:
                return json.dumps({"error": data["error"]}, ensure_ascii=False)
-           content = data.get("result", {}).get("content", "")
-           return json.dumps({"content": content[:max_chars]}, ensure_ascii=False)
+           content_text = data.get("result", {}).get("content", [{}])[0].get("text", "")
+           return json.dumps({"content": content_text[:max_chars]}, ensure_ascii=False)
        elif action == "ask":
            if not query:
@@ -588,15 +604,18 @@ class WebTool(BaseTool):
            payload = {
                "jsonrpc": "2.0",
                "id": 1,
-               "method": "metaso_chat",
-               "params": {"query": query},
+               "method": "tools/call",
+               "params": {
+                   "name": "metaso_chat",
+                   "arguments": {"message": query},
+               },
            }
            resp = await client.post(base_url, json=payload, headers=headers)
            data = resp.json()
            if "error" in data:
                return json.dumps({"error": data["error"]}, ensure_ascii=False)
-           answer = data.get("result", {}).get("answer", "")
-           return json.dumps({"answer": answer[:max_chars]}, ensure_ascii=False)
+           content_text = data.get("result", {}).get("content", [{}])[0].get("text", "")
+           return json.dumps({"answer": content_text[:max_chars]}, ensure_ascii=False)

        else:
            return json.dumps({"error": f"Unknown action: {action}"}, ensure_ascii=False)
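All three branches now share the same MCP-style JSON-RPC shape: `method` is `tools/call`, the tool name moves into `params.name`, and the reply text lives at `result.content[0].text`. A sketch of the request builder and response accessor mirroring that structure:

```python
def build_tools_call(name: str, arguments: dict) -> dict:
    """Build an MCP-style JSON-RPC 2.0 tools/call request body."""
    return {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "tools/call",
        "params": {"name": name, "arguments": arguments},
    }


def extract_text(response: dict) -> str:
    """Pull the text payload out of result.content[0].text.

    Tolerates a missing result the same way the updated WebTool does,
    by defaulting each level to an empty container.
    """
    return response.get("result", {}).get("content", [{}])[0].get("text", "")
```

Centralizing the two helpers would also remove the triplicated payload boilerplate visible in this hunk, though the diff keeps them inline.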


@@ -53,11 +53,11 @@ async def ws_node_endpoint(websocket: WebSocket) -> None:
        """Send periodic pings to the host client."""
        try:
            while True:
-               await asyncio.sleep(30)
                try:
                    await websocket.send_text(encode(Heartbeat(type="ping")))
                except Exception:
                    break
+               await asyncio.sleep(30)
        except asyncio.CancelledError:
            pass


@@ -41,6 +41,8 @@ async def run_standalone() -> None:
        host="0.0.0.0",
        port=8000,
        log_level="info",
        ws_ping_interval=20,
        ws_ping_timeout=60,
    )
    server = uvicorn.Server(config_obj)