Files
analysis_claude_code/s15_agent_teams/code.py

929 lines
33 KiB
Python

#!/usr/bin/env python3
"""
s15: Agent Teams — MessageBus + spawn_teammate_thread + inbox injection.
Run: python s15_agent_teams/code.py
Need: pip install anthropic python-dotenv + .env with ANTHROPIC_API_KEY
Changes from s14:
- MessageBus class: file-based mailboxes (.mailboxes/*.jsonl)
- spawn_teammate_thread: creates teammate in background thread
- Teammate runs own simplified agent_loop (bash, read, write, send_message)
- Lead tools: spawn_teammate, send_message, check_inbox (3 new)
- Lead inbox: teammate messages injected into history (not just printed)
- Teaching version: teammates limited to 10 rounds (real CC uses idle loop)
ASCII flow:
    Lead:     cron_queue → messages → prompt → LLM → TOOLS ────→ loop
                              ↑                        ↓           |
                              └── inbox ← MessageBus ← teammate.send_message ←┘
    Teammate: inbox → LLM → bash/read/write/send → loop (max 10 turns)
"""
import os, subprocess, json, time, random, threading
from pathlib import Path
from datetime import datetime
from dataclasses import dataclass, asdict
try:
import readline
readline.parse_and_bind('set bind-tty-special-chars off')
except ImportError:
pass
from anthropic import Anthropic
from dotenv import load_dotenv
# Load variables from .env; override=True is passed so .env values take
# precedence (NOTE(review): per python-dotenv semantics — confirm).
load_dotenv(override=True)
# When a custom base URL is configured, ANTHROPIC_AUTH_TOKEN is removed from
# the environment before the client is constructed.
if os.getenv("ANTHROPIC_BASE_URL"):
    os.environ.pop("ANTHROPIC_AUTH_TOKEN", None)
# All workspace-relative paths in this file hang off the directory the script
# was started from.
WORKDIR = Path.cwd()
MEMORY_DIR = WORKDIR / ".memory"
MEMORY_INDEX = MEMORY_DIR / "MEMORY.md"
client = Anthropic(base_url=os.getenv("ANTHROPIC_BASE_URL"))
# Raises KeyError at import time when MODEL_ID is not set.
MODEL = os.environ["MODEL_ID"]
# ── Task System (from s12, synced) ──
TASKS_DIR = WORKDIR / ".tasks"
# Import-time side effect: the task directory is created if missing.
TASKS_DIR.mkdir(exist_ok=True)
@dataclass
class Task:
    """One unit of work; save_task stores it as <id>.json under TASKS_DIR."""
    id: str  # create_task generates "task_<epoch seconds>_<4 digits>"
    subject: str  # short title shown in listings and log lines
    description: str  # free-form details; empty string by default
    status: str # pending | in_progress | completed
    owner: str | None  # None until claim_task assigns an owner
    blockedBy: list[str]  # ids of tasks that must be completed first
def _task_path(task_id: str) -> Path:
    """Return the path of the JSON file that stores task *task_id*."""
    filename = f"{task_id}.json"
    return TASKS_DIR / filename
def create_task(subject: str, description: str = "",
                blockedBy: list[str] | None = None) -> Task:
    """Create a pending, unowned task, persist it, and return it.

    The id combines the current epoch second with a random 4-digit suffix.
    """
    stamp = int(time.time())
    suffix = random.randint(0, 9999)
    new_task = Task(
        id=f"task_{stamp}_{suffix:04d}",
        subject=subject,
        description=description,
        status="pending",
        owner=None,
        blockedBy=blockedBy or [],
    )
    save_task(new_task)
    return new_task
def save_task(task: Task):
    """Write *task* to its JSON file, replacing any previous contents."""
    payload = json.dumps(asdict(task), indent=2)
    _task_path(task.id).write_text(payload)
def load_task(task_id: str) -> Task:
    """Read task *task_id* from disk.

    Raises FileNotFoundError when no file exists for that id.
    """
    fields = json.loads(_task_path(task_id).read_text())
    return Task(**fields)
def list_tasks() -> list[Task]:
    """Load every task_*.json file in TASKS_DIR, ordered by path."""
    tasks = []
    for path in sorted(TASKS_DIR.glob("task_*.json")):
        tasks.append(Task(**json.loads(path.read_text())))
    return tasks
def get_task(task_id: str) -> str:
    """Return the full record of task *task_id* as indented JSON text."""
    return json.dumps(asdict(load_task(task_id)), indent=2)
def can_start(task_id: str) -> bool:
    """Tell whether every blockedBy dependency of *task_id* is completed.

    A dependency whose file does not exist counts as still blocking.
    """
    def _is_done(dep_id: str) -> bool:
        # Existence is checked first so a missing file never reaches load_task.
        return (_task_path(dep_id).exists()
                and load_task(dep_id).status == "completed")

    return all(_is_done(dep_id) for dep_id in load_task(task_id).blockedBy)
def claim_task(task_id: str, owner: str = "agent") -> str:
    """Move a pending, unblocked task to in_progress under *owner*.

    Returns a human-readable result; a refusal explains why the task could
    not be claimed (wrong status, or the list of unmet dependencies).
    """
    task = load_task(task_id)
    if task.status != "pending":
        return f"Task {task_id} is {task.status}, cannot claim"
    # Same rule as can_start: a dependency is unmet when its file is missing
    # or its status is anything other than "completed".
    unmet = [dep for dep in task.blockedBy
             if not _task_path(dep).exists()
             or load_task(dep).status != "completed"]
    if unmet:
        return f"Blocked by: {unmet}"
    task.owner = owner
    task.status = "in_progress"
    save_task(task)
    print(f" \033[36m[claim] {task.subject} → in_progress (owner: {owner})\033[0m")
    return f"Claimed {task.id} ({task.subject})"
