Files

479 lines
17 KiB
Python

#!/usr/bin/env python3
"""
s13: Background Tasks — thread-based async execution + notification injection.
Run: python s13_background_tasks/code.py
Need: pip install anthropic python-dotenv + .env with ANTHROPIC_API_KEY and MODEL_ID
Changes from s12:
- threading.Thread for background execution
- background_tasks dict for lifecycle tracking (bg_id, command, status)
- background_results dict + threading.Lock for thread-safe storage
- should_run_background: model explicit request via run_in_background param
- is_slow_operation: fallback heuristic when model doesn't specify
- start_background_task: dispatch to daemon thread, return bg task id
- collect_background_results: gather completed, return as notifications
- agent_loop: slow ops → background + placeholder, inject notifications
- Notifications use <task_notification> format, not reused tool_use_id
Note: Teaching code keeps a basic agent loop to stay focused on background
tasks. S11's full error recovery (RecoveryState, backoff, escalation,
reactive compact, fallback model) is omitted.
"""
import os, subprocess, json, time, random, threading
from pathlib import Path
from dataclasses import dataclass, asdict
try:
import readline
readline.parse_and_bind('set bind-tty-special-chars off')
except ImportError:
pass
from anthropic import Anthropic
from dotenv import load_dotenv
# Load .env first so every lookup below sees its values; override=True is
# passed so .env entries replace variables that are already set.
load_dotenv(override=True)

# When a custom endpoint is configured, ANTHROPIC_AUTH_TOKEN is removed from
# the environment (presumably so the SDK does not send it to that endpoint —
# NOTE(review): confirm against the deployment setup).
if os.getenv("ANTHROPIC_BASE_URL"):
    os.environ.pop("ANTHROPIC_AUTH_TOKEN", None)

# All file-based state lives under the directory the script was started from.
WORKDIR = Path.cwd()
MEMORY_DIR = WORKDIR.joinpath(".memory")
MEMORY_INDEX = MEMORY_DIR.joinpath("MEMORY.md")

client = Anthropic(base_url=os.getenv("ANTHROPIC_BASE_URL"))
MODEL = os.environ["MODEL_ID"]  # mandatory: KeyError at import time if unset

# ── Task System (from s12, synced) ──
TASKS_DIR = WORKDIR.joinpath(".tasks")
TASKS_DIR.mkdir(exist_ok=True)
@dataclass
class Task:
id: str
subject: str
description: str
status: str # pending | in_progress | completed
owner: str | None
blockedBy: list[str]
def _task_path(task_id: str) -> Path:
    """Return the path of the JSON file that stores ``task_id`` in TASKS_DIR."""
    filename = f"{task_id}.json"
    return TASKS_DIR.joinpath(filename)
def create_task(subject: str, description: str = "",
                blockedBy: list[str] | None = None) -> Task:
    """Create a new pending, unowned task and persist it.

    Args:
        subject: Short title of the task.
        description: Optional longer description.
        blockedBy: IDs of tasks that must be completed first; None or an
            empty list means no dependencies.

    Returns:
        The newly created Task, already written to disk.
    """
    # IDs have the form "task_<unix seconds>_<4 random digits>". Two tasks
    # created within the same second can draw the same suffix, in which case
    # save_task would silently overwrite the earlier task. Redraw until the
    # id does not name an existing file.
    while True:
        task_id = f"task_{int(time.time())}_{random.randint(0, 9999):04d}"
        if not _task_path(task_id).exists():
            break
    task = Task(
        id=task_id,
        subject=subject,
        description=description,
        status="pending",
        owner=None,
        blockedBy=blockedBy or [],
    )
    save_task(task)
    return task
def save_task(task: Task):
    """Serialize ``task`` to indented JSON and write it to its task file."""
    payload = json.dumps(asdict(task), indent=2)
    _task_path(task.id).write_text(payload)
def load_task(task_id: str) -> Task:
    """Read the JSON file for ``task_id`` and rebuild the Task from it.

    A missing file surfaces as the FileNotFoundError raised by the read.
    """
    raw = _task_path(task_id).read_text()
    fields = json.loads(raw)
    return Task(**fields)
def list_tasks() -> list[Task]:
    """Load every ``task_*.json`` file in TASKS_DIR, ordered by file path."""
    tasks = []
    for path in sorted(TASKS_DIR.glob("task_*.json")):
        fields = json.loads(path.read_text())
        tasks.append(Task(**fields))
    return tasks
