Compare commits


2 Commits

Author SHA1 Message Date
Yuyao Huang (Sam)
64297e5e27 feat: implement core components of the multi-host architecture
Add router, host client, and shared protocol modules to support multi-host deployment:
- The router acts as the central node, managing host connections and message routing
- Host clients act as worker nodes running the local agent
- The shared protocol defines the communication message format
- Add standalone mode (standalone.py)
- Update the config system to support router mode
2026-03-28 14:08:47 +08:00
Yuyao Huang (Sam)
8ecc701d5e feat: add task scheduler, background task runner, and assorted tool support
Implement the background scheduler (scheduler.py) and task runner (task_runner.py), supporting async execution and status tracking for long-running tasks
Add support for multiple tools: shell command execution, file operations (read/write/search/send), web search/Q&A, timed reminders, etc.
Extend the README and ROADMAP docs to describe the new features and the planned multi-host architecture
Add METASO_API_KEY to the config to enable 秘塔AI (Metaso) search
Optimize agent logic to recognize general questions and answer them directly without creating a session
2026-03-28 13:45:20 +08:00
25 changed files with 2836 additions and 12 deletions

README.md

@@ -21,14 +21,16 @@ Feishu bot that lets users control Claude Code CLI from their phone.
| Module | Purpose |
|--------|---------|
| `main.py` | FastAPI entry point, starts WebSocket client + session manager + scheduler |
| `bot/handler.py` | Receives Feishu events via long-connection WebSocket |
| `bot/feishu.py` | Sends text/file/card replies back to Feishu |
| `bot/commands.py` | Slash command handler (`/new`, `/status`, `/shell`, `/remind`, `/tasks`, etc.) |
| `orchestrator/agent.py` | LangChain agent with per-user history + direct/smart mode + direct Q&A |
| `orchestrator/tools.py` | Tools: session management, shell, file ops, web search, scheduler, task status |
| `agent/manager.py` | Session registry with persistence, idle timeout, and auto-background tasks |
| `agent/pty_process.py` | Runs `claude -p` headlessly, manages session continuity via `--resume` |
| `agent/task_runner.py` | Background task runner with Feishu notifications |
| `agent/scheduler.py` | Reminder scheduler with persistence |
| `agent/audit.py` | Audit log of all interactions |
**Flow:** User message → Feishu WebSocket → Handler → (passthrough or LLM) → Session Manager → `claude -p` → Response back to Feishu
@@ -116,6 +118,10 @@ WORKING_DIR: C:/Users/yourname/projects
# Leave empty to allow all users.
ALLOWED_OPEN_IDS:
- ou_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
# Optional: 秘塔AI Search API key for web search functionality
# Get your key at: https://metaso.cn/search-api/api-keys
METASO_API_KEY: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
```
---
@@ -153,6 +159,9 @@ Active sessions: `GET /sessions`
| `/close [n]` | Close active session (or session `<n>`) |
| `/direct` | Direct mode: messages go straight to Claude Code (no LLM overhead) |
| `/smart` | Smart mode: messages go through LLM for intelligent routing (default) |
| `/shell <cmd>` | Run a shell command directly (bypasses LLM) |
| `/remind <time> <msg>` | Set a reminder (e.g., `/remind 10m check build`) |
| `/tasks` | List background tasks with status |
| `/help` | Show command reference |
### Message Routing Modes
@@ -194,3 +203,40 @@ Claude Code slash commands (like `/help`, `/clear`, `/compact`, `/cost`) are pas
- **User allowlist** - Configure which Feishu users are allowed to use the bot
- **Session isolation** - Each user can only see and access their own sessions
- **Path sandboxing** - Sessions can only run inside the allowed working directory, blocking path traversal attacks
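The path-sandboxing check can be sketched as follows — an illustrative guard (function and argument names are assumptions, not necessarily the repo's implementation): resolve the requested path against the working directory and reject anything that escapes it.

```python
# Illustrative path-sandboxing guard: the resolved target must stay
# under WORKING_DIR, which defeats ../ traversal tricks.
from pathlib import Path

def is_inside_working_dir(requested: str, working_dir: str) -> bool:
    root = Path(working_dir).resolve()
    target = (root / requested).resolve()
    return target == root or root in target.parents
```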
### Versatile Assistant (Milestone 2)
#### Direct Q&A
- Ask general knowledge questions without creating a Claude Code session
- The LLM answers directly using its own knowledge (e.g., "what is a Python generator?")
- Automatic detection of question-like messages
#### Shell Access
- Execute shell commands remotely via `/shell` or through the LLM
- Safety guards block destructive commands (`rm -rf /`, `sudo rm`, `mkfs`, etc.)
- Configurable timeout (max 120 seconds)
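A minimal sketch of such a guard, assuming a regex denylist (the actual pattern list and function name in `orchestrator/tools.py` may differ):

```python
# Hypothetical destructive-command guard: reject a command if it matches
# any known-dangerous pattern before it reaches the shell.
import re

BLOCKED_PATTERNS = [
    r"rm\s+-rf\s+/(\s|$)",   # rm -rf /
    r"\bsudo\s+rm\b",        # sudo rm ...
    r"\bmkfs(\.\w+)?\b",     # mkfs, mkfs.ext4, ...
]

def is_destructive(command: str) -> bool:
    """Return True if the command matches a known-destructive pattern."""
    return any(re.search(p, command) for p in BLOCKED_PATTERNS)
```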
#### File Operations
- **Read files** - View file content with line numbers
- **Write files** - Create or append to files
- **List directories** - Browse project structure
- **Search content** - Grep-like search across text files
- **Send files** - Deliver files directly to Feishu chat
#### Background Tasks
- Long-running tasks (timeout > 60s) automatically run in background
- Immediate acknowledgment with task ID
- Feishu notification on completion
- Track task status with `/tasks` command
#### Web Search
- Search the web via 秘塔AI Search (requires `METASO_API_KEY`)
- Fetch and extract content from URLs
- Ask questions with RAG-powered answers
- Supports multiple scopes: webpage, paper, document, video, podcast
#### Scheduling & Reminders
- Set one-time reminders: `/remind 10m check the build`
- Schedule recurring reminders
- Notifications delivered to Feishu
- Persistent across server restarts

ROADMAP.md

@@ -187,3 +187,378 @@ args: action ("remind" | "repeat"), delay_seconds (int), interval_seconds (int),
- [ ] M2.5: "Python 3.13 有哪些新特性?" ("What's new in Python 3.13?") — `web ask` returns RAG answer from metaso
- [ ] M2.5: "帮我读取这个URL: https://example.com" ("Read this URL for me: …") — page content extracted as markdown
- [ ] M2.6: `/remind 10m deploy check` — 10 min later, message arrives in Feishu
---
---
## Milestone 3: Multi-Host Architecture (Router / Host Client Split)
**Goal:** Split PhoneWork into two deployable components — a public-facing **Router** and
one or more **Host Clients** behind NAT. A user can be served by multiple nodes simultaneously.
Intelligence is split: the router runs a cheap LLM for routing decisions only; each node runs
the full mailboy LLM for execution. A standalone script preserves the current single-machine
experience.
### Architecture
```
┌──────────┐ WebSocket ┌────────────────────────────────────┐
│ Feishu │◄────────────►│ Router (public VPS) │
│ Cloud │ │ - Feishu event handler │
└──────────┘ │ - Router LLM (routing only) │
│ - Node registry + active node map │
│ - NO mailboy, NO sessions │
└───────────┬────────────────────────┘
│ WebSocket (host clients connect in)
┌───────────┴────────────────────────┐
│ │
┌──────────▼──────────┐ ┌────────────▼────────┐
│ Host Client A │ │ Host Client B │
│ (home-pc) │ │ (work-server) │
│ - Mailboy LLM │ │ - Mailboy LLM │
│ - CC sessions │ │ - CC sessions │
│ - Shell / files │ │ - Shell / files │
│ - Task runner │ │ - Task runner │
└─────────────────────┘ └─────────────────────┘
```
**Key design decisions:**
- Host clients connect TO the router (outbound WebSocket) — NAT-transparent
- A user can be registered on multiple nodes simultaneously
- The **router LLM** decides *which node* to route each message to (cheap, one-shot)
- The **node mailboy LLM** handles the full orchestration loop (sessions, tools, CC)
- Each node maintains its own conversation history per user
- Task completion notifications: node pushes to router → router sends to Feishu
---
### M3.1 — Shared Protocol Module
Foundation for both sides.
**`shared/protocol.py`:**
```python
@dataclass
class RegisterMessage:
    type: str = "register"
    node_id: str = ""
    serves_users: list[str] = field(default_factory=list)
    working_dir: str = ""
    capabilities: list[str] = field(default_factory=list)  # ["claude_code", "shell", "file_ops", "web"]
    display_name: str = ""  # human-readable, shown in /nodes

@dataclass
class ForwardRequest:
    type: str = "forward"
    id: str = ""  # correlation id, router awaits matching response
    user_id: str = ""
    chat_id: str = ""
    text: str = ""

@dataclass
class ForwardResponse:
    type: str = "forward_response"
    id: str = ""
    reply: str = ""
    error: str = ""

@dataclass
class TaskComplete:
    type: str = "task_complete"
    task_id: str = ""
    user_id: str = ""
    chat_id: str = ""
    result: str = ""

@dataclass
class Heartbeat:
    type: str = "ping"  # or "pong" for the reply
```
Serialization: JSON + type-field dispatch. Both sides import from `shared/`.
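The type-field dispatch could look roughly like this — a sketch with two of the message types; `encode`, `decode`, and the `MESSAGE_TYPES` registry are illustrative names, not a fixed API:

```python
# Sketch of JSON + type-field dispatch: serialize dataclasses to JSON,
# and pick the dataclass to reconstruct from the "type" field.
import json
from dataclasses import dataclass, asdict

@dataclass
class ForwardRequest:
    type: str = "forward"
    id: str = ""
    user_id: str = ""
    chat_id: str = ""
    text: str = ""

@dataclass
class ForwardResponse:
    type: str = "forward_response"
    id: str = ""
    reply: str = ""
    error: str = ""

# Registry keyed by the "type" field.
MESSAGE_TYPES = {
    "forward": ForwardRequest,
    "forward_response": ForwardResponse,
}

def encode(msg) -> str:
    return json.dumps(asdict(msg))

def decode(raw: str):
    data = json.loads(raw)
    cls = MESSAGE_TYPES[data["type"]]  # KeyError on unknown types
    return cls(**data)
```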
---
### M3.2 — Host Client: Full Mailboy Node
Each host client is a self-contained assistant: receives a raw user message from the router,
runs the full LLM + tool loop, returns the reply.
**Host client config** (`host_config.yaml`):
```yaml
NODE_ID: home-pc
DISPLAY_NAME: Home PC
ROUTER_URL: wss://router.example.com/ws/node
ROUTER_SECRET: <shared_secret>
# LLM for this node's mailboy
OPENAI_BASE_URL: https://open.bigmodel.cn/api/paas/v4/
OPENAI_API_KEY: <key>
OPENAI_MODEL: glm-4.7
WORKING_DIR: C:/Users/me/projects
METASO_API_KEY: <optional>
# Which Feishu open_ids this node serves (can overlap with other nodes)
SERVES_USERS:
- ou_abc123def456
- ou_xyz789
```
**Startup flow:**
1. Connect WebSocket to `ROUTER_URL` with `Authorization: Bearer <ROUTER_SECRET>`
2. Send `RegisterMessage` → router adds node to registry
3. Enter receive loop:
- `ForwardRequest` → run local mailboy LLM → send `ForwardResponse`
- `ping` → send `pong`
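The receive loop's dispatch (step 3) can be sketched independently of the WebSocket library; `handle_message` and `run_mailboy` are illustrative names, and the real `host_client/main.py` may structure this differently:

```python
# Dispatch one decoded message dict and return the reply dict to send back
# (or None). The mailboy LLM is injected as an async callable.
from typing import Optional

async def handle_message(msg: dict, run_mailboy) -> Optional[dict]:
    if msg.get("type") == "forward":
        try:
            reply = await run_mailboy(msg["user_id"], msg["text"])
            return {"type": "forward_response", "id": msg["id"],
                    "reply": reply, "error": ""}
        except Exception as exc:
            # Errors travel back to the router instead of killing the loop.
            return {"type": "forward_response", "id": msg["id"],
                    "reply": "", "error": str(exc)}
    if msg.get("type") == "ping":
        return {"type": "pong"}
    return None
```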
**What the host client runs:**
- Full `orchestrator/agent.py` (mailboy LLM, tool loop, per-user history, active session)
- Full `orchestrator/tools.py` (CC, shell, file ops, web, scheduler — all local)
- `agent/manager.py`, `agent/pty_process.py`, `agent/task_runner.py` — unchanged
Task completion flow:
- Background task finishes → host client pushes `TaskComplete` to router
- Router receives it → calls `send_text(chat_id, result)` via Feishu API
**New files:**
- `host_client/main.py` — entry point, WebSocket connect + receive loop, reconnect
- `host_client/config.py` — loads `host_config.yaml`
**Reused unchanged:**
- `orchestrator/` — entire mailboy stack moves here as-is
- `agent/` — entire session/execution stack moves here as-is
---
### M3.3 — Router: Node Registry + Routing LLM
The router is thin: Feishu integration, node registry, and a small LLM that decides which
node to forward each message to.
**Node registry** (`router/nodes.py`):
- `{node_id: NodeConnection}` — connected nodes
- `NodeConnection`: WebSocket, `node_id`, `serves_users[]`, `capabilities[]`,
`display_name`, `connected_at`, `last_heartbeat`
- `get_nodes_for_user(open_id) -> list[NodeConnection]` — may return multiple
- `get_active_node(user_id) -> NodeConnection | None` — per-user active node preference
- `set_active_node(user_id, node_id)` — updated by router LLM or `/node` command
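A compact sketch of these lookups — illustrative only, not the actual `router/nodes.py` (the real `NodeConnection` also carries the WebSocket and heartbeat fields):

```python
# Minimal registry: nodes keyed by node_id, plus a per-user active-node map.
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class NodeConnection:
    node_id: str
    serves_users: list[str] = field(default_factory=list)

class NodeRegistry:
    def __init__(self) -> None:
        self.nodes: dict[str, NodeConnection] = {}
        self._active: dict[str, str] = {}  # user_id -> node_id

    def get_nodes_for_user(self, open_id: str) -> list[NodeConnection]:
        # A user may be served by several connected nodes at once.
        return [n for n in self.nodes.values() if open_id in n.serves_users]

    def get_active_node(self, user_id: str) -> Optional[NodeConnection]:
        node_id = self._active.get(user_id)
        return self.nodes.get(node_id) if node_id else None

    def set_active_node(self, user_id: str, node_id: str) -> None:
        self._active[user_id] = node_id
```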
**Router LLM** (`router/routing_agent.py`):
Lightweight, one-shot routing decision. System prompt:
```
You are a routing assistant. A user has sent a message. Choose which node to forward it to.
Connected nodes for this user:
- home-pc (ACTIVE): sessions=[todo_app, blog], capabilities=[claude_code, shell, file_ops]
- work-server: sessions=[], capabilities=[claude_code, shell]
Rules:
- If the message references an active session, route to the node owning it.
- If the user names a machine explicitly ("on work-server", "@work-server"), route there.
- If only one node is connected, route there without asking.
- If ambiguous with multiple idle nodes, ask the user to clarify.
- For meta commands (/nodes, /help), handle directly without routing.
```
One tool: `route_to(node_id: str)`. No history. No multi-step loop. Single LLM call.
**WebSocket endpoint** (`router/ws.py`):
```
GET /ws/node
Authorization: Bearer <ROUTER_SECRET>
```
- Validates secret → accepts registration → adds to registry
- Forwards `ForwardRequest` → host client
- Receives `ForwardResponse` → resolves pending `asyncio.Future`
- Receives `TaskComplete` → calls `send_text(chat_id, result)` to Feishu
- Heartbeat: ping every 30s, drop if no pong in 10s
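The heartbeat policy above (ping every 30 s, drop after 10 s without a pong) could be sketched as follows; the three callables are assumptions standing in for the real send/receive/disconnect wiring:

```python
# Ping periodically; if no pong arrives within pong_timeout, drop the node.
import asyncio

async def heartbeat(send_ping, wait_pong, drop,
                    interval: float = 30.0, pong_timeout: float = 10.0) -> None:
    while True:
        await asyncio.sleep(interval)
        await send_ping()
        try:
            await asyncio.wait_for(wait_pong(), pong_timeout)
        except asyncio.TimeoutError:
            await drop()  # no pong in time: disconnect the node
            return
```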
**Request correlation** (`router/rpc.py`):
- `forward(node, user_id, chat_id, text) -> str` (reply)
- Assigns UUID `request_id`, stores `Future` in pending map
- Sends `ForwardRequest` over node's WebSocket
- Awaits `Future` with timeout (default 600s for long CC tasks)
- On `ForwardResponse`, resolves Future with `reply` or raises on `error`
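The Future-based correlation might look like this sketch; the module-level `pending` map and the injected `send` callable are assumptions, not the real `router/rpc.py`:

```python
# forward() registers a Future under a fresh request id, sends the request,
# and awaits the Future; on_forward_response() resolves it by id.
import asyncio
import uuid

pending: dict[str, asyncio.Future] = {}

async def forward(send, user_id: str, chat_id: str, text: str,
                  timeout: float = 600.0) -> str:
    request_id = uuid.uuid4().hex
    fut: asyncio.Future = asyncio.get_running_loop().create_future()
    pending[request_id] = fut
    await send({"type": "forward", "id": request_id,
                "user_id": user_id, "chat_id": chat_id, "text": text})
    try:
        return await asyncio.wait_for(fut, timeout)
    finally:
        pending.pop(request_id, None)  # always clean up the pending entry

def on_forward_response(msg: dict) -> None:
    fut = pending.get(msg["id"])
    if fut and not fut.done():
        if msg.get("error"):
            fut.set_exception(RuntimeError(msg["error"]))
        else:
            fut.set_result(msg["reply"])
```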
**Modified files:**
- `main.py` → mounts `/ws/node`, starts `NodeRegistry`
- `bot/handler.py` → after allowlist check, calls `routing_agent.route(user_id, chat_id, text)`
instead of `agent.run(user_id, text)` directly
- `config.py` → adds `ROUTER_SECRET`, `ROUTER_LLM_*` (can be same or different model)
**New files:**
- `router/nodes.py` — `NodeRegistry`, `NodeConnection`
- `router/ws.py` — WebSocket endpoint
- `router/rpc.py` — `forward()` with future correlation
- `router/routing_agent.py` — single-shot routing LLM
---
### M3.4 — Standalone Mode Script
Single-machine users run `python standalone.py` — identical UX to today's `python main.py`.
Internally uses the full M3 architecture with both components in one process.
**`standalone.py`:**
```python
"""
Run router + host client in a single process (localhost mode).
Equivalent to the pre-M3 single-machine setup.
"""
import asyncio
import secrets

import uvicorn

from router.main import create_app
from host_client.main import NodeClient

async def main():
    secret = secrets.token_hex(16)
    router_url = "ws://127.0.0.1:8000/ws/node"
    # Start FastAPI router in background
    config = uvicorn.Config(create_app(router_secret=secret), host="127.0.0.1", port=8000)
    server = uvicorn.Server(config)
    asyncio.create_task(server.serve())
    # Wait for router to be ready
    await asyncio.sleep(1.0)
    # Start host client connecting to localhost
    client = NodeClient.from_keyring(router_url=router_url, secret=secret)
    await client.run()  # reconnect loop

if __name__ == "__main__":
    asyncio.run(main())
```
**Config:** `standalone.py` reads the same `keyring.yaml` as today. The host client inherits
all LLM/CC config from it. User only maintains one config file.
---
### M3.5 — Node Health + User-Facing Status
**`/nodes` slash command** (handled at router, before forwarding):
```
Connected Nodes:
→ home-pc [ACTIVE] sessions=2 online 3h
work-server sessions=0 online 47m
Use "/node <name>" to switch active node.
```
**`/node <name>` slash command** — sets active node for user.
**Router `/health` updates:**
```json
{
"nodes": [
{"node_id": "home-pc", "status": "online", "users": 2, "sessions": 3},
{"node_id": "work-server", "status": "offline", "last_seen": "5m ago"}
]
}
```
**Feishu notifications on node events (sent to all affected users):**
```
⚠️ Node "home-pc" disconnected.
✅ Node "home-pc" reconnected.
```
---
## Final Project Structure (post-M3)
```
PhoneWork/
├── shared/
│ └── protocol.py # Wire protocol (shared by router + host client)
├── router/ # Deployable unit 1: public VPS
│ ├── main.py # FastAPI app factory, mounts /ws/node
│ ├── nodes.py # NodeRegistry, NodeConnection
│ ├── ws.py # WebSocket endpoint for host clients
│ ├── rpc.py # forward(node, user_id, chat_id, text) → reply
│ └── routing_agent.py # Single-shot routing LLM
├── bot/ # Part of router
│ ├── handler.py # Feishu event handler (now calls routing_agent)
│ ├── feishu.py # Send text/file/card to Feishu
│ └── commands.py # /nodes, /node, /help handled here; rest forwarded
├── host_client/ # Deployable unit 2: dev machine
│ ├── main.py # WS connect to router, receive loop, reconnect
│ └── config.py # host_config.yaml loader
├── orchestrator/ # Part of host client (full mailboy)
│ ├── agent.py # Mailboy LLM (unchanged)
│ └── tools.py # Tools: CC, shell, file ops, web, scheduler
├── agent/ # Part of host client (local execution)
│ ├── manager.py # Session registry
│ ├── pty_process.py # Claude Code runner
│ ├── task_runner.py # Background tasks
│ ├── scheduler.py # Reminders
│ └── audit.py # Audit log
├── standalone.py # Runs router + host client in one process
├── config.py # Router config (keyring.yaml)
└── requirements.txt
```
---
## M3 Implementation Order
1. **M3.1** — Shared protocol (foundation)
2. **M3.2** — Host client daemon (wrap existing mailboy + agent stack)
3. **M3.3** — Router (node registry, WS, routing LLM, refactor handler)
4. **M3.4** — Standalone script
5. **M3.5** — Node health, `/nodes`, `/node` commands
---
## M3 Verification Checklist
- [ ] `python standalone.py` — works identically to current `python main.py`
- [ ] Router starts, host client connects, registration logged
- [ ] Feishu message → routing LLM selects node → forwarded → reply returned
- [ ] `/nodes` shows all connected nodes with active marker
- [ ] `/node work-server` — switches active node, confirmed in next message
- [ ] Two nodes serving same user — message routed to active node
- [ ] Kill host client → router marks offline, user sees "Node home-pc is offline"
- [ ] Host client reconnects → re-registered, messages flow again
- [ ] Long CC task on node finishes → router forwards completion notification to Feishu
- [ ] Wrong `ROUTER_SECRET` → connection rejected with 401
---
## M3 Implementation Notes (from M2 code review)
Three concrete details discovered from reading the actual M2 code that must be handled
during M3 implementation:
### 1. `bot/commands.py` accesses node-local state directly
The current `commands.py` calls `agent._active_conv`, `manager.list_sessions()`,
`task_runner.list_tasks()`, `scheduler` — all of which move to the host client in M3.
**Resolution:** At the router, `bot/commands.py` is reduced to two commands:
`/nodes` and `/node <name>`. All other slash commands (`/new`, `/status`, `/close`,
`/switch`, `/direct`, `/smart`, `/shell`, `/tasks`, `/remind`) are forwarded to the
active node as-is — the node's mailboy handles them using its local `commands.py`.
The node's command handler remains unchanged from M2.
### 2. `chat_id` must be forwarded to the node
`bot/handler.py` calls `set_current_chat(chat_id)` before invoking the agent.
In M3, `handler.py` stays at the router but the agent (and `set_current_chat`) moves
to the node. The `chat_id` travels in `ForwardRequest` (already planned), and
`host_client/main.py` must call `set_current_chat(msg.chat_id)` before invoking the
local `agent.run()`. This is essential for `FileSendTool` and `SchedulerTool` to work.
### 3. `orchestrator/tools.py` imports `config.WORKING_DIR`
`_resolve_dir()` imports `WORKING_DIR` from root `config.py`. When `orchestrator/`
moves to the host client, this import must switch to `host_client/config.py`.
In standalone mode, `host_client/config.py` can re-export from root `config.py` to
keep a single `keyring.yaml`.

agent/manager.py

@@ -103,6 +103,36 @@ class SessionManager:
        session.started = True
        self._save()

        if cc_timeout > 60:
            from agent.task_runner import task_runner
            from orchestrator.tools import get_current_chat

            chat_id = get_current_chat()

            async def run_task():
                output = await run_claude(
                    message,
                    cwd=cwd,
                    cc_session_id=cc_session_id,
                    resume=not first_message,
                    timeout=cc_timeout,
                )
                log_interaction(
                    conv_id=conv_id,
                    prompt=message,
                    response=output,
                    cwd=cwd,
                    user_id=user_id,
                )
                return output

            task_id = await task_runner.submit(
                run_task,
                description=f"CC session {conv_id}: {message[:50]}",
                notify_chat_id=chat_id,
            )
            return f"⏳ Task #{task_id} started (timeout: {int(cc_timeout)}s). I'll notify you when it's done."

        output = await run_claude(
            message,
            cwd=cwd,

agent/scheduler.py (new file, 274 lines)

@@ -0,0 +1,274 @@
"""Simple scheduler for reminders and recurring tasks."""
from __future__ import annotations

import asyncio
import json
import logging
import time
import uuid
from dataclasses import dataclass, asdict
from enum import Enum
from pathlib import Path
from typing import Any, Callable, Dict, Optional

logger = logging.getLogger(__name__)

PERSISTENCE_FILE = Path(__file__).parent.parent / "scheduled_jobs.json"

class JobStatus(str, Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    CANCELLED = "cancelled"

@dataclass
class ScheduledJob:
    job_id: str
    description: str
    scheduled_at: float
    delay_seconds: float
    status: JobStatus = JobStatus.PENDING
    is_recurring: bool = False
    interval_seconds: Optional[float] = None
    max_runs: int = 1
    runs_completed: int = 0
    notify_chat_id: Optional[str] = None

    def to_dict(self) -> dict:
        data = asdict(self)
        data["status"] = self.status.value
        return data

    @classmethod
    def from_dict(cls, data: dict) -> "ScheduledJob":
        data = data.copy()
        data["status"] = JobStatus(data.get("status", "pending"))
        return cls(**data)

class Scheduler:
    """Singleton that manages scheduled jobs with Feishu notifications."""

    def __init__(self) -> None:
        self._jobs: Dict[str, ScheduledJob] = {}
        self._tasks: Dict[str, asyncio.Task] = {}
        self._lock = asyncio.Lock()
        self._started = False

    async def start(self) -> None:
        """Load persisted jobs and reschedule pending ones."""
        self._load()
        self._started = True
        now = time.time()
        for job in list(self._jobs.values()):
            if job.status == JobStatus.PENDING:
                elapsed = now - job.scheduled_at
                if job.is_recurring:
                    remaining = job.interval_seconds - (elapsed % job.interval_seconds) if job.interval_seconds else 0
                    task = asyncio.create_task(self._run_recurring(job.job_id, skip_initial=True, initial_delay=remaining))
                    self._tasks[job.job_id] = task
                else:
                    remaining = max(0, job.delay_seconds - elapsed)
                    if remaining <= 0:
                        asyncio.create_task(self._run_once(job.job_id))
                    else:
                        task = asyncio.create_task(self._run_once(job.job_id, initial_delay=remaining))
                        self._tasks[job.job_id] = task
        logger.info("Scheduler started with %d jobs", len(self._jobs))

    def _load(self) -> None:
        """Load jobs from persistence file."""
        if not PERSISTENCE_FILE.exists():
            return
        try:
            with open(PERSISTENCE_FILE, "r", encoding="utf-8") as f:
                data = json.load(f)
            for job_id, job_data in data.items():
                job = ScheduledJob.from_dict(job_data)
                if job.status not in (JobStatus.COMPLETED, JobStatus.CANCELLED):
                    self._jobs[job_id] = job
            logger.info("Loaded %d jobs from %s", len(self._jobs), PERSISTENCE_FILE)
        except Exception:
            logger.exception("Failed to load scheduled jobs")

    def _save(self) -> None:
        """Save jobs to persistence file."""
        try:
            data = {jid: job.to_dict() for jid, job in self._jobs.items()}
            with open(PERSISTENCE_FILE, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2, ensure_ascii=False)
        except Exception:
            logger.exception("Failed to save scheduled jobs")

    async def schedule_once(
        self,
        delay_seconds: float,
        message: str,
        notify_chat_id: Optional[str] = None,
    ) -> str:
        """Schedule a one-time reminder."""
        job_id = str(uuid.uuid4())[:8]
        job = ScheduledJob(
            job_id=job_id,
            description=message,
            scheduled_at=time.time(),
            delay_seconds=delay_seconds,
            notify_chat_id=notify_chat_id,
        )
        async with self._lock:
            self._jobs[job_id] = job
            self._save()
        task = asyncio.create_task(self._run_once(job_id))
        self._tasks[job_id] = task
        logger.info("Scheduled job %s: %s (in %ds)", job_id, message[:50], delay_seconds)
        return job_id

    async def schedule_recurring(
        self,
        interval_seconds: float,
        message: str,
        max_runs: int = 10,
        notify_chat_id: Optional[str] = None,
    ) -> str:
        """Schedule a recurring reminder."""
        job_id = str(uuid.uuid4())[:8]
        job = ScheduledJob(
            job_id=job_id,
            description=message,
            scheduled_at=time.time(),
            delay_seconds=interval_seconds,
            is_recurring=True,
            interval_seconds=interval_seconds,
            max_runs=max_runs,
            notify_chat_id=notify_chat_id,
        )
        async with self._lock:
            self._jobs[job_id] = job
            self._save()
        task = asyncio.create_task(self._run_recurring(job_id))
        self._tasks[job_id] = task
        logger.info("Scheduled recurring job %s: %s (every %ds, %d runs)",
                    job_id, message[:50], interval_seconds, max_runs)
        return job_id

    async def _run_once(self, job_id: str, initial_delay: Optional[float] = None) -> None:
        """Execute a one-time job after delay."""
        job = self._jobs.get(job_id)
        if not job:
            return
        delay = initial_delay if initial_delay is not None else job.delay_seconds
        await asyncio.sleep(delay)
        async with self._lock:
            job.status = JobStatus.RUNNING
            self._save()
        await self._send_notification(job, job.description)
        async with self._lock:
            job.status = JobStatus.COMPLETED
            job.runs_completed = 1
            self._save()
        logger.info("Job %s completed", job_id)

    async def _run_recurring(self, job_id: str, skip_initial: bool = False, initial_delay: Optional[float] = None) -> None:
        """Execute a recurring job."""
        job = self._jobs.get(job_id)
        if not job:
            return
        interval = job.interval_seconds or 60
        if skip_initial and initial_delay is not None:
            await asyncio.sleep(initial_delay)
        for run in range(job.max_runs):
            if not (skip_initial and run == 0):
                await asyncio.sleep(interval)
            async with self._lock:
                if job.status == JobStatus.CANCELLED:
                    break
                job.status = JobStatus.RUNNING
                self._save()
            await self._send_notification(job, f"[{job.runs_completed + 1}/{job.max_runs}] {job.description}")
            async with self._lock:
                job.runs_completed += 1
                if job.runs_completed < job.max_runs:
                    job.status = JobStatus.PENDING
                else:
                    job.status = JobStatus.COMPLETED
                self._save()
        logger.info("Recurring job %s finished (%d runs)", job_id, job.runs_completed)

    async def _send_notification(self, job: ScheduledJob, message: str) -> None:
        """Send Feishu notification."""
        if not job.notify_chat_id:
            return
        from bot.feishu import send_text
        try:
            await send_text(job.notify_chat_id, "chat_id", f"⏰ **Reminder**\n\n{message}")
        except Exception:
            logger.exception("Failed to send notification for job %s", job.job_id)

    def cancel(self, job_id: str) -> bool:
        """Cancel a scheduled job."""
        job = self._jobs.get(job_id)
        if not job:
            return False
        job.status = JobStatus.CANCELLED
        self._save()
        task = self._tasks.get(job_id)
        if task:
            task.cancel()
        logger.info("Cancelled job %s", job_id)
        return True

    async def stop(self) -> None:
        """Stop all tasks and clear state."""
        for task in self._tasks.values():
            task.cancel()
        self._tasks.clear()
        async with self._lock:
            self._jobs.clear()
        if PERSISTENCE_FILE.exists():
            PERSISTENCE_FILE.unlink()
        logger.info("Scheduler stopped")

    def list_jobs(self, limit: int = 20) -> list[dict]:
        jobs = sorted(
            [j for j in self._jobs.values() if j.status != JobStatus.COMPLETED],
            key=lambda j: j.scheduled_at,
        )[:limit]
        return [
            {
                "job_id": j.job_id,
                "description": j.description[:50],
                "status": j.status.value,
                "is_recurring": j.is_recurring,
                "runs_completed": j.runs_completed,
                "max_runs": j.max_runs,
            }
            for j in jobs
        ]

scheduler = Scheduler()

agent/task_runner.py (new file, 158 lines)

@@ -0,0 +1,158 @@
"""Background task runner for long-running operations."""
from __future__ import annotations

import asyncio
import logging
import time
import uuid
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable, Dict, Optional

logger = logging.getLogger(__name__)

class TaskStatus(str, Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"

@dataclass
class BackgroundTask:
    task_id: str
    description: str
    started_at: float
    status: TaskStatus = TaskStatus.PENDING
    completed_at: Optional[float] = None
    result: Optional[str] = None
    error: Optional[str] = None
    notify_chat_id: Optional[str] = None
    user_id: Optional[str] = None

    @property
    def elapsed(self) -> float:
        if self.completed_at:
            return self.completed_at - self.started_at
        return time.time() - self.started_at

class TaskRunner:
    """Singleton that manages background tasks with Feishu notifications."""

    def __init__(self) -> None:
        self._tasks: Dict[str, BackgroundTask] = {}
        self._lock = asyncio.Lock()
        self._notification_handler: Optional[Callable] = None

    def set_notification_handler(self, handler: Optional[Callable]) -> None:
        """Set custom notification handler for M3 mode (host client -> router)."""
        self._notification_handler = handler

    async def submit(
        self,
        coro: Callable[[], Any],
        description: str,
        notify_chat_id: Optional[str] = None,
        user_id: Optional[str] = None,
    ) -> str:
        """Submit a zero-argument coroutine function as a background task."""
        task_id = str(uuid.uuid4())[:8]
        task = BackgroundTask(
            task_id=task_id,
            description=description,
            started_at=time.time(),
            status=TaskStatus.PENDING,
            notify_chat_id=notify_chat_id,
            user_id=user_id,
        )
        async with self._lock:
            self._tasks[task_id] = task
        asyncio.create_task(self._run_task(task_id, coro))
        logger.info("Submitted background task %s: %s", task_id, description)
        return task_id

    async def _run_task(self, task_id: str, coro: Callable[[], Any]) -> None:
        """Execute a task and send notification on completion."""
        async with self._lock:
            task = self._tasks.get(task_id)
            if not task:
                return
            task.status = TaskStatus.RUNNING
        try:
            result = await coro()  # coro is a zero-arg coroutine function
            async with self._lock:
                task.status = TaskStatus.COMPLETED
                task.completed_at = time.time()
                task.result = str(result)[:2000] if result else None
            logger.info("Task %s completed in %.1fs", task_id, task.elapsed)
        except Exception as exc:
            async with self._lock:
                task.status = TaskStatus.FAILED
                task.completed_at = time.time()
                task.error = str(exc)[:500]
            logger.exception("Task %s failed: %s", task_id, exc)
        if task.notify_chat_id:
            if self._notification_handler:
                await self._notification_handler(task)
            else:
                await self._send_notification(task)

    async def _send_notification(self, task: BackgroundTask) -> None:
        """Send Feishu notification about task completion."""
        from bot.feishu import send_text

        if task.status == TaskStatus.COMPLETED:
            emoji = "✅"
            status_text = "done"
        else:
            emoji = "❌"
            status_text = "failed"
        elapsed = int(task.elapsed)
        msg = f"{emoji} Task #{task.task_id} {status_text} ({elapsed}s)\n{task.description}"
        if task.result:
            truncated = task.result[:800]
            if len(task.result) > 800:
                truncated += "..."
            msg += f"\n\n```\n{truncated}\n```"
        elif task.error:
            msg += f"\n\n**Error:** {task.error}"
        try:
            await send_text(task.notify_chat_id, "chat_id", msg)
        except Exception:
            logger.exception("Failed to send notification for task %s", task.task_id)

    def get_task(self, task_id: str) -> Optional[BackgroundTask]:
        return self._tasks.get(task_id)

    def list_tasks(self, limit: int = 20) -> list[dict]:
        tasks = sorted(
            self._tasks.values(),
            key=lambda t: t.started_at,
            reverse=True,
        )[:limit]
        return [
            {
                "task_id": t.task_id,
                "description": t.description,
                "status": t.status.value,
                "elapsed": int(t.elapsed),
                "started_at": t.started_at,
            }
            for t in tasks
        ]

task_runner = TaskRunner()

bot/commands.py

@@ -10,8 +10,10 @@ import uuid
from typing import Optional, Tuple

from agent.manager import manager
from agent.scheduler import scheduler
from agent.task_runner import task_runner
from orchestrator.agent import agent
from orchestrator.tools import set_current_user, get_current_user, get_current_chat

logger = logging.getLogger(__name__)
@ -59,6 +61,14 @@ async def handle_command(user_id: str, text: str) -> Optional[str]:
return _cmd_direct(user_id) return _cmd_direct(user_id)
elif cmd == "/smart": elif cmd == "/smart":
return _cmd_smart(user_id) return _cmd_smart(user_id)
elif cmd == "/tasks":
return _cmd_tasks()
elif cmd == "/shell":
return await _cmd_shell(args)
elif cmd == "/remind":
return await _cmd_remind(args)
elif cmd in ("/nodes", "/node"):
return await _cmd_nodes(user_id, args)
else:
return None
@ -203,6 +213,106 @@ def _cmd_smart(user_id: str) -> str:
return "✓ Smart mode ON. Messages go through LLM for intelligent routing."
def _cmd_tasks() -> str:
"""List background tasks."""
tasks = task_runner.list_tasks()
if not tasks:
return "No background tasks."
lines = ["**Background Tasks:**\n"]
for t in tasks:
status_emoji = {"completed": "✅", "failed": "❌", "running": "🔄", "pending": "⏸️"}.get(
t["status"], ""
)
lines.append(f"{status_emoji} #{t['task_id']} - {t['description'][:50]} ({t['elapsed']}s)")
return "\n".join(lines)
async def _cmd_shell(args: str) -> str:
"""Execute a shell command directly."""
if not args:
return "Usage: /shell <command>\nExample: /shell git status"
from orchestrator.tools import ShellTool, get_current_chat
tool = ShellTool()
result = await tool._arun(command=args)
try:
data = json.loads(result)
if "error" in data:
return f"❌ {data['error']}"
output = []
if data.get("stdout"):
output.append(data["stdout"])
if data.get("stderr"):
output.append(f"[stderr] {data['stderr']}")
output.append(f"[exit code: {data.get('exit_code', '?')}]")
return "\n".join(output) if output else "(no output)"
except json.JSONDecodeError:
return result
async def _cmd_remind(args: str) -> str:
"""Set a reminder."""
if not args:
return "Usage: /remind <time> <message>\nExample: /remind 10m check the build\nTime format: 30s, 10m, 1h"
parts = args.split(None, 1)
if len(parts) < 2:
return "Usage: /remind <time> <message>\nExample: /remind 10m check the build"
time_str, message = parts
match = re.match(r'^(\d+)(s|m|h)$', time_str.lower())
if not match:
return "Invalid time format. Use: 30s, 10m, 1h"
value = int(match.group(1))
unit = match.group(2)
seconds = value * {'s': 1, 'm': 60, 'h': 3600}[unit]
chat_id = get_current_chat()
job_id = await scheduler.schedule_once(
delay_seconds=seconds,
message=message,
notify_chat_id=chat_id,
)
return f"⏰ Reminder #{job_id} set for {value}{unit} from now"
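The `<time>` argument accepted by `/remind` maps a value/unit pair onto seconds via a regex and a unit multiplier. A standalone sketch of that parsing step (same regex and multipliers as above; `parse_delay` is a name introduced here for illustration):

```python
import re
from typing import Optional

_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600}

def parse_delay(time_str: str) -> Optional[int]:
    """Parse '30s' / '10m' / '1h' into seconds; return None on bad input."""
    match = re.match(r"^(\d+)(s|m|h)$", time_str.lower())
    if not match:
        return None
    return int(match.group(1)) * _UNIT_SECONDS[match.group(2)]

print(parse_delay("10m"))  # 600
print(parse_delay("90x"))  # None
```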
async def _cmd_nodes(user_id: str, args: str) -> str:
"""List nodes or switch active node."""
from config import ROUTER_MODE
if not ROUTER_MODE:
return "Not in router mode. Run standalone.py for multi-host support."
from router.nodes import get_node_registry
registry = get_node_registry()
if args:
args = args.strip()
if registry.set_active_node(user_id, args):
return f"✓ Active node set to: {args}"
return f"Error: Node '{args}' not found"
nodes = registry.list_nodes()
if not nodes:
return "No nodes connected."
active_node_id = None
active_node = registry.get_active_node(user_id)
if active_node:
active_node_id = active_node.node_id
lines = ["**Connected Nodes:**\n"]
for n in nodes:
marker = "▶" if n["node_id"] == active_node_id else " "
status = "🟢" if n["status"] == "online" else "🔴"
lines.append(f"{marker}{n['display_name']} {status} sessions={n['sessions']}")
lines.append("\nUse `/node <name>` to switch active node.")
return "\n".join(lines)
def _cmd_help() -> str:
"""Show help."""
return """**Commands:**
@ -212,5 +322,10 @@ def _cmd_help() -> str:
/switch <n> - Switch to session by number
/direct - Direct mode: messages → Claude Code (no LLM overhead)
/smart - Smart mode: messages → LLM routing (default)
/shell <cmd> - Run shell command (bypasses LLM)
/remind <time> <msg> - Set reminder (e.g. /remind 10m check build)
/tasks - List background tasks
/nodes - List connected host nodes
/node <name> - Switch active node
/retry - Retry last message
/help - Show this help"""

View File

@ -15,6 +15,7 @@ from bot.commands import handle_command
from bot.feishu import send_text
from config import FEISHU_APP_ID, FEISHU_APP_SECRET, is_user_allowed
from orchestrator.agent import agent
from orchestrator.tools import set_current_chat
logger = logging.getLogger(__name__)
@ -85,15 +86,55 @@ def _handle_message(data: P2ImMessageReceiveV1) -> None:
async def _process_message(user_id: str, chat_id: str, text: str) -> None:
"""Process message: check allowlist, then commands, then route to node or local agent."""
try:
set_current_chat(chat_id)
if not is_user_allowed(user_id):
logger.warning("Rejected message from unauthorized user: ...%s", user_id[-8:])
await send_text(chat_id, "chat_id", "Sorry, you are not authorized to use this bot.")
return
reply = await handle_command(user_id, text)
if reply is not None:
if reply:
await send_text(chat_id, "chat_id", reply)
return
from config import ROUTER_MODE
if ROUTER_MODE:
from router.routing_agent import route
from router.rpc import forward
from router.nodes import get_node_registry
node_id, reason = await route(user_id, chat_id, text)
if node_id is None:
await send_text(chat_id, "chat_id", f"No host available: {reason}")
return
if node_id == "meta":
registry = get_node_registry()
nodes = registry.list_nodes()
if nodes:
lines = ["Connected Nodes:"]
for n in nodes:
active = registry.get_active_node(user_id)
marker = "▶" if active and n.get("node_id") == active.node_id else " "
lines.append(f"{marker}{n['display_name']} sessions={n['sessions']} {n['status']}")
lines.append("\nUse \"/node <name>\" to switch active node.")
await send_text(chat_id, "chat_id", "\n".join(lines))
else:
await send_text(chat_id, "chat_id", "No nodes connected.")
return
try:
reply = await forward(node_id, user_id, chat_id, text)
if reply:
await send_text(chat_id, "chat_id", reply)
except Exception as e:
logger.exception("Failed to forward to node %s", node_id)
await send_text(chat_id, "chat_id", f"Error communicating with node: {e}")
else:
reply = await agent.run(user_id, text)
if reply:
await send_text(chat_id, "chat_id", reply)

View File

@ -1,6 +1,6 @@
import yaml
from pathlib import Path
from typing import List, Optional
_CONFIG_PATH = Path(__file__).parent / "keyring.yaml"
@ -18,6 +18,10 @@ OPENAI_BASE_URL: str = _cfg["OPENAI_BASE_URL"]
OPENAI_API_KEY: str = _cfg["OPENAI_API_KEY"]
OPENAI_MODEL: str = _cfg.get("OPENAI_MODEL", "glm-4.7")
WORKING_DIR: Path = Path(_cfg.get("WORKING_DIR", Path.home())).expanduser().resolve()
METASO_API_KEY: str = _cfg.get("METASO_API_KEY", "")
ROUTER_MODE: bool = _cfg.get("ROUTER_MODE", False)
ROUTER_SECRET: str = _cfg.get("ROUTER_SECRET", "")
ALLOWED_OPEN_IDS: List[str] = _cfg.get("ALLOWED_OPEN_IDS", [])
if ALLOWED_OPEN_IDS and not isinstance(ALLOWED_OPEN_IDS, list):

host_client/__init__.py Normal file
View File

@ -0,0 +1,6 @@
"""Host client module - connects to router and runs local mailboy."""
from host_client.config import HostConfig, get_host_config
from host_client.main import NodeClient
__all__ = ["HostConfig", "get_host_config", "NodeClient"]

host_client/config.py Normal file
View File

@ -0,0 +1,97 @@
"""Host client configuration loader.
Loads host_config.yaml which contains:
- NODE_ID, DISPLAY_NAME
- ROUTER_URL, ROUTER_SECRET
- LLM config (OPENAI_*)
- WORKING_DIR, METASO_API_KEY
- SERVES_USERS list
"""
from __future__ import annotations
import os
from pathlib import Path
from typing import List, Optional
import yaml
class HostConfig:
"""Configuration for a host client node."""
def __init__(self, config_path: Optional[Path] = None):
config_path = config_path or Path(__file__).parent.parent / "host_config.yaml"
self._load(config_path)
def _load(self, config_path: Path) -> None:
if not config_path.exists():
raise FileNotFoundError(f"Config file not found: {config_path}")
with open(config_path, "r", encoding="utf-8") as f:
data = yaml.safe_load(f) or {}
self.node_id: str = data.get("NODE_ID", "unknown-node")
self.display_name: str = data.get("DISPLAY_NAME", self.node_id)
self.router_url: str = data.get("ROUTER_URL", "ws://127.0.0.1:8000/ws/node")
self.router_secret: str = data.get("ROUTER_SECRET", "")
self.openai_base_url: str = data.get(
"OPENAI_BASE_URL", "https://open.bigmodel.cn/api/paas/v4/"
)
self.openai_api_key: str = data.get("OPENAI_API_KEY", "")
self.openai_model: str = data.get("OPENAI_MODEL", "glm-4.7")
self.working_dir: str = data.get("WORKING_DIR", str(Path.home() / "projects"))
self.metaso_api_key: Optional[str] = data.get("METASO_API_KEY")
serves_users = data.get("SERVES_USERS", [])
self.serves_users: List[str] = serves_users if isinstance(serves_users, list) else []
self.capabilities: List[str] = data.get(
"CAPABILITIES",
["claude_code", "shell", "file_ops", "web", "scheduler"],
)
@classmethod
def from_keyring(cls, keyring_path: Optional[Path] = None) -> "HostConfig":
"""Create config from keyring.yaml (for standalone mode)."""
keyring_path = keyring_path or Path(__file__).parent.parent / "keyring.yaml"
if not keyring_path.exists():
raise FileNotFoundError(f"keyring.yaml not found: {keyring_path}")
with open(keyring_path, "r", encoding="utf-8") as f:
data = yaml.safe_load(f) or {}
config = cls.__new__(cls)
config.node_id = data.get("NODE_ID", "local-node")
config.display_name = data.get("DISPLAY_NAME", "Local Machine")
config.router_url = data.get("ROUTER_URL", "ws://127.0.0.1:8000/ws/node")
config.router_secret = data.get("ROUTER_SECRET", "")
config.openai_base_url = data.get(
"OPENAI_BASE_URL", "https://open.bigmodel.cn/api/paas/v4/"
)
config.openai_api_key = data.get("OPENAI_API_KEY", "")
config.openai_model = data.get("OPENAI_MODEL", "glm-4.7")
config.working_dir = data.get("WORKING_DIR", str(Path.home() / "projects"))
config.metaso_api_key = data.get("METASO_API_KEY")
serves_users = data.get("ALLOWED_OPEN_IDS", [])
config.serves_users = serves_users if isinstance(serves_users, list) else []
config.capabilities = ["claude_code", "shell", "file_ops", "web", "scheduler"]
return config
host_config: Optional[HostConfig] = None
def get_host_config() -> HostConfig:
"""Get the global host config instance."""
global host_config
if host_config is None:
host_config = HostConfig()
return host_config
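Given the keys this loader reads, a `host_config.yaml` for a worker node might look like the following (all values illustrative):

```yaml
NODE_ID: workstation-1
DISPLAY_NAME: "Office Workstation"
ROUTER_URL: ws://192.168.1.10:8000/ws/node
ROUTER_SECRET: change-me
OPENAI_BASE_URL: https://open.bigmodel.cn/api/paas/v4/
OPENAI_API_KEY: your_api_key
OPENAI_MODEL: glm-4.7
WORKING_DIR: /home/sam/projects
METASO_API_KEY: your_metaso_key
SERVES_USERS:
  - ou_xxxxxxxx
CAPABILITIES: [claude_code, shell, file_ops, web, scheduler]
```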

host_client/main.py Normal file
View File

@ -0,0 +1,280 @@
"""Host client main module.
Connects to the router via WebSocket, receives forwarded messages,
runs the local mailboy LLM, and sends responses back.
"""
from __future__ import annotations
import asyncio
import logging
import secrets
import time
from typing import Optional
import websockets
from websockets.client import WebSocketClientProtocol
from agent.manager import manager
from agent.scheduler import scheduler
from agent.task_runner import task_runner
from host_client.config import HostConfig, get_host_config
from orchestrator.agent import run as run_mailboy
from orchestrator.tools import set_current_user, set_current_chat
from shared import (
RegisterMessage,
ForwardRequest,
ForwardResponse,
TaskComplete,
Heartbeat,
NodeStatus,
encode,
decode,
)
logger = logging.getLogger(__name__)
class NodeClient:
"""WebSocket client that connects to the router and handles messages."""
def __init__(self, config: HostConfig):
self.config = config
self.ws: Optional[WebSocketClientProtocol] = None
self._running = False
self._last_heartbeat = time.time()
self._reconnect_delay = 1.0
async def connect(self) -> bool:
"""Connect to the router WebSocket."""
headers = {}
if self.config.router_secret:
headers["Authorization"] = f"Bearer {self.config.router_secret}"
try:
self.ws = await websockets.connect(
self.config.router_url,
extra_headers=headers,
ping_interval=30,
ping_timeout=10,
)
logger.info("Connected to router: %s", self.config.router_url)
self._reconnect_delay = 1.0
return True
except Exception as e:
logger.error("Failed to connect to router: %s", e)
return False
async def register(self) -> bool:
"""Send registration message to the router."""
if not self.ws:
return False
msg = RegisterMessage(
node_id=self.config.node_id,
display_name=self.config.display_name,
serves_users=self.config.serves_users,
working_dir=self.config.working_dir,
capabilities=self.config.capabilities,
)
try:
await self.ws.send(encode(msg))
logger.info("Sent registration for node: %s", self.config.node_id)
return True
except Exception as e:
logger.error("Failed to send registration: %s", e)
return False
async def handle_forward(self, request: ForwardRequest) -> None:
"""Handle a forwarded message from the router."""
logger.info("Received forward request %s from user %s", request.id, request.user_id)
set_current_user(request.user_id)
set_current_chat(request.chat_id)
try:
reply = await run_mailboy(request.user_id, request.text)
response = ForwardResponse(
id=request.id,
reply=reply,
error="",
)
except Exception as e:
logger.exception("Error processing forward request %s", request.id)
response = ForwardResponse(
id=request.id,
reply="",
error=str(e),
)
if self.ws:
try:
await self.ws.send(encode(response))
except Exception as e:
logger.error("Failed to send response: %s", e)
async def send_heartbeat(self) -> None:
"""Send a ping heartbeat to the router."""
if self.ws:
try:
await self.ws.send(encode(Heartbeat(type="ping")))
self._last_heartbeat = time.time()
except Exception as e:
logger.error("Failed to send heartbeat: %s", e)
async def send_status(self) -> None:
"""Send node status update to the router."""
if not self.ws:
return
sessions = manager.list_sessions()
active_sessions = [
{"conv_id": s["conv_id"], "working_dir": s["working_dir"]}
for s in sessions
]
status = NodeStatus(
node_id=self.config.node_id,
sessions=len(sessions),
active_sessions=active_sessions,
)
try:
await self.ws.send(encode(status))
except Exception as e:
logger.error("Failed to send status: %s", e)
async def handle_message(self, data: str) -> None:
"""Handle an incoming message from the router."""
try:
msg = decode(data)
except Exception as e:
logger.error("Failed to decode message: %s", e)
return
if isinstance(msg, ForwardRequest):
await self.handle_forward(msg)
elif isinstance(msg, Heartbeat):
if msg.type == "ping":
if self.ws:
try:
await self.ws.send(encode(Heartbeat(type="pong")))
except Exception as e:
logger.error("Failed to send pong: %s", e)
elif msg.type == "pong":
self._last_heartbeat = time.time()
else:
logger.debug("Received message type: %s", msg.type)
async def receive_loop(self) -> None:
"""Main receive loop for incoming messages."""
if not self.ws:
return
try:
async for data in self.ws:
await self.handle_message(data)
except websockets.ConnectionClosed as e:
logger.warning("Connection closed: %s", e)
except Exception as e:
logger.exception("Error in receive loop: %s", e)
async def heartbeat_loop(self) -> None:
"""Periodic heartbeat loop."""
while self._running:
await asyncio.sleep(30)
if self.ws and self.ws.open:
await self.send_heartbeat()
async def status_loop(self) -> None:
"""Periodic status update loop."""
while self._running:
await asyncio.sleep(60)
if self.ws and self.ws.open:
await self.send_status()
async def run(self) -> None:
"""Main run loop with reconnection."""
self._running = True
await manager.start()
await scheduler.start()
task_runner.set_notification_handler(self._send_task_complete)
while self._running:
if await self.connect():
if await self.register():
try:
await asyncio.gather(
self.receive_loop(),
self.heartbeat_loop(),
self.status_loop(),
)
except Exception:
pass
if self._running:
logger.info("Reconnecting in %.1f seconds...", self._reconnect_delay)
await asyncio.sleep(self._reconnect_delay)
self._reconnect_delay = min(self._reconnect_delay * 2, 60)
async def _send_task_complete(self, task) -> None:
"""Send TaskComplete notification to router."""
if not self.ws:
return
from shared import TaskComplete, encode
msg = TaskComplete(
task_id=task.task_id,
user_id=task.user_id or "",
chat_id=task.notify_chat_id or "",
result=task.result or task.error or "",
)
try:
await self.ws.send(encode(msg))
logger.info("Sent TaskComplete for task %s", task.task_id)
except Exception as e:
logger.error("Failed to send TaskComplete: %s", e)
async def stop(self) -> None:
"""Stop the client."""
self._running = False
if self.ws:
await self.ws.close()
await manager.stop()
await scheduler.stop()
logger.info("Node client stopped")
@classmethod
def from_keyring(cls, router_url: Optional[str] = None, secret: Optional[str] = None) -> "NodeClient":
"""Create a client from keyring.yaml (for standalone mode)."""
config = HostConfig.from_keyring()
if router_url:
config.router_url = router_url
if secret:
config.router_secret = secret
return cls(config)
async def main() -> None:
"""Entry point for standalone host client."""
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
client = NodeClient(get_host_config())
try:
await client.run()
except KeyboardInterrupt:
await client.stop()
if __name__ == "__main__":
asyncio.run(main())
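The run loop above reconnects with exponential backoff: the delay starts at 1s, doubles after each failed attempt, caps at 60s, and resets to 1s on a successful connect. The resulting delay schedule can be sketched standalone (`backoff_schedule` is a name introduced here for illustration):

```python
RECONNECT_CAP = 60.0  # mirrors min(self._reconnect_delay * 2, 60)

def backoff_schedule(failures: int, initial: float = 1.0) -> list[float]:
    """Delays slept before each successive failed reconnect attempt."""
    delays, delay = [], initial
    for _ in range(failures):
        delays.append(delay)
        delay = min(delay * 2, RECONNECT_CAP)
    return delays

print(backoff_schedule(8))
# [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```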

View File

@ -1,6 +1,7 @@
WORKING_DIR: "/path/to/working/directory"
FEISHU_APP_ID: your_feishu_app_id
FEISHU_APP_SECRET: your_feishu_app_secret
OPENAI_BASE_URL: https://api.openai.com/v1/
OPENAI_API_KEY: your_openai_api_key
OPENAI_MODEL: gpt-4
METASO_API_KEY: your_metaso_api_key

View File

@ -89,6 +89,8 @@ async def list_sessions() -> list:
@app.on_event("startup")
async def startup_event() -> None:
await manager.start()
from agent.scheduler import scheduler
await scheduler.start()
loop = asyncio.get_event_loop()
start_websocket_client(loop)
logger.info("PhoneWork started")
@ -97,6 +99,8 @@ async def startup_event() -> None:
@app.on_event("shutdown")
async def shutdown_event() -> None:
await manager.stop()
from agent.scheduler import scheduler
await scheduler.stop()
logger.info("PhoneWork shut down")

View File

@ -44,10 +44,14 @@ Your responsibilities:
2. Follow-up to ACTIVE session: call `send_to_conversation` with the active conv_id shown above.
3. List sessions: call `list_conversations`.
4. Close session: call `close_conversation`.
5. GENERAL QUESTIONS: If the user asks a general question (not about a specific project or file), \
answer directly using your own knowledge. Do NOT create a session for simple Q&A.
Guidelines:
- Relay Claude Code's output verbatim.
- If no active session and the user sends a task without naming a directory, ask them which project.
- For general knowledge questions (e.g., "what is a Python generator?", "explain async/await"), \
answer directly without creating a session.
- Keep your own words brief; let Claude Code's output speak.
- Reply in the same language the user uses (Chinese or English).
"""
@ -55,6 +59,35 @@ Guidelines:
MAX_ITERATIONS = 10
_TOOL_MAP = {t.name: t for t in TOOLS}
QUESTION_PATTERNS = [
r'\?$', # ends with ?
r'？$', # ends with fullwidth (Chinese) question mark
r'\b(what|how|why|when|where|who|which|explain|describe|tell me|can you|could you|is there|are there|do you know)\b',
r'(什么|怎么|为什么|何时|哪里|谁|哪个|解释|描述|告诉我|能否|可以|有没有|是不是)',
]
def _is_general_question(text: str) -> bool:
"""Check if text looks like a general knowledge question (not a project task)."""
text_lower = text.lower().strip()
project_indicators = [
'create', 'make', 'build', 'fix', 'update', 'delete', 'remove', 'add',
'implement', 'refactor', 'test', 'run', 'execute', 'start', 'stop',
'project', 'folder', 'directory', 'file', 'code', 'session',
'创建', '制作', '构建', '修复', '更新', '删除', '添加', '实现', '重构', '测试', '运行', '项目', '文件夹', '文件', '代码',
]
for indicator in project_indicators:
if indicator in text_lower:
return False
for pattern in QUESTION_PATTERNS:
if re.search(pattern, text_lower, re.IGNORECASE):
return True
return False
class OrchestrationAgent:
"""Per-user agent with conversation history and active session tracking."""
@ -123,6 +156,23 @@ class OrchestrationAgent:
logger.exception("Passthrough error for user=%s", user_id)
return f"[Error] {exc}"
# Direct Q&A: if no active session and message looks like a general question, answer directly
if not active_conv and _is_general_question(text):
logger.debug(" → direct Q&A (no tools)")
llm_no_tools = ChatOpenAI(
base_url=OPENAI_BASE_URL,
api_key=OPENAI_API_KEY,
model=OPENAI_MODEL,
temperature=0.7,
)
qa_prompt = (
"You are a helpful assistant. Answer the user's question concisely and accurately. "
"Reply in the same language the user uses.\n\n"
f"Question: {text}"
)
response = await llm_no_tools.ainvoke([HumanMessage(content=qa_prompt)])
return response.content or ""
messages: List[BaseMessage] = (
[SystemMessage(content=self._build_system_prompt(user_id))]
+ self._history[user_id]
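The direct Q&A gate above works in two passes: any project-task keyword vetoes direct answering; otherwise the question patterns decide. A trimmed-down re-implementation of that precedence (shortened word lists, same logic shape; note the substring check means e.g. "profile" matches "file", the same quirk as the original):

```python
import re

# Abbreviated versions of the lists in agent.py, for illustration only.
PROJECT_INDICATORS = ["create", "fix", "run", "implement", "project", "file", "code"]
QUESTION_PATTERNS = [r"\?$", r"\b(what|how|why|explain|tell me)\b"]

def is_general_question(text: str) -> bool:
    t = text.lower().strip()
    # Pass 1: anything that smells like a project task is NOT direct Q&A.
    if any(word in t for word in PROJECT_INDICATORS):
        return False
    # Pass 2: question-shaped text gets answered directly.
    return any(re.search(p, t) for p in QUESTION_PATTERNS)

print(is_general_question("what is a Python generator?"))  # True
print(is_general_question("fix the login bug"))            # False
```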

View File

@ -15,6 +15,7 @@ from agent.manager import manager
from config import WORKING_DIR
_current_user_id: ContextVar[Optional[str]] = ContextVar("current_user_id", default=None)
_current_chat_id: ContextVar[Optional[str]] = ContextVar("current_chat_id", default=None)
def set_current_user(user_id: Optional[str]) -> None:
@ -25,6 +26,14 @@ def get_current_user() -> Optional[str]:
return _current_user_id.get()
def set_current_chat(chat_id: Optional[str]) -> None:
_current_chat_id.set(chat_id)
def get_current_chat() -> Optional[str]:
return _current_chat_id.get()
def _resolve_dir(working_dir: str) -> Path:
"""
Resolve working_dir to an absolute path under WORKING_DIR.
@ -188,10 +197,516 @@ class CloseConversationTool(BaseTool):
return str(e)
BLOCKED_PATTERNS = [
r'\brm\s+-rf\s+/',
r'\brm\s+-rf\s+~',
r'\bformat\s+',
r'\bmkfs\b',
r'\bshutdown\b',
r'\breboot\b',
r'\bdd\s+if=',
r':\(\)\s*\{\s*:\|:\s*&\s*\}\s*;\s*:',  # fork bomb, with or without spaces
r'\bchmod\s+777\s+/',
r'\bchown\s+.*\s+/',
r'\b>\s*/dev/sd',
r'\bkill\s+-9\s+1\b',
r'\bsudo\s+rm\b',
r'\bsu\s+-c\b',
r'\bsudo\s+chmod\b',
r'\bsudo\s+chown\b',
r'\bsudo\s+dd\b',
r'\b>\s*/dev/null\s+2>&1\s*&\s*;', # fork via backgrounding
]
def _is_command_safe(command: str) -> tuple[bool, str]:
"""Check if command is safe to execute."""
import re
for pattern in BLOCKED_PATTERNS:
if re.search(pattern, command, re.IGNORECASE):
return False, f"Blocked: command matches dangerous pattern: {pattern}"
return True, ""
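The guard above can be exercised on its own; here with three of the patterns copied verbatim (note the `rm -rf /` pattern also catches absolute paths like `rm -rf /tmp/x`, not just the bare root):

```python
import re

BLOCKED_PATTERNS = [r"\brm\s+-rf\s+/", r"\bshutdown\b", r"\bsudo\s+rm\b"]

def is_command_safe(command: str) -> bool:
    return not any(re.search(p, command, re.IGNORECASE) for p in BLOCKED_PATTERNS)

print(is_command_safe("git status"))      # True
print(is_command_safe("rm -rf /tmp/x"))   # False: matches the absolute-path pattern
print(is_command_safe("rm -rf ./build"))  # True: relative path, no leading /
```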
class ShellInput(BaseModel):
command: str = Field(..., description="Shell command to execute")
cwd: Optional[str] = Field(None, description="Working directory (default: WORKING_DIR)")
timeout: Optional[int] = Field(30, description="Timeout in seconds (max 120)")
class ShellTool(BaseTool):
name: str = "run_shell"
description: str = (
"Execute a shell command on the host machine and return stdout/stderr. "
"Use for: git status, ls, cat, grep, pip list, etc. "
"Destructive commands (rm -rf /, format, shutdown) are blocked."
)
args_schema: Type[BaseModel] = ShellInput
def _run(self, command: str, cwd: Optional[str] = None, timeout: Optional[int] = 30) -> str:
raise NotImplementedError("Use async version")
async def _arun(self, command: str, cwd: Optional[str] = None, timeout: Optional[int] = 30) -> str:
import asyncio
import shutil
is_safe, reason = _is_command_safe(command)
if not is_safe:
return json.dumps({"error": reason}, ensure_ascii=False)
timeout = min(timeout or 30, 120)
work_dir = WORKING_DIR
if cwd:
try:
work_dir = _resolve_dir(cwd)
except ValueError as e:
return json.dumps({"error": str(e)}, ensure_ascii=False)
try:
proc = await asyncio.create_subprocess_shell(
command,
cwd=str(work_dir),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await asyncio.wait_for(
proc.communicate(),
timeout=timeout,
)
return json.dumps({
"stdout": stdout.decode("utf-8", errors="replace")[:4000],
"stderr": stderr.decode("utf-8", errors="replace")[:1000],
"exit_code": proc.returncode,
"cwd": str(work_dir),
}, ensure_ascii=False)
except asyncio.TimeoutError:
return json.dumps({"error": f"Command timed out after {timeout}s"}, ensure_ascii=False)
except Exception as e:
return json.dumps({"error": str(e)}, ensure_ascii=False)
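ShellTool's execution path (shell spawn, bounded wait, truncated and replacement-decoded capture) reduces to a few lines of asyncio. This sketch follows the same shape but adds a `proc.kill()` on timeout, which the tool above does not do:

```python
import asyncio

async def run_shell(command: str, timeout: int = 30) -> dict:
    # Same shape as ShellTool._arun: spawn via the shell, capture both streams.
    proc = await asyncio.create_subprocess_shell(
        command,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=min(timeout, 120))
    except asyncio.TimeoutError:
        proc.kill()  # reap the runaway process (the original returns without killing it)
        return {"error": f"Command timed out after {timeout}s"}
    return {
        "stdout": stdout.decode("utf-8", errors="replace")[:4000],
        "stderr": stderr.decode("utf-8", errors="replace")[:1000],
        "exit_code": proc.returncode,
    }

result = asyncio.run(run_shell("echo hello"))
print(result["stdout"].strip())  # hello
```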
class FileReadInput(BaseModel):
path: str = Field(..., description="File path relative to working directory")
start_line: Optional[int] = Field(None, description="Start line (1-indexed)")
end_line: Optional[int] = Field(None, description="End line (inclusive)")
class FileReadTool(BaseTool):
name: str = "read_file"
description: str = "Read a file from the working directory. Returns file content with line numbers."
args_schema: Type[BaseModel] = FileReadInput
def _run(self, path: str, start_line: Optional[int] = None, end_line: Optional[int] = None) -> str:
raise NotImplementedError("Use async version")
async def _arun(self, path: str, start_line: Optional[int] = None, end_line: Optional[int] = None) -> str:
try:
file_path = _resolve_dir(path)
if not file_path.is_file():
return json.dumps({"error": f"Not a file: {path}"}, ensure_ascii=False)
with open(file_path, "r", encoding="utf-8", errors="replace") as f:
lines = f.readlines()
total_lines = len(lines)
start = max(1, start_line or 1) - 1
end = min(total_lines, end_line or total_lines)
result_lines = []
for i in range(start, end):
result_lines.append(f"{i+1:4d} | {lines[i].rstrip()}")
return json.dumps({
"path": str(file_path),
"lines": f"{start+1}-{end}",
"total_lines": total_lines,
"content": "\n".join(result_lines[-500:]),
}, ensure_ascii=False)
except Exception as e:
return json.dumps({"error": str(e)}, ensure_ascii=False)
class FileWriteInput(BaseModel):
path: str = Field(..., description="File path relative to working directory")
content: str = Field(..., description="Content to write")
mode: Optional[str] = Field("overwrite", description="Write mode: 'overwrite' or 'append'")
class FileWriteTool(BaseTool):
name: str = "write_file"
description: str = "Write content to a file in the working directory. Use mode='append' to add to existing file."
args_schema: Type[BaseModel] = FileWriteInput
def _run(self, path: str, content: str, mode: Optional[str] = "overwrite") -> str:
raise NotImplementedError("Use async version")
async def _arun(self, path: str, content: str, mode: Optional[str] = "overwrite") -> str:
try:
file_path = _resolve_dir(path)
file_path.parent.mkdir(parents=True, exist_ok=True)
write_mode = "a" if mode == "append" else "w"
with open(file_path, write_mode, encoding="utf-8") as f:
f.write(content)
return json.dumps({
"success": True,
"path": str(file_path),
"bytes_written": len(content.encode("utf-8")),
}, ensure_ascii=False)
except Exception as e:
return json.dumps({"error": str(e)}, ensure_ascii=False)
class FileListInput(BaseModel):
path: Optional[str] = Field(None, description="Directory path (default: working directory)")
pattern: Optional[str] = Field(None, description="Glob pattern (e.g. '*.py')")
class FileListTool(BaseTool):
name: str = "list_files"
description: str = "List files in a directory. Use pattern to filter (e.g. '*.py')."
args_schema: Type[BaseModel] = FileListInput
def _run(self, path: Optional[str] = None, pattern: Optional[str] = None) -> str:
raise NotImplementedError("Use async version")
async def _arun(self, path: Optional[str] = None, pattern: Optional[str] = None) -> str:
try:
dir_path = _resolve_dir(path or ".")
if not dir_path.is_dir():
return json.dumps({"error": f"Not a directory: {path}"}, ensure_ascii=False)
if pattern:
files = list(dir_path.glob(pattern))[:100]
else:
files = list(dir_path.iterdir())[:100]
result = []
for f in sorted(files):
result.append({
"name": f.name,
"type": "dir" if f.is_dir() else "file",
"size": f.stat().st_size if f.is_file() else None,
})
return json.dumps({
"path": str(dir_path),
"files": result,
}, ensure_ascii=False)
except Exception as e:
return json.dumps({"error": str(e)}, ensure_ascii=False)
class FileSearchInput(BaseModel):
path: str = Field(..., description="Directory path to search in")
pattern: str = Field(..., description="Search pattern (regex supported)")
max_results: Optional[int] = Field(50, description="Max number of results")
class FileSearchTool(BaseTool):
name: str = "search_files"
description: str = (
"Search for text pattern in files under a directory (grep-like). "
"Returns matching lines with file paths and line numbers."
)
args_schema: Type[BaseModel] = FileSearchInput
def _run(self, path: str, pattern: str, max_results: Optional[int] = 50) -> str:
raise NotImplementedError("Use async version")
async def _arun(self, path: str, pattern: str, max_results: Optional[int] = 50) -> str:
import re
try:
dir_path = _resolve_dir(path)
if not dir_path.is_dir():
return json.dumps({"error": f"Not a directory: {path}"}, ensure_ascii=False)
try:
regex = re.compile(pattern, re.IGNORECASE)
except re.error as e:
return json.dumps({"error": f"Invalid regex pattern: {e}"}, ensure_ascii=False)
results = []
text_extensions = {'.py', '.js', '.ts', '.tsx', '.jsx', '.java', '.c', '.cpp', '.h',
'.go', '.rs', '.rb', '.php', '.cs', '.swift', '.kt', '.scala',
'.txt', '.md', '.json', '.yaml', '.yml', '.toml', '.ini', '.cfg',
'.html', '.css', '.scss', '.sass', '.less', '.xml', '.sql',
'.sh', '.bash', '.zsh', '.ps1', '.bat', '.cmd'}
for file_path in dir_path.rglob("*"):
if not file_path.is_file():
continue
if file_path.suffix.lower() not in text_extensions:
continue
if any(part.startswith('.') for part in file_path.parts):
continue
try:
with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
for line_num, line in enumerate(f, 1):
if regex.search(line):
rel_path = file_path.relative_to(dir_path)
results.append({
"file": str(rel_path),
"line": line_num,
"content": line.rstrip()[:200],
})
if len(results) >= max_results:
break
if len(results) >= max_results:
break
except Exception:
continue
return json.dumps({
"path": str(dir_path),
"pattern": pattern,
"total_matches": len(results),
"results": results,
}, ensure_ascii=False)
except Exception as e:
return json.dumps({"error": str(e)}, ensure_ascii=False)
class FileSendInput(BaseModel):
path: str = Field(..., description="File path to send")
class FileSendTool(BaseTool):
name: str = "send_file"
description: str = "Send a file to the user via Feishu. Returns confirmation message."
args_schema: Type[BaseModel] = FileSendInput
def _run(self, path: str) -> str:
raise NotImplementedError("Use async version")
async def _arun(self, path: str) -> str:
try:
file_path = _resolve_dir(path)
if not file_path.is_file():
return json.dumps({"error": f"Not a file: {path}"}, ensure_ascii=False)
chat_id = get_current_chat()
if not chat_id:
return json.dumps({"error": "No chat context available"}, ensure_ascii=False)
from bot.feishu import send_file
await send_file(chat_id, "chat_id", str(file_path))
return json.dumps({
"success": True,
"path": str(file_path),
"size": file_path.stat().st_size,
"message": f"File sent: {file_path.name}",
}, ensure_ascii=False)
except Exception as e:
return json.dumps({"error": str(e)}, ensure_ascii=False)
class WebInput(BaseModel):
action: str = Field(..., description="Action: 'search', 'fetch', or 'ask'")
query: Optional[str] = Field(None, description="Search query or question")
url: Optional[str] = Field(None, description="URL to fetch (for 'fetch' action)")
scope: Optional[str] = Field("webpage", description="Search scope: webpage, paper, document, video, podcast")
max_chars: Optional[int] = Field(2000, description="Max characters in response")
class WebTool(BaseTool):
name: str = "web"
description: str = (
"Search the web, fetch URLs, or ask questions via Metaso (秘塔) AI Search. "
"Actions: 'search' (web search), 'fetch' (extract content from URL), 'ask' (RAG Q&A). "
"Requires METASO_API_KEY in keyring.yaml."
)
args_schema: Type[BaseModel] = WebInput
def _run(self, action: str, query: Optional[str] = None, url: Optional[str] = None,
scope: Optional[str] = "webpage", max_chars: Optional[int] = 2000) -> str:
raise NotImplementedError("Use async version")
async def _arun(self, action: str, query: Optional[str] = None, url: Optional[str] = None,
scope: Optional[str] = "webpage", max_chars: Optional[int] = 2000) -> str:
from config import METASO_API_KEY
if not METASO_API_KEY:
return json.dumps({"error": "METASO_API_KEY not configured. Add it to keyring.yaml."}, ensure_ascii=False)
import httpx
base_url = "https://metaso.cn/api/mcp"
headers = {
"Authorization": f"Bearer {METASO_API_KEY}",
"Content-Type": "application/json",
}
try:
async with httpx.AsyncClient(timeout=30.0) as client:
if action == "search":
if not query:
return json.dumps({"error": "query required for search"}, ensure_ascii=False)
payload = {
"jsonrpc": "2.0",
"id": 1,
"method": "metaso_web_search",
"params": {"query": query, "scope": scope or "webpage"},
}
resp = await client.post(base_url, json=payload, headers=headers)
data = resp.json()
if "error" in data:
return json.dumps({"error": data["error"]}, ensure_ascii=False)
results = data.get("result", {}).get("results", [])[:5]
output = []
for r in results:
output.append(f"**{r.get('title', 'No title')}**\n{r.get('snippet', '')}\n{r.get('url', '')}")
return json.dumps({"results": "\n\n".join(output)[:max_chars]}, ensure_ascii=False)
elif action == "fetch":
if not url:
return json.dumps({"error": "url required for fetch"}, ensure_ascii=False)
payload = {
"jsonrpc": "2.0",
"id": 1,
"method": "metaso_web_reader",
"params": {"url": url, "format": "markdown"},
}
resp = await client.post(base_url, json=payload, headers=headers)
data = resp.json()
if "error" in data:
return json.dumps({"error": data["error"]}, ensure_ascii=False)
content = data.get("result", {}).get("content", "")
return json.dumps({"content": content[:max_chars]}, ensure_ascii=False)
elif action == "ask":
if not query:
return json.dumps({"error": "query required for ask"}, ensure_ascii=False)
payload = {
"jsonrpc": "2.0",
"id": 1,
"method": "metaso_chat",
"params": {"query": query},
}
resp = await client.post(base_url, json=payload, headers=headers)
data = resp.json()
if "error" in data:
return json.dumps({"error": data["error"]}, ensure_ascii=False)
answer = data.get("result", {}).get("answer", "")
return json.dumps({"answer": answer[:max_chars]}, ensure_ascii=False)
else:
return json.dumps({"error": f"Unknown action: {action}"}, ensure_ascii=False)
except httpx.TimeoutException:
return json.dumps({"error": "Request timed out"}, ensure_ascii=False)
except Exception as e:
return json.dumps({"error": str(e)}, ensure_ascii=False)
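All three `WebTool` actions share one JSON-RPC 2.0 envelope; only `method` and `params` vary between them. A minimal sketch of that envelope (the helper name `build_rpc_payload` is illustrative, not part of the codebase; the method names mirror the ones used above):

```python
# Sketch of the JSON-RPC 2.0 request envelope WebTool posts to the
# Metaso MCP endpoint. Only "method" and "params" differ per action.
def build_rpc_payload(method: str, params: dict) -> dict:
    """Wrap a method call in a JSON-RPC 2.0 request object."""
    return {
        "jsonrpc": "2.0",
        "id": 1,
        "method": method,
        "params": params,
    }

payload = build_rpc_payload("metaso_web_search", {"query": "asyncio futures", "scope": "webpage"})
```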
class SchedulerInput(BaseModel):
action: str = Field(..., description="Action: 'remind' or 'repeat'")
delay_seconds: Optional[int] = Field(None, description="Delay in seconds (for 'remind')")
interval_seconds: Optional[int] = Field(None, description="Interval in seconds (for 'repeat')")
message: str = Field(..., description="Reminder message")
max_runs: Optional[int] = Field(5, description="Max runs for recurring (default 5)")
class SchedulerTool(BaseTool):
name: str = "scheduler"
description: str = (
"Schedule reminders. Use 'remind' for one-time, 'repeat' for recurring. "
"Notifications sent to current chat."
)
args_schema: Type[BaseModel] = SchedulerInput
def _run(self, action: str, message: str, delay_seconds: Optional[int] = None,
interval_seconds: Optional[int] = None, max_runs: Optional[int] = 5) -> str:
raise NotImplementedError("Use async version")
async def _arun(self, action: str, message: str, delay_seconds: Optional[int] = None,
interval_seconds: Optional[int] = None, max_runs: Optional[int] = 5) -> str:
from agent.scheduler import scheduler
chat_id = get_current_chat()
if action == "remind":
if not delay_seconds:
return json.dumps({"error": "delay_seconds required for remind"}, ensure_ascii=False)
job_id = await scheduler.schedule_once(
delay_seconds=delay_seconds,
message=message,
notify_chat_id=chat_id,
)
return json.dumps({
"success": True,
"job_id": job_id,
"message": f"Reminder set for {delay_seconds}s from now",
}, ensure_ascii=False)
elif action == "repeat":
if not interval_seconds:
return json.dumps({"error": "interval_seconds required for repeat"}, ensure_ascii=False)
job_id = await scheduler.schedule_recurring(
interval_seconds=interval_seconds,
message=message,
max_runs=max_runs or 5,
notify_chat_id=chat_id,
)
return json.dumps({
"success": True,
"job_id": job_id,
"message": f"Recurring reminder set every {interval_seconds}s ({max_runs} times)",
}, ensure_ascii=False)
else:
return json.dumps({"error": f"Unknown action: {action}"}, ensure_ascii=False)
class TaskStatusInput(BaseModel):
task_id: str = Field(..., description="Task ID to check")
class TaskStatusTool(BaseTool):
name: str = "task_status"
description: str = "Check the status of a background task. Returns current status and result if completed."
args_schema: Type[BaseModel] = TaskStatusInput
def _run(self, task_id: str) -> str:
raise NotImplementedError("Use async version")
async def _arun(self, task_id: str) -> str:
from agent.task_runner import task_runner
task = task_runner.get_task(task_id)
if not task:
return json.dumps({"error": f"Task {task_id} not found"}, ensure_ascii=False)
return json.dumps({
"task_id": task.task_id,
"description": task.description,
"status": task.status.value,
"elapsed": int(task.elapsed),
"started_at": task.started_at,
"completed_at": task.completed_at,
"result": task.result[:500] if task.result else None,
"error": task.error,
}, ensure_ascii=False)
# Module-level tool list for easy import
TOOLS = [
CreateConversationTool(),
SendToConversationTool(),
ListConversationsTool(),
CloseConversationTool(),
ShellTool(),
FileReadTool(),
FileWriteTool(),
FileListTool(),
FileSearchTool(),
FileSendTool(),
WebTool(),
SchedulerTool(),
TaskStatusTool(),
]


@ -7,3 +7,4 @@ langchain-community>=0.2.0
pywinpty>=2.0.0
pyyaml>=6.0.0
rich>=13.0.0
httpx>=0.27.0

15
router/__init__.py Normal file

@ -0,0 +1,15 @@
"""Router module - public-facing component of PhoneWork."""
from router.nodes import NodeRegistry, NodeConnection, get_node_registry
from router.main import create_app
from router.rpc import forward
from router.routing_agent import route
__all__ = [
"NodeRegistry",
"NodeConnection",
"get_node_registry",
"create_app",
"forward",
"route",
]

75
router/main.py Normal file

@ -0,0 +1,75 @@
"""Router main module - FastAPI app factory.
Creates the FastAPI application with:
- Feishu WebSocket client
- Node WebSocket endpoint
- Health check endpoints
"""
from __future__ import annotations
import logging
from typing import Optional
from fastapi import FastAPI, WebSocket
from fastapi.middleware.cors import CORSMiddleware
from bot.handler import start_websocket_client
from router.nodes import NodeRegistry, get_node_registry
from router.ws import ws_node_endpoint
logger = logging.getLogger(__name__)
def create_app(router_secret: Optional[str] = None) -> FastAPI:
"""Create the FastAPI application.
Args:
router_secret: Secret for authenticating host client connections
"""
app = FastAPI(title="PhoneWork Router", version="3.0.0")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
registry = get_node_registry()
if router_secret:
registry._secret = router_secret
@app.get("/health")
async def health():
from router.rpc import get_pending_count
nodes = registry.list_nodes()
online_nodes = [n for n in nodes if n["status"] == "online"]
return {
"status": "ok",
"nodes": nodes,
"online_nodes": len(online_nodes),
"total_nodes": len(nodes),
"pending_requests": get_pending_count(),
}
@app.get("/nodes")
async def list_nodes():
return registry.list_nodes()
@app.websocket("/ws/node")
async def ws_node(websocket: WebSocket):
await ws_node_endpoint(websocket)
@app.on_event("startup")
async def startup():
import asyncio
loop = asyncio.get_running_loop()
start_websocket_client(loop)
logger.info("Router started")
@app.on_event("shutdown")
async def shutdown():
logger.info("Router shut down")
return app

209
router/nodes.py Normal file

@ -0,0 +1,209 @@
"""Node registry for managing connected host clients.
Maintains:
- Connected nodes with their WebSocket connections
- User-to-node mapping (which users each node serves)
- Active node preference per user
"""
from __future__ import annotations
import asyncio
import logging
import time
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Set
from shared import RegisterMessage, NodeStatus
logger = logging.getLogger(__name__)
@dataclass
class NodeConnection:
"""Represents a connected host client."""
node_id: str
ws: Any
display_name: str = ""
serves_users: Set[str] = field(default_factory=set)
working_dir: str = ""
capabilities: List[str] = field(default_factory=list)
connected_at: float = field(default_factory=time.time)
last_heartbeat: float = field(default_factory=time.time)
sessions: int = 0
active_sessions: List[Dict[str, Any]] = field(default_factory=list)
@property
def is_online(self) -> bool:
"""Check if node is still considered online (heartbeat within 60s)."""
return time.time() - self.last_heartbeat < 60
def to_dict(self) -> Dict[str, Any]:
"""Serialize for API responses."""
return {
"node_id": self.node_id,
"display_name": self.display_name,
"status": "online" if self.is_online else "offline",
"users": len(self.serves_users),
"sessions": self.sessions,
"capabilities": self.capabilities,
"connected_at": self.connected_at,
"last_heartbeat": self.last_heartbeat,
}
class NodeRegistry:
"""Registry of connected host clients."""
def __init__(self, router_secret: str = ""):
self._nodes: Dict[str, NodeConnection] = {}
self._user_nodes: Dict[str, Set[str]] = {}
self._active_node: Dict[str, str] = {}
self._secret = router_secret
self._lock = asyncio.Lock()
def validate_secret(self, secret: str) -> bool:
"""Validate router secret."""
if not self._secret:
return True
return secret == self._secret
async def register(self, ws: Any, msg: RegisterMessage) -> NodeConnection:
"""Register a new node connection."""
async with self._lock:
is_reconnect = msg.node_id in self._nodes
node = NodeConnection(
node_id=msg.node_id,
ws=ws,
display_name=msg.display_name or msg.node_id,
serves_users=set(msg.serves_users),
working_dir=msg.working_dir,
capabilities=msg.capabilities,
)
self._nodes[msg.node_id] = node
for user_id in msg.serves_users:
if user_id not in self._user_nodes:
self._user_nodes[user_id] = set()
self._user_nodes[user_id].add(msg.node_id)
logger.info(
"Node registered: %s (users: %s, capabilities: %s)",
msg.node_id,
msg.serves_users,
msg.capabilities,
)
if is_reconnect:
for user_id in msg.serves_users:
asyncio.create_task(self._notify_reconnect(user_id, node.display_name))
return node
async def _notify_reconnect(self, user_id: str, node_name: str) -> None:
"""Notify user about node reconnect."""
try:
from bot.feishu import send_text
await send_text(user_id, "open_id", f"✅ Node \"{node_name}\" reconnected.")
except Exception as e:
logger.error("Failed to send reconnect notification: %s", e)
async def unregister(self, node_id: str) -> None:
"""Unregister a node connection."""
async with self._lock:
node = self._nodes.pop(node_id, None)
if node:
affected_users = list(node.serves_users)
for user_id in node.serves_users:
if user_id in self._user_nodes:
self._user_nodes[user_id].discard(node_id)
if not self._user_nodes[user_id]:
del self._user_nodes[user_id]
for user_id in list(self._active_node.keys()):
if self._active_node[user_id] == node_id:
del self._active_node[user_id]
logger.info("Node unregistered: %s", node_id)
for user_id in affected_users:
asyncio.create_task(self._notify_disconnect(user_id, node.display_name))
async def _notify_disconnect(self, user_id: str, node_name: str) -> None:
"""Notify user about node disconnect."""
try:
from bot.feishu import send_text
await send_text(user_id, "open_id", f"⚠️ Node \"{node_name}\" disconnected.")
except Exception as e:
logger.error("Failed to send disconnect notification: %s", e)
async def update_status(self, msg: NodeStatus) -> None:
"""Update node status from heartbeat."""
async with self._lock:
node = self._nodes.get(msg.node_id)
if node:
node.sessions = msg.sessions
node.active_sessions = msg.active_sessions
node.last_heartbeat = time.time()
async def update_heartbeat(self, node_id: str) -> None:
"""Update node heartbeat timestamp."""
async with self._lock:
node = self._nodes.get(node_id)
if node:
node.last_heartbeat = time.time()
def get_node(self, node_id: str) -> Optional[NodeConnection]:
"""Get a node by ID."""
return self._nodes.get(node_id)
def get_nodes_for_user(self, user_id: str) -> List[NodeConnection]:
"""Get all nodes that serve a user."""
node_ids = self._user_nodes.get(user_id, set())
return [self._nodes[nid] for nid in node_ids if nid in self._nodes]
def get_active_node(self, user_id: str) -> Optional[NodeConnection]:
"""Get the active node for a user."""
node_id = self._active_node.get(user_id)
if node_id:
return self._nodes.get(node_id)
nodes = self.get_nodes_for_user(user_id)
if nodes:
online = [n for n in nodes if n.is_online]
if online:
return online[0]
return None
def set_active_node(self, user_id: str, node_id: str) -> bool:
"""Set the active node for a user."""
if node_id not in self._nodes:
return False
self._active_node[user_id] = node_id
logger.info("Active node for user %s set to %s", user_id, node_id)
return True
def list_nodes(self) -> List[Dict[str, Any]]:
"""List all nodes with their status."""
return [node.to_dict() for node in self._nodes.values()]
def get_affected_users(self, node_id: str) -> List[str]:
"""Get users affected by a node disconnect."""
node = self._nodes.get(node_id)
if node:
return list(node.serves_users)
return []
node_registry: Optional[NodeRegistry] = None
def get_node_registry() -> NodeRegistry:
"""Get the global node registry instance."""
global node_registry
if node_registry is None:
node_registry = NodeRegistry()
return node_registry
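The liveness rule in `NodeConnection.is_online` is purely heartbeat-driven: a node counts as online iff its last heartbeat is fresher than a 60-second TTL. A self-contained sketch of that rule (the `Node` class and `HEARTBEAT_TTL` name are illustrative stand-ins, not the actual `nodes.py` types):

```python
import time
from dataclasses import dataclass, field
from typing import Optional

# Sketch of the heartbeat-staleness rule behind NodeConnection.is_online:
# online iff the last heartbeat arrived within the TTL window.
HEARTBEAT_TTL = 60.0  # seconds, matching the 60s window in nodes.py

@dataclass
class Node:
    node_id: str
    last_heartbeat: float = field(default_factory=time.time)

    def is_online(self, now: Optional[float] = None) -> bool:
        now = time.time() if now is None else now
        return now - self.last_heartbeat < HEARTBEAT_TTL

n = Node("home-pc", last_heartbeat=1000.0)
n.is_online(now=1030.0)  # True: 30s since the last heartbeat
n.is_online(now=1061.0)  # False: past the 60s TTL
```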

128
router/routing_agent.py Normal file

@ -0,0 +1,128 @@
"""Routing LLM for deciding which node to forward messages to.
This is a lightweight, one-shot LLM call that decides routing.
No history, no multi-step loop: a single call that returns a JSON routing decision.
"""
from __future__ import annotations
import json
import logging
from typing import List, Optional
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_openai import ChatOpenAI
from config import OPENAI_API_KEY, OPENAI_BASE_URL, OPENAI_MODEL
from router.nodes import NodeConnection, get_node_registry
logger = logging.getLogger(__name__)
ROUTING_SYSTEM_PROMPT = """You are a routing assistant. A user has sent a message. \
Choose which node to forward it to.
Connected nodes for this user:
{nodes_info}
Rules:
- If the message references an active session on a node, route to that node.
- If the user names a machine explicitly ("on work-server", "@home-pc"), route there.
- If only one node is connected, route there without asking.
- If ambiguous with multiple idle nodes, ask the user to clarify.
- For meta commands (/nodes, /help, /status), respond with "meta" as the node_id.
Respond with a JSON object:
{{"node_id": "<node_id>", "reason": "<brief reason>"}}
"""
def _format_nodes_info(nodes: List[NodeConnection], active_node_id: Optional[str] = None) -> str:
"""Format node information for the routing prompt."""
lines = []
for node in nodes:
marker = " [ACTIVE]" if node.node_id == active_node_id else ""
sessions = ", ".join(
s.get("working_dir", "unknown") for s in node.active_sessions[:3]
) or "none"
lines.append(
f"- {node.display_name or node.node_id}{marker}: "
f"sessions=[{sessions}], capabilities={node.capabilities}"
)
return "\n".join(lines)
async def route(user_id: str, chat_id: str, text: str) -> tuple[Optional[str], str]:
"""Determine which node to route a message to.
Args:
user_id: User's Feishu open_id
chat_id: Chat ID for context
text: User's message text
Returns:
Tuple of (node_id, reason). node_id is None if no suitable node found.
"""
registry = get_node_registry()
nodes = registry.get_nodes_for_user(user_id)
if not nodes:
return None, "No nodes available for this user"
online_nodes = [n for n in nodes if n.is_online]
if not online_nodes:
return None, "All nodes for this user are offline"
if len(online_nodes) == 1:
return online_nodes[0].node_id, "Only one node available"
if text.strip().startswith("/"):
return "meta", "Meta command"
active_node = registry.get_active_node(user_id)
active_node_id = active_node.node_id if active_node else None
nodes_info = _format_nodes_info(online_nodes, active_node_id)
try:
llm = ChatOpenAI(
model=OPENAI_MODEL,
openai_api_key=OPENAI_API_KEY,
openai_api_base=OPENAI_BASE_URL,
temperature=0,
)
prompt = ROUTING_SYSTEM_PROMPT.format(nodes_info=nodes_info)
messages = [
SystemMessage(content=prompt),
HumanMessage(content=text),
]
response = await llm.ainvoke(messages)
content = response.content.strip()
if content.startswith("```"):
content = content.split("\n", 1)[1]
content = content.rsplit("```", 1)[0]
result = json.loads(content)
node_id = result.get("node_id")
reason = result.get("reason", "")
if node_id == "meta":
return "meta", reason
for node in online_nodes:
if node.node_id == node_id or node.display_name == node_id:
return node.node_id, reason
if active_node:
return active_node.node_id, f"Defaulting to active node (LLM suggested unavailable: {node_id})"
return online_nodes[0].node_id, f"Defaulting to first available node (LLM suggested: {node_id})"
except Exception as e:
logger.warning("Routing LLM failed: %s, falling back to active node", e)
if active_node:
return active_node.node_id, "Fallback to active node"
return online_nodes[0].node_id, "Fallback to first available node"
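The parsing step in `route()` has to cope with LLMs wrapping their JSON answer in a markdown code fence, which `json.loads` cannot parse directly. A self-contained sketch of that fence-stripping (the helper name `parse_routing_reply` is illustrative):

```python
import json

# Sketch of the fence-stripping used by route(): drop an optional
# leading ```/```json line and the trailing fence, then parse JSON.
def parse_routing_reply(content: str) -> dict:
    """Strip an optional markdown code fence, then parse the JSON body."""
    content = content.strip()
    if content.startswith("```"):
        content = content.split("\n", 1)[1]    # drop the opening fence line
        content = content.rsplit("```", 1)[0]  # drop the closing fence
    return json.loads(content)

reply = '```json\n{"node_id": "home-pc", "reason": "active session"}\n```'
parse_routing_reply(reply)  # → {'node_id': 'home-pc', 'reason': 'active session'}
```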

109
router/rpc.py Normal file

@ -0,0 +1,109 @@
"""RPC module for forwarding requests to host clients.
Handles:
- Request correlation with asyncio.Future
- Timeout management
- Response routing
"""
from __future__ import annotations
import asyncio
import logging
import uuid
from typing import Any, Dict, Optional
from shared import ForwardRequest, ForwardResponse, TaskComplete, encode
from router.nodes import get_node_registry
logger = logging.getLogger(__name__)
_pending_requests: Dict[str, asyncio.Future] = {}
_default_timeout = 600.0
async def forward(
node_id: str,
user_id: str,
chat_id: str,
text: str,
timeout: float = _default_timeout,
) -> str:
"""Forward a message to a host client and wait for response.
Args:
node_id: Target node ID
user_id: User's Feishu open_id
chat_id: Chat ID for context
text: Message text to forward
timeout: Timeout in seconds (default 600s for long CC tasks)
Returns:
Reply text from the host client
Raises:
asyncio.TimeoutError: If no response within timeout
RuntimeError: If node is not connected
"""
registry = get_node_registry()
node = registry.get_node(node_id)
if not node or not node.ws:
raise RuntimeError(f"Node not connected: {node_id}")
request_id = str(uuid.uuid4())
future: asyncio.Future = asyncio.get_running_loop().create_future()
_pending_requests[request_id] = future
request = ForwardRequest(
id=request_id,
user_id=user_id,
chat_id=chat_id,
text=text,
)
try:
await node.ws.send_text(encode(request))
logger.debug("Forwarded request %s to node %s", request_id, node_id)
result = await asyncio.wait_for(future, timeout=timeout)
return result
except asyncio.TimeoutError:
logger.warning("Request %s timed out after %ss", request_id, timeout)
raise
except Exception as e:
logger.error("Failed to forward request %s: %s", request_id, e)
raise
finally:
_pending_requests.pop(request_id, None)
async def resolve_response(response: ForwardResponse) -> None:
"""Resolve a pending request with a response."""
future = _pending_requests.get(response.id)
if future and not future.done():
if response.error:
future.set_exception(RuntimeError(response.error))
else:
future.set_result(response.reply)
logger.debug("Resolved request %s", response.id)
async def handle_task_complete(msg: TaskComplete) -> None:
"""Handle a task completion notification from a host client."""
logger.info("Task %s completed for user %s", msg.task_id, msg.user_id)
from bot.feishu import send_text
try:
await send_text(msg.chat_id, "chat_id", msg.result)
except Exception as e:
logger.error("Failed to send task completion notification: %s", e)
def get_pending_count() -> int:
"""Get the number of pending requests."""
return len(_pending_requests)

102
router/ws.py Normal file

@ -0,0 +1,102 @@
"""WebSocket endpoint for host client connections.
Handles:
- Connection authentication
- Node registration
- Message forwarding
- Heartbeat
"""
from __future__ import annotations
import asyncio
import logging
from typing import Optional
from fastapi import WebSocket, WebSocketDisconnect
from router.nodes import get_node_registry
from router.rpc import handle_task_complete
from shared import (
RegisterMessage,
ForwardRequest,
ForwardResponse,
TaskComplete,
Heartbeat,
NodeStatus,
decode,
encode,
)
logger = logging.getLogger(__name__)
async def ws_node_endpoint(websocket: WebSocket) -> None:
"""WebSocket endpoint for host client connections."""
await websocket.accept()
registry = get_node_registry()
secret = websocket.headers.get("authorization", "")
if secret.startswith("Bearer "):
secret = secret[7:]
if not registry.validate_secret(secret):
logger.warning("Invalid router secret, rejecting connection")
await websocket.close(code=4001, reason="Invalid secret")
return
node_id: Optional[str] = None
heartbeat_task: Optional[asyncio.Task] = None
async def send_heartbeat():
"""Send periodic pings to the host client."""
try:
while True:
await asyncio.sleep(30)
try:
await websocket.send_text(encode(Heartbeat(type="ping")))
except Exception:
break
except asyncio.CancelledError:
pass
try:
async for data in websocket.iter_text():
try:
msg = decode(data)
except Exception as e:
logger.error("Failed to decode message: %s", e)
continue
if isinstance(msg, RegisterMessage):
node_id = msg.node_id
await registry.register(websocket, msg)
heartbeat_task = asyncio.create_task(send_heartbeat())
elif isinstance(msg, ForwardResponse):
from router.rpc import resolve_response
await resolve_response(msg)
elif isinstance(msg, TaskComplete):
await handle_task_complete(msg)
elif isinstance(msg, Heartbeat):
if msg.type == "pong" and node_id:
await registry.update_heartbeat(node_id)
elif isinstance(msg, NodeStatus):
await registry.update_status(msg)
else:
logger.debug("Received unhandled message type: %s", type(msg).__name__)
except WebSocketDisconnect:
logger.info("WebSocket disconnected")
except Exception as e:
logger.exception("WebSocket error: %s", e)
finally:
if heartbeat_task:
heartbeat_task.cancel()
if node_id:
await registry.unregister(node_id)

23
shared/__init__.py Normal file

@ -0,0 +1,23 @@
"""Shared module for Router <-> Host Client communication."""
from shared.protocol import (
RegisterMessage,
ForwardRequest,
ForwardResponse,
TaskComplete,
Heartbeat,
NodeStatus,
encode,
decode,
)
__all__ = [
"RegisterMessage",
"ForwardRequest",
"ForwardResponse",
"TaskComplete",
"Heartbeat",
"NodeStatus",
"encode",
"decode",
]

94
shared/protocol.py Normal file

@ -0,0 +1,94 @@
"""Shared protocol module for Router <-> Host Client communication.
All message types are dataclasses that serialize to/from JSON.
Both router and host client import from this module.
"""
from __future__ import annotations
import json
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, List, Optional
@dataclass
class RegisterMessage:
"""Host client -> Router: Register this node."""
type: str = "register"
node_id: str = ""
serves_users: List[str] = field(default_factory=list)
working_dir: str = ""
capabilities: List[str] = field(default_factory=list)
display_name: str = ""
@dataclass
class ForwardRequest:
"""Router -> Host client: Forward a user message."""
type: str = "forward"
id: str = ""
user_id: str = ""
chat_id: str = ""
text: str = ""
@dataclass
class ForwardResponse:
"""Host client -> Router: Reply to a forwarded message."""
type: str = "forward_response"
id: str = ""
reply: str = ""
error: str = ""
@dataclass
class TaskComplete:
"""Host client -> Router: Background task finished."""
type: str = "task_complete"
task_id: str = ""
user_id: str = ""
chat_id: str = ""
result: str = ""
@dataclass
class Heartbeat:
"""Bidirectional ping/pong."""
type: str = "ping"
@dataclass
class NodeStatus:
"""Host client -> Router: Periodic status update."""
type: str = "node_status"
node_id: str = ""
sessions: int = 0
active_sessions: List[Dict[str, Any]] = field(default_factory=list)
MESSAGE_TYPES = {
"register": RegisterMessage,
"forward": ForwardRequest,
"forward_response": ForwardResponse,
"task_complete": TaskComplete,
"ping": Heartbeat,
"pong": Heartbeat,
"node_status": NodeStatus,
}
def encode(msg: Any) -> str:
"""Encode a message to JSON string."""
if hasattr(msg, "type"):
return json.dumps(asdict(msg), ensure_ascii=False)
raise ValueError(f"Invalid message type: {type(msg)}")
def decode(data: str) -> Any:
"""Decode a JSON string to a message object."""
obj = json.loads(data)
msg_type = obj.get("type")
if msg_type not in MESSAGE_TYPES:
raise ValueError(f"Unknown message type: {msg_type}")
cls = MESSAGE_TYPES[msg_type]
return cls(**{k: v for k, v in obj.items() if k in cls.__dataclass_fields__})
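The protocol dispatches on the `"type"` field and filters unknown keys through `__dataclass_fields__`, so a peer running a newer protocol version can send extra fields without breaking an older one. A self-contained sketch of that round-trip with a single message type standing in for `shared.protocol`:

```python
import json
from dataclasses import asdict, dataclass, field
from typing import List

# Sketch of the shared.protocol round-trip: dataclass -> JSON via
# asdict(), back via a registry keyed on the "type" field, with
# unknown keys silently dropped for forward compatibility.
@dataclass
class RegisterMessage:
    type: str = "register"
    node_id: str = ""
    serves_users: List[str] = field(default_factory=list)

MESSAGE_TYPES = {"register": RegisterMessage}

def encode(msg) -> str:
    return json.dumps(asdict(msg), ensure_ascii=False)

def decode(data: str):
    obj = json.loads(data)
    cls = MESSAGE_TYPES[obj["type"]]
    # Ignore fields this version of the protocol does not know about.
    return cls(**{k: v for k, v in obj.items() if k in cls.__dataclass_fields__})

wire = encode(RegisterMessage(node_id="home-pc", serves_users=["ou_123"]))
msg = decode(wire)  # round-trips cleanly
decode('{"type": "register", "node_id": "x", "future_field": 1}')  # extra key dropped
```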

72
standalone.py Normal file

@ -0,0 +1,72 @@
"""Run router + host client in a single process (localhost mode).
Equivalent to the pre-M3 single-machine setup.
Users run `python standalone.py` and get the exact same experience as `python main.py`,
but the code paths use the multi-host architecture internally.
"""
from __future__ import annotations
import asyncio
import logging
import secrets
import sys
import uvicorn
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger(__name__)
async def run_standalone() -> None:
"""Run router + host client in a single process."""
secret = secrets.token_hex(16)
router_url = "ws://127.0.0.1:8000/ws/node"
from router.main import create_app
from host_client.main import NodeClient
from host_client.config import HostConfig
config = HostConfig.from_keyring()
config.router_url = router_url
config.router_secret = secret
app = create_app(router_secret=secret)
config_obj = uvicorn.Config(
app,
host="0.0.0.0",
port=8000,
log_level="info",
)
server = uvicorn.Server(config_obj)
async def run_server():
await server.serve()
async def run_client():
await asyncio.sleep(1.5)
client = NodeClient(config)
try:
await client.run()
except Exception as e:
logger.exception("Host client error: %s", e)
server.should_exit = True
await asyncio.gather(run_server(), run_client())
def main() -> None:
"""Entry point."""
logger.info("Starting PhoneWork in standalone mode...")
try:
asyncio.run(run_standalone())
except KeyboardInterrupt:
logger.info("Shutting down...")
if __name__ == "__main__":
main()