#!/usr/bin/env python3
"""
s12: Task System — file-persisted task graph with blockedBy dependencies.

Run:  python s12_task_system/code.py
Need: pip install anthropic python-dotenv + .env with ANTHROPIC_API_KEY

Changes from s11:
- Task dataclass (id, subject, description, status, owner, blockedBy)
- TASKS_DIR = .tasks/ for persistent JSON storage
- create_task / save_task / load_task / list_tasks / get_task
- can_start: checks blockedBy all completed (missing deps = blocked)
- claim_task: set owner + pending -> in_progress
- complete_task: set completed + report unblocked downstream
- 5 new tools: create_task, list_tasks, get_task, claim_task, complete_task

Note: Teaching code keeps a basic agent loop to stay focused on the task
system. S11's full error recovery (RecoveryState, backoff, escalation,
reactive compact, fallback model) is omitted — in real CC, tasks.ts and
withRetry are independent layers that compose naturally.
"""

import json
import os
import random
import subprocess
import time
from dataclasses import asdict, dataclass
from pathlib import Path

try:
    import readline
    readline.parse_and_bind('set bind-tty-special-chars off')
except ImportError:
    # readline is optional; without it input() still works.
    pass

from anthropic import Anthropic
from dotenv import load_dotenv

load_dotenv(override=True)
# NOTE(review): presumably a custom base URL should authenticate with the API
# key only, so the auth token is dropped — confirm against the deployment.
if os.getenv("ANTHROPIC_BASE_URL"):
    os.environ.pop("ANTHROPIC_AUTH_TOKEN", None)

WORKDIR = Path.cwd()
MEMORY_DIR = WORKDIR / ".memory"
MEMORY_INDEX = MEMORY_DIR / "MEMORY.md"
client = Anthropic(base_url=os.getenv("ANTHROPIC_BASE_URL"))
MODEL = os.environ["MODEL_ID"]


# ── Task System ──

TASKS_DIR = WORKDIR / ".tasks"
TASKS_DIR.mkdir(exist_ok=True)


@dataclass
class Task:
    """One node of the task graph, stored as ``<id>.json`` in TASKS_DIR."""

    id: str
    subject: str
    description: str
    status: str  # pending | in_progress | completed
    owner: str | None  # Agent name (multi-agent scenarios)
    blockedBy: list[str]  # Dependency task IDs


def _task_path(task_id: str) -> Path:
    """Return the JSON file path that backs ``task_id``."""
    return TASKS_DIR / f"{task_id}.json"


def _new_task_id() -> str:
    """Generate a task ID that does not collide with an existing task file."""
    while True:
        task_id = f"task_{int(time.time())}_{random.randint(0, 9999):04d}"
        # Several tasks are often created within the same second; retry
        # instead of silently overwriting an existing task on a collision.
        if not _task_path(task_id).exists():
            return task_id


def create_task(subject: str, description: str = "",
                blockedBy: list[str] | None = None) -> Task:
    """Create a pending, unowned task, persist it, and return it.

    Args:
        subject: Short title of the task.
        description: Optional longer description.
        blockedBy: IDs of tasks that must be completed first (default: none).
    """
    task = Task(
        id=_new_task_id(),
        subject=subject,
        description=description,
        status="pending",
        owner=None,
        blockedBy=blockedBy or [],
    )
    save_task(task)
    return task


def save_task(task: Task):
    """Write ``task`` to its JSON file, replacing any previous content."""
    _task_path(task.id).write_text(json.dumps(asdict(task), indent=2))


def load_task(task_id: str) -> Task:
    """Load a task from disk.

    Raises:
        FileNotFoundError: if no file exists for ``task_id``.
    """
    return Task(**json.loads(_task_path(task_id).read_text()))


def list_tasks() -> list[Task]:
    """Return every task stored in TASKS_DIR, ordered by file name."""
    return [Task(**json.loads(p.read_text()))
            for p in sorted(TASKS_DIR.glob("task_*.json"))]


def get_task(task_id: str) -> str:
    """Return full task details as JSON."""
    task = load_task(task_id)
    return json.dumps(asdict(task), indent=2)