def get_task(task_id: str) -> str:
    """Return every field of the task as an indented JSON string."""
    details = asdict(load_task(task_id))
    return json.dumps(details, indent=2)
def can_start(task_id: str) -> bool:
    """Tell whether every dependency of the task is completed.

    A dependency whose file does not exist counts as unmet, so the task is
    treated as blocked. A task with no dependencies can always start.
    """
    def _is_done(dep_id: str) -> bool:
        # Check existence first so a missing dependency never triggers a read.
        return (_task_path(dep_id).exists()
                and load_task(dep_id).status == "completed")

    return all(_is_done(dep_id) for dep_id in load_task(task_id).blockedBy)
def claim_task(task_id: str, owner: str = "agent") -> str:
    """Move a pending, unblocked task to in_progress under ``owner``.

    Returns a human-readable message: a refusal when the task is not pending
    or still has unmet dependencies, otherwise a confirmation.
    """
    task = load_task(task_id)
    if task.status != "pending":
        return f"Task {task_id} is {task.status}, cannot claim"

    # A dependency is unmet when its file is missing or it is not completed.
    unmet = [dep for dep in task.blockedBy
             if not (_task_path(dep).exists()
                     and load_task(dep).status == "completed")]
    if unmet:
        return f"Blocked by: {unmet}"

    task.status = "in_progress"
    task.owner = owner
    save_task(task)
    print(f" \033[36m[claim] {task.subject} → in_progress (owner: {owner})\033[0m")
    return f"Claimed {task.id} ({task.subject})"
def complete_task(task_id: str) -> str:
    """Mark an in-progress task completed and report what it unblocked.

    Args:
        task_id: ID of the task to complete.

    Returns:
        A refusal message when the task is not in_progress; otherwise a
        confirmation, followed by an "Unblocked: ..." line naming the
        pending tasks that became startable because of this completion.
    """
    task = load_task(task_id)
    if task.status != "in_progress":
        return f"Task {task_id} is {task.status}, cannot complete"
    task.status = "completed"
    save_task(task)
    # Only tasks that actually depended on this one count as "unblocked".
    # Checking can_start alone would also re-report pending tasks whose
    # dependencies were satisfied earlier by unrelated completions.
    unblocked = [t.subject for t in list_tasks()
                 if t.status == "pending"
                 and task.id in t.blockedBy
                 and can_start(t.id)]
    print(f" \033[32m[complete] {task.subject}\033[0m")
    msg = f"Completed {task.id} ({task.subject})"
    if unblocked:
        msg += f"\nUnblocked: {', '.join(unblocked)}"
        print(f" \033[33m[unblocked] {', '.join(unblocked)}\033[0m")
    return msg
# ── Prompt Assembly (from s10, synced) ──
# Static building blocks of the system prompt, keyed by section name.
# The "memory" entry is defined here but assemble_system_prompt does not
# read it; it builds its own "Relevant memories:" section instead.
PROMPT_SECTIONS = {
    "identity": "You are a coding agent. Act, don't explain.",
    "tools": (
        "Available tools: bash, read_file, write_file, create_task, "
        "list_tasks, get_task, claim_task, complete_task."
    ),
    "workspace": f"Working directory: {WORKDIR}",
    "memory": "Relevant memories are injected below when available.",
}
def assemble_system_prompt(context: dict) -> str:
    """Build the system prompt text from the fixed sections and ``context``.

    The identity, tools and workspace sections are always included, in that
    order. A "Relevant memories" section is appended only when
    ``context["memories"]`` is non-empty. Sections are separated by a blank
    line.
    """
    parts = [PROMPT_SECTIONS[name]
             for name in ("identity", "tools", "workspace")]
    if memories := context.get("memories", ""):
        parts.append(f"Relevant memories:\n{memories}")
    return "\n\n".join(parts)
# Single-entry cache: the serialized context last seen and the prompt built for it.
_last_context_key = None
_last_prompt = None


def get_system_prompt(context: dict) -> str:
    """Return the system prompt for ``context``, rebuilding only on change.

    The context is serialized to a canonical JSON string (sorted keys) and
    compared with the previous call's key; when they match and a prompt is
    cached, the cached prompt is returned as-is.
    """
    global _last_context_key, _last_prompt
    key = json.dumps(context, sort_keys=True, ensure_ascii=False, default=str)
    cache_hit = key == _last_context_key and _last_prompt
    if not cache_hit:
        _last_context_key = key
        _last_prompt = assemble_system_prompt(context)
    return _last_prompt
