feat: build an AI agent from 0 to 1 -- 11 progressive sessions

- 11 sessions from basic agent loop to autonomous teams
- Python MVP implementations for each session
- Mental-model-first docs in en/zh/ja
- Interactive web platform with step-through visualizations
- Incremental architecture: each session adds one mechanism
This commit is contained in:
CrazyBoyM
2026-02-21 14:37:42 +08:00
committed by CrazyBoyM
commit c6a27ef1d7
156 changed files with 28059 additions and 0 deletions

View File

@@ -0,0 +1,132 @@
# s01: The Agent Loop
> AIコーディングエージェントの秘密はすべて、モデルが「終了」と判断するまでツール結果をモデルにフィードバックし続けるwhileループにある。
## 問題
なぜ言語モデルは単体でコーディングの質問に答えられないのか。それはコーディングが「現実世界とのインタラクション」を必要とするからだ。モデルはファイルを読み、テストを実行し、エラーを確認し、反復する必要がある。一回のプロンプト-レスポンスのやり取りではこれは実現できない。
agent loopがなければ、ユーザーが自分でモデルの出力をコピーペーストして戻す必要がある。つまりユーザー自身がループの役割を果たすことになる。agent loopはこれを自動化する: モデルを呼び出し、モデルが要求したツールを実行し、結果をフィードバックし、モデルが「完了」と言うまで繰り返す。
単純なタスクを考えてみよう: 「helloと出力するPythonファイルを作成せよ」。モデルは(1)ファイルを書くことを決定し、(2)書き、(3)動作を検証する必要がある。最低でも3回のツール呼び出しが必要だ。ループがなければ、そのたびに手動の介入が必要になる。
## 解決策
```
+----------+ +-------+ +---------+
| User | ---> | LLM | ---> | Tool |
| prompt | | | | execute |
+----------+ +---+---+ +----+----+
^ |
| tool_result |
+---------------+
(loop continues)
The loop terminates when stop_reason != "tool_use".
That single condition is the entire control flow.
```
## 仕組み
1. ユーザーがプロンプトを入力する。これが最初のメッセージになる。
```python
history.append({"role": "user", "content": query})
```
2. メッセージ配列がツール定義と共にLLMに送信される。
```python
response = client.messages.create(
model=MODEL, system=SYSTEM, messages=messages,
tools=TOOLS, max_tokens=8000,
)
```
3. アシスタントのレスポンスがメッセージに追加される。
```python
messages.append({"role": "assistant", "content": response.content})
```
4. stop reasonを確認する。モデルがツールを呼び出さなかった場合、ループは終了する。これが唯一の終了条件だ。
```python
if response.stop_reason != "tool_use":
return
```
5. レスポンス中の各tool_useブロックについて、ツール(このセッションではbash)を実行し、結果を収集する。
```python
for block in response.content:
if block.type == "tool_use":
output = run_bash(block.input["command"])
results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": output,
})
```
6. 結果がuserメッセージとして追加され、ループが続行する。
```python
messages.append({"role": "user", "content": results})
```
## 主要コード
最小限のエージェント -- パターン全体が30行未満
(`agents/s01_agent_loop.py` 66-86行目):
```python
def agent_loop(messages: list):
while True:
response = client.messages.create(
model=MODEL, system=SYSTEM, messages=messages,
tools=TOOLS, max_tokens=8000,
)
messages.append({"role": "assistant", "content": response.content})
if response.stop_reason != "tool_use":
return
results = []
for block in response.content:
if block.type == "tool_use":
output = run_bash(block.input["command"])
results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": output,
})
messages.append({"role": "user", "content": results})
```
## 変更点
これはセッション1 -- 出発点である。前のセッションは存在しない。
| Component | Before | After |
|---------------|------------|--------------------------------|
| Agent loop | (none) | `while True` + stop_reason |
| Tools | (none) | `bash` (one tool) |
| Messages | (none) | Accumulating list |
| Control flow | (none) | `stop_reason != "tool_use"` |
## 設計原理
このループはすべてのLLMベースエージェントの普遍的な基盤だ。本番実装ではエラーハンドリング、トークンカウント、ストリーミング、リトライロジックが追加されるが、根本的な構造は変わらない。シンプルさこそがポイントだ: 1つの終了条件(`stop_reason != "tool_use"`)がフロー全体を制御する。本コースの他のすべて -- ツール、計画、圧縮、チーム -- はこのループの上に積み重なるが、ループ自体は変更しない。このループを理解することは、すべてのエージェントを理解することだ。
## 試してみる
```sh
cd learn-claude-code
python agents/s01_agent_loop.py
```
試せるプロンプト例:
1. `Create a file called hello.py that prints "Hello, World!"`
2. `List all Python files in this directory`
3. `What is the current git branch?`
4. `Create a directory called test_output and write 3 files in it`

141
docs/ja/s02-tool-use.md Normal file
View File

@@ -0,0 +1,141 @@
# s02: Tools
> ディスパッチマップがツール呼び出しをハンドラ関数にルーティングする -- ループ自体はまったく変更しない。
## 問題
`bash`だけでは、エージェントはすべてをシェル経由で行う: ファイルの読み取り、書き込み、編集。これは動くが脆弱だ。`cat`の出力は予期しないタイミングで切り詰められる。`sed`による置換は特殊文字で失敗する。直接的な関数呼び出しの方がシンプルなのに、モデルはシェルパイプラインの構築にトークンを浪費する。
さらに重要なのは、bashがセキュリティ上の攻撃面であること。bashの呼び出しはシェルでできることなら何でもできてしまう。`read_file``write_file`のような専用ツールがあれば、モデルが危険な操作を避けることを期待するのではなく、ツールレベルでパスのサンドボックス化や危険なパターンのブロックを強制できる。
重要な洞察は、ツールを追加してもループを変更する必要がないということだ。s01のループはそのまま同一で維持される。ツール配列にエントリを追加し、ハンドラ関数を追加し、ディスパッチマップで接続するだけだ。
## 解決策
```
+----------+ +-------+ +------------------+
| User | ---> | LLM | ---> | Tool Dispatch |
| prompt | | | | { |
+----------+ +---+---+ | bash: run_bash |
^ | read: run_read |
| | write: run_wr |
+----------+ edit: run_edit |
tool_result| } |
+------------------+
The dispatch map is a dict: {tool_name: handler_function}
One lookup replaces any if/elif chain.
```
## 仕組み
1. 各ツールのハンドラ関数を定義する。各関数はツールのinput_schemaに対応するキーワード引数を受け取り、文字列の結果を返す。
```python
def run_read(path: str, limit: int = None) -> str:
text = safe_path(path).read_text()
lines = text.splitlines()
if limit and limit < len(lines):
lines = lines[:limit]
return "\n".join(lines)[:50000]
```
2. ツール名とハンドラを結びつけるディスパッチマップを作成する。
```python
TOOL_HANDLERS = {
"bash": lambda **kw: run_bash(kw["command"]),
"read_file": lambda **kw: run_read(kw["path"], kw.get("limit")),
"write_file": lambda **kw: run_write(kw["path"], kw["content"]),
"edit_file": lambda **kw: run_edit(kw["path"], kw["old_text"],
kw["new_text"]),
}
```
3. agent loop内で、ハードコードの代わりに名前でハンドラをルックアップする。
```python
for block in response.content:
if block.type == "tool_use":
handler = TOOL_HANDLERS.get(block.name)
output = handler(**block.input)
results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": output,
})
```
4. パスのサンドボックス化により、モデルがワークスペースの外に出ることを防ぐ。
```python
def safe_path(p: str) -> Path:
path = (WORKDIR / p).resolve()
if not path.is_relative_to(WORKDIR):
raise ValueError(f"Path escapes workspace: {p}")
return path
```
## 主要コード
ディスパッチパターン(`agents/s02_tool_use.py` 93-129行目):
```python
TOOL_HANDLERS = {
"bash": lambda **kw: run_bash(kw["command"]),
"read_file": lambda **kw: run_read(kw["path"], kw.get("limit")),
"write_file": lambda **kw: run_write(kw["path"], kw["content"]),
"edit_file": lambda **kw: run_edit(kw["path"], kw["old_text"],
kw["new_text"]),
}
def agent_loop(messages: list):
while True:
response = client.messages.create(
model=MODEL, system=SYSTEM, messages=messages,
tools=TOOLS, max_tokens=8000,
)
messages.append({"role": "assistant", "content": response.content})
if response.stop_reason != "tool_use":
return
results = []
for block in response.content:
if block.type == "tool_use":
handler = TOOL_HANDLERS.get(block.name)
output = handler(**block.input) if handler \
else f"Unknown tool: {block.name}"
results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": output,
})
messages.append({"role": "user", "content": results})
```
## s01からの変更点
| Component | Before (s01) | After (s02) |
|----------------|--------------------|----------------------------|
| Tools | 1 (bash only) | 4 (bash, read, write, edit)|
| Dispatch | Hardcoded bash call | `TOOL_HANDLERS` dict |
| Path safety | None | `safe_path()` sandbox |
| Agent loop | Unchanged | Unchanged |
## 設計原理
ディスパッチマップパターンは線形にスケールする -- ツールの追加はハンドラ関数とスキーマエントリを1つずつ追加するだけだ。ループは決して変更しない。この関心の分離(ループ vs ハンドラ)こそが、エージェントフレームワークが制御フローの複雑さを増すことなく数十のツールをサポートできる理由だ。このパターンはまた、各ハンドラの独立テストも可能にする。ハンドラはループとの結合がない純粋関数だからだ。ディスパッチマップを超えるエージェントは、スケーリングの問題ではなく設計の問題を抱えている。
## 試してみる
```sh
cd learn-claude-code
python agents/s02_tool_use.py
```
試せるプロンプト例:
1. `Read the file requirements.txt`
2. `Create a file called greet.py with a greet(name) function`
3. `Edit greet.py to add a docstring to the function`
4. `Read greet.py to verify the edit worked`
5. `Run the greet function with bash: python -c "from greet import greet; greet('World')"`

