"""LangChain tools that bridge the orchestration agent to Claude Code PTY sessions."""

from __future__ import annotations

import json
import uuid
from contextvars import ContextVar
from pathlib import Path
from typing import Optional, Type

from langchain_core.tools import BaseTool
from pydantic import BaseModel, Field

from agent.manager import manager
from config import WORKING_DIR

# Identity of the user on whose behalf the tools are currently running.
# Stored in a ContextVar so the tools can read it without it being a tool argument.
_current_user_id: ContextVar[Optional[str]] = ContextVar("current_user_id", default=None)


def set_current_user(user_id: Optional[str]) -> None:
    """Record *user_id* as the current user for the active context."""
    _current_user_id.set(user_id)


def get_current_user() -> Optional[str]:
    """Return the user id stored by :func:`set_current_user`, or ``None`` if unset."""
    return _current_user_id.get()


def _resolve_dir(working_dir: str) -> Path:
    """
    Resolve working_dir to an absolute path under WORKING_DIR.

    Rules:
    - Absolute paths are used as-is (but must stay within WORKING_DIR for safety).
    - Relative paths / bare names are joined onto WORKING_DIR.
    - Path traversal attempts (..) are blocked.
    - The resolved directory is created if it doesn't exist.

    Raises:
        ValueError: if the path contains a ``..`` component or resolves to a
            location outside WORKING_DIR.
        OSError: if the directory cannot be created (e.g. the path exists as a
            regular file, or permission is denied).
    """
    working_dir = working_dir.strip()

    # Reject ".." components regardless of which separator style was used.
    if ".." in working_dir.split("/") or ".." in working_dir.split("\\"):
        raise ValueError(
            "Path traversal not allowed. Use a subfolder name or path inside the working directory."
        )

    # Resolve the base as well as the candidate: comparing a resolved candidate
    # against an unresolved base (relative, or containing a symlink) would
    # reject every legitimate subfolder.
    base = Path(WORKING_DIR).resolve()

    p = Path(working_dir)
    if not p.is_absolute():
        p = base / p
    p = p.resolve()

    try:
        p.relative_to(base)
    except ValueError:
        # The relative_to() traceback adds nothing for the caller; suppress it.
        raise ValueError(
            f"Directory {p} is outside the allowed base {base}. "
            "Please use a subfolder name or a path inside the working directory."
        ) from None

    p.mkdir(parents=True, exist_ok=True)
    return p


# ---------------------------------------------------------------------------
# Input schemas
# ---------------------------------------------------------------------------


# Arguments accepted by the create_conversation tool.
class CreateConversationInput(BaseModel):
    working_dir: str = Field(
        ...,
        description=(
            "The project directory. Can be a subfolder name (e.g. 'todo_app'), "
            "a relative path (e.g. 'projects/todo_app'), or a full absolute path. "
            "Relative names are resolved under the configured base working directory."
        ),
    )
    initial_message: Optional[str] = Field(None, description="Optional first message to send after spawning")
    idle_timeout: Optional[int] = Field(None, description="Idle timeout in seconds (default 1800)")
    cc_timeout: Optional[float] = Field(None, description="Claude Code execution timeout in seconds (default 300)")


# Arguments accepted by the send_to_conversation tool.
class SendToConversationInput(BaseModel):
    conv_id: str = Field(..., description="Conversation ID returned by create_conversation")
    message: str = Field(..., description="Message / instruction to send to Claude Code")


# Arguments accepted by the close_conversation tool.
class CloseConversationInput(BaseModel):
    conv_id: str = Field(..., description="Conversation ID to close")


# ---------------------------------------------------------------------------
# Tools
# ---------------------------------------------------------------------------


