mirror of
https://github.com/shareAI-lab/analysis_claude_code.git
synced 2026-06-20 20:23:36 +08:00
409 lines
18 KiB
Python
409 lines
18 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
s07: Skill Loading — two-level on-demand knowledge injection.
|
|
|
|
Layer 1 (cheap, always present):
|
|
SYSTEM prompt includes skill names + one-line descriptions (~100 tokens/skill)
|
|
"Skills available: agent-builder, code-review, mcp-builder, pdf"
|
|
|
|
Layer 2 (expensive, on demand):
|
|
Agent calls load_skill("code-review") → full SKILL.md content
|
|
injected via tool_result (~2000 tokens/skill)
|
|
|
|
skills/
|
|
agent-builder/SKILL.md
|
|
code-review/SKILL.md
|
|
mcp-builder/SKILL.md
|
|
pdf/SKILL.md
|
|
|
|
Changes from s06:
|
|
+ build_system() — scan skills/ dir at startup, inject catalog into SYSTEM
|
|
+ load_skill(name) — return full SKILL.md content via tool_result
|
|
+ SKILLS_DIR config
|
|
Loop unchanged: load_skill auto-dispatches via TOOL_HANDLERS.
|
|
|
|
Run: python s07_skill_loading/code.py
|
|
Needs: pip install anthropic python-dotenv + ANTHROPIC_API_KEY in .env
|
|
"""
|
|
|
|
import os, subprocess, json
|
|
from pathlib import Path
|
|
|
|
try:
|
|
import readline
|
|
readline.parse_and_bind('set bind-tty-special-chars off')
|
|
except ImportError:
|
|
pass
|
|
|
|
from anthropic import Anthropic
|
|
from dotenv import load_dotenv
|
|
|
|
load_dotenv(override=True)
|
|
if os.getenv("ANTHROPIC_BASE_URL"):
|
|
os.environ.pop("ANTHROPIC_AUTH_TOKEN", None)
|
|
|
|
WORKDIR = Path.cwd()
|
|
SKILLS_DIR = WORKDIR / "skills"
|
|
TASKS_DIR = WORKDIR / ".tasks"; TASKS_DIR.mkdir(exist_ok=True)
|
|
client = Anthropic(base_url=os.getenv("ANTHROPIC_BASE_URL"))
|
|
MODEL = os.environ["MODEL_ID"]
|
|
|
|
# s07: Skill catalog scan (used by build_system below)
|
|
def _parse_frontmatter(text: str) -> tuple[dict, str]:
|
|
"""Parse YAML frontmatter from SKILL.md. Returns (meta, body)."""
|
|
if not text.startswith("---"):
|
|
return {}, text
|
|
parts = text.split("---", 2)
|
|
if len(parts) < 3:
|
|
return {}, text
|
|
meta = {}
|
|
for line in parts[1].strip().splitlines():
|
|
if ":" in line:
|
|
k, v = line.split(":", 1)
|
|
meta[k.strip()] = v.strip().strip('"').strip("'")
|
|
return meta, parts[2].strip()
|
|
|
|
# Build skill registry at startup (used for safe lookup in load_skill)
|
|
SKILL_REGISTRY: dict[str, dict] = {}
|
|
|
|
def _scan_skills():
|
|
"""Scan skills/ dir, populate SKILL_REGISTRY with name/description/content."""
|
|
if not SKILLS_DIR.exists():
|
|
return
|
|
for d in sorted(SKILLS_DIR.iterdir()):
|
|
if not d.is_dir():
|
|
continue
|
|
manifest = d / "SKILL.md"
|
|
if manifest.exists():
|
|
raw = manifest.read_text()
|
|
meta, body = _parse_frontmatter(raw)
|
|
name = meta.get("name", d.name)
|
|
desc = meta.get("description", raw.split("\n")[0].lstrip("#").strip())
|
|
SKILL_REGISTRY[name] = {"name": name, "description": desc, "content": raw}
|
|
|
|
_scan_skills()
|
|
|
|
def list_skills() -> str:
|
|
"""List all skills (name + one-line description)."""
|
|
if not SKILL_REGISTRY:
|
|
return "(no skills found)"
|
|
return "\n".join(f"- **{s['name']}**: {s['description']}" for s in SKILL_REGISTRY.values())
|
|
|
|
# s07: SYSTEM includes skill catalog (cheap — just names + descriptions)
|
|
def build_system() -> str:
|
|
"""Build SYSTEM prompt with skill catalog injected at startup."""
|
|
catalog = list_skills()
|
|
return (
|
|
f"You are a coding agent at {WORKDIR}. "
|
|
f"Skills available:\n{catalog}\n"
|
|
"Use load_skill to get full details when needed."
|
|
)
|
|
|
|
SYSTEM = build_system()
|
|
|
|
# s07: subagent gets its own system prompt — no skill loading, no task
|
|
SUB_SYSTEM = (
|
|
f"You are a coding agent at {WORKDIR}. "
|
|
"Complete the task you were given, then return a concise summary. "
|
|
"Do not delegate further."
|
|
)
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════
|
|
# FROM s02-s06 (unchanged): Tool Implementations
|
|
# ═══════════════════════════════════════════════════════════
|
|
|
|
def safe_path(p: str) -> Path:
|
|
path = (WORKDIR / p).resolve()
|
|
if not path.is_relative_to(WORKDIR):
|
|
raise ValueError(f"Path escapes workspace: {p}")
|
|
return path
|
|
|
|
def run_bash(command: str) -> str:
|
|
try:
|
|
r = subprocess.run(command, shell=True, cwd=WORKDIR,
|
|
capture_output=True, text=True, timeout=120)
|
|
out = (r.stdout + r.stderr).strip()
|
|
return out[:50000] if out else "(no output)"
|
|
except subprocess.TimeoutExpired:
|
|
return "Error: Timeout (120s)"
|
|
|
|
def run_read(path: str, limit: int | None = None) -> str:
|
|
try:
|
|
lines = safe_path(path).read_text().splitlines()
|
|
if limit and limit < len(lines):
|
|
lines = lines[:limit] + [f"... ({len(lines) - limit} more lines)"]
|
|
return "\n".join(lines)
|
|
except Exception as e:
|
|
return f"Error: {e}"
|
|
|
|
def run_write(path: str, content: str) -> str:
|
|
try:
|
|
file_path = safe_path(path)
|
|
file_path.parent.mkdir(parents=True, exist_ok=True)
|
|
file_path.write_text(content)
|
|
return f"Wrote {len(content)} bytes to {path}"
|
|
except Exception as e:
|
|
return f"Error: {e}"
|
|
|
|
def run_edit(path: str, old_text: str, new_text: str) -> str:
|
|
try:
|
|
file_path = safe_path(path)
|
|
text = file_path.read_text()
|
|
if old_text not in text:
|
|
return f"Error: text not found in {path}"
|
|
file_path.write_text(text.replace(old_text, new_text, 1))
|
|
return f"Edited {path}"
|
|
except Exception as e:
|
|
return f"Error: {e}"
|
|
|
|
def run_glob(pattern: str) -> str:
|
|
import glob as g
|
|
try:
|
|
results = []
|
|
for match in g.glob(pattern, root_dir=WORKDIR):
|
|
if (WORKDIR / match).resolve().is_relative_to(WORKDIR):
|
|
results.append(match)
|
|
return "\n".join(results) if results else "(no matches)"
|
|
except Exception as e:
|
|
return f"Error: {e}"
|
|
|
|
def run_todo_write(todos: list) -> str:
|
|
for i, t in enumerate(todos):
|
|
if "content" not in t or "status" not in t:
|
|
return f"Error: todos[{i}] missing 'content' or 'status'"
|
|
if t["status"] not in ("pending", "in_progress", "completed"):
|
|
return f"Error: todos[{i}] has invalid status '{t['status']}'"
|
|
tasks_file = TASKS_DIR / "current_todos.json"
|
|
tasks_file.write_text(json.dumps(todos, indent=2, ensure_ascii=False))
|
|
lines = ["\n\033[33m## Current Tasks\033[0m"]
|
|
for t in todos:
|
|
icon = {"pending": " ", "in_progress": "\033[36m▸\033[0m", "completed": "\033[32m✓\033[0m"}[t["status"]]
|
|
lines.append(f" [{icon}] {t['content']}")
|
|
print("\n".join(lines))
|
|
return f"Updated {len(todos)} tasks"
|
|
|
|
def extract_text(content) -> str:
|
|
if not isinstance(content, list):
|
|
return str(content)
|
|
return "\n".join(getattr(b, "text", "") for b in content if getattr(b, "type", None) == "text")
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════
|
|
# FROM s06 (unchanged): Subagent
|
|
# ═══════════════════════════════════════════════════════════
|
|
|
|
SUB_TOOLS = [
|
|
{"name": "bash", "description": "Run a shell command.",
|
|
"input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}},
|
|
{"name": "read_file", "description": "Read file contents.",
|
|
"input_schema": {"type": "object", "properties": {"path": {"type": "string"}}, "required": ["path"]}},
|
|
{"name": "write_file", "description": "Write content to a file.",
|
|
"input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}},
|
|
{"name": "edit_file", "description": "Replace exact text in a file once.",
|
|
"input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}}, "required": ["path", "old_text", "new_text"]}},
|
|
{"name": "glob", "description": "Find files matching a glob pattern.",
|
|
"input_schema": {"type": "object", "properties": {"pattern": {"type": "string"}}, "required": ["pattern"]}},
|
|
]
|
|
SUB_HANDLERS = {"bash": run_bash, "read_file": run_read, "write_file": run_write,
|
|
"edit_file": run_edit, "glob": run_glob}
|
|
|
|
def spawn_subagent(description: str) -> str:
|
|
print(f"\n\033[35m[Subagent spawned]\033[0m")
|
|
messages = [{"role": "user", "content": description}]
|
|
for _ in range(30):
|
|
response = client.messages.create(model=MODEL, system=SUB_SYSTEM,
|
|
messages=messages, tools=SUB_TOOLS, max_tokens=8000)
|
|
messages.append({"role": "assistant", "content": response.content})
|
|
if response.stop_reason != "tool_use":
|
|
break
|
|
results = []
|
|
for block in response.content:
|
|
if block.type == "tool_use":
|
|
blocked = trigger_hooks("PreToolUse", block)
|
|
if blocked:
|
|
results.append({"type": "tool_result", "tool_use_id": block.id,
|
|
"content": str(blocked)})
|
|
continue
|
|
handler = SUB_HANDLERS.get(block.name)
|
|
output = handler(**block.input) if handler else f"Unknown: {block.name}"
|
|
trigger_hooks("PostToolUse", block, output)
|
|
print(f" \033[90m[sub] {block.name}: {str(output)[:100]}\033[0m")
|
|
results.append({"type": "tool_result", "tool_use_id": block.id, "content": output})
|
|
messages.append({"role": "user", "content": results})
|
|
result = extract_text(messages[-1]["content"])
|
|
if not result:
|
|
for msg in reversed(messages):
|
|
if msg["role"] == "assistant":
|
|
result = extract_text(msg["content"])
|
|
if result:
|
|
break
|
|
if not result:
|
|
result = "Subagent stopped after 30 turns without final answer."
|
|
print(f"\033[35m[Subagent done]\033[0m")
|
|
return result
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════
|
|
# NEW in s07: load_skill — runtime full content loading
|
|
# ═══════════════════════════════════════════════════════════
|
|
|
|
def load_skill(name: str) -> str:
|
|
"""Load full skill content. Lookup via registry — no path traversal."""
|
|
skill = SKILL_REGISTRY.get(name)
|
|
if not skill:
|
|
return f"Skill not found: {name}"
|
|
return skill["content"]
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════
|
|
# Tool Registry — all tools from s02-s07
|
|
# ═══════════════════════════════════════════════════════════
|
|
|
|
TOOLS = [
|
|
{"name": "bash", "description": "Run a shell command.",
|
|
"input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}},
|
|
{"name": "read_file", "description": "Read file contents.",
|
|
"input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "limit": {"type": "integer"}}, "required": ["path"]}},
|
|
{"name": "write_file", "description": "Write content to a file.",
|
|
"input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}},
|
|
{"name": "edit_file", "description": "Replace exact text in a file once.",
|
|
"input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}}, "required": ["path", "old_text", "new_text"]}},
|
|
{"name": "glob", "description": "Find files matching a glob pattern.",
|
|
"input_schema": {"type": "object", "properties": {"pattern": {"type": "string"}}, "required": ["pattern"]}},
|
|
{"name": "todo_write", "description": "Create and manage a task list for your current coding session.",
|
|
"input_schema": {"type": "object", "properties": {"todos": {"type": "array", "items": {"type": "object", "properties": {"content": {"type": "string"}, "status": {"type": "string", "enum": ["pending", "in_progress", "completed"]}}, "required": ["content", "status"]}}}, "required": ["todos"]}},
|
|
{"name": "task", "description": "Launch a subagent to handle a complex subtask. Returns only the final conclusion.",
|
|
"input_schema": {"type": "object", "properties": {"description": {"type": "string"}}, "required": ["description"]}},
|
|
# s07: skill tool (catalog is already in SYSTEM prompt, this loads full content)
|
|
{"name": "load_skill", "description": "Load the full content of a skill by name.",
|
|
"input_schema": {"type": "object", "properties": {"name": {"type": "string"}}, "required": ["name"]}},
|
|
]
|
|
|
|
TOOL_HANDLERS = {
|
|
"bash": run_bash, "read_file": run_read, "write_file": run_write,
|
|
"edit_file": run_edit, "glob": run_glob, "todo_write": run_todo_write,
|
|
"task": spawn_subagent, "load_skill": load_skill,
|
|
}
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════
|
|
# FROM s04 (unchanged): Hook System
|
|
# ═══════════════════════════════════════════════════════════
|
|
|
|
HOOKS = {"UserPromptSubmit": [], "PreToolUse": [], "PostToolUse": [], "Stop": []}
|
|
|
|
def register_hook(event: str, callback):
|
|
HOOKS[event].append(callback)
|
|
|
|
def trigger_hooks(event: str, *args):
|
|
for callback in HOOKS[event]:
|
|
result = callback(*args)
|
|
if result is not None:
|
|
return result
|
|
return None
|
|
|
|
DENY_LIST = ["rm -rf /", "sudo", "shutdown", "reboot", "mkfs", "dd if="]
|
|
|
|
def permission_hook(block):
|
|
if block.name == "bash":
|
|
for p in DENY_LIST:
|
|
if p in block.input.get("command", ""):
|
|
print(f"\n\033[31m⛔ Blocked: '{p}'\033[0m")
|
|
return "Permission denied"
|
|
return None
|
|
|
|
def log_hook(block):
|
|
print(f"\033[90m[HOOK] {block.name}\033[0m")
|
|
return None
|
|
|
|
def context_inject_hook(query: str):
|
|
print(f"\033[90m[HOOK] UserPromptSubmit: working in {WORKDIR}\033[0m")
|
|
return None
|
|
|
|
def summary_hook(messages: list):
|
|
tool_count = sum(1 for m in messages
|
|
for b in (m.get("content") if isinstance(m.get("content"), list) else [])
|
|
if isinstance(b, dict) and b.get("type") == "tool_result")
|
|
print(f"\033[90m[HOOK] Stop: session used {tool_count} tool calls\033[0m")
|
|
return None
|
|
|
|
register_hook("UserPromptSubmit", context_inject_hook)
|
|
register_hook("PreToolUse", permission_hook)
|
|
register_hook("PreToolUse", log_hook)
|
|
register_hook("Stop", summary_hook)
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════
|
|
# agent_loop — same as s05-s06 + nag reminder
|
|
# ═══════════════════════════════════════════════════════════
|
|
|
|
rounds_since_todo = 0
|
|
|
|
def agent_loop(messages: list):
|
|
global rounds_since_todo
|
|
while True:
|
|
if rounds_since_todo >= 3 and messages:
|
|
messages.append({"role": "user",
|
|
"content": "<reminder>Update your todos.</reminder>"})
|
|
rounds_since_todo = 0
|
|
|
|
response = client.messages.create(
|
|
model=MODEL, system=SYSTEM, messages=messages,
|
|
tools=TOOLS, max_tokens=8000,
|
|
)
|
|
messages.append({"role": "assistant", "content": response.content})
|
|
|
|
if response.stop_reason != "tool_use":
|
|
force = trigger_hooks("Stop", messages)
|
|
if force:
|
|
messages.append({"role": "user", "content": force})
|
|
continue
|
|
return
|
|
|
|
rounds_since_todo += 1
|
|
results = []
|
|
for block in response.content:
|
|
if block.type != "tool_use":
|
|
continue
|
|
|
|
blocked = trigger_hooks("PreToolUse", block)
|
|
if blocked:
|
|
results.append({"type": "tool_result", "tool_use_id": block.id,
|
|
"content": str(blocked)})
|
|
continue
|
|
|
|
handler = TOOL_HANDLERS.get(block.name)
|
|
output = handler(**block.input) if handler else f"Unknown: {block.name}"
|
|
|
|
trigger_hooks("PostToolUse", block, output)
|
|
|
|
if block.name == "todo_write":
|
|
rounds_since_todo = 0
|
|
|
|
results.append({"type": "tool_result", "tool_use_id": block.id,
|
|
"content": output})
|
|
|
|
messages.append({"role": "user", "content": results})
|
|
|
|
|
|
if __name__ == "__main__":
|
|
print("s07: Skill Loading — catalog in SYSTEM, content on demand")
|
|
print("Type a question, press Enter. Type q to quit.\n")
|
|
|
|
history = []
|
|
while True:
|
|
try:
|
|
query = input("\033[36ms07 >> \033[0m")
|
|
except (EOFError, KeyboardInterrupt):
|
|
break
|
|
if query.strip().lower() in ("q", "exit", ""):
|
|
break
|
|
trigger_hooks("UserPromptSubmit", query)
|
|
history.append({"role": "user", "content": query})
|
|
agent_loop(history)
|
|
for block in history[-1]["content"]:
|
|
if getattr(block, "type", None) == "text":
|
|
print(block.text)
|
|
print()
|