Files
gui-yue 1baf1aca5a Follow up PR #265: refine chapters, diagrams, and add S20 (#283)
* feat: s01-s14 docs quality overhaul — tool pipeline, single-agent, knowledge & resilience

Rewrite code.py and README (zh/en/ja) for s01-s14, each chapter building
incrementally on the previous. Key fixes across chapters:

- s01-s04: agent loop, tool dispatch, permission pipeline, hooks
- s05-s08: todo write, subagent, skill loading, context compact
- s09-s11: memory system, system prompt assembly, error recovery
- s12-s14: task graph, background tasks, cron scheduler

All chapters CC source-verified. Code inherits fixes forward (PROMPT_SECTIONS,
json.dumps cache, real-state context, can_start dep protection, etc.).

* feat: s15-s19 docs quality overhaul — multi-agent platform: teams, protocols, autonomy, worktree, MCP tools

Rewrite code.py and README (zh/en/ja) for s15-s19, the multi-agent platform
chapters. Each chapter inherits all previous fixes and adds one mechanism:

- s15: agent teams (TeamCreate, teammate threads, shared task list)
- s16: team protocols (plan approval, shutdown handshake, consume_inbox)
- s17: autonomous agents (idle polling, auto-claim, consume_lead_inbox)
- s18: worktree isolation (git worktree, bind_task, cwd switching, safety)
- s19: MCP tools (MCPClient, normalize_mcp_name, assemble_tool_pool, no cache)

All appendix source code references verified against CC source. Config priority
corrected: claude.ai < plugin < user < project < local.

* fix: 5 regressions across s05-s19 — glob safety, todo validation, memory extraction, protocol types, dep crash

- s05-s09: glob results now filter with is_relative_to(WORKDIR) (inherited from s02)
- s06-s08: todo_write validates content/status required fields (inherited from s05)
- s09: extract_memories uses pre-compression snapshot instead of compacted messages
- s16: submit_plan docstring clarifies protocol-only (not code-level gate)
- s17-s19: match_response restores type mismatch validation (from s16)
- s17-s19: claim_task deps list handles missing dep files without crashing

* fix: s12 Todo V2 logic reversal, s14/s15 cron range validation, s18/s19 worktree name validation

- s12 README (zh/en/ja): fix Todo V2 direction — interactive defaults to Task,
  non-interactive/SDK defaults to TodoWrite. Fix env var name to
  CLAUDE_CODE_ENABLE_TASKS (not TODO_V2).
- s14/s15: add _validate_cron_field with per-field range checks (minute 0-59,
  hour 0-23, dom 1-31, month 1-12, dow 0-6), step > 0, range lo <= hi.
  Replace old try/except validation that only caught exceptions.
- s18/s19: add validate_worktree_name() to remove_worktree and keep_worktree,
  not just create_worktree.

* fix: align s16-s19 teaching tool consistency

* fix pr265 chapter diagrams

* Add comprehensive s20 harness chapter

* Fix chapter smoke test regressions

* Clarify README tutorial track transition

---------

Co-authored-by: Haoran <bill-billion@outlook.com>
2026-05-20 21:45:38 +08:00

997 lines
37 KiB
Python

