#!/usr/bin/env python3
"""
s15: Agent Teams — MessageBus + spawn_teammate_thread + inbox injection.

Run:  python s15_agent_teams/code.py
Need: pip install anthropic python-dotenv + .env with ANTHROPIC_API_KEY
      (MODEL_ID must also be set in the environment / .env)

Changes from s14:
  - MessageBus class: file-based mailboxes (.mailboxes/*.jsonl)
  - spawn_teammate_thread: creates teammate in background thread
  - Teammate runs own simplified agent_loop (bash, read, write, send_message)
  - Lead tools: spawn_teammate, send_message, check_inbox (3 new)
  - Lead inbox: teammate messages injected into history (not just printed)
  - Teaching version: teammates limited to 10 rounds (real CC uses idle loop)

ASCII flow:
  Lead:     cron_queue → messages → prompt → LLM → TOOLS ────→ loop
                             ↑                       ↓
                             |                       |
                             └── inbox ← MessageBus ← teammate.send_message ←┘
  Teammate: inbox → LLM → bash/read/write/send → loop (max 10 turns)
"""

import json
import os
import random
import subprocess
import threading
import time
from dataclasses import asdict, dataclass
from datetime import datetime
from pathlib import Path

try:
    import readline
    readline.parse_and_bind('set bind-tty-special-chars off')
except ImportError:
    pass

from anthropic import Anthropic
from dotenv import load_dotenv

load_dotenv(override=True)
if os.getenv("ANTHROPIC_BASE_URL"):
    os.environ.pop("ANTHROPIC_AUTH_TOKEN", None)

WORKDIR = Path.cwd()
MEMORY_DIR = WORKDIR / ".memory"
MEMORY_INDEX = MEMORY_DIR / "MEMORY.md"
client = Anthropic(base_url=os.getenv("ANTHROPIC_BASE_URL"))
MODEL = os.environ["MODEL_ID"]


# ── Task System (from s12, synced) ──
TASKS_DIR = WORKDIR / ".tasks"
TASKS_DIR.mkdir(exist_ok=True)


@dataclass
class Task:
    """One unit of work, persisted as ``.tasks/<id>.json``."""

    id: str
    subject: str
    description: str
    status: str  # pending | in_progress | completed
    owner: str | None
    blockedBy: list[str]


def _task_path(task_id: str) -> Path:
    """Return the JSON file path that stores the task with *task_id*."""
    return TASKS_DIR / f"{task_id}.json"


def create_task(subject: str, description: str = "",
                blockedBy: list[str] | None = None) -> Task:
    """Create a pending, unowned task, save it to disk and return it."""
    # Regenerate the id on a collision so an existing task file is never
    # silently overwritten (same second + same 4-digit random suffix).
    while True:
        task_id = f"task_{int(time.time())}_{random.randint(0, 9999):04d}"
        if not _task_path(task_id).exists():
            break
    task = Task(
        id=task_id,
        subject=subject,
        description=description,
        status="pending",
        owner=None,
        blockedBy=blockedBy or [],
    )
    save_task(task)
    return task


def save_task(task: Task):
    """Write *task* to its JSON file, replacing any previous content."""
    _task_path(task.id).write_text(json.dumps(asdict(task), indent=2))


def load_task(task_id: str) -> Task:
    """Load a task from disk. Raises FileNotFoundError if it does not exist."""
    return Task(**json.loads(_task_path(task_id).read_text()))


def list_tasks() -> list[Task]:
    """Return every stored task, ordered by file name."""
    return [Task(**json.loads(p.read_text()))
            for p in sorted(TASKS_DIR.glob("task_*.json"))]


def get_task(task_id: str) -> str:
    """Return full task details as JSON."""
    task = load_task(task_id)
    return json.dumps(asdict(task), indent=2)


def _unmet_dependencies(task: Task) -> list[str]:
    """Return the blockedBy ids that are missing on disk or not yet completed."""
    return [dep_id for dep_id in task.blockedBy
            if not _task_path(dep_id).exists()
            or load_task(dep_id).status != "completed"]


def can_start(task_id: str) -> bool:
    """Check if all blockedBy dependencies are completed.

    Missing dependencies are treated as blocked.
    """
    return not _unmet_dependencies(load_task(task_id))


def claim_task(task_id: str, owner: str = "agent") -> str:
    """Move a pending, unblocked task to in_progress and record its owner.

    Returns a human-readable status line; refusals (wrong status, unmet
    dependencies) are reported in the returned string, not raised.
    """
    task = load_task(task_id)
    if task.status != "pending":
        return f"Task {task_id} is {task.status}, cannot claim"
    deps = _unmet_dependencies(task)
    if deps:
        return f"Blocked by: {deps}"
    task.owner = owner
    task.status = "in_progress"
    save_task(task)
    print(f" \033[36m[claim] {task.subject} → in_progress (owner: {owner})\033[0m")
    return f"Claimed {task.id} ({task.subject})"