def complete_task(task_id: str) -> str:
    """Mark an in-progress task completed and report what it unblocked.

    Only tasks in status in_progress can be completed; otherwise a refusal
    message is returned and nothing is written.
    """
    task = load_task(task_id)
    if task.status != "in_progress":
        return f"Task {task_id} is {task.status}, cannot complete"
    task.status = "completed"
    save_task(task)
    # Pending tasks that had dependencies and can start now were unblocked.
    unblocked = []
    for other in list_tasks():
        if other.status == "pending" and other.blockedBy and can_start(other.id):
            unblocked.append(other.subject)
    print(f" \033[32m[complete] {task.subject}\033[0m")
    msg = f"Completed {task.id} ({task.subject})"
    if unblocked:
        names = ", ".join(unblocked)
        msg += f"\nUnblocked: {names}"
        print(f" \033[33m[unblocked] {names}\033[0m")
    return msg
# ── Prompt Assembly (from s10, synced) ──
# Static text fragments of the lead agent's system prompt, keyed by section
# name. assemble_system_prompt joins "identity", "tools" and "workspace";
# the "memory" entry is not read anywhere in the visible code.
PROMPT_SECTIONS = {
    "identity": "You are a coding agent. Act, don't explain.",
    "tools": "Available tools: bash, read_file, write_file, "
             "get_task, create_task, list_tasks, claim_task, complete_task, "
             "schedule_cron, list_crons, cancel_cron, "
             "spawn_teammate, send_message, check_inbox.",
    "workspace": f"Working directory: {WORKDIR}",
    "memory": "Relevant memories are injected below when available.",
}
def assemble_system_prompt(context: dict) -> str:
    """Build the system prompt from the fixed sections plus optional memories.

    Sections are separated by a blank line; the memories section is appended
    only when context["memories"] is non-empty.
    """
    parts = [PROMPT_SECTIONS[name]
             for name in ("identity", "tools", "workspace")]
    remembered = context.get("memories", "")
    if remembered:
        parts.append(f"Relevant memories:\n{remembered}")
    return "\n\n".join(parts)
_last_context_key, _last_prompt = None, None


def get_system_prompt(context: dict) -> str:
    """Return the system prompt for *context*, reusing the last one if equal.

    A single-entry cache keyed on the JSON serialization of the context
    avoids re-assembling the prompt when nothing changed.
    """
    global _last_context_key, _last_prompt
    key = json.dumps(context, sort_keys=True, ensure_ascii=False, default=str)
    cache_hit = key == _last_context_key and _last_prompt
    if not cache_hit:
        _last_context_key = key
        _last_prompt = assemble_system_prompt(context)
    return _last_prompt
# ── Tools ──
def safe_path(p: str) -> Path:
    """Resolve *p* against WORKDIR and refuse anything outside it.

    Raises ValueError when the resolved path is not located under WORKDIR.
    """
    resolved = (WORKDIR / p).resolve()
    if resolved.is_relative_to(WORKDIR):
        return resolved
    raise ValueError(f"Path escapes workspace: {p}")
def run_bash(command: str, run_in_background: bool = False) -> str:
    """Run *command* through the shell in WORKDIR and return its output.

    stdout and stderr are concatenated, stripped and capped at 50000
    characters; "(no output)" is returned when both are empty. Failures are
    reported as an "Error: ..." string instead of raising, so a bad command
    cannot take down the calling agent loop or teammate thread.

    NOTE(security): the command string comes from the model and is executed
    with shell=True; there is no sandboxing here.
    """
    # run_in_background is handled by agent_loop dispatch, not here
    try:
        # errors="replace" keeps non-UTF-8 output (binary files, other
        # encodings) from raising UnicodeDecodeError while decoding.
        r = subprocess.run(command, shell=True, cwd=WORKDIR,
                           capture_output=True, text=True,
                           errors="replace", timeout=120)
    except subprocess.TimeoutExpired:
        return "Error: Timeout (120s)"
    except OSError as e:
        # e.g. the shell could not be started or WORKDIR no longer exists
        return f"Error: {e}"
    out = (r.stdout + r.stderr).strip()
    return out[:50000] if out else "(no output)"
def run_read(path: str, limit: int | None = None) -> str:
    """Return the text of a workspace file, optionally cut to *limit* lines.

    When the file is longer than *limit*, a trailing marker line states how
    many lines were left out. Any failure is returned as "Error: ...".
    """
    try:
        all_lines = safe_path(path).read_text().splitlines()
        total = len(all_lines)
        if limit and limit < total:
            shown = all_lines[:limit]
            shown.append(f"... ({total - limit} more lines)")
        else:
            shown = all_lines
        return "\n".join(shown)
    except Exception as e:
        return f"Error: {e}"
def run_write(path: str, content: str) -> str:
    """Write *content* to a workspace file, creating parent directories.

    Returns a confirmation message, or "Error: ..." on any failure
    (including a path that escapes the workspace).
    """
    try:
        target = safe_path(path)
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(content)
    except Exception as e:
        return f"Error: {e}"
    return f"Wrote {len(content)} bytes to {path}"
