analysis_claude_code/agents/s10_team_protocols.py

487 lines
21 KiB
Python

#!/usr/bin/env python3
"""
s10_team_protocols.py - Team Protocols
Shutdown protocol and plan approval protocol, both using the same
request_id correlation pattern. Builds on s09's team messaging.

Shutdown FSM: pending -> approved | rejected

    Lead                             Teammate
    +---------------------+          +---------------------+
    | shutdown_request    |          |                     |
    | {                   | -------> | receives request    |
    |   request_id: abc   |          | decides: approve?   |
    | }                   |          |                     |
    +---------------------+          +---------------------+
                                             |
    +---------------------+          +-------v-------------+
    | shutdown_response   | <------- | shutdown_response   |
    | {                   |          | {                   |
    |   request_id: abc   |          |   request_id: abc   |
    |   approve: true     |          |   approve: true     |
    | }                   |          | }                   |
    +---------------------+          +---------------------+
                                             |
                                             v
                            status -> "shutdown", thread stops

Plan approval FSM: pending -> approved | rejected

    Teammate                         Lead
    +---------------------+          +---------------------+
    | plan_approval       |          |                     |
    | submit: {plan:"..."}| -------> | reviews plan text   |
    +---------------------+          | approve/reject?     |
                                     +---------------------+
                                             |
    +---------------------+          +-------v-------------+
    | plan_approval_resp  | <------- | plan_approval       |
    | {approve: true}     |          | review: {req_id,    |
    +---------------------+          |   approve: true}    |
                                     +---------------------+

Trackers: {request_id: {"target|from": name, "status": "pending|..."}}
Key insight: "Same request_id correlation pattern, two domains."
"""
import json
import os
import subprocess
import threading
import time
import uuid
from pathlib import Path
from anthropic import Anthropic
from dotenv import load_dotenv
# Load settings from a .env file. override=True is passed so that, per
# python-dotenv, values from .env take precedence over variables already set
# in the process environment.
load_dotenv(override=True)
# When a custom API base URL is configured, remove ANTHROPIC_AUTH_TOKEN from
# this process's environment (no-op if it is not set).
if os.getenv("ANTHROPIC_BASE_URL"):
    os.environ.pop("ANTHROPIC_AUTH_TOKEN", None)
# Workspace root: the directory the script was started from. File tools and
# shell commands are anchored here.
WORKDIR = Path.cwd()
# Shared API client used by both the lead loop and every teammate thread.
client = Anthropic(base_url=os.getenv("ANTHROPIC_BASE_URL"))
# Model identifier; raises KeyError at import time if MODEL_ID is not set.
MODEL = os.environ["MODEL_ID"]
# On-disk team state: .team/config.json plus one inbox file per agent.
TEAM_DIR = WORKDIR / ".team"
INBOX_DIR = TEAM_DIR / "inbox"
# System prompt for the lead agent.
SYSTEM = f"You are a team lead at {WORKDIR}. Manage teammates with shutdown and plan approval protocols."
# Message types accepted by MessageBus.send; any other type is rejected with
# an error string.
VALID_MSG_TYPES = {
    "message",
    "broadcast",
    "shutdown_request",
    "shutdown_response",
    "plan_approval_response",
}
# -- Request trackers: correlate by request_id --
# shutdown_requests: request_id -> {"target": teammate name, "status": ...}
shutdown_requests = {}
# plan_requests: request_id -> {"from": teammate name, "plan": text, "status": ...}
plan_requests = {}
# Guards both tracker dicts; they are touched from the lead thread and from
# teammate threads.
_tracker_lock = threading.Lock()
# -- MessageBus: JSONL inbox per teammate --
class MessageBus:
    """File-backed message bus with one JSONL inbox file per recipient.

    Each message is a JSON object written as a single line to
    ``<inbox_dir>/<recipient>.jsonl``. Reading an inbox returns every queued
    message and empties the file.

    Senders (teammate threads) and readers (the lead loop, other teammates)
    run concurrently, so appends and read-and-drain are serialized through a
    per-bus lock. Without it, a message appended between the read and the
    truncate in ``read_inbox`` would be silently lost.
    """

    def __init__(self, inbox_dir: Path):
        """Create the bus rooted at *inbox_dir*, creating the directory if needed."""
        self.dir = inbox_dir
        self.dir.mkdir(parents=True, exist_ok=True)
        # Serializes file appends against read-and-drain (see class docstring).
        self._lock = threading.Lock()

    def send(self, sender: str, to: str, content: str,
             msg_type: str = "message", extra: dict = None) -> str:
        """Append a message to *to*'s inbox.

        Args:
            sender: Name recorded in the message's "from" field.
            to: Recipient name; selects the inbox file.
            content: Message body.
            msg_type: Must be one of VALID_MSG_TYPES.
            extra: Optional additional fields merged into the message
                (e.g. request_id / approve for protocol messages).

        Returns:
            A confirmation string, or an "Error: ..." string when *msg_type*
            is not valid (nothing is written in that case).
        """
        if msg_type not in VALID_MSG_TYPES:
            return f"Error: Invalid type '{msg_type}'. Valid: {VALID_MSG_TYPES}"
        msg = {
            "type": msg_type,
            "from": sender,
            "content": content,
            "timestamp": time.time(),
        }
        if extra:
            msg.update(extra)
        inbox_path = self.dir / f"{to}.jsonl"
        # Serialize outside the lock; only the file append needs to be exclusive.
        line = json.dumps(msg) + "\n"
        with self._lock, open(inbox_path, "a") as f:
            f.write(line)
        return f"Sent {msg_type} to {to}"

    def read_inbox(self, name: str) -> list:
        """Return all queued messages for *name* and empty the inbox.

        Returns an empty list when the inbox file does not exist. Blank
        (including whitespace-only) lines are skipped. If a line is not valid
        JSON, json.loads raises and the inbox is left untouched.
        """
        inbox_path = self.dir / f"{name}.jsonl"
        with self._lock:
            if not inbox_path.exists():
                return []
            messages = [
                json.loads(line)
                for line in inbox_path.read_text().splitlines()
                if line.strip()
            ]
            # Drain only after every line parsed successfully.
            inbox_path.write_text("")
        return messages

    def broadcast(self, sender: str, content: str, teammates: list) -> str:
        """Send *content* as a "broadcast" message to every teammate except *sender*.

        Returns a summary string with the number of recipients addressed.
        """
        count = 0
        for name in teammates:
            if name != sender:
                self.send(sender, name, content, "broadcast")
                count += 1
        return f"Broadcast to {count} teammates"
