feat: consolidate data storage paths and improve the task notification mechanism

Move audit logs, session data, and scheduled-job files under a unified data directory.
Add a completion callback to background tasks and streamline the notification flow after CC tasks finish.
Update the README and ROADMAP docs to mark completed feature items.
Yuyao Huang (Sam) 2026-03-29 02:32:48 +08:00
parent 80e4953cf9
commit 52a9d085f7
10 changed files with 200 additions and 58 deletions

.gitignore (8 changes)

@@ -70,3 +70,11 @@ dmypy.json
 # Ruff
 .ruff_cache/
+
+# Runtime data (sessions, audit logs, scheduled jobs)
+data/
+
+# Legacy paths (pre-consolidation)
+sessions.json
+scheduled_jobs.json
+audit/
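The ignore rules above assume runtime files now live under `data/`. A one-time migration of the legacy paths might look like the following sketch (`migrate_legacy_paths` is a hypothetical helper, not part of this commit):

```python
import shutil
import tempfile
from pathlib import Path

def migrate_legacy_paths(root: Path) -> list[str]:
    """Move pre-consolidation runtime files into root/data; return what moved."""
    data_dir = root / "data"
    data_dir.mkdir(parents=True, exist_ok=True)
    moved = []
    for legacy in ("sessions.json", "scheduled_jobs.json", "audit"):
        src = root / legacy
        if src.exists():
            shutil.move(str(src), str(data_dir / legacy))
            moved.append(legacy)
    return moved

# Demo on a throwaway directory laid out like the old repo root.
root = Path(tempfile.mkdtemp())
(root / "sessions.json").write_text("{}", encoding="utf-8")
(root / "audit").mkdir()
moved = migrate_legacy_paths(root)
```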

README.md (119 changes)

@@ -4,27 +4,55 @@ Feishu bot that lets users control Claude Code CLI from their phone.
 ## Architecture
+
+PhoneWork uses a **Router + Host Client** architecture that supports both single-machine and multi-host deployments:
+
 ```
-┌─────────────┐    WebSocket     ┌──────────────┐    LangChain     ┌─────────────┐
-│   Feishu    │ ◄──────────────► │   FastAPI    │ ◄──────────────► │   LLM API   │
-│  (client)   │                  │   (server)   │                  │  (ZhipuAI)  │
-└─────────────┘                  └──────────────┘                  └─────────────┘
-                                        │
-                                 ┌─────────────┐
-                                 │ Claude Code │
-                                 │ (headless)  │
-                                 └─────────────┘
+┌─────────────────┐         ┌──────────┐  WebSocket   ┌────────────────────────────────────┐
+│   Feishu App    │         │  Feishu  │◄────────────►│ Router (public VPS)                │
+│ (User's Phone)  │◄───────►│  Cloud   │              │ - Feishu event handler             │
+└─────────────────┘         └──────────┘              │ - Router LLM (routing only)        │
+                                                      │ - Node registry + active node map  │
+                                                      └───────────┬────────────────────────┘
+                                                                  │ WebSocket (host clients connect in)
+                                                      ┌───────────┴────────────────────────┐
+                                                      │                                    │
+                                           ┌──────────▼──────────┐  ┌────────────▼────────┐
+                                           │ Host Client A       │  │ Host Client B       │
+                                           │ (home-pc)           │  │ (work-server)       │
+                                           │ - Mailboy LLM       │  │ - Mailboy LLM       │
+                                           │ - CC sessions       │  │ - CC sessions       │
+                                           │ - Shell / files     │  │ - Shell / files     │
+                                           └─────────────────────┘  └─────────────────────┘
 ```
+
+**Key design decisions:**
+
+- Host clients connect TO the router (outbound WebSocket) — NAT-transparent
+- A user can be registered on multiple nodes simultaneously
+- The **router LLM** decides *which node* to route each message to
+- The **node mailboy LLM** handles the full orchestration loop
+- Each node maintains its own conversation history per user
+
+**Deployment modes:**
+
+- **Standalone (`python standalone.py`):** Runs router + host client at localhost. Same architecture, simpler setup for single-machine use.
+- **Multi-host:** Router on a public VPS, host clients behind NAT on different machines.
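The node registry described above can be sketched roughly as follows (illustrative names only, not the actual `router/nodes.py` API): each user maps to the set of nodes that serve them, plus one "active" node that wins ties.

```python
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class NodeRegistry:
    """Minimal sketch: user -> serving nodes, with one active node per user."""
    serves: dict = field(default_factory=dict)   # user_id -> set of node_ids
    active: dict = field(default_factory=dict)   # user_id -> active node_id

    def register(self, node_id: str, users: list) -> None:
        for uid in users:
            self.serves.setdefault(uid, set()).add(node_id)
            self.active.setdefault(uid, node_id)  # first node becomes active

    def switch(self, user_id: str, node_id: str) -> bool:
        """Backs a command like /node <name>: only nodes serving the user qualify."""
        if node_id in self.serves.get(user_id, set()):
            self.active[user_id] = node_id
            return True
        return False

    def route(self, user_id: str) -> Optional[str]:
        return self.active.get(user_id)
```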
 **Components:**

 | Module | Purpose |
 |--------|---------|
-| `main.py` | FastAPI entry point, starts WebSocket client + session manager + scheduler |
+| `standalone.py` | Single-process entry point: runs router + host client together |
+| `main.py` | FastAPI entry point for router-only mode |
+| `shared/protocol.py` | Wire protocol for router-host communication |
+| `router/main.py` | FastAPI app factory, mounts `/ws/node` endpoint |
+| `router/nodes.py` | Node registry, connection management, user-to-node mapping |
+| `router/ws.py` | WebSocket endpoint for host clients, heartbeat, message routing |
+| `router/rpc.py` | Request correlation with asyncio.Future, timeout handling |
+| `router/routing_agent.py` | Single-shot routing LLM to decide which node handles each message |
+| `host_client/main.py` | WebSocket client connecting to router, message handling, reconnection |
+| `host_client/config.py` | Host client configuration loader |
 | `bot/handler.py` | Receives Feishu events via long-connection WebSocket |
-| `bot/feishu.py` | Sends text/file/card replies back to Feishu |
-| `bot/commands.py` | Slash command handler (`/new`, `/status`, `/shell`, `/remind`, `/tasks`, etc.) |
+| `bot/feishu.py` | Sends text/file replies back to Feishu |
+| `bot/commands.py` | Slash command handler (`/new`, `/status`, `/shell`, `/remind`, `/tasks`, `/nodes`, `/node`) |
 | `orchestrator/agent.py` | LangChain agent with per-user history + direct/smart mode + direct Q&A |
 | `orchestrator/tools.py` | Tools: session management, shell, file ops, web search, scheduler, task status |
 | `agent/manager.py` | Session registry with persistence, idle timeout, and auto-background tasks |
@@ -122,6 +150,33 @@ ALLOWED_OPEN_IDS:
 # Optional: 秘塔AI (Metaso) Search API key for web search functionality
 # Get your key at: https://metaso.cn/search-api/api-keys
 METASO_API_KEY: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
+
+# Optional: Multi-host mode configuration
+# Set ROUTER_MODE to true to enable router mode (deploy on public VPS)
+ROUTER_MODE: false
+ROUTER_SECRET: your-shared-secret-for-router-host-auth
+```
+
+### Host Client Configuration (for multi-host mode)
+
+Create `host_config.yaml` for each host client:
+
+```yaml
+NODE_ID: home-pc
+DISPLAY_NAME: Home PC
+ROUTER_URL: wss://router.example.com/ws/node
+ROUTER_SECRET: <shared_secret>
+OPENAI_BASE_URL: https://open.bigmodel.cn/api/paas/v4/
+OPENAI_API_KEY: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
+OPENAI_MODEL: glm-4.7
+WORKING_DIR: C:/Users/me/projects
+METASO_API_KEY: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
+
+# Which Feishu open_ids this node serves
+SERVES_USERS:
+  - ou_abc123def456
 ```
 ---
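A host client presumably validates `host_config.yaml` at startup. A minimal shape check, run after the YAML has been parsed into a dict (illustrative names, not the real `host_client/config.py` API):

```python
# Keys the host client cannot run without (assumed set, based on the README example).
REQUIRED_KEYS = ("NODE_ID", "ROUTER_URL", "ROUTER_SECRET", "SERVES_USERS")

def validate_host_config(cfg: dict) -> dict:
    """Fail fast with a readable error instead of crashing mid-connection."""
    missing = [k for k in REQUIRED_KEYS if not cfg.get(k)]
    if missing:
        raise ValueError(f"host_config.yaml missing keys: {', '.join(missing)}")
    if not isinstance(cfg["SERVES_USERS"], list):
        raise ValueError("SERVES_USERS must be a list of Feishu open_ids")
    return cfg
```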
@@ -162,6 +217,8 @@ Active sessions: `GET /sessions`
 | `/shell <cmd>` | Run a shell command directly (bypasses LLM) |
 | `/remind <time> <msg>` | Set a reminder (e.g., `/remind 10m check build`) |
 | `/tasks` | List background tasks with status |
+| `/nodes` | List connected host nodes (multi-host mode) |
+| `/node <name>` | Switch active node (multi-host mode) |
 | `/help` | Show command reference |

 ### Message Routing Modes
@@ -240,3 +297,39 @@ Claude Code slash commands (like `/help`, `/clear`, `/compact`, `/cost`) are pas
 - Schedule recurring reminders
 - Notifications delivered to Feishu
 - Persistent across server restarts
+
+### Multi-Host Architecture (Milestone 3)
+
+#### Deployment Options
+
+**Single-Machine Mode:**
+```bash
+python standalone.py
+```
+Runs both router and host client in one process. Identical UX to pre-M3 setup.
+
+**Router Mode (Public VPS):**
+```bash
+# Set ROUTER_MODE: true in keyring.yaml
+python main.py
+```
+Runs only the router: Feishu handler + routing LLM + node registry.
+
+**Host Client Mode (Behind NAT):**
+```bash
+# Create host_config.yaml with ROUTER_URL and ROUTER_SECRET
+python -m host_client.main
+```
+Connects to router via WebSocket, runs full mailboy stack locally.
+
+#### Node Management
+
+- `/nodes` — View all connected host nodes with status
+- `/node <name>` — Switch active node for your user
+- Automatic routing: LLM decides which node handles each message
+- Health monitoring: Router tracks node heartbeats
+- Reconnection: Host clients auto-reconnect on disconnect
+
+#### Security
+
+- Shared secret authentication between router and host clients
+- User isolation: Each node only serves configured users
+- Path sandboxing: Sessions restricted to WORKING_DIR
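The shared-secret check can be sketched as a constant-time comparison at connection time (illustrative only; the real `/ws/node` handshake may differ):

```python
import hmac

def authenticate_node(presented: str, router_secret: str) -> bool:
    """Reject host clients that do not present the shared ROUTER_SECRET."""
    # hmac.compare_digest runs in constant time, so the check does not
    # leak information about the secret via response timing.
    return hmac.compare_digest(presented.encode(), router_secret.encode())
```

In the real endpoint this would presumably gate the WebSocket upgrade, returning 401 on mismatch (per the M3 checklist item about wrong `ROUTER_SECRET`).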

ROADMAP.md

@@ -1,9 +1,9 @@
 # PhoneWork — Roadmap

-## Milestone 2: Mailboy as a Versatile Assistant
+## Milestone 2: Mailboy as a Versatile Assistant (COMPLETED)

 **Goal:** Elevate the mailboy (GLM-4.7 orchestrator) from a mere Claude Code relay into a
-fully capable phone assistant. Users should be able to control their machine, manage files,
+fully capable phone assistant. Users can control their machine, manage files,
 search the web, get direct answers, and track long-running tasks — all without necessarily
 opening a Claude Code session.
@@ -177,21 +177,21 @@ args: action ("remind" | "repeat"), delay_seconds (int), interval_seconds (int),
 ## Verification Checklist

-- [ ] M2.1: Ask "what is a Python generator?" — mailboy replies directly, no tool call
-- [ ] M2.2: Send "check git status in todo_app" — `ShellTool` runs, output returned
-- [ ] M2.2: Send "rm -rf /" — blocked by safety guard
-- [ ] M2.3: Send "show me the last 50 lines of audit/abc123.jsonl" — file content returned
-- [ ] M2.3: Send "send me the sessions.json file" — file arrives in Feishu chat
-- [ ] M2.4: Start a long CC task (e.g. `--timeout 120`) — bot replies immediately, notifies on finish
-- [ ] M2.4: `/tasks` — lists running task with elapsed time
-- [ ] M2.5: "Python 3.13 有哪些新特性?" — `web ask` returns RAG answer from metaso
-- [ ] M2.5: "帮我读取这个URL: https://example.com" — page content extracted as markdown
-- [ ] M2.6: `/remind 10m deploy check` — 10 min later, message arrives in Feishu
+- [x] M2.1: Ask "what is a Python generator?" — mailboy replies directly, no tool call
+- [x] M2.2: Send "check git status in todo_app" — `ShellTool` runs, output returned
+- [x] M2.2: Send "rm -rf /" — blocked by safety guard
+- [x] M2.3: Send "show me the last 50 lines of audit/abc123.jsonl" — file content returned
+- [x] M2.3: Send "send me the sessions.json file" — file arrives in Feishu chat
+- [x] M2.4: Start a long CC task (e.g. `--timeout 120`) — bot replies immediately, notifies on finish
+- [x] M2.4: `/tasks` — lists running task with elapsed time
+- [x] M2.5: "Python 3.13 有哪些新特性?" — `web ask` returns RAG answer from metaso
+- [x] M2.5: "帮我读取这个URL: https://example.com" — page content extracted as markdown
+- [x] M2.6: `/remind 10m deploy check` — 10 min later, message arrives in Feishu

 ---

 ---

-## Milestone 3: Multi-Host Architecture (Router / Host Client Split)
+## Milestone 3: Multi-Host Architecture (Router / Host Client Split) (COMPLETED)

 **Goal:** Split PhoneWork into two deployable components — a public-facing **Router** and
 one or more **Host Clients** behind NAT. A user can be served by multiple nodes simultaneously.
@@ -519,16 +519,16 @@ PhoneWork/
 ## M3 Verification Checklist

-- [ ] `python standalone.py` — works identically to current `python main.py`
-- [ ] Router starts, host client connects, registration logged
-- [ ] Feishu message → routing LLM selects node → forwarded → reply returned
-- [ ] `/nodes` shows all connected nodes with active marker
-- [ ] `/node work-server` — switches active node, confirmed in next message
-- [ ] Two nodes serving same user — message routed to active node
-- [ ] Kill host client → router marks offline, user sees "Node home-pc is offline"
-- [ ] Host client reconnects → re-registered, messages flow again
-- [ ] Long CC task on node finishes → router forwards completion notification to Feishu
-- [ ] Wrong `ROUTER_SECRET` → connection rejected with 401
+- [x] `python standalone.py` — works identically to current `python main.py`
+- [x] Router starts, host client connects, registration logged
+- [x] Feishu message → routing LLM selects node → forwarded → reply returned
+- [x] `/nodes` shows all connected nodes with active marker
+- [x] `/node work-server` — switches active node, confirmed in next message
+- [x] Two nodes serving same user — message routed to active node
+- [x] Kill host client → router marks offline, user sees "Node home-pc is offline"
+- [x] Host client reconnects → re-registered, messages flow again
+- [x] Long CC task on node finishes → router forwards completion notification to Feishu
+- [x] Wrong `ROUTER_SECRET` → connection rejected with 401

 ---


@@ -10,7 +10,7 @@ from typing import Optional

 logger = logging.getLogger(__name__)

-AUDIT_DIR = Path(__file__).parent.parent / "audit"
+AUDIT_DIR = Path(__file__).parent.parent / "data" / "audit"

 def _ensure_audit_dir() -> None:

agent/manager.py

@@ -17,7 +17,7 @@ logger = logging.getLogger(__name__)

 DEFAULT_IDLE_TIMEOUT = 30 * 60
 DEFAULT_CC_TIMEOUT = 300.0
-PERSISTENCE_FILE = Path(__file__).parent.parent / "sessions.json"
+PERSISTENCE_FILE = Path(__file__).parent.parent / "data" / "sessions.json"

 @dataclass
@@ -105,7 +105,7 @@ class SessionManager:
         if cc_timeout > 60:
             from agent.task_runner import task_runner
-            from orchestrator.tools import get_current_chat
+            from orchestrator.tools import get_current_chat, set_current_chat, set_current_user

             chat_id = get_current_chat()
@@ -126,10 +126,29 @@ class SessionManager:
                 )
                 return output

+            async def on_task_complete(task) -> None:
+                if not chat_id or not user_id or not task.result:
+                    return
+                set_current_user(user_id)
+                set_current_chat(chat_id)
+                from orchestrator.agent import agent
+                follow_up = (
+                    f"CC task completed. Output:\n{task.result}\n\n"
+                    f"Original request was: {message}\n\n"
+                    "If the user asked you to send a file, use send_file now. "
+                    "Otherwise just acknowledge completion."
+                )
+                reply = await agent.run(user_id, follow_up)
+                if reply:
+                    from bot.feishu import send_text
+                    await send_text(chat_id, "chat_id", reply)
+
             task_id = await task_runner.submit(
-                run_task,
+                run_task(),
                 description=f"CC session {conv_id}: {message[:50]}",
                 notify_chat_id=chat_id,
+                user_id=user_id,
+                on_complete=on_task_complete,
             )
             return f"⏳ Task #{task_id} started (timeout: {int(cc_timeout)}s). I'll notify you when it's done."
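Note the `run_task` → `run_task()` change above: `submit` awaits a coroutine *object*, so passing the bare function was a bug. A stripped-down illustration (`demo_submit` is a toy stand-in, not the real `task_runner.submit`):

```python
import asyncio

async def demo_submit(coro):
    """Toy stand-in for task_runner.submit: awaits whatever it was given."""
    return await coro

async def run_task():
    return "done"

# Passing run_task() (a coroutine object) is what the fixed code does.
result = asyncio.run(demo_submit(run_task()))
```

Passing `run_task` itself (the function) would raise `TypeError` when awaited, which is exactly the failure mode the one-character fix removes.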
@@ -183,6 +202,7 @@ class SessionManager:
     def _save(self) -> None:
         try:
             data = {cid: s.to_dict() for cid, s in self._sessions.items()}
+            PERSISTENCE_FILE.parent.mkdir(parents=True, exist_ok=True)
             with open(PERSISTENCE_FILE, "w", encoding="utf-8") as f:
                 json.dump(data, f, indent=2)
             logger.debug("Saved %d sessions to %s", len(data), PERSISTENCE_FILE)
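The added `mkdir(parents=True, exist_ok=True)` makes the save path self-healing: it creates `data/` on first write and is a no-op afterwards. The same pattern in isolation (hypothetical `save_json` helper):

```python
import json
import tempfile
from pathlib import Path

def save_json(persistence_file: Path, data: dict) -> None:
    """Same pattern as _save(): ensure the parent dir exists, then write."""
    persistence_file.parent.mkdir(parents=True, exist_ok=True)  # idempotent
    with open(persistence_file, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2)

target = Path(tempfile.mkdtemp()) / "data" / "sessions.json"
save_json(target, {"abc": {"state": "idle"}})
save_json(target, {"abc": {"state": "busy"}})  # dir already exists, no error
```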


@@ -14,7 +14,7 @@ from typing import Any, Callable, Dict, Optional

 logger = logging.getLogger(__name__)

-PERSISTENCE_FILE = Path(__file__).parent.parent / "scheduled_jobs.json"
+PERSISTENCE_FILE = Path(__file__).parent.parent / "data" / "scheduled_jobs.json"

 class JobStatus(str, Enum):
@@ -98,6 +98,7 @@ class Scheduler:
         """Save jobs to persistence file."""
         try:
             data = {jid: job.to_dict() for jid, job in self._jobs.items()}
+            PERSISTENCE_FILE.parent.mkdir(parents=True, exist_ok=True)
             with open(PERSISTENCE_FILE, "w", encoding="utf-8") as f:
                 json.dump(data, f, indent=2, ensure_ascii=False)
         except Exception:

agent/task_runner.py

@@ -57,6 +57,7 @@ class TaskRunner:
         description: str,
         notify_chat_id: Optional[str] = None,
         user_id: Optional[str] = None,
+        on_complete: Optional[Callable[[BackgroundTask], Awaitable[None]]] = None,
     ) -> str:
         """Submit a coroutine as a background task."""
         task_id = str(uuid.uuid4())[:8]
@@ -72,11 +73,11 @@ class TaskRunner:
         async with self._lock:
             self._tasks[task_id] = task
-        asyncio.create_task(self._run_task(task_id, coro))
+        asyncio.create_task(self._run_task(task_id, coro, on_complete))
         logger.info("Submitted background task %s: %s", task_id, description)
         return task_id

-    async def _run_task(self, task_id: str, coro: Awaitable[Any]) -> None:
+    async def _run_task(self, task_id: str, coro: Awaitable[Any], on_complete: Optional[Callable[[BackgroundTask], Awaitable[None]]] = None) -> None:
         """Execute a task and send notification on completion."""
         async with self._lock:
             task = self._tasks.get(task_id)
@@ -107,6 +108,12 @@ class TaskRunner:
         else:
             await self._send_notification(task)

+        if on_complete and task.status == TaskStatus.COMPLETED:
+            try:
+                await on_complete(task)
+            except Exception:
+                logger.exception("on_complete callback failed for task %s", task_id)
+
     async def _send_notification(self, task: BackgroundTask) -> None:
         """Send Feishu notification about task completion."""
         from bot.feishu import send_text
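The `try/except` around `on_complete` above keeps a faulty callback from breaking the runner. Sketched in isolation with hypothetical names:

```python
import asyncio
import logging

logger = logging.getLogger("task_runner_demo")

async def finish_task(result: str, on_complete=None) -> str:
    """Mimic _run_task's tail: callback failures are logged, never raised."""
    if on_complete is not None:
        try:
            await on_complete(result)
        except Exception:
            logger.exception("on_complete callback failed")
    return result

async def bad_callback(_):
    raise RuntimeError("boom")

# The task still completes even though its callback blew up.
outcome = asyncio.run(finish_task("ok", on_complete=bad_callback))
```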

bot/feishu.py

@@ -184,22 +184,20 @@ async def send_file(receive_id: str, receive_id_type: str, file_path: str, file_
     loop = asyncio.get_running_loop()

     # Step 1: Upload file → get file_key
-    with open(path, "rb") as f:
-        file_data = f.read()
-
     def _upload():
-        req = (
-            CreateFileRequest.builder()
-            .request_body(
-                CreateFileRequestBody.builder()
-                .file_type(file_type)
-                .file_name(file_name)
-                .file(file_data)
-                .build()
-            )
-            .build()
-        )
-        return _client.im.v1.file.create(req)
+        with open(path, "rb") as f:
+            req = (
+                CreateFileRequest.builder()
+                .request_body(
+                    CreateFileRequestBody.builder()
+                    .file_type(file_type)
+                    .file_name(file_name)
+                    .file(f)
+                    .build()
+                )
+                .build()
+            )
+            return _client.im.v1.file.create(req)

     upload_resp = await loop.run_in_executor(None, _upload)
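The restructuring above moves `open()` inside `_upload`, so the SDK receives an open file object (rather than pre-read bytes) and the handle stays alive for the whole blocking call, which runs in an executor thread. The executor pattern in isolation (`blocking_upload` is a stand-in for the Feishu SDK call):

```python
import asyncio
import tempfile
from pathlib import Path

def blocking_upload(path: Path) -> int:
    """Stand-in for the SDK call: consumes an open file handle."""
    with open(path, "rb") as f:   # opened inside the worker thread
        return len(f.read())      # handle is alive for the whole call

async def send_file_demo(path: Path) -> int:
    loop = asyncio.get_running_loop()
    # run the blocking I/O off the event loop, as send_file does
    return await loop.run_in_executor(None, blocking_upload, path)

tmp = Path(tempfile.mkdtemp()) / "report.txt"
tmp.write_bytes(b"hello")
size = asyncio.run(send_file_demo(tmp))
```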

host_client/main.py

@@ -190,14 +190,14 @@ class NodeClient:
         """Periodic heartbeat loop."""
         while self._running:
             await asyncio.sleep(30)
-            if self.ws and self.ws.open:
+            if self.ws:
                 await self.send_heartbeat()

     async def status_loop(self) -> None:
         """Periodic status update loop."""
         while self._running:
             await asyncio.sleep(60)
-            if self.ws and self.ws.open:
+            if self.ws:
                 await self.send_status()

     async def run(self) -> None:
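Dropping the `.open` check is likely motivated by newer `websockets` releases, whose connection objects no longer expose an `.open` attribute (this is an assumption; the commit does not say). A version-tolerant guard could be sketched as:

```python
def ws_is_usable(ws) -> bool:
    """Liveness guard that tolerates both connection APIs.

    Treats a missing `.open` attribute as usable, so it works whether or
    not the websockets connection object still provides `.open`.
    """
    return ws is not None and getattr(ws, "open", True)

class LegacyWs:
    open = False   # old-style connection that is closed

class ModernWs:
    pass           # no .open attribute at all
```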

orchestrator/agent.py

@@ -51,6 +51,10 @@ Your responsibilities:
 6. WEB / SEARCH: Use the `web` tool when the user needs current information. \
 Call it ONCE (or at most twice with a refined query). Then synthesize and reply \
 do NOT keep searching in a loop. If the first search returns results, use them.
+7. BACKGROUND TASKS: When `create_conversation` or `send_to_conversation` returns a \
+"Task #... started" message, the task is running in the background. \
+Immediately reply to the user that the task has started and they will be notified. \
+Do NOT call `task_status` in a loop waiting for it; the system sends a notification when done.

 Guidelines:
 - Relay Claude Code's output verbatim.
@@ -191,6 +195,7 @@ class OrchestrationAgent:
         reply = ""
         try:
             web_calls = 0
+            task_status_calls = 0
             for iteration in range(MAX_ITERATIONS):
                 logger.debug(" LLM call #%d", iteration)
                 ai_msg: AIMessage = await self._llm_with_tools.ainvoke(messages)
@@ -221,6 +226,16 @@
                     )
                     continue

+                if tool_name == "task_status":
+                    task_status_calls += 1
+                    if task_status_calls > 1:
+                        result = "Task is still running in the background. Stop polling and tell the user they will be notified when it completes."
+                        logger.warning(" task_status poll limit exceeded, blocking")
+                        messages.append(
+                            ToolMessage(content=str(result), tool_call_id=tool_id)
+                        )
+                        continue
+
                 tool_obj = _TOOL_MAP.get(tool_name)
                 if tool_obj is None:
                     result = f"Unknown tool: {tool_name}"
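The single-tool polling guard above (like the existing `web_calls` cap) generalizes to a per-tool call budget. A compact sketch, not the actual agent code:

```python
from collections import Counter
from typing import Optional

class ToolBudget:
    """Cap how many times each tool may run within one agent turn."""
    def __init__(self, limits: dict):
        self.limits = limits          # tool_name -> max calls (absent = unlimited)
        self.calls = Counter()

    def allow(self, tool_name: str) -> bool:
        """Record one attempted call; False means the tool is over budget."""
        self.calls[tool_name] += 1
        limit: Optional[int] = self.limits.get(tool_name)
        return limit is None or self.calls[tool_name] <= limit
```

When `allow` returns False, the agent would append a canned `ToolMessage` (as the diff does for `task_status`) instead of executing the tool.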