def complete_task(task_id: str) -> str:
    """Mark an in-progress task completed and report tasks it unblocked."""
    task = load_task(task_id)
    if task.status != "in_progress":
        return f"Task {task_id} is {task.status}, cannot complete"
    task.status = "completed"
    save_task(task)
    # Only tasks that actually depended on this one count as "unblocked";
    # otherwise every already-startable task would be re-reported each time.
    unblocked = [t.subject for t in list_tasks()
                 if t.status == "pending"
                 and task.id in t.blockedBy
                 and can_start(t.id)]
    print(f" \033[32m[complete] {task.subject} ✓\033[0m")
    msg = f"Completed {task.id} ({task.subject})"
    if unblocked:
        msg += f"\nUnblocked: {', '.join(unblocked)}"
        print(f" \033[33m[unblocked] {', '.join(unblocked)}\033[0m")
    return msg


# ── Prompt Assembly (from s10, synced) ──
PROMPT_SECTIONS = {
    "identity": "You are a coding agent. Act, don't explain.",
    "tools": "Available tools: bash, read_file, write_file, "
             "get_task, create_task, list_tasks, claim_task, complete_task, "
             "schedule_cron, list_crons, cancel_cron, "
             "spawn_teammate, send_message, check_inbox.",
    "workspace": f"Working directory: {WORKDIR}",
    "memory": "Relevant memories are injected below when available.",
}


def assemble_system_prompt(context: dict) -> str:
    """Join the identity, tools and workspace sections, plus memories if any."""
    sections = [PROMPT_SECTIONS["identity"],
                PROMPT_SECTIONS["tools"],
                PROMPT_SECTIONS["workspace"]]
    memories = context.get("memories", "")
    if memories:
        sections.append(f"Relevant memories:\n{memories}")
    return "\n\n".join(sections)


_last_context_key, _last_prompt = None, None


def get_system_prompt(context: dict) -> str:
    """Return the system prompt for *context*, reusing the last one if unchanged."""
    global _last_context_key, _last_prompt
    key = json.dumps(context, sort_keys=True, ensure_ascii=False, default=str)
    if key == _last_context_key and _last_prompt:
        return _last_prompt
    _last_context_key = key
    _last_prompt = assemble_system_prompt(context)
    return _last_prompt


# ── Tools ──
def safe_path(p: str) -> Path:
    """Resolve *p* inside WORKDIR. Raises ValueError if it escapes the workspace."""
    path = (WORKDIR / p).resolve()
    if not path.is_relative_to(WORKDIR):
        raise ValueError(f"Path escapes workspace: {p}")
    return path


def run_bash(command: str, run_in_background: bool = False) -> str:
    """Run a shell command in WORKDIR and return combined stdout+stderr.

    Output is capped at 50000 characters; a 120s timeout is reported as an
    error string rather than raised.
    """
    # run_in_background is handled by agent_loop dispatch, not here
    try:
        r = subprocess.run(command, shell=True, cwd=WORKDIR,
                           capture_output=True, text=True, timeout=120)
        out = (r.stdout + r.stderr).strip()
        return out[:50000] if out else "(no output)"
    except subprocess.TimeoutExpired:
        return "Error: Timeout (120s)"


def run_read(path: str, limit: int | None = None) -> str:
    """Return file contents, optionally truncated to the first *limit* lines."""
    try:
        lines = safe_path(path).read_text().splitlines()
        if limit and limit < len(lines):
            lines = lines[:limit] + [f"... ({len(lines) - limit} more lines)"]
        return "\n".join(lines)
    except Exception as e:
        return f"Error: {e}"


def run_write(path: str, content: str) -> str:
    """Write *content* to *path* inside the workspace, creating parent dirs."""
    try:
        fp = safe_path(path)
        fp.parent.mkdir(parents=True, exist_ok=True)
        fp.write_text(content)
        return f"Wrote {len(content)} bytes to {path}"
    except Exception as e:
        return f"Error: {e}"


# Task tools
def run_create_task(subject: str, description: str = "",
                    blockedBy: list[str] | None = None) -> str:
    """Tool handler: create a task and return a one-line confirmation."""
    task = create_task(subject, description, blockedBy)
    deps = f" (blockedBy: {', '.join(blockedBy)})" if blockedBy else ""
    print(f" \033[34m[create] {task.subject}{deps}\033[0m")
    return f"Created {task.id}: {task.subject}{deps}"


def run_list_tasks() -> str:
    """Tool handler: one line per task with status icon, owner and dependencies."""
    tasks = list_tasks()
    if not tasks:
        return "No tasks. Use create_task to add some."
    icons = {"pending": "○", "in_progress": "●", "completed": "✓"}
    lines = []
    for t in tasks:
        icon = icons.get(t.status, "?")
        deps = f" (blockedBy: {', '.join(t.blockedBy)})" if t.blockedBy else ""
        owner = f" [{t.owner}]" if t.owner else ""
        lines.append(f" {icon} {t.id}: {t.subject} "
                     f"[{t.status}]{owner}{deps}")
    return "\n".join(lines)