# ── Tools ──
def safe_path(p: str) -> Path:
    """Resolve ``p`` against WORKDIR and refuse anything outside of it.

    Raises:
        ValueError: if the resolved path is not located under WORKDIR.
    """
    resolved = (WORKDIR / p).resolve()
    if resolved.is_relative_to(WORKDIR):
        return resolved
    raise ValueError(f"Path escapes workspace: {p}")
def run_bash(command: str, run_in_background: bool = False) -> str:
    """Run a shell command in WORKDIR and return its combined output.

    Args:
        command: Command line, executed through the shell (shell=True is
            intentional: running arbitrary shell commands is this tool's
            purpose).
        run_in_background: Accepted only so the raw tool input can be passed
            as keyword arguments; ignored here, because background dispatch
            is decided in agent_loop.

    Returns:
        stdout followed by stderr, stripped and capped at 50000 characters;
        "(no output)" when both are empty; or an "Error: ..." string when
        the command times out or cannot be run.
    """
    try:
        # errors="replace" keeps output that is not valid text from raising
        # UnicodeDecodeError while it is being decoded.
        r = subprocess.run(command, shell=True, cwd=WORKDIR,
                           capture_output=True, text=True,
                           errors="replace", timeout=120)
    except subprocess.TimeoutExpired:
        return "Error: Timeout (120s)"
    except Exception as e:
        # Same convention as run_read/run_write: report, never raise.
        return f"Error: {e}"
    out = (r.stdout + r.stderr).strip()
    return out[:50000] if out else "(no output)"
def run_read(path: str, limit: int | None = None) -> str:
    """Return the text of a workspace file, optionally cut to ``limit`` lines.

    When the file has more lines than ``limit``, the kept lines are followed
    by a marker line stating how many were dropped. Any failure (including a
    path outside the workspace) is returned as an "Error: ..." string.
    """
    try:
        lines = safe_path(path).read_text().splitlines()
        total = len(lines)
        if limit and limit < total:
            hidden = total - limit
            lines = lines[:limit]
            lines.append(f"... ({hidden} more lines)")
        return "\n".join(lines)
    except Exception as e:
        return f"Error: {e}"
def run_write(path: str, content: str) -> str:
    """Write ``content`` to a workspace file, creating parent directories.

    Returns a confirmation message, or an "Error: ..." string on any failure
    (including a path outside the workspace).
    """
    try:
        target = safe_path(path)
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(content)
        # NOTE: len(content) counts characters, so the "bytes" figure is
        # only exact for text that encodes to one byte per character.
        written = len(content)
        return f"Wrote {written} bytes to {path}"
    except Exception as e:
        return f"Error: {e}"
# Task tools
def run_create_task(subject: str, description: str = "",
                    blockedBy: list[str] | None = None) -> str:
    """Tool wrapper: create a task, log it, and return a one-line summary."""
    task = create_task(subject, description, blockedBy)
    suffix = ""
    if blockedBy:
        suffix = f" (blockedBy: {', '.join(blockedBy)})"
    print(f" \033[34m[create] {task.subject}{suffix}\033[0m")
    return f"Created {task.id}: {task.subject}{suffix}"
def run_list_tasks() -> str:
    """Tool wrapper: render all tasks as one line each.

    Each line shows a status icon, the id, subject, status, and — when set —
    the owner and the blockedBy list. Returns a hint message when there are
    no tasks.
    """
    tasks = list_tasks()
    if not tasks:
        return "No tasks. Use create_task to add some."

    # NOTE(review): the icon strings are empty here — possibly glyphs lost
    # in transit; confirm against the intended source. Unknown statuses
    # fall back to "?".
    icons = {"pending": "", "in_progress": "", "completed": ""}
    rows = []
    for t in tasks:
        icon = icons.get(t.status, "?")
        owner = f" [{t.owner}]" if t.owner else ""
        deps = ""
        if t.blockedBy:
            deps = f" (blockedBy: {', '.join(t.blockedBy)})"
        rows.append(f" {icon} {t.id}: {t.subject} [{t.status}]{owner}{deps}")
    return "\n".join(rows)
