# Mirror of https://github.com/shareAI-lab/analysis_claude_code.git
# Synced 2026-03-22 10:25:41 +08:00
#!/usr/bin/env python3
"""
s06_context_compact.py - Compact

Three-layer compression pipeline so the agent can work forever:

Every turn:
    +------------------+
    | Tool call result |
    +------------------+
             |
             v
    [Layer 1: micro_compact]  (silent, every turn)
        Replace tool_result content older than last 3
        with "[Previous: used {tool_name}]"
             |
             v
    [Check: tokens > 50000?]
        no   |      | yes
        v           v
    continue   [Layer 2: auto_compact]
                   Save full transcript to .transcripts/
                   Ask LLM to summarize conversation.
                   Replace all messages with [summary].
             |
             v
    [Layer 3: compact tool]
        Model calls compact -> immediate summarization.
        Same as auto, triggered manually.

Key insight: "The agent can forget strategically and keep working forever."
"""
|
|
|
|
import json
import os
import subprocess
import time
from pathlib import Path

from anthropic import Anthropic
from dotenv import load_dotenv

# Load .env, letting it override any variables already in the environment.
load_dotenv(override=True)

# NOTE(review): when a custom base URL is set, the auth token is dropped —
# presumably so a stale token isn't sent to the non-default endpoint; confirm.
if os.getenv("ANTHROPIC_BASE_URL"):
    os.environ.pop("ANTHROPIC_AUTH_TOKEN", None)

WORKDIR = Path.cwd()  # sandbox root for every tool's file access
client = Anthropic(base_url=os.getenv("ANTHROPIC_BASE_URL"))
MODEL = os.environ["MODEL_ID"]  # required env var; KeyError here is intentional

SYSTEM = f"You are a coding agent at {WORKDIR}. Use tools to solve tasks."

THRESHOLD = 50000  # estimated-token ceiling before Layer 2 (auto_compact) fires
TRANSCRIPT_DIR = WORKDIR / ".transcripts"  # archive of full pre-compaction histories
KEEP_RECENT = 3  # number of newest tool results Layer 1 leaves untouched
|
def estimate_tokens(messages: list) -> int:
    """Cheap token estimate for the message list: roughly 4 characters per token."""
    serialized = str(messages)
    return len(serialized) // 4
|
|
|
|
|
|
# -- Layer 1: micro_compact - replace old tool results with placeholders --
def micro_compact(messages: list, keep_recent=None) -> list:
    """Silently shrink old tool results, keeping the newest ones intact.

    Mutates `messages` in place and returns it. Only string contents longer
    than 100 chars are replaced, so small results survive untouched.

    Args:
        messages: Anthropic-style message dicts (assistant content may hold
            SDK blocks or plain dicts).
        keep_recent: how many of the newest tool results to preserve;
            defaults to the module-level KEEP_RECENT.
    """
    if keep_recent is None:
        keep_recent = KEEP_RECENT

    # Collect (msg_index, part_index, tool_result_dict) for every tool_result part.
    tool_results = []
    for msg_idx, msg in enumerate(messages):
        if msg["role"] == "user" and isinstance(msg.get("content"), list):
            for part_idx, part in enumerate(msg["content"]):
                if isinstance(part, dict) and part.get("type") == "tool_result":
                    tool_results.append((msg_idx, part_idx, part))
    if len(tool_results) <= keep_recent:
        return messages

    # Map tool_use_id -> tool name from assistant messages. Content blocks may
    # be SDK objects (attribute access) or plain dicts (e.g. after a JSON
    # round-trip of the history), so accept both forms. The original only
    # handled attribute-style blocks and labeled dict blocks "unknown".
    tool_name_map = {}
    for msg in messages:
        if msg["role"] == "assistant":
            content = msg.get("content", [])
            if isinstance(content, list):
                for block in content:
                    if isinstance(block, dict):
                        if block.get("type") == "tool_use":
                            tool_name_map[block.get("id")] = block.get("name")
                    elif getattr(block, "type", None) == "tool_use":
                        tool_name_map[block.id] = block.name

    # Clear everything except the last keep_recent results.
    for _, _, result in tool_results[:-keep_recent]:
        if isinstance(result.get("content"), str) and len(result["content"]) > 100:
            tool_id = result.get("tool_use_id", "")
            tool_name = tool_name_map.get(tool_id, "unknown")
            result["content"] = f"[Previous: used {tool_name}]"
    return messages