# Process-wide message bus shared by the lead and all teammate threads.
BUS = MessageBus(INBOX_DIR)
# -- TeammateManager with shutdown + plan approval --
class TeammateManager:
    """Tracks team membership and runs each teammate as a daemon thread.

    Membership (name, role, status) is persisted to ``config.json`` inside
    ``team_dir``. Each spawned teammate runs ``_teammate_loop``: an LLM
    tool-use loop that also implements the teammate side of the shutdown and
    plan-approval protocols.
    """

    def __init__(self, team_dir: Path):
        """Load (or initialise) the team config stored under *team_dir*."""
        self.dir = team_dir
        self.dir.mkdir(parents=True, exist_ok=True)
        self.config_path = self.dir / "config.json"
        self.config = self._load_config()
        # name -> most recently started threading.Thread for that teammate
        self.threads = {}

    def _load_config(self) -> dict:
        """Return the config from disk, or a default empty team if none exists."""
        if self.config_path.exists():
            return json.loads(self.config_path.read_text())
        return {"team_name": "default", "members": []}

    def _save_config(self):
        """Write the in-memory config to ``config.json``."""
        self.config_path.write_text(json.dumps(self.config, indent=2))

    def _find_member(self, name: str) -> dict:
        """Return the member dict named *name*, or None if there is none."""
        for m in self.config["members"]:
            if m["name"] == name:
                return m
        return None

    def spawn(self, name: str, role: str, prompt: str) -> str:
        """Start (or restart) teammate *name* in a daemon thread.

        An existing member may only be respawned when its status is "idle" or
        "shutdown"; otherwise an "Error: ..." string is returned and nothing
        is started. The member is marked "working" and the config is saved
        before the thread starts.
        """
        member = self._find_member(name)
        if member:
            if member["status"] not in ("idle", "shutdown"):
                return f"Error: '{name}' is currently {member['status']}"
            member["status"] = "working"
            member["role"] = role
        else:
            member = {"name": name, "role": role, "status": "working"}
            self.config["members"].append(member)
        self._save_config()
        thread = threading.Thread(
            target=self._teammate_loop,
            args=(name, role, prompt),
            daemon=True,
        )
        self.threads[name] = thread
        thread.start()
        return f"Spawned '{name}' (role: {role})"

    def _teammate_loop(self, name: str, role: str, prompt: str):
        """Run the teammate's tool-use loop (at most 50 model turns).

        Each turn: stop if shutdown was approved, otherwise fold inbox
        messages into the conversation, call the model, and execute any tool
        calls it makes. The loop also ends when the model stops requesting
        tools or the API call fails.

        On exit the member's status is always updated — "shutdown" when the
        teammate approved a shutdown request, "idle" otherwise — even if an
        unexpected exception ends the loop, so the member never stays stuck
        in "working" (which would block any later spawn).
        """
        sys_prompt = (
            f"You are '{name}', role: {role}, at {WORKDIR}. "
            f"Submit plans via plan_approval before major work. "
            f"Respond to shutdown_request with shutdown_response."
        )
        messages = [{"role": "user", "content": prompt}]
        tools = self._teammate_tools()
        should_exit = False
        try:
            for _ in range(50):
                # Check before draining the inbox so messages that arrive
                # after an approved shutdown stay queued instead of being
                # read and thrown away.
                if should_exit:
                    break
                inbox = BUS.read_inbox(name)
                for msg in inbox:
                    messages.append({"role": "user", "content": json.dumps(msg)})
                try:
                    response = client.messages.create(
                        model=MODEL,
                        system=sys_prompt,
                        messages=messages,
                        tools=tools,
                        max_tokens=8000,
                    )
                except Exception as e:
                    # Best-effort: end this teammate's run, but say why.
                    print(f" [{name}] API error: {e}")
                    break
                messages.append({"role": "assistant", "content": response.content})
                if response.stop_reason != "tool_use":
                    break
                results = []
                for block in response.content:
                    if block.type != "tool_use":
                        continue
                    try:
                        output = self._exec(name, block.name, block.input)
                    except Exception as e:
                        # Report tool failures back to the model, as the lead
                        # loop does, rather than letting them kill the thread.
                        output = f"Error: {e}"
                    else:
                        # Only exit once the approval was actually processed.
                        if block.name == "shutdown_response" and block.input.get("approve"):
                            should_exit = True
                    print(f" [{name}] {block.name}: {str(output)[:120]}")
                    results.append({
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": str(output),
                    })
                messages.append({"role": "user", "content": results})
        finally:
            member = self._find_member(name)
            if member:
                member["status"] = "shutdown" if should_exit else "idle"
                self._save_config()

    def _exec(self, sender: str, tool_name: str, args: dict) -> str:
        """Execute one teammate tool call and return its textual result.

        Args:
            sender: Name of the teammate making the call.
            tool_name: One of the tools listed by ``_teammate_tools``.
            args: Tool input as provided by the model.

        Returns:
            The tool's output, or "Unknown tool: <name>" for an unrecognised
            tool. A missing required key in *args* raises KeyError.
        """
        # these base tools are unchanged from s02
        if tool_name == "bash":
            return _run_bash(args["command"])
        if tool_name == "read_file":
            return _run_read(args["path"])
        if tool_name == "write_file":
            return _run_write(args["path"], args["content"])
        if tool_name == "edit_file":
            return _run_edit(args["path"], args["old_text"], args["new_text"])
        if tool_name == "send_message":
            return BUS.send(sender, args["to"], args["content"], args.get("msg_type", "message"))
        if tool_name == "read_inbox":
            return json.dumps(BUS.read_inbox(sender), indent=2)
        if tool_name == "shutdown_response":
            # Teammate side of the shutdown protocol: record the decision on
            # the tracked request (if known) and notify the lead.
            req_id = args["request_id"]
            approve = args["approve"]
            with _tracker_lock:
                if req_id in shutdown_requests:
                    shutdown_requests[req_id]["status"] = "approved" if approve else "rejected"
            BUS.send(
                sender, "lead", args.get("reason", ""),
                "shutdown_response", {"request_id": req_id, "approve": approve},
            )
            return f"Shutdown {'approved' if approve else 'rejected'}"
        if tool_name == "plan_approval":
            # Teammate side of plan approval: register a pending request under
            # a fresh request_id and send the plan to the lead's inbox.
            plan_text = args.get("plan", "")
            req_id = str(uuid.uuid4())[:8]
            with _tracker_lock:
                plan_requests[req_id] = {"from": sender, "plan": plan_text, "status": "pending"}
            BUS.send(
                sender, "lead", plan_text, "plan_approval_response",
                {"request_id": req_id, "plan": plan_text},
            )
            return f"Plan submitted (request_id={req_id}). Waiting for lead approval."
        return f"Unknown tool: {tool_name}"

    def _teammate_tools(self) -> list:
        """Return the tool schemas offered to teammates (8 tools)."""
        # these base tools are unchanged from s02
        return [
            {"name": "bash", "description": "Run a shell command.",
             "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}},
            {"name": "read_file", "description": "Read file contents.",
             "input_schema": {"type": "object", "properties": {"path": {"type": "string"}}, "required": ["path"]}},
            {"name": "write_file", "description": "Write content to file.",
             "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}},
            {"name": "edit_file", "description": "Replace exact text in file.",
             "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}}, "required": ["path", "old_text", "new_text"]}},
            {"name": "send_message", "description": "Send message to a teammate.",
             "input_schema": {"type": "object", "properties": {"to": {"type": "string"}, "content": {"type": "string"}, "msg_type": {"type": "string", "enum": list(VALID_MSG_TYPES)}}, "required": ["to", "content"]}},
            {"name": "read_inbox", "description": "Read and drain your inbox.",
             "input_schema": {"type": "object", "properties": {}}},
            {"name": "shutdown_response", "description": "Respond to a shutdown request. Approve to shut down, reject to keep working.",
             "input_schema": {"type": "object", "properties": {"request_id": {"type": "string"}, "approve": {"type": "boolean"}, "reason": {"type": "string"}}, "required": ["request_id", "approve"]}},
            {"name": "plan_approval", "description": "Submit a plan for lead approval. Provide plan text.",
             "input_schema": {"type": "object", "properties": {"plan": {"type": "string"}}, "required": ["plan"]}},
        ]

    def list_all(self) -> str:
        """Return a human-readable roster, or "No teammates." when empty."""
        if not self.config["members"]:
            return "No teammates."
        lines = [f"Team: {self.config['team_name']}"]
        for m in self.config["members"]:
            lines.append(f" {m['name']} ({m['role']}): {m['status']}")
        return "\n".join(lines)

    def member_names(self) -> list:
        """Return the names of all configured members, in config order."""
        return [m["name"] for m in self.config["members"]]
# Process-wide teammate registry, persisted under TEAM_DIR.
TEAM = TeammateManager(TEAM_DIR)
# -- Base tool implementations (these base tools are unchanged from s02) --
def _safe_path(p: str) -> Path:
    """Resolve *p* against WORKDIR and return it.

    Raises:
        ValueError: if the resolved path does not lie inside WORKDIR.
    """
    resolved = (WORKDIR / p).resolve()
    if resolved.is_relative_to(WORKDIR):
        return resolved
    raise ValueError(f"Path escapes workspace: {p}")
def _run_bash(command: str) -> str:
dangerous = ["rm -rf /", "sudo", "shutdown", "reboot"]
if any(d in command for d in dangerous):
return "Error: Dangerous command blocked"
try:
r = subprocess.run(
command, shell=True, cwd=WORKDIR,
capture_output=True, text=True, timeout=120,
)
out = (r.stdout + r.stderr).strip()
return out[:50000] if out else "(no output)"
except subprocess.TimeoutExpired:
return "Error: Timeout (120s)"
def _run_read(path: str, limit: int = None) -> str:
    """Return the text of *path* (inside the workspace), capped at 50000 chars.

    When *limit* is given and the file has more lines than that, only the
    first *limit* lines are kept, followed by a "... (N more)" marker line.
    Any failure is returned as an "Error: ..." string.
    """
    try:
        content_lines = _safe_path(path).read_text().splitlines()
        if limit and limit < len(content_lines):
            omitted = len(content_lines) - limit
            content_lines = content_lines[:limit]
            content_lines.append(f"... ({omitted} more)")
        text = "\n".join(content_lines)
        return text[:50000]
    except Exception as e:
        return f"Error: {e}"
def _run_write(path: str, content: str) -> str:
    """Write *content* to *path* inside the workspace, creating parent directories.

    Returns a "Wrote N bytes" message where N is len(content), or an
    "Error: ..." string on any failure.
    """
    try:
        target = _safe_path(path)
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(content)
    except Exception as e:
        return f"Error: {e}"
    else:
        return f"Wrote {len(content)} bytes"
def _run_edit(path: str, old_text: str, new_text: str) -> str:
    """Replace the first occurrence of *old_text* with *new_text* in *path*.

    Returns "Edited <path>" on success, an error string when *old_text* does
    not occur in the file, or "Error: ..." on any other failure.
    """
    try:
        target = _safe_path(path)
        current = target.read_text()
        if old_text in current:
            updated = current.replace(old_text, new_text, 1)
            target.write_text(updated)
            return f"Edited {path}"
        return f"Error: Text not found in {path}"
    except Exception as e:
        return f"Error: {e}"
# -- Lead-specific protocol handlers --
def handle_shutdown_request(teammate: str) -> str:
    """Lead side of the shutdown protocol: ask *teammate* to shut down.

    Registers a pending entry in ``shutdown_requests`` under a fresh 8-char
    request_id, sends a "shutdown_request" message carrying that id to the
    teammate's inbox, and returns a status line containing the id.
    """
    request_id = uuid.uuid4().hex[:8]
    tracker_entry = {"target": teammate, "status": "pending"}
    with _tracker_lock:
        shutdown_requests[request_id] = tracker_entry
    BUS.send(
        "lead",
        teammate,
        "Please shut down gracefully.",
        msg_type="shutdown_request",
        extra={"request_id": request_id},
    )
    return f"Shutdown request {request_id} sent to '{teammate}' (status: pending)"
def handle_plan_review(request_id: str, approve: bool, feedback: str = "") -> str:
    """Lead side of plan approval: approve or reject a submitted plan.

    Args:
        request_id: Id returned to the teammate when it submitted the plan.
        approve: True to approve, False to reject.
        feedback: Optional text sent back to the teammate.

    Returns:
        A status line naming the teammate, or an "Error: ..." string when
        *request_id* is not a tracked plan request (nothing is sent then).
    """
    status = "approved" if approve else "rejected"
    # Look up and transition the request in one critical section, and copy out
    # the fields needed afterwards, so the tracker entry is never read or
    # written outside the lock.
    with _tracker_lock:
        req = plan_requests.get(request_id)
        if not req:
            return f"Error: Unknown plan request_id '{request_id}'"
        req["status"] = status
        submitter = req["from"]
    BUS.send(
        "lead", submitter, feedback, "plan_approval_response",
        {"request_id": request_id, "approve": approve, "feedback": feedback},
    )
    return f"Plan {status} for '{submitter}'"
def _check_shutdown_status(request_id: str) -> str:
    """Return the tracked shutdown request for *request_id* as a JSON string.

    Unknown ids yield the JSON object {"error": "not found"}.
    """
    fallback = {"error": "not found"}
    with _tracker_lock:
        # Serialize while holding the lock so the entry cannot change mid-dump.
        record = shutdown_requests.get(request_id, fallback)
        return json.dumps(record)
# -- Lead tool dispatch (12 tools) --
# Maps each lead tool name to a callable that receives the model-supplied tool
# input as keyword arguments and returns the tool's textual result.
# Note that on the lead side "shutdown_response" only *checks* the status of a
# shutdown request, and "plan_approval" *reviews* a plan; the teammate-side
# tools of the same names (see TeammateManager._exec) respond and submit.
TOOL_HANDLERS = {
    "bash": lambda **kw: _run_bash(kw["command"]),
    "read_file": lambda **kw: _run_read(kw["path"], kw.get("limit")),
    "write_file": lambda **kw: _run_write(kw["path"], kw["content"]),
    "edit_file": lambda **kw: _run_edit(kw["path"], kw["old_text"], kw["new_text"]),
    "spawn_teammate": lambda **kw: TEAM.spawn(kw["name"], kw["role"], kw["prompt"]),
    "list_teammates": lambda **kw: TEAM.list_all(),
    "send_message": lambda **kw: BUS.send("lead", kw["to"], kw["content"], kw.get("msg_type", "message")),
    "read_inbox": lambda **kw: json.dumps(BUS.read_inbox("lead"), indent=2),
    "broadcast": lambda **kw: BUS.broadcast("lead", kw["content"], TEAM.member_names()),
    "shutdown_request": lambda **kw: handle_shutdown_request(kw["teammate"]),
    "shutdown_response": lambda **kw: _check_shutdown_status(kw.get("request_id", "")),
    "plan_approval": lambda **kw: handle_plan_review(kw["request_id"], kw["approve"], kw.get("feedback", "")),
}
# these base tools are unchanged from s02
# Tool schemas advertised to the lead model. Every name here has a matching
# entry in TOOL_HANDLERS.
TOOLS = [
    {"name": "bash", "description": "Run a shell command.",
     "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}},
    {"name": "read_file", "description": "Read file contents.",
     "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "limit": {"type": "integer"}}, "required": ["path"]}},
    {"name": "write_file", "description": "Write content to file.",
     "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}},
    {"name": "edit_file", "description": "Replace exact text in file.",
     "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}}, "required": ["path", "old_text", "new_text"]}},
    # Team management and messaging.
    {"name": "spawn_teammate", "description": "Spawn a persistent teammate.",
     "input_schema": {"type": "object", "properties": {"name": {"type": "string"}, "role": {"type": "string"}, "prompt": {"type": "string"}}, "required": ["name", "role", "prompt"]}},
    {"name": "list_teammates", "description": "List all teammates.",
     "input_schema": {"type": "object", "properties": {}}},
    {"name": "send_message", "description": "Send a message to a teammate.",
     "input_schema": {"type": "object", "properties": {"to": {"type": "string"}, "content": {"type": "string"}, "msg_type": {"type": "string", "enum": list(VALID_MSG_TYPES)}}, "required": ["to", "content"]}},
    {"name": "read_inbox", "description": "Read and drain the lead's inbox.",
     "input_schema": {"type": "object", "properties": {}}},
    {"name": "broadcast", "description": "Send a message to all teammates.",
     "input_schema": {"type": "object", "properties": {"content": {"type": "string"}}, "required": ["content"]}},
    # Protocol tools (lead side): request/check shutdown, review plans.
    {"name": "shutdown_request", "description": "Request a teammate to shut down gracefully. Returns a request_id for tracking.",
     "input_schema": {"type": "object", "properties": {"teammate": {"type": "string"}}, "required": ["teammate"]}},
    {"name": "shutdown_response", "description": "Check the status of a shutdown request by request_id.",
     "input_schema": {"type": "object", "properties": {"request_id": {"type": "string"}}, "required": ["request_id"]}},
    {"name": "plan_approval", "description": "Approve or reject a teammate's plan. Provide request_id + approve + optional feedback.",
     "input_schema": {"type": "object", "properties": {"request_id": {"type": "string"}, "approve": {"type": "boolean"}, "feedback": {"type": "string"}}, "required": ["request_id", "approve"]}},
]
def agent_loop(messages: list):
    """Drive the lead agent until the model stops requesting tools.

    *messages* is the running conversation and is extended in place. Each
    pass drains the lead's inbox (injecting any messages as an <inbox> user
    turn followed by a short assistant acknowledgement), calls the model,
    records its reply, and — if it asked for tools — runs each one through
    TOOL_HANDLERS and appends the results as the next user turn. Handler
    exceptions are reported back to the model as "Error: ..." results.
    """
    while True:
        pending = BUS.read_inbox("lead")
        if pending:
            inbox_json = json.dumps(pending, indent=2)
            messages.extend([
                {"role": "user", "content": f"<inbox>{inbox_json}</inbox>"},
                {"role": "assistant", "content": "Noted inbox messages."},
            ])

        reply = client.messages.create(
            model=MODEL,
            system=SYSTEM,
            messages=messages,
            tools=TOOLS,
            max_tokens=8000,
        )
        messages.append({"role": "assistant", "content": reply.content})
        if reply.stop_reason != "tool_use":
            return

        tool_results = []
        for block in reply.content:
            if block.type != "tool_use":
                continue
            handler = TOOL_HANDLERS.get(block.name)
            try:
                if handler:
                    output = handler(**block.input)
                else:
                    output = f"Unknown tool: {block.name}"
            except Exception as e:
                output = f"Error: {e}"
            print(f"> {block.name}: {str(output)[:200]}")
            tool_results.append({
                "type": "tool_result",
                "tool_use_id": block.id,
                "content": str(output),
            })
        messages.append({"role": "user", "content": tool_results})
if __name__ == "__main__":
    # Interactive REPL for the lead agent. "q", "exit", an empty line, EOF or
    # Ctrl-C quits; "/team" prints the roster; "/inbox" drains and prints the
    # lead's inbox; anything else is sent to the agent loop.
    history = []
    while True:
        try:
            query = input("\033[36ms10 >> \033[0m")
        except (EOFError, KeyboardInterrupt):
            break
        command = query.strip()
        if command.lower() in ("q", "exit", ""):
            break
        if command == "/team":
            print(TEAM.list_all())
            continue
        if command == "/inbox":
            print(json.dumps(BUS.read_inbox("lead"), indent=2))
            continue
        history.append({"role": "user", "content": query})
        agent_loop(history)
        # The last history entry is the model's final reply; print its text blocks.
        response_content = history[-1]["content"]
        if isinstance(response_content, list):
            for block in response_content:
                if hasattr(block, "text"):
                    print(block.text)
        print()