def run_get_task(task_id: str) -> str:
    """Tool wrapper: return the task as JSON, or an error string if unknown."""
    try:
        details = get_task(task_id)
    except FileNotFoundError:
        return f"Error: Task {task_id} not found"
    return details
def run_claim_task(task_id: str) -> str:
    """Tool wrapper: claim a task on behalf of the agent.

    Returns claim_task's message, or an "Error: ..." string when no task
    file exists for ``task_id`` — the same handling run_get_task applies, so
    a mistyped id is reported back to the model instead of raising.
    """
    try:
        return claim_task(task_id, owner="agent")
    except FileNotFoundError:
        return f"Error: Task {task_id} not found"
def run_complete_task(task_id: str) -> str:
    """Tool wrapper: complete a task.

    Returns complete_task's message, or an "Error: ..." string when no task
    file exists for ``task_id`` — the same handling run_get_task applies, so
    a mistyped id is reported back to the model instead of raising.
    """
    try:
        return complete_task(task_id)
    except FileNotFoundError:
        return f"Error: Task {task_id} not found"
def _object_schema(properties: dict, required: list[str]) -> dict:
    """Wrap ``properties`` and ``required`` in a JSON-schema object node."""
    return {"type": "object", "properties": properties, "required": required}


# Tool definitions passed to the model on every request. Each entry's name
# must match a key in TOOL_HANDLERS.
TOOLS = [
    {
        "name": "bash",
        "description": "Run a shell command.",
        "input_schema": _object_schema(
            {"command": {"type": "string"},
             "run_in_background": {"type": "boolean"}},
            ["command"]),
    },
    {
        "name": "read_file",
        "description": "Read file contents.",
        "input_schema": _object_schema(
            {"path": {"type": "string"},
             "limit": {"type": "integer"}},
            ["path"]),
    },
    {
        "name": "write_file",
        "description": "Write content to a file.",
        "input_schema": _object_schema(
            {"path": {"type": "string"},
             "content": {"type": "string"}},
            ["path", "content"]),
    },
    {
        "name": "create_task",
        "description": "Create a new task with optional blockedBy dependencies.",
        "input_schema": _object_schema(
            {"subject": {"type": "string"},
             "description": {"type": "string"},
             "blockedBy": {"type": "array",
                           "items": {"type": "string"}}},
            ["subject"]),
    },
    {
        "name": "list_tasks",
        "description": "List all tasks with status, owner, and dependencies.",
        "input_schema": _object_schema({}, []),
    },
    {
        "name": "get_task",
        "description": "Get full details of a specific task by ID.",
        "input_schema": _object_schema(
            {"task_id": {"type": "string"}},
            ["task_id"]),
    },
    {
        "name": "claim_task",
        "description": "Claim a pending task. Sets owner, changes status to in_progress.",
        "input_schema": _object_schema(
            {"task_id": {"type": "string"}},
            ["task_id"]),
    },
    {
        "name": "complete_task",
        "description": "Complete an in-progress task. Reports unblocked downstream tasks.",
        "input_schema": _object_schema(
            {"task_id": {"type": "string"}},
            ["task_id"]),
    },
]
# Dispatch table: tool name (as declared in TOOLS) → function implementing it.
# Handlers are called with the tool input unpacked as keyword arguments.
TOOL_HANDLERS = {
    "bash": run_bash,
    "read_file": run_read,
    "write_file": run_write,
    "create_task": run_create_task,
    "list_tasks": run_list_tasks,
    "get_task": run_get_task,
    "claim_task": run_claim_task,
    "complete_task": run_complete_task,
}
# ── Background Tasks (s13 new) ──
# Running count of dispatched background tasks; used to mint ids such as
# "bg_0001".
_bg_counter = 0

# bg_id → {"tool_use_id", "command", "status"}; status is "running" until
# the worker thread finishes, then "completed".
background_tasks: dict[str, dict] = {}

# bg_id → tool output, stored by the worker thread when the task finishes.
background_results: dict[str, str] = {}

# Held whenever either dict above is read or modified.
background_lock = threading.Lock()
def is_slow_operation(tool_name: str, tool_input: dict) -> bool:
    """Guess whether a tool call is likely to run for a long time (> 30s).

    Only bash calls are considered. The command is lower-cased and checked
    for any of the keywords below as a plain substring, so the match is
    deliberately loose (e.g. "latest" contains "test").
    """
    if tool_name != "bash":
        return False
    command = tool_input.get("command", "").lower()
    slow_markers = (
        "install", "build", "test", "deploy", "compile",
        "docker build", "pip install", "npm install",
        "cargo build", "pytest", "make",
    )
    for marker in slow_markers:
        if marker in command:
            return True
    return False