def _unmet_deps(task: Task) -> list[str]:
    """Return the dependency IDs of ``task`` that are missing or not completed."""
    return [
        dep_id for dep_id in task.blockedBy
        if not _task_path(dep_id).exists()
        or load_task(dep_id).status != "completed"
    ]


def can_start(task_id: str) -> bool:
    """Check if all blockedBy dependencies are completed.

    Missing dependencies are treated as blocked.
    """
    return not _unmet_deps(load_task(task_id))


def claim_task(task_id: str, owner: str = "agent") -> str:
    """Assign a pending, unblocked task to ``owner`` and mark it in_progress.

    Returns a human-readable result: a confirmation, or the reason the task
    could not be claimed (wrong status, or the list of unmet dependencies).

    Raises:
        FileNotFoundError: if ``task_id`` does not exist.
    """
    task = load_task(task_id)
    if task.status != "pending":
        return f"Task {task_id} is {task.status}, cannot claim"
    deps = _unmet_deps(task)
    if deps:
        return f"Blocked by: {deps}"
    task.owner = owner
    task.status = "in_progress"
    save_task(task)
    print(f" \033[36m[claim] {task.subject} → in_progress (owner: {owner})\033[0m")
    return f"Claimed {task.id} ({task.subject})"


def complete_task(task_id: str) -> str:
    """Mark an in-progress task completed and report tasks it unblocked.

    Only pending tasks that list this task in ``blockedBy`` and now have no
    unmet dependencies are reported as unblocked.

    Raises:
        FileNotFoundError: if ``task_id`` does not exist.
    """
    task = load_task(task_id)
    if task.status != "in_progress":
        return f"Task {task_id} is {task.status}, cannot complete"
    task.status = "completed"
    save_task(task)
    # Restrict to direct dependents: a task that was already startable before
    # this completion was not unblocked by it.
    unblocked = [
        t.subject for t in list_tasks()
        if t.status == "pending"
        and task.id in t.blockedBy
        and not _unmet_deps(t)
    ]
    print(f" \033[32m[complete] {task.subject} ✓\033[0m")
    msg = f"Completed {task.id} ({task.subject})"
    if unblocked:
        msg += f"\nUnblocked: {', '.join(unblocked)}"
        print(f" \033[33m[unblocked] {', '.join(unblocked)}\033[0m")
    return msg


# ── Prompt Assembly (from s10, synced) ──

PROMPT_SECTIONS = {
    "identity": "You are a coding agent. Act, don't explain.",
    "tools": "Available tools: bash, read_file, write_file, "
             "create_task, list_tasks, get_task, claim_task, complete_task.",
    "workspace": f"Working directory: {WORKDIR}",
    "memory": "Relevant memories are injected below when available.",
}


def assemble_system_prompt(context: dict) -> str:
    """Join the identity, tools and workspace sections, plus memories if any."""
    sections = [PROMPT_SECTIONS["identity"], PROMPT_SECTIONS["tools"],
                PROMPT_SECTIONS["workspace"]]
    memories = context.get("memories", "")
    if memories:
        sections.append(f"Relevant memories:\n{memories}")
    return "\n\n".join(sections)


_last_context_key, _last_prompt = None, None


def get_system_prompt(context: dict) -> str:
    """Return the system prompt for ``context``, rebuilding only on change.

    The context is serialized to a JSON key; when the key matches the previous
    call, the cached prompt is returned.
    """
    global _last_context_key, _last_prompt
    key = json.dumps(context, sort_keys=True, ensure_ascii=False, default=str)
    if key == _last_context_key and _last_prompt:
        return _last_prompt
    _last_context_key = key
    _last_prompt = assemble_system_prompt(context)
    return _last_prompt


# ── Tools ──

def safe_path(p: str) -> Path:
    """Resolve ``p`` against WORKDIR.

    Raises:
        ValueError: if the resolved path lies outside WORKDIR.
    """
    path = (WORKDIR / p).resolve()
    if not path.is_relative_to(WORKDIR):
        raise ValueError(f"Path escapes workspace: {p}")
    return path


def run_bash(command: str) -> str:
    """Run ``command`` through the shell in WORKDIR and return its output.

    stdout and stderr are concatenated and truncated to 50000 characters;
    a run longer than 120 seconds yields an error string instead.
    """
    # shell=True is intentional: this is the agent's shell tool.
    try:
        r = subprocess.run(command, shell=True, cwd=WORKDIR,
                           capture_output=True, text=True, timeout=120)
        out = (r.stdout + r.stderr).strip()
        return out[:50000] if out else "(no output)"
    except subprocess.TimeoutExpired:
        return "Error: Timeout (120s)"


