""" Shared test fixtures for PhoneWork tests. """ from __future__ import annotations import asyncio from pathlib import Path from unittest.mock import AsyncMock, patch import pytest # ── Feishu send mock ───────────────────────────────────────────────────────── @pytest.fixture def feishu_calls(): """Capture all calls to bot.feishu send functions.""" captured: dict[str, list] = {"texts": [], "cards": [], "markdowns": [], "files": []} async def mock_send_text(receive_id, receive_id_type, text): captured["texts"].append(text) async def mock_send_markdown(receive_id, receive_id_type, content): captured["markdowns"].append(content) async def mock_send_card(receive_id, receive_id_type, card): captured["cards"].append(card) async def mock_send_file(receive_id, receive_id_type, file_path, file_type="stream"): captured["files"].append(file_path) with patch("bot.feishu.send_text", side_effect=mock_send_text), \ patch("bot.feishu.send_markdown", side_effect=mock_send_markdown), \ patch("bot.feishu.send_card", side_effect=mock_send_card), \ patch("bot.feishu.send_file", side_effect=mock_send_file), \ patch("bot.handler.send_text", side_effect=mock_send_text), \ patch("bot.handler.send_markdown", side_effect=mock_send_markdown): yield captured # ── Singleton state resets ─────────────────────────────────────────────────── @pytest.fixture(autouse=True) def reset_manager(tmp_path): from agent.manager import manager import agent.manager as mgr_mod # Redirect persistence to tmp_path original_file = mgr_mod.PERSISTENCE_FILE mgr_mod.PERSISTENCE_FILE = tmp_path / "sessions.json" manager._sessions.clear() yield manager._sessions.clear() mgr_mod.PERSISTENCE_FILE = original_file @pytest.fixture(autouse=True) def reset_agent(): from orchestrator.agent import agent agent._history.clear() agent._active_conv.clear() agent._passthrough.clear() agent._user_locks.clear() yield agent._history.clear() agent._active_conv.clear() agent._passthrough.clear() agent._user_locks.clear() 
@pytest.fixture(autouse=True)
def reset_task_runner():
    """Empty ``task_runner._tasks`` before and after every test."""
    from agent.task_runner import task_runner as runner

    runner._tasks.clear()
    yield
    runner._tasks.clear()


@pytest.fixture(autouse=True)
def reset_scheduler(tmp_path):
    """Isolate the shared scheduler for each test.

    Points ``agent.scheduler.PERSISTENCE_FILE`` at
    ``tmp_path / "scheduled_jobs.json"`` so tests don't pollute production
    data, then cancels whatever is stored in ``scheduler._tasks`` (if that
    attribute exists) and empties ``scheduler._jobs``.  The same cancel/empty
    step runs again after the test, followed by restoring the original
    ``PERSISTENCE_FILE``.
    """
    from agent.scheduler import scheduler
    import agent.scheduler as scheduler_module

    def _purge():
        # ``_tasks`` may be absent on the scheduler; treat that as "nothing
        # to cancel".  Iterate over a snapshot of the values.
        pending = list(getattr(scheduler, "_tasks", {}).values())
        for pending_task in pending:
            pending_task.cancel()
        scheduler._jobs.clear()

    saved_path = scheduler_module.PERSISTENCE_FILE
    scheduler_module.PERSISTENCE_FILE = tmp_path / "scheduled_jobs.json"
    _purge()

    yield

    _purge()
    scheduler_module.PERSISTENCE_FILE = saved_path


@pytest.fixture(autouse=True)
def reset_contextvars():
    """Set the current user and current chat to ``None`` around each test.

    Uses ``set_current_user`` / ``set_current_chat`` from
    ``orchestrator.tools`` both before and after the test body.
    """
    from orchestrator.tools import set_current_user, set_current_chat

    def _clear():
        for setter in (set_current_user, set_current_chat):
            setter(None)

    _clear()
    yield
    _clear()


# ── Working directory isolation ──────────────────────────────────────────────

@pytest.fixture
def tmp_working_dir(tmp_path, monkeypatch):
    """Use ``tmp_path`` as ``WORKING_DIR`` and return it.

    Patches ``WORKING_DIR`` on both ``config`` and ``orchestrator.tools`` via
    ``monkeypatch`` (undone automatically after the test) and creates an empty
    ``myproject`` sub-directory inside ``tmp_path``.
    """
    import config
    import orchestrator.tools as tools_mod

    # Both modules hold their own ``WORKING_DIR`` name, so patch each one.
    for module in (config, tools_mod):
        monkeypatch.setattr(module, "WORKING_DIR", tmp_path)

    project_dir = tmp_path / "myproject"
    project_dir.mkdir()
    return tmp_path