mirror of
https://github.com/shareAI-lab/analysis_claude_code.git
synced 2026-03-22 02:15:42 +08:00
Comprehensive rewrite establishing the harness engineering narrative across the entire repository. README (EN/ZH/JA): added "The Model IS the Agent" manifesto with historical proof (DQN, OpenAI Five, AlphaStar, Tencent Jueyu), "What an Agent Is NOT" critique, harness engineer role definition, "Why Claude Code" as masterclass in harness design, and universe vision. Consistent framing: model = driver, harness = vehicle. docs (36 files, 3 languages): injected one-line "Harness layer" callout after the motto in every session document (s01-s12). agents (13 Python files): added harness framing comment before each module docstring. skills/agent-philosophy.md: full rewrite aligned with harness narrative.
150 lines
5.5 KiB
Python
150 lines
5.5 KiB
Python
#!/usr/bin/env python3
|
|
# Harness: tool dispatch -- expanding what the model can reach.
|
|
"""
s02_tool_use.py - Tools

The agent loop from s01 didn't change. We just added tools to the array
and a dispatch map to route calls.

    +----------+      +-------+      +------------------+
    |   User   | ---> |  LLM  | ---> | Tool Dispatch    |
    |  prompt  |      |       |      | {                |
    +----------+      +---+---+      |   bash: run_bash |
                          ^          |   read: run_read |
                          |          |   write: run_wr  |
                          +----------+   edit: run_edit |
                          tool_result| }                |
                                     +------------------+

Key insight: "The loop didn't change at all. I just added tools."
"""
|
|
|
|
import os
|
|
import subprocess
|
|
from pathlib import Path
|
|
|
|
from anthropic import Anthropic
|
|
from dotenv import load_dotenv
|
|
|
|
# Load .env before reading any configuration; override=True is passed so that
# .env entries take precedence over variables already present in the environment.
load_dotenv(override=True)

# With a custom endpoint configured, drop ANTHROPIC_AUTH_TOKEN from the process
# environment before the client is built.
# NOTE(review): presumably the custom endpoint authenticates differently -- confirm.
if os.environ.get("ANTHROPIC_BASE_URL"):
    os.environ.pop("ANTHROPIC_AUTH_TOKEN", None)

# Workspace root: every file tool is confined to this directory.
WORKDIR = Path.cwd()

# base_url is None when ANTHROPIC_BASE_URL is unset.
client = Anthropic(base_url=os.environ.get("ANTHROPIC_BASE_URL"))

# MODEL_ID is mandatory; a missing variable raises KeyError at import time.
MODEL = os.environ["MODEL_ID"]

SYSTEM = f"You are a coding agent at {WORKDIR}. Use tools to solve tasks. Act, don't explain."
|
|
|
|
|
|
def safe_path(p: str) -> Path:
    """Resolve *p* against the workspace root and reject anything outside it.

    Args:
        p: Path supplied by the model, joined onto ``WORKDIR``.

    Returns:
        The resolved path (``..`` segments and symlinks collapsed by
        ``Path.resolve``).

    Raises:
        ValueError: If the resolved path is not located under ``WORKDIR``.
    """
    resolved = (WORKDIR / p).resolve()
    if resolved.is_relative_to(WORKDIR):
        return resolved
    raise ValueError(f"Path escapes workspace: {p}")
|
|
|
|
|
|
def run_bash(command: str) -> str:
    """Run *command* through the shell in the workspace and return its output.

    Args:
        command: Shell command line to execute with ``cwd=WORKDIR``.

    Returns:
        stdout followed by stderr, stripped and truncated to 50000 characters;
        ``"(no output)"`` when both are empty; or an ``"Error: ..."`` string when
        the command is blocked, times out after 120 seconds, or cannot be started.
    """
    # Best-effort substring deny-list; it is not a sandbox and is easy to bypass.
    dangerous = ("rm -rf /", "sudo", "shutdown", "reboot", "> /dev/")
    if any(d in command for d in dangerous):
        return "Error: Dangerous command blocked"
    try:
        # errors="replace": commands may emit bytes that are not valid in the
        # locale encoding; substitute them instead of raising UnicodeDecodeError.
        r = subprocess.run(command, shell=True, cwd=WORKDIR,
                           capture_output=True, text=True, errors="replace",
                           timeout=120)
    except subprocess.TimeoutExpired:
        return "Error: Timeout (120s)"
    except OSError as e:
        # e.g. the working directory vanished or the shell could not be spawned.
        return f"Error: {e}"
    out = (r.stdout + r.stderr).strip()
    return out[:50000] if out else "(no output)"
|
|
|
|
|
|
def run_read(path: str, limit: int = None) -> str:
    """Return the text of a workspace file, optionally capped at *limit* lines.

    Args:
        path: File path, validated through ``safe_path``.
        limit: Maximum number of lines to return. ``None``, ``0`` and negative
            values all mean "no line limit".

    Returns:
        The file content with lines joined by ``"\\n"`` and truncated to 50000
        characters. When lines are cut off, a trailing
        ``"... (N more lines)"`` marker is appended. Any failure is returned
        as an ``"Error: ..."`` string rather than raised.
    """
    try:
        text = safe_path(path).read_text()
        lines = text.splitlines()
        # Require a positive limit: a negative value would otherwise slice from
        # the end of the list and report a wrong "more lines" count.
        if limit is not None and 0 < limit < len(lines):
            lines = lines[:limit] + [f"... ({len(lines) - limit} more lines)"]
        return "\n".join(lines)[:50000]
    except Exception as e:
        return f"Error: {e}"
