PhoneWork/bot/commands.py
Yuyao Huang (Sam) 9c04d47c8e docs: 更新命令前缀从/改为//
更新所有文档和代码中的命令前缀,从单斜杠`/`改为双斜杠`//`,以保持一致性
2026-03-30 01:40:53 +08:00

463 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Slash command handler for direct bot control."""
from __future__ import annotations
import argparse
import json
import logging
import re
import uuid
from typing import Optional, Tuple
from agent.manager import manager
from agent.scheduler import scheduler
from agent.task_runner import task_runner
from agent.cc_runner import VALID_PERMISSION_MODES, DEFAULT_PERMISSION_MODE
from orchestrator.agent import agent
from orchestrator.tools import set_current_user, get_current_chat
logger = logging.getLogger(__name__)
# Permission mode aliases (user-facing shorthand → internal CC mode)
_PERM_ALIASES: dict[str, str] = {
"bypass": "bypassPermissions",
"skip": "bypassPermissions",
"accept": "acceptEdits",
"plan": "plan",
}
_PERM_LABELS: dict[str, str] = {
"bypassPermissions": "bypass",
"acceptEdits": "accept",
"plan": "plan",
}
def _resolve_perm(alias: str) -> str | None:
"""Map user-facing alias to internal permission mode, or None if invalid."""
return _PERM_ALIASES.get(alias.lower().strip())
def _perm_label(mode: str) -> str:
"""Return short human-readable label for a permission mode."""
return _PERM_LABELS.get(mode, mode)
def parse_command(text: str) -> Optional[Tuple[str, str]]:
"""
Parse a bot command from text.
Returns (command, args) or None if not a command.
Commands must start with COMMAND_PREFIX (default "//").
"""
from config import COMMAND_PREFIX
text = text.strip()
if not text.startswith(COMMAND_PREFIX):
return None
# Strip the prefix, then split into command word and args
body = text[len(COMMAND_PREFIX):]
parts = body.split(None, 1)
cmd = COMMAND_PREFIX + parts[0].lower()
args = parts[1] if len(parts) > 1 else ""
return (cmd, args)
async def handle_command(user_id: str, text: str) -> Optional[str]:
"""
Handle a slash command. Returns the reply or None if not a command.
"""
parsed = parse_command(text)
if not parsed:
return None
cmd, args = parsed
logger.info("Command: %s args=%r user=...%s", cmd, args[:50], user_id[-8:])
# In ROUTER_MODE, only handle router-specific commands locally.
# Session commands (//status, //new, //close, etc.) fall through to node forwarding.
from config import ROUTER_MODE, COMMAND_PREFIX
P = COMMAND_PREFIX
if ROUTER_MODE and cmd not in (P+"nodes", P+"node", P+"help", P+"h", P+"?"):
return None
set_current_user(user_id)
if cmd in (P+"new", P+"n"):
return await _cmd_new(user_id, args)
elif cmd in (P+"status", P+"list", P+"ls", P+"l"):
return await _cmd_status(user_id)
elif cmd in (P+"close", P+"c"):
return await _cmd_close(user_id, args)
elif cmd in (P+"switch", P+"s"):
return await _cmd_switch(user_id, args)
elif cmd == P+"retry":
return await _cmd_retry(user_id)
elif cmd in (P+"help", P+"h", P+"?"):
return _cmd_help()
elif cmd == P+"direct":
return _cmd_direct(user_id)
elif cmd == P+"smart":
return _cmd_smart(user_id)
elif cmd == P+"tasks":
return _cmd_tasks()
elif cmd == P+"shell":
return await _cmd_shell(args)
elif cmd == P+"remind":
return await _cmd_remind(args)
elif cmd == P+"perm":
return await _cmd_perm(user_id, args)
elif cmd in (P+"nodes", P+"node"):
return await _cmd_nodes(user_id, args)
else:
return None
async def _cmd_new(user_id: str, args: str) -> str:
"""Create a new session."""
if not args:
return "Usage: /new <project_dir> [initial_message] [--timeout N] [--perm MODE]\nModes: bypass (default), accept, plan"
parser = argparse.ArgumentParser()
parser.add_argument("working_dir", nargs="?", help="Project directory")
parser.add_argument("rest", nargs="*", help="Initial message")
parser.add_argument("--timeout", type=int, default=None, help="CC timeout in seconds")
parser.add_argument("--idle", type=int, default=None, help="Idle timeout in seconds")
parser.add_argument("--perm", default=None, help="Permission mode: bypass, accept, plan")
try:
parsed = parser.parse_args(args.split())
except SystemExit:
return "Usage: /new <project_dir> [initial_message] [--timeout N] [--idle N] [--perm MODE]"
if not parsed.working_dir:
return "Error: project_dir is required"
permission_mode = _resolve_perm(parsed.perm) if parsed.perm else DEFAULT_PERMISSION_MODE
if permission_mode is None:
return f"Invalid --perm. Valid modes: bypass, accept, plan"
working_dir = parsed.working_dir
initial_msg = " ".join(parsed.rest) if parsed.rest else None
from orchestrator.tools import _resolve_dir
try:
resolved = _resolve_dir(working_dir)
except ValueError as e:
return f"Error: {e}"
import uuid as _uuid
conv_id = str(_uuid.uuid4())[:8]
await manager.create(
conv_id,
str(resolved),
owner_id=user_id,
idle_timeout=parsed.idle or 1800,
cc_timeout=float(parsed.timeout or 300),
permission_mode=permission_mode,
)
agent._active_conv[user_id] = conv_id
response = None
if initial_msg:
response = await manager.send(conv_id, initial_msg, user_id=user_id)
chat_id = get_current_chat()
if chat_id:
from bot.feishu import send_card, send_text, build_sessions_card
sessions = manager.list_sessions(user_id=user_id)
routing_mode = "Direct 🟢" if agent.get_passthrough(user_id) else "Smart ⚪"
card = build_sessions_card(sessions, conv_id, routing_mode)
await send_card(chat_id, "chat_id", card)
if initial_msg and response:
await send_text(chat_id, "chat_id", response)
return ""
perm_label = _perm_label(permission_mode)
reply = f"✓ Created session `{conv_id}` in `{resolved}` [{perm_label}]"
if parsed.timeout:
reply += f" (timeout: {parsed.timeout}s)"
if initial_msg and response:
reply += f"\n\n{response}"
return reply
async def _cmd_status(user_id: str) -> str:
"""Show status: sessions and current mode."""
sessions = manager.list_sessions(user_id=user_id)
active = agent.get_active_conv(user_id)
passthrough = agent.get_passthrough(user_id)
mode = "Direct 🟢" if passthrough else "Smart ⚪"
chat_id = get_current_chat()
if chat_id:
from bot.feishu import send_card, build_sessions_card
card = build_sessions_card(sessions, active, mode)
await send_card(chat_id, "chat_id", card)
return ""
if not sessions:
return "No active sessions."
lines = ["**Your Sessions:**\n"]
for i, s in enumerate(sessions, 1):
marker = "" if s["conv_id"] == active else " "
perm = _perm_label(s.get("permission_mode", DEFAULT_PERMISSION_MODE))
lines.append(f"{marker}{i}. `{s['conv_id']}` - `{s['cwd']}` [{perm}]")
lines.append(f"\n**Mode:** {mode}")
lines.append("Use `//switch <n>` to activate a session.")
lines.append("Use `//direct` or `//smart` to change mode.")
return "\n".join(lines)
async def _cmd_close(user_id: str, args: str) -> str:
"""Close a session."""
# If a specific conv_id is given by name (not a number), resolve it directly.
if args:
try:
int(args)
by_number = True
except ValueError:
by_number = False
if not by_number:
# Explicit conv_id given — look it up directly (may belong to another user).
conv_id = args.strip()
try:
success = await manager.close(conv_id, user_id=user_id)
if success:
if agent.get_active_conv(user_id) == conv_id:
agent._active_conv[user_id] = None
return f"✓ Closed session `{conv_id}`"
else:
return f"Session `{conv_id}` not found."
except PermissionError as e:
return str(e)
sessions = manager.list_sessions(user_id=user_id)
if not sessions:
return "No sessions to close."
if args:
try:
idx = int(args) - 1
if 0 <= idx < len(sessions):
conv_id = sessions[idx]["conv_id"]
else:
return f"Invalid session number. Use 1-{len(sessions)}."
except ValueError:
conv_id = args.strip()
else:
conv_id = agent.get_active_conv(user_id)
if not conv_id:
return "No active session. Use `//close <conv_id>` or `//close <n>`."
try:
success = await manager.close(conv_id, user_id=user_id)
if success:
if agent.get_active_conv(user_id) == conv_id:
agent._active_conv[user_id] = None
return f"✓ Closed session `{conv_id}`"
else:
return f"Session `{conv_id}` not found."
except PermissionError as e:
return str(e)
async def _cmd_switch(user_id: str, args: str) -> str:
"""Switch to a different session."""
sessions = manager.list_sessions(user_id=user_id)
if not sessions:
return "No sessions available."
if not args:
return "Usage: /switch <n>\n" + await _cmd_status(user_id)
try:
idx = int(args) - 1
if 0 <= idx < len(sessions):
conv_id = sessions[idx]["conv_id"]
agent._active_conv[user_id] = conv_id
return f"✓ Switched to session `{conv_id}` ({sessions[idx]['cwd']})"
else:
return f"Invalid session number. Use 1-{len(sessions)}."
except ValueError:
return f"Invalid number: {args}"
async def _cmd_perm(user_id: str, args: str) -> str:
"""Change the permission mode of the active (or specified) session."""
parts = args.split()
if not parts:
return (
"Usage: /perm <mode> [conv_id]\n"
"Modes: bypass (default), accept, plan\n"
" bypass — skip all permission checks\n"
" accept — auto-accept file edits, confirm shell commands\n"
" plan — plan only, no writes"
)
alias = parts[0]
conv_id = parts[1] if len(parts) > 1 else agent.get_active_conv(user_id)
permission_mode = _resolve_perm(alias)
if permission_mode is None:
return f"Unknown mode '{alias}'. Valid: bypass, accept, plan"
if not conv_id:
return "No active session. Use `//perm <mode> <conv_id>` or activate a session first."
try:
manager.set_permission_mode(conv_id, permission_mode, user_id=user_id)
except KeyError:
return f"Session `{conv_id}` not found."
except PermissionError as e:
return str(e)
return f"✓ Session `{conv_id}` permission mode set to **{_perm_label(permission_mode)}**"
async def _cmd_retry(user_id: str) -> str:
"""Retry the last message (placeholder - needs history tracking)."""
return "Retry not yet implemented. Just send your message again."
def _cmd_direct(user_id: str) -> str:
"""Enable direct mode - messages go straight to Claude Code."""
conv = agent.get_active_conv(user_id)
if not conv:
return "No active session. Use `//new` or `//switch` first."
agent.set_passthrough(user_id, True)
return f"✓ Direct mode ON. Messages go directly to session `{conv}`."
def _cmd_smart(user_id: str) -> str:
"""Enable smart mode - messages go through LLM for routing."""
agent.set_passthrough(user_id, False)
return "✓ Smart mode ON. Messages go through LLM for intelligent routing."
def _cmd_tasks() -> str:
"""List background tasks."""
tasks = task_runner.list_tasks()
if not tasks:
return "No background tasks."
lines = ["**Background Tasks:**\n"]
for t in tasks:
status_emoji = {"completed": "", "failed": "", "running": "", "pending": "⏸️"}.get(
t["status"], ""
)
lines.append(f"{status_emoji} #{t['task_id']} - {t['description'][:50]} ({t['elapsed']}s)")
return "\n".join(lines)
async def _cmd_shell(args: str) -> str:
"""Execute a shell command directly."""
if not args:
return "Usage: /shell <command>\nExample: /shell git status"
from orchestrator.tools import ShellTool, get_current_chat
tool = ShellTool()
result = await tool._arun(command=args)
try:
data = json.loads(result)
if "error" in data:
return f"{data['error']}"
output = []
if data.get("stdout"):
output.append(data["stdout"])
if data.get("stderr"):
output.append(f"[stderr] {data['stderr']}")
output.append(f"[exit code: {data.get('exit_code', '?')}]")
return "\n".join(output) if output else "(no output)"
except json.JSONDecodeError:
return result
async def _cmd_remind(args: str) -> str:
"""Set a reminder."""
if not args:
return "Usage: /remind <time> <message>\nExample: /remind 10m check the build\nTime format: 30s, 10m, 1h"
parts = args.split(None, 1)
if len(parts) < 2:
return "Usage: /remind <time> <message>\nExample: /remind 10m check the build"
time_str, message = parts
match = re.match(r'^(\d+)(s|m|h)$', time_str.lower())
if not match:
return "Invalid time format. Use: 30s, 10m, 1h"
value = int(match.group(1))
unit = match.group(2)
seconds = value * {'s': 1, 'm': 60, 'h': 3600}[unit]
chat_id = get_current_chat()
job_id = await scheduler.schedule_once(
delay_seconds=seconds,
message=message,
notify_chat_id=chat_id,
)
return f"⏰ Reminder #{job_id} set for {value}{unit} from now"
async def _cmd_nodes(user_id: str, args: str) -> str:
"""List nodes or switch active node."""
from config import ROUTER_MODE
if not ROUTER_MODE:
return "Not in router mode. Run standalone.py for multi-host support."
from router.nodes import get_node_registry
registry = get_node_registry()
if args:
args = args.strip()
if registry.set_active_node(user_id, args):
return f"✓ Active node set to: {args}"
return f"Error: Node '{args}' not found"
nodes = registry.list_nodes()
if not nodes:
return "No nodes connected."
active_node_id = None
active_node = registry.get_active_node(user_id)
if active_node:
active_node_id = active_node.node_id
lines = ["**Connected Nodes:**\n"]
for n in nodes:
marker = "" if n["node_id"] == active_node_id else " "
status = "🟢" if n["status"] == "online" else "🔴"
lines.append(f"{marker}{n['display_name']} {status} sessions={n['sessions']}")
lines.append("\nUse `//node <name>` to switch active node.")
return "\n".join(lines)
def _cmd_help() -> str:
"""Show help."""
from config import COMMAND_PREFIX as P
return f"""**Commands:** (prefix: `{P}`)
{P}new <dir> [msg] [--timeout N] [--idle N] [--perm MODE] - Create session
{P}status - Show sessions and current mode
{P}close [n] - Close session (active or by number)
{P}switch <n> - Switch to session by number
{P}perm <mode> [conv_id] - Set permission mode (bypass/accept/plan)
{P}direct - Direct mode: messages → Claude Code (no LLM overhead)
{P}smart - Smart mode: messages → LLM routing (default)
{P}shell <cmd> - Run shell command (bypasses LLM)
{P}remind <time> <msg> - Set reminder (e.g. {P}remind 10m check build)
{P}tasks - List background tasks
{P}nodes - List connected host nodes
{P}node <name> - Switch active node
{P}retry - Retry last message
{P}help - Show this help
**Permission modes** (used by {P}perm and {P}new --perm):
bypass — 跳过所有权限确认CC 自动执行一切操作(默认)
适合:受信任的沙盒环境、自动化任务
accept — 自动接受文件编辑,但 shell 命令仍需手动确认
适合:日常开发,需要对命令执行保持控制
plan — 只规划、不执行任何写操作
适合:先预览 CC 的操作计划再决定是否执行"""