PhoneWork/tests/step_defs/test_commands.py
Yuyao Huang (Sam) cbeafa35a5 feat(perm): 添加会话权限模式管理功能
实现会话权限模式管理功能,包括:
1. 在 pty_process 中定义三种权限模式标志
2. 添加 /perm 命令用于修改会话权限模式
3. 新增 run_command 工具用于执行 bot 控制命令
4. 在会话管理中支持权限模式设置
5. 添加完整的测试用例和文档说明
2026-03-29 06:46:45 +08:00

80 lines
2.7 KiB
Python

"""
Step definitions for all slash command features.
"""
from __future__ import annotations
import time
from pytest_bdd import scenarios, given, when, then, parsers
from tests.step_defs.common_steps import * # noqa: F401,F403 — import shared steps
# Feature files for every slash command whose scenarios are bound to the
# step definitions in this module.
_COMMAND_FEATURE_FILES = (
    "../features/commands/new.feature",
    "../features/commands/status.feature",
    "../features/commands/switch.feature",
    "../features/commands/close.feature",
    "../features/commands/direct_smart.feature",
    "../features/commands/shell.feature",
    "../features/commands/remind.feature",
    "../features/commands/tasks.feature",
    "../features/commands/nodes.feature",
    "../features/commands/help.feature",
    "../features/commands/perm.feature",
)

scenarios(*_COMMAND_FEATURE_FILES)
# ── When: send slash command ─────────────────────────────────────────────────
@when(parsers.parse('user sends "{text}"'))
def send_command(text, pytestconfig, feishu_calls, mock_run_claude):
    """Run ``handle_command`` for the step's text and stash the reply.

    The user id is read from ``pytestconfig._test_user_id`` (falling back to
    ``"user_abc123"``), and the awaited result of
    ``handle_command(user_id, text)`` is stored on ``pytestconfig._reply``.

    ``feishu_calls`` and ``mock_run_claude`` are not referenced in the body;
    presumably they are requested only so those fixtures are active while the
    command runs -- confirm against the fixture definitions.
    """
    import asyncio
    import warnings

    from bot.commands import handle_command

    user_id = getattr(pytestconfig, "_test_user_id", "user_abc123")

    # asyncio.get_event_loop() is not reliable when no loop is current: it
    # raises RuntimeError after asyncio.run()/set_event_loop(None) has been
    # used (and always on Python 3.14+), and emits a DeprecationWarning on
    # 3.12/3.13. Reuse the current loop when there is a usable one, otherwise
    # create and register a fresh loop so later steps can keep sharing it.
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", DeprecationWarning)
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = None
    if loop is None or loop.is_closed():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    pytestconfig._reply = loop.run_until_complete(handle_command(user_id, text))
# ── Given: task runner state ─────────────────────────────────────────────────
@given(parsers.parse('there is a running task "{task_id}" described as "{desc}"'))
def add_running_task(task_id, desc):
    """Register a ``RUNNING`` background task under ``task_id``.

    The task is written straight into ``task_runner._tasks`` with its
    ``started_at`` set to the current time.
    """
    from agent.task_runner import BackgroundTask, TaskStatus, task_runner

    task_runner._tasks[task_id] = BackgroundTask(
        task_id=task_id,
        description=desc,
        started_at=time.time(),
        status=TaskStatus.RUNNING,
    )
@given(parsers.parse('there is a completed task "{task_id}" described as "{desc}"'))
def add_completed_task(task_id, desc):
    """Register a ``COMPLETED`` background task under ``task_id``.

    The task is written straight into ``task_runner._tasks``; it is recorded
    as having started five seconds before it completed, with result
    ``"success"``.
    """
    from agent.task_runner import BackgroundTask, TaskStatus, task_runner

    finished = time.time()
    task_runner._tasks[task_id] = BackgroundTask(
        task_id=task_id,
        description=desc,
        started_at=finished - 5,
        status=TaskStatus.COMPLETED,
        completed_at=finished,
        result="success",
    )
@given(parsers.parse('there is a failed task "{task_id}" described as "{desc}"'))
def add_failed_task(task_id, desc):
    """Register a ``FAILED`` background task under ``task_id``.

    The task is written straight into ``task_runner._tasks``; it is recorded
    as having started three seconds before it finished, with error
    ``"subprocess failed"``.
    """
    from agent.task_runner import BackgroundTask, TaskStatus, task_runner

    finished = time.time()
    task_runner._tasks[task_id] = BackgroundTask(
        task_id=task_id,
        description=desc,
        started_at=finished - 3,
        status=TaskStatus.FAILED,
        completed_at=finished,
        error="subprocess failed",
    )