"""Slash command handler for direct bot control.""" from __future__ import annotations import argparse import json import logging import re import uuid from typing import Optional, Tuple from agent.manager import manager from agent.scheduler import scheduler from agent.task_runner import task_runner from agent.cc_runner import VALID_PERMISSION_MODES, DEFAULT_PERMISSION_MODE from orchestrator.agent import agent from orchestrator.tools import set_current_user, get_current_chat logger = logging.getLogger(__name__) # Permission mode aliases (user-facing shorthand → internal CC mode) _PERM_ALIASES: dict[str, str] = { "bypass": "bypassPermissions", "skip": "bypassPermissions", "accept": "acceptEdits", "plan": "plan", } _PERM_LABELS: dict[str, str] = { "bypassPermissions": "bypass", "acceptEdits": "accept", "plan": "plan", } def _resolve_perm(alias: str) -> str | None: """Map user-facing alias to internal permission mode, or None if invalid.""" return _PERM_ALIASES.get(alias.lower().strip()) def _perm_label(mode: str) -> str: """Return short human-readable label for a permission mode.""" return _PERM_LABELS.get(mode, mode) def parse_command(text: str) -> Optional[Tuple[str, str]]: """ Parse a bot command from text. Returns (command, args) or None if not a command. Commands must start with COMMAND_PREFIX (default "//"). """ from config import COMMAND_PREFIX text = text.strip() if not text.startswith(COMMAND_PREFIX): return None # Strip the prefix, then split into command word and args body = text[len(COMMAND_PREFIX):] parts = body.split(None, 1) cmd = COMMAND_PREFIX + parts[0].lower() args = parts[1] if len(parts) > 1 else "" return (cmd, args) async def handle_command(user_id: str, text: str) -> Optional[str]: """ Handle a slash command. Returns the reply or None if not a command. """ parsed = parse_command(text) if not parsed: return None cmd, args = parsed logger.info("Command: %s args=%r user=...%s", cmd, args[:50], user_id[-8:]) # In ROUTER_MODE, only handle router-specific commands locally. # Session commands (//status, //new, //close, etc.) fall through to node forwarding. from config import ROUTER_MODE, COMMAND_PREFIX P = COMMAND_PREFIX if ROUTER_MODE and cmd not in (P+"nodes", P+"node", P+"help", P+"h", P+"?"): return None set_current_user(user_id) if cmd in (P+"new", P+"n"): return await _cmd_new(user_id, args) elif cmd in (P+"status", P+"list", P+"ls", P+"l"): return await _cmd_status(user_id) elif cmd in (P+"close", P+"c"): return await _cmd_close(user_id, args) elif cmd in (P+"switch", P+"s"): return await _cmd_switch(user_id, args) elif cmd == P+"retry": return await _cmd_retry(user_id) elif cmd in (P+"help", P+"h", P+"?"): return _cmd_help() elif cmd == P+"direct": return _cmd_direct(user_id) elif cmd == P+"smart": return _cmd_smart(user_id) elif cmd == P+"tasks": return _cmd_tasks() elif cmd == P+"shell": return await _cmd_shell(args) elif cmd == P+"remind": return await _cmd_remind(args) elif cmd == P+"perm": return await _cmd_perm(user_id, args) elif cmd in (P+"nodes", P+"node"): return await _cmd_nodes(user_id, args) else: return None async def _cmd_new(user_id: str, args: str) -> str: """Create a new session.""" if not args: return "Usage: /new [initial_message] [--timeout N] [--perm MODE]\nModes: bypass (default), accept, plan" parser = argparse.ArgumentParser() parser.add_argument("working_dir", nargs="?", help="Project directory") parser.add_argument("rest", nargs="*", help="Initial message") parser.add_argument("--timeout", type=int, default=None, help="CC timeout in seconds") parser.add_argument("--idle", type=int, default=None, help="Idle timeout in seconds") parser.add_argument("--perm", default=None, help="Permission mode: bypass, accept, plan") try: parsed = parser.parse_args(args.split()) except SystemExit: return "Usage: /new [initial_message] [--timeout N] [--idle N] [--perm MODE]" if not parsed.working_dir: return "Error: project_dir is required" permission_mode = _resolve_perm(parsed.perm) if parsed.perm else DEFAULT_PERMISSION_MODE if permission_mode is None: return f"Invalid --perm. Valid modes: bypass, accept, plan" working_dir = parsed.working_dir initial_msg = " ".join(parsed.rest) if parsed.rest else None from orchestrator.tools import _resolve_dir try: resolved = _resolve_dir(working_dir) except ValueError as e: return f"Error: {e}" import uuid as _uuid conv_id = str(_uuid.uuid4())[:8] await manager.create( conv_id, str(resolved), owner_id=user_id, idle_timeout=parsed.idle or 1800, cc_timeout=float(parsed.timeout or 300), permission_mode=permission_mode, ) agent._active_conv[user_id] = conv_id response = None if initial_msg: response = await manager.send(conv_id, initial_msg, user_id=user_id) chat_id = get_current_chat() if chat_id: from bot.feishu import send_card, send_text, build_sessions_card sessions = manager.list_sessions(user_id=user_id) routing_mode = "Direct 🟢" if agent.get_passthrough(user_id) else "Smart ⚪" card = build_sessions_card(sessions, conv_id, routing_mode) await send_card(chat_id, "chat_id", card) if initial_msg and response: await send_text(chat_id, "chat_id", response) return "" perm_label = _perm_label(permission_mode) reply = f"✓ Created session `{conv_id}` in `{resolved}` [{perm_label}]" if parsed.timeout: reply += f" (timeout: {parsed.timeout}s)" if initial_msg and response: reply += f"\n\n{response}" return reply async def _cmd_status(user_id: str) -> str: """Show status: sessions and current mode.""" sessions = manager.list_sessions(user_id=user_id) active = agent.get_active_conv(user_id) passthrough = agent.get_passthrough(user_id) mode = "Direct 🟢" if passthrough else "Smart ⚪" chat_id = get_current_chat() if chat_id: from bot.feishu import send_card, build_sessions_card card = build_sessions_card(sessions, active, mode) await send_card(chat_id, "chat_id", card) return "" if not sessions: return "No active sessions." lines = ["**Your Sessions:**\n"] for i, s in enumerate(sessions, 1): marker = "→ " if s["conv_id"] == active else " " perm = _perm_label(s.get("permission_mode", DEFAULT_PERMISSION_MODE)) lines.append(f"{marker}{i}. `{s['conv_id']}` - `{s['cwd']}` [{perm}]") lines.append(f"\n**Mode:** {mode}") lines.append("Use `/switch ` to activate a session.") lines.append("Use `/direct` or `/smart` to change mode.") return "\n".join(lines) async def _cmd_close(user_id: str, args: str) -> str: """Close a session.""" # If a specific conv_id is given by name (not a number), resolve it directly. if args: try: int(args) by_number = True except ValueError: by_number = False if not by_number: # Explicit conv_id given — look it up directly (may belong to another user). conv_id = args.strip() try: success = await manager.close(conv_id, user_id=user_id) if success: if agent.get_active_conv(user_id) == conv_id: agent._active_conv[user_id] = None return f"✓ Closed session `{conv_id}`" else: return f"Session `{conv_id}` not found." except PermissionError as e: return str(e) sessions = manager.list_sessions(user_id=user_id) if not sessions: return "No sessions to close." if args: try: idx = int(args) - 1 if 0 <= idx < len(sessions): conv_id = sessions[idx]["conv_id"] else: return f"Invalid session number. Use 1-{len(sessions)}." except ValueError: conv_id = args.strip() else: conv_id = agent.get_active_conv(user_id) if not conv_id: return "No active session. Use `/close ` or `/close `." try: success = await manager.close(conv_id, user_id=user_id) if success: if agent.get_active_conv(user_id) == conv_id: agent._active_conv[user_id] = None return f"✓ Closed session `{conv_id}`" else: return f"Session `{conv_id}` not found." except PermissionError as e: return str(e) async def _cmd_switch(user_id: str, args: str) -> str: """Switch to a different session.""" sessions = manager.list_sessions(user_id=user_id) if not sessions: return "No sessions available." if not args: return "Usage: /switch \n" + await _cmd_status(user_id) try: idx = int(args) - 1 if 0 <= idx < len(sessions): conv_id = sessions[idx]["conv_id"] agent._active_conv[user_id] = conv_id return f"✓ Switched to session `{conv_id}` ({sessions[idx]['cwd']})" else: return f"Invalid session number. Use 1-{len(sessions)}." except ValueError: return f"Invalid number: {args}" async def _cmd_perm(user_id: str, args: str) -> str: """Change the permission mode of the active (or specified) session.""" parts = args.split() if not parts: return ( "Usage: /perm [conv_id]\n" "Modes: bypass (default), accept, plan\n" " bypass — skip all permission checks\n" " accept — auto-accept file edits, confirm shell commands\n" " plan — plan only, no writes" ) alias = parts[0] conv_id = parts[1] if len(parts) > 1 else agent.get_active_conv(user_id) permission_mode = _resolve_perm(alias) if permission_mode is None: return f"Unknown mode '{alias}'. Valid: bypass, accept, plan" if not conv_id: return "No active session. Use `/perm ` or activate a session first." try: manager.set_permission_mode(conv_id, permission_mode, user_id=user_id) except KeyError: return f"Session `{conv_id}` not found." except PermissionError as e: return str(e) return f"✓ Session `{conv_id}` permission mode set to **{_perm_label(permission_mode)}**" async def _cmd_retry(user_id: str) -> str: """Retry the last message (placeholder - needs history tracking).""" return "Retry not yet implemented. Just send your message again." def _cmd_direct(user_id: str) -> str: """Enable direct mode - messages go straight to Claude Code.""" conv = agent.get_active_conv(user_id) if not conv: return "No active session. Use `/new` or `/switch` first." agent.set_passthrough(user_id, True) return f"✓ Direct mode ON. Messages go directly to session `{conv}`." def _cmd_smart(user_id: str) -> str: """Enable smart mode - messages go through LLM for routing.""" agent.set_passthrough(user_id, False) return "✓ Smart mode ON. Messages go through LLM for intelligent routing." def _cmd_tasks() -> str: """List background tasks.""" tasks = task_runner.list_tasks() if not tasks: return "No background tasks." lines = ["**Background Tasks:**\n"] for t in tasks: status_emoji = {"completed": "✅", "failed": "❌", "running": "⏳", "pending": "⏸️"}.get( t["status"], "❓" ) lines.append(f"{status_emoji} #{t['task_id']} - {t['description'][:50]} ({t['elapsed']}s)") return "\n".join(lines) async def _cmd_shell(args: str) -> str: """Execute a shell command directly.""" if not args: return "Usage: /shell \nExample: /shell git status" from orchestrator.tools import ShellTool, get_current_chat tool = ShellTool() result = await tool._arun(command=args) try: data = json.loads(result) if "error" in data: return f"❌ {data['error']}" output = [] if data.get("stdout"): output.append(data["stdout"]) if data.get("stderr"): output.append(f"[stderr] {data['stderr']}") output.append(f"[exit code: {data.get('exit_code', '?')}]") return "\n".join(output) if output else "(no output)" except json.JSONDecodeError: return result async def _cmd_remind(args: str) -> str: """Set a reminder.""" if not args: return "Usage: /remind