def run_get_task(task_id: str) -> str:
    """Tool handler: task details as JSON, or an error string if not found."""
    try:
        return get_task(task_id)
    except FileNotFoundError:
        return f"Error: Task {task_id} not found"


def run_claim_task(task_id: str) -> str:
    """Tool handler: claim a task for the lead agent."""
    try:
        return claim_task(task_id, owner="agent")
    except FileNotFoundError:
        # Same reporting as run_get_task instead of crashing the agent loop.
        return f"Error: Task {task_id} not found"


def run_complete_task(task_id: str) -> str:
    """Tool handler: complete an in-progress task."""
    try:
        return complete_task(task_id)
    except FileNotFoundError:
        return f"Error: Task {task_id} not found"


# ── Background Tasks (from s13, synced) ──
_bg_counter = 0
background_tasks: dict[str, dict] = {}
background_results: dict[str, str] = {}
background_lock = threading.Lock()


def is_slow_operation(tool_name: str, tool_input: dict) -> bool:
    """Fallback heuristic: commands likely to take > 30s."""
    if tool_name != "bash":
        return False
    cmd = tool_input.get("command", "").lower()
    slow_keywords = ["install", "build", "test", "deploy", "compile",
                     "docker build", "pip install", "npm install",
                     "cargo build", "pytest", "make"]
    return any(kw in cmd for kw in slow_keywords)


def should_run_background(tool_name: str, tool_input: dict) -> bool:
    """Model explicit request takes priority; fallback to heuristic."""
    if tool_input.get("run_in_background"):
        return True
    return is_slow_operation(tool_name, tool_input)


def execute_tool(block) -> str:
    """Execute a tool call block, return output.

    Handler failures (bad arguments, missing files, ...) are returned as an
    "Error: ..." string so that neither the lead loop nor a background worker
    thread dies on a single bad tool call.
    """
    # Built per call: several handlers are defined further down the file.
    handler = {
        "bash": run_bash,
        "read_file": run_read,
        "write_file": run_write,
        "create_task": run_create_task,
        "list_tasks": run_list_tasks,
        "get_task": run_get_task,
        "claim_task": run_claim_task,
        "complete_task": run_complete_task,
        "schedule_cron": run_schedule_cron,
        "list_crons": run_list_crons,
        "cancel_cron": run_cancel_cron,
        "spawn_teammate": run_spawn_teammate,
        "send_message": run_send_message,
        "check_inbox": run_check_inbox,
    }.get(block.name)
    if not handler:
        return f"Unknown tool: {block.name}"
    try:
        return handler(**block.input)
    except Exception as e:  # tool boundary: report to the model, don't crash
        return f"Error: {type(e).__name__}: {e}"


def start_background_task(block) -> str:
    """Run tool in a daemon thread. Returns background task ID."""
    global _bg_counter
    _bg_counter += 1
    bg_id = f"bg_{_bg_counter:04d}"
    cmd = block.input.get("command", block.name)

    def worker():
        result = execute_tool(block)
        with background_lock:
            background_tasks[bg_id]["status"] = "completed"
            background_results[bg_id] = result

    # Register before starting the thread so the worker always finds its entry.
    with background_lock:
        background_tasks[bg_id] = {
            "tool_use_id": block.id,
            "command": cmd,
            "status": "running",
        }
    threading.Thread(target=worker, daemon=True).start()
    print(f" \033[33m[background] dispatched {bg_id}: {cmd[:40]}\033[0m")
    return bg_id


def collect_background_results() -> list[str]:
    """Collect completed background results as task_notification messages.

    Completed entries are removed from the registries, so each result is
    reported exactly once. Output is summarized to its first 200 characters.
    """
    with background_lock:
        finished = [(bg_id, background_tasks.pop(bg_id),
                     background_results.pop(bg_id, ""))
                    for bg_id in [bid for bid, task in background_tasks.items()
                                  if task["status"] == "completed"]]
    notifications = []
    for bg_id, task, output in finished:
        summary = output[:200]
        # Label every field so the model can tell id, status, command and
        # output apart when the notification is injected into the history.
        notifications.append(
            f"[task_notification]\n"
            f" task_id: {bg_id}\n"
            f" status: completed\n"
            f" command: {task['command']}\n"
            f" summary: {summary}")
        print(f" \033[32m[background done] {bg_id}: "
              f"{task['command'][:40]} ({len(output)} chars)\033[0m")
    return notifications


# ── Cron Scheduler (from s14, synced) ──
DURABLE_PATH = WORKDIR / ".scheduled_tasks.json"


@dataclass
class CronJob:
    """A scheduled prompt injection."""

    id: str
    cron: str        # "0 9 * * *"
    prompt: str      # message to inject when fired
    recurring: bool  # True = recurring, False = one-shot
    durable: bool    # True = persist to disk


