mirror of
https://github.com/shareAI-lab/analysis_claude_code.git
synced 2026-03-22 10:25:41 +08:00
- 11 sessions from basic agent loop to autonomous teams - Python MVP implementations for each session - Mental-model-first docs in en/zh/ja - Interactive web platform with step-through visualizations - Incremental architecture: each session adds one mechanism
215 lines
7.9 KiB
Python
215 lines
7.9 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
s05_skill_loading.py - Skills
|
|
|
|
Two-layer skill injection that avoids bloating the system prompt:
|
|
|
|
Layer 1 (cheap): skill names in system prompt (~100 tokens/skill)
|
|
Layer 2 (on demand): full skill body in tool_result
|
|
|
|
System prompt:
|
|
+--------------------------------------+
|
|
| You are a coding agent. |
|
|
| Skills available: |
|
|
| - git: Git workflow helpers | <-- Layer 1: metadata only
|
|
| - test: Testing best practices |
|
|
+--------------------------------------+
|
|
|
|
When model calls load_skill("git"):
|
|
+--------------------------------------+
|
|
| tool_result: |
|
|
| <skill> |
|
|
| Full git workflow instructions... | <-- Layer 2: full body
|
|
| Step 1: ... |
|
|
| Step 2: ... |
|
|
| </skill> |
|
|
+--------------------------------------+
|
|
|
|
Key insight: "Don't put everything in the system prompt. Load on demand."
|
|
"""
|
|
|
|
import os
|
|
import re
|
|
import subprocess
|
|
from pathlib import Path
|
|
|
|
from anthropic import Anthropic
|
|
from dotenv import load_dotenv
|
|
|
|
load_dotenv(override=True)
|
|
|
|
if os.getenv("ANTHROPIC_BASE_URL"):
|
|
os.environ.pop("ANTHROPIC_AUTH_TOKEN", None)
|
|
|
|
WORKDIR = Path.cwd()
|
|
client = Anthropic(base_url=os.getenv("ANTHROPIC_BASE_URL"))
|
|
MODEL = os.environ["MODEL_ID"]
|
|
SKILLS_DIR = WORKDIR / ".skills"
|
|
|
|
|
|
# -- SkillLoader: parse .skills/*.md files with YAML frontmatter --
|
|
class SkillLoader:
|
|
def __init__(self, skills_dir: Path):
|
|
self.skills_dir = skills_dir
|
|
self.skills = {}
|
|
self._load_all()
|
|
|
|
def _load_all(self):
|
|
if not self.skills_dir.exists():
|
|
return
|
|
for f in sorted(self.skills_dir.glob("*.md")):
|
|
name = f.stem
|
|
text = f.read_text()
|
|
meta, body = self._parse_frontmatter(text)
|
|
self.skills[name] = {"meta": meta, "body": body, "path": str(f)}
|
|
|
|
def _parse_frontmatter(self, text: str) -> tuple:
|
|
"""Parse YAML frontmatter between --- delimiters."""
|
|
match = re.match(r"^---\n(.*?)\n---\n(.*)", text, re.DOTALL)
|
|
if not match:
|
|
return {}, text
|
|
meta = {}
|
|
for line in match.group(1).strip().splitlines():
|
|
if ":" in line:
|
|
key, val = line.split(":", 1)
|
|
meta[key.strip()] = val.strip()
|
|
return meta, match.group(2).strip()
|
|
|
|
def get_descriptions(self) -> str:
|
|
"""Layer 1: short descriptions for the system prompt."""
|
|
if not self.skills:
|
|
return "(no skills available)"
|
|
lines = []
|
|
for name, skill in self.skills.items():
|
|
desc = skill["meta"].get("description", "No description")
|
|
tags = skill["meta"].get("tags", "")
|
|
line = f" - {name}: {desc}"
|
|
if tags:
|
|
line += f" [{tags}]"
|
|
lines.append(line)
|
|
return "\n".join(lines)
|
|
|
|
def get_content(self, name: str) -> str:
|
|
"""Layer 2: full skill body returned in tool_result."""
|
|
skill = self.skills.get(name)
|
|
if not skill:
|
|
return f"Error: Unknown skill '{name}'. Available: {', '.join(self.skills.keys())}"
|
|
return f"<skill name=\"{name}\">\n{skill['body']}\n</skill>"
|
|
|
|
|
|
SKILL_LOADER = SkillLoader(SKILLS_DIR)
|
|
|
|
# Layer 1: skill metadata injected into system prompt
|
|
SYSTEM = f"""You are a coding agent at {WORKDIR}.
|
|
Use load_skill to access specialized knowledge before tackling unfamiliar topics.
|
|
|
|
Skills available:
|
|
{SKILL_LOADER.get_descriptions()}"""
|
|
|
|
|
|
# -- Tool implementations --
|
|
def safe_path(p: str) -> Path:
|
|
path = (WORKDIR / p).resolve()
|
|
if not path.is_relative_to(WORKDIR):
|
|
raise ValueError(f"Path escapes workspace: {p}")
|
|
return path
|
|
|
|
def run_bash(command: str) -> str:
|
|
dangerous = ["rm -rf /", "sudo", "shutdown", "reboot", "> /dev/"]
|
|
if any(d in command for d in dangerous):
|
|
return "Error: Dangerous command blocked"
|
|
try:
|
|
r = subprocess.run(command, shell=True, cwd=WORKDIR,
|
|
capture_output=True, text=True, timeout=120)
|
|
out = (r.stdout + r.stderr).strip()
|
|
return out[:50000] if out else "(no output)"
|
|
except subprocess.TimeoutExpired:
|
|
return "Error: Timeout (120s)"
|
|
|
|
def run_read(path: str, limit: int = None) -> str:
|
|
try:
|
|
lines = safe_path(path).read_text().splitlines()
|
|
if limit and limit < len(lines):
|
|
lines = lines[:limit] + [f"... ({len(lines) - limit} more)"]
|
|
return "\n".join(lines)[:50000]
|
|
except Exception as e:
|
|
return f"Error: {e}"
|
|
|
|
def run_write(path: str, content: str) -> str:
|
|
try:
|
|
fp = safe_path(path)
|
|
fp.parent.mkdir(parents=True, exist_ok=True)
|
|
fp.write_text(content)
|
|
return f"Wrote {len(content)} bytes"
|
|
except Exception as e:
|
|
return f"Error: {e}"
|
|
|
|
def run_edit(path: str, old_text: str, new_text: str) -> str:
|
|
try:
|
|
fp = safe_path(path)
|
|
content = fp.read_text()
|
|
if old_text not in content:
|
|
return f"Error: Text not found in {path}"
|
|
fp.write_text(content.replace(old_text, new_text, 1))
|
|
return f"Edited {path}"
|
|
except Exception as e:
|
|
return f"Error: {e}"
|
|
|
|
|
|
TOOL_HANDLERS = {
|
|
"bash": lambda **kw: run_bash(kw["command"]),
|
|
"read_file": lambda **kw: run_read(kw["path"], kw.get("limit")),
|
|
"write_file": lambda **kw: run_write(kw["path"], kw["content"]),
|
|
"edit_file": lambda **kw: run_edit(kw["path"], kw["old_text"], kw["new_text"]),
|
|
"load_skill": lambda **kw: SKILL_LOADER.get_content(kw["name"]),
|
|
}
|
|
|
|
TOOLS = [
|
|
{"name": "bash", "description": "Run a shell command.",
|
|
"input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}},
|
|
{"name": "read_file", "description": "Read file contents.",
|
|
"input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "limit": {"type": "integer"}}, "required": ["path"]}},
|
|
{"name": "write_file", "description": "Write content to file.",
|
|
"input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}},
|
|
{"name": "edit_file", "description": "Replace exact text in file.",
|
|
"input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}}, "required": ["path", "old_text", "new_text"]}},
|
|
{"name": "load_skill", "description": "Load specialized knowledge by name.",
|
|
"input_schema": {"type": "object", "properties": {"name": {"type": "string", "description": "Skill name to load"}}, "required": ["name"]}},
|
|
]
|
|
|
|
|
|
def agent_loop(messages: list):
|
|
while True:
|
|
response = client.messages.create(
|
|
model=MODEL, system=SYSTEM, messages=messages,
|
|
tools=TOOLS, max_tokens=8000,
|
|
)
|
|
messages.append({"role": "assistant", "content": response.content})
|
|
if response.stop_reason != "tool_use":
|
|
return
|
|
results = []
|
|
for block in response.content:
|
|
if block.type == "tool_use":
|
|
handler = TOOL_HANDLERS.get(block.name)
|
|
try:
|
|
output = handler(**block.input) if handler else f"Unknown tool: {block.name}"
|
|
except Exception as e:
|
|
output = f"Error: {e}"
|
|
print(f"> {block.name}: {str(output)[:200]}")
|
|
results.append({"type": "tool_result", "tool_use_id": block.id, "content": str(output)})
|
|
messages.append({"role": "user", "content": results})
|
|
|
|
|
|
if __name__ == "__main__":
|
|
history = []
|
|
while True:
|
|
try:
|
|
query = input("\033[36ms05 >> \033[0m")
|
|
except (EOFError, KeyboardInterrupt):
|
|
break
|
|
if query.strip().lower() in ("q", "exit", ""):
|
|
break
|
|
history.append({"role": "user", "content": query})
|
|
agent_loop(history)
|
|
print()
|