diff --git a/bot/handler.py b/bot/handler.py index 60701a3..e8404c2 100644 --- a/bot/handler.py +++ b/bot/handler.py @@ -129,6 +129,11 @@ async def _process_message(user_id: str, chat_id: str, text: str) -> None: await send_text(chat_id, "chat_id", "Sorry, you are not authorized to use this bot.") return + from config import ROUTER_MODE + if ROUTER_MODE: + from router.nodes import get_node_registry + get_node_registry().track_user(user_id) + reply = await handle_command(user_id, text) if reply is not None: if reply: diff --git a/router/nodes.py b/router/nodes.py index 04e178e..c3fe818 100644 --- a/router/nodes.py +++ b/router/nodes.py @@ -59,6 +59,7 @@ class NodeRegistry: self._nodes: dict[str, NodeConnection] = {} self._user_nodes: dict[str, Set[str]] = {} self._active_node: dict[str, str] = {} + self._known_users: Set[str] = set() self._secret = router_secret self._lock = asyncio.Lock() @@ -68,6 +69,20 @@ class NodeRegistry: return True return secret == self._secret + def track_user(self, user_id: str) -> None: + """Record a user as known (has sent at least one message).""" + self._known_users.add(user_id) + + def _get_notifiable_users(self, node: NodeConnection) -> Set[str]: + """Get users to notify about a node event. + + If the node has explicit serves_users, return those. + Otherwise (serves everyone), return all known users. + """ + if node.serves_users: + return node.serves_users + return self._known_users.copy() + async def register(self, ws: Any, msg: RegisterMessage) -> NodeConnection: """Register a new node connection.""" async with self._lock: @@ -95,12 +110,24 @@ class NodeRegistry: msg.capabilities, ) + users_to_notify = self._get_notifiable_users(node) if is_reconnect: - for user_id in msg.serves_users: + for user_id in users_to_notify: asyncio.create_task(self._notify_reconnect(user_id, node.display_name)) + else: + for user_id in users_to_notify: + asyncio.create_task(self._notify_new_node(user_id, node.display_name)) return node + async def _notify_new_node(self, user_id: str, node_name: str) -> None: + """Notify user about a new node coming online.""" + try: + from bot.feishu import send_text + await send_text(user_id, "open_id", f"🟢 Node \"{node_name}\" is online.") + except Exception as e: + logger.error("Failed to send new node notification: %s", e) + async def _notify_reconnect(self, user_id: str, node_name: str) -> None: """Notify user about node reconnect.""" try: @@ -114,7 +141,7 @@ class NodeRegistry: async with self._lock: node = self._nodes.pop(node_id, None) if node: - affected_users = list(node.serves_users) + users_to_notify = self._get_notifiable_users(node) for user_id in node.serves_users: if user_id in self._user_nodes: @@ -128,7 +155,7 @@ class NodeRegistry: logger.info("Node unregistered: %s", node_id) - for user_id in affected_users: + for user_id in users_to_notify: asyncio.create_task(self._notify_disconnect(user_id, node.display_name)) async def _notify_disconnect(self, user_id: str, node_name: str) -> None: