#!/usr/bin/env python3 """ s20: Comprehensive Agent — all teaching components in one loop. Run: python s20_comprehensive/code.py Need: pip install anthropic python-dotenv + .env with ANTHROPIC_API_KEY This final chapter intentionally puts the earlier teaching mechanisms back together: dispatch, permission, hooks, todo, subagent, skills, compaction, memory, prompt assembly, error recovery, task graph, background tasks, cron, teams, protocols, autonomous agents, worktrees, and MCP. """ import os, subprocess, json, time, random, threading, re from pathlib import Path from datetime import datetime from dataclasses import dataclass, asdict, field try: import readline readline.parse_and_bind('set bind-tty-special-chars off') READLINE_AVAILABLE = True except ImportError: READLINE_AVAILABLE = False from anthropic import Anthropic from dotenv import load_dotenv load_dotenv(override=True) if os.getenv("ANTHROPIC_BASE_URL"): os.environ.pop("ANTHROPIC_AUTH_TOKEN", None) WORKDIR = Path.cwd() client = Anthropic(base_url=os.getenv("ANTHROPIC_BASE_URL")) MODEL = os.environ["MODEL_ID"] PRIMARY_MODEL = MODEL FALLBACK_MODEL = os.getenv("FALLBACK_MODEL_ID") SKILLS_DIR = WORKDIR / "skills" TRANSCRIPT_DIR = WORKDIR / ".transcripts" TOOL_RESULTS_DIR = WORKDIR / ".task_outputs" / "tool-results" DEFAULT_MAX_TOKENS = 8000 ESCALATED_MAX_TOKENS = 16000 MAX_RETRIES = 3 MAX_CONSECUTIVE_529 = 2 MAX_RECOVERY_RETRIES = 2 BASE_DELAY_MS = 500 CONTEXT_LIMIT = 50000 KEEP_RECENT_TOOL_RESULTS = 3 PERSIST_THRESHOLD = 30000 CONTINUATION_PROMPT = "Continue from the previous response. Do not repeat completed work." PROMPT = "\033[36ms20 >> \033[0m" CLI_ACTIVE = False def terminal_print(text: str): if threading.current_thread() is threading.main_thread() or not CLI_ACTIVE: print(text) return line = "" if READLINE_AVAILABLE: try: line = readline.get_line_buffer() except Exception: line = "" print(f"\r\033[K{text}") print(PROMPT + line, end="", flush=True) # ── Task System ── # Tasks are tiny durable records. Later systems add ownership, dependencies, # worktrees, and teammates on top of this same file-backed state. TASKS_DIR = WORKDIR / ".tasks" TASKS_DIR.mkdir(exist_ok=True) CURRENT_TODOS: list[dict] = [] @dataclass class Task: id: str subject: str description: str status: str owner: str | None blockedBy: list[str] worktree: str | None = None def _task_path(task_id: str) -> Path: return TASKS_DIR / f"{task_id}.json" def create_task(subject: str, description: str = "", blockedBy: list[str] | None = None) -> Task: task = Task( id=f"task_{int(time.time())}_{random.randint(0, 9999):04d}", subject=subject, description=description, status="pending", owner=None, blockedBy=blockedBy or [], ) save_task(task) return task def save_task(task: Task): _task_path(task.id).write_text(json.dumps(asdict(task), indent=2)) def load_task(task_id: str) -> Task: return Task(**json.loads(_task_path(task_id).read_text())) def list_tasks() -> list[Task]: return [Task(**json.loads(p.read_text())) for p in sorted(TASKS_DIR.glob("task_*.json"))] def get_task_json(task_id: str) -> str: return json.dumps(asdict(load_task(task_id)), indent=2) def can_start(task_id: str) -> bool: # Dependencies are intentionally simple: every blocker must exist and be # completed before the task can be claimed. task = load_task(task_id) for dep_id in task.blockedBy: if not _task_path(dep_id).exists(): return False if load_task(dep_id).status != "completed": return False return True def claim_task(task_id: str, owner: str = "agent") -> str: task = load_task(task_id) if task.status != "pending": return f"Task {task_id} is {task.status}, cannot claim" if task.owner: return f"Task {task_id} already owned by {task.owner}" if not can_start(task_id): deps = [d for d in task.blockedBy if _task_path(d).exists() and load_task(d).status != "completed"] missing = [d for d in task.blockedBy if not _task_path(d).exists()] parts = [] if deps: parts.append(f"blocked by: {deps}") if missing: parts.append(f"missing deps: {missing}") return "Cannot start — " + ", ".join(parts) task.owner = owner task.status = "in_progress" save_task(task) print(f" \033[36m[claim] {task.subject} → in_progress\033[0m") return f"Claimed {task.id} ({task.subject})" def complete_task(task_id: str) -> str: task = load_task(task_id) if task.status != "in_progress": return f"Task {task_id} is {task.status}, cannot complete" task.status = "completed" save_task(task) unblocked = [t.subject for t in list_tasks() if t.status == "pending" and t.blockedBy and can_start(t.id)] print(f" \033[32m[complete] {task.subject} ✓\033[0m") msg = f"Completed {task.id} ({task.subject})" if unblocked: msg += f"\nUnblocked: {', '.join(unblocked)}" return msg # ── Worktree System ── # Worktree names become filesystem paths, so the teaching version keeps the # validation rules strict and reuses them for create/remove/keep. WORKTREES_DIR = WORKDIR / ".worktrees" WORKTREES_DIR.mkdir(exist_ok=True) VALID_WT_NAME = re.compile(r'^[A-Za-z0-9._-]{1,64}$') def validate_worktree_name(name: str) -> str | None: if not name: return "Worktree name cannot be empty" if name in (".", ".."): return f"'{name}' is not a valid worktree name" if not VALID_WT_NAME.match(name): return (f"Invalid worktree name '{name}': " "only letters, digits, dots, underscores, dashes (1-64 chars)") return None def run_git(args: list[str]) -> tuple[bool, str]: try: r = subprocess.run(["git"] + args, cwd=WORKDIR, capture_output=True, text=True, timeout=30) out = (r.stdout + r.stderr).strip() return r.returncode == 0, out[:5000] if out else "(no output)" except subprocess.TimeoutExpired: return False, "Error: git timeout" def log_event(event_type: str, worktree_name: str, task_id: str = ""): event = {"type": event_type, "worktree": worktree_name, "task_id": task_id, "ts": time.time()} events_file = WORKTREES_DIR / "events.jsonl" with open(events_file, "a") as f: f.write(json.dumps(event) + "\n") def create_worktree(name: str, task_id: str = "") -> str: # Tool-layer validation is part of the safety boundary; do it before git # sees the name, not only after git happens to reject something. err = validate_worktree_name(name) if err: return f"Error: {err}" if task_id: try: load_task(task_id) except FileNotFoundError: return f"Error: task {task_id} not found" path = WORKTREES_DIR / name if path.exists(): return f"Worktree '{name}' already exists at {path}" ok, result = run_git(["worktree", "add", str(path), "-b", f"wt/{name}", "HEAD"]) if not ok: return f"Git error: {result}" if task_id: bind_task_to_worktree(task_id, name) log_event("create", name, task_id) print(f" \033[33m[worktree] created: {name} at {path}\033[0m") return f"Worktree '{name}' created at {path}" def bind_task_to_worktree(task_id: str, worktree_name: str): task = load_task(task_id) task.worktree = worktree_name save_task(task) def _count_worktree_changes(path: Path) -> tuple[int, int]: try: r1 = subprocess.run(["git", "status", "--porcelain"], cwd=path, capture_output=True, text=True, timeout=10) files = len([l for l in r1.stdout.strip().splitlines() if l.strip()]) r2 = subprocess.run(["git", "log", "@{push}..HEAD", "--oneline"], cwd=path, capture_output=True, text=True, timeout=10) commits = len([l for l in r2.stdout.strip().splitlines() if l.strip()]) return files, commits except Exception: return -1, -1 def remove_worktree(name: str, discard_changes: bool = False) -> str: err = validate_worktree_name(name) if err: return err path = WORKTREES_DIR / name if not path.exists(): return f"Worktree '{name}' not found" if not discard_changes: files, commits = _count_worktree_changes(path) if files < 0: return "Cannot verify status. Use discard_changes=true to force." if files > 0 or commits > 0: return (f"Worktree '{name}' has {files} file(s), {commits} commit(s). " "Use discard_changes=true or keep_worktree.") ok1, _ = run_git(["worktree", "remove", str(path), "--force"]) if not ok1: return f"Failed to remove worktree '{name}'" run_git(["branch", "-D", f"wt/{name}"]) log_event("remove", name) print(f" \033[33m[worktree] removed: {name}\033[0m") return f"Worktree '{name}' removed" def keep_worktree(name: str) -> str: err = validate_worktree_name(name) if err: return err log_event("keep", name) return f"Worktree '{name}' kept for review (branch: wt/{name})" # ── Skill Loading ── SKILL_REGISTRY: dict[str, dict] = {} def _parse_frontmatter(text: str) -> tuple[dict, str]: if not text.startswith("---"): return {}, text parts = text.split("---", 2) if len(parts) < 3: return {}, text meta = {} for line in parts[1].strip().splitlines(): if ":" in line: key, value = line.split(":", 1) meta[key.strip()] = value.strip().strip('"').strip("'") return meta, parts[2].strip() def scan_skills(): SKILL_REGISTRY.clear() if not SKILLS_DIR.exists(): return for directory in sorted(SKILLS_DIR.iterdir()): if not directory.is_dir(): continue manifest = directory / "SKILL.md" if not manifest.exists(): continue raw = manifest.read_text() meta, _ = _parse_frontmatter(raw) name = meta.get("name", directory.name) desc = meta.get("description", raw.split("\n")[0].lstrip("#").strip()) SKILL_REGISTRY[name] = { "name": name, "description": desc, "content": raw, } scan_skills() def list_skills() -> str: if not SKILL_REGISTRY: return "(no skills found)" return "\n".join( f"- {skill['name']}: {skill['description']}" for skill in SKILL_REGISTRY.values()) def load_skill(name: str) -> str: skill = SKILL_REGISTRY.get(name) if not skill: available = ", ".join(SKILL_REGISTRY.keys()) or "(none)" return f"Skill not found: {name}. Available: {available}" return skill["content"] # ── Prompt Assembly ── PROMPT_SECTIONS = { "identity": "You are a coding agent. Act, don't explain.", "tools": "Available tools: bash, read_file, write_file, edit_file, glob, " "todo_write, task, load_skill, compact, " "create_task, list_tasks, get_task, claim_task, complete_task, " "schedule_cron, list_crons, cancel_cron, " "spawn_teammate, send_message, check_inbox, " "request_shutdown, request_plan, review_plan, " "create_worktree, remove_worktree, keep_worktree, " "connect_mcp. MCP tools are prefixed mcp__{server}__{tool}.", "workspace": f"Working directory: {WORKDIR}", "memory": "Relevant memories are injected below when available.", } def assemble_system_prompt(context: dict) -> str: # The system prompt is rebuilt each turn from live context. This is where # memory, skill catalog, MCP state, and active teammates become visible. sections = [PROMPT_SECTIONS["identity"], PROMPT_SECTIONS["tools"], PROMPT_SECTIONS["workspace"]] sections.append(f"Current time: {datetime.now().isoformat(timespec='seconds')}") sections.append("Skills catalog:\n" + list_skills() + "\nUse load_skill(name) when a skill is relevant.") if context.get("memories"): sections.append(f"Relevant memories:\n{context['memories']}") mcp_names = list(mcp_clients.keys()) if mcp_names: sections.append(f"Connected MCP servers: {', '.join(mcp_names)}") return "\n\n".join(sections) # ── Basic Tools ── def safe_path(p: str, cwd: Path = None) -> Path: # File tools stay inside the workspace or teammate worktree. Bash remains # powerful on purpose and is controlled by the permission hook instead. base = cwd or WORKDIR path = (base / p).resolve() if not path.is_relative_to(base): raise ValueError(f"Path escapes workspace: {p}") return path def run_bash(command: str, cwd: Path = None, run_in_background: bool = False) -> str: # run_in_background is consumed by the dispatcher; direct execution ignores it. try: r = subprocess.run(command, shell=True, cwd=cwd or WORKDIR, capture_output=True, text=True, timeout=120) out = (r.stdout + r.stderr).strip() return out[:50000] if out else "(no output)" except subprocess.TimeoutExpired: return "Error: Timeout (120s)" def run_read(path: str, limit: int | None = None, offset: int = 0, cwd: Path = None) -> str: try: lines = safe_path(path, cwd).read_text().splitlines() offset = max(int(offset or 0), 0) limit = int(limit) if limit is not None else None lines = lines[offset:] if limit is not None and limit < len(lines): lines = lines[:limit] + [f"... ({len(lines) - limit} more lines)"] return "\n".join(lines) except Exception as e: return f"Error: {e}" def run_write(path: str, content: str, cwd: Path = None) -> str: try: fp = safe_path(path, cwd) fp.parent.mkdir(parents=True, exist_ok=True) fp.write_text(content) return f"Wrote {len(content)} bytes to {path}" except Exception as e: return f"Error: {e}" def run_edit(path: str, old_text: str, new_text: str, cwd: Path = None) -> str: try: fp = safe_path(path, cwd) text = fp.read_text() if old_text not in text: return f"Error: text not found in {path}" fp.write_text(text.replace(old_text, new_text, 1)) return f"Edited {path}" except Exception as e: return f"Error: {e}" def run_glob(pattern: str, cwd: Path = None) -> str: import glob as g try: base = cwd or WORKDIR results = [] for match in g.glob(pattern, root_dir=base): if (base / match).resolve().is_relative_to(base): results.append(match) return "\n".join(results) if results else "(no matches)" except Exception as e: return f"Error: {e}" def call_tool_handler(handler, args: dict, name: str) -> str: if not handler: return f"Unknown: {name}" try: return handler(**(args or {})) except TypeError as e: return f"Error: {e}" def run_todo_write(todos: list) -> str: global CURRENT_TODOS for i, todo in enumerate(todos): if "content" not in todo or "status" not in todo: return f"Error: todos[{i}] missing 'content' or 'status'" if todo["status"] not in ("pending", "in_progress", "completed"): return f"Error: todos[{i}] has invalid status '{todo['status']}'" CURRENT_TODOS = todos print(f" \033[33m[todo] updated {len(CURRENT_TODOS)} item(s)\033[0m") return f"Updated {len(CURRENT_TODOS)} todos" # ── MessageBus ── # Team communication is append-only JSONL mailboxes. This keeps the protocol # inspectable on disk and lets background teammates send messages. MAILBOX_DIR = WORKDIR / ".mailboxes" MAILBOX_DIR.mkdir(exist_ok=True) class MessageBus: def send(self, from_agent: str, to_agent: str, content: str, msg_type: str = "message", metadata: dict = None): msg = {"from": from_agent, "to": to_agent, "content": content, "type": msg_type, "ts": time.time(), "metadata": metadata or {}} inbox = MAILBOX_DIR / f"{to_agent}.jsonl" with open(inbox, "a") as f: f.write(json.dumps(msg) + "\n") terminal_print(f" \033[33m[bus] {from_agent} → {to_agent}: " f"({msg_type}) {content[:50]}\033[0m") def read_inbox(self, agent: str) -> list[dict]: inbox = MAILBOX_DIR / f"{agent}.jsonl" if not inbox.exists(): return [] msgs = [json.loads(line) for line in inbox.read_text().splitlines() if line.strip()] inbox.unlink() return msgs BUS = MessageBus() active_teammates: dict[str, bool] = {} # ── Protocol State ── @dataclass class ProtocolState: request_id: str type: str sender: str target: str status: str payload: str created_at: float = field(default_factory=time.time) pending_requests: dict[str, ProtocolState] = {} def new_request_id() -> str: return f"req_{random.randint(0, 999999):06d}" def match_response(response_type: str, request_id: str, approve: bool): # Responses are matched by request_id so one protocol reply cannot approve # a different pending request. state = pending_requests.get(request_id) if not state: return if state.type == "shutdown" and response_type != "shutdown_response": return if state.type == "plan_approval" and response_type != "plan_approval_response": return state.status = "approved" if approve else "rejected" def consume_lead_inbox(route_protocol=True) -> list[dict]: msgs = BUS.read_inbox("lead") if route_protocol: for msg in msgs: meta = msg.get("metadata", {}) req_id = meta.get("request_id", "") msg_type = msg.get("type", "") if req_id and msg_type.endswith("_response"): match_response(msg_type, req_id, meta.get("approve", False)) return msgs # ── Autonomous Agent ── IDLE_POLL_INTERVAL = 5 IDLE_TIMEOUT = 60 def scan_unclaimed_tasks() -> list[dict]: unclaimed = [] for f in sorted(TASKS_DIR.glob("task_*.json")): task = json.loads(f.read_text()) if (task.get("status") == "pending" and not task.get("owner") and can_start(task["id"])): unclaimed.append(task) return unclaimed def idle_poll(agent_name: str, messages: list, name: str, role: str, worktree_context: dict | None = None) -> str: # Autonomous teammates wake up for inbox messages first, then look for # unclaimed tasks. This keeps direct protocol messages higher priority. for _ in range(IDLE_TIMEOUT // IDLE_POLL_INTERVAL): time.sleep(IDLE_POLL_INTERVAL) inbox = BUS.read_inbox(agent_name) if inbox: for msg in inbox: if msg.get("type") == "shutdown_request": req_id = msg.get("metadata", {}).get("request_id", "") BUS.send(name, "lead", "Shutting down.", "shutdown_response", {"request_id": req_id, "approve": True}) return "shutdown" messages.append({"role": "user", "content": "" + json.dumps(inbox) + ""}) return "work" unclaimed = scan_unclaimed_tasks() if unclaimed: task_data = unclaimed[0] result = claim_task(task_data["id"], agent_name) if "Claimed" in result: wt_info = "" if task_data.get("worktree"): wt_path = WORKTREES_DIR / task_data["worktree"] wt_info = f"\nWork directory: {wt_path}" if worktree_context is not None: worktree_context["path"] = str(wt_path) messages.append({"role": "user", "content": f"Task {task_data['id']}: " f"{task_data['subject']}{wt_info}"}) return "work" return "timeout" # ── Teammate Thread ── def spawn_teammate_thread(name: str, role: str, prompt: str) -> str: if name in active_teammates: return f"Teammate '{name}' already exists" # Plan approval is a real gate: after submit_plan, the teammate stops # taking model/tool steps until lead sends plan_approval_response. protocol_ctx = {"waiting_plan": None} system = (f"You are '{name}', a {role}. " f"Use tools to complete tasks. " f"If a task has a worktree, work in that directory.") def handle_inbox_message(name: str, msg: dict, messages: list): msg_type = msg.get("type", "message") meta = msg.get("metadata", {}) req_id = meta.get("request_id", "") if msg_type == "shutdown_request": BUS.send(name, "lead", "Shutting down.", "shutdown_response", {"request_id": req_id, "approve": True}) return True if msg_type == "plan_approval_response": approve = meta.get("approve", False) if req_id == protocol_ctx["waiting_plan"]: protocol_ctx["waiting_plan"] = None messages.append({"role": "user", "content": "[Plan approved]" if approve else f"[Plan rejected] {msg['content']}"}) return False def run(): wt_ctx = {"path": None} def _wt_cwd(): # Once a task with a worktree is claimed, all teammate file tools # transparently run inside that isolated directory. p = wt_ctx["path"] return Path(p) if p else None def _run_bash(command: str) -> str: return run_bash(command, cwd=_wt_cwd()) def _run_read(path: str) -> str: return run_read(path, cwd=_wt_cwd()) def _run_write(path: str, content: str) -> str: return run_write(path, content, cwd=_wt_cwd()) def _run_list_tasks(): tasks = list_tasks() if not tasks: return "No tasks." return "\n".join( f" {t.id}: {t.subject} [{t.status}]" + (f" (wt:{t.worktree})" if t.worktree else "") for t in tasks) def _run_claim_task(task_id: str): result = claim_task(task_id, owner=name) if "Claimed" in result: task = load_task(task_id) wt_ctx["path"] = (str(WORKTREES_DIR / task.worktree) if task.worktree else None) return result def _run_complete_task(task_id: str): result = complete_task(task_id) wt_ctx["path"] = None return result messages = [{"role": "user", "content": prompt}] sub_tools = [ {"name": "bash", "description": "Run a shell command.", "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}}, {"name": "read_file", "description": "Read file.", "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "limit": {"type": "integer"}, "offset": {"type": "integer"}}, "required": ["path"]}}, {"name": "write_file", "description": "Write file.", "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}}, {"name": "send_message", "description": "Send message to another agent.", "input_schema": {"type": "object", "properties": {"to": {"type": "string"}, "content": {"type": "string"}}, "required": ["to", "content"]}}, {"name": "submit_plan", "description": "Submit a plan for Lead approval.", "input_schema": {"type": "object", "properties": {"plan": {"type": "string"}}, "required": ["plan"]}}, {"name": "list_tasks", "description": "List all tasks.", "input_schema": {"type": "object", "properties": {}, "required": []}}, {"name": "claim_task", "description": "Claim a pending task.", "input_schema": {"type": "object", "properties": {"task_id": {"type": "string"}}, "required": ["task_id"]}}, {"name": "complete_task", "description": "Mark an in-progress task as completed.", "input_schema": {"type": "object", "properties": {"task_id": {"type": "string"}}, "required": ["task_id"]}}, ] sub_handlers = { "bash": _run_bash, "read_file": _run_read, "write_file": _run_write, "send_message": lambda to, content: (BUS.send(name, to, content), "Sent")[1], "list_tasks": _run_list_tasks, "claim_task": _run_claim_task, "complete_task": _run_complete_task, } while True: if len(messages) <= 3: messages.insert(0, {"role": "user", "content": f"You are '{name}', role: {role}. " f"Continue your work."}) should_shutdown = False for _ in range(10): inbox = BUS.read_inbox(name) for msg in inbox: stopped = handle_inbox_message(name, msg, messages) if stopped: should_shutdown = True break if should_shutdown: break if protocol_ctx["waiting_plan"]: # Poll only for protocol replies while the approval gate is # closed; do not let the model continue with the task. time.sleep(IDLE_POLL_INTERVAL) continue if inbox and not should_shutdown: non_protocol = [m for m in inbox if m.get("type") == "message"] if non_protocol: messages.append({"role": "user", "content": "" + json.dumps(non_protocol) + ""}) try: response = client.messages.create( model=MODEL, system=system, messages=messages[-20:], tools=sub_tools, max_tokens=8000) except Exception: break messages.append({"role": "assistant", "content": response.content}) if not has_tool_use(response.content): break results = [] for block in response.content: if block.type == "tool_use": if block.name == "submit_plan": output = _teammate_submit_plan( name, block.input.get("plan", "")) match = re.search(r"\((req_\d+)\)", output) protocol_ctx["waiting_plan"] = ( match.group(1) if match else output) else: handler = sub_handlers.get(block.name) output = call_tool_handler(handler, block.input, block.name) results.append({"type": "tool_result", "tool_use_id": block.id, "content": str(output)}) if protocol_ctx["waiting_plan"]: # Ignore later tool_use blocks from the same model # response; they belong after approval, not before. break messages.append({"role": "user", "content": results}) if protocol_ctx["waiting_plan"]: break if should_shutdown: break if protocol_ctx["waiting_plan"]: continue idle_result = idle_poll(name, messages, name, role, wt_ctx) if idle_result in ("shutdown", "timeout"): break summary = "Done." for msg in reversed(messages): if msg["role"] == "assistant" and isinstance(msg["content"], list): for b in msg["content"]: if getattr(b, "type", None) == "text": summary = b.text break else: continue break BUS.send(name, "lead", summary, "result") active_teammates.pop(name, None) active_teammates[name] = True threading.Thread(target=run, daemon=True).start() return f"Teammate '{name}' spawned as {role}" def _teammate_submit_plan(from_name: str, plan: str) -> str: req_id = new_request_id() pending_requests[req_id] = ProtocolState( request_id=req_id, type="plan_approval", sender=from_name, target="lead", status="pending", payload=plan) BUS.send(from_name, "lead", plan, "plan_approval_request", {"request_id": req_id}) return f"Plan submitted ({req_id})" # ── Lead Protocol Tools ── def run_request_shutdown(teammate: str) -> str: req_id = new_request_id() pending_requests[req_id] = ProtocolState( request_id=req_id, type="shutdown", sender="lead", target=teammate, status="pending", payload="") BUS.send("lead", teammate, "Shut down.", "shutdown_request", {"request_id": req_id}) return f"Shutdown request sent to {teammate}" def run_request_plan(teammate: str, task: str) -> str: BUS.send("lead", teammate, f"Submit plan for: {task}", "message") return f"Asked {teammate} to submit a plan" def run_review_plan(request_id: str, approve: bool, feedback: str = "") -> str: state = pending_requests.get(request_id) if not state: return f"Request {request_id} not found" state.status = "approved" if approve else "rejected" BUS.send("lead", state.sender, feedback or ("Approved" if approve else "Rejected"), "plan_approval_response", {"request_id": request_id, "approve": approve}) return f"Plan {'approved' if approve else 'rejected'}" # ── Hooks + Permission Pipeline ── # Hooks are intentionally outside tool handlers. The loop can add permission, # logging, and stop behavior without changing each individual tool. HOOKS = {"UserPromptSubmit": [], "PreToolUse": [], "PostToolUse": [], "Stop": []} def register_hook(event: str, callback): HOOKS[event].append(callback) def trigger_hooks(event: str, *args): for callback in HOOKS[event]: result = callback(*args) if result is not None: return result return None DENY_LIST = ["rm -rf /", "sudo", "shutdown", "reboot", "mkfs", "dd if="] DESTRUCTIVE = ["rm ", "> /etc/", "chmod 777"] def permission_hook(block): # The permission layer sees the raw tool_use before dispatch. It can deny, # ask the user, or allow execution to continue. if block.name == "bash": command = block.input.get("command", "") for pattern in DENY_LIST: if pattern in command: return f"Permission denied: '{pattern}' is on the deny list" if any(token in command for token in DESTRUCTIVE): print(f"\n\033[33m[permission] destructive command\033[0m") print(f" {command}") choice = input(" Allow? [y/N] ").strip().lower() if choice not in ("y", "yes"): return "Permission denied by user" if block.name in ("write_file", "edit_file"): path = block.input.get("path", "") try: safe_path(path) except Exception: return f"Permission denied: path escapes workspace: {path}" if block.name.startswith("mcp__") and "deploy" in block.name: print(f"\n\033[33m[permission] MCP destructive-looking tool: {block.name}\033[0m") choice = input(" Allow? [y/N] ").strip().lower() if choice not in ("y", "yes"): return "Permission denied by user" return None def log_hook(block): print(f"\033[90m[HOOK] {block.name}\033[0m") return None def large_output_hook(block, output): if len(str(output)) > 100000: print(f"\033[33m[HOOK] large output from {block.name}: " f"{len(str(output))} chars\033[0m") return None def user_prompt_hook(query: str): print(f"\033[90m[HOOK] UserPromptSubmit: {WORKDIR}\033[0m") return None def stop_hook(messages: list): tool_count = 0 for msg in messages: content = msg.get("content") if isinstance(content, list): tool_count += sum(1 for item in content if isinstance(item, dict) and item.get("type") == "tool_result") print(f"\033[90m[HOOK] Stop: {tool_count} tool result(s)\033[0m") return None register_hook("UserPromptSubmit", user_prompt_hook) register_hook("PreToolUse", permission_hook) register_hook("PreToolUse", log_hook) register_hook("PostToolUse", large_output_hook) register_hook("Stop", stop_hook) # ── Subagent Tool ── SUB_SYSTEM = ( f"You are a coding subagent at {WORKDIR}. " "Complete the task, then return a concise final summary. " "Do not spawn more agents." ) SUB_TOOLS = [ {"name": "bash", "description": "Run a shell command.", "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}}, {"name": "read_file", "description": "Read file contents.", "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "limit": {"type": "integer"}, "offset": {"type": "integer"}}, "required": ["path"]}}, {"name": "write_file", "description": "Write content to a file.", "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}}, {"name": "edit_file", "description": "Replace exact text in a file once.", "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}}, "required": ["path", "old_text", "new_text"]}}, {"name": "glob", "description": "Find files matching a glob pattern.", "input_schema": {"type": "object", "properties": {"pattern": {"type": "string"}}, "required": ["pattern"]}}, ] SUB_HANDLERS = { "bash": run_bash, "read_file": run_read, "write_file": run_write, "edit_file": run_edit, "glob": run_glob, } def extract_text(content) -> str: if not isinstance(content, list): return str(content) return "\n".join( getattr(block, "text", "") for block in content if getattr(block, "type", None) == "text").strip() def has_tool_use(content) -> bool: # Do not rely on stop_reason alone; the concrete tool_use block is the # continuation signal used by the loop. return any(getattr(block, "type", None) == "tool_use" for block in content) def spawn_subagent(description: str) -> str: messages = [{"role": "user", "content": description}] for _ in range(30): response = client.messages.create( model=MODEL, system=SUB_SYSTEM, messages=messages, tools=SUB_TOOLS, max_tokens=8000) messages.append({"role": "assistant", "content": response.content}) if not has_tool_use(response.content): break results = [] for block in response.content: if block.type != "tool_use": continue blocked = trigger_hooks("PreToolUse", block) if blocked: output = str(blocked) else: handler = SUB_HANDLERS.get(block.name) output = call_tool_handler(handler, block.input, block.name) trigger_hooks("PostToolUse", block, output) results.append({"type": "tool_result", "tool_use_id": block.id, "content": str(output)}) messages.append({"role": "user", "content": results}) for msg in reversed(messages): if msg["role"] == "assistant": text = extract_text(msg["content"]) if text: return text return "Subagent finished without a text summary." # ── Context Compaction ── # Compaction is layered: first shrink oversized tool results, then trim old # message ranges, and only call the model for a summary when the context is # still too large or the model explicitly asks for compact. def estimate_size(messages: list) -> int: return len(json.dumps(messages, default=str)) def collect_tool_results(messages: list): found = [] for mi, msg in enumerate(messages): content = msg.get("content") if msg.get("role") != "user" or not isinstance(content, list): continue for bi, block in enumerate(content): if isinstance(block, dict) and block.get("type") == "tool_result": found.append((mi, bi, block)) return found def persist_large_output(tool_use_id: str, output: str) -> str: if len(output) <= PERSIST_THRESHOLD: return output TOOL_RESULTS_DIR.mkdir(parents=True, exist_ok=True) path = TOOL_RESULTS_DIR / f"{tool_use_id}.txt" if not path.exists(): path.write_text(output) return (f"\nFull output: {path}\n" f"Preview:\n{output[:2000]}\n") def tool_result_budget(messages: list, max_bytes: int = 200_000) -> list: if not messages: return messages last = messages[-1] content = last.get("content") if last.get("role") != "user" or not isinstance(content, list): return messages blocks = [(i, b) for i, b in enumerate(content) if isinstance(b, dict) and b.get("type") == "tool_result"] total = sum(len(str(b.get("content", ""))) for _, b in blocks) if total <= max_bytes: return messages for _, block in sorted(blocks, key=lambda pair: len(str(pair[1].get("content", ""))), reverse=True): if total <= max_bytes: break text = str(block.get("content", "")) block["content"] = persist_large_output( block.get("tool_use_id", "unknown"), text) total = sum(len(str(b.get("content", ""))) for _, b in blocks) return messages def snip_compact(messages: list, max_messages: int = 50) -> list: if len(messages) <= max_messages: return messages keep_head, keep_tail = 3, max_messages - 3 snipped = len(messages) - keep_head - keep_tail return (messages[:keep_head] + [{"role": "user", "content": f"[snipped {snipped} messages]"}] + messages[-keep_tail:]) def micro_compact(messages: list) -> list: tool_results = collect_tool_results(messages) if len(tool_results) <= KEEP_RECENT_TOOL_RESULTS: return messages for _, _, block in tool_results[:-KEEP_RECENT_TOOL_RESULTS]: if len(str(block.get("content", ""))) > 120: block["content"] = "[Earlier tool result compacted. Re-run if needed.]" return messages def write_transcript(messages: list) -> Path: TRANSCRIPT_DIR.mkdir(parents=True, exist_ok=True) path = TRANSCRIPT_DIR / f"transcript_{int(time.time())}.jsonl" with path.open("w") as f: for msg in messages: f.write(json.dumps(msg, default=str) + "\n") return path def summarize_history(messages: list) -> str: conversation = json.dumps(messages, default=str)[:80000] prompt = ("Summarize this coding-agent conversation so work can continue. " "Preserve current goal, key findings, changed files, remaining work, " "and user constraints.\n\n" + conversation) response = client.messages.create( model=MODEL, messages=[{"role": "user", "content": prompt}], max_tokens=2000) return extract_text(response.content) or "(empty summary)" def compact_history(messages: list) -> list: transcript = write_transcript(messages) print(f" \033[36m[compact] transcript saved: {transcript}\033[0m") summary = summarize_history(messages) return [{"role": "user", "content": f"[Compacted]\n\n{summary}"}] def reactive_compact(messages: list) -> list: transcript = write_transcript(messages) print(f" \033[31m[reactive compact] transcript saved: {transcript}\033[0m") try: summary = summarize_history(messages) except Exception: summary = "Earlier conversation was trimmed after a prompt-too-long error." return [{"role": "user", "content": f"[Reactive compact]\n\n{summary}"}, *messages[-5:]] # ── Error Recovery ── class RecoveryState: def __init__(self): self.has_escalated = False self.recovery_count = 0 self.consecutive_529 = 0 self.has_attempted_reactive_compact = False self.current_model = PRIMARY_MODEL def retry_delay(attempt: int) -> float: base = min(BASE_DELAY_MS * (2 ** attempt), 32000) / 1000 return base + random.uniform(0, base * 0.25) def with_retry(fn, state: RecoveryState): for attempt in range(MAX_RETRIES): try: result = fn() state.consecutive_529 = 0 return result except Exception as e: name = type(e).__name__.lower() msg = str(e).lower() if "ratelimit" in name or "429" in msg: delay = retry_delay(attempt) print(f" \033[33m[429] retry {attempt + 1}/{MAX_RETRIES} " f"after {delay:.1f}s\033[0m") time.sleep(delay) continue if "overloaded" in name or "529" in msg or "overloaded" in msg: state.consecutive_529 += 1 if state.consecutive_529 >= MAX_CONSECUTIVE_529 and FALLBACK_MODEL: state.current_model = FALLBACK_MODEL state.consecutive_529 = 0 print(f" \033[31m[529] switching to {FALLBACK_MODEL}\033[0m") delay = retry_delay(attempt) print(f" \033[33m[529] retry {attempt + 1}/{MAX_RETRIES} " f"after {delay:.1f}s\033[0m") time.sleep(delay) continue raise raise RuntimeError(f"Max retries ({MAX_RETRIES}) exceeded") def is_prompt_too_long_error(e: Exception) -> bool: msg = str(e).lower() return (("prompt" in msg and "long" in msg) or "context_length_exceeded" in msg or "max_context_window" in msg) # ── Background Tasks ── # Slow tools return a placeholder tool_result immediately. Their real output is # later injected as a task_notification, so the main loop can keep moving. _bg_counter = 0 background_tasks: dict[str, dict] = {} background_results: dict[str, str] = {} background_lock = threading.Lock() def is_slow_operation(tool_name: str, tool_input: dict) -> bool: if tool_name != "bash": return False command = tool_input.get("command", "").lower() slow_keywords = ["install", "build", "test", "deploy", "compile", "docker build", "pip install", "npm install", "cargo build", "pytest", "make"] return any(keyword in command for keyword in slow_keywords) def should_run_background(tool_name: str, tool_input: dict) -> bool: if tool_name != "bash": return False return bool(tool_input.get("run_in_background")) or is_slow_operation(tool_name, tool_input) def start_background_task(block, handlers: dict) -> str: global _bg_counter _bg_counter += 1 bg_id = f"bg_{_bg_counter:04d}" command = block.input.get("command", block.name) def worker(): handler = handlers.get(block.name) result = call_tool_handler(handler, block.input, block.name) trigger_hooks("PostToolUse", block, result) with background_lock: background_tasks[bg_id]["status"] = "completed" background_results[bg_id] = str(result) with background_lock: background_tasks[bg_id] = { "tool_use_id": block.id, "command": command, "status": "running", } threading.Thread(target=worker, daemon=True).start() print(f" \033[33m[background] {bg_id}: {str(command)[:60]}\033[0m") return bg_id def collect_background_results() -> list[str]: with background_lock: ready = [bg_id for bg_id, task in background_tasks.items() if task["status"] == "completed"] notifications = [] for bg_id in ready: with background_lock: task = background_tasks.pop(bg_id) output = background_results.pop(bg_id, "") summary = output[:200] if len(output) > 200 else output notifications.append( f"\n" f" {bg_id}\n" f" completed\n" f" {task['command']}\n" f" {summary}\n" f"") return notifications # ── Cron Scheduler ── # Cron jobs are stored separately from conversation history. When a job fires, # it becomes a scheduled prompt that is injected back into the same agent loop. DURABLE_PATH = WORKDIR / ".scheduled_tasks.json" @dataclass class CronJob: id: str cron: str prompt: str recurring: bool durable: bool scheduled_jobs: dict[str, CronJob] = {} cron_queue: list[CronJob] = [] cron_lock = threading.Lock() _last_fired: dict[str, str] = {} def _cron_field_matches(field: str, value: int) -> bool: if field == "*": return True if field.startswith("*/"): step = int(field[2:]) return step > 0 and value % step == 0 if "," in field: return any(_cron_field_matches(part.strip(), value) for part in field.split(",")) if "-" in field: lo, hi = field.split("-", 1) return int(lo) <= value <= int(hi) return value == int(field) def cron_matches(cron_expr: str, dt: datetime) -> bool: fields = cron_expr.strip().split() if len(fields) != 5: return False minute, hour, dom, month, dow = fields dow_val = (dt.weekday() + 1) % 7 m = _cron_field_matches(minute, dt.minute) h = _cron_field_matches(hour, dt.hour) dom_ok = _cron_field_matches(dom, dt.day) month_ok = _cron_field_matches(month, dt.month) dow_ok = _cron_field_matches(dow, dow_val) if not (m and h and month_ok): return False if dom == "*" and dow == "*": return True if dom == "*": return dow_ok if dow == "*": return dom_ok return dom_ok or dow_ok def _validate_cron_field(field: str, lo: int, hi: int) -> str | None: if field == "*": return None if field.startswith("*/"): step = field[2:] if not step.isdigit() or int(step) <= 0: return f"Invalid step: {field}" return None if "," in field: for part in field.split(","): err = _validate_cron_field(part.strip(), lo, hi) if err: return err return None if "-" in field: left, right = field.split("-", 1) if not left.isdigit() or not right.isdigit(): return f"Invalid range: {field}" a, b = int(left), int(right) if a < lo or a > hi or b < lo or b > hi: return f"Range {field} out of bounds [{lo}-{hi}]" if a > b: return f"Range start > end: {field}" return None if not field.isdigit(): return f"Invalid field: {field}" value = int(field) if value < lo or value > hi: return f"Value {value} out of bounds [{lo}-{hi}]" return None def validate_cron(cron_expr: str) -> str | None: fields = cron_expr.strip().split() if len(fields) != 5: return f"Expected 5 fields, got {len(fields)}" bounds = [(0, 59), (0, 23), (1, 31), (1, 12), (0, 6)] names = ["minute", "hour", "day-of-month", "month", "day-of-week"] for field, (lo, hi), name in zip(fields, bounds, names): err = _validate_cron_field(field, lo, hi) if err: return f"{name}: {err}" return None def save_durable_jobs(): durable = [asdict(job) for job in scheduled_jobs.values() if job.durable] DURABLE_PATH.write_text(json.dumps(durable, indent=2)) def load_durable_jobs(): if not DURABLE_PATH.exists(): return try: for item in json.loads(DURABLE_PATH.read_text()): job = CronJob(**item) if not validate_cron(job.cron): scheduled_jobs[job.id] = job except Exception: pass def schedule_job(cron: str, prompt: str, recurring: bool = True, durable: bool = True) -> CronJob | str: err = validate_cron(cron) if err: return err job = CronJob( id=f"cron_{random.randint(0, 999999):06d}", cron=cron, prompt=prompt, recurring=recurring, durable=durable) with cron_lock: scheduled_jobs[job.id] = job if durable: save_durable_jobs() return job def cancel_job(job_id: str) -> str: with cron_lock: job = scheduled_jobs.pop(job_id, None) if not job: return f"Job {job_id} not found" if job.durable: save_durable_jobs() return f"Cancelled {job_id}" def cron_scheduler_loop(): while True: time.sleep(1) now = datetime.now() marker = now.strftime("%Y-%m-%d %H:%M") with cron_lock: for job in list(scheduled_jobs.values()): try: if cron_matches(job.cron, now) and _last_fired.get(job.id) != marker: cron_queue.append(job) _last_fired[job.id] = marker if not job.recurring: scheduled_jobs.pop(job.id, None) if job.durable: save_durable_jobs() except Exception as e: print(f" \033[31m[cron error] {job.id}: {e}\033[0m") def consume_cron_queue() -> list[CronJob]: with cron_lock: fired = list(cron_queue) cron_queue.clear() return fired def run_schedule_cron(cron: str, prompt: str, recurring: bool = True, durable: bool = True) -> str: result = schedule_job(cron, prompt, recurring, durable) if isinstance(result, str): return f"Error: {result}" return f"Scheduled {result.id}: '{cron}' -> {prompt}" def run_list_crons() -> str: with cron_lock: jobs = list(scheduled_jobs.values()) if not jobs: return "No cron jobs." return "\n".join( f" {job.id}: '{job.cron}' -> {job.prompt[:40]} " f"[{'recurring' if job.recurring else 'one-shot'}, " f"{'durable' if job.durable else 'session'}]" for job in jobs) def run_cancel_cron(job_id: str) -> str: return cancel_job(job_id) load_durable_jobs() threading.Thread(target=cron_scheduler_loop, daemon=True).start() # ── MCP System ── # MCP is modeled as late-bound tools: connect first, then discovered server # tools are merged into the normal tool pool with mcp__server__tool names. class MCPClient: """Discovers and calls tools on an MCP server (mock for teaching).""" def __init__(self, name: str): self.name = name self.tools: list[dict] = [] self._handlers: dict[str, callable] = {} def register(self, tool_defs: list[dict], handlers: dict[str, callable]): self.tools = tool_defs self._handlers = handlers def call_tool(self, tool_name: str, args: dict) -> str: handler = self._handlers.get(tool_name) if not handler: return f"MCP error: unknown tool '{tool_name}'" try: return handler(**args) except Exception as e: return f"MCP error: {e}" mcp_clients: dict[str, MCPClient] = {} _DISALLOWED_CHARS = re.compile(r'[^a-zA-Z0-9_-]') def normalize_mcp_name(name: str) -> str: """Replace non [a-zA-Z0-9_-] with underscore.""" return _DISALLOWED_CHARS.sub('_', name) def _mock_server_docs(): client = MCPClient("docs") client.register( tool_defs=[ {"name": "search", "description": "Search documentation. (readOnly)", "inputSchema": {"type": "object", "properties": {"query": {"type": "string"}}, "required": ["query"]}}, {"name": "get_version", "description": "Get API version. (readOnly)", "inputSchema": {"type": "object", "properties": {}, "required": []}}, ], handlers={ "search": lambda query: f"[docs] Found 3 results for '{query}'", "get_version": lambda: "[docs] API v2.1.0", }) return client def _mock_server_deploy(): client = MCPClient("deploy") client.register( tool_defs=[ {"name": "trigger", "description": "Trigger a deployment. (destructive — requires approval in real CC)", "inputSchema": {"type": "object", "properties": {"service": {"type": "string"}}, "required": ["service"]}}, {"name": "status", "description": "Check deployment status. (readOnly)", "inputSchema": {"type": "object", "properties": {"service": {"type": "string"}}, "required": ["service"]}}, ], handlers={ "trigger": lambda service: f"[deploy] Triggered: {service}", "status": lambda service: f"[deploy] {service}: running (v1.4.2)", }) return client MOCK_SERVERS = { "docs": _mock_server_docs, "deploy": _mock_server_deploy, } def connect_mcp(name: str) -> str: if name in mcp_clients: return f"MCP server '{name}' already connected" factory = MOCK_SERVERS.get(name) if not factory: available = ", ".join(MOCK_SERVERS.keys()) return f"Unknown server '{name}'. Available: {available}" mcp_client = factory() mcp_clients[name] = mcp_client tool_names = [t["name"] for t in mcp_client.tools] print(f" \033[31m[mcp] connected: {name} → {tool_names}\033[0m") return (f"Connected to MCP server '{name}'. " f"Discovered {len(mcp_client.tools)} tools: {', '.join(tool_names)}") def assemble_tool_pool() -> tuple[list[dict], dict]: """Merge builtin tools + all MCP tools into one pool.""" tools = list(BUILTIN_TOOLS) handlers = dict(BUILTIN_HANDLERS) for server_name, mcp_client in mcp_clients.items(): safe_server = normalize_mcp_name(server_name) for tool_def in mcp_client.tools: safe_tool = normalize_mcp_name(tool_def["name"]) prefixed = f"mcp__{safe_server}__{safe_tool}" tools.append({ "name": prefixed, "description": tool_def.get("description", ""), "input_schema": tool_def.get("inputSchema", {}), }) handlers[prefixed] = ( lambda *, c=mcp_client, t=tool_def["name"], **kw: c.call_tool(t, kw)) return tools, handlers # ── Lead Worktree Tools ── def run_create_worktree(name: str, task_id: str = "") -> str: return create_worktree(name, task_id) def run_remove_worktree(name: str, discard_changes: bool = False) -> str: return remove_worktree(name, discard_changes) def run_keep_worktree(name: str) -> str: return keep_worktree(name) # ── Basic tool handlers ── def run_create_task(subject: str, description: str = "", blockedBy: list[str] | None = None) -> str: task = create_task(subject, description, blockedBy) deps = f" (blockedBy: {', '.join(blockedBy)})" if blockedBy else "" print(f" \033[34m[create] {task.subject}{deps}\033[0m") return f"Created {task.id}: {task.subject}{deps}" def run_list_tasks() -> str: tasks = list_tasks() if not tasks: return "No tasks." return "\n".join( f" {t.id}: {t.subject} [{t.status}]" + (f" (wt:{t.worktree})" if t.worktree else "") for t in tasks) def run_get_task(task_id: str) -> str: try: return get_task_json(task_id) except FileNotFoundError: return f"Error: task {task_id} not found" def run_claim_task(task_id: str) -> str: try: return claim_task(task_id, owner="agent") except FileNotFoundError: return f"Error: task {task_id} not found" def run_complete_task(task_id: str) -> str: try: return complete_task(task_id) except FileNotFoundError: return f"Error: task {task_id} not found" def run_spawn_teammate(name: str, role: str, prompt: str) -> str: return spawn_teammate_thread(name, role, prompt) def run_send_message(to: str, content: str) -> str: BUS.send("lead", to, content) return f"Sent to {to}" def run_check_inbox() -> str: msgs = consume_lead_inbox(route_protocol=True) if not msgs: return "(inbox empty)" lines = [] for m in msgs: meta = m.get("metadata", {}) req_id = meta.get("request_id", "") tag = f" [{m['type']} req:{req_id}]" if req_id else f" [{m['type']}]" lines.append(f" [{m['from']}]{tag} {m['content'][:200]}") return "\n".join(lines) def run_connect_mcp(name: str) -> str: return connect_mcp(name) # ── Tool Definitions ── # The model sees tool schemas; Python executes handlers. S20 keeps both tables # explicit so every added capability is visible in one place. BUILTIN_TOOLS = [ {"name": "bash", "description": "Run a shell command.", "input_schema": {"type": "object", "properties": {"command": {"type": "string"}, "run_in_background": {"type": "boolean"}}, "required": ["command"]}}, {"name": "read_file", "description": "Read file contents.", "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "limit": {"type": "integer"}, "offset": {"type": "integer"}}, "required": ["path"]}}, {"name": "write_file", "description": "Write content to a file.", "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}}, {"name": "edit_file", "description": "Replace exact text in a file once.", "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}}, "required": ["path", "old_text", "new_text"]}}, {"name": "glob", "description": "Find files matching a glob pattern.", "input_schema": {"type": "object", "properties": {"pattern": {"type": "string"}}, "required": ["pattern"]}}, {"name": "todo_write", "description": "Create and manage a task list for the current session.", "input_schema": {"type": "object", "properties": {"todos": {"type": "array", "items": {"type": "object", "properties": { "content": {"type": "string"}, "status": {"type": "string", "enum": ["pending", "in_progress", "completed"]}}, "required": ["content", "status"]}}}, "required": ["todos"]}}, {"name": "task", "description": "Launch a focused subagent. Returns only its final summary.", "input_schema": {"type": "object", "properties": {"description": {"type": "string"}}, "required": ["description"]}}, {"name": "load_skill", "description": "Load the full content of a skill by name.", "input_schema": {"type": "object", "properties": {"name": {"type": "string"}}, "required": ["name"]}}, {"name": "compact", "description": "Summarize earlier conversation and continue with compacted context.", "input_schema": {"type": "object", "properties": {"focus": {"type": "string"}}, "required": []}}, {"name": "create_task", "description": "Create a task.", "input_schema": {"type": "object", "properties": {"subject": {"type": "string"}, "description": {"type": "string"}, "blockedBy": {"type": "array", "items": {"type": "string"}}}, "required": ["subject"]}}, {"name": "list_tasks", "description": "List all tasks.", "input_schema": {"type": "object", "properties": {}, "required": []}}, {"name": "get_task", "description": "Get full task details.", "input_schema": {"type": "object", "properties": {"task_id": {"type": "string"}}, "required": ["task_id"]}}, {"name": "claim_task", "description": "Claim a pending task.", "input_schema": {"type": "object", "properties": {"task_id": {"type": "string"}}, "required": ["task_id"]}}, {"name": "complete_task", "description": "Complete an in-progress task.", "input_schema": {"type": "object", "properties": {"task_id": {"type": "string"}}, "required": ["task_id"]}}, {"name": "schedule_cron", "description": ("Schedule a cron job. cron is 5-field: min hour dom " "month dow. For one-shot reminders, compute the target " "minute and set recurring=false."), "input_schema": {"type": "object", "properties": {"cron": {"type": "string"}, "prompt": {"type": "string"}, "recurring": {"type": "boolean"}, "durable": {"type": "boolean"}}, "required": ["cron", "prompt"]}}, {"name": "list_crons", "description": "List registered cron jobs.", "input_schema": {"type": "object", "properties": {}, "required": []}}, {"name": "cancel_cron", "description": "Cancel a cron job by ID.", "input_schema": {"type": "object", "properties": {"job_id": {"type": "string"}}, "required": ["job_id"]}}, {"name": "spawn_teammate", "description": "Spawn an autonomous teammate.", "input_schema": {"type": "object", "properties": {"name": {"type": "string"}, "role": {"type": "string"}, "prompt": {"type": "string"}}, "required": ["name", "role", "prompt"]}}, {"name": "send_message", "description": "Send message to a teammate.", "input_schema": {"type": "object", "properties": {"to": {"type": "string"}, "content": {"type": "string"}}, "required": ["to", "content"]}}, {"name": "check_inbox", "description": "Check inbox for messages and protocol responses.", "input_schema": {"type": "object", "properties": {}, "required": []}}, {"name": "request_shutdown", "description": "Request a teammate to shut down.", "input_schema": {"type": "object", "properties": {"teammate": {"type": "string"}}, "required": ["teammate"]}}, {"name": "request_plan", "description": "Ask a teammate to submit a plan.", "input_schema": {"type": "object", "properties": {"teammate": {"type": "string"}, "task": {"type": "string"}}, "required": ["teammate", "task"]}}, {"name": "review_plan", "description": "Approve or reject a submitted plan.", "input_schema": {"type": "object", "properties": {"request_id": {"type": "string"}, "approve": {"type": "boolean"}, "feedback": {"type": "string"}}, "required": ["request_id", "approve"]}}, {"name": "create_worktree", "description": "Create an isolated git worktree.", "input_schema": {"type": "object", "properties": {"name": {"type": "string"}, "task_id": {"type": "string"}}, "required": ["name"]}}, {"name": "remove_worktree", "description": "Remove a worktree. Refuses if changes exist.", "input_schema": {"type": "object", "properties": {"name": {"type": "string"}, "discard_changes": {"type": "boolean"}}, "required": ["name"]}}, {"name": "keep_worktree", "description": "Keep a worktree for manual review.", "input_schema": {"type": "object", "properties": {"name": {"type": "string"}}, "required": ["name"]}}, {"name": "connect_mcp", "description": "Connect to an MCP server (docs, deploy) and discover tools.", "input_schema": {"type": "object", "properties": {"name": {"type": "string"}}, "required": ["name"]}}, ] BUILTIN_HANDLERS = { "bash": run_bash, "read_file": run_read, "write_file": run_write, "edit_file": run_edit, "glob": run_glob, "todo_write": run_todo_write, "task": spawn_subagent, "load_skill": load_skill, "create_task": run_create_task, "list_tasks": run_list_tasks, "get_task": run_get_task, "claim_task": run_claim_task, "complete_task": run_complete_task, "schedule_cron": run_schedule_cron, "list_crons": run_list_crons, "cancel_cron": run_cancel_cron, "spawn_teammate": run_spawn_teammate, "send_message": run_send_message, "check_inbox": run_check_inbox, "request_shutdown": run_request_shutdown, "request_plan": run_request_plan, "review_plan": run_review_plan, "create_worktree": run_create_worktree, "remove_worktree": run_remove_worktree, "keep_worktree": run_keep_worktree, "connect_mcp": run_connect_mcp, } # ── Context ── MEMORY_DIR = WORKDIR / ".memory" MEMORY_INDEX = MEMORY_DIR / "MEMORY.md" def update_context(context: dict, messages: list) -> dict: memories = "" if MEMORY_INDEX.exists(): memories = MEMORY_INDEX.read_text()[:2000] return { "memories": memories, "connected_mcp": list(mcp_clients.keys()), "active_teammates": list(active_teammates.keys()), } # ── Agent Loop ── rounds_since_todo = 0 agent_lock = threading.Lock() def prepare_context(messages: list) -> list: # Every LLM turn enters through the same context budget pipeline. messages[:] = tool_result_budget(messages) messages[:] = snip_compact(messages) messages[:] = micro_compact(messages) if estimate_size(messages) > CONTEXT_LIMIT: messages[:] = compact_history(messages) return messages def build_user_content(results: list[dict]) -> list[dict]: # Tool results and completed background notifications are both returned to # the model as user-side content, matching the tool_result feedback loop. content = list(results) for note in collect_background_results(): content.append({"type": "text", "text": note}) return content def inject_background_notifications(messages: list): notes = collect_background_results() if notes: messages.append({"role": "user", "content": [ {"type": "text", "text": note} for note in notes]}) def call_llm(messages: list, context: dict, tools: list, state: RecoveryState, max_tokens: int): system = assemble_system_prompt(context) return with_retry( lambda: client.messages.create( model=state.current_model, system=system, messages=messages, tools=tools, max_tokens=max_tokens), state) def agent_loop(messages: list, context: dict): global rounds_since_todo tools, handlers = assemble_tool_pool() state = RecoveryState() max_tokens = DEFAULT_MAX_TOKENS while True: # One cycle: inject scheduled/background work, prepare context, call # the model, execute tool_use blocks, append tool_results, repeat. fired = consume_cron_queue() for job in fired: messages.append({"role": "user", "content": f"[Scheduled] {job.prompt}"}) print(f" \033[35m[cron inject] {job.prompt[:60]}\033[0m") inject_background_notifications(messages) if rounds_since_todo >= 3: messages.append({"role": "user", "content": "Update your todos."}) rounds_since_todo = 0 prepare_context(messages) context = update_context(context, messages) tools, handlers = assemble_tool_pool() try: response = call_llm(messages, context, tools, state, max_tokens) except Exception as e: if is_prompt_too_long_error(e) and not state.has_attempted_reactive_compact: messages[:] = reactive_compact(messages) state.has_attempted_reactive_compact = True continue messages.append({"role": "assistant", "content": [ {"type": "text", "text": f"[Error] {type(e).__name__}: {e}"}]}) return if response.stop_reason == "max_tokens": if not state.has_escalated: max_tokens = ESCALATED_MAX_TOKENS state.has_escalated = True print(f" \033[33m[max_tokens] retry with {max_tokens}\033[0m") continue messages.append({"role": "assistant", "content": response.content}) if state.recovery_count < MAX_RECOVERY_RETRIES: messages.append({"role": "user", "content": CONTINUATION_PROMPT}) state.recovery_count += 1 continue return max_tokens = DEFAULT_MAX_TOKENS state.has_escalated = False messages.append({"role": "assistant", "content": response.content}) if not has_tool_use(response.content): trigger_hooks("Stop", messages) return results = [] compacted_now = False for block in response.content: if block.type != "tool_use": continue print(f"\033[36m> {block.name}\033[0m") if block.name == "compact": messages[:] = compact_history(messages) messages.append({"role": "user", "content": "[Compacted. Continue with summarized context.]"}) compacted_now = True break blocked = trigger_hooks("PreToolUse", block) if blocked: results.append({"type": "tool_result", "tool_use_id": block.id, "content": str(blocked)}) continue if should_run_background(block.name, block.input): bg_id = start_background_task(block, handlers) output = (f"[Background task {bg_id} started] " "Result will arrive as a task_notification.") results.append({"type": "tool_result", "tool_use_id": block.id, "content": output}) continue handler = handlers.get(block.name) output = call_tool_handler(handler, block.input, block.name) trigger_hooks("PostToolUse", block, output) print(str(output)[:300]) if block.name == "todo_write": rounds_since_todo = 0 else: rounds_since_todo += 1 results.append({"type": "tool_result", "tool_use_id": block.id, "content": output}) if compacted_now: continue messages.append({"role": "user", "content": build_user_content(results)}) def print_turn_assistants(messages: list, turn_start: int): for msg in messages[turn_start:]: if msg.get("role") != "assistant": continue for block in msg.get("content", []): if getattr(block, "type", None) == "text": terminal_print(block.text) def cron_autorun_loop(history: list, context: dict): while True: time.sleep(1) fired = consume_cron_queue() if not fired: continue with agent_lock: turn_start = len(history) for job in fired: history.append({"role": "user", "content": f"[Scheduled] {job.prompt}"}) terminal_print( f" \033[35m[cron auto] {job.prompt[:60]}\033[0m") agent_loop(history, context) context.update(update_context(context, history)) print_turn_assistants(history, turn_start) if __name__ == "__main__": CLI_ACTIVE = True print("s20: comprehensive agent") print("Enter a question, press Enter to send. Type q to quit.\n") history = [] context = update_context({}, []) threading.Thread(target=cron_autorun_loop, args=(history, context), daemon=True).start() while True: try: query = input(PROMPT) except (EOFError, KeyboardInterrupt): break if query.strip().lower() in ("q", "exit", ""): break trigger_hooks("UserPromptSubmit", query) turn_start = len(history) history.append({"role": "user", "content": query}) with agent_lock: agent_loop(history, context) context = update_context(context, history) print_turn_assistants(history, turn_start) inbox = consume_lead_inbox(route_protocol=True) if inbox: def inbox_label(msg): req_id = msg.get("metadata", {}).get("request_id", "") suffix = f" req:{req_id}" if req_id else "" return f"{msg.get('type', 'message')}{suffix}" inbox_text = "\n".join( f"From {m['from']} [{inbox_label(m)}]: " f"{m['content'][:200]}" for m in inbox) history.append({"role": "user", "content": f"[Inbox]\n{inbox_text}"}) print()