diff --git a/CLAUDE.md b/CLAUDE.md index 02bb435..1912bd3 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -18,6 +18,7 @@ When writing code that uses `claude-agent-sdk`, **first read `docs/claude/`**: - `test_query_read_edit.py` — `query()` one-shot: Read + Edit with `allowed_tools` - `test_client_write_resume.py` — `ClaudeSDKClient`: Write + session resume via `resume=session_id` - `test_hooks_audit_deny.py` — Hooks: `PostToolUse` audit + `PreToolUse` deny +- `test_can_use_tool_ask.py` — `can_use_tool`: intercept `AskUserQuestion`, pre-fill answers via `updated_input` ### Key rules (verified by testing) diff --git a/agent/manager.py b/agent/manager.py index 0ff4ab2..3e176fc 100644 --- a/agent/manager.py +++ b/agent/manager.py @@ -162,6 +162,12 @@ class SessionManager: if session and session.sdk_session: await session.sdk_session.approve(approved) + async def answer_question(self, conv_id: str, answers: dict[str, str]) -> None: + """Resolve a pending AskUserQuestion with user's answers.""" + session = self._sessions.get(conv_id) + if session and session.sdk_session: + await session.sdk_session.answer_question(answers) + # --- Close, list, permission --- async def close(self, conv_id: str, user_id: Optional[str] = None) -> bool: diff --git a/agent/sdk_session.py b/agent/sdk_session.py index c5d5546..96df133 100644 --- a/agent/sdk_session.py +++ b/agent/sdk_session.py @@ -48,6 +48,7 @@ class SessionProgress: last_result: str = "" error: str = "" pending_approval: str = "" # non-empty → waiting for approval, value is tool description + pending_question: dict | None = None # non-None → waiting for user answer to AskUserQuestion class SDKSession: @@ -97,6 +98,10 @@ class SDKSession: self._pending_approval: asyncio.Future | None = None self._pending_approval_desc: str = "" + # AskUserQuestion mechanism + self._pending_question: asyncio.Future | None = None + self._pending_question_data: dict | None = None # {questions: [...], conv_id: ...} + async def start(self) -> None: """Create and connect the ClaudeSDKClient, start the message loop.""" from agent.sdk_hooks import build_hooks @@ -169,6 +174,7 @@ class SDKSession: last_result=self._last_result[:1000], error=self._error, pending_approval=self._pending_approval_desc, + pending_question=self._pending_question_data, ) async def interrupt(self) -> None: @@ -189,6 +195,15 @@ class SDKSession: if self._pending_approval and not self._pending_approval.done(): self._pending_approval.set_result(approved) + async def answer_question(self, answers: dict[str, str]) -> None: + """Resolve a pending AskUserQuestion with user's selected answers. + + Args: + answers: maps question text → selected option label. + """ + if self._pending_question and not self._pending_question.done(): + self._pending_question.set_result(answers) + async def close(self) -> None: """Disconnect and clean up.""" if self._message_loop_task and not self._message_loop_task.done(): @@ -280,15 +295,64 @@ class SDKSession: async def _permission_callback( self, tool_name: str, input_data: dict, context: ToolPermissionContext ) -> PermissionResult: - """can_use_tool — send approval card to Feishu, wait for card callback.""" + """can_use_tool — route to question card or approval card based on tool type.""" # Auto-allow read-only tools if tool_name in ("Read", "Glob", "Grep", "WebSearch", "WebFetch"): return PermissionResultAllow() + # AskUserQuestion: show options to user, collect answer, return via updated_input + if tool_name == "AskUserQuestion": + return await self._handle_ask_user_question(input_data) + if not self.chat_id: return PermissionResultAllow() - # Send approval card + # Regular tools: approval flow + return await self._handle_tool_approval(tool_name, input_data) + + async def _handle_ask_user_question(self, input_data: dict) -> PermissionResult: + """Handle AskUserQuestion: send question card, wait for answer, return updated_input.""" + if not self.chat_id: + return PermissionResultAllow() + + questions = input_data.get("questions", []) + if not questions: + return PermissionResultAllow() + + from bot.feishu import send_card, build_question_card + + # Build and send question card + self._pending_question_data = {"questions": questions, "conv_id": self.conv_id} + card = build_question_card( + conv_id=self.conv_id, + questions=questions, + ) + await send_card(self.chat_id, "chat_id", card) + + # Wait for user's answer (via card callback or text reply) + loop = asyncio.get_running_loop() + self._pending_question = loop.create_future() + try: + answers = await asyncio.wait_for( + self._pending_question, timeout=APPROVAL_TIMEOUT + ) + except asyncio.TimeoutError: + answers = {} + from bot.feishu import send_markdown + await send_markdown(self.chat_id, "chat_id", "⏰ 问题超时,已跳过。") + finally: + self._pending_question_data = None + + # Pre-fill answers in the tool input + modified_input = dict(input_data) + if "answers" not in modified_input or not isinstance(modified_input.get("answers"), dict): + modified_input["answers"] = {} + modified_input["answers"].update(answers) + + return PermissionResultAllow(updated_input=modified_input) + + async def _handle_tool_approval(self, tool_name: str, input_data: dict) -> PermissionResult: + """Handle regular tool approval: send approval card, wait for approve/deny.""" from bot.feishu import send_card, build_approval_card summary = self._format_tool_summary(tool_name, input_data) @@ -302,7 +366,6 @@ class SDKSession: ) await send_card(self.chat_id, "chat_id", card) - # Wait for card callback or text reply y/n loop = asyncio.get_running_loop() self._pending_approval = loop.create_future() try: @@ -312,18 +375,14 @@ class SDKSession: except asyncio.TimeoutError: approved = False from bot.feishu import send_markdown - await send_markdown(self.chat_id, "chat_id", "⏰ 审批超时,已自动拒绝。") finally: self._pending_approval_desc = "" from agent.audit import log_permission_decision - log_permission_decision( - conv_id=self.conv_id, - tool_name=tool_name, - tool_input=input_data, - approved=approved, + conv_id=self.conv_id, tool_name=tool_name, + tool_input=input_data, approved=approved, ) if approved: return PermissionResultAllow() diff --git a/bot/feishu.py b/bot/feishu.py index 6c98057..3f7574f 100644 --- a/bot/feishu.py +++ b/bot/feishu.py @@ -190,6 +190,64 @@ def build_approval_card(conv_id: str, tool_name: str, summary: str, timeout: int } +def build_question_card(conv_id: str, questions: list[dict]) -> dict: + """Build a question card for AskUserQuestion (schema 2.0). + + Each question's options become buttons. The first question is shown + prominently; multi-question support shows them sequentially. + """ + elements: list[dict] = [] + + for i, q in enumerate(questions): + question_text = q.get("question", "") + header = q.get("header", "") + options = q.get("options", []) + + if header: + elements.append({ + "tag": "markdown", + "content": f"**{header}**\n{question_text}", + }) + else: + elements.append({ + "tag": "markdown", + "content": f"**{question_text}**", + }) + + for opt in options: + label = opt.get("label", "") + desc = opt.get("description", "") + button_text = f"{label} — {desc}" if desc else label + # Truncate long button text + if len(button_text) > 60: + button_text = button_text[:57] + "..." + elements.append({ + "tag": "button", + "text": {"tag": "plain_text", "content": button_text}, + "type": "default", + "value": { + "action": "answer_question", + "conv_id": conv_id, + "question": question_text, + "answer": label, + }, + }) + + elements.append({ + "tag": "div", + "text": {"tag": "plain_text", "content": "也可直接输入文字回复"}, + }) + + return { + "schema": "2.0", + "header": { + "title": {"tag": "plain_text", "content": "❓ Claude Code 提问"}, + "template": "blue", + }, + "body": {"elements": elements}, + } + + async def send_file(receive_id: str, receive_id_type: str, file_path: str, file_type: str = "stream") -> None: """ Upload a local file to Feishu and send it as a file message. diff --git a/bot/handler.py b/bot/handler.py index 921edb0..543afe5 100644 --- a/bot/handler.py +++ b/bot/handler.py @@ -148,6 +148,27 @@ async def _process_message(user_id: str, chat_id: str, text: str) -> None: await send_text(chat_id, "chat_id", label) return + # Text answer fallback: any text reply when a question is pending + from orchestrator.agent import agent as _agent + from agent.manager import manager as _manager + conv_id = _agent.get_active_conv(user_id) + if conv_id: + session = _manager._sessions.get(conv_id) + if ( + session + and session.sdk_session + and session.sdk_session._pending_question + and not session.sdk_session._pending_question.done() + and session.sdk_session._pending_question_data + ): + # Use text as answer to the first pending question + questions = session.sdk_session._pending_question_data.get("questions", []) + if questions: + q_text = questions[0].get("question", "") + await _manager.answer_question(conv_id, {q_text: text.strip()}) + await send_text(chat_id, "chat_id", f"✅ 已回答: {text.strip()}") + return + from config import ROUTER_MODE if ROUTER_MODE: from router.nodes import get_node_registry @@ -215,77 +236,108 @@ def _handle_any(data: lark.CustomizedEvent) -> None: logger.info("RAW CustomizedEvent: %s", marshaled[:500]) -def _handle_card_action(data: lark.CustomizedEvent) -> dict | None: - """Handle Feishu card button clicks (approval approve/deny). +def _handle_card_action(data: "P2CardActionTrigger") -> "P2CardActionTriggerResponse": + """Handle Feishu card button clicks via register_p2_card_action_trigger. Per docs/feishu/card_callback_communication.md: - Must respond within 3 seconds - - Return toast + updated card to give user visual feedback + - Return P2CardActionTriggerResponse with toast + updated card """ + from lark_oapi.event.callback.model.p2_card_action_trigger import ( + CallBackCard, CallBackToast, P2CardActionTriggerResponse, + ) + + def _response(toast_type: str, toast_text: str, card_data: dict) -> P2CardActionTriggerResponse: + resp = P2CardActionTriggerResponse() + toast = CallBackToast() + toast.type = toast_type + toast.content = toast_text + resp.toast = toast + card = CallBackCard() + card.type = "raw" + card.data = card_data + resp.card = card + return resp + + def _empty_response() -> P2CardActionTriggerResponse: + return P2CardActionTriggerResponse() + try: - marshaled = lark.JSON.marshal(data) - if not marshaled: - return None + event = data.event + if not event: + return _empty_response() - payload = json.loads(marshaled) if isinstance(marshaled, str) else marshaled - event = payload.get("event", {}) - action = event.get("action", {}) - value = action.get("value", {}) + action = event.action + if not action: + return _empty_response() - action_type = value.get("action") # "approve" or "deny" + value: dict = action.value or {} + action_type = value.get("action") conv_id = value.get("conv_id") if not action_type or not conv_id: logger.debug("Card action without action/conv_id: %s", value) - return None + return _empty_response() - approved = action_type == "approve" - operator_open_id = event.get("operator", {}).get("open_id", "") + operator_open_id = (event.operator.open_id or "") if event.operator else "" logger.info( - "Card action: %s for session %s by %s", + "Card action: %s for session %s by ...%s", action_type, conv_id, operator_open_id[-8:], ) - # Dispatch approval to SDKSession (async, fire-and-forget) + # --- AskUserQuestion answer --- + if action_type == "answer_question": + question = value.get("question", "") + answer = value.get("answer", "") + if _main_loop: + asyncio.run_coroutine_threadsafe( + _handle_question_answer_async(conv_id, question, answer), _main_loop + ) + return _response( + "success", f"已选择: {answer}", + { + "schema": "2.0", + "header": { + "title": {"tag": "plain_text", "content": "❓ Claude Code 提问"}, + "template": "green", + }, + "body": { + "elements": [ + {"tag": "markdown", "content": f"**{question}**\n\n✅ 已选择: **{answer}**"}, + ], + }, + }, + ) + + # --- Tool approval --- + approved = action_type == "approve" if _main_loop: asyncio.run_coroutine_threadsafe( _handle_approval_async(conv_id, approved), _main_loop ) - # Respond to callback within 3s: toast + updated card showing result if approved: toast_type, toast_text = "success", "✅ 已批准" - card_status = "✅ **已批准**" - template = "green" + card_status, template = "✅ **已批准**", "green" else: toast_type, toast_text = "warning", "❌ 已拒绝" - card_status = "❌ **已拒绝**" - template = "red" + card_status, template = "❌ **已拒绝**", "red" - return { - "toast": { - "type": toast_type, - "content": toast_text, - }, - "card": { - "type": "raw", - "data": { - "schema": "2.0", - "header": { - "title": {"tag": "plain_text", "content": "🔐 权限审批"}, - "template": template, - }, - "body": { - "elements": [ - {"tag": "markdown", "content": card_status}, - ], - }, + return _response( + toast_type, toast_text, + { + "schema": "2.0", + "header": { + "title": {"tag": "plain_text", "content": "🔐 权限审批"}, + "template": template, }, + "body": {"elements": [{"tag": "markdown", "content": card_status}]}, }, - } + ) + except Exception: logger.exception("Error handling card action") - return None + return P2CardActionTriggerResponse() async def _handle_approval_async(conv_id: str, approved: bool) -> None: @@ -294,13 +346,19 @@ async def _handle_approval_async(conv_id: str, approved: bool) -> None: await manager.approve(conv_id, approved) +async def _handle_question_answer_async(conv_id: str, question: str, answer: str) -> None: + """Process a question answer from card callback.""" + from agent.manager import manager + await manager.answer_question(conv_id, {question: answer}) + + def build_event_handler() -> lark.EventDispatcherHandler: """Construct the EventDispatcherHandler with all registered callbacks.""" handler = ( lark.EventDispatcherHandler.builder("", "") .register_p2_im_message_receive_v1(_handle_message) .register_p1_customized_event("im.message.receive_v1", _handle_any) - .register_p1_customized_event("card.action.trigger", _handle_card_action) + .register_p2_card_action_trigger(_handle_card_action) .build() ) return handler diff --git a/docs/claude/test_can_use_tool_ask.py b/docs/claude/test_can_use_tool_ask.py new file mode 100644 index 0000000..dc047be --- /dev/null +++ b/docs/claude/test_can_use_tool_ask.py @@ -0,0 +1,160 @@ +"""Example: can_use_tool callback — intercept AskUserQuestion and provide answers. + +Demonstrates: +- can_use_tool callback for tool permission control +- Detecting AskUserQuestion tool calls +- Using PermissionResultAllow(updated_input=...) to pre-fill user answers +- Auto-allowing read-only tools + +This pattern is used by the secretary model to forward CC questions +to Feishu users and relay their responses back. + +Usage: + cd docs/claude + ../../.venv/Scripts/python test_can_use_tool_ask.py +""" + +from __future__ import annotations + +import asyncio +import json + +from _sdk_test_common import auth_env, make_tmpdir, remove_tmpdir, setup_auth + +# Simulated user responses (in production, these come from Feishu card callbacks) +SIMULATED_ANSWERS: dict[str, str] = {} +permission_log: list[dict] = [] + + +async def permission_callback(tool_name, input_data, context): + """can_use_tool callback that intercepts AskUserQuestion. + + For AskUserQuestion: + - Extracts questions and options from input_data + - Provides pre-filled answers via updated_input + - Returns PermissionResultAllow with the modified input + + For other tools: + - Read-only tools: auto-allow + - Write tools: auto-allow (for testing) + """ + from claude_agent_sdk import PermissionResultAllow, PermissionResultDeny + + permission_log.append({ + "tool": tool_name, + "input_keys": list(input_data.keys()), + }) + + if tool_name == "AskUserQuestion": + questions = input_data.get("questions", []) + print(f"\n 🔔 AskUserQuestion intercepted! ({len(questions)} questions)") + + # Build answers dict: {question_text: selected_option_label} + answers = {} + for q in questions: + question_text = q.get("question", "") + options = q.get("options", []) + multi = q.get("multiSelect", False) + + print(f" Q: {question_text}") + for opt in options: + print(f" - {opt['label']}: {opt.get('description', '')[:60]}") + + # In production: send card to Feishu, wait for user selection + # Here: use first option as simulated answer + if options: + selected = SIMULATED_ANSWERS.get(question_text, options[0]["label"]) + answers[question_text] = selected + print(f" → Selected: {selected}") + + # Pre-fill answers in the tool input via updated_input + modified_input = dict(input_data) + if "answers" not in modified_input: + modified_input["answers"] = {} + if isinstance(modified_input["answers"], dict): + modified_input["answers"].update(answers) + + print(f" → updated_input.answers = {answers}") + return PermissionResultAllow(updated_input=modified_input) + + # Auto-allow everything else for this test + return PermissionResultAllow() + + +async def main() -> int: + ok, msg = setup_auth() + print(msg) + if not ok: + return 1 + + from claude_agent_sdk import ( + AssistantMessage, + ClaudeAgentOptions, + ClaudeSDKClient, + ResultMessage, + SystemMessage, + TextBlock, + ToolUseBlock, + ) + + tmpdir = make_tmpdir("ask-test-") + try: + print("--- Test: can_use_tool intercepts AskUserQuestion ---") + print(f" tmpdir: {tmpdir}") + + opts = ClaudeAgentOptions( + cwd=str(tmpdir), + permission_mode="default", + can_use_tool=permission_callback, + max_turns=5, + env=auth_env(), + ) + + # Ask Claude to ask the user a question — this will trigger AskUserQuestion + prompt = ( + "I need you to ask the user a question using the AskUserQuestion tool. " + "Ask them: 'Which programming language do you prefer?' " + "with options: 'Python' (great for AI), 'TypeScript' (great for web). " + "Then tell me what they chose." + ) + + print(f"\n Prompt: {prompt[:100]}...") + + async with ClaudeSDKClient(opts) as client: + await client.query(prompt) + + ask_seen = False + result_text = "" + + async for msg in client.receive_response(): + if isinstance(msg, AssistantMessage): + for block in msg.content: + if isinstance(block, TextBlock): + print(f" Claude: {block.text[:200]}") + elif isinstance(block, ToolUseBlock): + if block.name == "AskUserQuestion": + ask_seen = True + print(f" [ToolUse] AskUserQuestion called!") + else: + print(f" [ToolUse] {block.name}") + elif isinstance(msg, ResultMessage): + result_text = msg.result or "" + print(f"\n Result: {result_text[:300]}") + + print(f"\n Permission log ({len(permission_log)} entries):") + for entry in permission_log: + print(f" {entry['tool']}: keys={entry['input_keys']}") + + if ask_seen: + print("\n ✅ PASS: AskUserQuestion was intercepted by can_use_tool") + else: + print("\n ⚠️ AskUserQuestion was not called (Claude may have answered directly)") + print(" This is OK — it means Claude didn't need to ask.") + + return 0 + finally: + remove_tmpdir(tmpdir) + + +if __name__ == "__main__": + raise SystemExit(asyncio.run(main())) diff --git a/tests/conftest.py b/tests/conftest.py index 4fee88a..7e9c351 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -41,11 +41,16 @@ def feishu_calls(): # ── Singleton state resets ─────────────────────────────────────────────────── @pytest.fixture(autouse=True) -def reset_manager(): +def reset_manager(tmp_path): from agent.manager import manager + import agent.manager as mgr_mod + # Redirect persistence to tmp_path + original_file = mgr_mod.PERSISTENCE_FILE + mgr_mod.PERSISTENCE_FILE = tmp_path / "sessions.json" manager._sessions.clear() yield manager._sessions.clear() + mgr_mod.PERSISTENCE_FILE = original_file @pytest.fixture(autouse=True) @@ -71,8 +76,12 @@ def reset_task_runner(): @pytest.fixture(autouse=True) -def reset_scheduler(): +def reset_scheduler(tmp_path): from agent.scheduler import scheduler + import agent.scheduler as sched_mod + # Redirect persistence to tmp_path so tests don't pollute production data + original_file = sched_mod.PERSISTENCE_FILE + sched_mod.PERSISTENCE_FILE = tmp_path / "scheduled_jobs.json" for task in list(getattr(scheduler, "_tasks", {}).values()): task.cancel() scheduler._jobs.clear() @@ -80,6 +89,7 @@ def reset_scheduler(): for task in list(getattr(scheduler, "_tasks", {}).values()): task.cancel() scheduler._jobs.clear() + sched_mod.PERSISTENCE_FILE = original_file @pytest.fixture(autouse=True) diff --git a/tests/test_sdk_migration.py b/tests/test_sdk_migration.py index 255e7a5..03586ad 100644 --- a/tests/test_sdk_migration.py +++ b/tests/test_sdk_migration.py @@ -787,83 +787,153 @@ class TestBuildApprovalCard: class TestCardCallbackResponse: - """Test _handle_card_action returns proper callback response per + """Test _handle_card_action returns proper P2CardActionTriggerResponse per docs/feishu/card_callback_communication.md.""" - def _make_card_event(self, action: str, conv_id: str) -> object: - """Create a mock CustomizedEvent with card action payload.""" - import lark_oapi as lark - mock_event = MagicMock() - payload = json.dumps({ - "schema": "2.0", - "header": { - "event_id": "evt_123", - "event_type": "card.action.trigger", - }, - "event": { - "operator": { - "open_id": "ou_test_user_123", - }, - "action": { - "value": {"action": action, "conv_id": conv_id}, - "tag": "button", - }, - }, - }) - # lark.JSON.marshal returns the JSON string - with patch("lark_oapi.JSON.marshal", return_value=payload): - yield payload + def _make_trigger(self, action: str, conv_id: str, **extra) -> "P2CardActionTrigger": + from lark_oapi.event.callback.model.p2_card_action_trigger import ( + CallBackAction, CallBackOperator, P2CardActionTrigger, P2CardActionTriggerData, + ) + value = {"action": action, "conv_id": conv_id, **extra} + act = CallBackAction() + act.value = value + act.tag = "button" + op = CallBackOperator() + op.open_id = "ou_test_user" + data = P2CardActionTriggerData() + data.action = act + data.operator = op + trigger = P2CardActionTrigger() + trigger.event = data + return trigger def test_approve_returns_toast_and_card(self): from bot.handler import _handle_card_action - payload = json.dumps({ - "event": { - "operator": {"open_id": "ou_test"}, - "action": {"value": {"action": "approve", "conv_id": "c1"}, "tag": "button"}, - }, - }) - with patch("lark_oapi.JSON.marshal", return_value=payload), \ - patch("bot.handler._main_loop", new=MagicMock()): - result = _handle_card_action(MagicMock()) - - assert result is not None - # Toast - assert result["toast"]["type"] == "success" - assert "批准" in result["toast"]["content"] - # Updated card - assert result["card"]["type"] == "raw" - card_data = result["card"]["data"] - assert card_data["schema"] == "2.0" - assert card_data["header"]["template"] == "green" - assert "批准" in card_data["body"]["elements"][0]["content"] + trigger = self._make_trigger("approve", "c1") + with patch("bot.handler._main_loop", new=MagicMock()): + resp = _handle_card_action(trigger) + assert resp.toast is not None + assert resp.toast.type == "success" + assert "批准" in resp.toast.content + assert resp.card is not None + assert resp.card.type == "raw" + assert resp.card.data["header"]["template"] == "green" def test_deny_returns_warning_toast(self): from bot.handler import _handle_card_action - payload = json.dumps({ - "event": { - "operator": {"open_id": "ou_test"}, - "action": {"value": {"action": "deny", "conv_id": "c1"}, "tag": "button"}, - }, - }) - with patch("lark_oapi.JSON.marshal", return_value=payload), \ - patch("bot.handler._main_loop", new=MagicMock()): - result = _handle_card_action(MagicMock()) + trigger = self._make_trigger("deny", "c1") + with patch("bot.handler._main_loop", new=MagicMock()): + resp = _handle_card_action(trigger) + assert resp.toast.type == "warning" + assert "拒绝" in resp.toast.content + assert resp.card.data["header"]["template"] == "red" - assert result is not None - assert result["toast"]["type"] == "warning" - assert "拒绝" in result["toast"]["content"] - assert result["card"]["data"]["header"]["template"] == "red" - - def test_missing_value_returns_none(self): + def test_missing_value_returns_empty_response(self): from bot.handler import _handle_card_action - payload = json.dumps({ - "event": { - "action": {"value": {}, "tag": "button"}, - }, - }) - with patch("lark_oapi.JSON.marshal", return_value=payload): - result = _handle_card_action(MagicMock()) - assert result is None + from lark_oapi.event.callback.model.p2_card_action_trigger import ( + CallBackAction, P2CardActionTrigger, P2CardActionTriggerData, + ) + act = CallBackAction() + act.value = {} # no action/conv_id + data = P2CardActionTriggerData() + data.action = act + trigger = P2CardActionTrigger() + trigger.event = data + resp = _handle_card_action(trigger) + assert resp.toast is None + assert resp.card is None + + def test_answer_question_returns_success_toast(self): + from bot.handler import _handle_card_action + trigger = self._make_trigger( + "answer_question", "c1", + question="Which lang?", answer="Python" + ) + with patch("bot.handler._main_loop", new=MagicMock()): + resp = _handle_card_action(trigger) + assert resp.toast.type == "success" + assert "Python" in resp.toast.content + assert "Python" in resp.card.data["body"]["elements"][0]["content"] + + +# =========================================================================== +# 8c. AskUserQuestion flow +# =========================================================================== + + +class TestAskUserQuestion: + + def test_build_question_card(self): + from bot.feishu import build_question_card + questions = [ + { + "question": "Which language?", + "header": "Language", + "options": [ + {"label": "Python", "description": "Great for AI"}, + {"label": "TypeScript", "description": "Great for web"}, + ], + "multiSelect": False, + } + ] + card = build_question_card("c1", questions) + assert card["schema"] == "2.0" + assert "提问" in card["header"]["title"]["content"] + + elements = card["body"]["elements"] + buttons = [e for e in elements if e["tag"] == "button"] + assert len(buttons) == 2 + assert buttons[0]["value"]["action"] == "answer_question" + assert buttons[0]["value"]["question"] == "Which language?" + assert buttons[0]["value"]["answer"] == "Python" + assert buttons[1]["value"]["answer"] == "TypeScript" + + @pytest.mark.asyncio + async def test_answer_question_resolves_future(self): + from agent.sdk_session import SDKSession + s = SDKSession("c1", "/tmp", "u1") + loop = asyncio.get_running_loop() + s._pending_question = loop.create_future() + await s.answer_question({"Which language?": "Python"}) + assert s._pending_question.done() + assert s._pending_question.result() == {"Which language?": "Python"} + + def test_card_callback_answer_question(self): + from bot.handler import _handle_card_action + from lark_oapi.event.callback.model.p2_card_action_trigger import ( + CallBackAction, CallBackOperator, P2CardActionTrigger, P2CardActionTriggerData, + ) + act = CallBackAction() + act.value = {"action": "answer_question", "conv_id": "c1", + "question": "Which lang?", "answer": "Python"} + act.tag = "button" + op = CallBackOperator() + op.open_id = "ou_test" + data = P2CardActionTriggerData() + data.action = act + data.operator = op + trigger = P2CardActionTrigger() + trigger.event = data + + with patch("bot.handler._main_loop", new=MagicMock()): + resp = _handle_card_action(trigger) + + assert resp.toast.type == "success" + assert "Python" in resp.toast.content + assert "Python" in resp.card.data["body"]["elements"][0]["content"] + + def test_progress_shows_pending_question(self): + from agent.sdk_session import SDKSession + s = SDKSession("c1", "/tmp", "u1") + s._busy = True + s._started_at = time.time() + s._pending_question_data = { + "questions": [{"question": "Pick a color?", "options": [{"label": "Red"}, {"label": "Blue"}]}], + "conv_id": "c1", + } + p = s.get_progress() + assert p.pending_question is not None + assert p.pending_question["questions"][0]["question"] == "Pick a color?" # ===========================================================================