scheduled_jobs: dict[str, CronJob] = {}
cron_queue: list[CronJob] = []
cron_lock = threading.Lock()
_last_fired: dict[str, str] = {}  # job_id → "YYYY-MM-DD HH:MM"


def _cron_field_matches(field: str, value: int) -> bool:
    """Match a single cron field against a value.

    Supports ``*``, ``*/step``, ``a-b`` ranges, single numbers and
    comma-separated lists of any of those.
    """
    # Lists first, so that elements such as "*/15" inside "*/15,30" work.
    if "," in field:
        return any(_cron_field_matches(part.strip(), value)
                   for part in field.split(","))
    if field == "*":
        return True
    if field.startswith("*/"):
        step = int(field[2:])
        return step > 0 and value % step == 0
    if "-" in field:
        lo, hi = field.split("-", 1)
        return int(lo) <= value <= int(hi)
    return value == int(field)


def cron_matches(cron_expr: str, dt: datetime) -> bool:
    """Check if a 5-field cron expression matches the given datetime.

    Standard cron semantics: DOM and DOW use OR when both are constrained.
    """
    fields = cron_expr.strip().split()
    if len(fields) != 5:
        return False
    minute, hour, dom, month, dow = fields
    dow_val = (dt.weekday() + 1) % 7  # Python Monday=0 → cron Sunday=0

    # Minute, hour, month must all match
    if not (_cron_field_matches(minute, dt.minute)
            and _cron_field_matches(hour, dt.hour)
            and _cron_field_matches(month, dt.month)):
        return False

    dom_ok = _cron_field_matches(dom, dt.day)
    dow_ok = _cron_field_matches(dow, dow_val)
    # DOM and DOW: if both constrained, either matching is enough (OR)
    dom_unconstrained = dom == "*"
    dow_unconstrained = dow == "*"
    if dom_unconstrained and dow_unconstrained:
        return True
    if dom_unconstrained:
        return dow_ok
    if dow_unconstrained:
        return dom_ok
    return dom_ok or dow_ok


def _validate_cron_field(field: str, lo: int, hi: int) -> str | None:
    """Validate a single cron field value is within [lo, hi].

    Returns an error message, or None when the field is valid.
    """
    # Lists first, mirroring _cron_field_matches.
    if "," in field:
        for part in field.split(","):
            err = _validate_cron_field(part.strip(), lo, hi)
            if err:
                return err
        return None
    if field == "*":
        return None
    if field.startswith("*/"):
        step_str = field[2:]
        if not step_str.isdigit():
            return f"Invalid step: {field}"
        if int(step_str) <= 0:
            return f"Step must be > 0: {field}"
        return None
    if "-" in field:
        parts = field.split("-", 1)
        if not parts[0].isdigit() or not parts[1].isdigit():
            return f"Invalid range: {field}"
        a, b = int(parts[0]), int(parts[1])
        if a < lo or a > hi or b < lo or b > hi:
            return f"Range {field} out of bounds [{lo}-{hi}]"
        if a > b:
            return f"Range start > end: {field}"
        return None
    if not field.isdigit():
        return f"Invalid field: {field}"
    val = int(field)
    if val < lo or val > hi:
        return f"Value {val} out of bounds [{lo}-{hi}]"
    return None


def validate_cron(cron_expr: str) -> str | None:
    """Validate a cron expression. Returns error message or None."""
    fields = cron_expr.strip().split()
    if len(fields) != 5:
        return f"Expected 5 fields, got {len(fields)}"
    bounds = [(0, 59), (0, 23), (1, 31), (1, 12), (0, 6)]
    names = ["minute", "hour", "day-of-month", "month", "day-of-week"]
    for field, (lo, hi), name in zip(fields, bounds, names):
        err = _validate_cron_field(field, lo, hi)
        if err:
            return f"{name}: {err}"
    return None


def save_durable_jobs():
    """Persist durable jobs to .scheduled_tasks.json.

    Iterates scheduled_jobs, so callers on a running system should hold
    cron_lock while calling this.
    """
    durable = [asdict(j) for j in scheduled_jobs.values() if j.durable]
    DURABLE_PATH.write_text(json.dumps(durable, indent=2))


def load_durable_jobs():
    """Load durable jobs from disk on startup.

    Unreadable files and malformed or invalid records are reported and
    skipped; a single bad record does not prevent the others from loading.
    """
    if not DURABLE_PATH.exists():
        return
    try:
        records = json.loads(DURABLE_PATH.read_text())
    except (OSError, ValueError) as e:
        print(f" \033[31m[cron] could not read {DURABLE_PATH.name}: {e}\033[0m")
        return
    if not isinstance(records, list):
        print(f" \033[31m[cron] ignoring {DURABLE_PATH.name}: "
              f"expected a list of jobs\033[0m")
        return
    loaded = 0
    for record in records:
        try:
            job = CronJob(**record)
            err = validate_cron(job.cron)
        except (TypeError, AttributeError) as e:
            print(f" \033[31m[cron] skipping malformed job record: {e}\033[0m")
            continue
        if err:
            print(f" \033[31m[cron] skipping invalid job {job.id}: {err}\033[0m")
            continue
        scheduled_jobs[job.id] = job
        loaded += 1
    if loaded:
        print(f" \033[35m[cron] loaded {loaded} durable job(s)\033[0m")


