Yuyao Huang (Sam) cbeafa35a5 feat(perm): 添加会话权限模式管理功能
实现会话权限模式管理功能,包括:
1. 在 pty_process 中定义三种权限模式标志
2. 添加 /perm 命令用于修改会话权限模式
3. 新增 run_command 工具用于执行 bot 控制命令
4. 在会话管理中支持权限模式设置
5. 添加完整的测试用例和文档说明
2026-03-29 06:46:45 +08:00

302 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""LangChain orchestration agent backed by ZhipuAI (OpenAI-compatible API).
Uses LangChain 1.x tool-calling pattern: bind_tools + manual agentic loop.
"""
from __future__ import annotations
import asyncio
import json
import logging
import re
from collections import defaultdict
from typing import Dict, List, Optional
from langchain_core.messages import (
AIMessage,
BaseMessage,
HumanMessage,
SystemMessage,
ToolMessage,
)
from langchain_openai import ChatOpenAI
from agent.manager import manager
from config import OPENAI_API_KEY, OPENAI_BASE_URL, OPENAI_MODEL, WORKING_DIR
from orchestrator.tools import TOOLS, set_current_user
logger = logging.getLogger(__name__)
SYSTEM_PROMPT_TEMPLATE = """You are PhoneWork, an AI assistant that helps users control Claude Code \
from their phone via Feishu (飞书).
Today's date: {today}
You manage Claude Code sessions. Each session has a conv_id and runs in a project directory.
Base working directory: {working_dir}
Users refer to projects by subfolder name (e.g. "todo_app") or relative path. \
Pass these names directly to `create_conversation` — the tool resolves them automatically.
{active_session_line}
## Tools — two distinct categories
### Bot control commands (use `run_command`)
`run_command` executes PhoneWork slash commands. Use it when the user asks to:
- Change permission mode: "切换到只读模式" → run_command("/perm plan")
- Close/switch sessions: "关掉第一个" → run_command("/close 1")
- Change routing mode: "切换到直连模式" → run_command("/direct")
- Set a reminder: "10分钟后提醒我" → run_command("/remind 10m 提醒我")
- Check status: "看看现在有哪些 session" → run_command("/status")
- Any other /command the user would type manually
Available bot commands (pass verbatim to run_command):
/new <dir> [msg] [--perm bypass|accept|plan] — create session
/close [n|conv_id] — close session
/switch <n> — switch active session
/perm <mode> [conv_id] — permission mode: bypass (default), accept, plan
/direct — direct mode (bypass LLM for CC messages)
/smart — smart mode (LLM routing, default)
/status — list sessions
/remind <Ns|Nm|Nh> <msg> — one-shot reminder
/tasks — list background tasks
### Host shell commands (use `run_shell`)
`run_shell` executes shell commands on the host machine (git, ls, cat, pip, etc.).
NEVER use `run_shell` for bot control. NEVER use `run_command` for shell commands.
## Session responsibilities
1. NEW session: call `create_conversation` with the project name/path. \
If the user's message also contains a task, pass it as `initial_message` too.
2. Follow-up to ACTIVE session: call `send_to_conversation` with the active conv_id shown above.
3. BOT CONTROL: call `run_command` with the appropriate slash command.
4. GENERAL QUESTIONS: answer directly — do NOT create a session for simple Q&A.
5. WEB / SEARCH: use `web` at most twice, then synthesize and reply.
6. BACKGROUND TASKS: when a task starts, reply immediately — do NOT poll `task_status`.
Guidelines:
- Relay Claude Code's output verbatim.
- If no active session and the user sends a task without naming a directory, ask which project.
- After using any tool, always produce a final text reply. Never end a turn on a tool call.
- Keep your own words brief — let Claude Code's output speak.
- Reply in the same language the user uses (Chinese or English).
"""
MAX_ITERATIONS = 10
_TOOL_MAP = {t.name: t for t in TOOLS}
QUESTION_PATTERNS = [
r'\?$', # ends with ?
r'$', # ends with Chinese ?
r'\b(what|how|why|when|where|who|which|explain|describe|tell me|can you|could you|is there|are there|do you know)\b',
r'(什么|怎么|为什么|何时|哪里|谁|哪个|解释|描述|告诉我|能否|可以|有没有|是不是)',
]
def _is_general_question(text: str) -> bool:
"""Check if text looks like a general knowledge question (not a project task)."""
text_lower = text.lower().strip()
project_indicators = [
'create', 'make', 'build', 'fix', 'update', 'delete', 'remove', 'add',
'implement', 'refactor', 'test', 'run', 'execute', 'start', 'stop',
'project', 'folder', 'directory', 'file', 'code', 'session',
'创建', '制作', '构建', '修复', '更新', '删除', '添加', '实现', '重构', '测试', '运行', '项目', '文件夹', '文件', '代码',
]
for indicator in project_indicators:
if indicator in text_lower:
return False
for pattern in QUESTION_PATTERNS:
if re.search(pattern, text_lower, re.IGNORECASE):
return True
return False
class OrchestrationAgent:
"""Per-user agent with conversation history and active session tracking."""
def __init__(self) -> None:
llm = ChatOpenAI(
base_url=OPENAI_BASE_URL,
api_key=OPENAI_API_KEY,
model=OPENAI_MODEL,
temperature=0.1,
)
self._llm_with_tools = llm.bind_tools(TOOLS)
# user_id -> list[BaseMessage]
self._history: dict[str, list[BaseMessage]] = defaultdict(list)
# user_id -> most recently active conv_id
self._active_conv: dict[str, Optional[str]] = defaultdict(lambda: None)
# user_id -> asyncio.Lock (prevents concurrent processing per user)
self._user_locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
# user_id -> passthrough mode enabled
self._passthrough: dict[str, bool] = defaultdict(lambda: False)
def _build_system_prompt(self, user_id: str) -> str:
from datetime import date
today = date.today().strftime("%Y-%m-%d")
conv_id = self._active_conv[user_id]
if conv_id:
active_line = f"ACTIVE SESSION: conv_id={conv_id!r} ← use this for all follow-up messages"
else:
active_line = "ACTIVE SESSION: none"
return SYSTEM_PROMPT_TEMPLATE.format(
working_dir=WORKING_DIR,
active_session_line=active_line,
today=today,
)
def get_active_conv(self, user_id: str) -> Optional[str]:
return self._active_conv.get(user_id)
def get_passthrough(self, user_id: str) -> bool:
return self._passthrough.get(user_id, False)
def set_passthrough(self, user_id: str, enabled: bool) -> None:
self._passthrough[user_id] = enabled
async def run(self, user_id: str, text: str) -> str:
"""Process a user message and return the agent's reply."""
async with self._user_locks[user_id]:
return await self._run_locked(user_id, text)
async def _run_locked(self, user_id: str, text: str) -> str:
"""Internal implementation, must be called with user lock held."""
set_current_user(user_id)
active_conv = self._active_conv[user_id]
short_uid = user_id[-8:]
logger.info(">>> user=...%s conv=%s msg=%r", short_uid, active_conv, text[:80])
logger.debug(" history_len=%d", len(self._history[user_id]))
# Passthrough mode: if enabled and active session, bypass LLM
if self._passthrough[user_id] and active_conv:
try:
reply = await manager.send(active_conv, text, user_id=user_id)
logger.info("<<< [passthrough] reply: %r", reply[:120])
return reply
except KeyError:
logger.warning("Session %s no longer exists, clearing active_conv", active_conv)
self._active_conv[user_id] = None
except Exception as exc:
logger.exception("Passthrough error for user=%s", user_id)
return f"[Error] {exc}"
# Direct Q&A: if no active session and message looks like a general question, answer directly
if not active_conv and _is_general_question(text):
logger.debug(" → direct Q&A (no tools)")
llm_no_tools = ChatOpenAI(
base_url=OPENAI_BASE_URL,
api_key=OPENAI_API_KEY,
model=OPENAI_MODEL,
temperature=0.7,
)
qa_prompt = (
"You are a helpful assistant. Answer the user's question concisely and accurately. "
"Reply in the same language the user uses.\n\n"
f"Question: {text}"
)
response = await llm_no_tools.ainvoke([HumanMessage(content=qa_prompt)])
return response.content or ""
messages: list[BaseMessage] = (
[SystemMessage(content=self._build_system_prompt(user_id))]
+ self._history[user_id]
+ [HumanMessage(content=text)]
)
reply = ""
try:
web_calls = 0
task_status_calls = 0
for iteration in range(MAX_ITERATIONS):
logger.debug(" LLM call #%d", iteration)
ai_msg: AIMessage = await self._llm_with_tools.ainvoke(messages)
messages.append(ai_msg)
if not ai_msg.tool_calls:
reply = ai_msg.content or ""
logger.debug(" → done (no tool call)")
break
for tc in ai_msg.tool_calls:
tool_name = tc["name"]
tool_args = tc["args"]
tool_id = tc["id"]
args_summary = ", ".join(
f"{k}={str(v)[:50]!r}" for k, v in tool_args.items()
)
logger.info("%s(%s)", tool_name, args_summary)
if tool_name == "web":
web_calls += 1
if web_calls > 2:
result = "Web search limit reached. Synthesize from results already obtained."
logger.warning(" web call limit exceeded, blocking")
messages.append(
ToolMessage(content=str(result), tool_call_id=tool_id)
)
continue
if tool_name == "task_status":
task_status_calls += 1
if task_status_calls > 1:
result = "Task is still running in the background. Stop polling and tell the user they will be notified when it completes."
logger.warning(" task_status poll limit exceeded, blocking")
messages.append(
ToolMessage(content=str(result), tool_call_id=tool_id)
)
continue
tool_obj = _TOOL_MAP.get(tool_name)
if tool_obj is None:
result = f"Unknown tool: {tool_name}"
logger.warning(" unknown tool: %s", tool_name)
else:
try:
result = await tool_obj.arun(tool_args)
except Exception as exc:
result = f"Tool error: {exc}"
logger.error(" tool %s error: %s", tool_name, exc)
logger.debug("%s: %r", tool_name, str(result)[:120])
if tool_name == "create_conversation":
try:
data = json.loads(result)
if "conv_id" in data:
self._active_conv[user_id] = data["conv_id"]
logger.info(" ✓ active session → %s", data["conv_id"])
except Exception:
pass
messages.append(
ToolMessage(content=str(result), tool_call_id=tool_id)
)
else:
reply = "[Max iterations reached]"
logger.warning(" max iterations reached")
except Exception as exc:
logger.exception("agent error for user=%s", user_id)
reply = f"[Error] {exc}"
logger.info("<<< reply: %r", reply[:120])
# Update history
self._history[user_id].append(HumanMessage(content=text))
self._history[user_id].append(AIMessage(content=reply))
if len(self._history[user_id]) > 40:
self._history[user_id] = self._history[user_id][-40:]
return reply
agent = OrchestrationAgent()