# Task tools
def run_create_task(subject: str, description: str = "",
                    blockedBy: list[str] | None = None) -> str:
    """Tool handler: create a task and describe it, dependencies included."""
    task = create_task(subject, description, blockedBy)
    if blockedBy:
        deps = f" (blockedBy: {', '.join(blockedBy)})"
    else:
        deps = ""
    print(f" \033[34m[create] {task.subject}{deps}\033[0m")
    return f"Created {task.id}: {task.subject}{deps}"
def run_list_tasks() -> str:
    """Tool handler: render all tasks, one per line, or a hint when empty."""
    tasks = list_tasks()
    if not tasks:
        return "No tasks. Use create_task to add some."
    # Per-status marker; unknown statuses fall back to "?".
    icons = {"pending": "", "in_progress": "", "completed": ""}

    def render(t) -> str:
        icon = icons.get(t.status, "?")
        deps = f" (blockedBy: {', '.join(t.blockedBy)})" if t.blockedBy else ""
        owner = f" [{t.owner}]" if t.owner else ""
        return f" {icon} {t.id}: {t.subject} [{t.status}]{owner}{deps}"

    return "\n".join(render(t) for t in tasks)
def run_get_task(task_id: str) -> str:
    """Tool handler: return a task as JSON, or an error for an unknown id."""
    try:
        details = get_task(task_id)
    except FileNotFoundError:
        return f"Error: Task {task_id} not found"
    return details
def run_claim_task(task_id: str) -> str:
    """Tool handler: claim *task_id* for owner "agent".

    An unknown id is reported as an error string, matching run_get_task,
    instead of letting FileNotFoundError escape into the agent loop.
    """
    try:
        return claim_task(task_id, owner="agent")
    except FileNotFoundError:
        return f"Error: Task {task_id} not found"
def run_complete_task(task_id: str) -> str:
    """Tool handler: complete *task_id*.

    An unknown id is reported as an error string, matching run_get_task,
    instead of letting FileNotFoundError escape into the agent loop.
    """
    try:
        return complete_task(task_id)
    except FileNotFoundError:
        return f"Error: Task {task_id} not found"
# ── Background Tasks (from s13, synced) ──
_bg_counter = 0  # incremented by start_background_task to build bg_NNNN ids
# bg_id → {"tool_use_id", "command", "status"}; status is "running" or "completed"
background_tasks: dict[str, dict] = {}
# bg_id → tool output, stored by the worker when it finishes
background_results: dict[str, str] = {}
# Held whenever the two dicts above are read or modified
background_lock = threading.Lock()
def is_slow_operation(tool_name: str, tool_input: dict) -> bool:
    """Fallback heuristic: guess whether a bash command will take > 30s.

    Only the bash tool is considered; the command is matched
    case-insensitively against a list of substrings.
    """
    if tool_name != "bash":
        return False
    command = tool_input.get("command", "").lower()
    for keyword in ("install", "build", "test", "deploy", "compile",
                    "docker build", "pip install", "npm install",
                    "cargo build", "pytest", "make"):
        if keyword in command:
            return True
    return False
def should_run_background(tool_name: str, tool_input: dict) -> bool:
    """Decide whether a tool call is dispatched to a background thread.

    An explicit run_in_background flag from the model wins; without it the
    is_slow_operation heuristic decides.
    """
    explicit = tool_input.get("run_in_background")
    return True if explicit else is_slow_operation(tool_name, tool_input)
def execute_tool(block) -> str:
    """Execute a tool call block and return its output as text.

    *block* is a tool_use content block: block.name selects the handler and
    block.input supplies its keyword arguments. Unknown tool names and
    handler failures (for example arguments the handler does not accept)
    are returned as strings so the model can see and correct them; an
    exception here would otherwise abort the whole agent loop.
    """
    handler = {
        "bash": run_bash, "read_file": run_read, "write_file": run_write,
        "create_task": run_create_task, "list_tasks": run_list_tasks,
        "get_task": run_get_task, "claim_task": run_claim_task,
        "complete_task": run_complete_task,
        "schedule_cron": run_schedule_cron, "list_crons": run_list_crons,
        "cancel_cron": run_cancel_cron,
        "spawn_teammate": run_spawn_teammate,
        "send_message": run_send_message, "check_inbox": run_check_inbox,
    }.get(block.name)
    if handler is None:
        return f"Unknown tool: {block.name}"
    try:
        return handler(**block.input)
    except Exception as e:
        # Tool boundary: report instead of crashing the REPL.
        return f"Error: {type(e).__name__}: {e}"
def start_background_task(block) -> str:
    """Run a tool call in a daemon thread and return its background task id.

    The task is registered as "running" before the thread starts. The worker
    always stores a result and flips the status to "completed", even when
    the tool raises, so collect_background_results eventually reports every
    dispatched task instead of leaving it "running" forever.
    """
    global _bg_counter
    _bg_counter += 1
    bg_id = f"bg_{_bg_counter:04d}"
    # Tools without a "command" input are labelled by their tool name.
    cmd = block.input.get("command", block.name)

    def worker():
        try:
            result = execute_tool(block)
        except Exception as e:
            result = f"Error: {type(e).__name__}: {e}"
        with background_lock:
            background_tasks[bg_id]["status"] = "completed"
            background_results[bg_id] = result

    with background_lock:
        background_tasks[bg_id] = {
            "tool_use_id": block.id,
            "command": cmd,
            "status": "running",
        }
    threading.Thread(target=worker, daemon=True).start()
    print(f" \033[33m[background] dispatched {bg_id}: {cmd[:40]}\033[0m")
    return bg_id