def schedule_job(cron: str, prompt: str, recurring: bool = True,
                 durable: bool = True) -> CronJob | str:
    """Register a new cron job. Returns CronJob or error string."""
    err = validate_cron(cron)
    if err:
        return err
    with cron_lock:
        # Re-roll on collision so an existing job is never replaced.
        job_id = f"cron_{random.randint(0, 999999):06d}"
        while job_id in scheduled_jobs:
            job_id = f"cron_{random.randint(0, 999999):06d}"
        job = CronJob(
            id=job_id,
            cron=cron,
            prompt=prompt,
            recurring=recurring,
            durable=durable,
        )
        scheduled_jobs[job.id] = job
        if durable:
            save_durable_jobs()
    print(f" \033[35m[cron register] {job.id} '{cron}' → {prompt[:40]}\033[0m")
    return job


def cancel_job(job_id: str) -> str:
    """Cancel a cron job."""
    with cron_lock:
        job = scheduled_jobs.pop(job_id, None)
        if not job:
            return f"Job {job_id} not found"
        _last_fired.pop(job_id, None)
        # Saved under the lock: the scheduler thread mutates scheduled_jobs.
        if job.durable:
            save_durable_jobs()
    print(f" \033[31m[cron cancel] {job_id}\033[0m")
    return f"Cancelled {job_id}"


def cron_scheduler_loop():
    """Independent daemon thread: poll every 1s, fire matching jobs.

    Individual job errors are caught to prevent one bad job from killing the
    entire scheduler thread.
    """
    while True:
        time.sleep(1)
        now = datetime.now()
        # Date-aware marker prevents daily jobs from skipping on day 2+
        minute_marker = now.strftime("%Y-%m-%d %H:%M")
        with cron_lock:
            for job in list(scheduled_jobs.values()):
                try:
                    if not cron_matches(job.cron, now):
                        continue
                    # Polling runs every second; fire at most once per minute.
                    if _last_fired.get(job.id) == minute_marker:
                        continue
                    cron_queue.append(job)
                    _last_fired[job.id] = minute_marker
                    print(f" \033[35m[cron fire] {job.id} → "
                          f"{job.prompt[:40]}\033[0m")
                    if not job.recurring:
                        scheduled_jobs.pop(job.id, None)
                        _last_fired.pop(job.id, None)
                        if job.durable:
                            save_durable_jobs()
                except Exception as e:
                    print(f" \033[31m[cron error] {job.id}: {e}\033[0m")


def consume_cron_queue() -> list[CronJob]:
    """Consume fired jobs from cron_queue (called by agent_loop)."""
    with cron_lock:
        fired = list(cron_queue)
        cron_queue.clear()
    return fired


# Load durable jobs on startup, then start scheduler thread
load_durable_jobs()
threading.Thread(target=cron_scheduler_loop, daemon=True).start()
print(" \033[35m[cron] scheduler thread started\033[0m")


# Cron tool handlers
def run_schedule_cron(cron: str, prompt: str, recurring: bool = True,
                      durable: bool = True) -> str:
    """Tool handler: register a cron job, reporting validation errors as text."""
    result = schedule_job(cron, prompt, recurring, durable)
    if isinstance(result, str):
        return f"Error: {result}"
    return f"Scheduled {result.id}: '{cron}' → {prompt}"


def run_list_crons() -> str:
    """Tool handler: one line per registered cron job."""
    with cron_lock:
        jobs = list(scheduled_jobs.values())
    if not jobs:
        return "No cron jobs. Use schedule_cron to add one."
    lines = []
    for j in jobs:
        tag = "recurring" if j.recurring else "one-shot"
        dur = "durable" if j.durable else "session"
        lines.append(f" {j.id}: '{j.cron}' → {j.prompt[:40]} "
                     f"[{tag}, {dur}]")
    return "\n".join(lines)


def run_cancel_cron(job_id: str) -> str:
    """Tool handler: cancel a cron job by id."""
    return cancel_job(job_id)


# ── MessageBus (s15 new) ──
# Teaching version uses simple file append + unlink.
# Real CC uses proper-lockfile for concurrent write safety.
MAILBOX_DIR = WORKDIR / ".mailboxes"
MAILBOX_DIR.mkdir(exist_ok=True)


