"""RPC module for forwarding requests to host clients. Handles: - Request correlation with asyncio.Future - Timeout management - Response routing """ from __future__ import annotations import asyncio import logging import uuid from typing import Any, Dict, Optional from shared import ForwardRequest, ForwardResponse, TaskComplete, encode from router.nodes import get_node_registry logger = logging.getLogger(__name__) _pending_requests: dict[str, asyncio.Future[str]] = {} _default_timeout = 600.0 async def forward( node_id: str, user_id: str, chat_id: str, text: str, timeout: float = _default_timeout, ) -> str: """Forward a message to a host client and wait for response. Args: node_id: Target node ID user_id: User's Feishu open_id chat_id: Chat ID for context text: Message text to forward timeout: Timeout in seconds (default 600s for long CC tasks) Returns: Reply text from the host client Raises: asyncio.TimeoutError: If no response within timeout RuntimeError: If node is not connected """ registry = get_node_registry() node = registry.get_node(node_id) if not node or not node.ws: raise RuntimeError(f"Node not connected: {node_id}") request_id = str(uuid.uuid4()) future: asyncio.Future[str] = asyncio.get_running_loop().create_future() _pending_requests[request_id] = future request = ForwardRequest( id=request_id, user_id=user_id, chat_id=chat_id, text=text, ) try: await node.ws.send_text(encode(request)) logger.debug("Forwarded request %s to node %s", request_id, node_id) result = await asyncio.wait_for(future, timeout=timeout) return result except asyncio.TimeoutError: logger.warning("Request %s timed out after %ss", request_id, timeout) raise except Exception as e: logger.error("Failed to forward request %s: %s", request_id, e) raise finally: _pending_requests.pop(request_id, None) async def resolve_response(response: ForwardResponse) -> None: """Resolve a pending request with a response.""" future = _pending_requests.get(response.id) if future and not future.done(): if response.error: future.set_exception(RuntimeError(response.error)) else: future.set_result(response.reply) logger.debug("Resolved request %s", response.id) async def handle_task_complete(msg: TaskComplete) -> None: """Handle a task completion notification from a host client.""" logger.info("Task %s completed for user %s", msg.task_id, msg.user_id) from bot.feishu import send_text try: await send_text(msg.chat_id, "chat_id", msg.result) except Exception as e: logger.error("Failed to send task completion notification: %s", e) def get_pending_count() -> int: """Get the number of pending requests.""" return len(_pending_requests)