#!/usr/bin/env python3 """ s06_context_compact.py - Compact Three-layer compression pipeline so the agent can work forever: Every turn: +------------------+ | Tool call result | +------------------+ | v [Layer 1: micro_compact] (silent, every turn) Replace tool_result content older than last 3 with "[Previous: used {tool_name}]" | v [Check: tokens > 50000?] | | no yes | | v v continue [Layer 2: auto_compact] Save full transcript to .transcripts/ Ask LLM to summarize conversation. Replace all messages with [summary]. | v [Layer 3: compact tool] Model calls compact -> immediate summarization. Same as auto, triggered manually. Key insight: "The agent can forget strategically and keep working forever." """ import json import os import subprocess import time from pathlib import Path from anthropic import Anthropic from dotenv import load_dotenv load_dotenv(override=True) if os.getenv("ANTHROPIC_BASE_URL"): os.environ.pop("ANTHROPIC_AUTH_TOKEN", None) WORKDIR = Path.cwd() client = Anthropic(base_url=os.getenv("ANTHROPIC_BASE_URL")) MODEL = os.environ["MODEL_ID"] SYSTEM = f"You are a coding agent at {WORKDIR}. Use tools to solve tasks." 
# Token estimate above which Layer 2 (auto_compact) kicks in.
THRESHOLD = 50000
# Full transcripts are archived here before compression discards detail.
TRANSCRIPT_DIR = WORKDIR / ".transcripts"
# Layer 1 keeps this many most-recent tool results verbatim.
KEEP_RECENT = 3


def estimate_tokens(messages: list) -> int:
    """Rough token count: ~4 chars per token."""
    return len(str(messages)) // 4


# -- Layer 1: micro_compact - replace old tool results with placeholders --
def micro_compact(messages: list) -> list:
    """Shrink old tool_result blocks in place, keeping the last KEEP_RECENT intact.

    Mutates `messages` (the tool_result dicts are edited in place) and also
    returns it for convenience. Results whose content is already short
    (<= 100 chars) are left alone.
    """
    # Collect (msg_index, part_index, tool_result_dict) for all tool_result entries
    tool_results: list[tuple[int, int, dict]] = []
    for msg_idx, msg in enumerate(messages):
        # tool_result blocks live in user messages whose content is a list of parts.
        if msg["role"] == "user" and isinstance(msg.get("content"), list):
            for part_idx, part in enumerate(msg["content"]):
                if isinstance(part, dict) and part.get("type") == "tool_result":
                    tool_results.append((msg_idx, part_idx, part))
    if len(tool_results) <= KEEP_RECENT:
        return messages
    # Find tool_name for each result by matching tool_use_id in prior assistant messages.
    # Assistant content holds SDK block objects (attribute access), not plain dicts,
    # hence the hasattr check rather than dict lookups.
    tool_name_map: dict[str, str] = {}
    for msg in messages:
        if msg["role"] == "assistant":
            content = msg.get("content", [])
            # After a compaction, assistant content may be a plain string; skip it.
            if isinstance(content, list):
                for block in content:
                    if hasattr(block, "type") and block.type == "tool_use":
                        tool_name_map[block.id] = block.name
    # Clear old results (keep last KEEP_RECENT)
    to_clear = tool_results[:-KEEP_RECENT]
    for _, _, result in to_clear:
        # Only replace sizeable string payloads; short ones aren't worth touching.
        if isinstance(result.get("content"), str) and len(result["content"]) > 100:
            tool_id = result.get("tool_use_id", "")
            tool_name = tool_name_map.get(tool_id, "unknown")
            result["content"] = f"[Previous: used {tool_name}]"
    return messages


# -- Layer 2: auto_compact - save transcript, summarize, replace messages --
def auto_compact(messages: list) -> list:
    """Archive the full conversation, summarize it via the LLM, and return a
    fresh two-message history (summary + acknowledgement) to replace it.

    Unlike micro_compact this does NOT mutate `messages`; callers are expected
    to rebind (e.g. `messages[:] = auto_compact(messages)`).
    """
    # Save full transcript to disk so nothing is irrecoverably lost.
    TRANSCRIPT_DIR.mkdir(exist_ok=True)
    transcript_path = TRANSCRIPT_DIR / f"transcript_{int(time.time())}.jsonl"
    with open(transcript_path, "w") as f:
        for msg in messages:
            # default=str stringifies SDK block objects that json can't encode.
            f.write(json.dumps(msg, default=str) + "\n")
    print(f"[transcript saved: {transcript_path}]")
    # Ask LLM to summarize; cap the payload so the summarization call itself
    # stays within context limits.
    conversation_text = json.dumps(messages, default=str)[:80000]
    response = client.messages.create(
        model=MODEL,
        messages=[{"role": "user", "content":
            "Summarize this conversation for continuity. Include: "
            "1) What was accomplished, 2) Current state, 3) Key decisions made. "
            "Be concise but preserve critical details.\n\n" + conversation_text}],
        max_tokens=2000,
    )
    summary = response.content[0].text
    # Replace all messages with compressed summary; the canned assistant reply
    # keeps the history ending on an assistant turn.
    return [
        {"role": "user", "content": f"[Conversation compressed. Transcript: {transcript_path}]\n\n{summary}"},
        {"role": "assistant", "content": "Understood. I have the context from the summary. Continuing."},
    ]


# -- Tool implementations --
def safe_path(p: str) -> Path:
    """Resolve `p` relative to WORKDIR, rejecting any path that escapes it."""
    path = (WORKDIR / p).resolve()
    if not path.is_relative_to(WORKDIR):
        raise ValueError(f"Path escapes workspace: {p}")
    return path