|
|
|
|
|
|
# -- Layer 2: auto_compact - save transcript, summarize, replace messages --
def auto_compact(messages: list) -> list:
    """Archive the full history to disk, then collapse it to an LLM summary."""
    # Persist every message as one JSON line before anything is thrown away.
    TRANSCRIPT_DIR.mkdir(exist_ok=True)
    transcript_path = TRANSCRIPT_DIR / f"transcript_{int(time.time())}.jsonl"
    with open(transcript_path, "w") as f:
        for msg in messages:
            f.write(json.dumps(msg, default=str) + "\n")
    print(f"[transcript saved: {transcript_path}]")

    # Ask the model for a continuity summary of the (truncated) history.
    conversation_text = json.dumps(messages, default=str)[:80000]
    prompt = (
        "Summarize this conversation for continuity. Include: "
        "1) What was accomplished, 2) Current state, 3) Key decisions made. "
        "Be concise but preserve critical details.\n\n" + conversation_text
    )
    response = client.messages.create(
        model=MODEL,
        messages=[{"role": "user", "content": prompt}],
        max_tokens=2000,
    )
    summary = response.content[0].text

    # The entire history becomes a two-message stub carrying the summary.
    return [
        {"role": "user", "content": f"[Conversation compressed. Transcript: {transcript_path}]\n\n{summary}"},
        {"role": "assistant", "content": "Understood. I have the context from the summary. Continuing."},
    ]
|
|
|
|
|
|
# -- Tool implementations --
def safe_path(p: str) -> Path:
    """Resolve *p* inside WORKDIR, raising ValueError if it escapes the sandbox."""
    resolved = (WORKDIR / p).resolve()
    if resolved.is_relative_to(WORKDIR):
        return resolved
    raise ValueError(f"Path escapes workspace: {p}")
|
|
|
|
def run_bash(command: str) -> str:
    """Run a shell command in WORKDIR; return combined stdout+stderr, capped at 50k chars."""
    # Crude substring blocklist for obviously destructive commands.
    for bad in ("rm -rf /", "sudo", "shutdown", "reboot", "> /dev/"):
        if bad in command:
            return "Error: Dangerous command blocked"
    try:
        proc = subprocess.run(
            command, shell=True, cwd=WORKDIR,
            capture_output=True, text=True, timeout=120,
        )
    except subprocess.TimeoutExpired:
        return "Error: Timeout (120s)"
    combined = (proc.stdout + proc.stderr).strip()
    return combined[:50000] if combined else "(no output)"
|
|
|
|
def run_read(path: str, limit: int = None) -> str:
    """Read a workspace file, optionally truncated to *limit* lines; errors become strings."""
    try:
        all_lines = safe_path(path).read_text().splitlines()
        if limit and limit < len(all_lines):
            # Note how many lines were dropped before truncating.
            remainder = len(all_lines) - limit
            all_lines = all_lines[:limit] + [f"... ({remainder} more)"]
        return "\n".join(all_lines)[:50000]
    except Exception as e:
        return f"Error: {e}"
|
|
|
|
def run_write(path: str, content: str) -> str:
    """Create or overwrite a workspace file, making parent directories as needed."""
    try:
        target = safe_path(path)
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(content)
    except Exception as e:
        return f"Error: {e}"
    return f"Wrote {len(content)} bytes"
|
|
|
|
def run_edit(path: str, old_text: str, new_text: str) -> str:
    """Replace the first occurrence of old_text with new_text in a workspace file."""
    try:
        target = safe_path(path)
        original = target.read_text()
        if old_text not in original:
            return f"Error: Text not found in {path}"
        target.write_text(original.replace(old_text, new_text, 1))
        return f"Edited {path}"
    except Exception as e:
        return f"Error: {e}"