def run_read(path: str, limit: int | None = None) -> str:
    """Return the text of a workspace file, optionally capped at ``limit`` lines.

    When the file is longer than ``limit``, a trailing marker line states how
    many lines were omitted. Any failure is returned as an ``Error: ...`` string.
    """
    try:
        lines = safe_path(path).read_text().splitlines()
        if limit and limit < len(lines):
            lines = lines[:limit] + [f"... ({len(lines) - limit} more lines)"]
        return "\n".join(lines)
    except Exception as e:
        return f"Error: {e}"


def run_write(path: str, content: str) -> str:
    """Write ``content`` to a workspace file, creating parent directories.

    Any failure is returned as an ``Error: ...`` string.
    """
    try:
        fp = safe_path(path)
        fp.parent.mkdir(parents=True, exist_ok=True)
        fp.write_text(content)
        return f"Wrote {len(content)} bytes to {path}"
    except Exception as e:
        return f"Error: {e}"


# Task tools

def run_create_task(subject: str, description: str = "",
                    blockedBy: list[str] | None = None) -> str:
    """Tool wrapper: create a task and return a one-line confirmation."""
    task = create_task(subject, description, blockedBy)
    deps = f" (blockedBy: {', '.join(blockedBy)})" if blockedBy else ""
    print(f" \033[34m[create] {task.subject}{deps}\033[0m")
    return f"Created {task.id}: {task.subject}{deps}"


def run_list_tasks() -> str:
    """Tool wrapper: render all tasks, one per line, with status icons."""
    tasks = list_tasks()
    if not tasks:
        return "No tasks. Use create_task to add some."
    lines = []
    for t in tasks:
        icon = {"pending": "○", "in_progress": "●",
                "completed": "✓"}.get(t.status, "?")
        deps = f" (blockedBy: {', '.join(t.blockedBy)})" if t.blockedBy else ""
        owner = f" [{t.owner}]" if t.owner else ""
        lines.append(f" {icon} {t.id}: {t.subject} "
                     f"[{t.status}]{owner}{deps}")
    return "\n".join(lines)


def run_get_task(task_id: str) -> str:
    """Tool wrapper: return task details as JSON, or an error if unknown."""
    try:
        return get_task(task_id)
    except FileNotFoundError:
        return f"Error: Task {task_id} not found"


def run_claim_task(task_id: str) -> str:
    """Tool wrapper: claim a task as "agent", or report an unknown ID."""
    try:
        return claim_task(task_id, owner="agent")
    except FileNotFoundError:
        return f"Error: Task {task_id} not found"


def run_complete_task(task_id: str) -> str:
    """Tool wrapper: complete a task, or report an unknown ID."""
    try:
        return complete_task(task_id)
    except FileNotFoundError:
        return f"Error: Task {task_id} not found"


TOOLS = [
    {"name": "bash", "description": "Run a shell command.",
     "input_schema": {"type": "object",
                      "properties": {"command": {"type": "string"}},
                      "required": ["command"]}},
    {"name": "read_file", "description": "Read file contents.",
     "input_schema": {"type": "object",
                      "properties": {"path": {"type": "string"},
                                     "limit": {"type": "integer"}},
                      "required": ["path"]}},
    {"name": "write_file", "description": "Write content to a file.",
     "input_schema": {"type": "object",
                      "properties": {"path": {"type": "string"},
                                     "content": {"type": "string"}},
                      "required": ["path", "content"]}},
    {"name": "create_task",
     "description": "Create a new task with optional blockedBy dependencies.",
     "input_schema": {"type": "object",
                      "properties": {
                          "subject": {"type": "string"},
                          "description": {"type": "string"},
                          "blockedBy": {"type": "array",
                                        "items": {"type": "string"}}},
                      "required": ["subject"]}},
    {"name": "list_tasks",
     "description": "List all tasks with status, owner, and dependencies.",
     "input_schema": {"type": "object", "properties": {}, "required": []}},
    {"name": "get_task",
     "description": "Get full details of a specific task by ID.",
     "input_schema": {"type": "object",
                      "properties": {"task_id": {"type": "string"}},
                      "required": ["task_id"]}},
    {"name": "claim_task",
     "description": "Claim a pending task. Sets owner, changes status to in_progress.",
     "input_schema": {"type": "object",
                      "properties": {"task_id": {"type": "string"}},
                      "required": ["task_id"]}},
    {"name": "complete_task",
     "description": "Complete an in-progress task. Reports unblocked downstream tasks.",
     "input_schema": {"type": "object",
                      "properties": {"task_id": {"type": "string"}},
                      "required": ["task_id"]}},
]