def should_run_background(tool_name: str, tool_input: dict) -> bool:
    """Decide whether a tool call should be dispatched to a background thread.

    An explicit ``run_in_background`` value in the tool input always wins,
    whether true or false. The slow-operation heuristic is consulted only
    when the model did not specify the flag at all.

    Args:
        tool_name: Name of the tool being called.
        tool_input: Raw input dict of the tool call.

    Returns:
        True to run in the background, False to run inline.
    """
    explicit = tool_input.get("run_in_background")
    if explicit is not None:
        # Honour an explicit False too; previously it fell through to the
        # heuristic, which could background a call the model asked to wait on.
        return bool(explicit)
    return is_slow_operation(tool_name, tool_input)
def execute_tool(block) -> str:
    """Run the handler for a tool-call block and return its output.

    Args:
        block: A tool_use block; its ``name`` selects the handler and its
            ``input`` dict is unpacked as the handler's keyword arguments.

    Returns:
        The handler's output; "Unknown tool: <name>" when no handler is
        registered; or "Error: <ExceptionType>: <message>" when the handler
        raises.
    """
    handler = TOOL_HANDLERS.get(block.name)
    if handler is None:
        return f"Unknown tool: {block.name}"
    try:
        return handler(**block.input)
    except Exception as e:
        # The input comes from the model, so bad or unexpected arguments
        # (TypeError, FileNotFoundError, ...) are expected failures. Report
        # them as the tool result so every tool call still gets an answer.
        return f"Error: {type(e).__name__}: {e}"
def start_background_task(block) -> str:
    """Run a tool call in a daemon thread and return its background task id.

    The task is registered in ``background_tasks`` with status "running"
    before the thread starts. When the tool finishes, the worker stores the
    output in ``background_results`` and flips the status to "completed".

    Args:
        block: The tool_use block to execute.

    Returns:
        The new background task id, e.g. "bg_0001".
    """
    global _bg_counter
    _bg_counter += 1
    bg_id = f"bg_{_bg_counter:04d}"
    # Fall back to the tool name for tools that have no "command" input.
    cmd = block.input.get("command", block.name)

    def worker():
        # An exception here would kill the thread silently and leave the
        # task "running" forever, so a failure is recorded as the result.
        try:
            result = execute_tool(block)
        except Exception as e:
            result = f"Error: {type(e).__name__}: {e}"
        with background_lock:
            background_tasks[bg_id]["status"] = "completed"
            background_results[bg_id] = result

    with background_lock:
        background_tasks[bg_id] = {
            "tool_use_id": block.id,
            "command": cmd,
            "status": "running",
        }
    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    print(f" \033[33m[background] dispatched {bg_id}: {cmd[:40]}\033[0m")
    return bg_id
def collect_background_results() -> list[str]:
    """Remove finished background tasks and return one notification for each.

    Every task whose status is "completed" is popped from both
    ``background_tasks`` and ``background_results`` and rendered as a
    ``<task_notification>`` string. Tasks still running are left untouched.
    """
    # Take everything that is finished in a single critical section.
    finished = []
    with background_lock:
        done_ids = [bid for bid, info in background_tasks.items()
                    if info["status"] == "completed"]
        for bid in done_ids:
            info = background_tasks.pop(bid)
            output = background_results.pop(bid, "")
            finished.append((bid, info, output))

    notifications = []
    for bid, info, output in finished:
        # NOTE(review): only the first 200 characters are kept; the rest of
        # the output has already been popped above and is dropped here.
        summary = output[:200]
        lines = [
            "<task_notification>",
            f" <task_id>{bid}</task_id>",
            " <status>completed</status>",
            f" <command>{info['command']}</command>",
            f" <summary>{summary}</summary>",
            "</task_notification>",
        ]
        notifications.append("\n".join(lines))
        print(f" \033[32m[background done] {bid}: "
              f"{info['command'][:40]} ({len(output)} chars)\033[0m")
    return notifications
