"""LangChain tools that bridge the orchestration agent to Claude Code PTY sessions."""

from __future__ import annotations

import json
import uuid
from pathlib import Path
from typing import Optional, Type

from langchain_core.tools import BaseTool
from pydantic import BaseModel, Field

from agent.manager import manager
from config import WORKING_DIR


def _resolve_dir(working_dir: str) -> Path:
    """Resolve ``working_dir`` to an absolute directory inside ``WORKING_DIR``.

    Rules:
      - Relative paths / bare names are joined onto the base directory.
      - Absolute paths are accepted only if they lie inside the base directory.
      - The resolved directory is created if it doesn't exist.

    Args:
        working_dir: Subfolder name, relative path, or absolute path.

    Returns:
        The resolved, existing directory.

    Raises:
        ValueError: If the resolved path falls outside the base directory.
        OSError: If the directory cannot be created.
    """
    # Resolve the base too: the candidate is fully resolved below, so comparing
    # it against an unresolved base (relative, or reached through a symlink)
    # would reject every path, including legitimate subfolders.
    base = Path(WORKING_DIR).resolve()

    candidate = Path(working_dir)
    if not candidate.is_absolute():
        candidate = base / candidate
    # resolve() collapses ".." segments and symlinks before the safety check.
    candidate = candidate.resolve()

    # Safety: must be inside the base directory.
    try:
        candidate.relative_to(base)
    except ValueError as exc:
        raise ValueError(
            f"Directory {candidate} is outside the allowed base {WORKING_DIR}. "
            "Please use a subfolder name or a path inside the working directory."
        ) from exc

    candidate.mkdir(parents=True, exist_ok=True)
    return candidate


# ---------------------------------------------------------------------------
# Input schemas
# ---------------------------------------------------------------------------
# NOTE: the schema classes are documented with comments rather than docstrings
# so that the models' generated schemas stay exactly as they were.


# Arguments accepted by the ``create_conversation`` tool.
class CreateConversationInput(BaseModel):
    working_dir: str = Field(
        ...,
        description=(
            "The project directory. Can be a subfolder name (e.g. 'todo_app'), "
            "a relative path (e.g. 'projects/todo_app'), or a full absolute path. "
            "Relative names are resolved under the configured base working directory."
        ),
    )
    initial_message: Optional[str] = Field(
        None, description="Optional first message to send after spawning"
    )


# Arguments accepted by the ``send_to_conversation`` tool.
class SendToConversationInput(BaseModel):
    conv_id: str = Field(..., description="Conversation ID returned by create_conversation")
    message: str = Field(..., description="Message / instruction to send to Claude Code")


# Arguments accepted by the ``close_conversation`` tool.
class CloseConversationInput(BaseModel):
    conv_id: str = Field(..., description="Conversation ID to close")


# ---------------------------------------------------------------------------
# Tools
# ---------------------------------------------------------------------------


# Creates a session through ``manager`` and optionally sends a first message.
class CreateConversationTool(BaseTool):
    name: str = "create_conversation"
    description: str = (
        "Spawn a new Claude Code session in the given working directory. "
        "Returns a conv_id that must be used for subsequent messages. "
        "Use this when the user wants to start a new task in a specific directory."
    )
    args_schema: Type[BaseModel] = CreateConversationInput

    def _run(self, working_dir: str, initial_message: Optional[str] = None) -> str:
        """Synchronous execution is not supported; always raises."""
        raise NotImplementedError("Use async version")

    async def _arun(self, working_dir: str, initial_message: Optional[str] = None) -> str:
        """Create a session and return a JSON description of it.

        Returns:
            A JSON object with ``conv_id`` and ``working_dir``, plus either
            ``response`` (when ``initial_message`` was sent) or ``status``.
            If the directory is rejected or cannot be created, a JSON object
            with a single ``error`` key is returned instead.
        """
        try:
            resolved = _resolve_dir(working_dir)
        except (ValueError, OSError) as exc:
            # OSError covers mkdir failures (permissions, a file in the way);
            # report them to the agent the same way as a rejected path.
            return json.dumps({"error": str(exc)}, ensure_ascii=False)

        # Short identifier: the first 8 hex digits of a random UUID.
        conv_id = uuid.uuid4().hex[:8]
        await manager.create(conv_id, str(resolved))

        result: dict = {
            "conv_id": conv_id,
            "working_dir": str(resolved),
        }
        if initial_message:
            result["response"] = await manager.send(conv_id, initial_message)
        else:
            result["status"] = "Session created. Send a message to start working."
        return json.dumps(result, ensure_ascii=False)


# Forwards a message to an existing session through ``manager``.
class SendToConversationTool(BaseTool):
    name: str = "send_to_conversation"
    description: str = (
        "Send a message to an existing Claude Code session and return its response. "
        "Use this for follow-up messages in an ongoing session."
    )
    args_schema: Type[BaseModel] = SendToConversationInput

    def _run(self, conv_id: str, message: str) -> str:
        """Synchronous execution is not supported; always raises."""
        raise NotImplementedError("Use async version")

    async def _arun(self, conv_id: str, message: str) -> str:
        """Send ``message`` to session ``conv_id`` and return a JSON result.

        Returns:
            A JSON object with ``conv_id`` and ``response``; if
            ``manager.send`` raises ``KeyError``, a JSON object with an
            ``error`` key instead.
        """
        # Only the send itself is guarded, so a KeyError raised anywhere else
        # is not misreported as a missing session.
        try:
            output = await manager.send(conv_id, message)
        except KeyError:
            return json.dumps(
                {"error": f"No active session for conv_id={conv_id!r}"},
                ensure_ascii=False,
            )
        else:
            return json.dumps({"conv_id": conv_id, "response": output}, ensure_ascii=False)


# Reports whatever ``manager.list_sessions()`` returns.
class ListConversationsTool(BaseTool):
    name: str = "list_conversations"
    description: str = "List all currently active Claude Code sessions."

    def _run(self) -> str:
        """Synchronous execution is not supported; always raises."""
        raise NotImplementedError("Use async version")

    async def _arun(self) -> str:
        """Return the sessions as indented JSON, or a plain notice when empty."""
        sessions = manager.list_sessions()
        if not sessions:
            return "No active sessions."
        return json.dumps(sessions, ensure_ascii=False, indent=2)


# Closes a session through ``manager``.
class CloseConversationTool(BaseTool):
    name: str = "close_conversation"
    description: str = "Close and terminate an active Claude Code session."
    args_schema: Type[BaseModel] = CloseConversationInput

    def _run(self, conv_id: str) -> str:
        """Synchronous execution is not supported; always raises."""
        raise NotImplementedError("Use async version")

    async def _arun(self, conv_id: str) -> str:
        """Close session ``conv_id`` and return a human-readable outcome.

        The message depends on the truthiness of ``manager.close``'s result.
        """
        closed = await manager.close(conv_id)
        if closed:
            return f"Session {conv_id!r} closed."
        return f"Session {conv_id!r} not found."


# Module-level tool list for easy import
TOOLS = [
    CreateConversationTool(),
    SendToConversationTool(),
    ListConversationsTool(),
    CloseConversationTool(),
]