From 0eb29f2dcc51ee4a2642e74b55f1827a577f85ae Mon Sep 17 00:00:00 2001 From: "Yuyao Huang (Sam)" Date: Sat, 28 Mar 2026 07:44:44 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=88=9D=E5=A7=8B=E5=8C=96=E9=A1=B9?= =?UTF-8?q?=E7=9B=AE=E5=9F=BA=E7=A1=80=E7=BB=93=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 添加项目基础文件和目录结构,包括: - 初始化空包目录(bot/agent/orchestrator) - 配置文件(config.py)和示例(keyring.example.yaml) - 依赖文件(requirements.txt) - 主程序入口(main.py) - 调试脚本(debug_test.py) - 文档说明(README.md) - Git忽略文件(.gitignore) - 核心功能模块(pty_process/manager/handler/feishu等) --- .gitignore | 72 +++++++++++++++++ README.md | 48 ++++++++++++ agent/__init__.py | 1 + agent/manager.py | 128 +++++++++++++++++++++++++++++++ agent/pty_process.py | 86 +++++++++++++++++++++ bot/__init__.py | 1 + bot/feishu.py | 79 +++++++++++++++++++ bot/handler.py | 130 +++++++++++++++++++++++++++++++ config.py | 19 +++++ debug_test.py | 31 ++++++++ keyring.example.yaml | 6 ++ main.py | 59 ++++++++++++++ orchestrator/__init__.py | 1 + orchestrator/agent.py | 150 ++++++++++++++++++++++++++++++++++++ orchestrator/tools.py | 162 +++++++++++++++++++++++++++++++++++++++ requirements.txt | 8 ++ 16 files changed, 981 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 agent/__init__.py create mode 100644 agent/manager.py create mode 100644 agent/pty_process.py create mode 100644 bot/__init__.py create mode 100644 bot/feishu.py create mode 100644 bot/handler.py create mode 100644 config.py create mode 100644 debug_test.py create mode 100644 keyring.example.yaml create mode 100644 main.py create mode 100644 orchestrator/__init__.py create mode 100644 orchestrator/agent.py create mode 100644 orchestrator/tools.py create mode 100644 requirements.txt diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4a123c2 --- /dev/null +++ b/.gitignore @@ -0,0 +1,72 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Virtual environments +.venv/ +venv/ +ENV/ +env/ + +# Environment variables and secrets +.env +.env.* +*.env +keyring.yaml +secrets.yaml +secrets.json + +# IDE +.idea/ +.vscode/ +*.swp +*.swo +*~ +.project +.pydevproject +.settings/ + +# Claude settings +.claude/ + +# Logs +*.log +logs/ + +# OS +.DS_Store +Thumbs.db +Desktop.ini + +# Testing +.pytest_cache/ +.coverage +htmlcov/ +.tox/ +.nox/ + +# MyPy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Ruff +.ruff_cache/ diff --git a/README.md b/README.md new file mode 100644 index 0000000..899b74b --- /dev/null +++ b/README.md @@ -0,0 +1,48 @@ +# PhoneWork + +Feishu bot integration with Claude Code CLI. + +## Architecture + +- **Agent CLI**: Claude Code (print mode) +- **Chat Server**: FastAPI +- **Client**: Feishu bot API (long-connection) + +## Setup + +### 1. Feishu App +Create app at https://open.feishu.cn: +- Enable **Bot** capability +- Enable **long-connection event subscription** (no public URL needed) +- Get `FEISHU_APP_ID` and `FEISHU_APP_SECRET` + +### 2. LLM Endpoint +Configure OpenAI-compatible endpoint: +- `OPENAI_BASE_URL` +- `OPENAI_API_KEY` +- `OPENAI_MODEL` + +### 3. Claude Code CLI +- Install and authenticate `claude` command +- Ensure available in PATH + +### 4. Configuration +```bash +cp keyring.example.yaml keyring.yaml +# Edit keyring.yaml with your credentials +``` + +### 5. Run +```bash +pip install -r requirements.txt +python main.py +``` + +## Requirements + +| Item | Notes | +|---|---| +| Python 3.11+ | Required | +| Feishu App | Bot + long-connection enabled | +| OpenAI-compatible LLM | API endpoint and key | +| Claude Code CLI | Installed + authenticated | diff --git a/agent/__init__.py b/agent/__init__.py new file mode 100644 index 0000000..912256a --- /dev/null +++ b/agent/__init__.py @@ -0,0 +1 @@ +# empty init diff --git a/agent/manager.py b/agent/manager.py new file mode 100644 index 0000000..978c431 --- /dev/null +++ b/agent/manager.py @@ -0,0 +1,128 @@ +"""Session registry: tracks active project sessions by conversation ID.""" + +from __future__ import annotations + +import asyncio +import logging +import uuid +from dataclasses import dataclass, field +from typing import Dict, List, Optional + +from agent.pty_process import run_claude + +logger = logging.getLogger(__name__) + +IDLE_TIMEOUT = 30 * 60 # 30 minutes in seconds + + +@dataclass +class Session: + conv_id: str + cwd: str + # Stable UUID passed to `claude --session-id` so CC owns the history + cc_session_id: str = field(default_factory=lambda: str(uuid.uuid4())) + last_activity: float = field(default_factory=lambda: asyncio.get_event_loop().time()) + # True after the first message has been sent (so we know to use --resume) + started: bool = False + + def touch(self) -> None: + self.last_activity = asyncio.get_event_loop().time() + + +class SessionManager: + """Registry of active Claude Code project sessions.""" + + def __init__(self) -> None: + self._sessions: Dict[str, Session] = {} + self._lock = asyncio.Lock() + self._reaper_task: Optional[asyncio.Task] = None + + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ + + async def start(self) -> None: + loop = asyncio.get_event_loop() + self._reaper_task = loop.create_task(self._reaper_loop()) + + async def stop(self) -> None: + if self._reaper_task: + self._reaper_task.cancel() + async with self._lock: + self._sessions.clear() + + async def create(self, conv_id: str, working_dir: str) -> Session: + """Register a new session for the given working directory.""" + async with self._lock: + session = Session(conv_id=conv_id, cwd=working_dir) + self._sessions[conv_id] = session + logger.info( + "Created session %s (cc_session_id=%s) in %s", + conv_id, session.cc_session_id, working_dir, + ) + return session + + async def send(self, conv_id: str, message: str) -> str: + """ + Run claude -p with the message in the session's directory. + + - First message: uses --session-id to establish the CC session. + - Subsequent messages: uses --resume so CC has full history. + """ + async with self._lock: + session = self._sessions.get(conv_id) + if session is None: + raise KeyError(f"No session for conv_id={conv_id!r}") + session.touch() + cwd = session.cwd + cc_session_id = session.cc_session_id + first_message = not session.started + if first_message: + session.started = True + + output = await run_claude( + message, + cwd=cwd, + cc_session_id=cc_session_id, + resume=not first_message, + ) + return output + + async def close(self, conv_id: str) -> bool: + """Remove a session. Returns True if it existed.""" + async with self._lock: + if conv_id not in self._sessions: + return False + del self._sessions[conv_id] + logger.info("Closed session %s", conv_id) + return True + + def list_sessions(self) -> list[dict]: + return [ + {"conv_id": s.conv_id, "cwd": s.cwd, "cc_session_id": s.cc_session_id} + for s in self._sessions.values() + ] + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + + async def _reaper_loop(self) -> None: + while True: + await asyncio.sleep(60) + await self._reap_idle() + + async def _reap_idle(self) -> None: + now = asyncio.get_event_loop().time() + async with self._lock: + to_close = [ + cid for cid, s in self._sessions.items() + if (now - s.last_activity) > IDLE_TIMEOUT + ] + for cid in to_close: + del self._sessions[cid] + logger.info("Reaped idle session %s", cid) + + +# Module-level singleton +manager = SessionManager() diff --git a/agent/pty_process.py b/agent/pty_process.py new file mode 100644 index 0000000..73c16bb --- /dev/null +++ b/agent/pty_process.py @@ -0,0 +1,86 @@ +"""Headless Claude Code runner using `claude -p` (print/non-interactive mode).""" + +from __future__ import annotations + +import asyncio +import logging +import re +import sys + +logger = logging.getLogger(__name__) + +ANSI_ESCAPE = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])") +MAX_OUTPUT_CHARS = 8000 # truncate very long responses before sending to Feishu + + +def strip_ansi(text: str) -> str: + return ANSI_ESCAPE.sub("", text) + + +async def run_claude( + prompt: str, + cwd: str, + cc_session_id: str | None = None, + resume: bool = False, + timeout: float = 300.0, +) -> str: + """ + Run `claude -p ` in the given directory and return the output. + + Args: + prompt: The message/instruction to pass to Claude Code. + cwd: Working directory for the subprocess. + cc_session_id: Stable UUID for the Claude Code session. + - First call: passed as --session-id to establish the session. + - Subsequent calls: passed as --resume so CC has full history. + resume: If True, use --resume instead of --session-id. + timeout: Maximum seconds to wait before giving up. + """ + base_args = [ + "--dangerously-skip-permissions", + "-p", prompt, + ] + + if cc_session_id: + if resume: + base_args = ["--resume", cc_session_id] + base_args + else: + base_args = ["--session-id", cc_session_id] + base_args + + if sys.platform == "win32": + # `claude` is a .cmd shim on Windows — must go through cmd /c + args = ["cmd", "/c", "claude"] + base_args + else: + args = ["claude"] + base_args + + logger.debug("Running: %s (cwd=%s)", " ".join(args[:6]), cwd) + + proc = await asyncio.create_subprocess_exec( + *args, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + cwd=cwd, + ) + + try: + stdout_bytes, stderr_bytes = await asyncio.wait_for( + proc.communicate(), timeout=timeout + ) + except asyncio.TimeoutError: + proc.kill() + await proc.communicate() + return f"[Timed out after {timeout:.0f}s]" + + output = stdout_bytes.decode("utf-8", errors="replace") + output = strip_ansi(output).strip() + + if proc.returncode != 0: + err = stderr_bytes.decode("utf-8", errors="replace").strip() + logger.warning("claude -p exited %d: %s", proc.returncode, err[:200]) + if not output: + output = f"[Error exit {proc.returncode}] {err[:500]}" + + if len(output) > MAX_OUTPUT_CHARS: + output = output[:MAX_OUTPUT_CHARS] + "\n...[truncated]" + + return output diff --git a/bot/__init__.py b/bot/__init__.py new file mode 100644 index 0000000..912256a --- /dev/null +++ b/bot/__init__.py @@ -0,0 +1 @@ +# empty init diff --git a/bot/feishu.py b/bot/feishu.py new file mode 100644 index 0000000..d742cc1 --- /dev/null +++ b/bot/feishu.py @@ -0,0 +1,79 @@ +"""Feishu API client: send messages back to users.""" + +from __future__ import annotations + +import logging + +import lark_oapi as lark +from lark_oapi.api.im.v1 import ( + CreateMessageRequest, + CreateMessageRequestBody, +) + +from config import FEISHU_APP_ID, FEISHU_APP_SECRET + +logger = logging.getLogger(__name__) + +# Max Feishu text message length +MAX_TEXT_LEN = 4000 + + +def _make_client() -> lark.Client: + return ( + lark.Client.builder() + .app_id(FEISHU_APP_ID) + .app_secret(FEISHU_APP_SECRET) + .log_level(lark.LogLevel.WARNING) + .build() + ) + + +_client = _make_client() + + +def _truncate(text: str) -> str: + if len(text) <= MAX_TEXT_LEN: + return text + return text[: MAX_TEXT_LEN - 20] + "\n...[truncated]" + + +async def send_text(receive_id: str, receive_id_type: str, text: str) -> None: + """ + Send a plain-text message to a Feishu chat or user. + + Args: + receive_id: chat_id or open_id depending on receive_id_type. + receive_id_type: "chat_id" | "open_id" | "user_id" | "union_id". + text: message content. + """ + import json as _json + content = _json.dumps({"text": _truncate(text)}, ensure_ascii=False) + + request = ( + CreateMessageRequest.builder() + .receive_id_type(receive_id_type) + .request_body( + CreateMessageRequestBody.builder() + .receive_id(receive_id) + .msg_type("text") + .content(content) + .build() + ) + .build() + ) + + import asyncio + loop = asyncio.get_event_loop() + response = await loop.run_in_executor( + None, + lambda: _client.im.v1.message.create(request), + ) + + if not response.success(): + logger.error( + "Feishu send_text failed: code=%s msg=%s", + response.code, + response.msg, + ) + else: + logger.debug("Sent message to %s (%s)", receive_id, receive_id_type) diff --git a/bot/handler.py b/bot/handler.py new file mode 100644 index 0000000..a06d795 --- /dev/null +++ b/bot/handler.py @@ -0,0 +1,130 @@ +"""Feishu event handler using lark-oapi long-connection (WebSocket) mode.""" + +from __future__ import annotations + +import asyncio +import json +import logging +import threading + +import lark_oapi as lark +from lark_oapi.api.im.v1 import P2ImMessageReceiveV1 + +from bot.feishu import send_text +from config import FEISHU_APP_ID, FEISHU_APP_SECRET +from orchestrator.agent import agent + +logger = logging.getLogger(__name__) + +# Keep a reference to the running event loop so sync callbacks can schedule coroutines +_main_loop: asyncio.AbstractEventLoop | None = None + + +def _handle_message(data: P2ImMessageReceiveV1) -> None: + """ + Synchronous callback invoked by the lark-oapi SDK on every incoming message. + We schedule async work onto the main event loop. + """ + try: + message = data.event.message + sender = data.event.sender + + # Log raw event for debugging + logger.info( + "RAW event: type=%r content=%r chat_type=%r", + getattr(message, "message_type", None), + getattr(message, "content", None), + getattr(message, "chat_type", None), + ) + + # Only handle text messages + if message.message_type != "text": + logger.info("Skipping non-text message_type=%r", message.message_type) + return + + # Extract fields + chat_id: str = message.chat_id + raw_content: str = message.content or "{}" + content_obj = json.loads(raw_content) + text: str = content_obj.get("text", "").strip() + + # Strip @mentions (Feishu injects "@bot_name " at start of group messages) + import re + text = re.sub(r"@\S+\s*", "", text).strip() + + open_id: str = "" + if sender and sender.sender_id: + open_id = sender.sender_id.open_id or "" + + if not text: + logger.info("Empty text after stripping, ignoring") + return + + logger.info("Received message from %s in %s: %r", open_id, chat_id, text[:80]) + + # Use open_id as user identifier for per-user history in the orchestrator + user_id = open_id or chat_id + + if _main_loop is None: + logger.error("Main event loop not set; cannot process message") + return + + # Schedule async processing; fire-and-forget + asyncio.run_coroutine_threadsafe( + _process_message(user_id, chat_id, text), + _main_loop, + ) + except Exception: + logger.exception("Error in _handle_message") + + +async def _process_message(user_id: str, chat_id: str, text: str) -> None: + """Run the orchestration agent and send the reply back to Feishu.""" + try: + reply = await agent.run(user_id, text) + if reply: + await send_text(chat_id, "chat_id", reply) + except Exception: + logger.exception("Error processing message for user %s", user_id) + + +def _handle_any(data: lark.CustomizedEvent) -> None: + """Catch-all handler to log any event the SDK receives.""" + logger.info("RAW CustomizedEvent: %s", lark.JSON.marshal(data)[:500]) + + +def build_event_handler() -> lark.EventDispatcherHandler: + """Construct the EventDispatcherHandler with all registered callbacks.""" + handler = ( + lark.EventDispatcherHandler.builder("", "") + .register_p2_im_message_receive_v1(_handle_message) + .register_p1_customized_event("im.message.receive_v1", _handle_any) + .build() + ) + return handler + + +def start_websocket_client(loop: asyncio.AbstractEventLoop) -> None: + """ + Start the lark-oapi WebSocket long-connection client in a background thread. + Must be called after the asyncio event loop is running. + """ + global _main_loop + _main_loop = loop + + event_handler = build_event_handler() + + ws_client = lark.ws.Client( + FEISHU_APP_ID, + FEISHU_APP_SECRET, + event_handler=event_handler, + log_level=lark.LogLevel.INFO, + ) + + def _run() -> None: + logger.info("Starting Feishu long-connection client...") + ws_client.start() # blocks until disconnected + + thread = threading.Thread(target=_run, daemon=True, name="feishu-ws") + thread.start() + logger.info("Feishu WebSocket thread started") diff --git a/config.py b/config.py new file mode 100644 index 0000000..1f99f57 --- /dev/null +++ b/config.py @@ -0,0 +1,19 @@ +import yaml +from pathlib import Path + +_CONFIG_PATH = Path(__file__).parent / "keyring.yaml" + + +def _load() -> dict: + with open(_CONFIG_PATH, "r", encoding="utf-8") as f: + return yaml.safe_load(f) or {} + + +_cfg = _load() + +FEISHU_APP_ID: str = _cfg["FEISHU_APP_ID"] +FEISHU_APP_SECRET: str = _cfg["FEISHU_APP_SECRET"] +OPENAI_BASE_URL: str = _cfg["OPENAI_BASE_URL"] +OPENAI_API_KEY: str = _cfg["OPENAI_API_KEY"] +OPENAI_MODEL: str = _cfg.get("OPENAI_MODEL", "glm-4.7") +WORKING_DIR: Path = Path(_cfg.get("WORKING_DIR", Path.home())).expanduser().resolve() diff --git a/debug_test.py b/debug_test.py new file mode 100644 index 0000000..82adc03 --- /dev/null +++ b/debug_test.py @@ -0,0 +1,31 @@ +"""Quick diagnostic script — run interactively to trace what's failing.""" +import asyncio +import logging +logging.basicConfig(level=logging.DEBUG, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s") + +async def main(): + # Step 1: Can the agent call tools at all? + print("\n=== Step 1: Agent tool-calling test ===") + from orchestrator.agent import agent + reply = await agent.run( + "test_user", + "请在 C:\\Users\\hyuyao\\Documents\\PhoneWork 目录开一个新的 Claude Code 会话,conv_id 随便取。" + ) + print("Agent reply:", reply) + + # Step 2: Check manager sessions + print("\n=== Step 2: Active sessions ===") + from agent.manager import manager + print(manager.list_sessions()) + + # Step 3: If session exists, try sending a message + sessions = manager.list_sessions() + if sessions: + cid = sessions[0]["conv_id"] + print(f"\n=== Step 3: Send 'hello' to session {cid} ===") + out = await manager.send(cid, "hello") + print("Output:", out[:500]) + else: + print("No sessions — tool was not called or failed.") + +asyncio.run(main()) diff --git a/keyring.example.yaml b/keyring.example.yaml new file mode 100644 index 0000000..ea2990b --- /dev/null +++ b/keyring.example.yaml @@ -0,0 +1,6 @@ +FEISHU_APP_ID: your_feishu_app_id +FEISHU_APP_SECRET: your_feishu_app_secret +OPENAI_BASE_URL: https://api.openai.com/v1/ +OPENAI_API_KEY: your_openai_api_key +OPENAI_MODEL: gpt-4 +WORKING_DIR: "/path/to/working/directory" diff --git a/main.py b/main.py new file mode 100644 index 0000000..f2f2e20 --- /dev/null +++ b/main.py @@ -0,0 +1,59 @@ +"""PhoneWork entry point: FastAPI app + Feishu long-connection client.""" + +from __future__ import annotations + +import asyncio +import logging + +import uvicorn +from fastapi import FastAPI + +from agent.manager import manager +from bot.handler import start_websocket_client + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", +) +logger = logging.getLogger(__name__) + +app = FastAPI(title="PhoneWork", version="0.1.0") + + +@app.get("/health") +async def health() -> dict: + sessions = manager.list_sessions() + return {"status": "ok", "active_sessions": len(sessions)} + + +@app.get("/sessions") +async def list_sessions() -> list: + return manager.list_sessions() + + +@app.on_event("startup") +async def startup_event() -> None: + # Start the session manager's idle-reaper + await manager.start() + + # Start the Feishu WebSocket client in a background thread, + # passing the running event loop so async work can be scheduled + loop = asyncio.get_event_loop() + start_websocket_client(loop) + logger.info("PhoneWork started") + + +@app.on_event("shutdown") +async def shutdown_event() -> None: + await manager.stop() + logger.info("PhoneWork shut down") + + +if __name__ == "__main__": + uvicorn.run( + "main:app", + host="0.0.0.0", + port=8000, + reload=False, + log_level="info", + ) diff --git a/orchestrator/__init__.py b/orchestrator/__init__.py new file mode 100644 index 0000000..912256a --- /dev/null +++ b/orchestrator/__init__.py @@ -0,0 +1 @@ +# empty init diff --git a/orchestrator/agent.py b/orchestrator/agent.py new file mode 100644 index 0000000..bc4d08a --- /dev/null +++ b/orchestrator/agent.py @@ -0,0 +1,150 @@ +"""LangChain orchestration agent backed by ZhipuAI (OpenAI-compatible API). + +Uses LangChain 1.x tool-calling pattern: bind_tools + manual agentic loop. +""" + +from __future__ import annotations + +import json +import logging +from collections import defaultdict +from typing import Dict, List, Optional + +from langchain_core.messages import ( + AIMessage, + BaseMessage, + HumanMessage, + SystemMessage, + ToolMessage, +) +from langchain_openai import ChatOpenAI + +from config import OPENAI_API_KEY, OPENAI_BASE_URL, OPENAI_MODEL, WORKING_DIR +from orchestrator.tools import TOOLS + +logger = logging.getLogger(__name__) + +SYSTEM_PROMPT_TEMPLATE = """You are PhoneWork, an AI assistant that helps users control Claude Code \ +from their phone via Feishu (飞书). + +You manage Claude Code sessions. Each session has a conv_id and runs in a project directory. + +Base working directory: {working_dir} +Users refer to projects by subfolder name (e.g. "todo_app") or relative path. \ +Pass these names directly to `create_conversation` — the tool resolves them automatically. + +{active_session_line} + +Your responsibilities: +1. NEW session: call `create_conversation` with the project name/path. \ + If the user's message also contains a task, pass it as `initial_message` too. +2. Follow-up to ACTIVE session: call `send_to_conversation` with the active conv_id shown above. +3. List sessions: call `list_conversations`. +4. Close session: call `close_conversation`. + +Guidelines: +- Relay Claude Code's output verbatim. +- If no active session and the user sends a task without naming a directory, ask them which project. +- Keep your own words brief — let Claude Code's output speak. +- Reply in the same language the user uses (Chinese or English). +""" + +MAX_ITERATIONS = 10 +_TOOL_MAP = {t.name: t for t in TOOLS} + + +class OrchestrationAgent: + """Per-user agent with conversation history and active session tracking.""" + + def __init__(self) -> None: + llm = ChatOpenAI( + base_url=OPENAI_BASE_URL, + api_key=OPENAI_API_KEY, + model=OPENAI_MODEL, + temperature=0.0, + ) + self._llm_with_tools = llm.bind_tools(TOOLS) + + # user_id -> list[BaseMessage] + self._history: Dict[str, List[BaseMessage]] = defaultdict(list) + # user_id -> most recently active conv_id + self._active_conv: Dict[str, Optional[str]] = defaultdict(lambda: None) + + def _build_system_prompt(self, user_id: str) -> str: + conv_id = self._active_conv[user_id] + if conv_id: + active_line = f"ACTIVE SESSION: conv_id={conv_id!r} ← use this for all follow-up messages" + else: + active_line = "ACTIVE SESSION: none" + return SYSTEM_PROMPT_TEMPLATE.format( + working_dir=WORKING_DIR, + active_session_line=active_line, + ) + + async def run(self, user_id: str, text: str) -> str: + """Process a user message and return the agent's reply.""" + messages: List[BaseMessage] = ( + [SystemMessage(content=self._build_system_prompt(user_id))] + + self._history[user_id] + + [HumanMessage(content=text)] + ) + + reply = "" + try: + for _ in range(MAX_ITERATIONS): + ai_msg: AIMessage = await self._llm_with_tools.ainvoke(messages) + messages.append(ai_msg) + + if not ai_msg.tool_calls: + reply = ai_msg.content or "" + break + + for tc in ai_msg.tool_calls: + tool_name = tc["name"] + tool_args = tc["args"] + tool_id = tc["id"] + + tool_obj = _TOOL_MAP.get(tool_name) + if tool_obj is None: + result = f"Unknown tool: {tool_name}" + else: + try: + result = await tool_obj.arun(tool_args) + except Exception as exc: + result = f"Tool error: {exc}" + + # If a session was just created, record it as the active session + if tool_name == "create_conversation": + try: + data = json.loads(result) + if "conv_id" in data: + self._active_conv[user_id] = data["conv_id"] + logger.info( + "Active session for %s set to %s", + user_id, data["conv_id"], + ) + except Exception: + pass + + messages.append( + ToolMessage(content=str(result), tool_call_id=tool_id) + ) + else: + reply = "[Max iterations reached]" + + except Exception as exc: + logger.exception("Agent error for user %s", user_id) + reply = f"[Error] {exc}" + + # Update history + self._history[user_id].append(HumanMessage(content=text)) + self._history[user_id].append(AIMessage(content=reply)) + + if len(self._history[user_id]) > 40: + self._history[user_id] = self._history[user_id][-40:] + + return reply + + +# Module-level singleton +agent = OrchestrationAgent() diff --git a/orchestrator/tools.py b/orchestrator/tools.py new file mode 100644 index 0000000..349a3df --- /dev/null +++ b/orchestrator/tools.py @@ -0,0 +1,162 @@ +"""LangChain tools that bridge the orchestration agent to Claude Code PTY sessions.""" + +from __future__ import annotations + +import json +import uuid +from pathlib import Path +from typing import Optional, Type + +from langchain_core.tools import BaseTool +from pydantic import BaseModel, Field + +from agent.manager import manager +from config import WORKING_DIR + + +def _resolve_dir(working_dir: str) -> Path: + """ + Resolve working_dir to an absolute path under WORKING_DIR. + + Rules: + - Absolute paths are used as-is (but must stay within WORKING_DIR for safety). + - Relative paths / bare names are joined onto WORKING_DIR. + - The resolved directory is created if it doesn't exist. + """ + p = Path(working_dir) + if not p.is_absolute(): + p = WORKING_DIR / p + p = p.resolve() + + # Safety: must be inside WORKING_DIR + try: + p.relative_to(WORKING_DIR) + except ValueError: + raise ValueError( + f"Directory {p} is outside the allowed base {WORKING_DIR}. " + "Please use a subfolder name or a path inside the working directory." + ) + + p.mkdir(parents=True, exist_ok=True) + return p + + +# --------------------------------------------------------------------------- +# Input schemas +# --------------------------------------------------------------------------- + +class CreateConversationInput(BaseModel): + working_dir: str = Field( + ..., + description=( + "The project directory. Can be a subfolder name (e.g. 'todo_app'), " + "a relative path (e.g. 'projects/todo_app'), or a full absolute path. " + "Relative names are resolved under the configured base working directory." + ), + ) + initial_message: Optional[str] = Field(None, description="Optional first message to send after spawning") + + +class SendToConversationInput(BaseModel): + conv_id: str = Field(..., description="Conversation ID returned by create_conversation") + message: str = Field(..., description="Message / instruction to send to Claude Code") + + +class CloseConversationInput(BaseModel): + conv_id: str = Field(..., description="Conversation ID to close") + + +# --------------------------------------------------------------------------- +# Tools +# --------------------------------------------------------------------------- + +class CreateConversationTool(BaseTool): + name: str = "create_conversation" + description: str = ( + "Spawn a new Claude Code session in the given working directory. " + "Returns a conv_id that must be used for subsequent messages. " + "Use this when the user wants to start a new task in a specific directory." + ) + args_schema: Type[BaseModel] = CreateConversationInput + + def _run(self, working_dir: str, initial_message: Optional[str] = None) -> str: + raise NotImplementedError("Use async version") + + async def _arun(self, working_dir: str, initial_message: Optional[str] = None) -> str: + try: + resolved = _resolve_dir(working_dir) + except ValueError as exc: + return json.dumps({"error": str(exc)}) + + conv_id = str(uuid.uuid4())[:8] + await manager.create(conv_id, str(resolved)) + + result: dict = { + "conv_id": conv_id, + "working_dir": str(resolved), + } + + if initial_message: + output = await manager.send(conv_id, initial_message) + result["response"] = output + else: + result["status"] = "Session created. Send a message to start working." + + return json.dumps(result, ensure_ascii=False) + + +class SendToConversationTool(BaseTool): + name: str = "send_to_conversation" + description: str = ( + "Send a message to an existing Claude Code session and return its response. " + "Use this for follow-up messages in an ongoing session." + ) + args_schema: Type[BaseModel] = SendToConversationInput + + def _run(self, conv_id: str, message: str) -> str: + raise NotImplementedError("Use async version") + + async def _arun(self, conv_id: str, message: str) -> str: + try: + output = await manager.send(conv_id, message) + return json.dumps({"conv_id": conv_id, "response": output}, ensure_ascii=False) + except KeyError: + return json.dumps({"error": f"No active session for conv_id={conv_id!r}"}) + + +class ListConversationsTool(BaseTool): + name: str = "list_conversations" + description: str = "List all currently active Claude Code sessions." + + def _run(self) -> str: + raise NotImplementedError("Use async version") + + async def _arun(self) -> str: + sessions = manager.list_sessions() + if not sessions: + return "No active sessions." + return json.dumps(sessions, ensure_ascii=False, indent=2) + + +class CloseConversationTool(BaseTool): + name: str = "close_conversation" + description: str = "Close and terminate an active Claude Code session." + args_schema: Type[BaseModel] = CloseConversationInput + + def _run(self, conv_id: str) -> str: + raise NotImplementedError("Use async version") + + async def _arun(self, conv_id: str) -> str: + closed = await manager.close(conv_id) + if closed: + return f"Session {conv_id!r} closed." + return f"Session {conv_id!r} not found." + + +# Module-level tool list for easy import +TOOLS = [ + CreateConversationTool(), + SendToConversationTool(), + ListConversationsTool(), + CloseConversationTool(), +] diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..6fedd38 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,8 @@ +fastapi>=0.111.0 +uvicorn>=0.29.0 +lark-oapi>=1.3.0 +langchain>=0.2.0 +langchain-openai>=0.1.0 +langchain-community>=0.2.0 +pywinpty>=2.0.0 +pyyaml>=6.0.0