The module has long since moved away from PTY technology, using claude -p with --output-format stream-json and --resume instead. Rename to cc_runner to accurately reflect what it does. Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
168 lines
5.5 KiB
Python
168 lines
5.5 KiB
Python
"""
|
|
Master test fixtures for PhoneWork BDD tests.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Any
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
# Directory containing this fixtures module.
TESTS_DIR = Path(__file__).parent
# Recorded HTTP cassettes live in a "cassettes" folder next to the tests;
# the folder is created at import time if it does not exist yet.
CASSETTES_DIR = TESTS_DIR.joinpath("cassettes")
CASSETTES_DIR.mkdir(exist_ok=True)
|
|
|
|
|
|
# ── Feishu send mock ─────────────────────────────────────────────────────────
|
|
|
|
@pytest.fixture
def feishu_calls():
    """
    Record outgoing Feishu messages instead of sending them.

    ``bot.feishu.send_text``, ``send_card`` and ``send_file`` are patched for
    the duration of the test. Per the original note, commands.py imports them
    lazily from bot.feishu at call time, so replacing the module attributes
    is sufficient.

    Yields:
        A dict with the keys "texts", "cards" and "files"; each maps to a
        list with one dict per intercepted call.
    """
    sent: dict[str, list] = {"texts": [], "cards": [], "files": []}

    async def record_text(receive_id, receive_id_type, text):
        sent["texts"].append({"receive_id": receive_id, "text": text})

    async def record_card(receive_id, receive_id_type, card):
        sent["cards"].append({"receive_id": receive_id, "card": card})

    async def record_file(receive_id, receive_id_type, file_path, file_type="stream"):
        sent["files"].append({"receive_id": receive_id, "file_path": file_path})

    # Patch targets are only resolved when the context managers are entered.
    text_patch = patch("bot.feishu.send_text", side_effect=record_text)
    card_patch = patch("bot.feishu.send_card", side_effect=record_card)
    file_patch = patch("bot.feishu.send_file", side_effect=record_file)

    with text_patch, card_patch, file_patch:
        yield sent
|
|
|
|
|
|
# ── run_claude mock ──────────────────────────────────────────────────────────
|
|
|
|
@pytest.fixture
def mock_run_claude():
    """
    Swap run_claude for one shared AsyncMock.

    The same mock object is installed at ``agent.cc_runner.run_claude`` (the
    definition) and ``agent.manager.run_claude`` (the import site). Its
    default return value is a short CC-style output string.

    Yields:
        The AsyncMock, so tests can adjust its return value or inspect calls.
    """
    fake_run = AsyncMock(return_value="Claude Code: task complete.")

    definition_patch = patch("agent.cc_runner.run_claude", fake_run)
    import_site_patch = patch("agent.manager.run_claude", fake_run)

    with definition_patch, import_site_patch:
        yield fake_run
|
|
|
|
|
|
# ── Singleton state resets ───────────────────────────────────────────────────
|
|
|
|
@pytest.fixture(autouse=True)
def reset_manager():
    """Empty ``manager._sessions`` before and after every test."""
    from agent.manager import manager as session_manager

    def _drop_sessions():
        # Look the attribute up on each call so the current container is cleared.
        session_manager._sessions.clear()

    _drop_sessions()
    yield
    _drop_sessions()
|
|
|
|
|
|
@pytest.fixture(autouse=True)
def reset_agent():
    """
    Clear the orchestrator agent's per-user containers around every test.

    ``_history``, ``_active_conv``, ``_passthrough`` and ``_user_locks`` are
    cleared, in that order, both before the test runs and after it finishes.
    """
    from orchestrator.agent import agent as orchestrator_agent

    state_attrs = ("_history", "_active_conv", "_passthrough", "_user_locks")

    def _wipe_state():
        for attr_name in state_attrs:
            getattr(orchestrator_agent, attr_name).clear()

    _wipe_state()
    yield
    _wipe_state()
|
|
|
|
|
|
@pytest.fixture(autouse=True)
def reset_task_runner():
    """Empty ``task_runner._tasks`` before and after every test."""
    from agent.task_runner import task_runner as runner

    def _forget_tasks():
        runner._tasks.clear()

    _forget_tasks()
    yield
    _forget_tasks()
|
|
|
|
|
|
@pytest.fixture(autouse=True)
def reset_scheduler():
    """
    Cancel scheduler tasks and clear its jobs around every test.

    Runs the same cleanup before the test and again after it: every value in
    ``scheduler._tasks`` (if that attribute exists) is cancelled, then
    ``scheduler._jobs`` is cleared.
    """
    from agent.scheduler import scheduler as job_scheduler

    def _cancel_and_clear():
        # _tasks is read defensively; a scheduler without it has nothing to cancel.
        known_tasks = getattr(job_scheduler, "_tasks", {})
        # Iterate over a snapshot of the values, as the original code did.
        for running in list(known_tasks.values()):
            running.cancel()
        job_scheduler._jobs.clear()

    _cancel_and_clear()
    yield
    _cancel_and_clear()
|
|
|
|
|
|
@pytest.fixture(autouse=True)
def reset_contextvars():
    """Set the current user and current chat to None before and after every test."""
    from orchestrator.tools import set_current_user, set_current_chat

    def _unset_context():
        # Same order as before: user first, then chat.
        for setter in (set_current_user, set_current_chat):
            setter(None)

    _unset_context()
    yield
    _unset_context()
|
|
|
|
|
|
@pytest.fixture(autouse=True)
def reset_reply(pytestconfig):
    """
    Reset ``pytestconfig._reply`` to None ahead of every test.

    This keeps a value stored during one scenario from leaking into the
    next. Nothing is done after the test finishes.
    """
    pytestconfig._reply = None
    yield
|
|
|
|
|
|
# ── Working directory isolation ──────────────────────────────────────────────
|
|
|
|
@pytest.fixture
def tmp_working_dir(tmp_path, monkeypatch):
    """
    Point WORKING_DIR at a per-test temporary directory.

    ``WORKING_DIR`` is replaced on both ``config`` and ``orchestrator.tools``
    via monkeypatch, and an empty "myproject" sub-directory is created inside
    the temporary directory.

    Returns:
        The temporary directory (pytest's ``tmp_path``).
    """
    import config
    import orchestrator.tools as tools_mod

    # Both modules hold their own WORKING_DIR attribute, so patch each one.
    for module in (config, tools_mod):
        monkeypatch.setattr(module, "WORKING_DIR", tmp_path)

    project_dir = tmp_path / "myproject"
    project_dir.mkdir()
    return tmp_path
|
|
|
|
|
|
# ── VCR cassette factory ─────────────────────────────────────────────────────
|
|
|
|
def make_vcr_cassette(cassette_name: str):
    """
    Build a vcrpy cassette context manager for *cassette_name*.

    The record mode is read from the VCR_RECORD_MODE environment variable and
    defaults to 'none'; set VCR_RECORD_MODE=new_episodes locally to record.
    The "authorization" and "x-api-key" headers are filtered out so cassettes
    are safe to commit.

    The calling test is skipped when vcrpy cannot be imported, or when the
    record mode is 'none' and the cassette file does not exist yet.

    Args:
        cassette_name: Path of the cassette relative to CASSETTES_DIR.

    Returns:
        The context manager produced by ``VCR.use_cassette``.
    """
    import os

    import pytest

    try:
        import vcr
    except ImportError:
        pytest.skip("vcrpy not installed")

    mode = os.getenv("VCR_RECORD_MODE", "none")
    target = CASSETTES_DIR / cassette_name
    # The parent directory is created before the skip check, as in the original.
    target.parent.mkdir(parents=True, exist_ok=True)

    if mode == "none" and not target.exists():
        pytest.skip(f"No cassette recorded yet: {cassette_name}. Run with VCR_RECORD_MODE=new_episodes to record.")

    settings = {
        "record_mode": mode,
        "match_on": ["method", "scheme", "host", "port", "path", "body"],
        "filter_headers": ["authorization", "x-api-key"],
        "decode_compressed_response": True,
    }
    recorder = vcr.VCR(**settings)
    return recorder.use_cassette(str(target))
|
|
|
|
|
|
@pytest.fixture
def vcr_cassette():
    """Expose the ``make_vcr_cassette`` factory to tests as a fixture."""
    cassette_factory = make_vcr_cassette
    return cassette_factory
|