156
docs/ja/s03-todo-write.md Normal file
View File

@@ -0,0 +1,156 @@
# s03: TodoWrite
> TodoManagerによりエージェントが自身の進捗を追跡でき、nagリマインダーの注入により更新を忘れた場合に強制的に更新させる。
## 問題
エージェントがマルチステップのタスクに取り組むとき、何を完了し何が残っているかを見失うことが多い。明示的な計画がなければ、モデルは作業を繰り返したり、ステップを飛ばしたり、脱線したりする可能性がある。ユーザーにはエージェントの内部計画が見えない。
これは見た目以上に深刻だ。長い会話ではモデルが「ドリフト」する -- コンテキストウィンドウがツール結果で埋まるにつれ、システムプロンプトの影響力が薄れていく。10ステップのリファクタリングタスクでステップ1-3を完了した後、モデルはステップ4-10の存在を忘れて即興で行動し始めるかもしれない。
解決策は構造化された状態管理だ: モデルが明示的に書き込むTodoManager。モデルは計画を作成し、作業中のアイテムをin_progressとしてマークし、完了時にcompletedとマークする。nagリマインダーは、モデルが3ラウンド以上todoを更新しなかった場合にナッジを注入する。
教育上の簡略化: nagの閾値3ラウンドは教育目的の可視化のために低く設定されている。本番のエージェントでは過剰なプロンプトを避けるため閾値は約10に設定されている。
## 解決策
```
+----------+ +-------+ +---------+
| User | ---> | LLM | ---> | Tools |
| prompt | | | | + todo |
+----------+ +---+---+ +----+----+
^ |
| tool_result |
+---------------+
|
+-----------+-----------+
| TodoManager state |
| [ ] task A |
| [>] task B <- doing |
| [x] task C |
+-----------------------+
|
if rounds_since_todo >= 3:
inject <reminder> into tool_result
```
## 仕組み
1. TodoManagerはアイテムのリストをバリデーションして保持する。`in_progress`にできるのは一度に1つだけ。
```python
class TodoManager:
def __init__(self):
self.items = []
def update(self, items: list) -> str:
validated = []
in_progress_count = 0
for item in items:
status = item.get("status", "pending")
if status == "in_progress":
in_progress_count += 1
validated.append({
"id": item["id"],
"text": item["text"],
"status": status,
})
if in_progress_count > 1:
raise ValueError("Only one task can be in_progress")
self.items = validated
return self.render()
```
2. `todo`ツールは他のツールと同様にディスパッチマップに追加される。
```python
TOOL_HANDLERS = {
"bash": lambda **kw: run_bash(kw["command"]),
# ...other tools...
"todo": lambda **kw: TODO.update(kw["items"]),
}
```
3. nagリマインダーは、モデルが3ラウンド以上`todo`を呼び出さなかった場合にtool_resultメッセージに`<reminder>`タグを注入する。
```python
def agent_loop(messages: list):
rounds_since_todo = 0
while True:
if rounds_since_todo >= 3 and messages:
last = messages[-1]
if (last["role"] == "user"
and isinstance(last.get("content"), list)):
last["content"].insert(0, {
"type": "text",
"text": "<reminder>Update your todos.</reminder>",
})
# ... rest of loop ...
rounds_since_todo = 0 if used_todo else rounds_since_todo + 1
```
4. システムプロンプトがモデルにtodoによる計画を指示する。
```python
SYSTEM = f"""You are a coding agent at {WORKDIR}.
Use the todo tool to plan multi-step tasks.
Mark in_progress before starting, completed when done.
Prefer tools over prose."""
```
## 主要コード
TodoManagerとnag注入(`agents/s03_todo_write.py` 51-85行目および158-187行目):
```python
class TodoManager:
def update(self, items: list) -> str:
validated = []
in_progress_count = 0
for item in items:
status = item.get("status", "pending")
if status == "in_progress":
in_progress_count += 1
validated.append({
"id": item["id"],
"text": item["text"],
"status": status,
})
if in_progress_count > 1:
raise ValueError("Only one in_progress")
self.items = validated
return self.render()
# In agent_loop:
if rounds_since_todo >= 3:
last["content"].insert(0, {
"type": "text",
"text": "<reminder>Update your todos.</reminder>",
})
```
## s02からの変更点
| Component | Before (s02) | After (s03) |
|----------------|------------------|--------------------------|
| Tools | 4 | 5 (+todo) |
| Planning | None | TodoManager with statuses|
| Nag injection | None | `<reminder>` after 3 rounds|
| Agent loop | Simple dispatch | + rounds_since_todo counter|
## 設計原理
可視化された計画はタスク完了率を向上させる。モデルが自身の進捗を自己監視できるからだ。nagメカニズムはアカウンタビリティを生み出す -- これがなければ、会話コンテキストが増大し初期の指示が薄れるにつれ、モデルは実行途中で計画を放棄する可能性がある。「一度にin_progressは1つだけ」という制約は逐次的な集中を強制し、出力品質を低下させるコンテキストスイッチのオーバーヘッドを防ぐ。このパターンが機能するのは、モデルのワーキングメモリを注意力のドリフトに耐える構造化された状態に外部化するからだ。
## 試してみる
```sh
cd learn-claude-code
python agents/s03_todo_write.py
```
試せるプロンプト例:
1. `Refactor the file hello.py: add type hints, docstrings, and a main guard`
2. `Create a Python package with __init__.py, utils.py, and tests/test_utils.py`
3. `Review all Python files and fix any style issues`

144
docs/ja/s04-subagent.md Normal file
View File