|
|
|
|
|
|
def run_write(path: str, content: str) -> str:
    """Write *content* to a workspace file, creating parent directories as needed.

    Args:
        path: Destination path, validated through ``safe_path``.
        content: Text to write; any existing file is replaced.

    Returns:
        ``"Wrote N bytes to <path>"`` where N is the size of the file on disk,
        or an ``"Error: ..."`` string when the path is rejected or the write fails.
    """
    try:
        fp = safe_path(path)
        fp.parent.mkdir(parents=True, exist_ok=True)
        fp.write_text(content)
        # len(content) counts characters, not bytes; report the real on-disk size
        # so the message stays true for non-ASCII text and newline translation.
        return f"Wrote {fp.stat().st_size} bytes to {path}"
    except Exception as e:
        return f"Error: {e}"
|
|
|
|
|
|
def run_edit(path: str, old_text: str, new_text: str) -> str:
    """Replace the first occurrence of *old_text* in a workspace file.

    Args:
        path: File to modify, validated through ``safe_path``.
        old_text: Exact text to look for.
        new_text: Text substituted for the first match.

    Returns:
        ``"Edited <path>"`` on success, ``"Error: Text not found in <path>"`` when
        *old_text* does not occur, or ``"Error: ..."`` for any raised exception.
    """
    try:
        target = safe_path(path)
        original = target.read_text()
        if old_text in original:
            # Only the first match is rewritten; later occurrences stay untouched.
            updated = original.replace(old_text, new_text, 1)
            target.write_text(updated)
            return f"Edited {path}"
        return f"Error: Text not found in {path}"
    except Exception as exc:
        return f"Error: {exc}"
|
|
|
|
|
|
# -- The dispatch map: {tool_name: handler} --
# Each handler receives the tool call's input as keyword arguments, picks out the
# fields its implementation needs, and ignores any extras.
TOOL_HANDLERS = {
    "bash": lambda **args: run_bash(args["command"]),
    # "limit" is optional in the schema, hence .get() instead of indexing.
    "read_file": lambda **args: run_read(args["path"], args.get("limit")),
    "write_file": lambda **args: run_write(args["path"], args["content"]),
    "edit_file": lambda **args: run_edit(
        args["path"], args["old_text"], args["new_text"]
    ),
}
|
|
|
|
# Tool definitions sent to the model; each name matches a key in the dispatch map.
TOOLS = [
    {
        "name": "bash",
        "description": "Run a shell command.",
        "input_schema": {
            "type": "object",
            "properties": {"command": {"type": "string"}},
            "required": ["command"],
        },
    },
    {
        "name": "read_file",
        "description": "Read file contents.",
        "input_schema": {
            "type": "object",
            "properties": {
                "path": {"type": "string"},
                "limit": {"type": "integer"},
            },
            "required": ["path"],
        },
    },
    {
        "name": "write_file",
        "description": "Write content to file.",
        "input_schema": {
            "type": "object",
            "properties": {
                "path": {"type": "string"},
                "content": {"type": "string"},
            },
            "required": ["path", "content"],
        },
    },
    {
        "name": "edit_file",
        "description": "Replace exact text in file.",
        "input_schema": {
            "type": "object",
            "properties": {
                "path": {"type": "string"},
                "old_text": {"type": "string"},
                "new_text": {"type": "string"},
            },
            "required": ["path", "old_text", "new_text"],
        },
    },
]
|
|
|
|
|
|
def agent_loop(messages: list):
    """Drive the model until it stops requesting tools.

    Each iteration sends the conversation to the model, appends the assistant
    reply, and -- when the reply's stop reason is ``"tool_use"`` -- runs every
    requested tool and appends the results as a user message.

    Args:
        messages: Conversation history; mutated in place. On return, the last
            entry is the assistant's final reply.

    Returns:
        None. The function returns once ``stop_reason`` is not ``"tool_use"``.
    """
    while True:
        response = client.messages.create(
            model=MODEL, system=SYSTEM, messages=messages,
            tools=TOOLS, max_tokens=8000,
        )
        messages.append({"role": "assistant", "content": response.content})
        if response.stop_reason != "tool_use":
            return
        results = []
        for block in response.content:
            if block.type != "tool_use":
                continue
            handler = TOOL_HANDLERS.get(block.name)
            if handler is None:
                output = f"Unknown tool: {block.name}"
            else:
                try:
                    output = handler(**block.input)
                except Exception as e:
                    # A malformed call (e.g. a missing required argument raises
                    # KeyError) must not abort the loop: that would leave the
                    # history ending on a tool_use with no tool_result. Report
                    # the failure to the model so it can retry.
                    output = f"Error: {type(e).__name__}: {e}"
            print(f"> {block.name}: {output[:200]}")
            results.append({"type": "tool_result", "tool_use_id": block.id, "content": output})
        messages.append({"role": "user", "content": results})
|
|
|
|
|
|
if __name__ == "__main__":
    # Interactive REPL: one shared history across all prompts of the session.
    history = []
    while True:
        try:
            query = input("\033[36ms02 >> \033[0m")
        except (EOFError, KeyboardInterrupt):
            # Ctrl-D / Ctrl-C at the prompt ends the session.
            break
        # "q", "exit" or an empty line also quit.
        if query.strip().lower() in {"q", "exit", ""}:
            break
        history.append({"role": "user", "content": query})
        agent_loop(history)
        # agent_loop leaves the assistant's final reply as the last history entry.
        final_content = history[-1]["content"]
        text_parts = final_content if isinstance(final_content, list) else []
        for part in text_parts:
            # Only blocks carrying a .text attribute are printed.
            if hasattr(part, "text"):
                print(part.text)
        print()
|