- Implement SDK session with secretary model for tool approval flow - Add audit logging for tool usage and permission decisions - Support Feishu card interactions for approval requests - Add new commands for task interruption and progress checking - Remove old test files and update documentation
73 lines
2.0 KiB
Python
73 lines
2.0 KiB
Python
"""SDK hooks for audit logging and dangerous command blocking."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import re
|
|
|
|
from claude_agent_sdk import HookContext, HookInput, HookJSONOutput, HookMatcher
|
|
|
|
# Regular expressions (as raw strings) describing shell commands that must
# never be executed.  Each entry is applied with ``re.search`` and
# ``re.IGNORECASE`` by ``deny_dangerous_hook``; a single hit denies the call.
BLOCKED_PATTERNS = [
    # Recursive force-delete of a path starting at "/" or "~".
    r"\brm\s+-rf\s+/",
    r"\brm\s+-rf\s+~",
    # Disk formatting / filesystem creation.
    r"\bformat\s+",
    r"\bmkfs\b",
    # Powering the machine off or restarting it.
    r"\bshutdown\b",
    r"\breboot\b",
    # Raw block copy with an explicit input file.
    r"\bdd\s+if=",
    # Shell fork bomb.  Whitespace is optional between every token so that the
    # commonly written spaced form ``:(){ :|:& };:`` is caught as well as the
    # fully compact ``:(){:|:&};:``.
    r":\(\)\s*\{\s*:\s*\|\s*:\s*&\s*\}\s*;\s*:",
    # Permission / ownership changes targeting a path that starts with "/".
    r"\bchmod\s+777\s+/",
    r"\bchown\s+.*\s+/",
    # Privileged variants of the destructive commands above.
    r"\bsudo\s+rm\b",
    r"\bsudo\s+chmod\b",
    r"\bsudo\s+chown\b",
    r"\bsudo\s+dd\b",
    # SIGKILL sent to PID 1.
    r"\bkill\s+-9\s+1\b",
]
|
|
|
|
|
|
async def audit_hook(
    input_data: HookInput, tool_use_id: str | None, context: HookContext
) -> HookJSONOutput:
    """Record a completed tool call in the audit log (PostToolUse hook).

    Args:
        input_data: Hook payload; ``session_id``, ``tool_name``,
            ``tool_input`` and ``tool_response`` are read from it, with
            empty defaults for the first three when a key is absent.
        tool_use_id: Identifier of the tool invocation (not used here).
        context: Hook context supplied by the SDK (not used here).

    Returns:
        An empty dict, i.e. the hook contributes nothing to the hook output.
    """
    # Imported at call time rather than at module import time.
    from agent.audit import log_tool_use

    audit_fields = {
        "session_id": input_data.get("session_id", ""),
        "tool_name": input_data.get("tool_name", ""),
        "tool_input": input_data.get("tool_input", {}),
        "tool_response": input_data.get("tool_response"),
    }
    log_tool_use(**audit_fields)
    return {}
|
|
|
|
|
|
async def deny_dangerous_hook(
    input_data: HookInput, tool_use_id: str | None, context: HookContext
) -> HookJSONOutput:
    """Deny Bash tool calls whose command matches a blocked pattern (PreToolUse hook).

    Args:
        input_data: Hook payload; ``tool_name`` and ``tool_input["command"]``
            are inspected.
        tool_use_id: Identifier of the tool invocation (not used here).
        context: Hook context supplied by the SDK (not used here).

    Returns:
        An empty dict when the tool is not ``Bash`` or no pattern matches;
        otherwise a ``hookSpecificOutput`` payload with a ``deny`` decision
        naming the first pattern in ``BLOCKED_PATTERNS`` that matched.
    """
    # Only shell commands are screened; every other tool passes untouched.
    if input_data.get("tool_name") != "Bash":
        return {}

    command = input_data.get("tool_input", {}).get("command", "")

    # First pattern (in list order) found anywhere in the command, else None.
    pattern = next(
        (
            candidate
            for candidate in BLOCKED_PATTERNS
            if re.search(candidate, command, re.IGNORECASE)
        ),
        None,
    )
    if pattern is None:
        return {}

    decision = {
        "hookEventName": "PreToolUse",
        "permissionDecision": "deny",
        "permissionDecisionReason": f"Blocked by policy: matches {pattern}",
    }
    return {"hookSpecificOutput": decision}
|
|
|
|
|
|
def build_hooks(conv_id: str) -> dict[str, list[HookMatcher]]:
    """Assemble the hook registrations for one session.

    Args:
        conv_id: Conversation identifier.  Accepted for the caller's
            convenience; the current implementation does not read it.

    Returns:
        A mapping from hook event name to its matchers: ``PostToolUse``
        runs ``audit_hook`` for Bash/Edit/Write/MultiEdit, and
        ``PreToolUse`` runs ``deny_dangerous_hook`` for Bash.
    """
    audit_matcher = HookMatcher(
        matcher="Bash|Edit|Write|MultiEdit", hooks=[audit_hook]
    )
    deny_matcher = HookMatcher(matcher="Bash", hooks=[deny_dangerous_hook])

    hooks: dict[str, list[HookMatcher]] = {}
    hooks["PostToolUse"] = [audit_matcher]
    hooks["PreToolUse"] = [deny_matcher]
    return hooks
|