From 2a8f745b3dfbda2e472ccf1547f026bd95e7d726 Mon Sep 17 00:00:00 2001 From: "Yuyao Huang (Sam)" Date: Sun, 29 Mar 2026 18:11:00 +0800 Subject: [PATCH] =?UTF-8?q?feat(router):=20=E6=B7=BB=E5=8A=A0=E7=94=A8?= =?UTF-8?q?=E6=88=B7=E8=BF=BD=E8=B8=AA=E5=92=8C=E8=8A=82=E7=82=B9=E9=80=9A?= =?UTF-8?q?=E7=9F=A5=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 在ROUTER_MODE启用时跟踪用户消息,并在节点注册/注销时通知相关用户。新增_known_users集合记录活跃用户,重构通知逻辑以支持所有已知用户或特定服务用户的通知。 --- bot/handler.py | 5 +++++ router/nodes.py | 33 ++++++++++++++++++++++++++++++--- 2 files changed, 35 insertions(+), 3 deletions(-) diff --git a/bot/handler.py b/bot/handler.py index 60701a3..e8404c2 100644 --- a/bot/handler.py +++ b/bot/handler.py @@ -129,6 +129,11 @@ async def _process_message(user_id: str, chat_id: str, text: str) -> None: await send_text(chat_id, "chat_id", "Sorry, you are not authorized to use this bot.") return + from config import ROUTER_MODE + if ROUTER_MODE: + from router.nodes import get_node_registry + get_node_registry().track_user(user_id) + reply = await handle_command(user_id, text) if reply is not None: if reply: diff --git a/router/nodes.py b/router/nodes.py index 04e178e..c3fe818 100644 --- a/router/nodes.py +++ b/router/nodes.py @@ -59,6 +59,7 @@ class NodeRegistry: self._nodes: dict[str, NodeConnection] = {} self._user_nodes: dict[str, Set[str]] = {} self._active_node: dict[str, str] = {} + self._known_users: Set[str] = set() self._secret = router_secret self._lock = asyncio.Lock() @@ -68,6 +69,20 @@ class NodeRegistry: return True return secret == self._secret + def track_user(self, user_id: str) -> None: + """Record a user as known (has sent at least one message).""" + self._known_users.add(user_id) + + def _get_notifiable_users(self, node: NodeConnection) -> Set[str]: + """Get users to notify about a node event. + + If the node has explicit serves_users, return those. + Otherwise (serves everyone), return all known users. + """ + if node.serves_users: + return node.serves_users + return self._known_users.copy() + async def register(self, ws: Any, msg: RegisterMessage) -> NodeConnection: """Register a new node connection.""" async with self._lock: @@ -95,12 +110,24 @@ class NodeRegistry: msg.capabilities, ) + users_to_notify = self._get_notifiable_users(node) if is_reconnect: - for user_id in msg.serves_users: + for user_id in users_to_notify: asyncio.create_task(self._notify_reconnect(user_id, node.display_name)) + else: + for user_id in users_to_notify: + asyncio.create_task(self._notify_new_node(user_id, node.display_name)) return node + async def _notify_new_node(self, user_id: str, node_name: str) -> None: + """Notify user about a new node coming online.""" + try: + from bot.feishu import send_text + await send_text(user_id, "open_id", f"🟢 Node \"{node_name}\" is online.") + except Exception as e: + logger.error("Failed to send new node notification: %s", e) + async def _notify_reconnect(self, user_id: str, node_name: str) -> None: """Notify user about node reconnect.""" try: @@ -114,7 +141,7 @@ class NodeRegistry: async with self._lock: node = self._nodes.pop(node_id, None) if node: - affected_users = list(node.serves_users) + users_to_notify = self._get_notifiable_users(node) for user_id in node.serves_users: if user_id in self._user_nodes: @@ -128,7 +155,7 @@ class NodeRegistry: logger.info("Node unregistered: %s", node_id) - for user_id in affected_users: + for user_id in users_to_notify: asyncio.create_task(self._notify_disconnect(user_id, node.display_name)) async def _notify_disconnect(self, user_id: str, node_name: str) -> None: