- Add question card builder and answer handling in feishu.py - Extend SDKSession with pending question state and answer method - Update card callback handler to support question answers - Add test cases for question flow and card responses - Document usage with test_can_use_tool_ask.py example
115 lines
3.8 KiB
Python
115 lines
3.8 KiB
Python
"""
|
|
Shared test fixtures for PhoneWork tests.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from pathlib import Path
|
|
from unittest.mock import AsyncMock, patch
|
|
|
|
import pytest
|
|
|
|
|
|
# ── Feishu send mock ─────────────────────────────────────────────────────────
|
|
|
|
@pytest.fixture
|
|
def feishu_calls():
|
|
"""Capture all calls to bot.feishu send functions."""
|
|
captured: dict[str, list] = {"texts": [], "cards": [], "markdowns": [], "files": []}
|
|
|
|
async def mock_send_text(receive_id, receive_id_type, text):
|
|
captured["texts"].append(text)
|
|
|
|
async def mock_send_markdown(receive_id, receive_id_type, content):
|
|
captured["markdowns"].append(content)
|
|
|
|
async def mock_send_card(receive_id, receive_id_type, card):
|
|
captured["cards"].append(card)
|
|
|
|
async def mock_send_file(receive_id, receive_id_type, file_path, file_type="stream"):
|
|
captured["files"].append(file_path)
|
|
|
|
with patch("bot.feishu.send_text", side_effect=mock_send_text), \
|
|
patch("bot.feishu.send_markdown", side_effect=mock_send_markdown), \
|
|
patch("bot.feishu.send_card", side_effect=mock_send_card), \
|
|
patch("bot.feishu.send_file", side_effect=mock_send_file), \
|
|
patch("bot.handler.send_text", side_effect=mock_send_text), \
|
|
patch("bot.handler.send_markdown", side_effect=mock_send_markdown):
|
|
yield captured
|
|
|
|
|
|
# ── Singleton state resets ───────────────────────────────────────────────────
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def reset_manager(tmp_path):
|
|
from agent.manager import manager
|
|
import agent.manager as mgr_mod
|
|
# Redirect persistence to tmp_path
|
|
original_file = mgr_mod.PERSISTENCE_FILE
|
|
mgr_mod.PERSISTENCE_FILE = tmp_path / "sessions.json"
|
|
manager._sessions.clear()
|
|
yield
|
|
manager._sessions.clear()
|
|
mgr_mod.PERSISTENCE_FILE = original_file
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def reset_agent():
|
|
from orchestrator.agent import agent
|
|
agent._history.clear()
|
|
agent._active_conv.clear()
|
|
agent._passthrough.clear()
|
|
agent._user_locks.clear()
|
|
yield
|
|
agent._history.clear()
|
|
agent._active_conv.clear()
|
|
agent._passthrough.clear()
|
|
agent._user_locks.clear()
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def reset_task_runner():
|
|
from agent.task_runner import task_runner
|
|
task_runner._tasks.clear()
|
|
yield
|
|
task_runner._tasks.clear()
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def reset_scheduler(tmp_path):
|
|
from agent.scheduler import scheduler
|
|
import agent.scheduler as sched_mod
|
|
# Redirect persistence to tmp_path so tests don't pollute production data
|
|
original_file = sched_mod.PERSISTENCE_FILE
|
|
sched_mod.PERSISTENCE_FILE = tmp_path / "scheduled_jobs.json"
|
|
for task in list(getattr(scheduler, "_tasks", {}).values()):
|
|
task.cancel()
|
|
scheduler._jobs.clear()
|
|
yield
|
|
for task in list(getattr(scheduler, "_tasks", {}).values()):
|
|
task.cancel()
|
|
scheduler._jobs.clear()
|
|
sched_mod.PERSISTENCE_FILE = original_file
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def reset_contextvars():
|
|
from orchestrator.tools import set_current_user, set_current_chat
|
|
set_current_user(None)
|
|
set_current_chat(None)
|
|
yield
|
|
set_current_user(None)
|
|
set_current_chat(None)
|
|
|
|
|
|
# ── Working directory isolation ──────────────────────────────────────────────
|
|
|
|
@pytest.fixture
|
|
def tmp_working_dir(tmp_path, monkeypatch):
|
|
import config
|
|
import orchestrator.tools as tools_mod
|
|
monkeypatch.setattr(config, "WORKING_DIR", tmp_path)
|
|
monkeypatch.setattr(tools_mod, "WORKING_DIR", tmp_path)
|
|
(tmp_path / "myproject").mkdir()
|
|
return tmp_path
|