def collect_background_results() -> list[str]:
    """Drain finished background tasks into <task_notification> strings.

    Each completed task is removed from the bookkeeping dicts and turned
    into one notification whose summary is the first 200 characters of the
    tool output.
    """
    with background_lock:
        finished = [bg_id for bg_id, info in background_tasks.items()
                    if info["status"] == "completed"]
    notifications = []
    for bg_id in finished:
        with background_lock:
            info = background_tasks.pop(bg_id)
            output = background_results.pop(bg_id, "")
        command = info['command']
        summary = output[:200]
        notification = (
            f"<task_notification>\n"
            f" <task_id>{bg_id}</task_id>\n"
            f" <status>completed</status>\n"
            f" <command>{command}</command>\n"
            f" <summary>{summary}</summary>\n"
            f"</task_notification>")
        notifications.append(notification)
        print(f" \033[32m[background done] {bg_id}: "
              f"{command[:40]} ({len(output)} chars)\033[0m")
    return notifications
# ── Cron Scheduler (from s14, synced) ──
# File that save_durable_jobs writes and load_durable_jobs reads.
DURABLE_PATH = WORKDIR / ".scheduled_tasks.json"


@dataclass
class CronJob:
    """A scheduled prompt identified by a 5-field cron expression."""
    id: str
    cron: str # "0 9 * * *"
    prompt: str # message to inject when fired
    recurring: bool # True = recurring, False = one-shot
    durable: bool # True = persist to disk


# Registered jobs by id; the scheduler thread and the tool handlers share it.
scheduled_jobs: dict[str, CronJob] = {}
# Jobs that fired and wait for consume_cron_queue.
cron_queue: list[CronJob] = []
cron_lock = threading.Lock()
_last_fired: dict[str, str] = {} # job_id → "YYYY-MM-DD HH:MM"
def _cron_field_matches(field: str, value: int) -> bool:
"""Match a single cron field against a value."""
if field == "*":
return True
if field.startswith("*/"):
step = int(field[2:])
return step > 0 and value % step == 0
if "," in field:
return any(_cron_field_matches(f.strip(), value)
for f in field.split(","))
if "-" in field:
lo, hi = field.split("-", 1)
return int(lo) <= value <= int(hi)
return value == int(field)
def cron_matches(cron_expr: str, dt: datetime) -> bool:
    """Check whether a 5-field cron expression matches the datetime *dt*.

    Minute, hour and month must all match. Day-of-month and day-of-week
    follow the usual cron rule: when both are constrained, either one
    matching is enough; when only one is constrained, that one decides.
    Expressions without exactly five fields never match.
    """
    parts = cron_expr.strip().split()
    if len(parts) != 5:
        return False
    minute, hour, dom, month, dow = parts
    # Python counts Monday as 0; cron counts Sunday as 0.
    cron_weekday = (dt.weekday() + 1) % 7
    minute_ok = _cron_field_matches(minute, dt.minute)
    hour_ok = _cron_field_matches(hour, dt.hour)
    dom_ok = _cron_field_matches(dom, dt.day)
    month_ok = _cron_field_matches(month, dt.month)
    dow_ok = _cron_field_matches(dow, cron_weekday)
    if not (minute_ok and hour_ok and month_ok):
        return False
    # Only constrained day fields take part in the decision.
    day_checks = []
    if dom != "*":
        day_checks.append(dom_ok)
    if dow != "*":
        day_checks.append(dow_ok)
    return any(day_checks) if day_checks else True
def _validate_cron_field(field: str, lo: int, hi: int) -> str | None:
"""Validate a single cron field value is within [lo, hi]."""
if field == "*":
return None
if field.startswith("*/"):
step_str = field[2:]
if not step_str.isdigit():
return f"Invalid step: {field}"
step = int(step_str)
if step <= 0:
return f"Step must be > 0: {field}"
return None
if "," in field:
for part in field.split(","):
err = _validate_cron_field(part.strip(), lo, hi)
if err: return err
return None
if "-" in field:
parts = field.split("-", 1)
if not parts[0].isdigit() or not parts[1].isdigit():
return f"Invalid range: {field}"
a, b = int(parts[0]), int(parts[1])
if a < lo or a > hi or b < lo or b > hi:
return f"Range {field} out of bounds [{lo}-{hi}]"
if a > b:
return f"Range start > end: {field}"
return None
if not field.isdigit():
return f"Invalid field: {field}"
val = int(field)
if val < lo or val > hi:
return f"Value {val} out of bounds [{lo}-{hi}]"
return None
def validate_cron(cron_expr: str) -> str | None:
"""Validate a cron expression. Returns error message or None."""
fields = cron_expr.strip().split()
if len(fields) != 5:
return f"Expected 5 fields, got {len(fields)}"
bounds = [(0, 59), (0, 23), (1, 31), (1, 12), (0, 6)]
names = ["minute", "hour", "day-of-month", "month", "day-of-week"]
for i, (field, (lo, hi), name) in enumerate(zip(fields, bounds, names)):
err = _validate_cron_field(field, lo, hi)
if err:
return f"{name}: {err}"
return None
def save_durable_jobs():
    """Rewrite .scheduled_tasks.json with every job flagged durable."""
    payload = []
    for job in scheduled_jobs.values():
        if job.durable:
            payload.append(asdict(job))
    DURABLE_PATH.write_text(json.dumps(payload, indent=2))