# Creates a session through the manager and optionally sends a first message.
class CreateConversationTool(BaseTool):
    name: str = "create_conversation"
    description: str = (
        "Spawn a new Claude Code session in the given working directory. "
        "Returns a conv_id that must be used for subsequent messages. "
        "Use this when the user wants to start a new task in a specific directory."
    )
    args_schema: Type[BaseModel] = CreateConversationInput

    # Only the async entry point is implemented.
    def _run(
        self,
        working_dir: str,
        initial_message: Optional[str] = None,
        idle_timeout: Optional[int] = None,
        cc_timeout: Optional[float] = None,
    ) -> str:
        raise NotImplementedError("Use async version")

    # Returns a JSON string: {"conv_id", "working_dir"} plus either "response"
    # (when an initial message was sent) or "status"; {"error": ...} when the
    # directory is rejected or cannot be created.
    async def _arun(
        self,
        working_dir: str,
        initial_message: Optional[str] = None,
        idle_timeout: Optional[int] = None,
        cc_timeout: Optional[float] = None,
    ) -> str:
        try:
            resolved = _resolve_dir(working_dir)
        except (ValueError, OSError) as exc:
            # ValueError: rejected path. OSError: mkdir failed (path is a file,
            # permission denied, ...). Both are reported as an error payload
            # instead of crashing the tool call.
            return json.dumps({"error": str(exc)})

        user_id = get_current_user()
        # Short id: the first 8 characters of a random UUID.
        conv_id = str(uuid.uuid4())[:8]

        await manager.create(
            conv_id,
            str(resolved),
            owner_id=user_id or "",
            # `or` means both None and 0 fall back to the defaults.
            idle_timeout=idle_timeout or 1800,
            cc_timeout=cc_timeout or 300.0,
        )

        result: dict = {
            "conv_id": conv_id,
            "working_dir": str(resolved),
        }

        if initial_message:
            output = await manager.send(conv_id, initial_message, user_id=user_id)
            result["response"] = output
        else:
            result["status"] = "Session created. Send a message to start working."

        return json.dumps(result, ensure_ascii=False)


# Forwards a message to an existing session and returns its response.
class SendToConversationTool(BaseTool):
    name: str = "send_to_conversation"
    description: str = (
        "Send a message to an existing Claude Code session and return its response. "
        "Use this for follow-up messages in an ongoing session."
    )
    args_schema: Type[BaseModel] = SendToConversationInput

    # Only the async entry point is implemented.
    def _run(self, conv_id: str, message: str) -> str:
        raise NotImplementedError("Use async version")

    # Returns a JSON string: {"conv_id", "response"} on success, {"error": ...}
    # when manager.send raises KeyError or PermissionError.
    async def _arun(self, conv_id: str, message: str) -> str:
        user_id = get_current_user()
        try:
            output = await manager.send(conv_id, message, user_id=user_id)
            return json.dumps({"conv_id": conv_id, "response": output}, ensure_ascii=False)
        except KeyError:
            return json.dumps({"error": f"No active session for conv_id={conv_id!r}"})
        except PermissionError as e:
            return json.dumps({"error": str(e)})


# Lists the sessions the manager reports for the current user.
class ListConversationsTool(BaseTool):
    name: str = "list_conversations"
    description: str = "List all currently active Claude Code sessions."

    # Only the async entry point is implemented.
    def _run(self) -> str:
        raise NotImplementedError("Use async version")

    # Returns pretty-printed JSON of the sessions, or a plain sentence when
    # there are none.
    async def _arun(self) -> str:
        user_id = get_current_user()
        sessions = manager.list_sessions(user_id=user_id)
        if not sessions:
            return "No active sessions."
        return json.dumps(sessions, ensure_ascii=False, indent=2)


# Closes a session through the manager.
class CloseConversationTool(BaseTool):
    name: str = "close_conversation"
    description: str = "Close and terminate an active Claude Code session."
    args_schema: Type[BaseModel] = CloseConversationInput

    # Only the async entry point is implemented.
    def _run(self, conv_id: str) -> str:
        raise NotImplementedError("Use async version")

    # Returns a plain-text status; a PermissionError from the manager is
    # returned as its message.
    async def _arun(self, conv_id: str) -> str:
        user_id = get_current_user()
        try:
            closed = await manager.close(conv_id, user_id=user_id)
            if closed:
                return f"Session {conv_id!r} closed."
            return f"Session {conv_id!r} not found."
        except PermissionError as e:
            return str(e)


# Module-level tool list for easy import
TOOLS = [
    CreateConversationTool(),
    SendToConversationTool(),
    ListConversationsTool(),
    CloseConversationTool(),
]