PhoneWork/agent/pty_process.py
Yuyao Huang (Sam) b67e2dd2db feat: 增强日志功能并添加rich依赖
- 添加rich库依赖以改进日志显示
- 在各模块添加详细调试日志,包括消息处理、命令执行和工具调用过程
- 使用RichHandler美化日志输出并抑制第三方库的噪音日志
- 在关键路径添加日志记录,便于问题排查
2026-03-28 07:57:24 +08:00

93 lines
2.8 KiB
Python

"""Headless Claude Code runner using `claude -p` (print/non-interactive mode)."""
from __future__ import annotations
import asyncio
import logging
import re
import sys
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Upper bound on the length of a response; anything longer is truncated
# before it is sent to Feishu.
MAX_OUTPUT_CHARS = 8_000

# Matches ESC followed by either a single character in the `@`-`Z` / `\`-`_`
# ranges, or a `[`-introduced sequence (parameter characters, intermediate
# characters, then one final character in `@`-`~`).
ANSI_ESCAPE = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
def strip_ansi(text: str) -> str:
    """Return *text* with every match of ``ANSI_ESCAPE`` removed."""
    cleaned = re.sub(ANSI_ESCAPE, "", text)
    return cleaned
def _build_claude_command(
    prompt: str,
    cc_session_id: str | None,
    resume: bool,
) -> list[str]:
    """Assemble the argv for one `claude -p` call; the prompt is always the last item."""
    session_args: list[str] = []
    if cc_session_id:
        session_flag = "--resume" if resume else "--session-id"
        session_args = [session_flag, cc_session_id]

    if sys.platform == "win32":
        # `claude` is a .cmd shim on Windows — must go through cmd /c
        # NOTE(review): this routes the prompt through cmd.exe, which gives
        # special meaning to characters such as & | < > % — confirm that is
        # acceptable for the prompts this service receives.
        launcher = ["cmd", "/c", "claude"]
    else:
        launcher = ["claude"]

    return [*launcher, *session_args, "--dangerously-skip-permissions", "-p", prompt]


def _kill_quietly(proc: asyncio.subprocess.Process) -> None:
    """Kill *proc*, tolerating the race where it has already exited."""
    try:
        proc.kill()
    except ProcessLookupError:
        pass


async def run_claude(
    prompt: str,
    cwd: str,
    cc_session_id: str | None = None,
    resume: bool = False,
    timeout: float = 300.0,
) -> str:
    """
    Run `claude -p <prompt>` in the given directory and return the output.

    Args:
        prompt: The message/instruction to pass to Claude Code.
        cwd: Working directory for the subprocess.
        cc_session_id: Stable UUID for the Claude Code session.
            - First call: passed as --session-id to establish the session.
            - Subsequent calls: passed as --resume so CC has full history.
            When empty/None, no session flag is passed at all.
        resume: If True, use --resume instead of --session-id.
        timeout: Maximum seconds to wait before giving up.

    Returns:
        The subprocess's stdout, decoded as UTF-8 (undecodable bytes replaced),
        with ANSI escapes removed and surrounding whitespace stripped, cut to
        MAX_OUTPUT_CHARS characters plus a "...[truncated]" marker when longer.
        Failures are reported in-band rather than raised:
        "[Timed out after <N>s]" when the timeout expires, and
        "[Error exit <code>] <stderr>" when the process exits non-zero
        without writing anything to stdout.

    Raises:
        OSError: propagated from process creation (for example when the
            executable or `cwd` does not exist).
        asyncio.CancelledError: re-raised after the child has been killed when
            the awaiting task is cancelled.
    """
    args = _build_claude_command(prompt, cc_session_id, resume)

    # The prompt is the last argv item; log the command without it so the
    # only copy of the prompt in the log line is the truncated one.
    logger.debug(
        "[worker] cmd=%s session_id=%s resume=%s cwd=%s prompt=%r",
        " ".join(args[:-1]), cc_session_id, resume, cwd, prompt[:120],
    )

    proc = await asyncio.create_subprocess_exec(
        *args,
        # Nothing is ever written to the child's stdin; give it an immediate
        # EOF instead of letting it inherit (and possibly wait on) ours.
        stdin=asyncio.subprocess.DEVNULL,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        cwd=cwd,
    )

    try:
        stdout_bytes, stderr_bytes = await asyncio.wait_for(
            proc.communicate(), timeout=timeout
        )
    except asyncio.TimeoutError:
        _kill_quietly(proc)
        # Drain the pipes and reap the process after killing it.
        await proc.communicate()
        logger.warning("[worker] timed out after %.0fs session_id=%s", timeout, cc_session_id)
        return f"[Timed out after {timeout:.0f}s]"
    except asyncio.CancelledError:
        # Do not leave the child running when the caller is cancelled.
        _kill_quietly(proc)
        raise

    output = strip_ansi(stdout_bytes.decode("utf-8", errors="replace")).strip()

    if proc.returncode != 0:
        err = stderr_bytes.decode("utf-8", errors="replace").strip()
        logger.warning("[worker] exit=%d stderr=%r", proc.returncode, err[:200])
        # Prefer whatever was printed to stdout; fall back to stderr only
        # when stdout is empty.
        if not output:
            output = f"[Error exit {proc.returncode}] {err[:500]}"

    logger.debug("[worker] output (%d chars): %r", len(output), output[:300])

    if len(output) > MAX_OUTPUT_CHARS:
        output = output[:MAX_OUTPUT_CHARS] + "\n...[truncated]"
    return output