def load_durable_jobs():
    """Load durable jobs from disk on startup.

    Startup is best-effort: an unreadable or malformed file is reported and
    skipped rather than raised. Entries are handled one by one, so a single
    bad entry (unexpected fields, invalid cron expression) no longer
    prevents the remaining jobs from loading.
    """
    if not DURABLE_PATH.exists():
        return
    try:
        entries = json.loads(DURABLE_PATH.read_text())
    except (OSError, ValueError) as e:
        # ValueError covers both invalid JSON and undecodable bytes.
        print(f" \033[31m[cron] cannot read {DURABLE_PATH.name}: {e}\033[0m")
        return
    if not isinstance(entries, list):
        print(f" \033[31m[cron] {DURABLE_PATH.name}: expected a JSON list\033[0m")
        return
    loaded = 0
    for entry in entries:
        try:
            job = CronJob(**entry)
            err = validate_cron(job.cron)
        except (TypeError, AttributeError) as e:
            # Entry is not a mapping, has wrong keys, or cron is not a string.
            print(f" \033[31m[cron] skipping malformed entry: {e}\033[0m")
            continue
        if err:
            print(f" \033[31m[cron] skipping invalid job {job.id}: {err}\033[0m")
            continue
        scheduled_jobs[job.id] = job
        loaded += 1
    if loaded:
        print(f" \033[35m[cron] loaded {loaded} durable job(s)\033[0m")
def schedule_job(cron: str, prompt: str, recurring: bool = True,
                 durable: bool = True) -> CronJob | str:
    """Register a new cron job. Returns the CronJob, or an error string.

    The cron expression is validated first. Registration and the durable
    save both happen under cron_lock: save_durable_jobs iterates
    scheduled_jobs, which the scheduler thread may modify concurrently.
    """
    err = validate_cron(cron)
    if err:
        return err
    with cron_lock:
        # Re-draw on the (unlikely) collision so an existing job is never
        # silently replaced.
        job_id = f"cron_{random.randint(0, 999999):06d}"
        while job_id in scheduled_jobs:
            job_id = f"cron_{random.randint(0, 999999):06d}"
        job = CronJob(
            id=job_id,
            cron=cron, prompt=prompt,
            recurring=recurring, durable=durable,
        )
        scheduled_jobs[job.id] = job
        if durable:
            save_durable_jobs()
    print(f" \033[35m[cron register] {job.id} '{cron}' → {prompt[:40]}\033[0m")
    return job
def cancel_job(job_id: str) -> str:
    """Cancel a cron job and return a status message.

    Removal, the durable save and the cleanup of the job's last-fired
    marker all happen under cron_lock, because the scheduler thread reads
    and modifies the same structures.
    """
    with cron_lock:
        job = scheduled_jobs.pop(job_id, None)
        if job is not None:
            _last_fired.pop(job_id, None)
            if job.durable:
                save_durable_jobs()
    if not job:
        return f"Job {job_id} not found"
    print(f" \033[31m[cron cancel] {job_id}\033[0m")
    return f"Cancelled {job_id}"
def cron_scheduler_loop():
    """Independent daemon thread: poll every 1s, fire matching jobs.

    A job fires at most once per calendar minute. Fired jobs are appended
    to cron_queue; one-shot jobs are removed after firing (and the durable
    file rewritten when needed). Individual job errors are caught to
    prevent one bad job from killing the entire scheduler thread.
    """
    while True:
        time.sleep(1)
        now = datetime.now()
        # Date-aware marker prevents daily jobs from skipping on day 2+
        minute_marker = now.strftime("%Y-%m-%d %H:%M")
        with cron_lock:
            # Iterate a snapshot: one-shot jobs are popped inside the loop.
            for job in list(scheduled_jobs.values()):
                try:
                    if not cron_matches(job.cron, now):
                        continue
                    if _last_fired.get(job.id) == minute_marker:
                        continue  # already fired during this minute
                    cron_queue.append(job)
                    _last_fired[job.id] = minute_marker
                    print(f" \033[35m[cron fire] {job.id} → "
                          f"{job.prompt[:40]}\033[0m")
                    if not job.recurring:
                        scheduled_jobs.pop(job.id, None)
                        # The job can never fire again; drop its marker so
                        # _last_fired does not grow without bound.
                        _last_fired.pop(job.id, None)
                        if job.durable:
                            save_durable_jobs()
                except Exception as e:
                    print(f" \033[31m[cron error] {job.id}: {e}\033[0m")
def consume_cron_queue() -> list[CronJob]:
    """Return all fired jobs and empty cron_queue (called by agent_loop)."""
    with cron_lock:
        drained = cron_queue[:]
        del cron_queue[:]
    return drained
# Load durable jobs on startup, then start scheduler thread
# Import-time side effects: jobs are loaded before the scheduler thread
# starts, so load_durable_jobs runs without competing for scheduled_jobs.
load_durable_jobs()
# daemon=True: the thread does not keep the process alive on exit.
threading.Thread(target=cron_scheduler_loop, daemon=True).start()
print(" \033[35m[cron] scheduler thread started\033[0m")
# Cron tool handlers
def run_schedule_cron(cron: str, prompt: str,
                      recurring: bool = True, durable: bool = True) -> str:
    """Tool handler: schedule a cron job and describe the outcome.

    Validation failures come back as "Error: ...". The success message
    separates the cron expression from the prompt so the two stay readable.
    """
    result = schedule_job(cron, prompt, recurring, durable)
    if isinstance(result, str):
        return f"Error: {result}"
    return f"Scheduled {result.id}: '{cron}' → {prompt}"
