PhoneWork/router/rpc.py
Yuyao Huang (Sam) a3622ce26d refactor: 替换 asyncio.get_event_loop 为 get_running_loop 并优化会话卡片
- 将多处 asyncio.get_event_loop() 替换为更安全的 asyncio.get_running_loop()
- 重构 Feishu 卡片功能,新增 build_sessions_card 方法显示所有会话
- 优化文件路径处理逻辑,支持绝对路径和相对路径
- 在健康检查接口中添加 pending_requests 计数
- 更新会话状态命令以支持卡片显示
2026-03-28 14:59:33 +08:00

110 lines
3.0 KiB
Python

"""RPC module for forwarding requests to host clients.
Handles:
- Request correlation with asyncio.Future
- Timeout management
- Response routing
"""
from __future__ import annotations
import asyncio
import logging
import uuid
from typing import Any, Dict, Optional
from shared import ForwardRequest, ForwardResponse, TaskComplete, encode
from router.nodes import get_node_registry
logger = logging.getLogger(__name__)
# In-flight forwarded requests, keyed by request ID. ``forward`` registers a
# future here before sending and removes it when it finishes (success, error,
# or timeout); ``resolve_response`` completes the future when a reply arrives.
_pending_requests: dict[str, asyncio.Future[str]] = {}
# Default number of seconds ``forward`` waits for a reply when the caller does
# not pass ``timeout``.
_default_timeout = 600.0
async def forward(
    node_id: str,
    user_id: str,
    chat_id: str,
    text: str,
    timeout: float = _default_timeout,
) -> str:
    """Forward a message to a host client and wait for its reply.

    A unique request ID is generated and a future is registered under it in
    ``_pending_requests``; the future is completed by ``resolve_response``
    when the matching response arrives. The registration is always removed
    before this coroutine returns or raises.

    Args:
        node_id: Target node ID.
        user_id: User's Feishu open_id.
        chat_id: Chat ID for context.
        text: Message text to forward.
        timeout: Timeout in seconds (default 600s for long CC tasks).

    Returns:
        Reply text from the host client.

    Raises:
        asyncio.TimeoutError: If no response arrives within ``timeout``.
        RuntimeError: If the node is not connected, or if the response
            delivered through ``resolve_response`` carries an error.
    """
    node = get_node_registry().get_node(node_id)
    if not node or not node.ws:
        raise RuntimeError(f"Node not connected: {node_id}")

    request_id = str(uuid.uuid4())

    # Build the outgoing message BEFORE registering the future: if the
    # constructor raises, nothing has been added to _pending_requests yet,
    # so no entry can be left behind outside the try/finally below.
    request = ForwardRequest(
        id=request_id,
        user_id=user_id,
        chat_id=chat_id,
        text=text,
    )

    future: asyncio.Future[str] = asyncio.get_running_loop().create_future()
    _pending_requests[request_id] = future
    try:
        await node.ws.send_text(encode(request))
        logger.debug("Forwarded request %s to node %s", request_id, node_id)
        return await asyncio.wait_for(future, timeout=timeout)
    except asyncio.TimeoutError:
        logger.warning("Request %s timed out after %ss", request_id, timeout)
        raise
    except Exception as exc:
        # Covers send failures as well as errors set on the future by
        # resolve_response; the exception is re-raised unchanged.
        logger.error("Failed to forward request %s: %s", request_id, exc)
        raise
    finally:
        # Runs on success, failure, timeout and cancellation alike.
        _pending_requests.pop(request_id, None)
async def resolve_response(response: ForwardResponse) -> None:
    """Complete the pending request that matches a host client's response.

    Looks up the future registered under ``response.id``. If the response
    carries an error, the future fails with a ``RuntimeError`` wrapping it;
    otherwise the future's result is set to ``response.reply``.

    A response with no matching pending future, or whose future is already
    done, is ignored and noted at debug level.

    Args:
        response: Response message received from a host client.
    """
    future = _pending_requests.get(response.id)
    if future is None or future.done():
        # Nothing is waiting on this ID any more; record it so dropped
        # responses can be traced instead of vanishing silently.
        logger.debug(
            "Ignoring response for unknown or completed request %s", response.id
        )
        return

    if response.error:
        future.set_exception(RuntimeError(response.error))
    else:
        future.set_result(response.reply)
    logger.debug("Resolved request %s", response.id)
async def handle_task_complete(msg: TaskComplete) -> None:
    """Relay a finished task's result to the chat it belongs to.

    Logs the completion, then sends ``msg.result`` to ``msg.chat_id`` via the
    Feishu bot. Delivery is best-effort: an exception raised while sending is
    logged and not propagated to the caller.

    Args:
        msg: Task completion notification received from a host client.
    """
    logger.info("Task %s completed for user %s", msg.task_id, msg.user_id)

    # Imported at call time rather than at module load.
    # NOTE(review): presumably this avoids a circular import with bot.feishu — confirm.
    from bot.feishu import send_text

    try:
        await send_text(
            msg.chat_id,
            "chat_id",
            msg.result,
        )
    except Exception as exc:
        logger.error("Failed to send task completion notification: %s", exc)
def get_pending_count() -> int:
    """Return how many entries are currently held in ``_pending_requests``."""
    in_flight = len(_pending_requests)
    return in_flight