CrazyBoyM 85f44c358a Complete rewrite: original educational content only
- Remove all reverse-engineered Claude Code source code
- Replace with 100% original educational content from mini-claude-code
- Add clear disclaimer: independent project, not affiliated with Anthropic
- 5 progressive agent implementations (v0-v4, ~1100 lines total)
- Include agent-builder skill for teaching agent construction
- Bilingual documentation (EN + ZH)

This repository now focuses purely on teaching how modern AI agents work
through original, from-scratch implementations.

Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-12-31 07:01:42 +08:00

485 lines
18 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

Claude Code 没有秘密!价值 3000 万美金的 400 行代码
首先,这又是一个标题党。
接着,让我们回到正题。
为什么Claude Code 比Cursor 调用同一个模型API但效果牛 100 倍?
Claude Code 到底有什么工程设计上的秘密?
答案是Claude Code 没有秘密。
如果一定要说有就是model as agent。
什么意思呢也就是模型才是agent代码只是agent模型的道具。
模型as agent 思想是关键。
模型是80%代码是20%。
市面上哪些已有模型勉强是agent 模型?
claude sonnet, kimi k2 0905, glm4.5, qwen-codergpt5-codexcwm code world model etc.
之前的模型都是QA 模型,训练目标是回答用户输入问题的答案,而不是独立长时间连续超多轮工具调用以完成用户要求的工作。
我们做了个 0 - 1手搓 mini Claude Code 的仓库,每天更新一部分代码,让你这个国庆假期 7 天学个够:
https://github.com/shareAI-lab/mini_claude_code
教你假期闲的没事,搓一个值 3000 万美金的Agent AI
本期我们将会分享一个400行的Claude Code的超迷你代码但已经能完成Claude Code的90%以上工作)
之后我们将会陆续添加Todo 工具、Tasksubagent工具、System-reminder工具并为你一一解开Claude Code 的所有功能特性设计疑惑。
如果你不关心Claude Code的实现原理可以直接使用Kode这个开源项目Kode是响应ShareAI-lab在之前临时分享了Claude Code的逆向分析设计资料Repo后应广大网友热切要求顺便维护的开源版Claude Code基于Cc早期逆向版本+逆向分析继续开发维护并持续添加几乎所有Claude Code后续更新特性以及Codex的好用特性
图片
目前已被多个大型 / 明星初创商业公司的闭源产品参考 / 直接修改使用。
图片
图片
欢迎 众多网友一起参与维护也完全鼓励使用vibe Coding方式提PR过去已有网友参与了Bash工具对Windows的支持、Docker环境添加、 WebSearch & WebFetch工具添加等下个版本正在大面积重构中正在添加内置Web、IDE插件同时CLI包和Core SDK包解离。
Core SDK包支持在任意非cli场景用于各位开发者自己的业务场景下的灵活二次开发使用让Claude Code 驱动你的下一个“世界上第一个 xx 领域 Manus 产品”运转。当前SDK 的特性设计还没有完全收敛我们将会在Kode群里组织腾讯会议进行方案讨论欢迎一起参与
图片
Kode - 开源版Claude Codehttps://github.com/shareAI-lab/Kode
kimi 0905 搭配开源版Claude Codekode挺好用
可以用开源 claude code (kode) + k2 0905 / deepseek v3.1 / gpt4.5 比闭源claude code + sonnet 降智的情况下好。
安装后配置自定义模型使用:
npm install -g @shareai-lab/kode
大道至简, 万法归一
很多人看Claude Code 的逆向分析解读 / Kode 源码只见其形、不抓其神。我们重新出这个mini_claude_code系列仓库包括之前的训练营就是想把超出“术”的“道”进行传播和分享。
下面的代码推荐你复制到本地终端运行,随意修改、二开玩耍,这样你也搓了一个值 3000 万美金的agent AI项目(doge。
记住对于下一代Agent 系统模型是80%代码是20%。
#!/usr/bin/env python3
import os
import sys
import json
import time
import threading
import subprocess
from pathlib import Path
try:
from anthropic import Anthropic
except Exception as e:
sys.stderr.write("Install with: pip install anthropic\n")
raise
ANTHROPIC_BASE_URL = "https://api.moonshot.cn/anthropic"
ANTHROPIC_API_KEY = "sk-xxx" # 替换为你的key
AGENT_MODEL = "kimi-k2-turbo-preview"
# ---------- Workspace & Helpers ----------
WORKDIR = Path.cwd()
MAX_TOOL_RESULT_CHARS = 100_000
def safe_path(p: str) -> Path:
abs_path = (WORKDIR / str(p or "")).resolve()
rel = abs_path.relative_to(WORKDIR) if abs_path.is_relative_to(WORKDIR) else None
if rel is None:
raise ValueError("Path escapes workspace")
return abs_path
def clamp_text(s: str, n: int = MAX_TOOL_RESULT_CHARS) -> str:
if len(s) <= n:
return s
return s[:n] + f"\n\n...<truncated {len(s) - n} chars>"
def pretty_tool_line(kind: str, title: str) -> None:
print(f"⏺ {kind}({title})…")
def pretty_sub_line(text: str) -> None:
print(f" ⎿ {text}")
# 轻量等待指示器(最小实现)
class Spinner:
def __init__(self, label: str = "等待模型响应") -> None:
self.label = label
self.frames = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]
self._stop = threading.Event()
self._thread = None
def start(self):
if not sys.stdout.isatty() or self._thread is not None:
return
self._stop.clear()
def run():
i = 0
while not self._stop.is_set():
sys.stdout.write("\r" + self.label + " " + self.frames[i % len(self.frames)])
sys.stdout.flush()
i += 1
time.sleep(0.08)
self._thread = threading.Thread(target=run, daemon=True)
self._thread.start()
def stop(self):
if self._thread is None:
return
self._stop.set()
self._thread.join(timeout=1)
self._thread = None
try:
# clear current line
sys.stdout.write("\r\x1b[2K")
sys.stdout.flush()
except Exception:
pass
def log_error_debug(tag: str, info) -> None:
try:
js = json.dumps(info, ensure_ascii=False, indent=2)
out = js if len(js) <= 4000 else js[:4000] + "\n...<truncated>"
print(f"⚠️ {tag}:")
print(out)
except Exception:
print(f"⚠️ {tag}: (unserializable info)")
# ---------- Content normalization helpers ----------
def block_to_dict(block):
"""Convert SDK response block objects to plain dicts for reuse in messages.
Supports TextBlock, ToolUseBlock, and dict inputs. Best-effort fallback.
"""
if isinstance(block, dict):
return block
out = {}
for key in ("type", "text", "id", "name", "input", "citations"):
if hasattr(block, key):
out[key] = getattr(block, key)
# Fallback: include any public attributes
if not out and hasattr(block, "__dict__"):
out = {k: v for k, v in vars(block).items() if not k.startswith("_")}
if hasattr(block, "type"):
out["type"] = getattr(block, "type")
return out
def normalize_content_list(content):
try:
return [block_to_dict(b) for b in (content or [])]
except Exception:
return []
# ---------- SDK client ----------
api_key = ANTHROPIC_API_KEY
if not api_key:
sys.stderr.write("❌ ANTHROPIC_API_KEY not set\n")
sys.exit(1)
base_url = ANTHROPIC_BASE_URL
client = Anthropic(api_key=api_key, base_url=base_url) if base_url else Anthropic(api_key=api_key)
# ---------- System prompt ----------
SYSTEM = (
f"You are a coding agent operating INSIDE the user's repository at {WORKDIR}.\n"
"Follow this loop strictly: plan briefly → use TOOLS to act directly on files/shell → report concise results.\n"
"Rules:\n"
"- Prefer taking actions with tools (read/write/edit/bash) over long prose.\n"
"- Keep outputs terse. Use bullet lists / checklists when summarizing.\n"
"- Never invent file paths. Ask via reads or list directories first if unsure.\n"
"- For edits, apply the smallest change that satisfies the request.\n"
"- For bash, avoid destructive or privileged commands; stay inside the workspace.\n"
"- After finishing, summarize what changed and how to run or test."
)
# ---------- Tools ----------
tools = [
{
"name": "bash",
"description": (
"Execute a shell command inside the project workspace. Use for scaffolding, "
"formatting, running scripts, etc."
),
"input_schema": {
"type": "object",
"properties": {
"command": {"type": "string", "description": "Shell command to run"},
"timeout_ms": {"type": "integer", "minimum": 1000, "maximum": 120000},
},
"required": ["command"],
"additionalProperties": False,
},
},
{
"name": "read_file",
"description": "Read a UTF-8 text file. Optionally slice by line range or clamp length.",
"input_schema": {
"type": "object",
"properties": {
"path": {"type": "string"},
"start_line": {"type": "integer", "minimum": 1},
"end_line": {"type": "integer", "minimum": -1},
"max_chars": {"type": "integer", "minimum": 1, "maximum": 200000},
},
"required": ["path"],
"additionalProperties": False,
},
},
{
"name": "write_file",
"description": "Create or overwrite/append a UTF-8 text file. Use overwrite unless explicitly asked to append.",
"input_schema": {
"type": "object",
"properties": {
"path": {"type": "string"},
"content": {"type": "string"},
"mode": {"type": "string", "enum": ["overwrite", "append"], "default": "overwrite"},
},
"required": ["path", "content"],
"additionalProperties": False,
},
},
{
"name": "edit_text",
"description": "Small, precise text edits. Choose one action: replace | insert | delete_range.",
"input_schema": {
"type": "object",
"properties": {
"path": {"type": "string"},
"action": {"type": "string", "enum": ["replace", "insert", "delete_range"]},
"find": {"type": "string"},
"replace": {"type": "string"},
"insert_after": {"type": "integer", "minimum": -1},
"new_text": {"type": "string"},
"range": {"type": "array", "items": {"type": "integer"}, "minItems": 2, "maxItems": 2},
},
"required": ["path", "action"],
"additionalProperties": False,
},
},
]
# ---------- Tool executors ----------
def run_bash(input_obj: dict) -> str:
cmd = str(input_obj.get("command") or "")
if not cmd:
raise ValueError("missing bash.command")
if (
subprocess is not None
and ("rm -rf /" in cmd or "shutdown" in cmd or "reboot" in cmd or "sudo " in cmd)
):
raise ValueError("blocked dangerous command")
timeout_ms = int(input_obj.get("timeout_ms") or 30000)
try:
proc = subprocess.run(
cmd,
cwd=str(WORKDIR),
shell=True,
capture_output=True,
text=True,
timeout=timeout_ms / 1000.0,
)
out = "\n".join([x for x in [proc.stdout, proc.stderr] if x]).strip()
return clamp_text(out or "(no output)")
except subprocess.TimeoutExpired:
return "(timeout)"
def run_read(input_obj: dict) -> str:
fp = safe_path(input_obj.get("path"))
text = fp.read_text("utf-8")
lines = text.split("\n")
start = (max(1, int(input_obj.get("start_line") or 1)) - 1) if input_obj.get("start_line") else 0
if isinstance(input_obj.get("end_line"), int):
end_val = input_obj.get("end_line")
end = len(lines) if end_val < 0 else max(start, end_val)
else:
end = len(lines)
text = "\n".join(lines[start:end])
max_chars = int(input_obj.get("max_chars") or 100_000)
return clamp_text(text, max_chars)
def run_write(input_obj: dict) -> str:
fp = safe_path(input_obj.get("path"))
fp.parent.mkdir(parents=True, exist_ok=True)
content = input_obj.get("content") or ""
mode = input_obj.get("mode")
if mode == "append" and fp.exists():
with fp.open("a", encoding="utf-8") as f:
f.write(content)
else:
fp.write_text(content, encoding="utf-8")
bytes_len = len(content.encode("utf-8"))
rel = fp.relative_to(WORKDIR)
return f"wrote {bytes_len} bytes to {rel}"
def run_edit(input_obj: dict) -> str:
fp = safe_path(input_obj.get("path"))
text = fp.read_text("utf-8")
action = input_obj.get("action")
if action == "replace":
find = str(input_obj.get("find") or "")
if not find:
raise ValueError("edit_text.replace missing find")
replaced = text.replace(find, str(input_obj.get("replace") or ""))
fp.write_text(replaced, encoding="utf-8")
return f"replace done ({len(replaced.encode('utf-8'))} bytes)"
elif action == "insert":
line = int(input_obj.get("insert_after") if input_obj.get("insert_after") is not None else -1)
lines = text.split("\n")
idx = max(-1, min(len(lines) - 1, line))
lines[idx + 1:idx + 1] = [str(input_obj.get("new_text") or "")]
nxt = "\n".join(lines)
fp.write_text(nxt, encoding="utf-8")
return f"inserted after line {line}"
elif action == "delete_range":
rng = input_obj.get("range") or []
if not (len(rng) == 2 and isinstance(rng[0], int) and isinstance(rng[1], int) and rng[1] >= rng[0]):
raise ValueError("edit_text.delete_range invalid range")
s, e = rng
lines = text.split("\n")
nxt = "\n".join([*lines[:s], *lines[e:]])
fp.write_text(nxt, encoding="utf-8")
return f"deleted lines [{s}, {e})"
else:
raise ValueError(f"unsupported edit_text.action: {action}")
def dispatch_tool(tu: dict) -> dict:
try:
# Support both dict and SDK block objects
def gv(obj, key, default=None):
return obj.get(key, default) if isinstance(obj, dict) else getattr(obj, key, default)
name = gv(tu, "name")
input_obj = gv(tu, "input", {}) or {}
tool_use_id = gv(tu, "id")
if name == "bash":
pretty_tool_line("Bash", (input_obj.get("command") if isinstance(input_obj, dict) else None))
out = run_bash(input_obj if isinstance(input_obj, dict) else {})
pretty_sub_line(clamp_text(out, 2000) if out else "(No content)")
return {"type": "tool_result", "tool_use_id": tool_use_id, "content": out}
if name == "read_file":
pretty_tool_line("Read", (input_obj.get("path") if isinstance(input_obj, dict) else None))
out = run_read(input_obj if isinstance(input_obj, dict) else {})
pretty_sub_line(clamp_text(out, 2000))
return {"type": "tool_result", "tool_use_id": tool_use_id, "content": out}
if name == "write_file":
pretty_tool_line("Write", (input_obj.get("path") if isinstance(input_obj, dict) else None))
out = run_write(input_obj if isinstance(input_obj, dict) else {})
pretty_sub_line(out)
return {"type": "tool_result", "tool_use_id": tool_use_id, "content": out}
if name == "edit_text":
action = input_obj.get("action") if isinstance(input_obj, dict) else None
path_v = input_obj.get("path") if isinstance(input_obj, dict) else None
pretty_tool_line("Edit", f"{action} {path_v}")
out = run_edit(input_obj if isinstance(input_obj, dict) else {})
pretty_sub_line(out)
return {"type": "tool_result", "tool_use_id": tool_use_id, "content": out}
return {"type": "tool_result", "tool_use_id": tool_use_id, "content": f"unknown tool: {name}", "is_error": True}
except Exception as e:
tool_use_id = tu.get("id") if isinstance(tu, dict) else getattr(tu, "id", None)
return {"type": "tool_result", "tool_use_id": tool_use_id, "content": str(e), "is_error": True}
# ---------- Core loop ----------
def query(messages: list, opts: dict | None = None) -> list:
opts = opts or {}
while True:
spinner = Spinner()
spinner.start()
try:
res = client.messages.create(
model=AGENT_MODEL,
system=SYSTEM,
messages=messages,
tools=tools,
max_tokens=16000,
**({"tool_choice": opts["tool_choice"]} if "tool_choice" in opts else {}),
)
finally:
spinner.stop()
tool_uses = []
try:
for block in getattr(res, "content", []):
btype = getattr(block, "type", None) if not isinstance(block, dict) else block.get("type")
if btype == "text":
text = getattr(block, "text", None) if not isinstance(block, dict) else block.get("text")
sys.stdout.write((text or "") + "\n")
if btype == "tool_use":
tool_uses.append(block)
except Exception as err:
log_error_debug(
"Iterating res.content failed",
{
"error": str(err),
"stop_reason": getattr(res, "stop_reason", None),
"content_type": type(getattr(res, "content", None)).__name__,
"is_array": isinstance(getattr(res, "content", None), list),
"keys": list(res.__dict__.keys()) if hasattr(res, "__dict__") else [],
"preview": (json.dumps(res, default=lambda o: getattr(o, "__dict__", str(o)))[:2000] if res else ""),
},
)
raise
if getattr(res, "stop_reason", None) == "tool_use":
results = [dispatch_tool(tu) for tu in tool_uses]
messages.append({"role": "assistant", "content": normalize_content_list(res.content)})
messages.append({"role": "user", "content": results})
continue
messages.append({"role": "assistant", "content": normalize_content_list(res.content)})
return messages
def main():
print(f"Tiny CC Agent (custom tools only) — cwd: {WORKDIR}")
print('Type "exit" or "quit" to leave.\n')
history: list = []
while True:
try:
line = input("User: ")
except EOFError:
break
if not line or line.strip().lower() in {"q", "quit", "exit"}:
break
history.append({"role": "user", "content": [{"type": "text", "text": line}]})
try:
query(history)
except Exception as e:
print("Error:", str(e))
if __name__ == "__main__":
main()