Claude Code 没有秘密!价值 3000 万美金的 400 行代码 首先,这又是一个标题党。 接着,让我们回到正题。 为什么Claude Code 比Cursor 调用同一个模型API但效果牛 100 倍? Claude Code 到底有什么工程设计上的秘密? 答案是:Claude Code 没有秘密。 如果一定要说有,就是model as agent。 什么意思呢?也就是模型才是agent,代码只是agent模型的道具。 模型as agent 思想是关键。 模型是80%,代码是20%。 市面上哪些已有模型勉强是agent 模型? claude sonnet, kimi k2 0905, glm4.5, qwen-coder,gpt5-codex,cwm code world model etc. 之前的模型都是QA 模型,训练目标是回答用户输入问题的答案,而不是独立长时间连续超多轮工具调用以完成用户要求的工作。 我们做了个 0 - 1手搓 mini Claude Code 的仓库,每天更新一部分代码,让你这个国庆假期 7 天学个够: https://github.com/shareAI-lab/mini_claude_code 教你假期闲的没事,搓一个值 3000 万美金的Agent AI 本期我们将会分享一个400行的Claude Code的超迷你代码(但已经能完成Claude Code的90%以上工作) 之后,我们将会陆续添加Todo 工具、Task(subagent)工具、System-reminder工具,并为你一一解开Claude Code 的所有功能特性设计疑惑。 • 如果你不关心Claude Code的实现原理,可以直接使用Kode这个开源项目,Kode是响应ShareAI-lab在之前临时分享了Claude Code的逆向分析设计资料Repo后,应广大网友热切要求,顺便维护的开源版Claude Code(基于Cc早期逆向版本+逆向分析继续开发维护,并持续添加几乎所有Claude Code后续更新特性以及Codex的好用特性) 图片 • 目前已被多个大型 / 明星初创商业公司的闭源产品参考 / 直接修改使用。 图片 图片 • 欢迎 众多网友一起参与维护(也完全鼓励使用vibe Coding方式提PR),过去已有网友参与了Bash工具对Windows的支持、Docker环境添加、 WebSearch & WebFetch工具添加等,下个版本正在大面积重构中,正在添加内置Web、IDE插件,同时CLI包和Core SDK包解离。 • Core SDK包支持在任意非cli场景用于各位开发者自己的业务场景下的灵活二次开发使用,让Claude Code 驱动你的下一个“世界上第一个 xx 领域 Manus 产品”运转。(当前SDK 的特性设计还没有完全收敛,我们将会在Kode群里组织腾讯会议进行方案讨论,欢迎一起参与) 图片 • Kode - 开源版Claude Code:https://github.com/shareAI-lab/Kode • kimi 0905 搭配开源版Claude Code(kode)挺好用! 可以用开源 claude code (kode) + k2 0905 / deepseek v3.1 / gpt4.5 比闭源claude code + sonnet 降智的情况下好。 安装后配置自定义模型使用: npm install -g @shareai-lab/kode 大道至简, 万法归一 很多人看Claude Code 的逆向分析解读 / Kode 源码只见其形、不抓其神。我们重新出这个mini_claude_code系列仓库(包括之前的训练营)就是想把超出“术”的“道”进行传播和分享。 下面的代码推荐你复制到本地终端运行,随意修改、二开玩耍,这样你也搓了一个值 3000 万美金的agent AI项目(doge。 记住:对于下一代Agent 系统,模型是80%,代码是20%。 #!/usr/bin/env python3 import os import sys import json import time import threading import subprocess from pathlib import Path try: from anthropic import Anthropic except Exception as e: sys.stderr.write("Install with: pip install anthropic\n") raise ANTHROPIC_BASE_URL = "https://api.moonshot.cn/anthropic" ANTHROPIC_API_KEY = "sk-xxx" # 替换为你的key AGENT_MODEL = "kimi-k2-turbo-preview" # ---------- Workspace & Helpers ---------- WORKDIR = Path.cwd() MAX_TOOL_RESULT_CHARS = 100_000 def safe_path(p: str) -> Path: abs_path = (WORKDIR / str(p or "")).resolve() rel = abs_path.relative_to(WORKDIR) if abs_path.is_relative_to(WORKDIR) else None if rel is None: raise ValueError("Path escapes workspace") return abs_path def clamp_text(s: str, n: int = MAX_TOOL_RESULT_CHARS) -> str: if len(s) <= n: return s return s[:n] + f"\n\n..." def pretty_tool_line(kind: str, title: str) -> None: print(f"⏺ {kind}({title})…") def pretty_sub_line(text: str) -> None: print(f" ⎿ {text}") # 轻量等待指示器(最小实现) class Spinner: def __init__(self, label: str = "等待模型响应") -> None: self.label = label self.frames = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"] self._stop = threading.Event() self._thread = None def start(self): if not sys.stdout.isatty() or self._thread is not None: return self._stop.clear() def run(): i = 0 while not self._stop.is_set(): sys.stdout.write("\r" + self.label + " " + self.frames[i % len(self.frames)]) sys.stdout.flush() i += 1 time.sleep(0.08) self._thread = threading.Thread(target=run, daemon=True) self._thread.start() def stop(self): if self._thread is None: return self._stop.set() self._thread.join(timeout=1) self._thread = None try: # clear current line sys.stdout.write("\r\x1b[2K") sys.stdout.flush() except Exception: pass def log_error_debug(tag: str, info) -> None: try: js = json.dumps(info, ensure_ascii=False, indent=2) out = js if len(js) <= 4000 else js[:4000] + "\n..." print(f"⚠️ {tag}:") print(out) except Exception: print(f"⚠️ {tag}: (unserializable info)") # ---------- Content normalization helpers ---------- def block_to_dict(block): """Convert SDK response block objects to plain dicts for reuse in messages. Supports TextBlock, ToolUseBlock, and dict inputs. Best-effort fallback. """ if isinstance(block, dict): return block out = {} for key in ("type", "text", "id", "name", "input", "citations"): if hasattr(block, key): out[key] = getattr(block, key) # Fallback: include any public attributes if not out and hasattr(block, "__dict__"): out = {k: v for k, v in vars(block).items() if not k.startswith("_")} if hasattr(block, "type"): out["type"] = getattr(block, "type") return out def normalize_content_list(content): try: return [block_to_dict(b) for b in (content or [])] except Exception: return [] # ---------- SDK client ---------- api_key = ANTHROPIC_API_KEY if not api_key: sys.stderr.write("❌ ANTHROPIC_API_KEY not set\n") sys.exit(1) base_url = ANTHROPIC_BASE_URL client = Anthropic(api_key=api_key, base_url=base_url) if base_url else Anthropic(api_key=api_key) # ---------- System prompt ---------- SYSTEM = ( f"You are a coding agent operating INSIDE the user's repository at {WORKDIR}.\n" "Follow this loop strictly: plan briefly → use TOOLS to act directly on files/shell → report concise results.\n" "Rules:\n" "- Prefer taking actions with tools (read/write/edit/bash) over long prose.\n" "- Keep outputs terse. Use bullet lists / checklists when summarizing.\n" "- Never invent file paths. Ask via reads or list directories first if unsure.\n" "- For edits, apply the smallest change that satisfies the request.\n" "- For bash, avoid destructive or privileged commands; stay inside the workspace.\n" "- After finishing, summarize what changed and how to run or test." ) # ---------- Tools ---------- tools = [ { "name": "bash", "description": ( "Execute a shell command inside the project workspace. Use for scaffolding, " "formatting, running scripts, etc." ), "input_schema": { "type": "object", "properties": { "command": {"type": "string", "description": "Shell command to run"}, "timeout_ms": {"type": "integer", "minimum": 1000, "maximum": 120000}, }, "required": ["command"], "additionalProperties": False, }, }, { "name": "read_file", "description": "Read a UTF-8 text file. Optionally slice by line range or clamp length.", "input_schema": { "type": "object", "properties": { "path": {"type": "string"}, "start_line": {"type": "integer", "minimum": 1}, "end_line": {"type": "integer", "minimum": -1}, "max_chars": {"type": "integer", "minimum": 1, "maximum": 200000}, }, "required": ["path"], "additionalProperties": False, }, }, { "name": "write_file", "description": "Create or overwrite/append a UTF-8 text file. Use overwrite unless explicitly asked to append.", "input_schema": { "type": "object", "properties": { "path": {"type": "string"}, "content": {"type": "string"}, "mode": {"type": "string", "enum": ["overwrite", "append"], "default": "overwrite"}, }, "required": ["path", "content"], "additionalProperties": False, }, }, { "name": "edit_text", "description": "Small, precise text edits. Choose one action: replace | insert | delete_range.", "input_schema": { "type": "object", "properties": { "path": {"type": "string"}, "action": {"type": "string", "enum": ["replace", "insert", "delete_range"]}, "find": {"type": "string"}, "replace": {"type": "string"}, "insert_after": {"type": "integer", "minimum": -1}, "new_text": {"type": "string"}, "range": {"type": "array", "items": {"type": "integer"}, "minItems": 2, "maxItems": 2}, }, "required": ["path", "action"], "additionalProperties": False, }, }, ] # ---------- Tool executors ---------- def run_bash(input_obj: dict) -> str: cmd = str(input_obj.get("command") or "") if not cmd: raise ValueError("missing bash.command") if ( subprocess is not None and ("rm -rf /" in cmd or "shutdown" in cmd or "reboot" in cmd or "sudo " in cmd) ): raise ValueError("blocked dangerous command") timeout_ms = int(input_obj.get("timeout_ms") or 30000) try: proc = subprocess.run( cmd, cwd=str(WORKDIR), shell=True, capture_output=True, text=True, timeout=timeout_ms / 1000.0, ) out = "\n".join([x for x in [proc.stdout, proc.stderr] if x]).strip() return clamp_text(out or "(no output)") except subprocess.TimeoutExpired: return "(timeout)" def run_read(input_obj: dict) -> str: fp = safe_path(input_obj.get("path")) text = fp.read_text("utf-8") lines = text.split("\n") start = (max(1, int(input_obj.get("start_line") or 1)) - 1) if input_obj.get("start_line") else 0 if isinstance(input_obj.get("end_line"), int): end_val = input_obj.get("end_line") end = len(lines) if end_val < 0 else max(start, end_val) else: end = len(lines) text = "\n".join(lines[start:end]) max_chars = int(input_obj.get("max_chars") or 100_000) return clamp_text(text, max_chars) def run_write(input_obj: dict) -> str: fp = safe_path(input_obj.get("path")) fp.parent.mkdir(parents=True, exist_ok=True) content = input_obj.get("content") or "" mode = input_obj.get("mode") if mode == "append" and fp.exists(): with fp.open("a", encoding="utf-8") as f: f.write(content) else: fp.write_text(content, encoding="utf-8") bytes_len = len(content.encode("utf-8")) rel = fp.relative_to(WORKDIR) return f"wrote {bytes_len} bytes to {rel}" def run_edit(input_obj: dict) -> str: fp = safe_path(input_obj.get("path")) text = fp.read_text("utf-8") action = input_obj.get("action") if action == "replace": find = str(input_obj.get("find") or "") if not find: raise ValueError("edit_text.replace missing find") replaced = text.replace(find, str(input_obj.get("replace") or "")) fp.write_text(replaced, encoding="utf-8") return f"replace done ({len(replaced.encode('utf-8'))} bytes)" elif action == "insert": line = int(input_obj.get("insert_after") if input_obj.get("insert_after") is not None else -1) lines = text.split("\n") idx = max(-1, min(len(lines) - 1, line)) lines[idx + 1:idx + 1] = [str(input_obj.get("new_text") or "")] nxt = "\n".join(lines) fp.write_text(nxt, encoding="utf-8") return f"inserted after line {line}" elif action == "delete_range": rng = input_obj.get("range") or [] if not (len(rng) == 2 and isinstance(rng[0], int) and isinstance(rng[1], int) and rng[1] >= rng[0]): raise ValueError("edit_text.delete_range invalid range") s, e = rng lines = text.split("\n") nxt = "\n".join([*lines[:s], *lines[e:]]) fp.write_text(nxt, encoding="utf-8") return f"deleted lines [{s}, {e})" else: raise ValueError(f"unsupported edit_text.action: {action}") def dispatch_tool(tu: dict) -> dict: try: # Support both dict and SDK block objects def gv(obj, key, default=None): return obj.get(key, default) if isinstance(obj, dict) else getattr(obj, key, default) name = gv(tu, "name") input_obj = gv(tu, "input", {}) or {} tool_use_id = gv(tu, "id") if name == "bash": pretty_tool_line("Bash", (input_obj.get("command") if isinstance(input_obj, dict) else None)) out = run_bash(input_obj if isinstance(input_obj, dict) else {}) pretty_sub_line(clamp_text(out, 2000) if out else "(No content)") return {"type": "tool_result", "tool_use_id": tool_use_id, "content": out} if name == "read_file": pretty_tool_line("Read", (input_obj.get("path") if isinstance(input_obj, dict) else None)) out = run_read(input_obj if isinstance(input_obj, dict) else {}) pretty_sub_line(clamp_text(out, 2000)) return {"type": "tool_result", "tool_use_id": tool_use_id, "content": out} if name == "write_file": pretty_tool_line("Write", (input_obj.get("path") if isinstance(input_obj, dict) else None)) out = run_write(input_obj if isinstance(input_obj, dict) else {}) pretty_sub_line(out) return {"type": "tool_result", "tool_use_id": tool_use_id, "content": out} if name == "edit_text": action = input_obj.get("action") if isinstance(input_obj, dict) else None path_v = input_obj.get("path") if isinstance(input_obj, dict) else None pretty_tool_line("Edit", f"{action} {path_v}") out = run_edit(input_obj if isinstance(input_obj, dict) else {}) pretty_sub_line(out) return {"type": "tool_result", "tool_use_id": tool_use_id, "content": out} return {"type": "tool_result", "tool_use_id": tool_use_id, "content": f"unknown tool: {name}", "is_error": True} except Exception as e: tool_use_id = tu.get("id") if isinstance(tu, dict) else getattr(tu, "id", None) return {"type": "tool_result", "tool_use_id": tool_use_id, "content": str(e), "is_error": True} # ---------- Core loop ---------- def query(messages: list, opts: dict | None = None) -> list: opts = opts or {} while True: spinner = Spinner() spinner.start() try: res = client.messages.create( model=AGENT_MODEL, system=SYSTEM, messages=messages, tools=tools, max_tokens=16000, **({"tool_choice": opts["tool_choice"]} if "tool_choice" in opts else {}), ) finally: spinner.stop() tool_uses = [] try: for block in getattr(res, "content", []): btype = getattr(block, "type", None) if not isinstance(block, dict) else block.get("type") if btype == "text": text = getattr(block, "text", None) if not isinstance(block, dict) else block.get("text") sys.stdout.write((text or "") + "\n") if btype == "tool_use": tool_uses.append(block) except Exception as err: log_error_debug( "Iterating res.content failed", { "error": str(err), "stop_reason": getattr(res, "stop_reason", None), "content_type": type(getattr(res, "content", None)).__name__, "is_array": isinstance(getattr(res, "content", None), list), "keys": list(res.__dict__.keys()) if hasattr(res, "__dict__") else [], "preview": (json.dumps(res, default=lambda o: getattr(o, "__dict__", str(o)))[:2000] if res else ""), }, ) raise if getattr(res, "stop_reason", None) == "tool_use": results = [dispatch_tool(tu) for tu in tool_uses] messages.append({"role": "assistant", "content": normalize_content_list(res.content)}) messages.append({"role": "user", "content": results}) continue messages.append({"role": "assistant", "content": normalize_content_list(res.content)}) return messages def main(): print(f"Tiny CC Agent (custom tools only) — cwd: {WORKDIR}") print('Type "exit" or "quit" to leave.\n') history: list = [] while True: try: line = input("User: ") except EOFError: break if not line or line.strip().lower() in {"q", "quit", "exit"}: break history.append({"role": "user", "content": [{"type": "text", "text": line}]}) try: query(history) except Exception as e: print("Error:", str(e)) if __name__ == "__main__": main()