@@ -0,0 +1,144 @@
# s04: Subagents
> サブエージェントは新しいメッセージリストで実行され、親とファイルシステムを共有し、要約のみを返す -- 親のコンテキストをクリーンに保つ。
## 問題
エージェントが作業するにつれ、メッセージ配列は膨張する。すべてのツール呼び出し、ファイル読み取り、bash出力が蓄積されていく。20-30回のツール呼び出しの後、コンテキストウィンドウは無関係な履歴で溢れる。ちょっとした質問に答えるために500行のファイルを読むと、永久に500行がコンテキストに追加される。
これは探索的タスクで特に深刻だ。「このプロジェクトはどのテストフレームワークを使っているか」という質問には5つのファイルを読む必要があるかもしれないが、親エージェントには5つのファイルの内容すべては不要だ -- 「pytest with conftest.py configuration」という回答だけが必要なのだ。
解決策はプロセスの分離だ: `messages=[]`で子エージェントを生成する。子は探索し、ファイルを読み、コマンドを実行する。終了時には最終的なテキストレスポンスだけが親に返される。子のメッセージ履歴全体は破棄される。
## 解決策
```
Parent agent Subagent
+------------------+ +------------------+
| messages=[...] | | messages=[] | <-- fresh
| | dispatch | |
| tool: task | ---------->| while tool_use: |
| prompt="..." | | call tools |
| | summary | append results |
| result = "..." | <--------- | return last text |
+------------------+ +------------------+
|
Parent context stays clean.
Subagent context is discarded.
```
## 仕組み
1. 親エージェントにサブエージェント生成をトリガーする`task`ツールが追加される。子は`task`を除くすべての基本ツールを取得する(再帰的な生成は不可)。
```python
PARENT_TOOLS = CHILD_TOOLS + [
{"name": "task",
"description": "Spawn a subagent with fresh context.",
"input_schema": {
"type": "object",
"properties": {
"prompt": {"type": "string"},
"description": {"type": "string"},
},
"required": ["prompt"],
}},
]
```
2. サブエージェントは委譲されたプロンプトのみを含む新しいメッセージリストで開始する。ファイルシステムは共有される。
```python
def run_subagent(prompt: str) -> str:
sub_messages = [{"role": "user", "content": prompt}]
for _ in range(30): # safety limit
response = client.messages.create(
model=MODEL, system=SUBAGENT_SYSTEM,
messages=sub_messages,
tools=CHILD_TOOLS, max_tokens=8000,
)
sub_messages.append({
"role": "assistant", "content": response.content
})
if response.stop_reason != "tool_use":
break
# execute tools, append results...
```
3. 最終テキストのみが親に返される。子の30回以上のツール呼び出し履歴は破棄される。
```python
return "".join(
b.text for b in response.content if hasattr(b, "text")
) or "(no summary)"
```
4. 親はこの要約を通常のtool_resultとして受け取る。
```python
if block.name == "task":
output = run_subagent(block.input["prompt"])
results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": str(output),
})
```
## 主要コード
サブエージェント関数(`agents/s04_subagent.py` 110-128行目):
```python
def run_subagent(prompt: str) -> str:
sub_messages = [{"role": "user", "content": prompt}]
for _ in range(30):
response = client.messages.create(
model=MODEL, system=SUBAGENT_SYSTEM,
messages=sub_messages,
tools=CHILD_TOOLS, max_tokens=8000,
)
sub_messages.append({"role": "assistant",
"content": response.content})
if response.stop_reason != "tool_use":
break
results = []
for block in response.content:
if block.type == "tool_use":
handler = TOOL_HANDLERS.get(block.name)
output = handler(**block.input)
results.append({"type": "tool_result",
"tool_use_id": block.id,
"content": str(output)[:50000]})
sub_messages.append({"role": "user", "content": results})
return "".join(
b.text for b in response.content if hasattr(b, "text")
) or "(no summary)"
```
## s03からの変更点
| Component | Before (s03) | After (s04) |
|----------------|------------------|---------------------------|
| Tools | 5 | 5 (base) + task (parent) |
| Context | Single shared | Parent + child isolation |
| Subagent | None | `run_subagent()` function |
| Return value | N/A | Summary text only |
| Todo system | TodoManager | Removed (not needed here) |
## 設計原理
プロセス分離はコンテキスト分離を無料で提供する。新しい`messages[]`は、サブエージェントが親の会話履歴に混乱させられないことを意味する。トレードオフは通信オーバーヘッドだ -- 結果は親に圧縮して返す必要があり、詳細が失われる。これはOSのプロセス分離と同じトレードオフだ: シリアライゼーションコストと引き換えに安全性とクリーンさを得る。サブエージェントの深さ制限(再帰的なスポーンは不可)は無制限のリソース消費を防ぎ、最大反復回数は暴走した子プロセスの終了を保証する。
## 試してみる
```sh
cd learn-claude-code
python agents/s04_subagent.py
```
試せるプロンプト例:
1. `Use a subtask to find what testing framework this project uses`
2. `Delegate: read all .py files and summarize what each one does`
3. `Use a task to create a new module, then verify it from here`

View File

