#!/usr/bin/env python3 """ s11: Error Recovery — three recovery paths + exponential backoff. Run: python s11_error_recovery/code.py Need: pip install anthropic python-dotenv + .env with ANTHROPIC_API_KEY Changes from s10: - LLM call wrapped in try/except with three recovery paths - Path 1: max_tokens -> escalate 8K->64K (no append on first escalation), then continuation prompt (max 3) - Path 2: prompt_too_long -> reactive compact -> retry (once) - Path 3: 429/529 -> exponential backoff with jitter (max 10), fallback model on consecutive 529 - with_retry wrapper for transient errors - RecoveryState tracks escalation / compact / 529 / model ASCII flow: messages -> prompt assembly -> compress+load -> [try] LLM [except] -> tools -> loop | | stop_reason error type max_tokens? prompt_too_long? -> compact escalate / 429/529? -> backoff continue other? -> log + exit """ import os, subprocess, time, random, json from pathlib import Path try: import readline readline.parse_and_bind('set bind-tty-special-chars off') except ImportError: pass from anthropic import Anthropic from dotenv import load_dotenv load_dotenv(override=True) if os.getenv("ANTHROPIC_BASE_URL"): os.environ.pop("ANTHROPIC_AUTH_TOKEN", None) WORKDIR = Path.cwd() MEMORY_DIR = WORKDIR / ".memory" MEMORY_INDEX = MEMORY_DIR / "MEMORY.md" client = Anthropic(base_url=os.getenv("ANTHROPIC_BASE_URL")) PRIMARY_MODEL = os.environ["MODEL_ID"] FALLBACK_MODEL = os.getenv("FALLBACK_MODEL_ID") # ── Constants ── ESCALATED_MAX_TOKENS = 64000 DEFAULT_MAX_TOKENS = 8000 MAX_RECOVERY_RETRIES = 3 MAX_RETRIES = 10 BASE_DELAY_MS = 500 MAX_CONSECUTIVE_529 = 3 CONTINUATION_PROMPT = ( "Output token limit hit. Resume directly — " "no apology, no recap. Pick up mid-thought." ) # ── Prompt Assembly (from s10, synced) ── PROMPT_SECTIONS = { "identity": "You are a coding agent. Act, don't explain.", "tools": "Available tools: bash, read_file, write_file.", "workspace": f"Working directory: {WORKDIR}", "memory": "Relevant memories are injected below when available.", } def assemble_system_prompt(context: dict) -> str: sections = [PROMPT_SECTIONS["identity"], PROMPT_SECTIONS["tools"], PROMPT_SECTIONS["workspace"]] memories = context.get("memories", "") if memories: sections.append(f"Relevant memories:\n{memories}") return "\n\n".join(sections) _last_context_key, _last_prompt = None, None def get_system_prompt(context: dict) -> str: global _last_context_key, _last_prompt key = json.dumps(context, sort_keys=True, ensure_ascii=False, default=str) if key == _last_context_key and _last_prompt: print(" \033[90m[cache hit] system prompt unchanged\033[0m") return _last_prompt _last_context_key = key _last_prompt = assemble_system_prompt(context) loaded = ["identity", "tools", "workspace"] if context.get("memories"): loaded.append("memory") print(f" \033[32m[assembled] sections: {', '.join(loaded)}\033[0m") return _last_prompt # ── Tools (unchanged) ── def safe_path(p: str) -> Path: path = (WORKDIR / p).resolve() if not path.is_relative_to(WORKDIR): raise ValueError(f"Path escapes workspace: {p}") return path def run_bash(command: str) -> str: try: r = subprocess.run(command, shell=True, cwd=WORKDIR, capture_output=True, text=True, timeout=120) out = (r.stdout + r.stderr).strip() return out[:50000] if out else "(no output)" except subprocess.TimeoutExpired: return "Error: Timeout (120s)" def run_read(path: str, limit: int | None = None) -> str: try: lines = safe_path(path).read_text().splitlines() if limit and limit < len(lines): lines = lines[:limit] + [f"... ({len(lines) - limit} more lines)"] return "\n".join(lines) except Exception as e: return f"Error: {e}" def run_write(path: str, content: str) -> str: try: file_path = safe_path(path) file_path.parent.mkdir(parents=True, exist_ok=True) file_path.write_text(content) return f"Wrote {len(content)} bytes to {path}" except Exception as e: return f"Error: {e}" TOOLS = [ {"name": "bash", "description": "Run a shell command.", "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}}, {"name": "read_file", "description": "Read file contents.", "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "limit": {"type": "integer"}}, "required": ["path"]}}, {"name": "write_file", "description": "Write content to a file.", "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}}, ] TOOL_HANDLERS = {"bash": run_bash, "read_file": run_read, "write_file": run_write} # ── Error Recovery (s11 new) ── class RecoveryState: """Track recovery attempts across the loop.""" def __init__(self): self.has_escalated = False self.recovery_count = 0 self.consecutive_529 = 0 self.has_attempted_reactive_compact = False self.current_model = PRIMARY_MODEL def retry_delay(attempt, retry_after=None): """Exponential backoff with jitter. Retry-After takes priority.""" if retry_after: return retry_after base = min(BASE_DELAY_MS * (2 ** attempt), 32000) / 1000 jitter = random.uniform(0, base * 0.25) return base + jitter def with_retry(fn, state: RecoveryState): """Exponential backoff for transient errors (429/529). Non-transient errors are re-raised for the outer handler.""" for attempt in range(MAX_RETRIES): try: result = fn() state.consecutive_529 = 0 return result except Exception as e: name = type(e).__name__ msg = str(e).lower() # 429 rate limit -> exponential backoff if "ratelimit" in name.lower() or "429" in msg: delay = retry_delay(attempt) print(f" \033[33m[429 rate limit] retry {attempt+1}/{MAX_RETRIES}," f" wait {delay:.1f}s\033[0m") time.sleep(delay) continue # 529 overloaded -> exponential backoff + fallback model if "overloaded" in name.lower() or "529" in msg or "overloaded" in msg: state.consecutive_529 += 1 if state.consecutive_529 >= MAX_CONSECUTIVE_529: if FALLBACK_MODEL: state.current_model = FALLBACK_MODEL state.consecutive_529 = 0 print(f" \033[31m[529 x{MAX_CONSECUTIVE_529}]" f" switching to {FALLBACK_MODEL}\033[0m") else: state.consecutive_529 = 0 print(f" \033[31m[529 x{MAX_CONSECUTIVE_529}]" f" no FALLBACK_MODEL_ID configured, continuing retry\033[0m") delay = retry_delay(attempt) print(f" \033[33m[529 overloaded] retry {attempt+1}/{MAX_RETRIES}," f" wait {delay:.1f}s\033[0m") time.sleep(delay) continue # Not transient -> re-raise for outer try/except raise raise RuntimeError(f"Max retries ({MAX_RETRIES}) exceeded") def is_prompt_too_long_error(e: Exception) -> bool: """Check whether an API error indicates prompt/context too long.""" msg = str(e).lower() return (("prompt" in msg and "long" in msg) or "prompt_is_too_long" in msg or "context_length_exceeded" in msg or "max_context_window" in msg) def reactive_compact(messages: list) -> list: """Emergency compact — teaching version keeps last N messages. Real CC generates a compact summary via LLM, then retries with the compacted message list. Teaching version simplifies to tail retention since s08/s09 already cover LLM-based compact.""" print(" \033[31m[reactive compact] trimming to last 5 messages\033[0m") tail = messages[-5:] return [{"role": "user", "content": "[Reactive compact] Earlier conversation trimmed. " "Continue from where you left off."}, *tail] # ── Context ── def update_context(context: dict, messages: list) -> dict: """Derive context from real state: which tools exist, whether memory files exist.""" memories = "" if MEMORY_INDEX.exists(): content = MEMORY_INDEX.read_text().strip() if content: memories = content return { "enabled_tools": list(TOOL_HANDLERS.keys()), "workspace": str(WORKDIR), "memories": memories, } # ── Agent Loop ── def agent_loop(messages: list, context: dict): """Main loop with error recovery wrapping LLM calls.""" system = get_system_prompt(context) state = RecoveryState() max_tokens = DEFAULT_MAX_TOKENS while True: # ── LLM call: with_retry handles 429/529, outer handles rest ── try: response = with_retry( lambda mt=max_tokens, mdl=state.current_model: client.messages.create( model=mdl, system=system, messages=messages, tools=TOOLS, max_tokens=mt), state) except Exception as e: # Path 2: prompt_too_long -> reactive compact (once) if is_prompt_too_long_error(e): if not state.has_attempted_reactive_compact: messages[:] = reactive_compact(messages) state.has_attempted_reactive_compact = True continue print(" \033[31m[unrecoverable] still too long after compact\033[0m") messages.append({"role": "assistant", "content": [ {"type": "text", "text": "[Error] Context too large, cannot continue."}]}) return # Unrecoverable name = type(e).__name__ print(f" \033[31m[unrecoverable] {name}: {str(e)[:100]}\033[0m") messages.append({"role": "assistant", "content": [ {"type": "text", "text": f"[Error] {name}: {str(e)[:200]}"}]}) return # ── Path 1: max_tokens -> escalate or continue ── if response.stop_reason == "max_tokens": # First escalation: don't append truncated output, retry same request if not state.has_escalated: max_tokens = ESCALATED_MAX_TOKENS state.has_escalated = True print(f" \033[33m[max_tokens] escalating" f" {DEFAULT_MAX_TOKENS} -> {ESCALATED_MAX_TOKENS}\033[0m") continue # 64K still truncated: save truncated output + continuation prompt messages.append({"role": "assistant", "content": response.content}) if state.recovery_count < MAX_RECOVERY_RETRIES: messages.append({"role": "user", "content": CONTINUATION_PROMPT}) state.recovery_count += 1 print(f" \033[33m[max_tokens] continuation" f" {state.recovery_count}/{MAX_RECOVERY_RETRIES}\033[0m") continue print(" \033[31m[max_tokens] recovery limit reached\033[0m") return # Normal completion: append assistant response messages.append({"role": "assistant", "content": response.content}) if response.stop_reason != "tool_use": return # ── Tool execution ── results = [] for block in response.content: if block.type != "tool_use": continue print(f"\033[36m> {block.name}\033[0m") handler = TOOL_HANDLERS.get(block.name) output = handler(**block.input) if handler else f"Unknown: {block.name}" print(str(output)[:200]) results.append({"type": "tool_result", "tool_use_id": block.id, "content": output}) messages.append({"role": "user", "content": results}) context = update_context(context, messages) system = get_system_prompt(context) if __name__ == "__main__": print("s11: error recovery") print("Enter a question, press Enter to send. Type q to quit.\n") history = [] context = update_context({}, []) while True: try: query = input("\033[36ms11 >> \033[0m") except (EOFError, KeyboardInterrupt): break if query.strip().lower() in ("q", "exit", ""): break history.append({"role": "user", "content": query}) agent_loop(history, context) context = update_context(context, history) for block in history[-1]["content"]: if getattr(block, "type", None) == "text": print(block.text) print()