def run_list_crons() -> str:
    """Tool handler: list registered cron jobs, one per line.

    Each line shows the id, the cron expression, the first 40 characters of
    the prompt and the recurring/durable flags. The expression and prompt
    are separated so they do not run together.
    """
    with cron_lock:
        jobs = list(scheduled_jobs.values())
    if not jobs:
        return "No cron jobs. Use schedule_cron to add one."
    lines = []
    for j in jobs:
        tag = "recurring" if j.recurring else "one-shot"
        dur = "durable" if j.durable else "session"
        lines.append(f" {j.id}: '{j.cron}' → {j.prompt[:40]} "
                     f"[{tag}, {dur}]")
    return "\n".join(lines)
def run_cancel_cron(job_id: str) -> str:
    """Tool handler: cancel the cron job *job_id* and return the status."""
    outcome = cancel_job(job_id)
    return outcome
# ── MessageBus (s15 new) ──
# Teaching version uses simple file append + unlink.
# Real CC uses proper-lockfile for concurrent write safety.
# Directory holding one <agent>.jsonl inbox file per agent.
MAILBOX_DIR = WORKDIR / ".mailboxes"
# Import-time side effect: the directory is created if missing.
MAILBOX_DIR.mkdir(exist_ok=True)
class MessageBus:
    """File-based message bus. Each agent has a .jsonl inbox.

    Read is destructive: the inbox file is read and then deleted, which
    consumes the messages. All agents in this program are threads of one
    process, so a class-level lock serializes append and read+delete;
    without it a message appended between the read and the delete would be
    removed unread. Separate processes sharing the directory would still
    need real file locking.
    """

    # Shared by every instance: the mailbox files are process-global.
    _lock = threading.Lock()

    def send(self, from_agent: str, to_agent: str, content: str,
             msg_type: str = "message"):
        """Append one message to *to_agent*'s inbox and log the delivery."""
        msg = {"from": from_agent, "to": to_agent,
               "content": content, "type": msg_type,
               "ts": time.time()}
        inbox = MAILBOX_DIR / f"{to_agent}.jsonl"
        line = json.dumps(msg) + "\n"
        with self._lock:
            with open(inbox, "a") as f:
                f.write(line)
        print(f" \033[33m[bus] {from_agent} → {to_agent}: "
              f"{content[:50]}\033[0m")

    def read_inbox(self, agent: str) -> list[dict]:
        """Return and consume all messages waiting for *agent*.

        Returns an empty list when there is no inbox file. Lines that are
        not valid JSON are reported and skipped, so one corrupt line cannot
        make the inbox permanently unreadable.
        """
        inbox = MAILBOX_DIR / f"{agent}.jsonl"
        with self._lock:
            if not inbox.exists():
                return []
            raw = inbox.read_text()
            inbox.unlink()  # consume: read + delete
        msgs = []
        for line in raw.splitlines():
            if not line.strip():
                continue
            try:
                msgs.append(json.loads(line))
            except json.JSONDecodeError as e:
                print(f" \033[31m[bus] dropped malformed message for "
                      f"{agent}: {e}\033[0m")
        return msgs