@@ -0,0 +1,153 @@
# s05: Skills
> 2層のスキル注入により、スキル名をシステムプロンプトに(低コスト)、スキル本体をtool_resultに(オンデマンド)配置することで、システムプロンプトの肥大化を回避する。
## 問題
エージェントに特定のドメインのワークフローを遵守させたい: gitの規約、テストパターン、コードレビューのチェックリストなど。単純なアプローチはすべてをシステムプロンプトに入れることだ。しかしシステムプロンプトの実効的な注意力は有限であり、テキストが多すぎるとモデルはその一部を無視し始める。
10個のスキルが各2000トークンあれば、20,000トークンのシステムプロンプトになる。モデルは先頭と末尾に注意を払い、中間部分は飛ばし読みする。さらに悪いことに、ほとんどのスキルは任意のタスクに対して無関係だ。ファイル編集のタスクにgitワークフローの指示は不要だ。
2層アプローチがこれを解決する: 第1層はシステムプロンプトにスキルの短い説明を置く(スキルあたり約100トークン)。第2層はモデルが`load_skill`を呼び出した時だけ、スキル本体の全文をtool_resultに読み込む。モデルはどのスキルが存在するかを知り(低コスト)、必要な時だけ読み込む(関連する時のみ)。
## 解決策
```
System prompt (Layer 1 -- always present):
+--------------------------------------+
| You are a coding agent. |
| Skills available: |
| - git: Git workflow helpers | ~100 tokens/skill
| - test: Testing best practices |
+--------------------------------------+
When model calls load_skill("git"):
+--------------------------------------+
| tool_result (Layer 2 -- on demand): |
| <skill name="git"> |
| Full git workflow instructions... | ~2000 tokens
| Step 1: ... |
| Step 2: ... |
| </skill> |
+--------------------------------------+
```
## 仕組み
1. スキルファイルは`.skills/`にYAMLフロントマター付きMarkdownとして配置される。
```
.skills/
git.md # ---\n description: Git workflow\n ---\n ...
test.md # ---\n description: Testing patterns\n ---\n ...
```
2. SkillLoaderがフロントマターを解析し、メタデータと本体を分離する。
```python
class SkillLoader:
def _parse_frontmatter(self, text: str) -> tuple:
match = re.match(
r"^---\n(.*?)\n---\n(.*)", text, re.DOTALL
)
if not match:
return {}, text
meta = {}
for line in match.group(1).strip().splitlines():
if ":" in line:
key, val = line.split(":", 1)
meta[key.strip()] = val.strip()
return meta, match.group(2).strip()
```
3. 第1層: `get_descriptions()`がシステムプロンプト用の短い行を返す。
```python
def get_descriptions(self) -> str:
lines = []
for name, skill in self.skills.items():
desc = skill["meta"].get("description", "No description")
lines.append(f" - {name}: {desc}")
return "\n".join(lines)
SYSTEM = f"""You are a coding agent at {WORKDIR}.
Skills available:
{SKILL_LOADER.get_descriptions()}"""
```
4. 第2層: `get_content()``<skill>`タグで囲まれた本体全文を返す。
```python
def get_content(self, name: str) -> str:
skill = self.skills.get(name)
if not skill:
return f"Error: Unknown skill '{name}'."
return f"<skill name=\"{name}\">\n{skill['body']}\n</skill>"
```
5. `load_skill`ツールはディスパッチマップの単なる一エントリだ。
```python
TOOL_HANDLERS = {
# ...base tools...
"load_skill": lambda **kw: SKILL_LOADER.get_content(kw["name"]),
}
```
## 主要コード
SkillLoaderクラス(`agents/s05_skill_loading.py` 51-97行目):
```python
class SkillLoader:
def __init__(self, skills_dir: Path):
self.skills = {}
for f in sorted(skills_dir.glob("*.md")):
text = f.read_text()
meta, body = self._parse_frontmatter(text)
self.skills[f.stem] = {
"meta": meta, "body": body
}
def get_descriptions(self) -> str:
lines = []
for name, skill in self.skills.items():
desc = skill["meta"].get("description", "")
lines.append(f" - {name}: {desc}")
return "\n".join(lines)
def get_content(self, name: str) -> str:
skill = self.skills.get(name)
if not skill:
return f"Error: Unknown skill '{name}'."
return (f"<skill name=\"{name}\">\n"
f"{skill['body']}\n</skill>")
```
## s04からの変更点
| Component | Before (s04) | After (s05) |
|----------------|------------------|----------------------------|
| Tools | 5 (base + task) | 5 (base + load_skill) |
| System prompt | Static string | + skill descriptions |
| Knowledge | None | .skills/*.md files |
| Injection | None | Two-layer (system + result)|
| Subagent | `run_subagent()` | Removed (different focus) |
## 設計原理
2層注入は注意力バジェットの問題を解決する。すべてのスキル内容をシステムプロンプトに入れると、未使用のスキルにトークンを浪費する。第1層(コンパクトな要約)は合計約120トークンのコストだ。第2層(完全な内容)はtool_resultを通じてオンデマンドで読み込まれる。これにより、モデルの注意力品質を劣化させることなく数十のスキルにスケールできる。重要な洞察は、モデルはどのスキルが存在するか(低コスト)を知るだけで、いつスキルを読み込むか(高コスト)を判断できるということだ。これはソフトウェアモジュールシステムで使われる遅延読み込みと同じ原理だ。
## 試してみる
```sh
cd learn-claude-code
python agents/s05_skill_loading.py
```
試せるプロンプト例:
1. `What skills are available?`
2. `Load the agent-builder skill and follow its instructions`
3. `I need to do a code review -- load the relevant skill first`
4. `Build an MCP server using the mcp-builder skill`

View File

@@ -0,0 +1,170 @@
# s06: Compact
> 3層の圧縮パイプラインにより、古いツール結果の戦略的な忘却、トークンが閾値を超えた時の自動要約、オンデマンドの手動圧縮を組み合わせて、エージェントを無期限に動作可能にする。
## 問題
コンテキストウィンドウは有限だ。十分なツール呼び出しの後、メッセージ配列がモデルのコンテキスト上限を超え、API呼び出しが失敗する。ハード制限に達する前でも、パフォーマンスは劣化する: モデルは遅くなり、精度が落ち、以前のメッセージを無視し始める。
200,000トークンのコンテキストウィンドウは大きく聞こえるが、1000行のソースファイルに対する一回の`read_file`で約4000トークンを消費する。30ファイルを読み20回のbashコマンドを実行すると、100,000トークン以上になる。何らかの圧縮がなければ、エージェントは大規模なコードベースで作業できない。
3層のパイプラインは積極性を段階的に上げて対処する:
第1層(micro-compact)は毎ターン静かに古いツール結果を置換する。
第2層(auto-compact)はトークンが閾値を超えた時に完全な要約を発動する。
第3層(manual compact)はモデル自身が圧縮をトリガーできる。
教育上の簡略化: ここでのトークン推定は大まかな「文字数/4」ヒューリスティックを使用している。本番システムでは正確なカウントのために適切なトークナイザーライブラリを使用する。
## 解決策
```
Every turn:
+------------------+
| Tool call result |
+------------------+
|
v
[Layer 1: micro_compact] (silent, every turn)
Replace tool_result > 3 turns old
with "[Previous: used {tool_name}]"
|
v
[Check: tokens > 50000?]
| |
no yes
| |
v v
continue [Layer 2: auto_compact]
Save transcript to .transcripts/
LLM summarizes conversation.
Replace all messages with [summary].
|
v
[Layer 3: compact tool]
Model calls compact explicitly.
Same summarization as auto_compact.
```
## 仕組み
1. **第1層 -- micro_compact**: 各LLM呼び出しの前に、直近3件以前のすべてのtool_resultエントリを見つけて内容を置換する。
```python
def micro_compact(messages: list) -> list:
tool_results = []
for i, msg in enumerate(messages):
if msg["role"] == "user" and isinstance(msg.get("content"), list):
for j, part in enumerate(msg["content"]):
if isinstance(part, dict) and part.get("type") == "tool_result":
tool_results.append((i, j, part))
if len(tool_results) <= KEEP_RECENT:
return messages
to_clear = tool_results[:-KEEP_RECENT]
for _, _, part in to_clear:
if len(part.get("content", "")) > 100:
tool_id = part.get("tool_use_id", "")
tool_name = tool_name_map.get(tool_id, "unknown")
part["content"] = f"[Previous: used {tool_name}]"
return messages
```
2. **第2層 -- auto_compact**: 推定トークン数が50,000を超えた時、完全なトランスクリプトを保存し、LLMに要約を依頼する。
```python
def auto_compact(messages: list) -> list:
TRANSCRIPT_DIR.mkdir(exist_ok=True)
transcript_path = TRANSCRIPT_DIR / f"transcript_{int(time.time())}.jsonl"
with open(transcript_path, "w") as f:
for msg in messages:
f.write(json.dumps(msg, default=str) + "\n")
response = client.messages.create(
model=MODEL,
messages=[{"role": "user", "content":
"Summarize this conversation for continuity..."
+ json.dumps(messages, default=str)[:80000]}],
max_tokens=2000,
)
summary = response.content[0].text
return [
{"role": "user", "content": f"[Compressed]\n\n{summary}"},
{"role": "assistant", "content": "Understood. Continuing."},
]
```
3. **第3層 -- manual compact**: `compact`ツールが同じ要約処理をオンデマンドでトリガーする。
```python
if manual_compact:
messages[:] = auto_compact(messages)
```
4. agent loopが3つの層すべてを統合する。
```python
def agent_loop(messages: list):
while True:
micro_compact(messages)
if estimate_tokens(messages) > THRESHOLD:
messages[:] = auto_compact(messages)
response = client.messages.create(...)
# ... tool execution ...
if manual_compact:
messages[:] = auto_compact(messages)
```
## 主要コード
3層パイプライン(`agents/s06_context_compact.py` 67-93行目および189-223行目):
```python
THRESHOLD = 50000
KEEP_RECENT = 3
def micro_compact(messages):
# Replace old tool results with placeholders
...
def auto_compact(messages):
# Save transcript, LLM summarize, replace messages
...
def agent_loop(messages):
while True:
micro_compact(messages) # Layer 1
if estimate_tokens(messages) > THRESHOLD:
messages[:] = auto_compact(messages) # Layer 2
response = client.messages.create(...)
# ...
if manual_compact:
messages[:] = auto_compact(messages) # Layer 3
```
## s05からの変更点
| Component | Before (s05) | After (s06) |
|----------------|------------------|----------------------------|
| Tools | 5 | 5 (base + compact) |
| Context mgmt | None | Three-layer compression |
| Micro-compact | None | Old results -> placeholders|
| Auto-compact | None | Token threshold trigger |
| Manual compact | None | `compact` tool |
| Transcripts | None | Saved to .transcripts/ |
| Skills | load_skill | Removed (different focus) |
## 設計原理
コンテキストウィンドウは有限だが、エージェントセッションは無限にできる。3層の圧縮が異なる粒度でこれを解決する: micro-compact(古いツール出力の置換)、auto-compact(上限に近づいたときのLLM要約)、manual compact(ユーザートリガー)。重要な洞察は、忘却はバグではなく機能だということだ -- 無制限のセッションを可能にする。トランスクリプトはディスク上に完全な履歴を保存するため、何も真に失われず、アクティブなコンテキストの外に移動されるだけだ。層状のアプローチにより、各層がサイレントなターンごとのクリーンアップから完全な会話リセットまで、独自の粒度で独立して動作する。
## 試してみる
```sh
cd learn-claude-code
python agents/s06_context_compact.py
```
試せるプロンプト例:
1. `Read every Python file in the agents/ directory one by one`
(micro-compactが古い結果を置換するのを観察する)
2. `Keep reading files until compression triggers automatically`
3. `Use the compact tool to manually compress the conversation`

159
docs/ja/s07-task-system.md Normal file
View File

@@ -0,0 +1,159 @@
# s07: Tasks
> タスクはファイルシステム上にJSON形式で依存グラフ付きで永続化され、コンテキスト圧縮後も生き残り、複数エージェント間で共有できる。
## 問題
インメモリの状態であるTodoManager(s03)は、コンテキストが圧縮(s06)されると失われる。auto_compactがメッセージを要約で置換した後、todoリストは消える。エージェントは要約テキストからそれを再構成しなければならないが、これは不正確でエラーが起きやすい。
これがs06からs07への重要な橋渡しだ: TodoManagerのアイテムは圧縮と共に死ぬが、ファイルベースのタスクは死なない。状態をファイルシステムに移すことで、圧縮に対する耐性が得られる。
さらに根本的な問題として、インメモリの状態は他のエージェントからは見えない。最終的にチーム(s09以降)を構築する際、チームメイトには共有のタスクボードが必要だ。インメモリのデータ構造はプロセスローカルだ。
解決策はタスクを`.tasks/`にJSON形式で永続化すること。各タスクはID、件名、ステータス、依存グラフを持つ個別のファイルだ。タスク1を完了すると、タスク2が`blockedBy: [1]`を持つ場合、自動的にタスク2のブロックが解除される。ファイルシステムが信頼できる情報源となる。
## 解決策
```
.tasks/
task_1.json {"id":1, "status":"completed", ...}
task_2.json {"id":2, "blockedBy":[1], "status":"pending"}
task_3.json {"id":3, "blockedBy":[2], "status":"pending"}
Dependency resolution:
+----------+ +----------+ +----------+
| task 1 | --> | task 2 | --> | task 3 |
| complete | | blocked | | blocked |
+----------+ +----------+ +----------+
| ^
+--- completing task 1 removes it from
task 2's blockedBy list
```
## 仕組み
1. TaskManagerがCRUD操作を提供する。各タスクは1つのJSONファイル。
```python
class TaskManager:
def create(self, subject: str, description: str = "") -> str:
task = {
"id": self._next_id,
"subject": subject,
"description": description,
"status": "pending",
"blockedBy": [],
"blocks": [],
"owner": "",
}
self._save(task)
self._next_id += 1
return json.dumps(task, indent=2)
```
2. タスクが完了とマークされると、`_clear_dependency`がそのIDを他のすべてのタスクの`blockedBy`リストから除去する。
```python
def _clear_dependency(self, completed_id: int):
for f in self.dir.glob("task_*.json"):
task = json.loads(f.read_text())
if completed_id in task.get("blockedBy", []):
task["blockedBy"].remove(completed_id)
self._save(task)
```
3. `update`メソッドがステータス変更と双方向の依存関係の結線を処理する。
```python
def update(self, task_id, status=None,
add_blocked_by=None, add_blocks=None):
task = self._load(task_id)
if status:
task["status"] = status
if status == "completed":
self._clear_dependency(task_id)
if add_blocks:
task["blocks"] = list(set(task["blocks"] + add_blocks))
for blocked_id in add_blocks:
blocked = self._load(blocked_id)
if task_id not in blocked["blockedBy"]:
blocked["blockedBy"].append(task_id)
self._save(blocked)
self._save(task)
```
4. 4つのタスクツールがディスパッチマップに追加される。
```python
TOOL_HANDLERS = {
# ...base tools...
"task_create": lambda **kw: TASKS.create(kw["subject"]),
"task_update": lambda **kw: TASKS.update(kw["task_id"],
kw.get("status")),
"task_list": lambda **kw: TASKS.list_all(),
"task_get": lambda **kw: TASKS.get(kw["task_id"]),
}
```
## 主要コード
依存グラフ付きTaskManager(`agents/s07_task_system.py` 46-123行目):
```python
class TaskManager:
def __init__(self, tasks_dir: Path):
self.dir = tasks_dir
self.dir.mkdir(exist_ok=True)
self._next_id = self._max_id() + 1
def _load(self, task_id: int) -> dict:
path = self.dir / f"task_{task_id}.json"
return json.loads(path.read_text())
def _save(self, task: dict):
path = self.dir / f"task_{task['id']}.json"
path.write_text(json.dumps(task, indent=2))
def create(self, subject, description=""):
task = {"id": self._next_id, "subject": subject,
"status": "pending", "blockedBy": [],
"blocks": [], "owner": ""}
self._save(task)
self._next_id += 1
return json.dumps(task, indent=2)
def _clear_dependency(self, completed_id):
for f in self.dir.glob("task_*.json"):
task = json.loads(f.read_text())
if completed_id in task.get("blockedBy", []):
task["blockedBy"].remove(completed_id)
self._save(task)
```
## s06からの変更点
| Component | Before (s06) | After (s07) |
|----------------|------------------|----------------------------|
| Tools | 5 | 8 (+task_create/update/list/get)|
| State storage | In-memory only | JSON files in .tasks/ |
| Dependencies | None | blockedBy + blocks graph |
| Compression | Three-layer | Removed (different focus) |
| Persistence | Lost on compact | Survives compression |
## 設計原理
ファイルベースの状態はコンテキスト圧縮を生き延びる。エージェントの会話が圧縮されるとメモリ内の状態は失われるが、ディスクに書き込まれたタスクは永続する。依存グラフにより、コンテキストが失われた後でも正しい順序で実行される。これは一時的な会話と永続的な作業の橋渡しだ -- エージェントは会話の詳細を忘れても、タスクボードが常に何をすべきかを思い出させてくれる。ファイルシステムを信頼できる情報源とすることで、将来のマルチエージェント共有も可能になる。任意のプロセスが同じJSONファイルを読み取れるからだ。
## 試してみる
```sh
cd learn-claude-code
python agents/s07_task_system.py
```
試せるプロンプト例:
1. `Create 3 tasks: "Setup project", "Write code", "Write tests". Make them depend on each other in order.`
2. `List all tasks and show the dependency graph`
3. `Complete task 1 and then list tasks to see task 2 unblocked`
4. `Create a task board for refactoring: parse -> transform -> emit -> test`

View File

@@ -0,0 +1,177 @@
# s08: Background Tasks
> BackgroundManagerがコマンドを別スレッドで実行し、各LLM呼び出しの前に通知キューをドレインすることで、エージェントは長時間実行操作でブロックされなくなる。
## 問題
一部のコマンドは数分かかる: `npm install``pytest``docker build`。ブロッキングのagent loopでは、モデルはサブプロセスの終了を待って待機する。他のことは何もできない。ユーザーが「依存関係をインストールして、その間にconfigファイルを作成して」と言った場合、エージェントはまずインストールを行い、その後configを作成する -- 並列ではなく逐次的に。
エージェントには並行性が必要だ。agent loop自体の完全なマルチスレッディングではなく、長いコマンドを発射して実行中に作業を続ける能力だ。コマンドが終了したら、その結果は自然に会話に現れるべきだ。
解決策は、BackgroundManagerがコマンドをデーモンスレッドで実行し、結果を通知キューに収集すること。各LLM呼び出しの前にキューがドレインされ、結果がメッセージに注入される。
## 解決策
```
Main thread Background thread
+-----------------+ +-----------------+
| agent loop | | task executes |
| ... | | ... |
| [LLM call] <---+------- | enqueue(result) |
| ^drain queue | +-----------------+
+-----------------+
Timeline:
Agent --[spawn A]--[spawn B]--[other work]----
| |
v v
[A runs] [B runs] (parallel)
| |
+-- notification queue --+
|
[results injected before
next LLM call]
```
## 仕組み
1. BackgroundManagerがタスクを追跡し、スレッドセーフな通知キューを維持する。
```python
class BackgroundManager:
def __init__(self):
self.tasks = {}
self._notification_queue = []
self._lock = threading.Lock()
```
2. `run()`がデーモンスレッドを開始し、task_idを即座に返す。
```python
def run(self, command: str) -> str:
task_id = str(uuid.uuid4())[:8]
self.tasks[task_id] = {
"status": "running",
"result": None,
"command": command,
}
thread = threading.Thread(
target=self._execute,
args=(task_id, command),
daemon=True,
)
thread.start()
return f"Background task {task_id} started"
```
3. スレッドのターゲットである`_execute`がサブプロセスを実行し、結果を通知キューにプッシュする。
```python
def _execute(self, task_id: str, command: str):
try:
r = subprocess.run(command, shell=True, cwd=WORKDIR,
capture_output=True, text=True, timeout=300)
output = (r.stdout + r.stderr).strip()[:50000]
status = "completed"
except subprocess.TimeoutExpired:
output = "Error: Timeout (300s)"
status = "timeout"
self.tasks[task_id]["status"] = status
self.tasks[task_id]["result"] = output
with self._lock:
self._notification_queue.append({
"task_id": task_id,
"status": status,
"result": output[:500],
})
```
4. `drain_notifications()`が保留中の結果を返してクリアする。
```python
def drain_notifications(self) -> list:
with self._lock:
notifs = list(self._notification_queue)
self._notification_queue.clear()
return notifs
```
5. agent loopが各LLM呼び出しの前に通知をドレインする。
```python
def agent_loop(messages: list):
while True:
notifs = BG.drain_notifications()
if notifs and messages:
notif_text = "\n".join(
f"[bg:{n['task_id']}] {n['status']}: "
f"{n['result']}" for n in notifs
)
messages.append({"role": "user",
"content": f"<background-results>"
f"\n{notif_text}\n"
f"</background-results>"})
messages.append({"role": "assistant",
"content": "Noted background results."})
response = client.messages.create(...)
```
## 主要コード
BackgroundManager(`agents/s08_background_tasks.py` 49-107行目):
```python
class BackgroundManager:
def __init__(self):
self.tasks = {}
self._notification_queue = []
self._lock = threading.Lock()
def run(self, command: str) -> str:
task_id = str(uuid.uuid4())[:8]
self.tasks[task_id] = {"status": "running",
"result": None,
"command": command}
thread = threading.Thread(
target=self._execute,
args=(task_id, command), daemon=True)
thread.start()
return f"Background task {task_id} started"
def _execute(self, task_id, command):
# run subprocess, push to queue
...
def drain_notifications(self) -> list:
with self._lock:
notifs = list(self._notification_queue)
self._notification_queue.clear()
return notifs
```
## s07からの変更点
| Component | Before (s07) | After (s08) |
|----------------|------------------|----------------------------|
| Tools | 8 | 6 (base + background_run + check)|
| Execution | Blocking only | Blocking + background threads|
| Notification | None | Queue drained per loop |
| Concurrency | None | Daemon threads |
| Task system | File-based CRUD | Removed (different focus) |
## 設計原理
エージェントループは本質的にシングルスレッドだ(一度に1つのLLM呼び出し)。バックグラウンドスレッドはI/Oバウンドな作業(テスト、ビルド、インストール)に対してこの制約を打破する。通知キューパターン(「次のLLM呼び出し前にドレイン」)により、結果はモデルの推論を途中で中断するのではなく、会話の自然な区切りで到着する。これは最小限の並行性モデルだ: エージェントループはシングルスレッドで決定論的なまま、I/Oバウンドなサブプロセス実行のみが並列化される。
## 試してみる
```sh
cd learn-claude-code
python agents/s08_background_tasks.py
```
試せるプロンプト例:
1. `Run "sleep 5 && echo done" in the background, then create a file while it runs`
2. `Start 3 background tasks: "sleep 2", "sleep 4", "sleep 6". Check their status.`
3. `Run pytest in the background and keep working on other things`

212
docs/ja/s09-agent-teams.md Normal file
View File

@@ -0,0 +1,212 @@
# s09: Agent Teams
> JSONL形式のインボックスを持つ永続的なチームメイトが、孤立したエージェントをコミュニケーションするチームに変える -- spawn、message、broadcast、drain。
## 問題
サブエージェント(s04)は使い捨てだ: 生成し、作業し、要約を返し、消滅する。アイデンティティもなく、呼び出し間の記憶もなく、フォローアップの指示を受け取る方法もない。バックグラウンドタスク(s08)はシェルコマンドを実行するが、LLM誘導の意思決定やフィードバックの伝達はできない。
本物のチームワークには3つのものが必要だ: (1)単一のプロンプトを超えて存続する永続的なエージェント、(2)アイデンティティとライフサイクル管理、(3)エージェント間の通信チャネル。メッセージングがなければ、永続的なチームメイトでさえ聾唖だ -- 並列に作業できるが協調することはない。
解決策は、名前付きの永続的エージェントを生成するTeammateManagerと、JONSLインボックスファイルを使うMessageBusの組み合わせだ。各チームメイトは自身のagent loopをスレッドで実行し、各LLM呼び出しの前にインボックスを確認し、他のチームメイトやリーダーにメッセージを送れる。
s06からs07への橋渡しについての注記: s03のTodoManagerアイテムは圧縮(s06)と共に死ぬ。ファイルベースのタスク(s07)はディスク上に存在するため圧縮後も生き残る。チームも同じ原則の上に構築されている -- config.jsonとインボックスファイルはコンテキストウィンドウの外に永続化される。
## 解決策
```
Teammate lifecycle:
spawn -> WORKING -> IDLE -> WORKING -> ... -> SHUTDOWN
Communication:
.team/
config.json <- team roster + statuses
inbox/
alice.jsonl <- append-only, drain-on-read
bob.jsonl
lead.jsonl
+--------+ send("alice","bob","...") +--------+
| alice | -----------------------------> | bob |
| loop | bob.jsonl << {json_line} | loop |
+--------+ +--------+
^ |
| BUS.read_inbox("alice") |
+---- alice.jsonl -> read + drain ---------+
5 message types:
+-------------------------+------------------------------+
| message | Normal text between agents |
| broadcast | Sent to all teammates |
| shutdown_request | Request graceful shutdown |
| shutdown_response | Approve/reject shutdown |
| plan_approval_response | Approve/reject plan |
+-------------------------+------------------------------+
```
## 仕組み
1. TeammateManagerがチームの名簿としてconfig.jsonを管理する。各メンバーは名前、役割、ステータスを持つ。
```python
class TeammateManager:
def __init__(self, team_dir: Path):
self.dir = team_dir
self.dir.mkdir(exist_ok=True)
self.config_path = self.dir / "config.json"
self.config = self._load_config()
self.threads = {}
```
2. `spawn()`がチームメイトを作成し、そのagent loopをスレッドで開始する。アイドル状態のチームメイトを再spawnすると再活性化される。
```python
def spawn(self, name: str, role: str, prompt: str) -> str:
member = self._find_member(name)
if member:
if member["status"] not in ("idle", "shutdown"):
return f"Error: '{name}' is currently {member['status']}"
member["status"] = "working"
else:
member = {"name": name, "role": role, "status": "working"}
self.config["members"].append(member)
self._save_config()
thread = threading.Thread(
target=self._teammate_loop,
args=(name, role, prompt), daemon=True)
self.threads[name] = thread
thread.start()
return f"Spawned teammate '{name}' (role: {role})"
```
3. MessageBusがJSONLインボックスファイルを処理する。`send()`がJSON行を追記し、`read_inbox()`がすべての行を読み取ってファイルをドレインする。
```python
class MessageBus:
def send(self, sender, to, content,
msg_type="message", extra=None):
msg = {"type": msg_type, "from": sender,
"content": content,
"timestamp": time.time()}
if extra:
msg.update(extra)
with open(self.dir / f"{to}.jsonl", "a") as f:
f.write(json.dumps(msg) + "\n")
return f"Sent {msg_type} to {to}"
def read_inbox(self, name):
path = self.dir / f"{name}.jsonl"
if not path.exists():
return "[]"
msgs = [json.loads(l)
for l in path.read_text().strip().splitlines()
if l]
path.write_text("") # drain
return json.dumps(msgs, indent=2)
```
4. 各チームメイトは各LLM呼び出しの前にインボックスを確認し、受信メッセージを会話コンテキストに注入する。
```python
def _teammate_loop(self, name, role, prompt):
sys_prompt = f"You are '{name}', role: {role}, at {WORKDIR}."
messages = [{"role": "user", "content": prompt}]
for _ in range(50):
inbox = BUS.read_inbox(name)
if inbox != "[]":
messages.append({"role": "user",
"content": f"<inbox>{inbox}</inbox>"})
messages.append({"role": "assistant",
"content": "Noted inbox messages."})
response = client.messages.create(
model=MODEL, system=sys_prompt,
messages=messages, tools=TOOLS)
messages.append({"role": "assistant",
"content": response.content})
if response.stop_reason != "tool_use":
break
# execute tools, append results...
self._find_member(name)["status"] = "idle"
self._save_config()
```
5. `broadcast()`が送信者以外の全チームメイトに同じメッセージを送信する。
```python
def broadcast(self, sender, content, teammates):
count = 0
for name in teammates:
if name != sender:
self.send(sender, name, content, "broadcast")
count += 1
return f"Broadcast to {count} teammates"
```
## 主要コード
TeammateManager + MessageBusのコア(`agents/s09_agent_teams.py`):
```python
class TeammateManager:
def spawn(self, name, role, prompt):
member = self._find_member(name) or {
"name": name, "role": role, "status": "working"
}
member["status"] = "working"
self._save_config()
thread = threading.Thread(
target=self._teammate_loop,
args=(name, role, prompt), daemon=True)
thread.start()
return f"Spawned '{name}'"
class MessageBus:
def send(self, sender, to, content,
msg_type="message", extra=None):
msg = {"type": msg_type, "from": sender,
"content": content, "timestamp": time.time()}
if extra: msg.update(extra)
with open(self.dir / f"{to}.jsonl", "a") as f:
f.write(json.dumps(msg) + "\n")
def read_inbox(self, name):
path = self.dir / f"{name}.jsonl"
if not path.exists(): return "[]"
msgs = [json.loads(l)
for l in path.read_text().strip().splitlines()
if l]
path.write_text("")
return json.dumps(msgs, indent=2)
```
## s08からの変更点
| Component | Before (s08) | After (s09) |
|----------------|------------------|----------------------------|
| Tools | 6 | 9 (+spawn/send/read_inbox) |
| Agents | Single | Lead + N teammates |
| Persistence | None | config.json + JSONL inboxes|
| Threads | Background cmds | Full agent loops per thread|
| Lifecycle | Fire-and-forget | idle -> working -> idle |
| Communication | None | 5 message types + broadcast|
教育上の簡略化: この実装ではインボックスアクセスにロックファイルを使用していない。本番環境では、複数ライターからの並行追記にはファイルロッキングまたはアトミックリネームが必要になる。ここで使用している単一ライター/インボックスパターンは教育シナリオでは安全だ。
## 設計原理
ファイルベースのメールボックス(追記専用JSONL)は並行性安全なエージェント間通信を提供する。追記はほとんどのファイルシステムでアトミックであり、ロック競合を回避する。「読み取り時にドレイン」パターン(全読み取り、切り詰め)はバッチ配信を提供する。これは共有メモリやソケットベースのIPCよりもシンプルで堅牢だ。トレードオフはレイテンシだ -- メッセージは次のポーリングまで見えない -- しかし各ターンに数秒の推論時間がかかるLLM駆動エージェントにとって、ポーリングレイテンシは推論時間に比べて無視できる。
## 試してみる
```sh
cd learn-claude-code
python agents/s09_agent_teams.py
```
試せるプロンプト例:
1. `Spawn alice (coder) and bob (tester). Have alice send bob a message.`
2. `Broadcast "status update: phase 1 complete" to all teammates`
3. `Check the lead inbox for any messages`
4. `/team`と入力してステータス付きのチーム名簿を確認する
5. `/inbox`と入力してリーダーのインボックスを手動確認する

View File

@@ -0,0 +1,190 @@
# s10: Team Protocols
> 同じrequest_idハンドシェイクパターンがシャットダウンとプラン承認の両方を支える -- 1つのFSM、2つの適用。
## 問題
s09ではチームメイトが作業しコミュニケーションするが、構造化された協調はない。2つの問題が生じる:
**シャットダウン**: チームメイトをどうやってクリーンに停止するか。スレッドを強制終了するとファイルが中途半端に書かれ、config.jsonが不正な状態になる。グレースフルシャットダウンにはハンドシェイクが必要だ: リーダーが要求し、チームメイトが承認(終了処理を行い退出)するか拒否(作業を継続)するかを判断する。
**プラン承認**: 実行をどうやってゲーティングするか。リーダーが「認証モジュールをリファクタリングして」と言うと、チームメイトは即座に開始する。リスクの高い変更では、実行開始前にリーダーが計画をレビューすべきだ。ジュニアが提案し、シニアが承認する。
両方の問題は同じ構造を共有している: 一方がユニークなIDを持つリクエストを送り、もう一方がそのIDを参照してレスポンスする。有限状態機械が各リクエストをpending -> approved | rejectedの遷移で追跡する。
## 解決策
```
Shutdown Protocol Plan Approval Protocol
================== ======================
Lead Teammate Teammate Lead
| | | |
|--shutdown_req-->| |--plan_req------>|
| {req_id:"abc"} | | {req_id:"xyz"} |
| | | |
|<--shutdown_resp-| |<--plan_resp-----|
| {req_id:"abc", | | {req_id:"xyz", |
| approve:true} | | approve:true} |
| | | |
v v v v
tracker["abc"] exits proceeds tracker["xyz"]
= approved = approved
Shared FSM (identical for both protocols):
[pending] --approve--> [approved]
[pending] --reject---> [rejected]
Trackers:
shutdown_requests = {req_id: {target, status}}
plan_requests = {req_id: {from, plan, status}}
```
## 仕組み
1. リーダーがrequest_idを生成し、インボックス経由でshutdown_requestを送信してシャットダウンを開始する。
```python
shutdown_requests = {}
def handle_shutdown_request(teammate: str) -> str:
req_id = str(uuid.uuid4())[:8]
shutdown_requests[req_id] = {
"target": teammate, "status": "pending",
}
BUS.send("lead", teammate, "Please shut down gracefully.",
"shutdown_request", {"request_id": req_id})
return f"Shutdown request {req_id} sent (status: pending)"
```
2. チームメイトはインボックスでリクエストを受信し、`shutdown_response`ツールを呼び出して承認または拒否する。
```python
if tool_name == "shutdown_response":
req_id = args["request_id"]
approve = args["approve"]
if req_id in shutdown_requests:
shutdown_requests[req_id]["status"] = \
"approved" if approve else "rejected"
BUS.send(sender, "lead", args.get("reason", ""),
"shutdown_response",
{"request_id": req_id, "approve": approve})
return f"Shutdown {'approved' if approve else 'rejected'}"
```
3. チームメイトのループが承認済みシャットダウンを確認して終了する。
```python
if (block.name == "shutdown_response"
and block.input.get("approve")):
should_exit = True
# ...
member["status"] = "shutdown" if should_exit else "idle"
```
4. プラン承認も同一のパターンに従う。チームメイトがプランを提出し、request_idを生成する。
```python
plan_requests = {}
if tool_name == "plan_approval":
plan_text = args.get("plan", "")
req_id = str(uuid.uuid4())[:8]
plan_requests[req_id] = {
"from": sender, "plan": plan_text,
"status": "pending",
}
BUS.send(sender, "lead", plan_text,
"plan_approval_request",
{"request_id": req_id, "plan": plan_text})
return f"Plan submitted (request_id={req_id})"
```
5. リーダーがレビューし、同じrequest_idでレスポンスする。
```python
def handle_plan_review(request_id, approve, feedback=""):
req = plan_requests.get(request_id)
if not req:
return f"Error: Unknown request_id '{request_id}'"
req["status"] = "approved" if approve else "rejected"
BUS.send("lead", req["from"], feedback,
"plan_approval_response",
{"request_id": request_id,
"approve": approve,
"feedback": feedback})
return f"Plan {req['status']} for '{req['from']}'"
```
6. 両プロトコルとも同じ`plan_approval`ツール名を2つのモードで使用する: チームメイトが提出(request_idなし)、リーダーがレビュー(request_idあり)。
```python
# Lead tool dispatch:
"plan_approval": lambda **kw: handle_plan_review(
kw["request_id"], kw["approve"],
kw.get("feedback", "")),
# Teammate: submit mode (generate request_id)
```
## 主要コード
2つのプロトコルハンドラ(`agents/s10_team_protocols.py`):
```python
shutdown_requests = {}
plan_requests = {}
# -- Shutdown --
def handle_shutdown_request(teammate):
req_id = str(uuid.uuid4())[:8]
shutdown_requests[req_id] = {
"target": teammate, "status": "pending"
}
BUS.send("lead", teammate,
"Please shut down gracefully.",
"shutdown_request",
{"request_id": req_id})
# -- Plan Approval --
def handle_plan_review(request_id, approve, feedback=""):
req = plan_requests[request_id]
req["status"] = "approved" if approve else "rejected"
BUS.send("lead", req["from"], feedback,
"plan_approval_response",
{"request_id": request_id,
"approve": approve})
# Both use the same FSM:
# pending -> approved | rejected
# Both correlate by request_id across async inboxes
```
## s09からの変更点
| Component | Before (s09) | After (s10) |
|----------------|------------------|------------------------------|
| Tools | 9 | 12 (+shutdown_req/resp +plan)|
| Shutdown | Natural exit only| Request-response handshake |
| Plan gating | None | Submit/review with approval |
| Request tracking| None | Two tracker dicts |
| Correlation | None | request_id per request |
| FSM | None | pending -> approved/rejected |
## 設計原理
request_id相関パターンは、任意の非同期インタラクションを追跡可能な有限状態マシンに変換する。同じ3状態マシン(pending -> approved/rejected)がシャットダウン、プラン承認、または将来の任意のプロトコルに適用される。1つのパターンが複数のプロトコルを処理できるのはこのためだ -- FSMは何を承認しているかを気にしない。request_idはメッセージが順不同で到着する可能性のある非同期インボックス間で相関を提供し、エージェント間のタイミング差異に対してパターンを堅牢にする。
## 試してみる
```sh
cd learn-claude-code
python agents/s10_team_protocols.py
```
試せるプロンプト例:
1. `Spawn alice as a coder. Then request her shutdown.`
2. `List teammates to see alice's status after shutdown approval`
3. `Spawn bob with a risky refactoring task. Review and reject his plan.`
4. `Spawn charlie, have him submit a plan, then approve it.`
5. `/team`と入力してステータスを監視する

View File

@@ -0,0 +1,215 @@
# s11: Autonomous Agents
> タスクボードポーリング付きのアイドルサイクルにより、チームメイトが自分で作業を見つけて確保できるようになり、コンテキスト圧縮後にはアイデンティティの再注入が行われる。
## 問題
s09-s10では、チームメイトは明示的に指示された時のみ作業する。リーダーは各チームメイトを特定のプロンプトでspawnしなければならない。タスクボードに未割り当てのタスクが10個あっても、リーダーが手動で各タスクを割り当てなければならない。これはスケールしない。
真の自律性とは、チームメイトが自分で作業を見つけることだ。チームメイトが現在のタスクを完了したら、タスクボードで未確保の作業をスキャンし、タスクを確保し、作業を開始すべきだ -- リーダーからの指示なしに。
しかし自律エージェントには微妙な問題がある: コンテキスト圧縮後に、エージェントが自分が誰かを忘れる可能性がある。メッセージが要約されると、元のシステムプロンプトのアイデンティティ(「あなたはalice、役割はcoder」)が薄れる。アイデンティティの再注入は、圧縮されたコンテキストの先頭にアイデンティティブロックを挿入することでこれを解決する。
教育上の簡略化: ここで使用するトークン推定は大まかなもの(文字数 / 4)だ。本番システムでは適切なトークナイザーライブラリを使用する。nagの閾値3ラウンド(s03から)は教育目的の可視化のために低く設定されている。本番のエージェントでは閾値は約10。
## 解決策
```
Teammate lifecycle with idle cycle:
+-------+
| spawn |
+---+---+
|
v
+-------+ tool_use +-------+
| WORK | <------------- | LLM |
+---+---+ +-------+
|
| stop_reason != tool_use
| (or idle tool called)
v
+--------+
| IDLE | poll every 5s for up to 60s
+---+----+
|
+---> check inbox --> message? ----------> WORK
|
+---> scan .tasks/ --> unclaimed? -------> claim -> WORK
|
+---> 60s timeout ----------------------> SHUTDOWN
Identity re-injection after compression:
if len(messages) <= 3:
messages.insert(0, identity_block)
"You are 'alice', role: coder, team: my-team"
```
## 仕組み
1. チームメイトのループにはWORKとIDLEの2つのフェーズがある。WORKは標準的なagent loopを実行する。LLMがツール呼び出しを停止した時(または`idle`ツールを呼び出した時)、チームメイトはIDLEフェーズに入る。
```python
def _loop(self, name, role, prompt):
while True:
# -- WORK PHASE --
messages = [{"role": "user", "content": prompt}]
for _ in range(50):
inbox = BUS.read_inbox(name)
for msg in inbox:
if msg.get("type") == "shutdown_request":
self._set_status(name, "shutdown")
return
messages.append(...)
response = client.messages.create(...)
if response.stop_reason != "tool_use":
break
# execute tools...
if idle_requested:
break
# -- IDLE PHASE --
self._set_status(name, "idle")
resume = self._idle_poll(name, messages)
if not resume:
self._set_status(name, "shutdown")
return
self._set_status(name, "working")
```
2. IDLEフェーズがインボックスとタスクボードをループでポーリングする。
```python
def _idle_poll(self, name, messages):
polls = IDLE_TIMEOUT // POLL_INTERVAL # 60s / 5s = 12
for _ in range(polls):
time.sleep(POLL_INTERVAL)
# Check inbox for new messages
inbox = BUS.read_inbox(name)
if inbox:
messages.append({"role": "user",
"content": f"<inbox>{inbox}</inbox>"})
return True
# Scan task board for unclaimed tasks
unclaimed = scan_unclaimed_tasks()
if unclaimed:
task = unclaimed[0]
claim_task(task["id"], name)
messages.append({"role": "user",
"content": f"<auto-claimed>Task #{task['id']}: "
f"{task['subject']}</auto-claimed>"})
return True
return False # timeout -> shutdown
```
3. タスクボードスキャンがpendingかつ未割り当てかつブロックされていないタスクを探す。
```python
def scan_unclaimed_tasks() -> list:
TASKS_DIR.mkdir(exist_ok=True)
unclaimed = []
for f in sorted(TASKS_DIR.glob("task_*.json")):
task = json.loads(f.read_text())
if (task.get("status") == "pending"
and not task.get("owner")
and not task.get("blockedBy")):
unclaimed.append(task)
return unclaimed
def claim_task(task_id: int, owner: str):
path = TASKS_DIR / f"task_{task_id}.json"
task = json.loads(path.read_text())
task["status"] = "in_progress"
task["owner"] = owner
path.write_text(json.dumps(task, indent=2))
```
4. アイデンティティの再注入は、コンテキストが短すぎる場合(圧縮が発生したことを示す)にアイデンティティブロックを挿入する。
```python
def make_identity_block(name, role, team_name):
return {"role": "user",
"content": f"<identity>You are '{name}', "
f"role: {role}, team: {team_name}. "
f"Continue your work.</identity>"}
# Before resuming work after idle:
if len(messages) <= 3:
messages.insert(0, make_identity_block(
name, role, team_name))
messages.insert(1, {"role": "assistant",
"content": f"I am {name}. Continuing."})
```
5. `idle`ツールにより、チームメイトはもう作業がないことを明示的にシグナルし、早期にアイドルポーリングフェーズに入る。
```python
{"name": "idle",
"description": "Signal that you have no more work. "
"Enters idle polling phase.",
"input_schema": {"type": "object", "properties": {}}},
```
## 主要コード
自律ループ(`agents/s11_autonomous_agents.py`):
```python
def _loop(self, name, role, prompt):
while True:
# WORK PHASE
for _ in range(50):
response = client.messages.create(...)
if response.stop_reason != "tool_use":
break
for block in response.content:
if block.name == "idle":
idle_requested = True
if idle_requested:
break
# IDLE PHASE
self._set_status(name, "idle")
for _ in range(IDLE_TIMEOUT // POLL_INTERVAL):
time.sleep(POLL_INTERVAL)
inbox = BUS.read_inbox(name)
if inbox: resume = True; break
unclaimed = scan_unclaimed_tasks()
if unclaimed:
claim_task(unclaimed[0]["id"], name)
resume = True; break
if not resume:
self._set_status(name, "shutdown")
return
self._set_status(name, "working")
```
## s10からの変更点
| Component | Before (s10) | After (s11) |
|----------------|------------------|----------------------------|
| Tools | 12 | 14 (+idle, +claim_task) |
| Autonomy | Lead-directed | Self-organizing |
| Idle phase | None | Poll inbox + task board |
| Task claiming | Manual only | Auto-claim unclaimed tasks |
| Identity | System prompt | + re-injection after compress|
| Timeout | None | 60s idle -> auto shutdown |
## 設計原理
ポーリング + タイムアウトにより、エージェントは中央コーディネーターなしで自己組織化する。各エージェントは独立してタスクボードをポーリングし、未確保の作業を確保し、完了したらアイドルに戻る。タイムアウトがポーリングサイクルをトリガーし、ウィンドウ内に作業が現れなければエージェントは自らシャットダウンする。これはワークスティーリングスレッドプールと同じパターンだ -- 分散型で単一障害点がない。圧縮後のアイデンティティ再注入により、会話履歴が要約された後もエージェントは自身の役割を維持する。
## 試してみる
```sh
cd learn-claude-code
python agents/s11_autonomous_agents.py
```
試せるプロンプト例:
1. `Create 3 tasks on the board, then spawn alice and bob. Watch them auto-claim.`
2. `Spawn a coder teammate and let it find work from the task board itself`
3. `Create tasks with dependencies. Watch teammates respect the blocked order.`
4. `/tasks`と入力してオーナー付きのタスクボードを確認する
5. `/team`と入力して誰が作業中でアイドルかを監視する