class MessageBus:
    """File-based message bus. Each agent has a .jsonl inbox.

    Read is destructive: read_text + unlink (consumes messages).
    All agents in this script are threads of one process, so an in-process
    lock makes append and read+unlink atomic with respect to each other.
    There is still no file locking across processes; real CC uses
    proper-lockfile for that.
    """

    def __init__(self):
        # Guards every mailbox file against concurrent send/read from threads.
        self._lock = threading.Lock()

    def send(self, from_agent: str, to_agent: str, content: str,
             msg_type: str = "message"):
        """Append one JSON message line to *to_agent*'s inbox file."""
        msg = {"from": from_agent, "to": to_agent, "content": content,
               "type": msg_type, "ts": time.time()}
        inbox = MAILBOX_DIR / f"{to_agent}.jsonl"
        with self._lock:
            with open(inbox, "a") as f:
                f.write(json.dumps(msg) + "\n")
        print(f" \033[33m[bus] {from_agent} → {to_agent}: "
              f"{content[:50]}\033[0m")

    def read_inbox(self, agent: str) -> list[dict]:
        """Return and consume all messages waiting for *agent*.

        Lines that are not valid JSON are reported and skipped so that one
        corrupt line cannot block the mailbox forever.
        """
        inbox = MAILBOX_DIR / f"{agent}.jsonl"
        with self._lock:
            if not inbox.exists():
                return []
            raw = inbox.read_text()
            inbox.unlink()  # consume: read + delete
        msgs = []
        for line in raw.splitlines():
            if not line.strip():
                continue
            try:
                msgs.append(json.loads(line))
            except ValueError:
                print(f" \033[31m[bus] dropped malformed message "
                      f"for {agent}\033[0m")
        return msgs


BUS = MessageBus()

# Track spawned teammates
active_teammates: dict[str, bool] = {}


# ── Teammate Thread (s15 new) ──
TEAMMATE_MAX_ROUNDS = 10      # teaching limit on LLM round-trips per teammate
TEAMMATE_HISTORY_LIMIT = 20   # max messages sent to the API per request

TEAMMATE_TOOLS = [
    {"name": "bash", "description": "Run a shell command.",
     "input_schema": {"type": "object",
                      "properties": {"command": {"type": "string"}},
                      "required": ["command"]}},
    {"name": "read_file", "description": "Read file contents.",
     "input_schema": {"type": "object",
                      "properties": {"path": {"type": "string"}},
                      "required": ["path"]}},
    {"name": "write_file", "description": "Write content to a file.",
     "input_schema": {"type": "object",
                      "properties": {"path": {"type": "string"},
                                     "content": {"type": "string"}},
                      "required": ["path", "content"]}},
    {"name": "send_message", "description": "Send a message to another agent.",
     "input_schema": {"type": "object",
                      "properties": {"to": {"type": "string"},
                                     "content": {"type": "string"}},
                      "required": ["to", "content"]}},
]


def _trim_history(messages: list, limit: int = TEAMMATE_HISTORY_LIMIT) -> list:
    """Return at most *limit* messages that still form a valid conversation.

    A plain ``messages[-limit:]`` can start on an assistant turn or on a
    tool_result whose tool_use was cut off, which the API rejects. Instead the
    original prompt is always kept and the tail never starts on a tool_result.
    """
    if len(messages) <= limit:
        return messages
    start = len(messages) - (limit - 1)
    # In the teammate loop, list-valued user content is always tool results.
    while (start < len(messages)
           and messages[start]["role"] == "user"
           and isinstance(messages[start]["content"], list)):
        start += 1
    return [messages[0], *messages[start:]]


def _last_assistant_text(messages: list) -> str | None:
    """Return the first text block of the latest assistant turn that has one."""
    for msg in reversed(messages):
        if msg["role"] != "assistant" or not isinstance(msg["content"], list):
            continue
        for block in msg["content"]:
            if getattr(block, "type", None) == "text":
                return block.text
    return None