def run_bash(command: str) -> str:
    """Run a shell command in WORKDIR; combined stdout+stderr, capped at 50k chars.

    The substring denylist is a best-effort guard, not a security boundary.
    """
    dangerous = ["rm -rf /", "sudo", "shutdown", "reboot", "> /dev/"]
    if any(d in command for d in dangerous):
        return "Error: Dangerous command blocked"
    try:
        r = subprocess.run(command, shell=True, cwd=WORKDIR,
                           capture_output=True, text=True, timeout=120)
        out = (r.stdout + r.stderr).strip()
        return out[:50000] if out else "(no output)"
    except subprocess.TimeoutExpired:
        return "Error: Timeout (120s)"


def run_read(path: str, limit: int | None = None) -> str:
    """Read a workspace file, optionally truncated to `limit` lines; errors as text."""
    try:
        lines = safe_path(path).read_text().splitlines()
        if limit and limit < len(lines):
            # RHS evaluates before rebinding, so len(lines) is the original count.
            lines = lines[:limit] + [f"... ({len(lines) - limit} more)"]
        return "\n".join(lines)[:50000]
    except Exception as e:
        return f"Error: {e}"


def run_write(path: str, content: str) -> str:
    """Write `content` to a workspace file, creating parent dirs; errors as text."""
    try:
        fp = safe_path(path)
        fp.parent.mkdir(parents=True, exist_ok=True)
        fp.write_text(content)
        return f"Wrote {len(content)} bytes"
    except Exception as e:
        return f"Error: {e}"


def run_edit(path: str, old_text: str, new_text: str) -> str:
    """Replace the first occurrence of `old_text` in a workspace file; errors as text."""
    try:
        fp = safe_path(path)
        content = fp.read_text()
        if old_text not in content:
            return f"Error: Text not found in {path}"
        fp.write_text(content.replace(old_text, new_text, 1))
        return f"Edited {path}"
    except Exception as e:
        return f"Error: {e}"


# Dispatch table from tool name to implementation. The "compact" entry is a
# placeholder: agent_loop intercepts compact before this table is consulted.
TOOL_HANDLERS = {
    "bash": lambda **kw: run_bash(kw["command"]),
    "read_file": lambda **kw: run_read(kw["path"], kw.get("limit")),
    "write_file": lambda **kw: run_write(kw["path"], kw["content"]),
    "edit_file": lambda **kw: run_edit(kw["path"], kw["old_text"], kw["new_text"]),
    "compact": lambda **kw: "Manual compression requested.",
}

# Tool schemas advertised to the model (Anthropic tools format).
TOOLS = [
    {"name": "bash", "description": "Run a shell command.",
     "input_schema": {"type": "object", "properties": {"command": {"type": "string"}},
                      "required": ["command"]}},
    {"name": "read_file", "description": "Read file contents.",
     "input_schema": {"type": "object",
                      "properties": {"path": {"type": "string"}, "limit": {"type": "integer"}},
                      "required": ["path"]}},
    {"name": "write_file", "description": "Write content to file.",
     "input_schema": {"type": "object",
                      "properties": {"path": {"type": "string"}, "content": {"type": "string"}},
                      "required": ["path", "content"]}},
    {"name": "edit_file", "description": "Replace exact text in file.",
     "input_schema": {"type": "object",
                      "properties": {"path": {"type": "string"}, "old_text": {"type": "string"},
                                     "new_text": {"type": "string"}},
                      "required": ["path", "old_text", "new_text"]}},
    {"name": "compact", "description": "Trigger manual conversation compression.",
     "input_schema": {"type": "object",
                      "properties": {"focus": {"type": "string",
                                               "description": "What to preserve in the summary"}}}},
]


def agent_loop(messages: list):
    """Run model/tool turns on `messages` (mutated in place) until the model
    stops requesting tools. Applies all three compression layers each turn."""
    while True:
        # Layer 1: micro_compact before each LLM call (mutates in place).
        micro_compact(messages)
        # Layer 2: auto_compact if token estimate exceeds threshold.
        # Slice-assign so the caller's list object is rebound in place.
        if estimate_tokens(messages) > THRESHOLD:
            print("[auto_compact triggered]")
            messages[:] = auto_compact(messages)
        response = client.messages.create(
            model=MODEL,
            system=SYSTEM,
            messages=messages,
            tools=TOOLS,
            max_tokens=8000,
        )
        messages.append({"role": "assistant", "content": response.content})
        if response.stop_reason != "tool_use":
            return
        results = []
        manual_compact = False
        for block in response.content:
            if block.type == "tool_use":
                if block.name == "compact":
                    # Defer the actual compression until after the tool_result
                    # is appended, so the history stays well-formed.
                    manual_compact = True
                    output = "Compressing..."
                else:
                    handler = TOOL_HANDLERS.get(block.name)
                    try:
                        output = handler(**block.input) if handler else f"Unknown tool: {block.name}"
                    except Exception as e:
                        output = f"Error: {e}"
                print(f"> {block.name}: {str(output)[:200]}")
                results.append({"type": "tool_result", "tool_use_id": block.id,
                                "content": str(output)})
        messages.append({"role": "user", "content": results})
        # Layer 3: manual compact triggered by the compact tool.
        if manual_compact:
            print("[manual compact]")
            messages[:] = auto_compact(messages)


if __name__ == "__main__":
    # Simple REPL: each prompt extends the shared history, which persists
    # (in compressed form) across turns.
    history = []
    while True:
        try:
            query = input("\033[36ms06 >> \033[0m")
        except (EOFError, KeyboardInterrupt):
            break
        if query.strip().lower() in ("q", "exit", ""):
            break
        history.append({"role": "user", "content": query})
        agent_loop(history)
        print()