实现会话权限模式管理功能,包括: 1. 在 pty_process 中定义三种权限模式标志 2. 添加 /perm 命令用于修改会话权限模式 3. 新增 run_command 工具用于执行 bot 控制命令 4. 在会话管理中支持权限模式设置 5. 添加完整的测试用例和文档说明
80 lines
2.7 KiB
Python
80 lines
2.7 KiB
Python
"""
|
|
Step definitions for all slash command features.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import time
|
|
|
|
from pytest_bdd import scenarios, given, when, then, parsers
|
|
|
|
from tests.step_defs.common_steps import * # noqa: F401,F403 — import shared steps
|
|
|
|
# Bind the scenarios of every slash-command feature file. Paths are given
# relative to this module, one per command, in the order listed.
scenarios(
    *(
        f"../features/commands/{command}.feature"
        for command in (
            "new",
            "status",
            "switch",
            "close",
            "direct_smart",
            "shell",
            "remind",
            "tasks",
            "nodes",
            "help",
            "perm",
        )
    )
)
|
|
|
|
|
|
# ── When: send slash command ─────────────────────────────────────────────────
|
|
|
|
@when(parsers.parse('user sends "{text}"'))
def send_command(text, pytestconfig, feishu_calls, mock_run_claude):
    """Run ``text`` through ``bot.commands.handle_command`` and keep the reply.

    The coroutine is driven to completion synchronously and its result is
    stored on ``pytestconfig._reply``.

    Args:
        text: Message text captured from the step sentence.
        pytestconfig: pytest config object. ``_test_user_id`` is read from it
            (defaulting to ``"user_abc123"``) and ``_reply`` is written to it.
        feishu_calls: Not referenced in the body; presumably requested only so
            the fixture is active during the step.
        mock_run_claude: Not referenced in the body; presumably requested only
            so the fixture is active during the step.
    """
    import asyncio

    from bot.commands import handle_command

    user_id = getattr(pytestconfig, "_test_user_id", "user_abc123")

    # asyncio.get_event_loop() raises RuntimeError when the thread has no
    # current loop, and a loop closed by earlier code cannot run anything.
    # In both cases install a fresh loop instead of failing the step.
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = None
    if loop is None or loop.is_closed():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    pytestconfig._reply = loop.run_until_complete(handle_command(user_id, text))
|
|
|
|
|
|
# ── Given: task runner state ─────────────────────────────────────────────────
|
|
|
|
@given(parsers.parse('there is a running task "{task_id}" described as "{desc}"'))
def add_running_task(task_id, desc):
    """Insert a ``RUNNING`` background task straight into the task runner.

    The task is stored in ``task_runner._tasks`` under ``task_id`` with the
    current time as its start time.

    Args:
        task_id: Identifier captured from the step; used as the registry key.
        desc: Description captured from the step.
    """
    from agent.task_runner import task_runner, BackgroundTask, TaskStatus

    started = time.time()
    task_runner._tasks[task_id] = BackgroundTask(
        task_id=task_id,
        description=desc,
        started_at=started,
        status=TaskStatus.RUNNING,
    )
|
|
|
|
|
|
@given(parsers.parse('there is a completed task "{task_id}" described as "{desc}"'))
def add_completed_task(task_id, desc):
    """Insert a ``COMPLETED`` background task straight into the task runner.

    The task is stored in ``task_runner._tasks`` under ``task_id``. It is
    recorded as finished now, started five seconds earlier, with the result
    ``"success"``.

    Args:
        task_id: Identifier captured from the step; used as the registry key.
        desc: Description captured from the step.
    """
    from agent.task_runner import task_runner, BackgroundTask, TaskStatus

    finished = time.time()
    task_runner._tasks[task_id] = BackgroundTask(
        task_id=task_id,
        description=desc,
        started_at=finished - 5,
        status=TaskStatus.COMPLETED,
        completed_at=finished,
        result="success",
    )
|
|
|
|
|
|
@given(parsers.parse('there is a failed task "{task_id}" described as "{desc}"'))
def add_failed_task(task_id, desc):
    """Insert a ``FAILED`` background task straight into the task runner.

    The task is stored in ``task_runner._tasks`` under ``task_id``. It is
    recorded as finished now, started three seconds earlier, with the error
    ``"subprocess failed"``.

    Args:
        task_id: Identifier captured from the step; used as the registry key.
        desc: Description captured from the step.
    """
    from agent.task_runner import task_runner, BackgroundTask, TaskStatus

    finished = time.time()
    task_runner._tasks[task_id] = BackgroundTask(
        task_id=task_id,
        description=desc,
        started_at=finished - 3,
        status=TaskStatus.FAILED,
        completed_at=finished,
        error="subprocess failed",
    )
|