"""Headless Claude Code runner using `claude -p` (print/non-interactive mode).""" from __future__ import annotations import asyncio import logging import re import sys logger = logging.getLogger(__name__) ANSI_ESCAPE = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])") MAX_OUTPUT_CHARS = 8000 # truncate very long responses before sending to Feishu def strip_ansi(text: str) -> str: return ANSI_ESCAPE.sub("", text) async def run_claude( prompt: str, cwd: str, cc_session_id: str | None = None, resume: bool = False, timeout: float = 300.0, ) -> str: """ Run `claude -p ` in the given directory and return the output. Args: prompt: The message/instruction to pass to Claude Code. cwd: Working directory for the subprocess. cc_session_id: Stable UUID for the Claude Code session. - First call: passed as --session-id to establish the session. - Subsequent calls: passed as --resume so CC has full history. resume: If True, use --resume instead of --session-id. timeout: Maximum seconds to wait before giving up. """ base_args = [ "--dangerously-skip-permissions", "-p", prompt, ] if cc_session_id: if resume: base_args = ["--resume", cc_session_id] + base_args else: base_args = ["--session-id", cc_session_id] + base_args if sys.platform == "win32": # `claude` is a .cmd shim on Windows — must go through cmd /c args = ["cmd", "/c", "claude"] + base_args else: args = ["claude"] + base_args logger.info( "[worker] %s cwd=%s prompt=%r", ("resume" if resume else "new") + f"/{cc_session_id[:8] if cc_session_id else '-'}", cwd, prompt[:80], ) proc = await asyncio.create_subprocess_exec( *args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=cwd, ) try: stdout_bytes, stderr_bytes = await asyncio.wait_for( proc.communicate(), timeout=timeout ) except asyncio.TimeoutError: proc.kill() await proc.communicate() logger.warning("[worker] timed out after %.0fs session_id=%s", timeout, cc_session_id) return f"[Timed out after {timeout:.0f}s]" output = stdout_bytes.decode("utf-8", errors="replace") output = strip_ansi(output).strip() if proc.returncode != 0: err = stderr_bytes.decode("utf-8", errors="replace").strip() logger.warning("[worker] exit=%d stderr=%r", proc.returncode, err[:200]) if not output: output = f"[Error exit {proc.returncode}] {err[:500]}" logger.info("[worker] output (%d chars): %r", len(output), output[:120]) if len(output) > MAX_OUTPUT_CHARS: output = output[:MAX_OUTPUT_CHARS] + "\n...[truncated]" return output