|
|
|
|
|
|
# Dispatch table mapping tool names (as sent by the model) to implementations.
# Each handler unpacks the tool-call input dict into the matching run_* function.
TOOL_HANDLERS = {
    "bash": lambda **kw: run_bash(kw["command"]),
    "read_file": lambda **kw: run_read(kw["path"], kw.get("limit")),
    "write_file": lambda **kw: run_write(kw["path"], kw["content"]),
    "edit_file": lambda **kw: run_edit(kw["path"], kw["old_text"], kw["new_text"]),
    # "compact" is intercepted in agent_loop before dispatch; this entry is a fallback.
    "compact": lambda **kw: "Manual compression requested.",
}
|
|
|
|
# Tool schemas advertised to the API (Anthropic tool-use format); names must
# match the keys of TOOL_HANDLERS.
TOOLS = [
    {"name": "bash", "description": "Run a shell command.",
     "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}},
    {"name": "read_file", "description": "Read file contents.",
     "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "limit": {"type": "integer"}}, "required": ["path"]}},
    {"name": "write_file", "description": "Write content to file.",
     "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}},
    {"name": "edit_file", "description": "Replace exact text in file.",
     "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}}, "required": ["path", "old_text", "new_text"]}},
    # Layer 3 entry point: the model itself can request compression.
    {"name": "compact", "description": "Trigger manual conversation compression.",
     "input_schema": {"type": "object", "properties": {"focus": {"type": "string", "description": "What to preserve in the summary"}}}},
]
|
|
|
|
|
|
def agent_loop(messages: list):
    """Run the tool-use loop until the model stops requesting tools.

    Applies all three compression layers around each model call:
      1. micro_compact every turn (mutates messages in place),
      2. auto_compact when the token estimate exceeds THRESHOLD,
      3. auto_compact again when the model calls the `compact` tool.
    Mutates `messages` in place; returns None.
    """
    while True:
        # Layer 1: micro_compact before each LLM call
        micro_compact(messages)
        # Layer 2: auto_compact if token estimate exceeds threshold
        if estimate_tokens(messages) > THRESHOLD:
            print("[auto_compact triggered]")
            # Slice assignment keeps the caller's list object identity.
            messages[:] = auto_compact(messages)
        response = client.messages.create(
            model=MODEL, system=SYSTEM, messages=messages,
            tools=TOOLS, max_tokens=8000,
        )
        messages.append({"role": "assistant", "content": response.content})
        if response.stop_reason != "tool_use":
            return
        results = []
        manual_compact = False
        for block in response.content:
            if block.type == "tool_use":
                if block.name == "compact":
                    # Layer 3 request: acknowledge now, compress after the turn
                    # so the tool_result pairing stays intact.
                    manual_compact = True
                    output = "Compressing..."
                else:
                    handler = TOOL_HANDLERS.get(block.name)
                    try:
                        output = handler(**block.input) if handler else f"Unknown tool: {block.name}"
                    except Exception as e:
                        output = f"Error: {e}"
                print(f"> {block.name}: {str(output)[:200]}")
                results.append({"type": "tool_result", "tool_use_id": block.id, "content": str(output)})
        messages.append({"role": "user", "content": results})
        # Layer 3: manual compact triggered by the compact tool
        if manual_compact:
            print("[manual compact]")
            messages[:] = auto_compact(messages)
|
|
|
|
|
|
if __name__ == "__main__":
    # Simple REPL: each user line is appended to history and handed to the agent.
    history = []
    while True:
        try:
            query = input("\033[36ms06 >> \033[0m")
        except (EOFError, KeyboardInterrupt):
            break
        # Blank line or quit keyword ends the session.
        if query.strip().lower() in ("q", "exit", ""):
            break
        history.append({"role": "user", "content": query})
        agent_loop(history)
        # Print the text blocks of the final assistant message (SDK objects
        # expose .text; compacted string messages are skipped by the isinstance check).
        response_content = history[-1]["content"]
        if isinstance(response_content, list):
            for block in response_content:
                if hasattr(block, "text"):
                    print(block.text)
        print()
|