#!/usr/bin/env python3 """ s10_team_protocols.py - Team Protocols Shutdown protocol and plan approval protocol, both using the same request_id correlation pattern. Builds on s09's team messaging. Shutdown FSM: pending -> approved | rejected Lead Teammate +---------------------+ +---------------------+ | shutdown_request | | | | { | -------> | receives request | | request_id: abc | | decides: approve? | | } | | | +---------------------+ +---------------------+ | +---------------------+ +-------v-------------+ | shutdown_response | <------- | shutdown_response | | { | | { | | request_id: abc | | request_id: abc | | approve: true | | approve: true | | } | | } | +---------------------+ +---------------------+ | v status -> "shutdown", thread stops Plan approval FSM: pending -> approved | rejected Teammate Lead +---------------------+ +---------------------+ | plan_approval | | | | submit: {plan:"..."}| -------> | reviews plan text | +---------------------+ | approve/reject? | +---------------------+ | +---------------------+ +-------v-------------+ | plan_approval_resp | <------- | plan_approval | | {approve: true} | | review: {req_id, | +---------------------+ | approve: true} | +---------------------+ Trackers: {request_id: {"target|from": name, "status": "pending|..."}} Key insight: "Same request_id correlation pattern, two domains." """ import json import os import subprocess import threading import time import uuid from pathlib import Path from anthropic import Anthropic from dotenv import load_dotenv load_dotenv(override=True) if os.getenv("ANTHROPIC_BASE_URL"): os.environ.pop("ANTHROPIC_AUTH_TOKEN", None) WORKDIR = Path.cwd() client = Anthropic(base_url=os.getenv("ANTHROPIC_BASE_URL")) MODEL = os.environ["MODEL_ID"] TEAM_DIR = WORKDIR / ".team" INBOX_DIR = TEAM_DIR / "inbox" SYSTEM = f"You are a team lead at {WORKDIR}. Manage teammates with shutdown and plan approval protocols." VALID_MSG_TYPES = { "message", "broadcast", "shutdown_request", "shutdown_response", "plan_approval_response", } # -- Request trackers: correlate by request_id -- shutdown_requests = {} plan_requests = {} _tracker_lock = threading.Lock() # -- MessageBus: JSONL inbox per teammate -- class MessageBus: def __init__(self, inbox_dir: Path): self.dir = inbox_dir self.dir.mkdir(parents=True, exist_ok=True) def send(self, sender: str, to: str, content: str, msg_type: str = "message", extra: dict = None) -> str: if msg_type not in VALID_MSG_TYPES: return f"Error: Invalid type '{msg_type}'. Valid: {VALID_MSG_TYPES}" msg = { "type": msg_type, "from": sender, "content": content, "timestamp": time.time(), } if extra: msg.update(extra) inbox_path = self.dir / f"{to}.jsonl" with open(inbox_path, "a") as f: f.write(json.dumps(msg) + "\n") return f"Sent {msg_type} to {to}" def read_inbox(self, name: str) -> list: inbox_path = self.dir / f"{name}.jsonl" if not inbox_path.exists(): return [] messages = [] for line in inbox_path.read_text().strip().splitlines(): if line: messages.append(json.loads(line)) inbox_path.write_text("") return messages def broadcast(self, sender: str, content: str, teammates: list) -> str: count = 0 for name in teammates: if name != sender: self.send(sender, name, content, "broadcast") count += 1 return f"Broadcast to {count} teammates" BUS = MessageBus(INBOX_DIR) # -- TeammateManager with shutdown + plan approval -- class TeammateManager: def __init__(self, team_dir: Path): self.dir = team_dir self.dir.mkdir(exist_ok=True) self.config_path = self.dir / "config.json" self.config = self._load_config() self.threads = {} def _load_config(self) -> dict: if self.config_path.exists(): return json.loads(self.config_path.read_text()) return {"team_name": "default", "members": []} def _save_config(self): self.config_path.write_text(json.dumps(self.config, indent=2)) def _find_member(self, name: str) -> dict: for m in self.config["members"]: if m["name"] == name: return m return None def spawn(self, name: str, role: str, prompt: str) -> str: member = self._find_member(name) if member: if member["status"] not in ("idle", "shutdown"): return f"Error: '{name}' is currently {member['status']}" member["status"] = "working" member["role"] = role else: member = {"name": name, "role": role, "status": "working"} self.config["members"].append(member) self._save_config() thread = threading.Thread( target=self._teammate_loop, args=(name, role, prompt), daemon=True, ) self.threads[name] = thread thread.start() return f"Spawned '{name}' (role: {role})" def _teammate_loop(self, name: str, role: str, prompt: str): sys_prompt = ( f"You are '{name}', role: {role}, at {WORKDIR}. " f"Submit plans via plan_approval before major work. " f"Respond to shutdown_request with shutdown_response." ) messages = [{"role": "user", "content": prompt}] tools = self._teammate_tools() should_exit = False for _ in range(50): inbox = BUS.read_inbox(name) for msg in inbox: messages.append({"role": "user", "content": json.dumps(msg)}) if should_exit: break try: response = client.messages.create( model=MODEL, system=sys_prompt, messages=messages, tools=tools, max_tokens=8000, ) except Exception: break messages.append({"role": "assistant", "content": response.content}) if response.stop_reason != "tool_use": break results = [] for block in response.content: if block.type == "tool_use": output = self._exec(name, block.name, block.input) print(f" [{name}] {block.name}: {str(output)[:120]}") results.append({ "type": "tool_result", "tool_use_id": block.id, "content": str(output), }) if block.name == "shutdown_response" and block.input.get("approve"): should_exit = True messages.append({"role": "user", "content": results}) member = self._find_member(name) if member: member["status"] = "shutdown" if should_exit else "idle" self._save_config() def _exec(self, sender: str, tool_name: str, args: dict) -> str: # these base tools are unchanged from s02 if tool_name == "bash": return _run_bash(args["command"]) if tool_name == "read_file": return _run_read(args["path"]) if tool_name == "write_file": return _run_write(args["path"], args["content"]) if tool_name == "edit_file": return _run_edit(args["path"], args["old_text"], args["new_text"]) if tool_name == "send_message": return BUS.send(sender, args["to"], args["content"], args.get("msg_type", "message")) if tool_name == "read_inbox": return json.dumps(BUS.read_inbox(sender), indent=2) if tool_name == "shutdown_response": req_id = args["request_id"] approve = args["approve"] with _tracker_lock: if req_id in shutdown_requests: shutdown_requests[req_id]["status"] = "approved" if approve else "rejected" BUS.send( sender, "lead", args.get("reason", ""), "shutdown_response", {"request_id": req_id, "approve": approve}, ) return f"Shutdown {'approved' if approve else 'rejected'}" if tool_name == "plan_approval": plan_text = args.get("plan", "") req_id = str(uuid.uuid4())[:8] with _tracker_lock: plan_requests[req_id] = {"from": sender, "plan": plan_text, "status": "pending"} BUS.send( sender, "lead", plan_text, "plan_approval_response", {"request_id": req_id, "plan": plan_text}, ) return f"Plan submitted (request_id={req_id}). Waiting for lead approval." return f"Unknown tool: {tool_name}" def _teammate_tools(self) -> list: # these base tools are unchanged from s02 return [ {"name": "bash", "description": "Run a shell command.", "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}}, {"name": "read_file", "description": "Read file contents.", "input_schema": {"type": "object", "properties": {"path": {"type": "string"}}, "required": ["path"]}}, {"name": "write_file", "description": "Write content to file.", "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}}, {"name": "edit_file", "description": "Replace exact text in file.", "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}}, "required": ["path", "old_text", "new_text"]}}, {"name": "send_message", "description": "Send message to a teammate.", "input_schema": {"type": "object", "properties": {"to": {"type": "string"}, "content": {"type": "string"}, "msg_type": {"type": "string", "enum": list(VALID_MSG_TYPES)}}, "required": ["to", "content"]}}, {"name": "read_inbox", "description": "Read and drain your inbox.", "input_schema": {"type": "object", "properties": {}}}, {"name": "shutdown_response", "description": "Respond to a shutdown request. Approve to shut down, reject to keep working.", "input_schema": {"type": "object", "properties": {"request_id": {"type": "string"}, "approve": {"type": "boolean"}, "reason": {"type": "string"}}, "required": ["request_id", "approve"]}}, {"name": "plan_approval", "description": "Submit a plan for lead approval. Provide plan text.", "input_schema": {"type": "object", "properties": {"plan": {"type": "string"}}, "required": ["plan"]}}, ] def list_all(self) -> str: if not self.config["members"]: return "No teammates." lines = [f"Team: {self.config['team_name']}"] for m in self.config["members"]: lines.append(f" {m['name']} ({m['role']}): {m['status']}") return "\n".join(lines) def member_names(self) -> list: return [m["name"] for m in self.config["members"]] TEAM = TeammateManager(TEAM_DIR) # -- Base tool implementations (these base tools are unchanged from s02) -- def _safe_path(p: str) -> Path: path = (WORKDIR / p).resolve() if not path.is_relative_to(WORKDIR): raise ValueError(f"Path escapes workspace: {p}") return path def _run_bash(command: str) -> str: dangerous = ["rm -rf /", "sudo", "shutdown", "reboot"] if any(d in command for d in dangerous): return "Error: Dangerous command blocked" try: r = subprocess.run( command, shell=True, cwd=WORKDIR, capture_output=True, text=True, timeout=120, ) out = (r.stdout + r.stderr).strip() return out[:50000] if out else "(no output)" except subprocess.TimeoutExpired: return "Error: Timeout (120s)" def _run_read(path: str, limit: int = None) -> str: try: lines = _safe_path(path).read_text().splitlines() if limit and limit < len(lines): lines = lines[:limit] + [f"... ({len(lines) - limit} more)"] return "\n".join(lines)[:50000] except Exception as e: return f"Error: {e}" def _run_write(path: str, content: str) -> str: try: fp = _safe_path(path) fp.parent.mkdir(parents=True, exist_ok=True) fp.write_text(content) return f"Wrote {len(content)} bytes" except Exception as e: return f"Error: {e}" def _run_edit(path: str, old_text: str, new_text: str) -> str: try: fp = _safe_path(path) c = fp.read_text() if old_text not in c: return f"Error: Text not found in {path}" fp.write_text(c.replace(old_text, new_text, 1)) return f"Edited {path}" except Exception as e: return f"Error: {e}" # -- Lead-specific protocol handlers -- def handle_shutdown_request(teammate: str) -> str: req_id = str(uuid.uuid4())[:8] with _tracker_lock: shutdown_requests[req_id] = {"target": teammate, "status": "pending"} BUS.send( "lead", teammate, "Please shut down gracefully.", "shutdown_request", {"request_id": req_id}, ) return f"Shutdown request {req_id} sent to '{teammate}' (status: pending)" def handle_plan_review(request_id: str, approve: bool, feedback: str = "") -> str: with _tracker_lock: req = plan_requests.get(request_id) if not req: return f"Error: Unknown plan request_id '{request_id}'" with _tracker_lock: req["status"] = "approved" if approve else "rejected" BUS.send( "lead", req["from"], feedback, "plan_approval_response", {"request_id": request_id, "approve": approve, "feedback": feedback}, ) return f"Plan {req['status']} for '{req['from']}'" def _check_shutdown_status(request_id: str) -> str: with _tracker_lock: return json.dumps(shutdown_requests.get(request_id, {"error": "not found"})) # -- Lead tool dispatch (12 tools) -- TOOL_HANDLERS = { "bash": lambda **kw: _run_bash(kw["command"]), "read_file": lambda **kw: _run_read(kw["path"], kw.get("limit")), "write_file": lambda **kw: _run_write(kw["path"], kw["content"]), "edit_file": lambda **kw: _run_edit(kw["path"], kw["old_text"], kw["new_text"]), "spawn_teammate": lambda **kw: TEAM.spawn(kw["name"], kw["role"], kw["prompt"]), "list_teammates": lambda **kw: TEAM.list_all(), "send_message": lambda **kw: BUS.send("lead", kw["to"], kw["content"], kw.get("msg_type", "message")), "read_inbox": lambda **kw: json.dumps(BUS.read_inbox("lead"), indent=2), "broadcast": lambda **kw: BUS.broadcast("lead", kw["content"], TEAM.member_names()), "shutdown_request": lambda **kw: handle_shutdown_request(kw["teammate"]), "shutdown_response": lambda **kw: _check_shutdown_status(kw.get("request_id", "")), "plan_approval": lambda **kw: handle_plan_review(kw["request_id"], kw["approve"], kw.get("feedback", "")), } # these base tools are unchanged from s02 TOOLS = [ {"name": "bash", "description": "Run a shell command.", "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}}, {"name": "read_file", "description": "Read file contents.", "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "limit": {"type": "integer"}}, "required": ["path"]}}, {"name": "write_file", "description": "Write content to file.", "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}}, {"name": "edit_file", "description": "Replace exact text in file.", "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}}, "required": ["path", "old_text", "new_text"]}}, {"name": "spawn_teammate", "description": "Spawn a persistent teammate.", "input_schema": {"type": "object", "properties": {"name": {"type": "string"}, "role": {"type": "string"}, "prompt": {"type": "string"}}, "required": ["name", "role", "prompt"]}}, {"name": "list_teammates", "description": "List all teammates.", "input_schema": {"type": "object", "properties": {}}}, {"name": "send_message", "description": "Send a message to a teammate.", "input_schema": {"type": "object", "properties": {"to": {"type": "string"}, "content": {"type": "string"}, "msg_type": {"type": "string", "enum": list(VALID_MSG_TYPES)}}, "required": ["to", "content"]}}, {"name": "read_inbox", "description": "Read and drain the lead's inbox.", "input_schema": {"type": "object", "properties": {}}}, {"name": "broadcast", "description": "Send a message to all teammates.", "input_schema": {"type": "object", "properties": {"content": {"type": "string"}}, "required": ["content"]}}, {"name": "shutdown_request", "description": "Request a teammate to shut down gracefully. Returns a request_id for tracking.", "input_schema": {"type": "object", "properties": {"teammate": {"type": "string"}}, "required": ["teammate"]}}, {"name": "shutdown_response", "description": "Check the status of a shutdown request by request_id.", "input_schema": {"type": "object", "properties": {"request_id": {"type": "string"}}, "required": ["request_id"]}}, {"name": "plan_approval", "description": "Approve or reject a teammate's plan. Provide request_id + approve + optional feedback.", "input_schema": {"type": "object", "properties": {"request_id": {"type": "string"}, "approve": {"type": "boolean"}, "feedback": {"type": "string"}}, "required": ["request_id", "approve"]}}, ] def agent_loop(messages: list): while True: inbox = BUS.read_inbox("lead") if inbox: messages.append({ "role": "user", "content": f"{json.dumps(inbox, indent=2)}", }) messages.append({ "role": "assistant", "content": "Noted inbox messages.", }) response = client.messages.create( model=MODEL, system=SYSTEM, messages=messages, tools=TOOLS, max_tokens=8000, ) messages.append({"role": "assistant", "content": response.content}) if response.stop_reason != "tool_use": return results = [] for block in response.content: if block.type == "tool_use": handler = TOOL_HANDLERS.get(block.name) try: output = handler(**block.input) if handler else f"Unknown tool: {block.name}" except Exception as e: output = f"Error: {e}" print(f"> {block.name}: {str(output)[:200]}") results.append({ "type": "tool_result", "tool_use_id": block.id, "content": str(output), }) messages.append({"role": "user", "content": results}) if __name__ == "__main__": history = [] while True: try: query = input("\033[36ms10 >> \033[0m") except (EOFError, KeyboardInterrupt): break if query.strip().lower() in ("q", "exit", ""): break if query.strip() == "/team": print(TEAM.list_all()) continue if query.strip() == "/inbox": print(json.dumps(BUS.read_inbox("lead"), indent=2)) continue history.append({"role": "user", "content": query}) agent_loop(history) response_content = history[-1]["content"] if isinstance(response_content, list): for block in response_content: if hasattr(block, "text"): print(block.text) print()