# Single bus instance shared by the lead and all teammate threads.
BUS = MessageBus()
# Track spawned teammates
# name → True while the teammate thread is running; the entry is removed
# when the thread finishes.
active_teammates: dict[str, bool] = {}
# ── Teammate Thread (s15 new) ──
def spawn_teammate_thread(name: str, role: str, prompt: str) -> str:
    """Spawn a teammate agent in a background thread.

    Teaching version: max 10 rounds per teammate.
    Real CC: teammates use idle loop (wait for inbox, work, repeat)
    until shutdown_request.

    The teammate runs its own small agent loop with four tools (bash,
    read_file, write_file, send_message). Each round it first drains its
    inbox into the conversation. When it stops it always sends a "result"
    message to 'lead' and unregisters itself, including after a failure,
    in which case the message carries the error instead of "Done.".

    Returns a status string; a name that is already active is refused.
    """
    if name in active_teammates:
        return f"Teammate '{name}' already exists"
    system = (f"You are '{name}', a {role}. "
              f"Use tools to complete tasks. "
              f"Send results via send_message to 'lead'.")
    sub_tools = [
        {"name": "bash", "description": "Run a shell command.",
         "input_schema": {"type": "object",
                          "properties": {"command": {"type": "string"}},
                          "required": ["command"]}},
        {"name": "read_file", "description": "Read file contents.",
         "input_schema": {"type": "object",
                          "properties": {"path": {"type": "string"}},
                          "required": ["path"]}},
        {"name": "write_file", "description": "Write content to a file.",
         "input_schema": {"type": "object",
                          "properties": {"path": {"type": "string"},
                                         "content": {"type": "string"}},
                          "required": ["path", "content"]}},
        {"name": "send_message",
         "description": "Send a message to another agent.",
         "input_schema": {"type": "object",
                          "properties": {"to": {"type": "string"},
                                         "content": {"type": "string"}},
                          "required": ["to", "content"]}},
    ]
    sub_handlers = {
        "bash": run_bash, "read_file": run_read, "write_file": run_write,
        "send_message": lambda to, content: (BUS.send(name, to, content),
                                             "Sent")[1],
    }

    def call_tool(block) -> str:
        """Run one tool call; failures become text the model can read."""
        handler = sub_handlers.get(block.name)
        if handler is None:
            return "Unknown"
        try:
            return str(handler(**block.input))
        except Exception as e:
            return f"Error: {type(e).__name__}: {e}"

    def last_text(messages) -> str:
        """Return the first text block of the newest assistant turn that
        has one, or "Done." when no assistant turn contains text."""
        for msg in reversed(messages):
            if msg["role"] != "assistant" or not isinstance(msg["content"], list):
                continue
            for b in msg["content"]:
                if getattr(b, "type", None) == "text":
                    return b.text
        return "Done."

    def run():
        messages = [{"role": "user", "content": prompt}]
        failure = None
        try:
            for _ in range(10):
                inbox = BUS.read_inbox(name)
                if inbox:
                    messages.append({"role": "user",
                                     "content": f"<inbox>{json.dumps(inbox)}</inbox>"})
                # The full history is sent. With at most 10 rounds it stays
                # small, and slicing a tail window could start the request
                # on a tool_result whose tool_use was cut off.
                response = client.messages.create(
                    model=MODEL, system=system, messages=messages,
                    tools=sub_tools, max_tokens=8000)
                messages.append({"role": "assistant", "content": response.content})
                if response.stop_reason != "tool_use":
                    break
                results = [{"type": "tool_result",
                            "tool_use_id": block.id,
                            "content": call_tool(block)}
                           for block in response.content
                           if block.type == "tool_use"]
                messages.append({"role": "user", "content": results})
        except Exception as e:
            # Thread boundary: remember the error so the lead hears about it.
            failure = f"{type(e).__name__}: {e}"
        # Send final summary to Lead
        summary = f"[Error] {failure}" if failure else last_text(messages)
        try:
            BUS.send(name, "lead", summary, "result")
        finally:
            # Always free the name, even if the final send fails.
            active_teammates.pop(name, None)
            print(f" \033[32m[teammate] {name} finished\033[0m")

    active_teammates[name] = True
    threading.Thread(target=run, daemon=True).start()
    print(f" \033[36m[teammate] {name} spawned as {role}\033[0m")
    return f"Teammate '{name}' spawned as {role}"
# ── Team Tool Handlers (s15 new) ──
def run_spawn_teammate(name: str, role: str, prompt: str) -> str:
    """Tool handler: start a teammate thread and return its status line."""
    status = spawn_teammate_thread(name, role, prompt)
    return status
def run_send_message(to: str, content: str) -> str:
    """Tool handler: deliver *content* from 'lead' to the agent named *to*."""
    BUS.send(from_agent="lead", to_agent=to, content=content)
    return f"Sent to {to}"
def run_check_inbox() -> str:
    """Tool handler: consume the lead's inbox and render it as text.

    Each message becomes one line with its sender and the first 200
    characters of its content.
    """
    waiting = BUS.read_inbox("lead")
    if not waiting:
        return "(inbox empty)"
    return "\n".join(f" [{m['from']}] {m['content'][:200]}" for m in waiting)
# ── Tool Definitions ──
# Tool schemas passed to the model for the lead agent. Each "name" matches a
# key in execute_tool's handler table, and each property name matches a
# keyword argument of the corresponding handler.
TOOLS = [
    {"name": "bash", "description": "Run a shell command.",
     "input_schema": {"type": "object",
                      "properties": {
                          "command": {"type": "string"},
                          "run_in_background": {"type": "boolean"}},
                      "required": ["command"]}},
    {"name": "read_file", "description": "Read file contents.",
     "input_schema": {"type": "object",
                      "properties": {"path": {"type": "string"},
                                     "limit": {"type": "integer"}},
                      "required": ["path"]}},
    {"name": "write_file", "description": "Write content to a file.",
     "input_schema": {"type": "object",
                      "properties": {"path": {"type": "string"},
                                     "content": {"type": "string"}},
                      "required": ["path", "content"]}},
    # Task tools
    {"name": "create_task",
     "description": "Create a new task with optional blockedBy dependencies.",
     "input_schema": {"type": "object",
                      "properties": {
                          "subject": {"type": "string"},
                          "description": {"type": "string"},
                          "blockedBy": {"type": "array",
                                        "items": {"type": "string"}}},
                      "required": ["subject"]}},
    {"name": "list_tasks",
     "description": "List all tasks with status, owner, and dependencies.",
     "input_schema": {"type": "object", "properties": {},
                      "required": []}},
    {"name": "get_task",
     "description": "Get full details of a specific task by ID.",
     "input_schema": {"type": "object",
                      "properties": {"task_id": {"type": "string"}},
                      "required": ["task_id"]}},
    {"name": "claim_task",
     "description": "Claim a pending task. Sets owner, changes status to in_progress.",
     "input_schema": {"type": "object",
                      "properties": {"task_id": {"type": "string"}},
                      "required": ["task_id"]}},
    {"name": "complete_task",
     "description": "Complete an in-progress task. Reports unblocked downstream tasks.",
     "input_schema": {"type": "object",
                      "properties": {"task_id": {"type": "string"}},
                      "required": ["task_id"]}},
    # Cron tools
    {"name": "schedule_cron",
     "description": "Schedule a cron job. cron is 5-field: min hour dom month dow.",
     "input_schema": {"type": "object",
                      "properties": {
                          "cron": {"type": "string",
                                   "description": "5-field cron expression"},
                          "prompt": {"type": "string",
                                     "description": "Message to inject when fired"},
                          "recurring": {"type": "boolean",
                                        "description": "True=recurring, False=one-shot"},
                          "durable": {"type": "boolean",
                                      "description": "True=persist to disk"}},
                      "required": ["cron", "prompt"]}},
    {"name": "list_crons",
     "description": "List all registered cron jobs.",
     "input_schema": {"type": "object", "properties": {},
                      "required": []}},
    {"name": "cancel_cron",
     "description": "Cancel a cron job by ID.",
     "input_schema": {"type": "object",
                      "properties": {"job_id": {"type": "string"}},
                      "required": ["job_id"]}},
    # Team tools
    {"name": "spawn_teammate",
     "description": "Spawn a teammate agent in a background thread.",
     "input_schema": {"type": "object",
                      "properties": {
                          "name": {"type": "string"},
                          "role": {"type": "string"},
                          "prompt": {"type": "string"}},
                      "required": ["name", "role", "prompt"]}},
    {"name": "send_message",
     "description": "Send a message to a teammate via MessageBus.",
     "input_schema": {"type": "object",
                      "properties": {"to": {"type": "string"},
                                     "content": {"type": "string"}},
                      "required": ["to", "content"]}},
    {"name": "check_inbox",
     "description": "Check Lead's inbox for teammate messages.",
     "input_schema": {"type": "object", "properties": {},
                      "required": []}},
]
# ── Context ──
def update_context(context: dict, messages: list) -> dict:
    """Build a fresh context dict from current on-disk and module state.

    Neither argument is read; the result lists the tool names, the
    workspace path and the stripped contents of the memory index (empty
    string when the file is absent or blank).
    """
    memories = MEMORY_INDEX.read_text().strip() if MEMORY_INDEX.exists() else ""
    tool_names = [tool["name"] for tool in TOOLS]
    return {
        "enabled_tools": tool_names,
        "workspace": str(WORKDIR),
        "memories": memories,
    }