TOOL_HANDLERS = {
    "bash": run_bash,
    "read_file": run_read,
    "write_file": run_write,
    "create_task": run_create_task,
    "list_tasks": run_list_tasks,
    "get_task": run_get_task,
    "claim_task": run_claim_task,
    "complete_task": run_complete_task,
}


# ── Context ──

def update_context(context: dict, messages: list) -> dict:
    """Derive context from real state.

    Builds a fresh dict from the tool registry, WORKDIR and the memory index
    file; the ``context`` and ``messages`` arguments are currently not read.
    """
    memories = ""
    if MEMORY_INDEX.exists():
        content = MEMORY_INDEX.read_text().strip()
        if content:
            memories = content
    return {
        "enabled_tools": list(TOOL_HANDLERS.keys()),
        "workspace": str(WORKDIR),
        "memories": memories,
    }


# ── Agent Loop (simplified, focused on task system) ──

def _dispatch_tool(name: str, tool_input: dict) -> str:
    """Run the handler registered for ``name`` and return its output.

    Handler exceptions are converted to an error string so that every
    tool_use block still receives a tool_result.
    """
    handler = TOOL_HANDLERS.get(name)
    if handler is None:
        return f"Unknown: {name}"
    try:
        return handler(**tool_input)
    except Exception as e:  # tool boundary: report the failure to the model
        return f"Error: {type(e).__name__}: {e}"


def agent_loop(messages: list, context: dict):
    """Call the model repeatedly until it stops requesting tools.

    ``messages`` is extended in place with each assistant response and each
    batch of tool results. If the API call raises, an assistant message
    carrying the error text is appended and the loop returns.
    """
    system = get_system_prompt(context)
    while True:
        try:
            response = client.messages.create(
                model=MODEL, system=system, messages=messages,
                tools=TOOLS, max_tokens=8000)
        except Exception as e:
            messages.append({"role": "assistant", "content": [
                {"type": "text", "text": f"[Error] {type(e).__name__}: {e}"}]})
            return
        messages.append({"role": "assistant", "content": response.content})
        if response.stop_reason != "tool_use":
            return
        results = []
        for block in response.content:
            if block.type != "tool_use":
                continue
            print(f"\033[36m> {block.name}\033[0m")
            output = _dispatch_tool(block.name, block.input)
            print(str(output)[:300])
            results.append({"type": "tool_result",
                            "tool_use_id": block.id,
                            "content": output})
        messages.append({"role": "user", "content": results})
        # Rebuild the prompt in case the memory index changed during tools.
        context = update_context(context, messages)
        system = get_system_prompt(context)


def _block_text(block) -> str | None:
    """Return the text of a text content block, or None for any other block.

    Handles both response objects (``.type`` / ``.text`` attributes) and the
    plain dicts that agent_loop appends when the API call fails.
    """
    if isinstance(block, dict):
        return block.get("text") if block.get("type") == "text" else None
    if getattr(block, "type", None) == "text":
        return block.text
    return None


def main():
    """Run the interactive REPL until the user quits or input ends."""
    print("s12: task system")
    print("Enter a question, press Enter to send. Type q to quit.\n")
    history = []
    context = update_context({}, [])
    while True:
        try:
            query = input("\033[36ms12 >> \033[0m")
        except (EOFError, KeyboardInterrupt):
            break
        if query.strip().lower() in ("q", "exit", ""):
            break
        history.append({"role": "user", "content": query})
        agent_loop(history, context)
        context = update_context(context, history)
        for block in history[-1]["content"]:
            text = _block_text(block)
            if text is not None:
                print(text)
        print()


if __name__ == "__main__":
    main()