analysis_claude_code/s13_background_tasks/code.py
gui-yue 1baf1aca5a Follow up PR #265: refine chapters, diagrams, and add S20 (#283)
* feat: s01-s14 docs quality overhaul — tool pipeline, single-agent, knowledge & resilience

Rewrite code.py and README (zh/en/ja) for s01-s14, each chapter building
incrementally on the previous. Key fixes across chapters:

- s01-s04: agent loop, tool dispatch, permission pipeline, hooks
- s05-s08: todo write, subagent, skill loading, context compact
- s09-s11: memory system, system prompt assembly, error recovery
- s12-s14: task graph, background tasks, cron scheduler

All chapters CC source-verified. Code inherits fixes forward (PROMPT_SECTIONS,
json.dumps cache, real-state context, can_start dep protection, etc.).

* feat: s15-s19 docs quality overhaul — multi-agent platform: teams, protocols, autonomy, worktree, MCP tools

Rewrite code.py and README (zh/en/ja) for s15-s19, the multi-agent platform
chapters. Each chapter inherits all previous fixes and adds one mechanism:

- s15: agent teams (TeamCreate, teammate threads, shared task list)
- s16: team protocols (plan approval, shutdown handshake, consume_inbox)
- s17: autonomous agents (idle polling, auto-claim, consume_lead_inbox)
- s18: worktree isolation (git worktree, bind_task, cwd switching, safety)
- s19: MCP tools (MCPClient, normalize_mcp_name, assemble_tool_pool, no cache)

All appendix source code references verified against CC source. Config priority
corrected: claude.ai < plugin < user < project < local.
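
One way to read the corrected priority order is as a layered merge in which later sources override earlier ones. The sketch below assumes that reading; `CONFIG_PRIORITY`, `merge_configs`, and the shallow `dict.update` merge are hypothetical illustrations, not the chapter code or the CC implementation.

```python
# Hypothetical sketch: sources listed from lowest to highest priority,
# assuming "a < b" means b overrides a.
CONFIG_PRIORITY = ["claude.ai", "plugin", "user", "project", "local"]


def merge_configs(layers: dict) -> dict:
    """Shallow-merge config layers; higher-priority sources win on key clashes."""
    merged = {}
    for source in CONFIG_PRIORITY:
        # Missing sources contribute nothing.
        merged.update(layers.get(source, {}))
    return merged


if __name__ == "__main__":
    layers = {"user": {"model": "a", "x": 1}, "local": {"model": "b"}}
    print(merge_configs(layers))
```

A real loader would probably need nested merging and validation; the point here is only the override direction.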

* fix: 5 regressions across s05-s19 — glob safety, todo validation, memory extraction, protocol types, dep crash

- s05-s09: glob results now filter with is_relative_to(WORKDIR) (inherited from s02)
- s06-s08: todo_write validates content/status required fields (inherited from s05)
- s09: extract_memories uses pre-compression snapshot instead of compacted messages
- s16: submit_plan docstring clarifies protocol-only (not code-level gate)
- s17-s19: match_response restores type mismatch validation (from s16)
- s17-s19: claim_task deps list handles missing dep files without crashing

* fix: s12 Todo V2 logic reversal, s14/s15 cron range validation, s18/s19 worktree name validation

- s12 README (zh/en/ja): fix Todo V2 direction — interactive defaults to Task,
  non-interactive/SDK defaults to TodoWrite. Fix env var name to
  CLAUDE_CODE_ENABLE_TASKS (not TODO_V2).
- s14/s15: add _validate_cron_field with per-field range checks (minute 0-59,
  hour 0-23, dom 1-31, month 1-12, dow 0-6), step > 0, range lo <= hi.
  Replace old try/except validation that only caught exceptions.
- s18/s19: add validate_worktree_name() to remove_worktree and keep_worktree,
  not just create_worktree.
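
The per-field cron checks described above (field ranges, step > 0, range lo <= hi) could be sketched as follows. This is an illustration under assumptions: `CRON_RANGES`, `validate_cron_field`, its signature, and the error strings are hypothetical and not the repository's `_validate_cron_field`.

```python
from typing import Optional

# Allowed value range per field, as listed in the commit message.
CRON_RANGES = {
    "minute": (0, 59),
    "hour": (0, 23),
    "dom": (1, 31),
    "month": (1, 12),
    "dow": (0, 6),
}


def _is_int(text: str) -> bool:
    return text.isascii() and text.isdigit()


def validate_cron_field(field: str, name: str) -> Optional[str]:
    """Return an error message, or None when the field is valid."""
    lo_bound, hi_bound = CRON_RANGES[name]
    for part in field.split(","):
        base, slash, step = part.partition("/")
        if slash and (not _is_int(step) or int(step) <= 0):
            return f"{name}: step must be a positive integer, got {step!r}"
        if base == "*":
            continue
        lo_text, dash, hi_text = base.partition("-")
        if not _is_int(lo_text) or (dash and not _is_int(hi_text)):
            return f"{name}: not a number or range: {base!r}"
        lo = int(lo_text)
        hi = int(hi_text) if dash else lo
        if lo > hi:
            return f"{name}: range start {lo} exceeds end {hi}"
        if lo < lo_bound or hi > hi_bound:
            return f"{name}: {base} outside {lo_bound}-{hi_bound}"
    return None
```

Returning a message instead of raising keeps the check explicit, in contrast to validation that only notices a problem when an exception happens to be thrown.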

* fix: align s16-s19 teaching tool consistency

* fix pr265 chapter diagrams

* Add comprehensive s20 harness chapter

* Fix chapter smoke test regressions

* Clarify README tutorial track transition

---------

Co-authored-by: Haoran <bill-billion@outlook.com>
2026-05-20 21:45:38 +08:00


#!/usr/bin/env python3
"""
s13: Background Tasks — thread-based async execution + notification injection.
Run: python s13_background_tasks/code.py
Need: pip install anthropic python-dotenv + .env with ANTHROPIC_API_KEY
Changes from s12:
- threading.Thread for background execution
- background_tasks dict for lifecycle tracking (bg_id, command, status)
- background_results dict + threading.Lock for thread-safe storage
- should_run_background: model explicit request via run_in_background param
- is_slow_operation: fallback heuristic when model doesn't specify
- start_background_task: dispatch to daemon thread, return bg task id
- collect_background_results: gather completed, return as notifications
- agent_loop: slow ops → background + placeholder, inject notifications
- Notifications use <task_notification> format, not reused tool_use_id
Note: Teaching code keeps a basic agent loop to stay focused on background
tasks. S11's full error recovery (RecoveryState, backoff, escalation,
reactive compact, fallback model) is omitted.
"""
import os, subprocess, json, time, random, threading
from pathlib import Path
from dataclasses import dataclass, asdict
try:
    import readline
    readline.parse_and_bind('set bind-tty-special-chars off')
except ImportError:
    pass

from anthropic import Anthropic
from dotenv import load_dotenv

load_dotenv(override=True)
if os.getenv("ANTHROPIC_BASE_URL"):
    os.environ.pop("ANTHROPIC_AUTH_TOKEN", None)
WORKDIR = Path.cwd()
MEMORY_DIR = WORKDIR / ".memory"
MEMORY_INDEX = MEMORY_DIR / "MEMORY.md"
client = Anthropic(base_url=os.getenv("ANTHROPIC_BASE_URL"))
MODEL = os.environ["MODEL_ID"]

# ── Task System (from s12, synced) ──
TASKS_DIR = WORKDIR / ".tasks"
TASKS_DIR.mkdir(exist_ok=True)


@dataclass
class Task:
    id: str
    subject: str
    description: str
    status: str  # pending | in_progress | completed
    owner: str | None
    blockedBy: list[str]


def _task_path(task_id: str) -> Path:
    return TASKS_DIR / f"{task_id}.json"


def create_task(subject: str, description: str = "",
                blockedBy: list[str] | None = None) -> Task:
    task = Task(
        id=f"task_{int(time.time())}_{random.randint(0, 9999):04d}",
        subject=subject, description=description,
        status="pending", owner=None,
        blockedBy=blockedBy or [],
    )
    save_task(task)
    return task


def save_task(task: Task):
    _task_path(task.id).write_text(json.dumps(asdict(task), indent=2))


def load_task(task_id: str) -> Task:
    return Task(**json.loads(_task_path(task_id).read_text()))


def list_tasks() -> list[Task]:
    return [Task(**json.loads(p.read_text()))
            for p in sorted(TASKS_DIR.glob("task_*.json"))]


def get_task(task_id: str) -> str:
    """Return full task details as JSON."""
    task = load_task(task_id)
    return json.dumps(asdict(task), indent=2)


def can_start(task_id: str) -> bool:
    """Check if all blockedBy dependencies are completed.
    Missing dependencies are treated as blocked."""
    task = load_task(task_id)
    for dep_id in task.blockedBy:
        if not _task_path(dep_id).exists():
            return False
        if load_task(dep_id).status != "completed":
            return False
    return True


def claim_task(task_id: str, owner: str = "agent") -> str:
    task = load_task(task_id)
    if task.status != "pending":
        return f"Task {task_id} is {task.status}, cannot claim"
    if not can_start(task_id):
        deps = [d for d in task.blockedBy
                if not _task_path(d).exists() or load_task(d).status != "completed"]
        return f"Blocked by: {deps}"
    task.owner = owner
    task.status = "in_progress"
    save_task(task)
    print(f" \033[36m[claim] {task.subject} → in_progress (owner: {owner})\033[0m")
    return f"Claimed {task.id} ({task.subject})"


def complete_task(task_id: str) -> str:
    task = load_task(task_id)
    if task.status != "in_progress":
        return f"Task {task_id} is {task.status}, cannot complete"
    task.status = "completed"
    save_task(task)
    unblocked = [t.subject for t in list_tasks()
                 if t.status == "pending" and t.blockedBy and can_start(t.id)]
    print(f" \033[32m[complete] {task.subject}\033[0m")
    msg = f"Completed {task.id} ({task.subject})"
    if unblocked:
        msg += f"\nUnblocked: {', '.join(unblocked)}"
        print(f" \033[33m[unblocked] {', '.join(unblocked)}\033[0m")
    return msg

# ── Prompt Assembly (from s10, synced) ──
PROMPT_SECTIONS = {
    "identity": "You are a coding agent. Act, don't explain.",
    "tools": "Available tools: bash, read_file, write_file, "
             "create_task, list_tasks, get_task, claim_task, complete_task.",
    "workspace": f"Working directory: {WORKDIR}",
    "memory": "Relevant memories are injected below when available.",
}


def assemble_system_prompt(context: dict) -> str:
    sections = [PROMPT_SECTIONS["identity"],
                PROMPT_SECTIONS["tools"],
                PROMPT_SECTIONS["workspace"]]
    memories = context.get("memories", "")
    if memories:
        sections.append(f"Relevant memories:\n{memories}")
    return "\n\n".join(sections)


_last_context_key, _last_prompt = None, None


def get_system_prompt(context: dict) -> str:
    global _last_context_key, _last_prompt
    # Serialize the context deterministically so identical state hits the cache
    key = json.dumps(context, sort_keys=True, ensure_ascii=False, default=str)
    if key == _last_context_key and _last_prompt:
        return _last_prompt
    _last_context_key = key
    _last_prompt = assemble_system_prompt(context)
    return _last_prompt

# ── Tools ──
def safe_path(p: str) -> Path:
    path = (WORKDIR / p).resolve()
    if not path.is_relative_to(WORKDIR):
        raise ValueError(f"Path escapes workspace: {p}")
    return path


def run_bash(command: str, run_in_background: bool = False) -> str:
    # run_in_background is handled by agent_loop dispatch, not here
    try:
        r = subprocess.run(command, shell=True, cwd=WORKDIR,
                           capture_output=True, text=True, timeout=120)
        out = (r.stdout + r.stderr).strip()
        return out[:50000] if out else "(no output)"
    except subprocess.TimeoutExpired:
        return "Error: Timeout (120s)"


def run_read(path: str, limit: int | None = None) -> str:
    try:
        lines = safe_path(path).read_text().splitlines()
        if limit and limit < len(lines):
            lines = lines[:limit] + [f"... ({len(lines) - limit} more lines)"]
        return "\n".join(lines)
    except Exception as e:
        return f"Error: {e}"


def run_write(path: str, content: str) -> str:
    try:
        fp = safe_path(path)
        fp.parent.mkdir(parents=True, exist_ok=True)
        fp.write_text(content)
        return f"Wrote {len(content)} bytes to {path}"
    except Exception as e:
        return f"Error: {e}"


# Task tools
def run_create_task(subject: str, description: str = "",
                    blockedBy: list[str] | None = None) -> str:
    task = create_task(subject, description, blockedBy)
    deps = f" (blockedBy: {', '.join(blockedBy)})" if blockedBy else ""
    print(f" \033[34m[create] {task.subject}{deps}\033[0m")
    return f"Created {task.id}: {task.subject}{deps}"


def run_list_tasks() -> str:
    tasks = list_tasks()
    if not tasks:
        return "No tasks. Use create_task to add some."
    lines = []
    for t in tasks:
        # ASCII status markers; unknown statuses fall back to "?"
        icon = {"pending": "[ ]", "in_progress": "[>]",
                "completed": "[x]"}.get(t.status, "?")
        deps = f" (blockedBy: {', '.join(t.blockedBy)})" if t.blockedBy else ""
        owner = f" [{t.owner}]" if t.owner else ""
        lines.append(f" {icon} {t.id}: {t.subject} "
                     f"[{t.status}]{owner}{deps}")
    return "\n".join(lines)


def run_get_task(task_id: str) -> str:
    try:
        return get_task(task_id)
    except FileNotFoundError:
        return f"Error: Task {task_id} not found"


def run_claim_task(task_id: str) -> str:
    return claim_task(task_id, owner="agent")


def run_complete_task(task_id: str) -> str:
    return complete_task(task_id)


TOOLS = [
    {"name": "bash", "description": "Run a shell command.",
     "input_schema": {"type": "object",
                      "properties": {
                          "command": {"type": "string"},
                          "run_in_background": {"type": "boolean"}},
                      "required": ["command"]}},
    {"name": "read_file", "description": "Read file contents.",
     "input_schema": {"type": "object",
                      "properties": {"path": {"type": "string"},
                                     "limit": {"type": "integer"}},
                      "required": ["path"]}},
    {"name": "write_file", "description": "Write content to a file.",
     "input_schema": {"type": "object",
                      "properties": {"path": {"type": "string"},
                                     "content": {"type": "string"}},
                      "required": ["path", "content"]}},
    {"name": "create_task",
     "description": "Create a new task with optional blockedBy dependencies.",
     "input_schema": {"type": "object",
                      "properties": {
                          "subject": {"type": "string"},
                          "description": {"type": "string"},
                          "blockedBy": {"type": "array",
                                        "items": {"type": "string"}}},
                      "required": ["subject"]}},
    {"name": "list_tasks",
     "description": "List all tasks with status, owner, and dependencies.",
     "input_schema": {"type": "object", "properties": {},
                      "required": []}},
    {"name": "get_task",
     "description": "Get full details of a specific task by ID.",
     "input_schema": {"type": "object",
                      "properties": {"task_id": {"type": "string"}},
                      "required": ["task_id"]}},
    {"name": "claim_task",
     "description": "Claim a pending task. Sets owner, changes status to in_progress.",
     "input_schema": {"type": "object",
                      "properties": {"task_id": {"type": "string"}},
                      "required": ["task_id"]}},
    {"name": "complete_task",
     "description": "Complete an in-progress task. Reports unblocked downstream tasks.",
     "input_schema": {"type": "object",
                      "properties": {"task_id": {"type": "string"}},
                      "required": ["task_id"]}},
]

TOOL_HANDLERS = {
    "bash": run_bash, "read_file": run_read, "write_file": run_write,
    "create_task": run_create_task, "list_tasks": run_list_tasks,
    "get_task": run_get_task, "claim_task": run_claim_task,
    "complete_task": run_complete_task,
}

# ── Background Tasks (s13 new) ──
_bg_counter = 0
background_tasks: dict[str, dict] = {}  # bg_id → {tool_use_id, command, status}
background_results: dict[str, str] = {}  # bg_id → output
background_lock = threading.Lock()


def is_slow_operation(tool_name: str, tool_input: dict) -> bool:
    """Fallback heuristic: commands likely to take > 30s."""
    if tool_name != "bash":
        return False
    cmd = tool_input.get("command", "").lower()
    slow_keywords = ["install", "build", "test", "deploy", "compile",
                     "docker build", "pip install", "npm install",
                     "cargo build", "pytest", "make"]
    return any(kw in cmd for kw in slow_keywords)


def should_run_background(tool_name: str, tool_input: dict) -> bool:
    """Model explicit request takes priority; fallback to heuristic."""
    if tool_input.get("run_in_background"):
        return True
    return is_slow_operation(tool_name, tool_input)


def execute_tool(block) -> str:
    """Execute a tool call block, return output."""
    handler = TOOL_HANDLERS.get(block.name)
    if handler:
        return handler(**block.input)
    return f"Unknown tool: {block.name}"


def start_background_task(block) -> str:
    """Run tool in a daemon thread. Returns background task ID."""
    global _bg_counter
    _bg_counter += 1
    bg_id = f"bg_{_bg_counter:04d}"
    cmd = block.input.get("command", block.name)

    def worker():
        try:
            result = execute_tool(block)
        except Exception as e:
            # Surface the failure as output; otherwise the task would stay
            # "running" forever and never produce a notification.
            result = f"Error: {type(e).__name__}: {e}"
        with background_lock:
            background_tasks[bg_id]["status"] = "completed"
            background_results[bg_id] = result

    # Register the task before starting the thread so worker() finds its entry
    with background_lock:
        background_tasks[bg_id] = {
            "tool_use_id": block.id,
            "command": cmd,
            "status": "running",
        }
    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    print(f" \033[33m[background] dispatched {bg_id}: {cmd[:40]}\033[0m")
    return bg_id


def collect_background_results() -> list[str]:
    """Collect completed background results as task_notification messages."""
    with background_lock:
        ready_ids = [bid for bid, task in background_tasks.items()
                     if task["status"] == "completed"]
    notifications = []
    for bg_id in ready_ids:
        with background_lock:
            task = background_tasks.pop(bg_id)
            output = background_results.pop(bg_id, "")
        summary = output[:200]  # notifications carry only a short preview
        notifications.append(
            f"<task_notification>\n"
            f" <task_id>{bg_id}</task_id>\n"
            f" <status>completed</status>\n"
            f" <command>{task['command']}</command>\n"
            f" <summary>{summary}</summary>\n"
            f"</task_notification>")
        print(f" \033[32m[background done] {bg_id}: "
              f"{task['command'][:40]} ({len(output)} chars)\033[0m")
    return notifications

# ── Context ──
def update_context(context: dict, messages: list) -> dict:
    """Derive context from real state."""
    memories = ""
    if MEMORY_INDEX.exists():
        content = MEMORY_INDEX.read_text().strip()
        if content:
            memories = content
    return {
        "enabled_tools": list(TOOL_HANDLERS.keys()),
        "workspace": str(WORKDIR),
        "memories": memories,
    }

# ── Agent Loop (simplified, focused on background tasks) ──
def agent_loop(messages: list, context: dict):
    system = get_system_prompt(context)
    while True:
        try:
            response = client.messages.create(
                model=MODEL, system=system, messages=messages,
                tools=TOOLS, max_tokens=8000)
        except Exception as e:
            messages.append({"role": "assistant", "content": [
                {"type": "text",
                 "text": f"[Error] {type(e).__name__}: {e}"}]})
            return
        messages.append({"role": "assistant", "content": response.content})
        if response.stop_reason != "tool_use":
            return
        results = []
        for block in response.content:
            if block.type != "tool_use":
                continue
            print(f"\033[36m> {block.name}\033[0m")
            if should_run_background(block.name, block.input):
                bg_id = start_background_task(block)
                results.append({"type": "tool_result",
                                "tool_use_id": block.id,
                                "content": f"[Background task {bg_id} started] "
                                           f"Command: {block.input.get('command', '')}. "
                                           f"Result will be available when complete."})
            else:
                output = execute_tool(block)
                print(str(output)[:300])
                results.append({"type": "tool_result",
                                "tool_use_id": block.id,
                                "content": output})
        # Inject background notifications + tool results in one user message
        user_content = []
        bg_notifications = collect_background_results()
        if bg_notifications:
            for notif in bg_notifications:
                user_content.append({"type": "text", "text": notif})
            print(f" \033[32m[inject] {len(bg_notifications)} background "
                  f"notification(s)\033[0m")
        user_content.extend(results)
        messages.append({"role": "user", "content": user_content})
        context = update_context(context, messages)
        system = get_system_prompt(context)


if __name__ == "__main__":
    print("s13: background tasks")
    print("Enter a question, press Enter to send. Type q to quit.\n")
    history = []
    context = update_context({}, [])
    while True:
        try:
            query = input("\033[36ms13 >> \033[0m")
        except (EOFError, KeyboardInterrupt):
            break
        if query.strip().lower() in ("q", "exit", ""):
            break
        history.append({"role": "user", "content": query})
        agent_loop(history, context)
        context = update_context(context, history)
        for block in history[-1]["content"]:
            # SDK responses are objects; the error fallback in agent_loop is a plain dict
            if isinstance(block, dict):
                btype, text = block.get("type"), block.get("text", "")
            else:
                btype, text = getattr(block, "type", None), getattr(block, "text", "")
            if btype == "text":
                print(text)
        print()