# ── Context ──
def update_context(context: dict, messages: list) -> dict:
    """Build a fresh context dict from the current on-disk and module state.

    Neither argument is read; the result depends only on the memory index
    file, the registered tool handlers and the working directory.
    """
    # A missing or blank memory index both yield an empty string.
    memories = MEMORY_INDEX.read_text().strip() if MEMORY_INDEX.exists() else ""
    return {
        "enabled_tools": list(TOOL_HANDLERS),
        "workspace": str(WORKDIR),
        "memories": memories,
    }
# ── Agent Loop (simplified, focused on background tasks) ──
def _inject_background_notifications(messages: list) -> int:
    """Append finished background results to the trailing user message.

    Notifications are added as text blocks after whatever the last user
    message already holds, so tool_result blocks stay first. A string-only
    user message is converted to a one-element block list first. If the
    history does not end with a user message, a new one is appended.

    Returns the number of notifications injected.
    """
    notifications = collect_background_results()
    if not notifications:
        return 0
    blocks = [{"type": "text", "text": notif} for notif in notifications]
    if messages and messages[-1]["role"] == "user":
        last = messages[-1]
        content = last["content"]
        if isinstance(content, str):
            content = [{"type": "text", "text": content}]
        last["content"] = list(content) + blocks
    else:
        messages.append({"role": "user", "content": blocks})
    print(f" \033[32m[inject] {len(notifications)} background "
          f"notification(s)\033[0m")
    return len(notifications)


def agent_loop(messages: list, context: dict):
    """Drive one user turn: call the model until it stops asking for tools.

    ``messages`` is extended in place with the assistant replies and the
    tool results. Tool calls selected by should_run_background are
    dispatched to a thread and answered immediately with a placeholder;
    all others run inline.

    Finished background tasks are drained before every model call, not only
    after a tool turn. A task that completes after the model's final answer
    is therefore delivered with the next request instead of waiting for
    some later tool call.

    Args:
        messages: Conversation history; the last entry is expected to be
            the pending user message.
        context: Context dict used to build the system prompt.
    """
    system = get_system_prompt(context)
    while True:
        _inject_background_notifications(messages)
        try:
            response = client.messages.create(
                model=MODEL, system=system, messages=messages,
                tools=TOOLS, max_tokens=8000)
        except Exception as e:
            # Surface the failure as an assistant message (plain dict block)
            # and end the turn.
            messages.append({"role": "assistant", "content": [
                {"type": "text",
                 "text": f"[Error] {type(e).__name__}: {e}"}]})
            return
        messages.append({"role": "assistant", "content": response.content})
        if response.stop_reason != "tool_use":
            return
        results = []
        for block in response.content:
            if block.type != "tool_use":
                continue
            print(f"\033[36m> {block.name}\033[0m")
            if should_run_background(block.name, block.input):
                bg_id = start_background_task(block)
                # Answer the tool call right away; the real output arrives
                # later as a <task_notification>.
                results.append({"type": "tool_result",
                                "tool_use_id": block.id,
                                "content": f"[Background task {bg_id} started] "
                                           f"Command: {block.input.get('command', '')}. "
                                           f"Result will be available when complete."})
            else:
                output = execute_tool(block)
                print(str(output)[:300])
                results.append({"type": "tool_result",
                                "tool_use_id": block.id,
                                "content": output})
        # Notifications that are ready are appended to this same message by
        # the drain at the top of the next iteration.
        messages.append({"role": "user", "content": results})
        context = update_context(context, messages)
        system = get_system_prompt(context)
if __name__ == "__main__":
    # Interactive REPL: one agent_loop run per line of user input.
    print("s13: background tasks")
    print("Enter a question, press Enter to send. Type q to quit.\n")
    history = []
    context = update_context({}, [])
    while True:
        try:
            query = input("\033[36ms13 >> \033[0m")
        except (EOFError, KeyboardInterrupt):
            break
        # An empty line quits as well as "q" / "exit".
        if query.strip().lower() in ("q", "exit", ""):
            break
        history.append({"role": "user", "content": query})
        agent_loop(history, context)
        context = update_context(context, history)
        for block in history[-1]["content"]:
            # Replies from the API are block objects, but agent_loop's error
            # path stores a plain dict. Handle both so "[Error] ..." messages
            # are actually shown to the user.
            if isinstance(block, dict):
                kind, text = block.get("type"), block.get("text", "")
            else:
                kind = getattr(block, "type", None)
                text = getattr(block, "text", "")
            if kind == "text":
                print(text)
        print()