#!/usr/bin/env python3
"""
s18: Worktree Isolation — git worktree + task-directory binding + event log.
Run: python s18_worktree_isolation/code.py
Need: pip install anthropic python-dotenv + .env with ANTHROPIC_API_KEY
Changes from s17:
- Task dataclass gains worktree field (str | None)
- validate_worktree_name: reject path traversal and illegal chars
- create_worktree: validate name, git worktree add, optional task binding
- bind_task_to_worktree: write worktree field only, keep task pending
- remove_worktree: safety check before force, no auto-complete
- run_git returns (ok, output), events only on success
- Teammate tools: + complete_task, run in worktree cwd when bound
- scan_unclaimed_tasks: uses can_start() for dependency checking
- idle_poll: checks claim result, dispatches shutdown in IDLE
- consume_lead_inbox: unified inbox consumer
- 3 new Lead tools: create_worktree, remove_worktree, keep_worktree
ASCII topology:
Main repo (/)
├── .worktrees/auth/ (branch: wt/auth) ← Task #1
├── .worktrees/ui/ (branch: wt/ui) ← Task #2
├── .tasks/task_xxx.json (worktree: "auth")
└── .worktrees/events.jsonl
"""
import os, subprocess, json, time, random, threading, re
from pathlib import Path
from datetime import datetime
from dataclasses import dataclass, asdict, field
try:
import readline
readline.parse_and_bind('set bind-tty-special-chars off')
except ImportError:
pass
from anthropic import Anthropic
from dotenv import load_dotenv
load_dotenv(override=True)
if os.getenv("ANTHROPIC_BASE_URL"):
os.environ.pop("ANTHROPIC_AUTH_TOKEN", None)
WORKDIR = Path.cwd()
client = Anthropic(base_url=os.getenv("ANTHROPIC_BASE_URL"))
MODEL = os.environ["MODEL_ID"]
# ── Task System (from s12 + s18 worktree field) ──
TASKS_DIR = WORKDIR / ".tasks"
TASKS_DIR.mkdir(exist_ok=True)
@dataclass
class Task:
id: str
subject: str
description: str
status: str
owner: str | None
blockedBy: list[str]
worktree: str | None = None # s18: bound worktree name
def _task_path(task_id: str) -> Path:
return TASKS_DIR / f"{task_id}.json"
def create_task(subject: str, description: str = "",
blockedBy: list[str] | None = None) -> Task:
task = Task(
id=f"task_{int(time.time())}_{random.randint(0, 9999):04d}",
subject=subject, description=description,
status="pending", owner=None,
blockedBy=blockedBy or [],
)
save_task(task)
return task
def save_task(task: Task):
_task_path(task.id).write_text(json.dumps(asdict(task), indent=2))
def load_task(task_id: str) -> Task:
return Task(**json.loads(_task_path(task_id).read_text()))
def list_tasks() -> list[Task]:
return [Task(**json.loads(p.read_text()))
for p in sorted(TASKS_DIR.glob("task_*.json"))]
def get_task_json(task_id: str) -> str:
task = load_task(task_id)
return json.dumps(asdict(task), indent=2)
def can_start(task_id: str) -> bool:
task = load_task(task_id)
for dep_id in task.blockedBy:
if not _task_path(dep_id).exists():
return False
if load_task(dep_id).status != "completed":
return False
return True
def claim_task(task_id: str, owner: str = "agent") -> str:
task = load_task(task_id)
if task.status != "pending":
return f"Task {task_id} is {task.status}, cannot claim"
if task.owner:
return f"Task {task_id} already owned by {task.owner}"
if not can_start(task_id):
deps = [d for d in task.blockedBy
if _task_path(d).exists() and load_task(d).status != "completed"]
missing = [d for d in task.blockedBy if not _task_path(d).exists()]
parts = []
if deps: parts.append(f"blocked by: {deps}")
if missing: parts.append(f"missing deps: {missing}")
return "Cannot start — " + ", ".join(parts)
task.owner = owner
task.status = "in_progress"
save_task(task)
print(f" \033[36m[claim] {task.subject} → in_progress\033[0m")
return f"Claimed {task.id} ({task.subject})"
def complete_task(task_id: str) -> str:
task = load_task(task_id)
if task.status != "in_progress":
return f"Task {task_id} is {task.status}, cannot complete"
task.status = "completed"
save_task(task)
unblocked = [t.subject for t in list_tasks()
if t.status == "pending" and t.blockedBy and can_start(t.id)]
print(f" \033[32m[complete] {task.subject}\033[0m")
msg = f"Completed {task.id} ({task.subject})"
if unblocked:
msg += f"\nUnblocked: {', '.join(unblocked)}"
return msg
# ── Worktree System (s18 new) ──
WORKTREES_DIR = WORKDIR / ".worktrees"
WORKTREES_DIR.mkdir(exist_ok=True)
VALID_WT_NAME = re.compile(r'^[A-Za-z0-9._-]{1,64}$')
def validate_worktree_name(name: str) -> str | None:
"""Return error message if invalid, None if valid."""
if not name:
return "Worktree name cannot be empty"
if name == "." or name == "..":
return f"'{name}' is not a valid worktree name"
if not VALID_WT_NAME.match(name):
return (f"Invalid worktree name '{name}': "
"only letters, digits, dots, underscores, dashes (1-64 chars)")
return None
def run_git(args: list[str]) -> tuple[bool, str]:
"""Run git command. Return (ok, output)."""
try:
r = subprocess.run(["git"] + args, cwd=WORKDIR,
capture_output=True, text=True, timeout=30)
out = (r.stdout + r.stderr).strip()
out = out[:5000] if out else "(no output)"
return r.returncode == 0, out
except subprocess.TimeoutExpired:
return False, "Error: git timeout"
def log_event(event_type: str, worktree_name: str, task_id: str = ""):
"""Append a lifecycle event to events.jsonl."""
event = {"type": event_type, "worktree": worktree_name,
"task_id": task_id, "ts": time.time()}
events_file = WORKTREES_DIR / "events.jsonl"
with open(events_file, "a") as f:
f.write(json.dumps(event) + "\n")
def create_worktree(name: str, task_id: str = "") -> str:
"""Create a git worktree with a dedicated branch. Optionally bind to a task."""
err = validate_worktree_name(name)
if err:
return f"Error: {err}"
path = WORKTREES_DIR / name
if path.exists():
return f"Worktree '{name}' already exists at {path}"
ok, result = run_git(["worktree", "add", str(path), "-b", f"wt/{name}", "HEAD"])
if not ok:
return f"Git error: {result}"
if task_id:
bind_task_to_worktree(task_id, name)
log_event("create", name, task_id)
print(f" \033[33m[worktree] created: {name} at {path}\033[0m")
return f"Worktree '{name}' created at {path}"
def bind_task_to_worktree(task_id: str, worktree_name: str):
"""Write worktree field to task. Keep status as pending for auto-claim."""
task = load_task(task_id)
task.worktree = worktree_name
save_task(task)
print(f" \033[33m[bind] {task.subject} → worktree:{worktree_name}\033[0m")
def _count_worktree_changes(path: Path) -> tuple[int, int]:
"""Count uncommitted files and commits in a worktree."""
try:
r1 = subprocess.run(["git", "status", "--porcelain"],
cwd=path, capture_output=True, text=True, timeout=10)
files = len([l for l in r1.stdout.strip().splitlines() if l.strip()])
r2 = subprocess.run(["git", "log", "@{push}..HEAD", "--oneline"],
cwd=path, capture_output=True, text=True, timeout=10)
commits = len([l for l in r2.stdout.strip().splitlines() if l.strip()])
return files, commits
except Exception:
return -1, -1
def remove_worktree(name: str, discard_changes: bool = False) -> str:
"""Remove worktree. Refuses if uncommitted changes unless discard_changes."""
err = validate_worktree_name(name)
if err:
return err
path = WORKTREES_DIR / name
if not path.exists():
return f"Worktree '{name}' not found"
if not discard_changes:
files, commits = _count_worktree_changes(path)
if files < 0:
return (f"Cannot verify worktree '{name}' status. "
"Use discard_changes=true to force removal.")
if files > 0 or commits > 0:
return (f"Worktree '{name}' has {files} uncommitted file(s) "
f"and {commits} unpushed commit(s). "
"Use discard_changes=true to force removal, "
"or keep_worktree to preserve for review.")
ok1, _ = run_git(["worktree", "remove", str(path), "--force"])
if not ok1:
return f"Failed to remove worktree directory for '{name}'"
run_git(["branch", "-D", f"wt/{name}"])
log_event("remove", name)
print(f" \033[33m[worktree] removed: {name}\033[0m")
return f"Worktree '{name}' removed"
def keep_worktree(name: str) -> str:
"""Keep worktree for manual review. Branch preserved."""
err = validate_worktree_name(name)
if err:
return err
log_event("keep", name)
print(f" \033[36m[worktree] kept: {name}\033[0m")
return f"Worktree '{name}' kept for review (branch: wt/{name})"
# ── Prompt Assembly (from s10) ──
PROMPT_SECTIONS = {
"identity": "You are a coding agent. Act, don't explain.",
"tools": "Available tools: bash, read_file, write_file, "
"create_task, list_tasks, get_task, claim_task, complete_task, "
"spawn_teammate, send_message, check_inbox, "
"request_shutdown, request_plan, review_plan, "
"create_worktree, remove_worktree, keep_worktree.",
"workspace": f"Working directory: {WORKDIR}",
"memory": "Relevant memories are injected below when available.",
}
def assemble_system_prompt(context: dict) -> str:
sections = [PROMPT_SECTIONS["identity"],
PROMPT_SECTIONS["tools"],
PROMPT_SECTIONS["workspace"]]
if context.get("memories"):
sections.append(f"Relevant memories:\n{context['memories']}")
return "\n\n".join(sections)
_last_context_hash, _last_prompt = None, None
def get_system_prompt(context: dict) -> str:
global _last_context_hash, _last_prompt
h = json.dumps(context, sort_keys=True)
if h == _last_context_hash and _last_prompt:
return _last_prompt
_last_context_hash, _last_prompt = h, assemble_system_prompt(context)
return _last_prompt
# ── Basic Tools ──
def safe_path(p: str, cwd: Path = None) -> Path:
base = cwd or WORKDIR
path = (base / p).resolve()
if not path.is_relative_to(base):
raise ValueError(f"Path escapes workspace: {p}")
return path
def run_bash(command: str, cwd: Path = None) -> str:
try:
r = subprocess.run(command, shell=True, cwd=cwd or WORKDIR,
capture_output=True, text=True, timeout=120)
out = (r.stdout + r.stderr).strip()
return out[:50000] if out else "(no output)"
except subprocess.TimeoutExpired:
return "Error: Timeout (120s)"
def run_read(path: str, limit: int | None = None, cwd: Path = None) -> str:
try:
lines = safe_path(path, cwd).read_text().splitlines()
if limit and limit < len(lines):
lines = lines[:limit] + [f"... ({len(lines) - limit} more lines)"]
return "\n".join(lines)
except Exception as e:
return f"Error: {e}"
def run_write(path: str, content: str, cwd: Path = None) -> str:
try:
fp = safe_path(path, cwd)
fp.parent.mkdir(parents=True, exist_ok=True)
fp.write_text(content)
return f"Wrote {len(content)} bytes to {path}"
except Exception as e:
return f"Error: {e}"
# ── MessageBus (from s15) ──
MAILBOX_DIR = WORKDIR / ".mailboxes"
MAILBOX_DIR.mkdir(exist_ok=True)
class MessageBus:
def send(self, from_agent: str, to_agent: str, content: str,
msg_type: str = "message", metadata: dict = None):
msg = {"from": from_agent, "to": to_agent,
"content": content, "type": msg_type,
"ts": time.time(), "metadata": metadata or {}}
inbox = MAILBOX_DIR / f"{to_agent}.jsonl"
with open(inbox, "a") as f:
f.write(json.dumps(msg) + "\n")
print(f" \033[33m[bus] {from_agent}{to_agent}: "
f"({msg_type}) {content[:50]}\033[0m")
def read_inbox(self, agent: str) -> list[dict]:
inbox = MAILBOX_DIR / f"{agent}.jsonl"
if not inbox.exists():
return []
msgs = [json.loads(line) for line in inbox.read_text().splitlines()
if line.strip()]
inbox.unlink()
return msgs
BUS = MessageBus()
active_teammates: dict[str, bool] = {}
# ── Protocol State (from s16) ──
@dataclass
class ProtocolState:
request_id: str
type: str
sender: str
target: str
status: str
payload: str
created_at: float = field(default_factory=time.time)
pending_requests: dict[str, ProtocolState] = {}
def new_request_id() -> str:
return f"req_{random.randint(0, 999999):06d}"
def match_response(response_type: str, request_id: str, approve: bool):
state = pending_requests.get(request_id)
if not state:
print(f" \033[31m[protocol] unknown request_id: {request_id}\033[0m")
return
if state.type == "shutdown" and response_type != "shutdown_response":
print(f" \033[31m[protocol] type mismatch: expected shutdown_response, "
f"got {response_type}\033[0m")
return
if state.type == "plan_approval" and response_type != "plan_approval_response":
print(f" \033[31m[protocol] type mismatch: expected plan_approval_response, "
f"got {response_type}\033[0m")
return
state.status = "approved" if approve else "rejected"
icon = "" if approve else ""
color = "32" if approve else "31"
print(f" \033[{color}m[protocol] {state.type} {icon} "
f"({request_id}: {state.status})\033[0m")
def consume_lead_inbox(route_protocol=True) -> list[dict]:
msgs = BUS.read_inbox("lead")
if route_protocol:
for msg in msgs:
meta = msg.get("metadata", {})
req_id = meta.get("request_id", "")
msg_type = msg.get("type", "")
if req_id and msg_type.endswith("_response"):
match_response(msg_type, req_id, meta.get("approve", False))
return msgs
# ── Autonomous Agent (from s17, + worktree cwd) ──
IDLE_POLL_INTERVAL = 5
IDLE_TIMEOUT = 60
def scan_unclaimed_tasks() -> list[dict]:
"""Find pending, unowned tasks with all dependencies completed."""
unclaimed = []
for f in sorted(TASKS_DIR.glob("task_*.json")):
task = json.loads(f.read_text())
if (task.get("status") == "pending"
and not task.get("owner")
and can_start(task["id"])):
unclaimed.append(task)
return unclaimed
def idle_poll(agent_name: str, messages: list,
name: str, role: str) -> str:
"""Poll for 60s. Return 'work', 'shutdown', or 'timeout'."""
for _ in range(IDLE_TIMEOUT // IDLE_POLL_INTERVAL):
time.sleep(IDLE_POLL_INTERVAL)
inbox = BUS.read_inbox(agent_name)
if inbox:
for msg in inbox:
if msg.get("type") == "shutdown_request":
req_id = msg.get("metadata", {}).get("request_id", "")
BUS.send(name, "lead", "Shutting down gracefully.",
"shutdown_response",
{"request_id": req_id, "approve": True})
print(f" \033[35m[protocol] {name} approved shutdown "
f"in idle ({req_id})\033[0m")
return "shutdown"
messages.append({"role": "user",
"content": "<inbox>" + json.dumps(inbox) + "</inbox>"})
print(f" \033[36m[idle] {name} found inbox messages\033[0m")
return "work"
unclaimed = scan_unclaimed_tasks()
if unclaimed:
task_data = unclaimed[0]
result = claim_task(task_data["id"], agent_name)
if "Claimed" in result:
wt_info = ""
if task_data.get("worktree"):
wt_path = WORKTREES_DIR / task_data["worktree"]
wt_info = f"\nWork directory: {wt_path}"
messages.append({"role": "user",
"content": f"<auto-claimed>Task {task_data['id']}: "
f"{task_data['subject']}{wt_info}</auto-claimed>"})
print(f" \033[32m[idle] {name} auto-claimed: "
f"{task_data['subject']}\033[0m")
return "work"
print(f" \033[33m[idle] {name} claim failed: "
f"{result}\033[0m")
print(f" \033[31m[idle] {name} timeout ({IDLE_TIMEOUT}s)\033[0m")
return "timeout"
# ── Teammate Thread (from s15 + s16 + s17 + s18) ──
def spawn_teammate_thread(name: str, role: str, prompt: str) -> str:
if name in active_teammates:
return f"Teammate '{name}' already exists"
system = (f"You are '{name}', a {role}. "
f"Use tools to complete tasks. "
f"You can list and claim tasks from the board. "
f"If a task has a worktree, work in that directory.")
def handle_inbox_message(name: str, msg: dict, messages: list):
msg_type = msg.get("type", "message")
meta = msg.get("metadata", {})
req_id = meta.get("request_id", "")
if msg_type == "shutdown_request":
BUS.send(name, "lead", "Shutting down gracefully.",
"shutdown_response",
{"request_id": req_id, "approve": True})
print(f" \033[35m[protocol] {name} approved shutdown "
f"({req_id})\033[0m")
return True
if msg_type == "plan_approval_response":
approve = meta.get("approve", False)
if approve:
messages.append({"role": "user",
"content": "[Plan approved] Proceed with the task."})
else:
messages.append({"role": "user",
"content": f"[Plan rejected] Feedback: {msg['content']}"})
return False
def run():
# Track current worktree for this teammate's cwd
wt_ctx = {"path": None}
def _wt_cwd() -> Path | None:
p = wt_ctx["path"]
return Path(p) if p else None
def _run_bash(command: str) -> str:
return run_bash(command, cwd=_wt_cwd())
def _run_read(path: str) -> str:
return run_read(path, cwd=_wt_cwd())
def _run_write(path: str, content: str) -> str:
return run_write(path, content, cwd=_wt_cwd())
def _run_list_tasks():
tasks = list_tasks()
if not tasks:
return "No tasks."
return "\n".join(
f" {t.id}: {t.subject} [{t.status}]"
+ (f" (wt:{t.worktree})" if t.worktree else "")
for t in tasks)
def _run_claim_task(task_id: str):
result = claim_task(task_id, owner=name)
if "Claimed" in result:
# Set worktree cwd if task has one
task = load_task(task_id)
if task.worktree:
wt_ctx["path"] = str(WORKTREES_DIR / task.worktree)
else:
wt_ctx["path"] = None
return result
def _run_complete_task(task_id: str):
result = complete_task(task_id)
wt_ctx["path"] = None
return result
messages = [{"role": "user", "content": prompt}]
sub_tools = [
{"name": "bash", "description": "Run a shell command.",
"input_schema": {"type": "object",
"properties": {"command": {"type": "string"}},
"required": ["command"]}},
{"name": "read_file", "description": "Read file.",
"input_schema": {"type": "object",
"properties": {"path": {"type": "string"}},
"required": ["path"]}},
{"name": "write_file", "description": "Write file.",
"input_schema": {"type": "object",
"properties": {"path": {"type": "string"},
"content": {"type": "string"}},
"required": ["path", "content"]}},
{"name": "send_message",
"description": "Send message to another agent.",
"input_schema": {"type": "object",
"properties": {"to": {"type": "string"},
"content": {"type": "string"}},
"required": ["to", "content"]}},
{"name": "submit_plan",
"description": "Submit a plan for Lead approval.",
"input_schema": {"type": "object",
"properties": {"plan": {"type": "string"}},
"required": ["plan"]}},
{"name": "list_tasks",
"description": "List all tasks on the board.",
"input_schema": {"type": "object", "properties": {},
"required": []}},
{"name": "claim_task",
"description": "Claim a pending task.",
"input_schema": {"type": "object",
"properties": {"task_id": {"type": "string"}},
"required": ["task_id"]}},
{"name": "complete_task",
"description": "Mark an in-progress task as completed.",
"input_schema": {"type": "object",
"properties": {"task_id": {"type": "string"}},
"required": ["task_id"]}},
]
sub_handlers = {
"bash": _run_bash, "read_file": _run_read,
"write_file": _run_write,
"send_message": lambda to, content: (BUS.send(name, to, content),
"Sent")[1],
"submit_plan": lambda plan: _teammate_submit_plan(name, plan),
"list_tasks": _run_list_tasks,
"claim_task": _run_claim_task,
"complete_task": _run_complete_task,
}
# Outer loop: WORK → IDLE cycle
while True:
if len(messages) <= 3:
messages.insert(0, {"role": "user",
"content": f"<identity>You are '{name}', role: {role}. "
f"Continue your work.</identity>"})
# WORK phase
should_shutdown = False
for _ in range(10):
inbox = BUS.read_inbox(name)
for msg in inbox:
stopped = handle_inbox_message(name, msg, messages)
if stopped:
should_shutdown = True
break
if should_shutdown:
break
if inbox and not should_shutdown:
non_protocol = [m for m in inbox
if m.get("type") == "message"]
if non_protocol:
messages.append({"role": "user",
"content": "<inbox>" + json.dumps(non_protocol) + "</inbox>"})
try:
response = client.messages.create(
model=MODEL, system=system, messages=messages[-20:],
tools=sub_tools, max_tokens=8000)
except Exception:
break
messages.append({"role": "assistant", "content": response.content})
if response.stop_reason != "tool_use":
break
results = []
for block in response.content:
if block.type == "tool_use":
handler = sub_handlers.get(block.name)
output = handler(**block.input) if handler else "Unknown"
results.append({"type": "tool_result",
"tool_use_id": block.id,
"content": str(output)})
messages.append({"role": "user", "content": results})
if should_shutdown:
break
# IDLE phase
idle_result = idle_poll(name, messages, name, role)
if idle_result == "shutdown":
break
if idle_result == "timeout":
break
# Summary
summary = "Done."
for msg in reversed(messages):
if msg["role"] == "assistant" and isinstance(msg["content"], list):
for b in msg["content"]:
if getattr(b, "type", None) == "text":
summary = b.text
break
else:
continue
break
BUS.send(name, "lead", summary, "result")
active_teammates.pop(name, None)
print(f" \033[32m[teammate] {name} finished\033[0m")
active_teammates[name] = True
threading.Thread(target=run, daemon=True).start()
print(f" \033[36m[teammate] {name} spawned as {role}\033[0m")
return f"Teammate '{name}' spawned as {role} (autonomous)"
def _teammate_submit_plan(from_name: str, plan: str) -> str:
req_id = new_request_id()
pending_requests[req_id] = ProtocolState(
request_id=req_id, type="plan_approval",
sender=from_name, target="lead",
status="pending", payload=plan)
BUS.send(from_name, "lead", plan,
"plan_approval_request",
{"request_id": req_id})
return f"Plan submitted ({req_id}). Waiting for approval..."
# ── Lead Protocol Tools (from s16) ──
def run_request_shutdown(teammate: str) -> str:
req_id = new_request_id()
pending_requests[req_id] = ProtocolState(
request_id=req_id, type="shutdown",
sender="lead", target=teammate,
status="pending", payload="")
BUS.send("lead", teammate, "Please shut down gracefully.",
"shutdown_request",
{"request_id": req_id})
print(f" \033[35m[protocol] shutdown_request → {teammate} "
f"({req_id})\033[0m")
return f"Shutdown request sent to {teammate} (req: {req_id})"
def run_request_plan(teammate: str, task: str) -> str:
BUS.send("lead", teammate, f"Please submit a plan for: {task}",
"message")
return f"Asked {teammate} to submit a plan"
def run_review_plan(request_id: str, approve: bool,
feedback: str = "") -> str:
state = pending_requests.get(request_id)
if not state:
return f"Request {request_id} not found"
if state.status != "pending":
return f"Request {request_id} already {state.status}"
state.status = "approved" if approve else "rejected"
BUS.send("lead", state.sender,
feedback or ("Approved" if approve else "Rejected"),
"plan_approval_response",
{"request_id": request_id, "approve": approve})
icon = "" if approve else ""
print(f" \033[32m[protocol] plan {icon} ({request_id})\033[0m")
return f"Plan {'approved' if approve else 'rejected'} ({request_id})"
# ── Lead Worktree Tools (s18 new) ──
def run_create_worktree(name: str, task_id: str = "") -> str:
return create_worktree(name, task_id)
def run_remove_worktree(name: str, discard_changes: bool = False) -> str:
return remove_worktree(name, discard_changes)
def run_keep_worktree(name: str) -> str:
return keep_worktree(name)
# ── Basic tool handlers ──
def run_create_task(subject: str, description: str = "",
blockedBy: list[str] | None = None) -> str:
task = create_task(subject, description, blockedBy)
deps = f" (blockedBy: {', '.join(blockedBy)})" if blockedBy else ""
print(f" \033[34m[create] {task.subject}{deps}\033[0m")
return f"Created {task.id}: {task.subject}{deps}"
def run_list_tasks() -> str:
tasks = list_tasks()
if not tasks:
return "No tasks."
return "\n".join(
f" {t.id}: {t.subject} [{t.status}]"
+ (f" (wt:{t.worktree})" if t.worktree else "")
for t in tasks)
def run_get_task(task_id: str) -> str:
return get_task_json(task_id)
def run_claim_task(task_id: str) -> str:
return claim_task(task_id, owner="agent")
def run_complete_task(task_id: str) -> str:
return complete_task(task_id)
def run_spawn_teammate(name: str, role: str, prompt: str) -> str:
return spawn_teammate_thread(name, role, prompt)
def run_send_message(to: str, content: str) -> str:
BUS.send("lead", to, content)
return f"Sent to {to}"
def run_check_inbox() -> str:
msgs = consume_lead_inbox(route_protocol=True)
if not msgs:
return "(inbox empty)"
lines = []
for m in msgs:
meta = m.get("metadata", {})
req_id = meta.get("request_id", "")
tag = f" [{m['type']} req:{req_id}]" if req_id else f" [{m['type']}]"
lines.append(f" [{m['from']}]{tag} {m['content'][:200]}")
return "\n".join(lines)
# ── Tool Definitions ──
TOOLS = [
{"name": "bash", "description": "Run a shell command.",
"input_schema": {"type": "object",
"properties": {"command": {"type": "string"}},
"required": ["command"]}},
{"name": "read_file", "description": "Read file contents.",
"input_schema": {"type": "object",
"properties": {"path": {"type": "string"},
"limit": {"type": "integer"}},
"required": ["path"]}},
{"name": "write_file", "description": "Write content to a file.",
"input_schema": {"type": "object",
"properties": {"path": {"type": "string"},
"content": {"type": "string"}},
"required": ["path", "content"]}},
{"name": "create_task",
"description": "Create a task.",
"input_schema": {"type": "object",
"properties": {"subject": {"type": "string"},
"description": {"type": "string"},
"blockedBy": {"type": "array",
"items": {"type": "string"}}},
"required": ["subject"]}},
{"name": "list_tasks",
"description": "List all tasks.",
"input_schema": {"type": "object", "properties": {}, "required": []}},
{"name": "get_task",
"description": "Get full details of a specific task.",
"input_schema": {"type": "object",
"properties": {"task_id": {"type": "string"}},
"required": ["task_id"]}},
{"name": "claim_task",
"description": "Claim a pending task.",
"input_schema": {"type": "object",
"properties": {"task_id": {"type": "string"}},
"required": ["task_id"]}},
{"name": "complete_task",
"description": "Complete an in-progress task.",
"input_schema": {"type": "object",
"properties": {"task_id": {"type": "string"}},
"required": ["task_id"]}},
{"name": "spawn_teammate",
"description": "Spawn an autonomous teammate agent.",
"input_schema": {"type": "object",
"properties": {"name": {"type": "string"},
"role": {"type": "string"},
"prompt": {"type": "string"}},
"required": ["name", "role", "prompt"]}},
{"name": "send_message",
"description": "Send message to a teammate.",
"input_schema": {"type": "object",
"properties": {"to": {"type": "string"},
"content": {"type": "string"}},
"required": ["to", "content"]}},
{"name": "check_inbox",
"description": "Check inbox for messages and protocol responses.",
"input_schema": {"type": "object", "properties": {}, "required": []}},
{"name": "request_shutdown",
"description": "Request a teammate to shut down gracefully.",
"input_schema": {"type": "object",
"properties": {"teammate": {"type": "string"}},
"required": ["teammate"]}},
{"name": "request_plan",
"description": "Ask a teammate to submit a plan for review.",
"input_schema": {"type": "object",
"properties": {"teammate": {"type": "string"},
"task": {"type": "string"}},
"required": ["teammate", "task"]}},
{"name": "review_plan",
"description": "Approve or reject a submitted plan.",
"input_schema": {"type": "object",
"properties": {
"request_id": {"type": "string"},
"approve": {"type": "boolean"},
"feedback": {"type": "string"}},
"required": ["request_id", "approve"]}},
# s18 new: worktree tools
{"name": "create_worktree",
"description": "Create an isolated git worktree with its own branch.",
"input_schema": {"type": "object",
"properties": {"name": {"type": "string"},
"task_id": {"type": "string"}},
"required": ["name"]}},
{"name": "remove_worktree",
"description": "Remove a worktree. Refuses if uncommitted changes unless discard_changes=true.",
"input_schema": {"type": "object",
"properties": {"name": {"type": "string"},
"discard_changes": {"type": "boolean"}},
"required": ["name"]}},
{"name": "keep_worktree",
"description": "Keep a worktree for manual review.",
"input_schema": {"type": "object",
"properties": {"name": {"type": "string"}},
"required": ["name"]}},
]
TOOL_HANDLERS = {
"bash": run_bash, "read_file": run_read, "write_file": run_write,
"create_task": run_create_task, "list_tasks": run_list_tasks,
"get_task": run_get_task,
"claim_task": run_claim_task, "complete_task": run_complete_task,
"spawn_teammate": run_spawn_teammate,
"send_message": run_send_message, "check_inbox": run_check_inbox,
"request_shutdown": run_request_shutdown,
"request_plan": run_request_plan, "review_plan": run_review_plan,
"create_worktree": run_create_worktree,
"remove_worktree": run_remove_worktree,
"keep_worktree": run_keep_worktree,
}
# ── Context ──
MEMORY_DIR = WORKDIR / ".memory"
MEMORY_INDEX = MEMORY_DIR / "MEMORY.md"
def update_context(context: dict, messages: list) -> dict:
memories = ""
if MEMORY_INDEX.exists():
memories = MEMORY_INDEX.read_text()[:2000]
return {"memories": memories}
# ── Agent Loop ──
def agent_loop(messages: list, context: dict):
system = get_system_prompt(context)
while True:
try:
response = client.messages.create(
model=MODEL, system=system, messages=messages,
tools=TOOLS, max_tokens=8000)
except Exception as e:
messages.append({"role": "assistant", "content": [
{"type": "text", "text": f"[Error] {type(e).__name__}: {e}"}]})
return
messages.append({"role": "assistant", "content": response.content})
if response.stop_reason != "tool_use":
return
results = []
for block in response.content:
if block.type != "tool_use":
continue
print(f"\033[36m> {block.name}\033[0m")
handler = TOOL_HANDLERS.get(block.name)
output = handler(**block.input) if handler else "Unknown"
print(str(output)[:300])
results.append({"type": "tool_result",
"tool_use_id": block.id, "content": output})
messages.append({"role": "user", "content": results})
context = update_context(context, messages)
system = get_system_prompt(context)
if __name__ == "__main__":
print("s18: worktree isolation")
print("Enter a question, press Enter to send. Type q to quit.\n")
history = []
context = {"memories": ""}
while True:
try:
query = input("\033[36ms18 >> \033[0m")
except (EOFError, KeyboardInterrupt):
break
if query.strip().lower() in ("q", "exit", ""):
break
history.append({"role": "user", "content": query})
agent_loop(history, context)
context = update_context(context, history)
for block in history[-1]["content"]:
if getattr(block, "type", None) == "text":
print(block.text)
# Consume lead inbox: route protocol + inject into history
inbox = consume_lead_inbox(route_protocol=True)
if inbox:
inbox_text = "\n".join(
f"From {m['from']} [{m.get('type', 'message')}]: "
f"{m['content'][:200]}" for m in inbox)
history.append({"role": "user",
"content": f"[Inbox]\n{inbox_text}"})
print()