"""Feishu event handler using lark-oapi long-connection (WebSocket) mode.""" from __future__ import annotations import asyncio import json import logging import threading import lark_oapi as lark from lark_oapi.api.im.v1 import P2ImMessageReceiveV1 from bot.feishu import send_text from config import FEISHU_APP_ID, FEISHU_APP_SECRET from orchestrator.agent import agent logger = logging.getLogger(__name__) # Keep a reference to the running event loop so sync callbacks can schedule coroutines _main_loop: asyncio.AbstractEventLoop | None = None def _handle_message(data: P2ImMessageReceiveV1) -> None: """ Synchronous callback invoked by the lark-oapi SDK on every incoming message. We schedule async work onto the main event loop. """ try: message = data.event.message sender = data.event.sender # Log raw event for debugging logger.debug( "event type=%r chat_type=%r content=%r", getattr(message, "message_type", None), getattr(message, "chat_type", None), (getattr(message, "content", None) or "")[:100], ) # Only handle text messages if message.message_type != "text": logger.info("Skipping non-text message_type=%r", message.message_type) return # Extract fields chat_id: str = message.chat_id raw_content: str = message.content or "{}" content_obj = json.loads(raw_content) text: str = content_obj.get("text", "").strip() # Strip @mentions (Feishu injects "@bot_name " at start of group messages) import re text = re.sub(r"@\S+\s*", "", text).strip() open_id: str = "" if sender and sender.sender_id: open_id = sender.sender_id.open_id or "" if not text: logger.info("Empty text after stripping, ignoring") return logger.info("✉ ...%s → %r", open_id[-8:], text[:80]) # Use open_id as user identifier for per-user history in the orchestrator user_id = open_id or chat_id if _main_loop is None: logger.error("Main event loop not set; cannot process message") return # Schedule async processing; fire-and-forget asyncio.run_coroutine_threadsafe( _process_message(user_id, chat_id, text), _main_loop, ) except Exception: logger.exception("Error in _handle_message") async def _process_message(user_id: str, chat_id: str, text: str) -> None: """Run the orchestration agent and send the reply back to Feishu.""" try: reply = await agent.run(user_id, text) if reply: await send_text(chat_id, "chat_id", reply) except Exception: logger.exception("Error processing message for user %s", user_id) def _handle_any(data: lark.CustomizedEvent) -> None: """Catch-all handler to log any event the SDK receives.""" logger.info("RAW CustomizedEvent: %s", lark.JSON.marshal(data)[:500]) def build_event_handler() -> lark.EventDispatcherHandler: """Construct the EventDispatcherHandler with all registered callbacks.""" handler = ( lark.EventDispatcherHandler.builder("", "") .register_p2_im_message_receive_v1(_handle_message) .register_p1_customized_event("im.message.receive_v1", _handle_any) .build() ) return handler def start_websocket_client(loop: asyncio.AbstractEventLoop) -> None: """ Start the lark-oapi WebSocket long-connection client in a background thread. Must be called after the asyncio event loop is running. """ global _main_loop _main_loop = loop event_handler = build_event_handler() ws_client = lark.ws.Client( FEISHU_APP_ID, FEISHU_APP_SECRET, event_handler=event_handler, log_level=lark.LogLevel.INFO, ) def _run() -> None: logger.info("Starting Feishu long-connection client...") ws_client.start() # blocks until disconnected thread = threading.Thread(target=_run, daemon=True, name="feishu-ws") thread.start() logger.info("Feishu WebSocket thread started")