def spawn_teammate_thread(name: str, role: str, prompt: str) -> str:
    """Spawn a teammate agent in a background thread.

    Teaching version: max 10 rounds per teammate. Real CC: teammates use idle
    loop (wait for inbox, work, repeat) until shutdown_request.

    Returns a status line. The teammate always sends a final "result" message
    to 'lead' (its last text, an error description, or "Done.") and is always
    removed from active_teammates when its thread ends.
    """
    if name == "lead":
        # A teammate called 'lead' would consume the lead's own mailbox.
        return "Teammate name 'lead' is reserved"
    if name in active_teammates:
        return f"Teammate '{name}' already exists"

    system = (f"You are '{name}', a {role}. "
              f"Use tools to complete tasks. "
              f"Send results via send_message to 'lead'.")

    def send_from_teammate(to, content):
        BUS.send(name, to, content)
        return "Sent"

    sub_handlers = {
        "bash": run_bash,
        "read_file": run_read,
        "write_file": run_write,
        "send_message": send_from_teammate,
    }

    def run():
        messages = [{"role": "user", "content": prompt}]
        failure = None
        try:
            for _ in range(TEAMMATE_MAX_ROUNDS):
                inbox = BUS.read_inbox(name)
                if inbox:
                    messages.append({"role": "user",
                                     "content": f"[Inbox]\n{json.dumps(inbox)}"})
                try:
                    response = client.messages.create(
                        model=MODEL, system=system,
                        messages=_trim_history(messages),
                        tools=TEAMMATE_TOOLS, max_tokens=8000)
                except Exception as e:
                    # Report the failure to the lead instead of hiding it.
                    failure = f"[Error] {type(e).__name__}: {e}"
                    break
                messages.append({"role": "assistant",
                                 "content": response.content})
                if response.stop_reason != "tool_use":
                    break
                results = []
                for block in response.content:
                    if block.type != "tool_use":
                        continue
                    handler = sub_handlers.get(block.name)
                    try:
                        output = handler(**block.input) if handler else "Unknown"
                    except Exception as e:
                        # A bad tool call must not kill the teammate thread.
                        output = f"Error: {type(e).__name__}: {e}"
                    results.append({"type": "tool_result",
                                    "tool_use_id": block.id,
                                    "content": str(output)})
                messages.append({"role": "user", "content": results})
            # Send final summary to Lead
            summary = failure or _last_assistant_text(messages) or "Done."
            BUS.send(name, "lead", summary, "result")
        finally:
            active_teammates.pop(name, None)
            print(f" \033[32m[teammate] {name} finished\033[0m")

    active_teammates[name] = True
    threading.Thread(target=run, daemon=True).start()
    print(f" \033[36m[teammate] {name} spawned as {role}\033[0m")
    return f"Teammate '{name}' spawned as {role}"


# ── Team Tool Handlers (s15 new) ──
def run_spawn_teammate(name: str, role: str, prompt: str) -> str:
    """Tool handler: start a teammate thread."""
    return spawn_teammate_thread(name, role, prompt)


def run_send_message(to: str, content: str) -> str:
    """Tool handler: send a message from the lead to *to*."""
    BUS.send("lead", to, content)
    return f"Sent to {to}"


def run_check_inbox() -> str:
    """Tool handler: consume the lead's inbox and list its messages.

    Each message body is shown truncated to 200 characters.
    """
    msgs = BUS.read_inbox("lead")
    if not msgs:
        return "(inbox empty)"
    return "\n".join(f" [{m['from']}] {m['content'][:200]}" for m in msgs)


# ── Tool Definitions ──
TOOLS = [
    {"name": "bash", "description": "Run a shell command.",
     "input_schema": {"type": "object", "properties": {
         "command": {"type": "string"},
         "run_in_background": {"type": "boolean"}},
         "required": ["command"]}},
    {"name": "read_file", "description": "Read file contents.",
     "input_schema": {"type": "object",
                      "properties": {"path": {"type": "string"},
                                     "limit": {"type": "integer"}},
                      "required": ["path"]}},
    {"name": "write_file", "description": "Write content to a file.",
     "input_schema": {"type": "object",
                      "properties": {"path": {"type": "string"},
                                     "content": {"type": "string"}},
                      "required": ["path", "content"]}},
    {"name": "create_task",
     "description": "Create a new task with optional blockedBy dependencies.",
     "input_schema": {"type": "object", "properties": {
         "subject": {"type": "string"},
         "description": {"type": "string"},
         "blockedBy": {"type": "array", "items": {"type": "string"}}},
         "required": ["subject"]}},
    {"name": "list_tasks",
     "description": "List all tasks with status, owner, and dependencies.",
     "input_schema": {"type": "object", "properties": {}, "required": []}},
    {"name": "get_task",
     "description": "Get full details of a specific task by ID.",
     "input_schema": {"type": "object",
                      "properties": {"task_id": {"type": "string"}},
                      "required": ["task_id"]}},
    {"name": "claim_task",
     "description": "Claim a pending task. Sets owner, changes status to in_progress.",
     "input_schema": {"type": "object",
                      "properties": {"task_id": {"type": "string"}},
                      "required": ["task_id"]}},
    {"name": "complete_task",
     "description": "Complete an in-progress task. Reports unblocked downstream tasks.",
     "input_schema": {"type": "object",
                      "properties": {"task_id": {"type": "string"}},
                      "required": ["task_id"]}},
    {"name": "schedule_cron",
     "description": "Schedule a cron job. cron is 5-field: min hour dom month dow.",
     "input_schema": {"type": "object", "properties": {
         "cron": {"type": "string",
                  "description": "5-field cron expression"},
         "prompt": {"type": "string",
                    "description": "Message to inject when fired"},
         "recurring": {"type": "boolean",
                       "description": "True=recurring, False=one-shot"},
         "durable": {"type": "boolean",
                     "description": "True=persist to disk"}},
         "required": ["cron", "prompt"]}},
    {"name": "list_crons", "description": "List all registered cron jobs.",
     "input_schema": {"type": "object", "properties": {}, "required": []}},
    {"name": "cancel_cron", "description": "Cancel a cron job by ID.",
     "input_schema": {"type": "object",
                      "properties": {"job_id": {"type": "string"}},
                      "required": ["job_id"]}},
    {"name": "spawn_teammate",
     "description": "Spawn a teammate agent in a background thread.",
     "input_schema": {"type": "object", "properties": {
         "name": {"type": "string"},
         "role": {"type": "string"},
         "prompt": {"type": "string"}},
         "required": ["name", "role", "prompt"]}},
    {"name": "send_message",
     "description": "Send a message to a teammate via MessageBus.",
     "input_schema": {"type": "object",
                      "properties": {"to": {"type": "string"},
                                     "content": {"type": "string"}},
                      "required": ["to", "content"]}},
    {"name": "check_inbox",
     "description": "Check Lead's inbox for teammate messages.",
     "input_schema": {"type": "object", "properties": {}, "required": []}},
]