# ── Agent Loop ──
# Teaching code keeps a basic agent loop. S11's full error recovery is omitted.
# Cron queue is consumed when agent_loop is called; real CC auto-wakes via
# queue processor (useQueueProcessor.ts) when items arrive.
def agent_loop(messages: list, context: dict):
    """Drive the lead agent until the model stops asking for tools.

    *messages* is mutated in place. Each round injects fired cron prompts,
    calls the model, runs the requested tools (inline or in the background)
    and appends one user message holding the tool results plus any finished
    background notifications. An API failure is recorded as an assistant
    text block and ends the loop.
    """
    system = get_system_prompt(context)
    while True:
        # Fired cron jobs enter the conversation as user messages.
        for job in consume_cron_queue():
            messages.append({"role": "user",
                             "content": f"[Scheduled] {job.prompt}"})
            print(f" \033[35m[inject cron] {job.prompt[:50]}\033[0m")
        try:
            response = client.messages.create(
                model=MODEL, system=system, messages=messages,
                tools=TOOLS, max_tokens=8000)
        except Exception as e:
            error_block = {"type": "text",
                           "text": f"[Error] {type(e).__name__}: {e}"}
            messages.append({"role": "assistant", "content": [error_block]})
            return
        messages.append({"role": "assistant", "content": response.content})
        if response.stop_reason != "tool_use":
            return
        # Tool results and background notifications share one user message.
        user_content = []
        for block in response.content:
            if block.type != "tool_use":
                continue
            print(f"\033[36m> {block.name}\033[0m")
            if should_run_background(block.name, block.input):
                bg_id = start_background_task(block)
                result_text = (f"[Background task {bg_id} started] "
                               f"Result will be available when complete.")
            else:
                result_text = execute_tool(block)
                print(str(result_text)[:300])
            user_content.append({"type": "tool_result",
                                 "tool_use_id": block.id,
                                 "content": result_text})
        user_content.extend({"type": "text", "text": note}
                            for note in collect_background_results())
        messages.append({"role": "user", "content": user_content})
        context = update_context(context, messages)
        system = get_system_prompt(context)
if __name__ == "__main__":
    # Interactive REPL for the lead agent: read a query, run agent_loop,
    # print the reply text, then inject any teammate messages into history.
    print("s15: agent teams")
    print("Enter a question, press Enter to send. Type q to quit.\n")
    history = []
    context = update_context({}, [])
    while True:
        try:
            query = input("\033[36ms15 >> \033[0m")
        except (EOFError, KeyboardInterrupt):
            break
        if query.strip().lower() in ("q", "exit", ""):
            break
        history.append({"role": "user", "content": query})
        agent_loop(history, context)
        context = update_context(context, history)
        # Reply blocks are SDK objects on success but plain dicts when
        # agent_loop recorded an API error; both are handled so the error
        # text is shown instead of being silently skipped.
        for block in history[-1]["content"]:
            if isinstance(block, dict):
                block_type, text = block.get("type"), block.get("text", "")
            else:
                block_type = getattr(block, "type", None)
                text = getattr(block, "text", "")
            if block_type == "text":
                print(text)
        # Check inbox for teammate results → inject into history
        inbox = BUS.read_inbox("lead")
        if inbox:
            inbox_text = "\n".join(
                f"From {m['from']}: {m['content'][:200]}" for m in inbox)
            history.append({"role": "user",
                            "content": f"[Inbox]\n{inbox_text}"})
            print(f"\n\033[33m[Inbox: {len(inbox)} messages injected]\033[0m")
        print()