#!/usr/bin/env python3 """ s12_worktree_task_isolation.py - Worktree + Task Isolation Directory-level isolation for parallel task execution. Tasks are the control plane and worktrees are the execution plane. .tasks/task_12.json { "id": 12, "subject": "Implement auth refactor", "status": "in_progress", "worktree": "auth-refactor" } .worktrees/index.json { "worktrees": [ { "name": "auth-refactor", "path": ".../.worktrees/auth-refactor", "branch": "wt/auth-refactor", "task_id": 12, "status": "active" } ] } Key insight: "Isolate by directory, coordinate by task ID." """ import json import os import re import subprocess import time from pathlib import Path from anthropic import Anthropic from dotenv import load_dotenv load_dotenv(override=True) if os.getenv("ANTHROPIC_BASE_URL"): os.environ.pop("ANTHROPIC_AUTH_TOKEN", None) WORKDIR = Path.cwd() client = Anthropic(base_url=os.getenv("ANTHROPIC_BASE_URL")) MODEL = os.environ["MODEL_ID"] def detect_repo_root(cwd: Path) -> Path | None: """Return git repo root if cwd is inside a repo, else None.""" try: r = subprocess.run( ["git", "rev-parse", "--show-toplevel"], cwd=cwd, capture_output=True, text=True, timeout=10, ) if r.returncode != 0: return None root = Path(r.stdout.strip()) return root if root.exists() else None except Exception: return None REPO_ROOT = detect_repo_root(WORKDIR) or WORKDIR SYSTEM = ( f"You are a coding agent at {WORKDIR}. " "Use task + worktree tools for multi-task work. " "For parallel or risky changes: create tasks, allocate worktree lanes, " "run commands in those lanes, then choose keep/remove for closeout. " "Use worktree_events when you need lifecycle visibility." ) # -- EventBus: append-only lifecycle events for observability -- class EventBus: def __init__(self, event_log_path: Path): self.path = event_log_path self.path.parent.mkdir(parents=True, exist_ok=True) if not self.path.exists(): self.path.write_text("") def emit( self, event: str, task: dict | None = None, worktree: dict | None = None, error: str | None = None, ): payload = { "event": event, "ts": time.time(), "task": task or {}, "worktree": worktree or {}, } if error: payload["error"] = error with self.path.open("a", encoding="utf-8") as f: f.write(json.dumps(payload) + "\n") def list_recent(self, limit: int = 20) -> str: n = max(1, min(int(limit or 20), 200)) lines = self.path.read_text(encoding="utf-8").splitlines() recent = lines[-n:] items = [] for line in recent: try: items.append(json.loads(line)) except Exception: items.append({"event": "parse_error", "raw": line}) return json.dumps(items, indent=2) # -- TaskManager: persistent task board with optional worktree binding -- class TaskManager: def __init__(self, tasks_dir: Path): self.dir = tasks_dir self.dir.mkdir(parents=True, exist_ok=True) self._next_id = self._max_id() + 1 def _max_id(self) -> int: ids = [] for f in self.dir.glob("task_*.json"): try: ids.append(int(f.stem.split("_")[1])) except Exception: pass return max(ids) if ids else 0 def _path(self, task_id: int) -> Path: return self.dir / f"task_{task_id}.json" def _load(self, task_id: int) -> dict: path = self._path(task_id) if not path.exists(): raise ValueError(f"Task {task_id} not found") return json.loads(path.read_text()) def _save(self, task: dict): self._path(task["id"]).write_text(json.dumps(task, indent=2)) def create(self, subject: str, description: str = "") -> str: task = { "id": self._next_id, "subject": subject, "description": description, "status": "pending", "owner": "", "worktree": "", "blockedBy": [], "created_at": time.time(), "updated_at": time.time(), } self._save(task) self._next_id += 1 return json.dumps(task, indent=2) def get(self, task_id: int) -> str: return json.dumps(self._load(task_id), indent=2) def exists(self, task_id: int) -> bool: return self._path(task_id).exists() def update(self, task_id: int, status: str = None, owner: str = None) -> str: task = self._load(task_id) if status: if status not in ("pending", "in_progress", "completed"): raise ValueError(f"Invalid status: {status}") task["status"] = status if owner is not None: task["owner"] = owner task["updated_at"] = time.time() self._save(task) return json.dumps(task, indent=2) def bind_worktree(self, task_id: int, worktree: str, owner: str = "") -> str: task = self._load(task_id) task["worktree"] = worktree if owner: task["owner"] = owner if task["status"] == "pending": task["status"] = "in_progress" task["updated_at"] = time.time() self._save(task) return json.dumps(task, indent=2) def unbind_worktree(self, task_id: int) -> str: task = self._load(task_id) task["worktree"] = "" task["updated_at"] = time.time() self._save(task) return json.dumps(task, indent=2) def list_all(self) -> str: tasks = [] for f in sorted(self.dir.glob("task_*.json")): tasks.append(json.loads(f.read_text())) if not tasks: return "No tasks." lines = [] for t in tasks: marker = { "pending": "[ ]", "in_progress": "[>]", "completed": "[x]", }.get(t["status"], "[?]") owner = f" owner={t['owner']}" if t.get("owner") else "" wt = f" wt={t['worktree']}" if t.get("worktree") else "" lines.append(f"{marker} #{t['id']}: {t['subject']}{owner}{wt}") return "\n".join(lines) TASKS = TaskManager(REPO_ROOT / ".tasks") EVENTS = EventBus(REPO_ROOT / ".worktrees" / "events.jsonl") # -- WorktreeManager: create/list/run/remove git worktrees + lifecycle index -- class WorktreeManager: def __init__(self, repo_root: Path, tasks: TaskManager, events: EventBus): self.repo_root = repo_root self.tasks = tasks self.events = events self.dir = repo_root / ".worktrees" self.dir.mkdir(parents=True, exist_ok=True) self.index_path = self.dir / "index.json" if not self.index_path.exists(): self.index_path.write_text(json.dumps({"worktrees": []}, indent=2)) self.git_available = self._is_git_repo() def _is_git_repo(self) -> bool: try: r = subprocess.run( ["git", "rev-parse", "--is-inside-work-tree"], cwd=self.repo_root, capture_output=True, text=True, timeout=10, ) return r.returncode == 0 except Exception: return False def _run_git(self, args: list[str]) -> str: if not self.git_available: raise RuntimeError("Not in a git repository. worktree tools require git.") r = subprocess.run( ["git", *args], cwd=self.repo_root, capture_output=True, text=True, timeout=120, ) if r.returncode != 0: msg = (r.stdout + r.stderr).strip() raise RuntimeError(msg or f"git {' '.join(args)} failed") return (r.stdout + r.stderr).strip() or "(no output)" def _load_index(self) -> dict: return json.loads(self.index_path.read_text()) def _save_index(self, data: dict): self.index_path.write_text(json.dumps(data, indent=2)) def _find(self, name: str) -> dict | None: idx = self._load_index() for wt in idx.get("worktrees", []): if wt.get("name") == name: return wt return None def _validate_name(self, name: str): if not re.fullmatch(r"[A-Za-z0-9._-]{1,40}", name or ""): raise ValueError( "Invalid worktree name. Use 1-40 chars: letters, numbers, ., _, -" ) def create(self, name: str, task_id: int = None, base_ref: str = "HEAD") -> str: self._validate_name(name) if self._find(name): raise ValueError(f"Worktree '{name}' already exists in index") if task_id is not None and not self.tasks.exists(task_id): raise ValueError(f"Task {task_id} not found") path = self.dir / name branch = f"wt/{name}" self.events.emit( "worktree.create.before", task={"id": task_id} if task_id is not None else {}, worktree={"name": name, "base_ref": base_ref}, ) try: self._run_git(["worktree", "add", "-b", branch, str(path), base_ref]) entry = { "name": name, "path": str(path), "branch": branch, "task_id": task_id, "status": "active", "created_at": time.time(), } idx = self._load_index() idx["worktrees"].append(entry) self._save_index(idx) if task_id is not None: self.tasks.bind_worktree(task_id, name) self.events.emit( "worktree.create.after", task={"id": task_id} if task_id is not None else {}, worktree={ "name": name, "path": str(path), "branch": branch, "status": "active", }, ) return json.dumps(entry, indent=2) except Exception as e: self.events.emit( "worktree.create.failed", task={"id": task_id} if task_id is not None else {}, worktree={"name": name, "base_ref": base_ref}, error=str(e), ) raise def list_all(self) -> str: idx = self._load_index() wts = idx.get("worktrees", []) if not wts: return "No worktrees in index." lines = [] for wt in wts: suffix = f" task={wt['task_id']}" if wt.get("task_id") else "" lines.append( f"[{wt.get('status', 'unknown')}] {wt['name']} -> " f"{wt['path']} ({wt.get('branch', '-')}){suffix}" ) return "\n".join(lines) def status(self, name: str) -> str: wt = self._find(name) if not wt: return f"Error: Unknown worktree '{name}'" path = Path(wt["path"]) if not path.exists(): return f"Error: Worktree path missing: {path}" r = subprocess.run( ["git", "status", "--short", "--branch"], cwd=path, capture_output=True, text=True, timeout=60, ) text = (r.stdout + r.stderr).strip() return text or "Clean worktree" def run(self, name: str, command: str) -> str: dangerous = ["rm -rf /", "sudo", "shutdown", "reboot", "> /dev/"] if any(d in command for d in dangerous): return "Error: Dangerous command blocked" wt = self._find(name) if not wt: return f"Error: Unknown worktree '{name}'" path = Path(wt["path"]) if not path.exists(): return f"Error: Worktree path missing: {path}" try: r = subprocess.run( command, shell=True, cwd=path, capture_output=True, text=True, timeout=300, ) out = (r.stdout + r.stderr).strip() return out[:50000] if out else "(no output)" except subprocess.TimeoutExpired: return "Error: Timeout (300s)" def remove(self, name: str, force: bool = False, complete_task: bool = False) -> str: wt = self._find(name) if not wt: return f"Error: Unknown worktree '{name}'" self.events.emit( "worktree.remove.before", task={"id": wt.get("task_id")} if wt.get("task_id") is not None else {}, worktree={"name": name, "path": wt.get("path")}, ) try: args = ["worktree", "remove"] if force: args.append("--force") args.append(wt["path"]) self._run_git(args) if complete_task and wt.get("task_id") is not None: task_id = wt["task_id"] before = json.loads(self.tasks.get(task_id)) self.tasks.update(task_id, status="completed") self.tasks.unbind_worktree(task_id) self.events.emit( "task.completed", task={ "id": task_id, "subject": before.get("subject", ""), "status": "completed", }, worktree={"name": name}, ) idx = self._load_index() for item in idx.get("worktrees", []): if item.get("name") == name: item["status"] = "removed" item["removed_at"] = time.time() self._save_index(idx) self.events.emit( "worktree.remove.after", task={"id": wt.get("task_id")} if wt.get("task_id") is not None else {}, worktree={"name": name, "path": wt.get("path"), "status": "removed"}, ) return f"Removed worktree '{name}'" except Exception as e: self.events.emit( "worktree.remove.failed", task={"id": wt.get("task_id")} if wt.get("task_id") is not None else {}, worktree={"name": name, "path": wt.get("path")}, error=str(e), ) raise def keep(self, name: str) -> str: wt = self._find(name) if not wt: return f"Error: Unknown worktree '{name}'" idx = self._load_index() kept = None for item in idx.get("worktrees", []): if item.get("name") == name: item["status"] = "kept" item["kept_at"] = time.time() kept = item self._save_index(idx) self.events.emit( "worktree.keep", task={"id": wt.get("task_id")} if wt.get("task_id") is not None else {}, worktree={ "name": name, "path": wt.get("path"), "status": "kept", }, ) return json.dumps(kept, indent=2) if kept else f"Error: Unknown worktree '{name}'" WORKTREES = WorktreeManager(REPO_ROOT, TASKS, EVENTS) # -- Base tools (kept minimal, same style as previous sessions) -- def safe_path(p: str) -> Path: path = (WORKDIR / p).resolve() if not path.is_relative_to(WORKDIR): raise ValueError(f"Path escapes workspace: {p}") return path def run_bash(command: str) -> str: dangerous = ["rm -rf /", "sudo", "shutdown", "reboot", "> /dev/"] if any(d in command for d in dangerous): return "Error: Dangerous command blocked" try: r = subprocess.run( command, shell=True, cwd=WORKDIR, capture_output=True, text=True, timeout=120, ) out = (r.stdout + r.stderr).strip() return out[:50000] if out else "(no output)" except subprocess.TimeoutExpired: return "Error: Timeout (120s)" def run_read(path: str, limit: int = None) -> str: try: lines = safe_path(path).read_text().splitlines() if limit and limit < len(lines): lines = lines[:limit] + [f"... ({len(lines) - limit} more)"] return "\n".join(lines)[:50000] except Exception as e: return f"Error: {e}" def run_write(path: str, content: str) -> str: try: fp = safe_path(path) fp.parent.mkdir(parents=True, exist_ok=True) fp.write_text(content) return f"Wrote {len(content)} bytes" except Exception as e: return f"Error: {e}" def run_edit(path: str, old_text: str, new_text: str) -> str: try: fp = safe_path(path) c = fp.read_text() if old_text not in c: return f"Error: Text not found in {path}" fp.write_text(c.replace(old_text, new_text, 1)) return f"Edited {path}" except Exception as e: return f"Error: {e}" TOOL_HANDLERS = { "bash": lambda **kw: run_bash(kw["command"]), "read_file": lambda **kw: run_read(kw["path"], kw.get("limit")), "write_file": lambda **kw: run_write(kw["path"], kw["content"]), "edit_file": lambda **kw: run_edit(kw["path"], kw["old_text"], kw["new_text"]), "task_create": lambda **kw: TASKS.create(kw["subject"], kw.get("description", "")), "task_list": lambda **kw: TASKS.list_all(), "task_get": lambda **kw: TASKS.get(kw["task_id"]), "task_update": lambda **kw: TASKS.update(kw["task_id"], kw.get("status"), kw.get("owner")), "task_bind_worktree": lambda **kw: TASKS.bind_worktree(kw["task_id"], kw["worktree"], kw.get("owner", "")), "worktree_create": lambda **kw: WORKTREES.create(kw["name"], kw.get("task_id"), kw.get("base_ref", "HEAD")), "worktree_list": lambda **kw: WORKTREES.list_all(), "worktree_status": lambda **kw: WORKTREES.status(kw["name"]), "worktree_run": lambda **kw: WORKTREES.run(kw["name"], kw["command"]), "worktree_keep": lambda **kw: WORKTREES.keep(kw["name"]), "worktree_remove": lambda **kw: WORKTREES.remove(kw["name"], kw.get("force", False), kw.get("complete_task", False)), "worktree_events": lambda **kw: EVENTS.list_recent(kw.get("limit", 20)), } TOOLS = [ { "name": "bash", "description": "Run a shell command in the current workspace (blocking).", "input_schema": { "type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"], }, }, { "name": "read_file", "description": "Read file contents.", "input_schema": { "type": "object", "properties": { "path": {"type": "string"}, "limit": {"type": "integer"}, }, "required": ["path"], }, }, { "name": "write_file", "description": "Write content to file.", "input_schema": { "type": "object", "properties": { "path": {"type": "string"}, "content": {"type": "string"}, }, "required": ["path", "content"], }, }, { "name": "edit_file", "description": "Replace exact text in file.", "input_schema": { "type": "object", "properties": { "path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}, }, "required": ["path", "old_text", "new_text"], }, }, { "name": "task_create", "description": "Create a new task on the shared task board.", "input_schema": { "type": "object", "properties": { "subject": {"type": "string"}, "description": {"type": "string"}, }, "required": ["subject"], }, }, { "name": "task_list", "description": "List all tasks with status, owner, and worktree binding.", "input_schema": {"type": "object", "properties": {}}, }, { "name": "task_get", "description": "Get task details by ID.", "input_schema": { "type": "object", "properties": {"task_id": {"type": "integer"}}, "required": ["task_id"], }, }, { "name": "task_update", "description": "Update task status or owner.", "input_schema": { "type": "object", "properties": { "task_id": {"type": "integer"}, "status": { "type": "string", "enum": ["pending", "in_progress", "completed"], }, "owner": {"type": "string"}, }, "required": ["task_id"], }, }, { "name": "task_bind_worktree", "description": "Bind a task to a worktree name.", "input_schema": { "type": "object", "properties": { "task_id": {"type": "integer"}, "worktree": {"type": "string"}, "owner": {"type": "string"}, }, "required": ["task_id", "worktree"], }, }, { "name": "worktree_create", "description": "Create a git worktree and optionally bind it to a task.", "input_schema": { "type": "object", "properties": { "name": {"type": "string"}, "task_id": {"type": "integer"}, "base_ref": {"type": "string"}, }, "required": ["name"], }, }, { "name": "worktree_list", "description": "List worktrees tracked in .worktrees/index.json.", "input_schema": {"type": "object", "properties": {}}, }, { "name": "worktree_status", "description": "Show git status for one worktree.", "input_schema": { "type": "object", "properties": {"name": {"type": "string"}}, "required": ["name"], }, }, { "name": "worktree_run", "description": "Run a shell command in a named worktree directory.", "input_schema": { "type": "object", "properties": { "name": {"type": "string"}, "command": {"type": "string"}, }, "required": ["name", "command"], }, }, { "name": "worktree_remove", "description": "Remove a worktree and optionally mark its bound task completed.", "input_schema": { "type": "object", "properties": { "name": {"type": "string"}, "force": {"type": "boolean"}, "complete_task": {"type": "boolean"}, }, "required": ["name"], }, }, { "name": "worktree_keep", "description": "Mark a worktree as kept in lifecycle state without removing it.", "input_schema": { "type": "object", "properties": {"name": {"type": "string"}}, "required": ["name"], }, }, { "name": "worktree_events", "description": "List recent worktree/task lifecycle events from .worktrees/events.jsonl.", "input_schema": { "type": "object", "properties": {"limit": {"type": "integer"}}, }, }, ] def agent_loop(messages: list): while True: response = client.messages.create( model=MODEL, system=SYSTEM, messages=messages, tools=TOOLS, max_tokens=8000, ) messages.append({"role": "assistant", "content": response.content}) if response.stop_reason != "tool_use": return results = [] for block in response.content: if block.type == "tool_use": handler = TOOL_HANDLERS.get(block.name) try: output = handler(**block.input) if handler else f"Unknown tool: {block.name}" except Exception as e: output = f"Error: {e}" print(f"> {block.name}: {str(output)[:200]}") results.append( { "type": "tool_result", "tool_use_id": block.id, "content": str(output), } ) messages.append({"role": "user", "content": results}) if __name__ == "__main__": print(f"Repo root for s12: {REPO_ROOT}") if not WORKTREES.git_available: print("Note: Not in a git repo. worktree_* tools will return errors.") history = [] while True: try: query = input("\033[36ms12 >> \033[0m") except (EOFError, KeyboardInterrupt): break if query.strip().lower() in ("q", "exit", ""): break history.append({"role": "user", "content": query}) agent_loop(history) response_content = history[-1]["content"] if isinstance(response_content, list): for block in response_content: if hasattr(block, "text"): print(block.text) print()