# ── Context ──
def update_context(context: dict, messages: list) -> dict:
    """Derive context from real state.

    The arguments are currently unused; the result is rebuilt from TOOLS,
    WORKDIR and the memory index file on every call.
    """
    memories = ""
    if MEMORY_INDEX.exists():
        content = MEMORY_INDEX.read_text().strip()
        if content:
            memories = content
    return {
        "enabled_tools": [t["name"] for t in TOOLS],
        "workspace": str(WORKDIR),
        "memories": memories,
    }


# ── Agent Loop ──
# Teaching code keeps a basic agent loop. S11's full error recovery is omitted.
# Cron queue is consumed when agent_loop is called; real CC auto-wakes via
# queue processor (useQueueProcessor.ts) when items arrive.
def agent_loop(messages: list, context: dict):
    """Run the lead agent until the model stops requesting tools.

    Mutates *messages* in place. On an API error an assistant message holding
    an "[Error] ..." text block (a plain dict) is appended and the loop ends.
    """
    system = get_system_prompt(context)
    while True:
        # Consume fired cron jobs → inject as messages
        for job in consume_cron_queue():
            messages.append({"role": "user",
                             "content": f"[Scheduled] {job.prompt}"})
            print(f" \033[35m[inject cron] {job.prompt[:50]}\033[0m")

        try:
            response = client.messages.create(
                model=MODEL, system=system, messages=messages,
                tools=TOOLS, max_tokens=8000)
        except Exception as e:
            messages.append({"role": "assistant", "content": [
                {"type": "text", "text": f"[Error] {type(e).__name__}: {e}"}]})
            return
        messages.append({"role": "assistant", "content": response.content})
        if response.stop_reason != "tool_use":
            return

        results = []
        for block in response.content:
            if block.type != "tool_use":
                continue
            print(f"\033[36m> {block.name}\033[0m")
            if should_run_background(block.name, block.input):
                bg_id = start_background_task(block)
                results.append({"type": "tool_result",
                                "tool_use_id": block.id,
                                "content": f"[Background task {bg_id} started] "
                                           f"Result will be available when complete."})
            else:
                output = execute_tool(block)
                print(str(output)[:300])
                results.append({"type": "tool_result",
                                "tool_use_id": block.id,
                                "content": output})

        # Merge background notifications + tool results into one user message
        user_content = [{"type": "text", "text": notif}
                        for notif in collect_background_results()]
        user_content.extend(results)
        messages.append({"role": "user", "content": user_content})

        context = update_context(context, messages)
        system = get_system_prompt(context)


def _block_text(block) -> str | None:
    """Return the text of a text block given as an SDK object or a plain dict."""
    if isinstance(block, dict):
        return block.get("text") if block.get("type") == "text" else None
    if getattr(block, "type", None) == "text":
        return block.text
    return None


def main():
    """Interactive REPL for the lead agent."""
    print("s15: agent teams")
    print("Enter a question, press Enter to send. Type q to quit.\n")
    history = []
    context = update_context({}, [])
    while True:
        try:
            query = input("\033[36ms15 >> \033[0m")
        except (EOFError, KeyboardInterrupt):
            break
        if query.strip().lower() in ("q", "exit", ""):
            break
        history.append({"role": "user", "content": query})
        agent_loop(history, context)
        context = update_context(context, history)

        # The final turn holds SDK blocks normally, or dict blocks when
        # agent_loop recorded an API error; print the text of either.
        final_content = history[-1]["content"]
        if isinstance(final_content, list):
            for block in final_content:
                text = _block_text(block)
                if text is not None:
                    print(text)

        # Check inbox for teammate results → inject into history
        inbox = BUS.read_inbox("lead")
        if inbox:
            inbox_text = "\n".join(
                f"From {m['from']}: {m['content'][:200]}" for m in inbox)
            history.append({"role": "user",
                            "content": f"[Inbox]\n{inbox_text}"})
            print(f"\n\033[33m[Inbox: {len(inbox)} messages injected]\033[0m")
        print()


if __name__ == "__main__":
    main()