Learn Claude Code - Complete Course Notes

Source: https://learn.shareai.run/zh/timeline/

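An advanced course on building Claude Code-style agents from scratch, in 12 chapters. It starts from the most basic agent loop and builds up, step by step, to a complete autonomous multi-agent collaboration system.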




Chapter 1: The Agent Loop (s01)

Category: Tools & Execution | Subtitle: Bash is All You Need
Code size: 84 lines | Tools: 1

"One loop & Bash is all you need" – one tool + one loop = one agent.

The While Loop

Every agent is a while loop that keeps calling the model until it says ‘stop’.

Learning path: s01 → s02 → s03 → s04 → s05 → s06 | s07 → s08 → s09 → s10 → s11 → s12

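Problem

A language model can reason about code, but it cannot touch the real world: it cannot read files, run tests, or see error messages. Without a loop, you have to paste every tool result back in by hand. You are the loop.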

Solution

+--------+      +-------+      +---------+
|  User  | ---> |  LLM  | ---> |  Tool   |
| prompt |      |       |      | execute |
+--------+      +---+---+      +----+----+
                    ^               |
                    |  tool_result  |
                    +---------------+
         (loop until stop_reason != "tool_use")

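A single exit condition controls the whole flow: the loop keeps running until the model stops calling tools.

How it works

1. The user prompt becomes the first message.

messages.append({"role": "user", "content": query})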

2. Send the messages to the LLM together with the tool definitions.

response = client.messages.create(
    model=MODEL, system=SYSTEM, messages=messages,
    tools=TOOLS, max_tokens=8000,
)

3. Append the assistant response and check stop_reason. If the model did not call a tool, stop.

messages.append({"role": "assistant", "content": response.content})
if response.stop_reason != "tool_use":
    return

4. Run every tool call, collect the results, and append them as a user message. Then go back to step 2.

results = []
for block in response.content:
    if block.type == "tool_use":
        output = run_bash(block.input["command"])
        results.append({
            "type": "tool_result",
            "tool_use_id": block.id,
            "content": output,
        })
messages.append({"role": "user", "content": results})

Assembled into one complete function:

def agent_loop(query):
    messages = [{"role": "user", "content": query}]
    while True:
        response = client.messages.create(
            model=MODEL, system=SYSTEM, messages=messages,
            tools=TOOLS, max_tokens=8000,
        )
        messages.append({"role": "assistant", "content": response.content})

        # Exit condition: the model answered without requesting a tool
        if response.stop_reason != "tool_use":
            return

        results = []
        for block in response.content:
            if block.type == "tool_use":
                output = run_bash(block.input["command"])
                results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": output,
                })
        messages.append({"role": "user", "content": results})

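Fewer than 30 lines, and that is the entire agent. The remaining 11 chapters all layer mechanisms on top of this loop; the loop itself never changes.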
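To watch the loop terminate without calling a real API, here is a minimal sketch that swaps the SDK client for a scripted fake. Block, Response, ScriptedClient and the echoing run_bash are hypothetical stand-ins that model only the fields the loop reads. Unlike the function above, this agent_loop takes the client as a parameter and returns the message history so that it can be inspected.

```python
from dataclasses import dataclass, field


@dataclass
class Block:
    """Hypothetical stand-in for an SDK content block."""
    type: str
    text: str = ""
    id: str = ""
    name: str = ""
    input: dict = field(default_factory=dict)


@dataclass
class Response:
    """Hypothetical stand-in for an SDK response."""
    stop_reason: str
    content: list


class ScriptedClient:
    """Fake model: returns pre-scripted responses in order."""

    def __init__(self, responses):
        self._responses = list(responses)
        self.calls = 0

    def create(self, messages, **kwargs):
        self.calls += 1
        return self._responses.pop(0)


def run_bash(command: str) -> str:
    # Stub: echoes the command instead of running a shell.
    return f"ran: {command}"


def agent_loop(client, query: str) -> list:
    messages = [{"role": "user", "content": query}]
    while True:
        response = client.create(messages=messages)
        messages.append({"role": "assistant", "content": response.content})
        if response.stop_reason != "tool_use":
            return messages  # the model stopped asking for tools
        results = []
        for block in response.content:
            if block.type == "tool_use":
                results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": run_bash(block.input["command"]),
                })
        messages.append({"role": "user", "content": results})


# One tool round, then a final text answer.
client = ScriptedClient([
    Response("tool_use", [Block("tool_use", id="t1", name="bash",
                                input={"command": "ls"})]),
    Response("end_turn", [Block("text", text="Done.")]),
])
history = agent_loop(client, "List files")
```

The history ends with four messages: the user prompt, the assistant's tool call, the user's tool result, and the assistant's final answer. The model is called twice.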

What changed

Component    | Before | After
Agent loop   | (none) | while True + stop_reason
Tools        | (none) | bash (single tool)
Messages     | (none) | accumulating message list
Control flow | (none) | stop_reason != "tool_use"

Try it

cd learn-claude-code
python agents/s01_agent_loop.py

Try these prompts (English prompts tend to work better with the LLM, but Chinese works too):

  • Create a file called hello.py that prints "Hello, World!"
  • List all Python files in this directory
  • What is the current git branch?
  • Create a directory called test_output and write 3 files in it

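Chapter 2: Tools (s02)

Category: Tools & Execution | Subtitle: The Tool Dispatch Map
Code size: 120 lines | Tools: 4

"Adding a tool means adding one handler" – the loop stays untouched; a new tool only needs to be registered in the dispatch map.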

The Dispatch Map

A dictionary maps tool names to handler functions. The loop code never changes.

Tools:

  • bash - Execute shell commands
  • read_file - Read file contents
  • write_file - Create or overwrite a file
  • edit_file - Apply targeted edits

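Problem

With only bash, every operation goes through the shell. Truncating cat output is unpredictable, sed breaks on special characters, and every bash call is an unconstrained attack surface. Dedicated tools (read_file, write_file) can sandbox paths at the tool level.

Key insight: adding a tool does not require changing the loop.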

Solution

+--------+      +-------+      +------------------+
|  User  | ---> |  LLM  | ---> |  Tool Dispatch   |
| prompt |      |       |      |  {               |
+--------+      +---+---+      |   bash: run_bash |
                    ^          |   read: run_read |
                    |          |   write: run_wr  |
                    +----------+   edit: run_edit |
                    tool_result|  }               |
                               +------------------+

The dispatch map is a dict: {tool_name: handler_function}. One lookup replaces any if/elif chain.
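Here is a minimal, self-contained sketch of that lookup. The handlers are hypothetical stubs that return strings instead of touching the shell or the filesystem. dispatch is an illustrative name for the lookup that the loop performs inline.

```python
from typing import Callable, Dict, Optional


def run_bash(command: str) -> str:
    # Stub handler for illustration; the real one runs a shell command.
    return f"bash: {command}"


def run_read(path: str, limit: Optional[int] = None) -> str:
    # Stub handler for illustration; the real one reads a sandboxed file.
    return f"read: {path} (limit={limit})"


TOOL_HANDLERS: Dict[str, Callable[..., str]] = {
    "bash": lambda **kw: run_bash(kw["command"]),
    "read_file": lambda **kw: run_read(kw["path"], kw.get("limit")),
}


def dispatch(name: str, tool_input: dict) -> str:
    handler = TOOL_HANDLERS.get(name)  # one dict lookup instead of if/elif
    if handler is None:
        return f"Unknown tool: {name}"
    return handler(**tool_input)


# Registering a new tool is one dict entry; dispatch() stays unchanged.
TOOL_HANDLERS["echo"] = lambda **kw: kw["text"]
```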

How it works

1. Each tool has a handler function. A path sandbox keeps the agent from escaping the workspace.

def safe_path(p: str) -> Path:
    path = (WORKDIR / p).resolve()
    if not path.is_relative_to(WORKDIR):  # Path.is_relative_to needs Python 3.9+
        raise ValueError(f"Path escapes workspace: {p}")
    return path

def run_read(path: str, limit: int = None) -> str:
    text = safe_path(path).read_text()
    lines = text.splitlines()
    if limit and limit < len(lines):
        lines = lines[:limit]
    return "\n".join(lines)[:50000]  # cap output at 50,000 characters

2. The dispatch map maps tool names to handler functions.

TOOL_HANDLERS = {
    "bash":       lambda **kw: run_bash(kw["command"]),
    "read_file":  lambda **kw: run_read(kw["path"], kw.get("limit")),
    "write_file": lambda **kw: run_write(kw["path"], kw["content"]),
    "edit_file":  lambda **kw: run_edit(kw["path"], kw["old_text"],
                                        kw["new_text"]),
}

3. The loop looks up the handler by name. The loop body itself is identical to s01.

for block in response.content:
    if block.type == "tool_use":
        handler = TOOL_HANDLERS.get(block.name)
        output = handler(**block.input) if handler \
            else f"Unknown tool: {block.name}"
        results.append({
            "type": "tool_result",
            "tool_use_id": block.id,
            "content": output,
        })

Principle: adding a tool = adding a handler + adding a schema. The loop never changes.
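The bodies of run_write and run_edit are not shown in this chapter. Below is a minimal sketch of how they might sit on top of safe_path. It assumes a temporary directory as the workspace, and it assumes run_edit replaces only the first exact match of old_text. Both behaviors are illustrative guesses, not the course's code.

```python
import tempfile
from pathlib import Path

# Assumption: a throwaway workspace; resolve() so symlinked temp dirs compare correctly.
WORKDIR = Path(tempfile.mkdtemp()).resolve()


def safe_path(p: str) -> Path:
    path = (WORKDIR / p).resolve()
    if not path.is_relative_to(WORKDIR):  # Python 3.9+
        raise ValueError(f"Path escapes workspace: {p}")
    return path


def run_write(path: str, content: str) -> str:
    fp = safe_path(path)
    fp.parent.mkdir(parents=True, exist_ok=True)
    fp.write_text(content)
    return f"Wrote {len(content)} chars to {path}"


def run_edit(path: str, old_text: str, new_text: str) -> str:
    # Hypothetical semantics: replace the first exact occurrence only.
    fp = safe_path(path)
    text = fp.read_text()
    if old_text not in text:
        return f"Error: text not found in {path}"
    fp.write_text(text.replace(old_text, new_text, 1))
    return f"Edited {path}"


run_write("src/app.py", "x = 1\n")
edit_result = run_edit("src/app.py", "x = 1", "x = 2")
```

A path such as "../outside.txt" resolves to a location outside WORKDIR, so safe_path rejects it before any file is touched.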

What changed

Component   | Before (s01)         | After (s02)
Tools       | 1 (bash only)        | 4 (bash, read, write, edit)
Dispatch    | hard-coded bash call | TOOL_HANDLERS dict
Path safety | (none)               | safe_path() sandbox
Agent loop  | unchanged            | unchanged

Try it

cd learn-claude-code
python agents/s02_tool_use.py
  • Read the file requirements.txt
  • Create a file called greet.py with a greet(name) function
  • Edit greet.py to add a docstring to the function
  • Read greet.py to verify the edit worked

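Chapter 3: TodoWrite (s03)

Category: Planning & Coordination | Subtitle: The TodoWrite Reminder System
Code size: 176 lines | Tools: 5

"An agent without a plan just wanders" – list the steps before acting, and the completion rate doubles.

Problem

In multi-step tasks, the model loses track of progress. It repeats work it has already done, skips steps, and drifts off course. The longer the conversation, the worse it gets: tool results keep filling the context, and the system prompt's influence is steadily diluted. A 10-step refactor might finish steps 1-3 and then start improvising, because steps 4-10 have already been pushed out of its attention.

Solution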

+--------+      +-------+      +---------+
|  User  | ---> |  LLM  | ---> |  Tools  |
| prompt |      |       |      | + todo  |
+--------+      +---+---+      +----+----+
                    ^               |
                    |  tool_result  |
                    +---------------+
                            |
                +-----------+-----------+
                | TodoManager state     |
                | [ ] task A            |
                | [>] task B  <- doing  |
                | [x] task C            |
                +-----------------------+
                            |
                if rounds_since_todo >= 3:
                    inject <reminder> into tool_result

How it works

1. TodoManager stores items together with a status. Only one item may be in_progress at a time.

class TodoManager:
    def update(self, items: list) -> str:
        validated, in_progress_count = [], 0
        for item in items:
            status = item.get("status", "pending")
            if status == "in_progress":
                in_progress_count += 1
            validated.append({"id": item["id"], "text": item["text"],
                              "status": status})
        # Reject the whole update if more than one item is in progress
        if in_progress_count > 1:
            raise ValueError("Only one task can be in_progress")
        self.items = validated
        return self.render()  # render() formats the list (not shown here)

2. The todo tool joins the dispatch map like any other tool.

TOOL_HANDLERS = {
    # ...base tools...
    "todo": lambda **kw: TODO.update(kw["items"]),
}

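3. Nag reminder: if the model goes 3 or more rounds without calling todo, a reminder is injected.

if rounds_since_todo >= 3 and messages:
    last = messages[-1]
    if last["role"] == "user" and isinstance(last.get("content"), list):
        # The Messages API expects tool_result blocks first in this
        # message, so the reminder goes after them
        last["content"].append({
            "type": "text",
            "text": "<reminder>Update your todos.</reminder>",
        })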

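The rule "only one item in_progress at a time" enforces sequential focus. The nag reminder creates accountability: if you don't update the plan, the system keeps asking about it.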
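The excerpt above leaves out the constructor, render(), and the round counter. The minimal sketch below fills them in. It assumes the status names pending/in_progress/completed and the [ ]/[>]/[x] markers from the diagram. track_rounds and maybe_nag are hypothetical helper names. The reminder is appended after the tool_result blocks rather than prepended.

```python
class TodoManager:
    # Assumed status names and display markers (from the diagram above).
    MARKS = {"pending": "[ ]", "in_progress": "[>]", "completed": "[x]"}

    def __init__(self):
        self.items = []

    def update(self, items: list) -> str:
        validated, in_progress_count = [], 0
        for item in items:
            status = item.get("status", "pending")
            if status not in self.MARKS:
                raise ValueError(f"Unknown status: {status}")
            if status == "in_progress":
                in_progress_count += 1
            validated.append({"id": item["id"], "text": item["text"],
                              "status": status})
        if in_progress_count > 1:
            raise ValueError("Only one task can be in_progress")
        self.items = validated  # only reached when validation passed
        return self.render()

    def render(self) -> str:
        return "\n".join(f"{self.MARKS[i['status']]} {i['text']}"
                         for i in self.items)


def track_rounds(rounds_since_todo: int, used_todo: bool) -> int:
    # Reset the counter when the model called todo this round.
    return 0 if used_todo else rounds_since_todo + 1


def maybe_nag(messages: list, rounds_since_todo: int) -> None:
    if rounds_since_todo >= 3 and messages:
        last = messages[-1]
        if last["role"] == "user" and isinstance(last.get("content"), list):
            # Append after the tool_result blocks rather than before them.
            last["content"].append({
                "type": "text",
                "text": "<reminder>Update your todos.</reminder>",
            })


todo = TodoManager()
board = todo.update([
    {"id": 1, "text": "Add type hints", "status": "completed"},
    {"id": 2, "text": "Add docstrings", "status": "in_progress"},
    {"id": 3, "text": "Add main guard"},
])
```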

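What changed

Component     | Before (s02)    | After (s03)
Tools         | 4               | 5 (+todo)
Planning      | (none)          | TodoManager with statuses
Nag injection | (none)          | <reminder> injected after 3 rounds
Agent loop    | simple dispatch | + rounds_since_todo counter

Try it

cd learn-claude-code
python agents/s03_todo_write.py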
  • Refactor the file hello.py: add type hints, docstrings, and a main guard
  • Create a Python package with __init__.py, utils.py, and tests/test_utils.py
  • Review all Python files and fix any style issues

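Chapter 4: Subagents (s04)

Category: Planning & Coordination | Subtitle: Subagent Context Isolation
Code size: 151 lines | Tools: 5

"Split big tasks into small ones, each with a clean context" – a subagent uses its own messages[] and does not pollute the main conversation.

Problem

The longer an agent works, the fatter its messages array gets. Every file read and every command output stays in the context permanently. "What test framework does this project use?" might require reading 5 files, but the parent agent only needs one word: "pytest."

Solution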

Parent agent                     Subagent
+------------------+             +------------------+
| messages=[...]   |             | messages=[]      |  <-- fresh
|                  |  dispatch   |                  |
| tool: task       | ----------> | while tool_use:  |
|   prompt="..."   |             |   call tools     |
|                  |   summary   |   append results |
|   result = "..." | <---------- | return last text |
+------------------+             +------------------+

Parent context stays clean. Subagent context is discarded.

How it works

1. The parent agent has a task tool. The subagent gets every base tool except task, which prevents recursive spawning.

PARENT_TOOLS = CHILD_TOOLS + [
    {"name": "task",
     "description": "Spawn a subagent with fresh context.",
     "input_schema": {
         "type": "object",
         "properties": {"prompt": {"type": "string"}},
         "required": ["prompt"],
     }},
]

2. The subagent starts from a fresh message list that contains only its prompt, and it runs its own loop. Only the final text is returned to the parent.

def run_subagent(prompt: str) -> str:
    sub_messages = [{"role": "user", "content": prompt}]  # fresh context
    for _ in range(30):  # safety limit
        response = client.messages.create(
            model=MODEL, system=SUBAGENT_SYSTEM,
            messages=sub_messages,
            tools=CHILD_TOOLS, max_tokens=8000,
        )
        sub_messages.append({"role": "assistant",
                             "content": response.content})
        if response.stop_reason != "tool_use":
            break
        results = []
        for block in response.content:
            if block.type == "tool_use":
                handler = TOOL_HANDLERS.get(block.name)
                output = handler(**block.input) if handler \
                    else f"Unknown tool: {block.name}"
                results.append({"type": "tool_result",
                                "tool_use_id": block.id,
                                "content": str(output)[:50000]})
        sub_messages.append({"role": "user", "content": results})
    # Only the last response's text survives; sub_messages is discarded
    return "".join(
        b.text for b in response.content if hasattr(b, "text")
    ) or "(no summary)"

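The subagent may have made 30+ tool calls, but its entire message history is thrown away. The parent receives only a summary text, returned as an ordinary tool_result.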
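The isolation property can be checked without an API key. The sketch below is a variant of run_subagent that takes the model as a parameter. call_model and its (stop_reason, text, tool_calls) return shape are hypothetical simplifications of the SDK response, not the course's interface.

```python
from typing import Callable, Dict, List, Tuple

# Hypothetical model interface for this sketch:
# messages -> (stop_reason, text, [(tool_use_id, tool_name, tool_input), ...])
ModelFn = Callable[[list], Tuple[str, str, List[tuple]]]


def run_subagent(prompt: str, call_model: ModelFn,
                 handlers: Dict[str, Callable[..., str]],
                 max_turns: int = 30) -> str:
    sub_messages = [{"role": "user", "content": prompt}]  # fresh context
    text = ""
    for _ in range(max_turns):  # safety limit
        stop_reason, text, tool_calls = call_model(sub_messages)
        sub_messages.append({"role": "assistant", "content": text})
        if stop_reason != "tool_use":
            break
        results = []
        for call_id, name, tool_input in tool_calls:
            handler = handlers.get(name)
            output = handler(**tool_input) if handler else f"Unknown tool: {name}"
            results.append({"type": "tool_result", "tool_use_id": call_id,
                            "content": str(output)[:50000]})
        sub_messages.append({"role": "user", "content": results})
    # sub_messages is dropped here; only the final text reaches the parent.
    return text or "(no summary)"


# Scripted model: reads two files, then answers with one word.
script = [
    ("tool_use", "", [("t1", "read_file", {"path": "setup.cfg"})]),
    ("tool_use", "", [("t2", "read_file", {"path": "pyproject.toml"})]),
    ("end_turn", "pytest", []),
]
context_sizes = []


def scripted_model(messages: list):
    context_sizes.append(len(messages))
    return script[len(context_sizes) - 1]


summary = run_subagent("Find the test framework", scripted_model,
                       {"read_file": lambda path: f"<contents of {path}>"})
```

The subagent's own context grows to five messages. The parent only sees the one-word summary.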

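What changed

Component    | Before (s03)   | After (s04)
Tools        | 5              | 5 (base) + task (parent only)
Context      | single, shared | parent + child, isolated
Subagent     | (none)         | run_subagent() function
Return value | n/a            | summary text only

Try it

cd learn-claude-code
python agents/s04_subagent.py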
  • Use a subtask to find what testing framework this project uses
  • Delegate: read all .py files and summarize what each one does
  • Use a task to create a new module, then verify it from here

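Chapter 5: Skills (s05)

Category: Planning & Coordination | Subtitle: On-Demand Skill Loading
Code size: 187 lines | Tools: 5

"Load the knowledge you need, when you need it" – inject it through tool_result instead of stuffing the system prompt.

Problem

You want the agent to follow domain-specific workflows: git conventions, testing patterns, code-review checklists. Stuffing all of them into the system prompt is wasteful. Ten skills at 2,000 tokens each come to 20,000 tokens, and most of that is irrelevant to the current task.

Solution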

System prompt (Layer 1 -- always present):
+--------------------------------------+
| You are a coding agent.              |
| Skills available:                    |
|   - git: Git workflow helpers        |  ~100 tokens/skill
|   - test: Testing best practices     |
+--------------------------------------+

When model calls load_skill("git"):
+--------------------------------------+
| tool_result (Layer 2 -- on demand):  |
| <skill name="git">                   |
|   Full git workflow instructions...  |  ~2000 tokens
|   Step 1: ...                        |
| </skill>                             |
+--------------------------------------+

Layer 1: skill names in the system prompt (cheap). Layer 2: the full content in a tool_result, on demand.

How it works

1. Each skill is a directory containing a SKILL.md file with YAML frontmatter.

skills/
  pdf/
    SKILL.md       # ---
                   # name: pdf
                   # description: Process PDF files
                   # ---
                   # ...
  code-review/
    SKILL.md       # ---
                   # name: code-review
                   # description: Review code
                   # ---
                   # ...

2. SkillLoader recursively scans for SKILL.md files. Each skill is keyed by its frontmatter name, falling back to the directory name.

class SkillLoader:
    def __init__(self, skills_dir: Path):
        self.skills = {}
        for f in sorted(skills_dir.rglob("SKILL.md")):
            text = f.read_text()
            meta, body = self._parse_frontmatter(text)  # helper not shown
            name = meta.get("name", f.parent.name)
            self.skills[name] = {"meta": meta, "body": body}

    def get_descriptions(self) -> str:
        # Layer 1: one short line per skill for the system prompt
        lines = []
        for name, skill in self.skills.items():
            desc = skill["meta"].get("description", "")
            lines.append(f"  - {name}: {desc}")
        return "\n".join(lines)

    def get_content(self, name: str) -> str:
        # Layer 2: the full body, returned as a tool_result
        skill = self.skills.get(name)
        if not skill:
            return f"Error: Unknown skill '{name}'."
        return f'<skill name="{name}">\n{skill["body"]}\n</skill>'

3. Layer 1 goes into the system prompt. Layer 2 is just one more tool in the dispatch map.

SYSTEM = f"""You are a coding agent at {WORKDIR}.
Skills available:
{SKILL_LOADER.get_descriptions()}"""

TOOL_HANDLERS = {
    # ...base tools...
    "load_skill": lambda **kw: SKILL_LOADER.get_content(kw["name"]),
}

The model knows which skills exist (cheap), and it loads the full content only when it needs it (expensive).
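_parse_frontmatter is not shown in the excerpt. Below is a minimal sketch of one possible implementation. It assumes flat key: value frontmatter only; a real loader would more likely use a YAML parser. parse_frontmatter and load_skills are hypothetical names, and load_skills mirrors the naming rule in SkillLoader.__init__.

```python
import tempfile
from pathlib import Path
from typing import Dict, Tuple


def parse_frontmatter(text: str) -> Tuple[Dict[str, str], str]:
    """Split a '---'-delimited header of key: value lines from the body."""
    if not text.startswith("---\n"):
        return {}, text
    end = text.find("\n---", 3)  # closing delimiter
    if end == -1:
        return {}, text
    meta = {}
    for line in text[4:end].splitlines():
        key, sep, value = line.partition(":")
        if sep:  # ignore lines without a colon
            meta[key.strip()] = value.strip()
    body = text[end + len("\n---"):].lstrip("\n")
    return meta, body


def load_skills(skills_dir: Path) -> dict:
    skills = {}
    for f in sorted(skills_dir.rglob("SKILL.md")):
        meta, body = parse_frontmatter(f.read_text())
        # Frontmatter name wins; fall back to the directory name.
        skills[meta.get("name", f.parent.name)] = {"meta": meta, "body": body}
    return skills


root = Path(tempfile.mkdtemp())
(root / "pdf").mkdir()
(root / "pdf" / "SKILL.md").write_text(
    "---\nname: pdf\ndescription: Process PDF files\n---\nStep 1: ...\n")
(root / "notes").mkdir()
(root / "notes" / "SKILL.md").write_text("No frontmatter here.\n")
skills = load_skills(root)
```

A SKILL.md without frontmatter still loads: its whole text becomes the body, and the directory name becomes the key.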

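What changed

Component     | Before (s04)    | After (s05)
Tools         | 5 (base + task) | 5 (base + load_skill)
System prompt | static string   | + list of skill descriptions
Knowledge     | (none)          | skills/*/SKILL.md files
Injection     | (none)          | two layers (system prompt + tool_result)

Try it

cd learn-claude-code
python agents/s05_skill_loading.py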
  • What skills are available?
  • Load the agent-builder skill and follow its instructions
  • I need to do a code review -- load the relevant skill first
  • Build an MCP server using the mcp-builder skill

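Chapter 6: Context Compaction (s06)

Category: Memory Management | Subtitle: Three-Layer Context Compaction
Code size: 205 lines | Tools: 5

"The context will always fill up, so you need a way to make room" – a three-layer compaction strategy buys you unlimited sessions.

Problem

The context window is finite. Reading a single 1,000-line file eats ~4,000 tokens. Read 30 files and run 20 commands, and you easily blow past 100k tokens. Without compaction, the agent simply cannot work on a large project.

Solution

Three layers of compaction, in increasing order of aggressiveness: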

Every turn:
+------------------+
| Tool call result |
+------------------+
         |
         v
[Layer 1: micro_compact]        (silent, every turn)
  Replace tool_result > 3 turns old
  with "[Previous: used {tool_name}]"
         |
         v
[Check: tokens > 50000?]
    |              |
    no             yes
    |              |
    v              v
continue    [Layer 2: auto_compact]
              Save transcript to .transcripts/
              LLM summarizes conversation.
              Replace all messages with [summary].
                   |
                   v
            [Layer 3: compact tool]
              Model calls compact explicitly.
              Same summarization as auto_compact.

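How it works

1. Layer 1 – micro_compact: before every LLM call, replace old tool results with a placeholder.

def micro_compact(messages: list) -> list:
    # Map tool_use ids to tool names from earlier assistant turns
    tool_names = {}
    for msg in messages:
        if msg["role"] == "assistant" and isinstance(msg.get("content"), list):
            for block in msg["content"]:
                if getattr(block, "type", None) == "tool_use":
                    tool_names[block.id] = block.name
    tool_results = []
    for i, msg in enumerate(messages):
        if msg["role"] == "user" and isinstance(msg.get("content"), list):
            for j, part in enumerate(msg["content"]):
                if isinstance(part, dict) and part.get("type") == "tool_result":
                    tool_results.append((i, j, part))
    if len(tool_results) <= KEEP_RECENT:
        return messages
    # Keep the newest KEEP_RECENT results; shrink older, larger ones
    for _, _, part in tool_results[:-KEEP_RECENT]:
        if len(str(part.get("content", ""))) > 100:
            tool_name = tool_names.get(part.get("tool_use_id"), "unknown")
            part["content"] = f"[Previous: used {tool_name}]"
    return messages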

2. Layer 2 – auto_compact: when the token count exceeds the threshold, save the full conversation to disk and have the LLM summarize it.

def auto_compact(messages: list) -> list:
    # Save the full transcript so nothing is truly lost
    transcript_path = TRANSCRIPT_DIR / f"transcript_{int(time.time())}.jsonl"
    with open(transcript_path, "w") as f:
        for msg in messages:
            f.write(json.dumps(msg, default=str) + "\n")
    # Ask the LLM for a summary (input capped at 80,000 characters)
    response = client.messages.create(
        model=MODEL,
        messages=[{"role": "user", "content":
                   "Summarize this conversation for continuity..."
                   + json.dumps(messages, default=str)[:80000]}],
        max_tokens=2000,
    )
    # Replace the whole history with a two-message stub
    return [
        {"role": "user", "content": f"[Compressed]\n\n{response.content[0].text}"},
        {"role": "assistant", "content": "Understood. Continuing."},
    ]

3. Layer 3 – manual compact: the compact tool triggers the same summarization on demand.

4. The loop ties the three layers together:

def agent_loop(messages: list):
    while True:
        micro_compact(messages)                      # Layer 1
        if estimate_tokens(messages) > THRESHOLD:
            messages[:] = auto_compact(messages)     # Layer 2
        response = client.messages.create(...)
        # ... tool execution ...
        if manual_compact:
            messages[:] = auto_compact(messages)     # Layer 3

The full history is kept on disk as a transcript. No information is actually lost; it has only been moved out of the active context.
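Here is a self-contained sketch of layer 1 and the layer-2 trigger. It rests on three assumptions: tool_use blocks are stored as plain dicts (the loop above stores SDK objects), KEEP_RECENT = 3, and a rough 4-characters-per-token estimate stands in for a real tokenizer. estimate_tokens and make_history are illustrative, not the course's implementation.

```python
import json

KEEP_RECENT = 3      # assumption: keep the 3 newest tool results verbatim
THRESHOLD = 50_000   # token threshold from the diagram above


def estimate_tokens(messages: list) -> int:
    # Rough heuristic (assumption): about 4 characters per token.
    return len(json.dumps(messages, default=str)) // 4


def micro_compact(messages: list) -> list:
    # Map each tool_use id to its tool name so placeholders can name it.
    names = {}
    for msg in messages:
        if msg["role"] == "assistant" and isinstance(msg["content"], list):
            for block in msg["content"]:
                if block.get("type") == "tool_use":
                    names[block["id"]] = block["name"]
    results = [part for msg in messages
               if msg["role"] == "user" and isinstance(msg["content"], list)
               for part in msg["content"] if part.get("type") == "tool_result"]
    # Everything except the newest KEEP_RECENT results is a candidate.
    for part in results[:max(len(results) - KEEP_RECENT, 0)]:
        if len(str(part["content"])) > 100:
            name = names.get(part["tool_use_id"], "unknown")
            part["content"] = f"[Previous: used {name}]"
    return messages


def make_history(n: int, size: int) -> list:
    # Hypothetical transcript: n read_file calls, each returning `size` chars.
    msgs = [{"role": "user", "content": "read the files"}]
    for i in range(n):
        msgs.append({"role": "assistant", "content": [
            {"type": "tool_use", "id": f"t{i}", "name": "read_file", "input": {}}]})
        msgs.append({"role": "user", "content": [
            {"type": "tool_result", "tool_use_id": f"t{i}", "content": "x" * size}]})
    return msgs


history = make_history(5, 500)
before = estimate_tokens(history)
micro_compact(history)
after = estimate_tokens(history)
```

With five results and KEEP_RECENT = 3, only the two oldest results become placeholders. A single 400,000-character result estimates to about 100k tokens, which is over the threshold that triggers auto_compact.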

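What changed

Component          | Before (s05) | After (s06)
Tools              | 5            | 5 (base + compact)
Context management | (none)       | three-layer compaction
Micro-compact      | (none)       | old results → placeholders
Auto-compact       | (none)       | triggered by a token threshold
Transcripts        | (none)       | saved to .transcripts/

Try it

cd learn-claude-code
python agents/s06_context_compact.py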
  • Read every Python file in the agents/ directory one by one (watch micro-compact replace the old results)
  • Keep reading files until compression triggers automatically
  • Use the compact tool to manually compress the conversation

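Chapter 7: Task System (s07)

Category: Planning & Coordination | Subtitle: Task Dependency Graph
Code size: 207 lines | Tools: 8

"Break big goals into small tasks, order them, and write them to disk" – a file-backed task graph that lays the groundwork for multi-agent collaboration.

Problem

The s03 TodoManager is just a flat in-memory checklist. It has no ordering and no dependencies, and the only status is done or not done. Real goals have structure: task B depends on task A, tasks C and D can run in parallel, and task E has to wait for both C and D.

Without explicit relationships, the agent cannot tell what is ready, what is blocked, and what can run at the same time. And because the checklist lives only in memory, a single run of context compaction (s06) wipes it out.

Solution

Upgrade the flat checklist to a task graph persisted on disk. Each task is a JSON file with a status, its prerequisites (blockedBy), and its dependents (blocks). At any moment the graph answers three questions:

  • What can be done? Tasks whose status is pending and whose blockedBy is empty.
  • What is blocked? Tasks waiting for their prerequisites to finish.
  • What is done? Tasks with status completed. Completing a task automatically unblocks its dependents.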

.tasks/
  task_1.json  {"id":1, "status":"completed"}
  task_2.json  {"id":2, "blockedBy":[1], "status":"pending"}
  task_3.json  {"id":3, "blockedBy":[1], "status":"pending"}
  task_4.json  {"id":4, "blockedBy":[2,3], "status":"pending"}

Task graph (DAG):
                      +----------+
                 +--> |  task 2  | --+
                 |    |  pending |   |
+-----------+    |    +----------+   +--> +----------+
|  task 1   | ---+                        |  task 4  |
| completed |    |    +----------+   +--> | blocked  |
+-----------+    +--> |  task 3  | --+    +----------+
                      |  pending |
                      +----------+

Order:    task 1 must finish before 2 and 3 can start
Parallel: tasks 2 and 3 can run at the same time
Depends:  task 4 waits until both 2 and 3 are done
Status:   pending -> in_progress -> completed

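This task graph is the coordination backbone for every mechanism after s07. Background execution (s08), multi-agent teams (s09+), and worktree isolation (s12) all read and write this same structure.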

How it works

1. TaskManager: one JSON file per task, with CRUD operations plus the dependency graph.

class TaskManager:
    def __init__(self, tasks_dir: Path):
        self.dir = tasks_dir
        self.dir.mkdir(exist_ok=True)
        self._next_id = self._max_id() + 1  # continue numbering after restarts

    def create(self, subject, description=""):
        task = {"id": self._next_id, "subject": subject,
                "description": description,
                "status": "pending", "blockedBy": [],
                "blocks": [], "owner": ""}
        self._save(task)
        self._next_id += 1
        return json.dumps(task, indent=2)

2. Dependency clearing: when a task completes, its ID is automatically removed from every other task's blockedBy list, which unblocks its dependents.

def _clear_dependency(self, completed_id):
    for f in self.dir.glob("task_*.json"):
        task = json.loads(f.read_text())
        if completed_id in task.get("blockedBy", []):
            task["blockedBy"].remove(completed_id)
            self._save(task)

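3. Status changes and dependency links: update handles status transitions and dependency edges.

def update(self, task_id, status=None,
           add_blocked_by=None, add_blocks=None):
    task = self._load(task_id)
    if status:
        task["status"] = status
        if status == "completed":
            self._clear_dependency(task_id)
    # Record new dependency edges on this task, skipping duplicates
    for dep in add_blocked_by or []:
        if dep not in task["blockedBy"]:
            task["blockedBy"].append(dep)
    for dep in add_blocks or []:
        if dep not in task["blocks"]:
            task["blocks"].append(dep)
    self._save(task)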

4. Four task tools join the dispatch map.

TOOL_HANDLERS = {
    # ...base tools...
    "task_create": lambda **kw: TASKS.create(kw["subject"]),
    "task_update": lambda **kw: TASKS.update(kw["task_id"], kw.get("status")),
    "task_list":   lambda **kw: TASKS.list_all(),
    "task_get":    lambda **kw: TASKS.get(kw["task_id"]),
}

From s07 on, the task graph is the default for multi-step work. The s03 todo list is still available for quick checklists within a single session.
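To see dependency clearing end to end, here is a minimal file-backed sketch in the spirit of TaskManager. TaskGraph, complete() and ready() are hypothetical names. To keep it short, IDs are passed in explicitly and the blocks field is omitted. The example builds the parse -> transform/emit -> test board from this chapter's Try it prompts.

```python
import json
import tempfile
from pathlib import Path


class TaskGraph:
    """Minimal sketch: one JSON file per task under a directory."""

    def __init__(self, tasks_dir: Path):
        self.dir = tasks_dir
        self.dir.mkdir(parents=True, exist_ok=True)

    def _path(self, task_id: int) -> Path:
        return self.dir / f"task_{task_id}.json"

    def _save(self, task: dict) -> None:
        self._path(task["id"]).write_text(json.dumps(task))

    def load(self, task_id: int) -> dict:
        return json.loads(self._path(task_id).read_text())

    def create(self, task_id: int, subject: str, blocked_by=()) -> None:
        self._save({"id": task_id, "subject": subject, "status": "pending",
                    "blockedBy": list(blocked_by)})

    def complete(self, task_id: int) -> None:
        task = self.load(task_id)
        task["status"] = "completed"
        self._save(task)
        # Unblock dependents: drop the finished id from every blockedBy list.
        for f in self.dir.glob("task_*.json"):
            other = json.loads(f.read_text())
            if task_id in other["blockedBy"]:
                other["blockedBy"].remove(task_id)
                self._save(other)

    def ready(self) -> list:
        # "What can be done?": pending and not waiting on anything.
        tasks = [json.loads(f.read_text()) for f in self.dir.glob("task_*.json")]
        return sorted(t["id"] for t in tasks
                      if t["status"] == "pending" and not t["blockedBy"])


graph = TaskGraph(Path(tempfile.mkdtemp()) / ".tasks")
graph.create(1, "parse")
graph.create(2, "transform", blocked_by=[1])
graph.create(3, "emit", blocked_by=[1])
graph.create(4, "test", blocked_by=[2, 3])
```

Completing task 1 makes 2 and 3 ready together. Task 4 becomes ready only after both of them have completed.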

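What changed

Component       | Before (s06)                 | After (s07)
Tools           | 5                            | 8 (task_create/update/list/get)
Planning model  | flat checklist (memory only) | task graph with dependencies (on disk)
Relationships   | (none)                       | blockedBy + blocks
Status tracking | done / not done              | pending → in_progress → completed
Persistence     | lost on compaction           | survives compaction and restarts

Try it

cd learn-claude-code
python agents/s07_task_system.py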
  • Create 3 tasks: "Setup project", "Write code", "Write tests". Make them depend on each other in order.
  • List all tasks and show the dependency graph
  • Complete task 1 and then list tasks to see task 2 unblocked
  • Create a task board for refactoring: parse -> transform -> emit -> test, where transform and emit can run in parallel after parse

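Chapter 8: Background Tasks (s08)

Category: Concurrency | Subtitle: Background Task Channel
Code size: 198 lines | Tools: 6

"Push slow operations into the background and let the agent keep thinking" – background threads run the commands and inject a notification when they finish.

Problem

Some commands take minutes: npm install, pytest, docker build. With a blocking loop, the model can only sit and wait. The user says "install the dependencies, and create a config file while you're at it," but the agent can only do one thing at a time.

Solution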

Main thread                 Background thread
+-----------------+         +-----------------+
| agent loop      |         | subprocess runs |
| ...             |         | ...             |
| [LLM call] <----+-------- | enqueue(result) |
|  ^drain queue   |         +-----------------+
+-----------------+

Timeline:
Agent --[spawn A]--[spawn B]--[other work]----
            |          |
            v          v
         [A runs]   [B runs]      (parallel)
            |          |
            +----------+---> results injected before the next LLM call

How it works

1. BackgroundManager tracks tasks with a thread-safe notification queue.

class BackgroundManager:
    def __init__(self):
        self.tasks = {}
        self._notification_queue = []
        self._lock = threading.Lock()  # guards the queue across threads

2. run() starts a daemon thread and returns immediately.

def run(self, command: str) -> str:
    task_id = str(uuid.uuid4())[:8]
    self.tasks[task_id] = {"status": "running", "command": command}
    thread = threading.Thread(
        target=self._execute, args=(task_id, command), daemon=True)
    thread.start()
    return f"Background task {task_id} started"

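3. When the subprocess finishes, its result goes into the notification queue.

def _execute(self, task_id, command):
    try:
        r = subprocess.run(command, shell=True, cwd=WORKDIR,
                           capture_output=True, text=True, timeout=300)
        output = (r.stdout + r.stderr).strip()[:50000]
        status = "completed"
    except subprocess.TimeoutExpired:
        output = "Error: Timeout (300s)"
        status = "timeout"
    with self._lock:
        self.tasks[task_id]["status"] = status  # lets the check tool see it finish
        # Notifications carry only a 500-character preview
        self._notification_queue.append({
            "task_id": task_id, "result": output[:500]})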

4. Drain the notification queue before every LLM call.

def agent_loop(messages: list):
    while True:
        notifs = BG.drain_notifications()
        if notifs:
            notif_text = "\n".join(
                f"[bg:{n['task_id']}] {n['result']}" for n in notifs)
            messages.append({"role": "user",
                             "content": f"<background-results>\n{notif_text}\n"
                                        f"</background-results>"})
            messages.append({"role": "assistant",
                             "content": "Noted background results."})
        response = client.messages.create(...)

The loop stays single-threaded. Only the subprocess I/O runs in parallel.
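drain_notifications is not shown above. Below is a minimal runnable sketch of the whole manager, with two deliberate changes that are assumptions. First, commands are argument lists run without a shell. Second, thread handles are kept so that a caller can join them; the course's version is fire-and-forget. join_all and the "completed"/"timeout" status values are hypothetical.

```python
import subprocess
import sys
import threading
import uuid


class BackgroundManager:
    def __init__(self, workdir="."):
        self.workdir = workdir
        self.tasks = {}
        self._threads = []  # kept only so callers (e.g. tests) can join
        self._notification_queue = []
        self._lock = threading.Lock()

    def run(self, argv: list) -> str:
        task_id = uuid.uuid4().hex[:8]
        self.tasks[task_id] = {"status": "running", "command": argv}
        t = threading.Thread(target=self._execute, args=(task_id, argv),
                             daemon=True)
        self._threads.append(t)
        t.start()
        return task_id

    def _execute(self, task_id, argv):
        try:
            r = subprocess.run(argv, cwd=self.workdir, capture_output=True,
                               text=True, timeout=300)
            output, status = (r.stdout + r.stderr).strip(), "completed"
        except subprocess.TimeoutExpired:
            output, status = "Error: Timeout (300s)", "timeout"
        with self._lock:
            self.tasks[task_id]["status"] = status
            self._notification_queue.append(
                {"task_id": task_id, "result": output[:500]})

    def drain_notifications(self) -> list:
        # Swap the queue out under the lock: each notification is seen once.
        with self._lock:
            notifs, self._notification_queue = self._notification_queue, []
        return notifs

    def join_all(self, timeout=None):
        for t in self._threads:
            t.join(timeout)


bg = BackgroundManager()
ids = [bg.run([sys.executable, "-c", f"print('job {i}')"]) for i in range(3)]
bg.join_all()
notifs = bg.drain_notifications()
```

Draining a second time returns an empty list. That is what prevents the loop from injecting the same result twice.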

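What changed

Component     | Before (s07)  | After (s08)
Tools         | 8             | 6 (base + background_run + check)
Execution     | blocking only | blocking + background threads
Notifications | (none)        | queue drained every turn
Concurrency   | (none)        | daemon threads

Try it

cd learn-claude-code
python agents/s08_background_tasks.py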
  • Run "sleep 5 && echo done" in the background, then create a file while it runs
  • Start 3 background tasks: "sleep 2", "sleep 4", "sleep 6". Check their status.
  • Run pytest in the background and keep working on other things

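Chapter 9: Agent Teams (s09)

Category: Concurrency | Subtitle: Agent Team Mailboxes
Code size: 348 lines | Tools: 9

"When a task is too big for one agent, hand parts of it to teammates" – persistent teammates + JSONL mailboxes.

Problem

Subagents (s04) are disposable: they are spawned, do the work, return a summary, and disappear. They have no identity and no memory across calls. Background tasks (s08) can run shell commands, but they cannot make LLM-driven decisions.

Real teamwork needs three things: (1) persistent agents that survive across many conversation turns, (2) identity and lifecycle management, and (3) a communication channel between agents.

Solution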

Teammate lifecycle:
  spawn -> WORKING -> IDLE -> WORKING -> ... -> SHUTDOWN

Communication:
  .team/
    config.json        <- team roster + statuses
    inbox/
      alice.jsonl      <- append-only, drain-on-read
      bob.jsonl
      lead.jsonl

+--------+    send("alice","bob","...")     +--------+
| alice  | -------------------------------> |  bob   |
|  loop  |    bob.jsonl << {json_line}      |  loop  |
+--------+                                  +--------+
     ^                                          |
     |        BUS.read_inbox("alice")           |
     +---- alice.jsonl -> read + drain ---------+

How it works

1. TeammateManager keeps the team roster in config.json.

class TeammateManager:
    def __init__(self, team_dir: Path):
        self.dir = team_dir
        self.dir.mkdir(exist_ok=True)
        self.config_path = self.dir / "config.json"
        self.config = self._load_config()
        self.threads = {}

2. spawn() registers a teammate and starts its agent loop in a thread.

def spawn(self, name: str, role: str, prompt: str) -> str:
    member = {"name": name, "role": role, "status": "working"}
    self.config["members"].append(member)
    self._save_config()  # persist the roster before the thread starts
    thread = threading.Thread(
        target=self._teammate_loop,
        args=(name, role, prompt), daemon=True)
    self.threads[name] = thread  # track the thread by teammate name
    thread.start()
    return f"Spawned teammate '{name}' (role: {role})"

3. MessageBus: append-only JSONL inboxes. send() appends one line; read_inbox() reads everything and empties the file.

class MessageBus:
    def send(self, sender, to, content, msg_type="message", extra=None):
        msg = {"type": msg_type, "from": sender,
               "content": content, "timestamp": time.time()}
        if extra:
            msg.update(extra)
        with open(self.dir / f"{to}.jsonl", "a") as f:
            f.write(json.dumps(msg) + "\n")

    def read_inbox(self, name):
        path = self.dir / f"{name}.jsonl"
        if not path.exists():
            return "[]"
        msgs = [json.loads(line)
                for line in path.read_text().strip().splitlines() if line]
        path.write_text("")  # drain
        return json.dumps(msgs, indent=2)

4. Before every LLM call, each teammate checks its inbox and injects any messages into its context.

def _teammate_loop(self, name, role, prompt):
    messages = [{"role": "user", "content": prompt}]
    for _ in range(50):  # safety limit on turns
        inbox = BUS.read_inbox(name)
        if inbox != "[]":  # read_inbox returns "[]" when empty
            messages.append({"role": "user",
                             "content": f"<inbox>{inbox}</inbox>"})
            messages.append({"role": "assistant",
                             "content": "Noted inbox messages."})
        response = client.messages.create(...)
        if response.stop_reason != "tool_use":
            break
        # execute tools, append results...
    self._find_member(name)["status"] = "idle"
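Here is a self-contained sketch of the bus, assuming a temporary inbox directory. It makes two changes relative to the excerpt, both assumptions. First, read_inbox returns a Python list instead of a JSON string. Second, a threading.Lock makes read-and-drain atomic with respect to concurrent send() calls from teammate threads. broadcast() is a hypothetical helper that illustrates the "message + broadcast" row in the What changed table.

```python
import json
import tempfile
import threading
import time
from pathlib import Path


class MessageBus:
    def __init__(self, inbox_dir: Path):
        self.dir = inbox_dir
        self.dir.mkdir(parents=True, exist_ok=True)
        # One lock for all inboxes so a drain cannot lose a concurrent send.
        self._lock = threading.Lock()

    def send(self, sender, to, content, msg_type="message", extra=None):
        msg = {"type": msg_type, "from": sender, "content": content,
               "timestamp": time.time()}
        if extra:
            msg.update(extra)
        with self._lock, open(self.dir / f"{to}.jsonl", "a") as f:
            f.write(json.dumps(msg) + "\n")

    def read_inbox(self, name) -> list:
        path = self.dir / f"{name}.jsonl"
        with self._lock:
            if not path.exists():
                return []
            msgs = [json.loads(line)
                    for line in path.read_text().splitlines() if line]
            path.write_text("")  # drain
        return msgs

    def broadcast(self, sender, names, content):
        for name in names:
            if name != sender:  # don't echo to yourself
                self.send(sender, name, content, msg_type="broadcast")


bus = MessageBus(Path(tempfile.mkdtemp()) / "inbox")
bus.send("alice", "bob", "hi bob")
bus.broadcast("lead", ["lead", "alice", "bob"], "phase 1 complete")
bob_msgs = bus.read_inbox("bob")
```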

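What changed

Component        | Before (s08)        | After (s09)
Tools            | 6                   | 9 (+spawn/send/read_inbox)
Number of agents | one                 | lead + N teammates
Persistence      | (none)              | config.json + JSONL inboxes
Threads          | background commands | a full agent loop per thread
Lifecycle        | one-shot            | idle → working → idle
Communication    | (none)              | message + broadcast

Try it

cd learn-claude-code
python agents/s09_agent_teams.py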
  • Spawn alice (coder) and bob (tester). Have alice send bob a message.
  • Broadcast "status update: phase 1 complete" to all teammates
  • Check the lead inbox for any messages
  • Type /team to view the team roster and statuses
  • Type /inbox to check the lead's inbox manually

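Chapter 10: Team Protocols (s10)

Category: Concurrency | Subtitle: FSM Team Protocols
Code size: 419 lines | Tools: 12

"Teammates need shared rules for talking to each other" – a single request-response pattern drives every negotiation.

Problem

In s09, teammates can work and communicate, but their coordination has no structure:

Shutdown: killing a thread outright leaves half-written files and a stale config.json. Shutdown needs a handshake: the lead asks, and the teammate either approves (wraps up and exits) or rejects (keeps working).

Plan approval: the lead says "refactor the auth module," and the teammate starts immediately. High-risk changes should be reviewed first.

Both cases have the same structure: one side sends a request with a unique ID, and the other side responds by referencing that same ID.

Solution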

Shutdown Protocol              Plan Approval Protocol
=================              ======================

Lead               Teammate   Teammate            Lead
 |                    |          |                   |
 |--shutdown_req----->|          |--plan_req-------->|
 | {req_id:"abc"}     |          | {req_id:"xyz"}    |
 |                    |          |                   |
 |<--shutdown_resp----|          |<--plan_resp-------|
 | {req_id:"abc",     |          | {req_id:"xyz",    |
 |  approve:true}     |          |  approve:true}    |

Shared FSM:
  [pending] --approve--> [approved]
  [pending] --reject---> [rejected]

Trackers:
  shutdown_requests = {req_id: {target, status}}
  plan_requests     = {req_id: {from, plan, status}}

How it works

1. The lead generates a request_id and sends a shutdown request through the inbox.

shutdown_requests = {}

def handle_shutdown_request(teammate: str) -> str:
    req_id = str(uuid.uuid4())[:8]
    shutdown_requests[req_id] = {"target": teammate, "status": "pending"}
    BUS.send("lead", teammate, "Please shut down gracefully.",
             "shutdown_request", {"request_id": req_id})
    return f"Shutdown request {req_id} sent (status: pending)"

2. On receiving the request, the teammate responds with approve or reject.

if tool_name == "shutdown_response":
    req_id = args["request_id"]
    approve = args["approve"]
    # The shared request_id ties this response to the original request
    shutdown_requests[req_id]["status"] = "approved" if approve else "rejected"
    BUS.send(sender, "lead", args.get("reason", ""),
             "shutdown_response",
             {"request_id": req_id, "approve": approve})

3. Plan approval follows exactly the same pattern. The teammate submits a plan (generating a request_id), and the lead reviews it (referencing the same request_id).

plan_requests = {}

def handle_plan_review(request_id, approve, feedback=""):
    req = plan_requests[request_id]
    req["status"] = "approved" if approve else "rejected"
    BUS.send("lead", req["from"], feedback,
             "plan_approval_response",
             {"request_id": request_id, "approve": approve})

One FSM, two uses. The same pending → approved | rejected state machine fits any request-response protocol.
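Because both protocols share one state machine, their trackers can be collapsed into a single class. The minimal sketch below assumes 8-character hex IDs and assumes that a request can be resolved only once. RequestTracker, create() and resolve() are hypothetical names, and the message-bus side is left out.

```python
import uuid


class RequestTracker:
    """One pending -> approved | rejected FSM shared by every request type."""

    TRANSITIONS = {("pending", True): "approved",
                   ("pending", False): "rejected"}

    def __init__(self):
        self.requests = {}

    def create(self, kind: str, **fields) -> str:
        req_id = uuid.uuid4().hex[:8]
        self.requests[req_id] = {"kind": kind, "status": "pending", **fields}
        return req_id

    def resolve(self, req_id: str, approve: bool) -> str:
        req = self.requests.get(req_id)
        if req is None:
            raise KeyError(f"Unknown request_id: {req_id}")
        new_status = self.TRANSITIONS.get((req["status"], approve))
        if new_status is None:  # approved and rejected are terminal states
            raise ValueError(f"Request {req_id} is already {req['status']}")
        req["status"] = new_status
        return new_status


tracker = RequestTracker()
shutdown_id = tracker.create("shutdown", target="alice")
plan_id = tracker.create("plan", sender="bob", plan="Rewrite the auth module")
```

Keeping the legal transitions in a table means a late or duplicate response fails loudly, instead of silently flipping an approved request to rejected.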

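What changed

Component   | Before (s09)      | After (s10)
Tools       | 9                 | 12 (+shutdown_req/resp, +plan)
Shutdown    | natural exit only | request-response handshake
Plan gating | (none)            | submit / review and approve
Correlation | (none)            | one request_id per request
FSM         | (none)            | pending → approved/rejected

Try it

cd learn-claude-code
python agents/s10_team_protocols.py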
  • Spawn alice as a coder. Then request her shutdown.
  • List teammates to see alice's status after shutdown approval
  • Spawn bob with a risky refactoring task. Review and reject his plan.
  • Spawn charlie, have him submit a plan, then approve it.
  • Type /team to monitor status

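Chapter 11: Autonomous Agents (s11)

Category: Concurrency | Subtitle: Autonomous Agent Loop
Code size: 499 lines | Tools: 14

"Teammates check the board themselves and claim work when there is some" – the lead no longer has to assign tasks one by one; the team organizes itself.

Problem

In s09-s10, teammates only act when explicitly assigned. The lead has to write a prompt for each teammate, and 10 unclaimed tasks on the board have to be handed out manually. That does not scale.

True autonomy means teammates scan the task board themselves, claim tasks nobody owns, and look for the next one when they finish.

One detail: after context compaction (s06), an agent may forget who it is. Identity re-injection solves this.

Solution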

Teammate lifecycle with idle cycle:

+-------+
| spawn |
+---+---+
    |
    v
+-------+    tool_use     +-------+
| WORK  | <-------------- |  LLM  |
+---+---+                 +-------+
    |
    | stop_reason != tool_use (or idle tool called)
    v
+--------+
|  IDLE  |  poll every 5s for up to 60s
+---+----+
    |
    +---> check inbox --> message? ----------> WORK
    |
    +---> scan .tasks/ --> unclaimed? -------> claim -> WORK
    |
    +---> 60s timeout ----------------------> SHUTDOWN

Identity re-injection after compression:
  if len(messages) <= 3:
      messages.insert(0, identity_block)

How it works

1. The teammate loop has two phases: WORK and IDLE. When the LLM stops calling tools (or calls idle), the teammate enters IDLE.

def _loop(self, name, role, prompt):
    # Created once, so inbox messages and claimed tasks appended
    # during IDLE are still present in the next WORK phase
    messages = [{"role": "user", "content": prompt}]
    while True:
        # -- WORK PHASE --
        for _ in range(50):
            response = client.messages.create(...)
            if response.stop_reason != "tool_use":
                break
            # execute tools...
            if idle_requested:
                break

        # -- IDLE PHASE --
        self._set_status(name, "idle")
        resume = self._idle_poll(name, messages)
        if not resume:
            self._set_status(name, "shutdown")
            return
        self._set_status(name, "working")

2. The idle phase polls the inbox and the task board in a loop.

def _idle_poll(self, name, messages):
    for _ in range(IDLE_TIMEOUT // POLL_INTERVAL):  # 60s / 5s = 12
        time.sleep(POLL_INTERVAL)
        inbox = BUS.read_inbox(name)
        if inbox != "[]":  # read_inbox returns the string "[]" when empty
            messages.append({"role": "user",
                             "content": f"<inbox>{inbox}</inbox>"})
            return True
        unclaimed = scan_unclaimed_tasks()
        if unclaimed:
            claim_task(unclaimed[0]["id"], name)
            messages.append({"role": "user",
                             "content": f"<auto-claimed>Task #{unclaimed[0]['id']}: "
                                        f"{unclaimed[0]['subject']}</auto-claimed>"})
            return True
    return False  # timeout -> shutdown

3. Task-board scan: find tasks that are pending, have no owner, and are not blocked.

def scan_unclaimed_tasks() -> list:
    unclaimed = []
    for f in sorted(TASKS_DIR.glob("task_*.json")):
        task = json.loads(f.read_text())
        if (task.get("status") == "pending"
                and not task.get("owner")
                and not task.get("blockedBy")):
            unclaimed.append(task)
    return unclaimed

4. Identity re-injection: when the context is very short (a sign that compaction has happened), insert an identity block at the start.

if len(messages) <= 3:
    messages.insert(0, {"role": "user",
                        "content": f"<identity>You are '{name}', role: {role}, "
                                   f"team: {team_name}. Continue your work.</identity>"})
    messages.insert(1, {"role": "assistant",
                        "content": f"I am {name}. Continuing."})
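claim_task is not shown above. Two idle teammates can scan the board in the same poll window, so a claim should re-check ownership atomically. The minimal sketch below assumes teammates are threads in one process (as in s09), so a threading.Lock is enough. It also assumes a claim moves the task to in_progress. The tasks_dir parameter and ensure_identity are hypothetical.

```python
import json
import tempfile
import threading
from pathlib import Path

_claim_lock = threading.Lock()  # teammates are threads in one process


def scan_unclaimed_tasks(tasks_dir: Path) -> list:
    unclaimed = []
    for f in sorted(tasks_dir.glob("task_*.json")):
        task = json.loads(f.read_text())
        if (task.get("status") == "pending" and not task.get("owner")
                and not task.get("blockedBy")):
            unclaimed.append(task)
    return unclaimed


def claim_task(tasks_dir: Path, task_id: int, owner: str) -> bool:
    path = tasks_dir / f"task_{task_id}.json"
    with _claim_lock:  # re-read under the lock: the first claimer wins
        task = json.loads(path.read_text())
        if task.get("owner") or task.get("status") != "pending":
            return False
        task["owner"], task["status"] = owner, "in_progress"
        path.write_text(json.dumps(task))
    return True


def ensure_identity(messages: list, name: str, role: str, team: str) -> None:
    if len(messages) <= 3:  # short history: compaction probably happened
        messages.insert(0, {"role": "user", "content":
                            f"<identity>You are '{name}', role: {role}, "
                            f"team: {team}. Continue your work.</identity>"})
        messages.insert(1, {"role": "assistant",
                            "content": f"I am {name}. Continuing."})


board = Path(tempfile.mkdtemp())
for task_id, blocked_by in [(1, []), (2, [1])]:
    (board / f"task_{task_id}.json").write_text(json.dumps(
        {"id": task_id, "subject": f"task {task_id}", "status": "pending",
         "owner": "", "blockedBy": blocked_by}))
```

If bob tries to claim task 1 after alice, he gets False and keeps polling. Task 2 stays invisible to both of them until task 1 completes and clears its blockedBy.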

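What changed

Component     | Before (s10)     | After (s11)
Tools         | 12               | 14 (+idle, +claim_task)
Autonomy      | assigned by lead | self-organizing
Idle phase    | (none)           | polls inbox + task board
Task claiming | manual only      | auto-claims unassigned tasks
Identity      | system prompt    | + re-injected after compaction
Timeout       | (none)           | 60 s idle → automatic shutdown

Try it

cd learn-claude-code
python agents/s11_autonomous_agents.py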
  • Create 3 tasks on the board, then spawn alice and bob. Watch them auto-claim.
  • Spawn a coder teammate and let it find work from the task board itself
  • Create tasks with dependencies. Watch teammates respect the blocked order.
  • Type /tasks to view the task board with owners
  • Type /team to monitor who is working and who is idle

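Chapter 12: Worktree + Task Isolation (s12)

Category: Concurrency | Subtitle: Worktree + Task Isolation
Code size: 694 lines | Tools: 16

"Each works in its own directory, without interfering; tasks manage goals, worktrees manage directories, bound by ID"

Problem

By s11, agents can claim and finish tasks on their own. But every task shares one directory. Suppose two agents refactor different modules at the same time, and both A and B edit config.py. Their uncommitted changes contaminate each other, and neither can roll back cleanly.

The task board manages "what to do" but not "where to do it." The fix is to give each task its own git worktree directory and link the two sides by task ID.

Solution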

Control plane (.tasks/)                    Execution plane (.worktrees/)
+------------------------------+           +----------------------------+
| task_1.json                  |           | auth-refactor/             |
|   status: in_progress        | <-------> |   branch: wt/auth-refactor |
|   worktree: "auth-refactor"  |           |   task_id: 1               |
+------------------------------+           +----------------------------+
| task_2.json                  |           | ui-login/                  |
|   status: pending            | <-------> |   branch: wt/ui-login      |
|   worktree: "ui-login"       |           |   task_id: 2               |
+------------------------------+           +----------------------------+
                                                          |
                                           index.json   (worktree registry)
                                           events.jsonl (lifecycle log)

State machines:
  Task:     pending -> in_progress -> completed
  Worktree: absent -> active -> removed | kept

How it works

1. Create the task. Persist the goal first.

TASKS.create("Implement auth refactor")
# -> .tasks/task_1.json  status=pending  worktree=""

2. Create the worktree and bind the task. Passing task_id automatically advances the task to in_progress.

WORKTREES.create("auth-refactor", task_id=1)
# -> git worktree add -b wt/auth-refactor .worktrees/auth-refactor HEAD
# -> index.json gets a new entry, task_1.json gets worktree="auth-refactor"

Binding writes state on both sides at once:

def bind_worktree(self, task_id, worktree):
    task = self._load(task_id)
    task["worktree"] = worktree
    if task["status"] == "pending":
        task["status"] = "in_progress"  # binding starts the task
    self._save(task)

3. Run commands inside the worktree. cwd points at the isolated directory.

subprocess.run(command, shell=True, cwd=worktree_path,
               capture_output=True, text=True, timeout=300)

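4. Wrap up. There are two options:

  • worktree_keep(name): keep the directory for later use.
  • worktree_remove(name, complete_task=True): remove the directory, complete the bound task, and emit an event. One call handles both teardown and completion.

def remove(self, name, force=False, complete_task=False):
    wt = self._lookup(name)  # hypothetical helper: fetch the entry from index.json
    args = ["worktree", "remove"] + (["--force"] if force else []) + [wt["path"]]
    self._run_git(args)
    if complete_task and wt.get("task_id") is not None:
        self.tasks.update(wt["task_id"], status="completed")
        self.tasks.unbind_worktree(wt["task_id"])
        self.events.emit("task.completed", ...)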

5. Event stream. Every lifecycle step is written to .worktrees/events.jsonl.

{
  "event": "worktree.remove.after",
  "task": {"id": 1, "status": "completed"},
  "worktree": {"name": "auth-refactor", "status": "removed"},
  "ts": 1730000000
}

Event types: worktree.create.before/after/failed, worktree.remove.before/after/failed, worktree.keep, task.completed.

After a crash, the working state is rebuilt from .tasks/ + .worktrees/index.json. Session memory is volatile; disk state is durable.
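Here is a minimal sketch of an append-only event log that matches the record shape above. EventLog and latest_worktree_status are hypothetical names. Replaying the log here only reconstructs what happened to each worktree, for inspection; the course itself rebuilds state from .tasks/ and .worktrees/index.json.

```python
import json
import tempfile
import time
from pathlib import Path


class EventLog:
    """Append-only lifecycle log, one JSON object per line."""

    def __init__(self, path: Path):
        self.path = path
        self.path.parent.mkdir(parents=True, exist_ok=True)

    def emit(self, event: str, **payload) -> None:
        record = {"event": event, **payload, "ts": int(time.time())}
        with open(self.path, "a") as f:
            f.write(json.dumps(record) + "\n")

    def replay(self) -> list:
        if not self.path.exists():
            return []
        return [json.loads(line)
                for line in self.path.read_text().splitlines() if line]


def latest_worktree_status(events: list) -> dict:
    # The last event that mentions a worktree decides its current status.
    state = {}
    for e in events:
        wt = e.get("worktree")
        if wt:
            state[wt["name"]] = wt["status"]
    return state


log = EventLog(Path(tempfile.mkdtemp()) / ".worktrees" / "events.jsonl")
log.emit("worktree.create.after", task={"id": 1, "status": "in_progress"},
         worktree={"name": "auth-refactor", "status": "active"})
log.emit("worktree.keep", worktree={"name": "ui-login", "status": "kept"})
log.emit("worktree.remove.after", task={"id": 1, "status": "completed"},
         worktree={"name": "auth-refactor", "status": "removed"})
events = log.replay()
```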

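What changed

Component            | Before (s11)              | After (s12)
Coordination         | task board (owner/status) | task board + explicit worktree binding
Execution scope      | shared directory          | one directory per task
Recoverability       | task state only           | task state + worktree index
Wrap-up              | task completion           | task completion + explicit keep/remove
Lifecycle visibility | implicit logs             | explicit event stream in .worktrees/events.jsonl

Try it

cd learn-claude-code
python agents/s12_worktree_task_isolation.py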
  • Create tasks for backend auth and frontend login page, then list tasks.
  • Create worktree "auth-refactor" for task 1, then bind task 2 to a new worktree "ui-login".
  • Run "git status --short" in worktree "auth-refactor".
  • Keep worktree "ui-login", then list worktrees and inspect events.
  • Remove worktree "auth-refactor" with complete_task=true, then list tasks/worktrees/events.

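Appendix: Course Code Growth

Chapter                           | Lines of code (LOC)
s01 Agent Loop                    | 84
s02 Tools                         | 120
s03 TodoWrite                     | 176
s04 Subagents                     | 151
s05 Skills                        | 187
s06 Context Compaction            | 205
s07 Task System                   | 207
s08 Background Tasks              | 198
s09 Agent Teams                   | 348
s10 Team Protocols                | 419
s11 Autonomous Agents             | 499
s12 Worktree + Task Isolation     | 694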

Core Learning Path

s01 → s02 → s03 → s04 → s05 → s06 | s07 → s08 → s09 → s10 → s11 → s12

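Left side (s01-s06): single-agent capabilities

  • s01: the most basic while loop + bash tool
  • s02: multi-tool dispatch map
  • s03: TodoWrite planning and reminders
  • s04: subagent context isolation
  • s05: on-demand skill loading
  • s06: three-layer context compaction

Right side (s07-s12): multi-agent collaboration

  • s07: file-persisted task graph with dependencies
  • s08: parallel execution of background tasks
  • s09: agent teams + JSONL mailbox communication
  • s10: FSM team protocols (shutdown handshake, plan approval)
  • s11: autonomous agents (self-organized task claiming)
  • s12: worktree task isolation (execution in separate directories)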

This document is compiled from all 12 chapters at https://learn.shareai.run/zh/timeline/.


Appendix: Full Source Code

The source code below is taken from the shareAI-lab/learn-claude-code repository.

Chapter 1: The Agent Loop (s01) - Source Code

File: s01_agent_loop.py

#!/usr/bin/env python3
# Harness: the loop -- the model's first connection to the real world.
"""
s01_agent_loop.py - The Agent Loop

The entire secret of an AI coding agent in one pattern:

    while stop_reason == "tool_use":
        response = LLM(messages, tools)
        execute tools
        append results

    +----------+      +-------+      +---------+
    |   User   | ---> |  LLM  | ---> |  Tool   |
    |  prompt  |      |       |      | execute |
    +----------+      +---+---+      +----+----+
                          ^               |
                          |   tool_result |
                          +---------------+
                          (loop continues)

This is the core loop: feed tool results back to the model
until the model decides to stop. Production agents layer
policy, hooks, and lifecycle controls on top.
"""

import os
import subprocess

try:
    import readline
    # #143 UTF-8 backspace fix for macOS libedit
    readline.parse_and_bind('set bind-tty-special-chars off')
    readline.parse_and_bind('set input-meta on')
    readline.parse_and_bind('set output-meta on')
    readline.parse_and_bind('set convert-meta off')
    readline.parse_and_bind('set enable-meta-keybindings on')
except ImportError:
    pass

from anthropic import Anthropic
from dotenv import load_dotenv

load_dotenv(override=True)

if os.getenv("ANTHROPIC_BASE_URL"):
    os.environ.pop("ANTHROPIC_AUTH_TOKEN", None)

client = Anthropic(base_url=os.getenv("ANTHROPIC_BASE_URL"))
MODEL = os.environ["MODEL_ID"]

SYSTEM = f"You are a coding agent at {os.getcwd()}. Use bash to solve tasks. Act, don't explain."

TOOLS = [{
    "name": "bash",
    "description": "Run a shell command.",
    "input_schema": {
        "type": "object",
        "properties": {"command": {"type": "string"}},
        "required": ["command"],
    },
}]


def run_bash(command: str) -> str:
    dangerous = ["rm -rf /", "sudo", "shutdown", "reboot", "> /dev/"]
    if any(d in command for d in dangerous):
        return "Error: Dangerous command blocked"
    try:
        r = subprocess.run(command, shell=True, cwd=os.getcwd(),
                           capture_output=True, text=True, timeout=120)
        out = (r.stdout + r.stderr).strip()
        return out[:50000] if out else "(no output)"
    except subprocess.TimeoutExpired:
        return "Error: Timeout (120s)"


# -- The core pattern: a while loop that calls tools until the model stops --
def agent_loop(messages: list):
    while True:
        response = client.messages.create(
            model=MODEL, system=SYSTEM, messages=messages,
            tools=TOOLS, max_tokens=8000,
        )
        # Append assistant turn
        messages.append({"role": "assistant", "content": response.content})
        # If the model didn't call a tool, we're done
        if response.stop_reason != "tool_use":
            return
        # Execute each tool call, collect results
        results = []
        for block in response.content:
            if block.type == "tool_use":
                print(f"\033[33m$ {block.input['command']}\033[0m")
                output = run_bash(block.input["command"])
                print(output[:200])
                results.append({"type": "tool_result", "tool_use_id": block.id,
                                "content": output})
        messages.append({"role": "user", "content": results})


if __name__ == "__main__":
    history = []
    while True:
        try:
            query = input("\033[36ms01 >> \033[0m")
        except (EOFError, KeyboardInterrupt):
            break
        if query.strip().lower() in ("q", "exit", ""):
            break
        history.append({"role": "user", "content": query})
        agent_loop(history)
        response_content = history[-1]["content"]
        if isinstance(response_content, list):
            for block in response_content:
                if hasattr(block, "text"):
                    print(block.text)
        print()
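The s01 loop can be exercised offline by swapping the real client for a scripted stand-in. ScriptedClient and the run_tool parameter are hypothetical; the fake responses use SimpleNamespace and mimic only the attributes s01 actually reads (stop_reason, content, and each block's type, id, input, text). This is a testing sketch, not part of the course code.

```python
from types import SimpleNamespace as NS


class ScriptedClient:
    """Hypothetical stand-in for Anthropic(): replays canned responses in order."""

    def __init__(self, responses):
        self.calls = 0
        self._responses = list(responses)
        # Expose client.messages.create(...) like the real client object.
        self.messages = NS(create=self._create)

    def _create(self, **kwargs):
        self.calls += 1
        return self._responses.pop(0)


def agent_loop(client, messages, run_tool):
    # Same control flow as s01: keep going while the model asks for tools.
    while True:
        response = client.messages.create(messages=messages)
        messages.append({"role": "assistant", "content": response.content})
        if response.stop_reason != "tool_use":
            return
        results = [{"type": "tool_result", "tool_use_id": b.id,
                    "content": run_tool(b.input["command"])}
                   for b in response.content if b.type == "tool_use"]
        messages.append({"role": "user", "content": results})


if __name__ == "__main__":
    client = ScriptedClient([
        NS(stop_reason="tool_use",
           content=[NS(type="tool_use", id="t1", input={"command": "ls"})]),
        NS(stop_reason="end_turn", content=[NS(type="text", text="Done.")]),
    ])
    history = [{"role": "user", "content": "list files"}]
    agent_loop(client, history, run_tool=lambda cmd: f"(pretend output of {cmd})")
    print(client.calls, [m["role"] for m in history])  # 2 ['user', 'assistant', 'user', 'assistant']
```

The alternating roles in the printed history are exactly what the Messages API expects: each tool round adds one assistant turn and one user turn carrying the tool_result blocks.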

Chapter 2: Tools (s02) - Source Code

File: s02_tool_use.py

#!/usr/bin/env python3
# Harness: tool dispatch -- expanding what the model can reach.
"""
s02_tool_use.py - Tools

The agent loop from s01 didn't change. We just added tools to the array
and a dispatch map to route calls.

    +----------+      +-------+      +------------------+
    |   User   | ---> |  LLM  | ---> | Tool Dispatch    |
    |  prompt  |      |       |      | {                |
    +----------+      +---+---+      |   bash: run_bash |
                          ^          |   read: run_read |
                          |          |   write: run_wr  |
                          +----------+   edit: run_edit |
                          tool_result| }                |
                                     +------------------+

Key insight: "The loop didn't change at all. I just added tools."
"""

import os
import subprocess
from pathlib import Path

from anthropic import Anthropic
from dotenv import load_dotenv

load_dotenv(override=True)

if os.getenv("ANTHROPIC_BASE_URL"):
    os.environ.pop("ANTHROPIC_AUTH_TOKEN", None)

WORKDIR = Path.cwd()
client = Anthropic(base_url=os.getenv("ANTHROPIC_BASE_URL"))
MODEL = os.environ["MODEL_ID"]

SYSTEM = f"You are a coding agent at {WORKDIR}. Use tools to solve tasks. Act, don't explain."


def safe_path(p: str) -> Path:
    path = (WORKDIR / p).resolve()
    if not path.is_relative_to(WORKDIR):
        raise ValueError(f"Path escapes workspace: {p}")
    return path


def run_bash(command: str) -> str:
    dangerous = ["rm -rf /", "sudo", "shutdown", "reboot", "> /dev/"]
    if any(d in command for d in dangerous):
        return "Error: Dangerous command blocked"
    try:
        r = subprocess.run(command, shell=True, cwd=WORKDIR,
                           capture_output=True, text=True, timeout=120)
        out = (r.stdout + r.stderr).strip()
        return out[:50000] if out else "(no output)"
    except subprocess.TimeoutExpired:
        return "Error: Timeout (120s)"


def run_read(path: str, limit: int = None) -> str:
    try:
        text = safe_path(path).read_text()
        lines = text.splitlines()
        if limit and limit < len(lines):
            lines = lines[:limit] + [f"... ({len(lines) - limit} more lines)"]
        return "\n".join(lines)[:50000]
    except Exception as e:
        return f"Error: {e}"


def run_write(path: str, content: str) -> str:
    try:
        fp = safe_path(path)
        fp.parent.mkdir(parents=True, exist_ok=True)
        fp.write_text(content)
        return f"Wrote {len(content)} bytes to {path}"
    except Exception as e:
        return f"Error: {e}"


def run_edit(path: str, old_text: str, new_text: str) -> str:
    try:
        fp = safe_path(path)
        content = fp.read_text()
        if old_text not in content:
            return f"Error: Text not found in {path}"
        fp.write_text(content.replace(old_text, new_text, 1))
        return f"Edited {path}"
    except Exception as e:
        return f"Error: {e}"


# -- The dispatch map: {tool_name: handler} --
TOOL_HANDLERS = {
    "bash": lambda **kw: run_bash(kw["command"]),
    "read_file": lambda **kw: run_read(kw["path"], kw.get("limit")),
    "write_file": lambda **kw: run_write(kw["path"], kw["content"]),
    "edit_file": lambda **kw: run_edit(kw["path"], kw["old_text"], kw["new_text"]),
}

TOOLS = [
    {"name": "bash", "description": "Run a shell command.",
     "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}},
    {"name": "read_file", "description": "Read file contents.",
     "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "limit": {"type": "integer"}}, "required": ["path"]}},
    {"name": "write_file", "description": "Write content to file.",
     "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}},
    {"name": "edit_file", "description": "Replace exact text in file.",
     "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}}, "required": ["path", "old_text", "new_text"]}},
]


def agent_loop(messages: list):
    while True:
        response = client.messages.create(
            model=MODEL, system=SYSTEM, messages=messages,
            tools=TOOLS, max_tokens=8000,
        )
        messages.append({"role": "assistant", "content": response.content})
        if response.stop_reason != "tool_use":
            return
        results = []
        for block in response.content:
            if block.type == "tool_use":
                handler = TOOL_HANDLERS.get(block.name)
                output = handler(**block.input) if handler else f"Unknown tool: {block.name}"
                print(f"> {block.name}:")
                print(output[:200])
                results.append({"type": "tool_result", "tool_use_id": block.id, "content": output})
        messages.append({"role": "user", "content": results})


if __name__ == "__main__":
    history = []
    while True:
        try:
            query = input("\033[36ms02 >> \033[0m")
        except (EOFError, KeyboardInterrupt):
            break
        if query.strip().lower() in ("q", "exit", ""):
            break
        history.append({"role": "user", "content": query})
        agent_loop(history)
        response_content = history[-1]["content"]
        if isinstance(response_content, list):
            for block in response_content:
                if hasattr(block, "text"):
                    print(block.text)
        print()
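The two ideas s02 adds, the dispatch map and the safe_path workspace check, can be tried in isolation. This sketch uses a throwaway temporary directory as WORKDIR and trimmed-down handlers (assumptions, not the course code); dispatch also catches handler exceptions, as s03 onward does, instead of letting them crash the loop. Path.is_relative_to needs Python 3.9 or later.

```python
import tempfile
from pathlib import Path

# Throwaway workspace for the demo, resolved so symlinked temp dirs compare correctly.
WORKDIR = Path(tempfile.mkdtemp()).resolve()


def safe_path(p: str) -> Path:
    # Same check as s02: resolve first, then require the result to stay under WORKDIR.
    path = (WORKDIR / p).resolve()
    if not path.is_relative_to(WORKDIR):
        raise ValueError(f"Path escapes workspace: {p}")
    return path


def run_write(path: str, content: str) -> str:
    fp = safe_path(path)
    fp.parent.mkdir(parents=True, exist_ok=True)
    fp.write_text(content)
    return f"Wrote {len(content)} bytes to {path}"


def run_read(path: str) -> str:
    return safe_path(path).read_text()


TOOL_HANDLERS = {
    "write_file": lambda **kw: run_write(kw["path"], kw["content"]),
    "read_file": lambda **kw: run_read(kw["path"]),
}


def dispatch(name: str, tool_input: dict) -> str:
    handler = TOOL_HANDLERS.get(name)
    if handler is None:
        return f"Unknown tool: {name}"
    try:
        return handler(**tool_input)
    except Exception as e:  # report errors to the model as text
        return f"Error: {e}"


if __name__ == "__main__":
    print(dispatch("write_file", {"path": "notes/a.txt", "content": "hi"}))
    print(dispatch("read_file", {"path": "../outside.txt"}))
```

WORKDIR is resolved up front because on some systems (macOS, for example) the temporary directory sits behind a symlink; comparing a resolved child path against an unresolved root would reject legitimate paths.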

Chapter 3: TodoWrite (s03) - Source Code

File: s03_todo_write.py

#!/usr/bin/env python3
# Harness: planning -- keeping the model on course without scripting the route.
"""
s03_todo_write.py - TodoWrite

The model tracks its own progress via a TodoManager. A nag reminder
forces it to keep updating when it forgets.

    +----------+      +-------+      +---------+
    |   User   | ---> |  LLM  | ---> | Tools   |
    |  prompt  |      |       |      | + todo  |
    +----------+      +---+---+      +----+----+
                          ^               |
                          |   tool_result |
                          +---------------+
                                  |
                      +-----------+-----------+
                      | TodoManager state     |
                      | [ ] task A            |
                      | [>] task B <- doing   |
                      | [x] task C            |
                      +-----------------------+
                                  |
                      if rounds_since_todo >= 3:
                          inject <reminder>

Key insight: "The agent can track its own progress -- and I can see it."
"""

import os
import subprocess
from pathlib import Path

from anthropic import Anthropic
from dotenv import load_dotenv

load_dotenv(override=True)

if os.getenv("ANTHROPIC_BASE_URL"):
    os.environ.pop("ANTHROPIC_AUTH_TOKEN", None)

WORKDIR = Path.cwd()
client = Anthropic(base_url=os.getenv("ANTHROPIC_BASE_URL"))
MODEL = os.environ["MODEL_ID"]

SYSTEM = f"""You are a coding agent at {WORKDIR}.
Use the todo tool to plan multi-step tasks. Mark in_progress before starting, completed when done.
Prefer tools over prose."""


# -- TodoManager: structured state the LLM writes to --
class TodoManager:
    def __init__(self):
        self.items = []

    def update(self, items: list) -> str:
        if len(items) > 20:
            raise ValueError("Max 20 todos allowed")
        validated = []
        in_progress_count = 0
        for i, item in enumerate(items):
            text = str(item.get("text", "")).strip()
            status = str(item.get("status", "pending")).lower()
            item_id = str(item.get("id", str(i + 1)))
            if not text:
                raise ValueError(f"Item {item_id}: text required")
            if status not in ("pending", "in_progress", "completed"):
                raise ValueError(f"Item {item_id}: invalid status '{status}'")
            if status == "in_progress":
                in_progress_count += 1
            validated.append({"id": item_id, "text": text, "status": status})
        if in_progress_count > 1:
            raise ValueError("Only one task can be in_progress at a time")
        self.items = validated
        return self.render()

    def render(self) -> str:
        if not self.items:
            return "No todos."
        lines = []
        for item in self.items:
            marker = {"pending": "[ ]", "in_progress": "[>]", "completed": "[x]"}[item["status"]]
            lines.append(f"{marker} #{item['id']}: {item['text']}")
        done = sum(1 for t in self.items if t["status"] == "completed")
        lines.append(f"\n({done}/{len(self.items)} completed)")
        return "\n".join(lines)


TODO = TodoManager()


# -- Tool implementations --
def safe_path(p: str) -> Path:
    path = (WORKDIR / p).resolve()
    if not path.is_relative_to(WORKDIR):
        raise ValueError(f"Path escapes workspace: {p}")
    return path

def run_bash(command: str) -> str:
    dangerous = ["rm -rf /", "sudo", "shutdown", "reboot", "> /dev/"]
    if any(d in command for d in dangerous):
        return "Error: Dangerous command blocked"
    try:
        r = subprocess.run(command, shell=True, cwd=WORKDIR,
                           capture_output=True, text=True, timeout=120)
        out = (r.stdout + r.stderr).strip()
        return out[:50000] if out else "(no output)"
    except subprocess.TimeoutExpired:
        return "Error: Timeout (120s)"

def run_read(path: str, limit: int = None) -> str:
    try:
        lines = safe_path(path).read_text().splitlines()
        if limit and limit < len(lines):
            lines = lines[:limit] + [f"... ({len(lines) - limit} more)"]
        return "\n".join(lines)[:50000]
    except Exception as e:
        return f"Error: {e}"

def run_write(path: str, content: str) -> str:
    try:
        fp = safe_path(path)
        fp.parent.mkdir(parents=True, exist_ok=True)
        fp.write_text(content)
        return f"Wrote {len(content)} bytes"
    except Exception as e:
        return f"Error: {e}"

def run_edit(path: str, old_text: str, new_text: str) -> str:
    try:
        fp = safe_path(path)
        content = fp.read_text()
        if old_text not in content:
            return f"Error: Text not found in {path}"
        fp.write_text(content.replace(old_text, new_text, 1))
        return f"Edited {path}"
    except Exception as e:
        return f"Error: {e}"


TOOL_HANDLERS = {
    "bash": lambda **kw: run_bash(kw["command"]),
    "read_file": lambda **kw: run_read(kw["path"], kw.get("limit")),
    "write_file": lambda **kw: run_write(kw["path"], kw["content"]),
    "edit_file": lambda **kw: run_edit(kw["path"], kw["old_text"], kw["new_text"]),
    "todo": lambda **kw: TODO.update(kw["items"]),
}

TOOLS = [
    {"name": "bash", "description": "Run a shell command.",
     "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}},
    {"name": "read_file", "description": "Read file contents.",
     "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "limit": {"type": "integer"}}, "required": ["path"]}},
    {"name": "write_file", "description": "Write content to file.",
     "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}},
    {"name": "edit_file", "description": "Replace exact text in file.",
     "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}}, "required": ["path", "old_text", "new_text"]}},
    {"name": "todo", "description": "Update task list. Track progress on multi-step tasks.",
     "input_schema": {"type": "object", "properties": {"items": {"type": "array", "items": {"type": "object", "properties": {"id": {"type": "string"}, "text": {"type": "string"}, "status": {"type": "string", "enum": ["pending", "in_progress", "completed"]}}, "required": ["id", "text", "status"]}}}, "required": ["items"]}},
]


# -- Agent loop with nag reminder injection --
def agent_loop(messages: list):
    rounds_since_todo = 0
    while True:
        # Nag reminder is injected below, alongside tool results
        response = client.messages.create(
            model=MODEL, system=SYSTEM, messages=messages,
            tools=TOOLS, max_tokens=8000,
        )
        messages.append({"role": "assistant", "content": response.content})
        if response.stop_reason != "tool_use":
            return
        results = []
        used_todo = False
        for block in response.content:
            if block.type == "tool_use":
                handler = TOOL_HANDLERS.get(block.name)
                try:
                    output = handler(**block.input) if handler else f"Unknown tool: {block.name}"
                except Exception as e:
                    output = f"Error: {e}"
                print(f"> {block.name}:")
                print(str(output)[:200])
                results.append({"type": "tool_result", "tool_use_id": block.id, "content": str(output)})
                if block.name == "todo":
                    used_todo = True
        rounds_since_todo = 0 if used_todo else rounds_since_todo + 1
        if rounds_since_todo >= 3:
            results.append({"type": "text", "text": "<reminder>Update your todos.</reminder>"})
        messages.append({"role": "user", "content": results})


if __name__ == "__main__":
    history = []
    while True:
        try:
            query = input("\033[36ms03 >> \033[0m")
        except (EOFError, KeyboardInterrupt):
            break
        if query.strip().lower() in ("q", "exit", ""):
            break
        history.append({"role": "user", "content": query})
        agent_loop(history)
        response_content = history[-1]["content"]
        if isinstance(response_content, list):
            for block in response_content:
                if hasattr(block, "text"):
                    print(block.text)
        print()
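The nag-reminder bookkeeping from the s03 agent_loop, pulled out into two pure functions so it can be tested without a model. next_counter and build_user_turn are hypothetical names; the threshold of 3 and the reminder text match the s03 code, and so does the ordering: tool results first, the reminder text block after them.

```python
def next_counter(rounds_since_todo: int, used_todo: bool) -> int:
    # Reset after any round that touched the todo list, otherwise count up.
    return 0 if used_todo else rounds_since_todo + 1


def build_user_turn(results: list, rounds_since_todo: int, threshold: int = 3) -> list:
    # Tool results first; the reminder text block (if any) goes after them.
    content = list(results)
    if rounds_since_todo >= threshold:
        content.append({"type": "text", "text": "<reminder>Update your todos.</reminder>"})
    return content


if __name__ == "__main__":
    counter = 0
    # Five simulated rounds; the model only calls `todo` in round 1.
    for round_no, used in enumerate([True, False, False, False, False], start=1):
        counter = next_counter(counter, used)
        turn = build_user_turn(
            [{"type": "tool_result", "tool_use_id": f"t{round_no}", "content": "ok"}], counter)
        print(round_no, counter, "reminder" if len(turn) > 1 else "-")
```

Running it shows the reminder first appears in round 4, the third consecutive round without a todo call, and keeps appearing until the model updates its list.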

Chapter 4: Subagents (s04) - Source Code

File: s04_subagent.py

#!/usr/bin/env python3
# Harness: context isolation -- protecting the model's clarity of thought.
"""
s04_subagent.py - Subagents

Spawn a child agent with fresh messages=[]. The child works in its own
context, sharing the filesystem, then returns only a summary to the parent.

    Parent agent                    Subagent
    +------------------+            +------------------+
    | messages=[...]   |            | messages=[]      |  <-- fresh
    |                  |  dispatch  |                  |
    | tool: task       | ---------->| while tool_use:  |
    |   prompt="..."   |            |   call tools     |
    |   description="" |            |   append results |
    |                  |  summary   |                  |
    |   result = "..." | <--------- | return last text |
    +------------------+            +------------------+
              |
    Parent context stays clean.
    Subagent context is discarded.

Key insight: "Process isolation gives context isolation for free."
"""

import os
import subprocess
from pathlib import Path

from anthropic import Anthropic
from dotenv import load_dotenv

load_dotenv(override=True)

if os.getenv("ANTHROPIC_BASE_URL"):
    os.environ.pop("ANTHROPIC_AUTH_TOKEN", None)

WORKDIR = Path.cwd()
client = Anthropic(base_url=os.getenv("ANTHROPIC_BASE_URL"))
MODEL = os.environ["MODEL_ID"]

SYSTEM = f"You are a coding agent at {WORKDIR}. Use the task tool to delegate exploration or subtasks."
SUBAGENT_SYSTEM = f"You are a coding subagent at {WORKDIR}. Complete the given task, then summarize your findings."


# -- Tool implementations shared by parent and child --
def safe_path(p: str) -> Path:
    path = (WORKDIR / p).resolve()
    if not path.is_relative_to(WORKDIR):
        raise ValueError(f"Path escapes workspace: {p}")
    return path

def run_bash(command: str) -> str:
    dangerous = ["rm -rf /", "sudo", "shutdown", "reboot", "> /dev/"]
    if any(d in command for d in dangerous):
        return "Error: Dangerous command blocked"
    try:
        r = subprocess.run(command, shell=True, cwd=WORKDIR,
                           capture_output=True, text=True, timeout=120)
        out = (r.stdout + r.stderr).strip()
        return out[:50000] if out else "(no output)"
    except subprocess.TimeoutExpired:
        return "Error: Timeout (120s)"

def run_read(path: str, limit: int = None) -> str:
    try:
        lines = safe_path(path).read_text().splitlines()
        if limit and limit < len(lines):
            lines = lines[:limit] + [f"... ({len(lines) - limit} more)"]
        return "\n".join(lines)[:50000]
    except Exception as e:
        return f"Error: {e}"

def run_write(path: str, content: str) -> str:
    try:
        fp = safe_path(path)
        fp.parent.mkdir(parents=True, exist_ok=True)
        fp.write_text(content)
        return f"Wrote {len(content)} bytes"
    except Exception as e:
        return f"Error: {e}"

def run_edit(path: str, old_text: str, new_text: str) -> str:
    try:
        fp = safe_path(path)
        content = fp.read_text()
        if old_text not in content:
            return f"Error: Text not found in {path}"
        fp.write_text(content.replace(old_text, new_text, 1))
        return f"Edited {path}"
    except Exception as e:
        return f"Error: {e}"


TOOL_HANDLERS = {
    "bash": lambda **kw: run_bash(kw["command"]),
    "read_file": lambda **kw: run_read(kw["path"], kw.get("limit")),
    "write_file": lambda **kw: run_write(kw["path"], kw["content"]),
    "edit_file": lambda **kw: run_edit(kw["path"], kw["old_text"], kw["new_text"]),
}

# Child gets all base tools except task (no recursive spawning)
CHILD_TOOLS = [
    {"name": "bash", "description": "Run a shell command.",
     "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}},
    {"name": "read_file", "description": "Read file contents.",
     "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "limit": {"type": "integer"}}, "required": ["path"]}},
    {"name": "write_file", "description": "Write content to file.",
     "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}},
    {"name": "edit_file", "description": "Replace exact text in file.",
     "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}}, "required": ["path", "old_text", "new_text"]}},
]


# -- Subagent: fresh context, filtered tools, summary-only return --
def run_subagent(prompt: str) -> str:
    sub_messages = [{"role": "user", "content": prompt}]  # fresh context
    for _ in range(30):  # safety limit
        response = client.messages.create(
            model=MODEL, system=SUBAGENT_SYSTEM, messages=sub_messages,
            tools=CHILD_TOOLS, max_tokens=8000,
        )
        sub_messages.append({"role": "assistant", "content": response.content})
        if response.stop_reason != "tool_use":
            break
        results = []
        for block in response.content:
            if block.type == "tool_use":
                handler = TOOL_HANDLERS.get(block.name)
                output = handler(**block.input) if handler else f"Unknown tool: {block.name}"
                results.append({"type": "tool_result", "tool_use_id": block.id, "content": str(output)[:50000]})
        sub_messages.append({"role": "user", "content": results})
    # Only the final text returns to the parent -- child context is discarded
    return "".join(b.text for b in response.content if hasattr(b, "text")) or "(no summary)"


# -- Parent tools: base tools + task dispatcher --
PARENT_TOOLS = CHILD_TOOLS + [
    {"name": "task", "description": "Spawn a subagent with fresh context. It shares the filesystem but not conversation history.",
     "input_schema": {"type": "object", "properties": {"prompt": {"type": "string"}, "description": {"type": "string", "description": "Short description of the task"}}, "required": ["prompt"]}},
]


def agent_loop(messages: list):
    while True:
        response = client.messages.create(
            model=MODEL, system=SYSTEM, messages=messages,
            tools=PARENT_TOOLS, max_tokens=8000,
        )
        messages.append({"role": "assistant", "content": response.content})
        if response.stop_reason != "tool_use":
            return
        results = []
        for block in response.content:
            if block.type == "tool_use":
                if block.name == "task":
                    desc = block.input.get("description", "subtask")
                    print(f"> task ({desc}): {block.input['prompt'][:80]}")
                    output = run_subagent(block.input["prompt"])
                else:
                    handler = TOOL_HANDLERS.get(block.name)
                    output = handler(**block.input) if handler else f"Unknown tool: {block.name}"
                print(f" {str(output)[:200]}")
                results.append({"type": "tool_result", "tool_use_id": block.id, "content": str(output)})
        messages.append({"role": "user", "content": results})


if __name__ == "__main__":
    history = []
    while True:
        try:
            query = input("\033[36ms04 >> \033[0m")
        except (EOFError, KeyboardInterrupt):
            break
        if query.strip().lower() in ("q", "exit", ""):
            break
        history.append({"role": "user", "content": query})
        agent_loop(history)
        response_content = history[-1]["content"]
        if isinstance(response_content, list):
            for block in response_content:
                if hasattr(block, "text"):
                    print(block.text)
        print()
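Context isolation in s04 can be checked with a scripted model: the child accumulates its own history, but only one string comes back. The create and run_tool parameters are hypothetical stand-ins for client.messages.create and the dispatch map, and the canned responses are illustrative, not real model output. max_rounds must be at least 1.

```python
from types import SimpleNamespace as NS


def run_subagent(prompt, create, run_tool, max_rounds=30):
    """Child loop with its own message list; only the final text leaves it."""
    sub_messages = [{"role": "user", "content": prompt}]  # fresh context
    for _ in range(max_rounds):  # safety limit, as in s04
        response = create(sub_messages)
        sub_messages.append({"role": "assistant", "content": response.content})
        if response.stop_reason != "tool_use":
            break
        sub_messages.append({"role": "user", "content": [
            {"type": "tool_result", "tool_use_id": b.id, "content": run_tool(b)}
            for b in response.content if b.type == "tool_use"]})
    # sub_messages goes out of scope here: the parent never sees it.
    return "".join(b.text for b in response.content if hasattr(b, "text")) or "(no summary)"


script = iter([
    NS(stop_reason="tool_use", content=[NS(type="tool_use", id="c1", input={"command": "grep -rn TODO src"})]),
    NS(stop_reason="tool_use", content=[NS(type="tool_use", id="c2", input={"command": "wc -l src/app.py"})]),
    NS(stop_reason="end_turn", content=[NS(type="text", text="Found 3 TODOs in src/.")]),
])
seen = []  # size of the child's history at each model call


def fake_create(messages):
    seen.append(len(messages))
    return next(script)


summary = run_subagent("Find TODOs", fake_create, run_tool=lambda b: "(tool output)")
print(summary)  # Found 3 TODOs in src/.
print(seen)     # [1, 3, 5]
```

The child's history grows to six messages over two tool rounds, yet the parent's context only gains the one summary string returned as the task tool's result.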

Chapter 5: Skills (s05) - Source Code

File: s05_skill_loading.py

#!/usr/bin/env python3
# Harness: on-demand knowledge -- domain expertise, loaded when the model asks.
"""
s05_skill_loading.py - Skills

Two-layer skill injection that avoids bloating the system prompt:

Layer 1 (cheap): skill names in system prompt (~100 tokens/skill)
Layer 2 (on demand): full skill body in tool_result

skills/
pdf/
SKILL.md <-- frontmatter (name, description) + body
code-review/
SKILL.md

System prompt:
+--------------------------------------+
| You are a coding agent. |
| Skills available: |
| - pdf: Process PDF files... | <-- Layer 1: metadata only
| - code-review: Review code... |
+--------------------------------------+

When model calls load_skill("pdf"):
+--------------------------------------+
| tool_result: |
| <skill> |
| Full PDF processing instructions | <-- Layer 2: full body
| Step 1: ... |
| Step 2: ... |
| </skill> |
+--------------------------------------+

Key insight: "Don't put everything in the system prompt. Load on demand."
"""

import os
import re
import subprocess
import yaml
from pathlib import Path

from anthropic import Anthropic
from dotenv import load_dotenv

load_dotenv(override=True)

if os.getenv("ANTHROPIC_BASE_URL"):
os.environ.pop("ANTHROPIC_AUTH_TOKEN", None)

WORKDIR = Path.cwd()
client = Anthropic(base_url=os.getenv("ANTHROPIC_BASE_URL"))
MODEL = os.environ["MODEL_ID"]
SKILLS_DIR = WORKDIR / "skills"


# -- SkillLoader: scan skills/<name>/SKILL.md with YAML frontmatter --
class SkillLoader:
def __init__(self, skills_dir: Path):
self.skills_dir = skills_dir
self.skills = {}
self._load_all()

def _load_all(self):
if not self.skills_dir.exists():
return
for f in sorted(self.skills_dir.rglob("SKILL.md")):
text = f.read_text()
meta, body = self._parse_frontmatter(text)
name = meta.get("name", f.parent.name)
self.skills[name] = {"meta": meta, "body": body, "path": str(f)}

def _parse_frontmatter(self, text: str) -> tuple:
"""Parse YAML frontmatter between --- delimiters."""
match = re.match(r"^---\n(.*?)\n---\n(.*)", text, re.DOTALL)
if not match:
return {}, text
try:
meta = yaml.safe_load(match.group(1)) or {}
except yaml.YAMLError:
meta = {}
return meta, match.group(2).strip()

def get_descriptions(self) -> str:
"""Layer 1: short descriptions for the system prompt."""
if not self.skills:
return "(no skills available)"
lines = []
for name, skill in self.skills.items():
desc = skill["meta"].get("description", "No description")
tags = skill["meta"].get("tags", "")
line = f" - {name}: {desc}"
if tags:
line += f" [{tags}]"
lines.append(line)
return "\n".join(lines)

def get_content(self, name: str) -> str:
"""Layer 2: full skill body returned in tool_result."""
skill = self.skills.get(name)
if not skill:
return f"Error: Unknown skill '{name}'. Available: {', '.join(self.skills.keys())}"
return f"<skill name=\"{name}\">\n{skill['body']}\n</skill>"


SKILL_LOADER = SkillLoader(SKILLS_DIR)

# Layer 1: skill metadata injected into system prompt
SYSTEM = f"""You are a coding agent at {WORKDIR}.
Use load_skill to access specialized knowledge before tackling unfamiliar topics.

Skills available:
{SKILL_LOADER.get_descriptions()}"""


# -- Tool implementations --
def safe_path(p: str) -> Path:
path = (WORKDIR / p).resolve()
if not path.is_relative_to(WORKDIR):
raise ValueError(f"Path escapes workspace: {p}")
return path

def run_bash(command: str) -> str:
dangerous = ["rm -rf /", "sudo", "shutdown", "reboot", "> /dev/"]
if any(d in command for d in dangerous):
return "Error: Dangerous command blocked"
try:
r = subprocess.run(command, shell=True, cwd=WORKDIR,
capture_output=True, text=True, timeout=120)
out = (r.stdout + r.stderr).strip()
return out[:50000] if out else "(no output)"
except subprocess.TimeoutExpired:
return "Error: Timeout (120s)"

def run_read(path: str, limit: int = None) -> str:
try:
lines = safe_path(path).read_text().splitlines()
if limit and limit < len(lines):
lines = lines[:limit] + [f"... ({len(lines) - limit} more)"]
return "\n".join(lines)[:50000]
except Exception as e:
return f"Error: {e}"

def run_write(path: str, content: str) -> str:
try:
fp = safe_path(path)
fp.parent.mkdir(parents=True, exist_ok=True)
fp.write_text(content)
return f"Wrote {len(content)} bytes"
except Exception as e:
return f"Error: {e}"

def run_edit(path: str, old_text: str, new_text: str) -> str:
try:
fp = safe_path(path)
content = fp.read_text()
if old_text not in content:
return f"Error: Text not found in {path}"
fp.write_text(content.replace(old_text, new_text, 1))
return f"Edited {path}"
except Exception as e:
return f"Error: {e}"


TOOL_HANDLERS = {
"bash": lambda **kw: run_bash(kw["command"]),
"read_file": lambda **kw: run_read(kw["path"], kw.get("limit")),
"write_file": lambda **kw: run_write(kw["path"], kw["content"]),
"edit_file": lambda **kw: run_edit(kw["path"], kw["old_text"], kw["new_text"]),
"load_skill": lambda **kw: SKILL_LOADER.get_content(kw["name"]),
}

TOOLS = [
{"name": "bash", "description": "Run a shell command.",
"input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}},
{"name": "read_file", "description": "Read file contents.",
"input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "limit": {"type": "integer"}}, "required": ["path"]}},
{"name": "write_file", "description": "Write content to file.",
"input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}},
{"name": "edit_file", "description": "Replace exact text in file.",
"input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}}, "required": ["path", "old_text", "new_text"]}},
{"name": "load_skill", "description": "Load specialized knowledge by name.",
"input_schema": {"type": "object", "properties": {"name": {"type": "string", "description": "Skill name to load"}}, "required": ["name"]}},
]


def agent_loop(messages: list):
    while True:
        response = client.messages.create(
            model=MODEL, system=SYSTEM, messages=messages,
            tools=TOOLS, max_tokens=8000,
        )
        messages.append({"role": "assistant", "content": response.content})
        if response.stop_reason != "tool_use":
            return
        results = []
        for block in response.content:
            if block.type == "tool_use":
                handler = TOOL_HANDLERS.get(block.name)
                try:
                    output = handler(**block.input) if handler else f"Unknown tool: {block.name}"
                except Exception as e:
                    output = f"Error: {e}"
                print(f"> {block.name}:")
                print(str(output)[:200])
                results.append({"type": "tool_result", "tool_use_id": block.id, "content": str(output)})
        messages.append({"role": "user", "content": results})


if __name__ == "__main__":
    history = []
    while True:
        try:
            query = input("\033[36ms05 >> \033[0m")
        except (EOFError, KeyboardInterrupt):
            break
        if query.strip().lower() in ("q", "exit", ""):
            break
        history.append({"role": "user", "content": query})
        agent_loop(history)
        response_content = history[-1]["content"]
        if isinstance(response_content, list):
            for block in response_content:
                if hasattr(block, "text"):
                    print(block.text)
        print()
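The dispatch step in `agent_loop` works in three parts: it looks up a handler in `TOOL_HANDLERS`, falls back to an `Unknown tool` string when none exists, and turns exceptions into `Error: ...` output. This can be exercised offline. The following is a minimal sketch under stated assumptions:
- `fake_bash`, `fake_read` and `dispatch` are hypothetical names.
- `types.SimpleNamespace` stands in for the SDK's content blocks, so no API call is made.

```python
from types import SimpleNamespace


def fake_bash(command: str) -> str:
    # Hypothetical stand-in for run_bash: echoes instead of executing.
    return f"ran: {command}"


def fake_read(path: str, limit: int = None) -> str:
    # Hypothetical stand-in for run_read: always fails, to show error handling.
    raise FileNotFoundError(path)


TOOL_HANDLERS = {
    "bash": lambda **kw: fake_bash(kw["command"]),
    "read_file": lambda **kw: fake_read(kw["path"], kw.get("limit")),
}


def dispatch(blocks: list) -> list:
    """Turn tool_use blocks into tool_result dicts, mirroring agent_loop."""
    results = []
    for block in blocks:
        if block.type != "tool_use":
            continue  # text blocks produce no tool_result
        handler = TOOL_HANDLERS.get(block.name)
        try:
            output = handler(**block.input) if handler else f"Unknown tool: {block.name}"
        except Exception as e:
            output = f"Error: {e}"  # errors become tool output, not crashes
        results.append({"type": "tool_result", "tool_use_id": block.id, "content": str(output)})
    return results


blocks = [
    SimpleNamespace(type="text", text="Let me check."),
    SimpleNamespace(type="tool_use", id="t1", name="bash", input={"command": "ls"}),
    SimpleNamespace(type="tool_use", id="t2", name="read_file", input={"path": "a.txt"}),
    SimpleNamespace(type="tool_use", id="t3", name="deploy", input={}),
]
for r in dispatch(blocks):
    print(r["tool_use_id"], r["content"])
```

Failures come back as ordinary `tool_result` content rather than raised exceptions. As a result, the model sees the error text on its next turn and can retry or change approach.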

Chapter 6: Context Compression (s06) - Source Code

File: s06_context_compact.py

#!/usr/bin/env python3
# Harness: compression -- clean memory for infinite sessions.
"""
s06_context_compact.py - Compact

Three-layer compression pipeline so the agent can work forever:

    Every turn:
    +------------------+
    | Tool call result |
    +------------------+
            |
            v
    [Layer 1: micro_compact]        (silent, every turn)
      Replace non-read_file tool_result content older than last 3
      with "[Previous: used {tool_name}]"
            |
            v
    [Check: tokens > 50000?]
       |               |
       no              yes
       |               |
       v               v
    continue    [Layer 2: auto_compact]
                  Save full transcript to .transcripts/
                  Ask LLM to summarize conversation.
                  Replace all messages with [summary].
                        |
                        v
                [Layer 3: compact tool]
                  Model calls compact -> immediate summarization.
                  Same as auto, triggered manually.

Key insight: "The agent can forget strategically and keep working forever."
"""

import json
import os
import subprocess
import time
from pathlib import Path

from anthropic import Anthropic
from dotenv import load_dotenv

load_dotenv(override=True)

if os.getenv("ANTHROPIC_BASE_URL"):
    os.environ.pop("ANTHROPIC_AUTH_TOKEN", None)

WORKDIR = Path.cwd()
client = Anthropic(base_url=os.getenv("ANTHROPIC_BASE_URL"))
MODEL = os.environ["MODEL_ID"]

SYSTEM = f"You are a coding agent at {WORKDIR}. Use tools to solve tasks."

THRESHOLD = 50000
TRANSCRIPT_DIR = WORKDIR / ".transcripts"
KEEP_RECENT = 3
PRESERVE_RESULT_TOOLS = {"read_file"}


def estimate_tokens(messages: list) -> int:
    """Rough token count: ~4 chars per token."""
    return len(str(messages)) // 4


# -- Layer 1: micro_compact - replace old tool results with placeholders --
def micro_compact(messages: list) -> list:
    # Collect (msg_index, part_index, tool_result_dict) for all tool_result entries
    tool_results = []
    for msg_idx, msg in enumerate(messages):
        if msg["role"] == "user" and isinstance(msg.get("content"), list):
            for part_idx, part in enumerate(msg["content"]):
                if isinstance(part, dict) and part.get("type") == "tool_result":
                    tool_results.append((msg_idx, part_idx, part))
    if len(tool_results) <= KEEP_RECENT:
        return messages
    # Find tool_name for each result by matching tool_use_id in prior assistant messages
    tool_name_map = {}
    for msg in messages:
        if msg["role"] == "assistant":
            content = msg.get("content", [])
            if isinstance(content, list):
                for block in content:
                    if hasattr(block, "type") and block.type == "tool_use":
                        tool_name_map[block.id] = block.name
    # Clear old results (keep last KEEP_RECENT). Preserve read_file outputs because
    # they are reference material; compacting them forces the agent to re-read files.
    to_clear = tool_results[:-KEEP_RECENT]
    for _, _, result in to_clear:
        if not isinstance(result.get("content"), str) or len(result["content"]) <= 100:
            continue
        tool_id = result.get("tool_use_id", "")
        tool_name = tool_name_map.get(tool_id, "unknown")
        if tool_name in PRESERVE_RESULT_TOOLS:
            continue
        result["content"] = f"[Previous: used {tool_name}]"
    return messages


# -- Layer 2: auto_compact - save transcript, summarize, replace messages --
def auto_compact(messages: list) -> list:
    # Save full transcript to disk
    TRANSCRIPT_DIR.mkdir(exist_ok=True)
    transcript_path = TRANSCRIPT_DIR / f"transcript_{int(time.time())}.jsonl"
    with open(transcript_path, "w") as f:
        for msg in messages:
            f.write(json.dumps(msg, default=str) + "\n")
    print(f"[transcript saved: {transcript_path}]")
    # Ask LLM to summarize
    conversation_text = json.dumps(messages, default=str)[-80000:]
    response = client.messages.create(
        model=MODEL,
        messages=[{"role": "user", "content":
            "Summarize this conversation for continuity. Include: "
            "1) What was accomplished, 2) Current state, 3) Key decisions made. "
            "Be concise but preserve critical details.\n\n" + conversation_text}],
        max_tokens=2000,
    )
    summary = response.content[0].text
    # Replace all messages with compressed summary
    return [
        {"role": "user", "content": f"[Conversation compressed. Transcript: {transcript_path}]\n\n{summary}"},
    ]


# -- Tool implementations --
def safe_path(p: str) -> Path:
    path = (WORKDIR / p).resolve()
    if not path.is_relative_to(WORKDIR):
        raise ValueError(f"Path escapes workspace: {p}")
    return path

def run_bash(command: str) -> str:
    dangerous = ["rm -rf /", "sudo", "shutdown", "reboot", "> /dev/"]
    if any(d in command for d in dangerous):
        return "Error: Dangerous command blocked"
    try:
        r = subprocess.run(command, shell=True, cwd=WORKDIR,
                           capture_output=True, text=True, timeout=120)
        out = (r.stdout + r.stderr).strip()
        return out[:50000] if out else "(no output)"
    except subprocess.TimeoutExpired:
        return "Error: Timeout (120s)"

def run_read(path: str, limit: int = None) -> str:
    try:
        lines = safe_path(path).read_text().splitlines()
        if limit and limit < len(lines):
            lines = lines[:limit] + [f"... ({len(lines) - limit} more)"]
        return "\n".join(lines)[:50000]
    except Exception as e:
        return f"Error: {e}"

def run_write(path: str, content: str) -> str:
    try:
        fp = safe_path(path)
        fp.parent.mkdir(parents=True, exist_ok=True)
        fp.write_text(content)
        return f"Wrote {len(content)} bytes"
    except Exception as e:
        return f"Error: {e}"

def run_edit(path: str, old_text: str, new_text: str) -> str:
    try:
        fp = safe_path(path)
        content = fp.read_text()
        if old_text not in content:
            return f"Error: Text not found in {path}"
        fp.write_text(content.replace(old_text, new_text, 1))
        return f"Edited {path}"
    except Exception as e:
        return f"Error: {e}"


TOOL_HANDLERS = {
    "bash": lambda **kw: run_bash(kw["command"]),
    "read_file": lambda **kw: run_read(kw["path"], kw.get("limit")),
    "write_file": lambda **kw: run_write(kw["path"], kw["content"]),
    "edit_file": lambda **kw: run_edit(kw["path"], kw["old_text"], kw["new_text"]),
    "compact": lambda **kw: "Manual compression requested.",
}

TOOLS = [
    {"name": "bash", "description": "Run a shell command.",
     "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}},
    {"name": "read_file", "description": "Read file contents.",
     "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "limit": {"type": "integer"}}, "required": ["path"]}},
    {"name": "write_file", "description": "Write content to file.",
     "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}},
    {"name": "edit_file", "description": "Replace exact text in file.",
     "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}}, "required": ["path", "old_text", "new_text"]}},
    {"name": "compact", "description": "Trigger manual conversation compression.",
     "input_schema": {"type": "object", "properties": {"focus": {"type": "string", "description": "What to preserve in the summary"}}}},
]


def agent_loop(messages: list):
    while True:
        # Layer 1: micro_compact before each LLM call
        micro_compact(messages)
        # Layer 2: auto_compact if token estimate exceeds threshold
        if estimate_tokens(messages) > THRESHOLD:
            print("[auto_compact triggered]")
            messages[:] = auto_compact(messages)
        response = client.messages.create(
            model=MODEL, system=SYSTEM, messages=messages,
            tools=TOOLS, max_tokens=8000,
        )
        messages.append({"role": "assistant", "content": response.content})
        if response.stop_reason != "tool_use":
            return
        results = []
        manual_compact = False
        for block in response.content:
            if block.type == "tool_use":
                if block.name == "compact":
                    manual_compact = True
                    output = "Compressing..."
                else:
                    handler = TOOL_HANDLERS.get(block.name)
                    try:
                        output = handler(**block.input) if handler else f"Unknown tool: {block.name}"
                    except Exception as e:
                        output = f"Error: {e}"
                print(f"> {block.name}:")
                print(str(output)[:200])
                results.append({"type": "tool_result", "tool_use_id": block.id, "content": str(output)})
        messages.append({"role": "user", "content": results})
        # Layer 3: manual compact triggered by the compact tool
        if manual_compact:
            print("[manual compact]")
            messages[:] = auto_compact(messages)
            return


if __name__ == "__main__":
    history = []
    while True:
        try:
            query = input("\033[36ms06 >> \033[0m")
        except (EOFError, KeyboardInterrupt):
            break
        if query.strip().lower() in ("q", "exit", ""):
            break
        history.append({"role": "user", "content": query})
        agent_loop(history)
        response_content = history[-1]["content"]
        if isinstance(response_content, list):
            for block in response_content:
                if hasattr(block, "text"):
                    print(block.text)
        print()
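The sketch below shows what Layer 1 actually does to a history. It restates the `micro_compact` rules from the listing in condensed form and runs them on a toy conversation:
- keep the last `KEEP_RECENT` results,
- skip results of 100 characters or fewer,
- never touch `read_file` output.

This is an illustration under assumptions, not the course's code. `turn` and the sample data are hypothetical, and `SimpleNamespace` objects stand in for the SDK's `tool_use` blocks.

```python
from types import SimpleNamespace

KEEP_RECENT = 3
PRESERVE_RESULT_TOOLS = {"read_file"}


def estimate_tokens(messages: list) -> int:
    # Same heuristic as the listing: roughly 4 characters per token.
    return len(str(messages)) // 4


def micro_compact(messages: list) -> list:
    """Condensed restatement of the listing's Layer 1 rules (mutates in place)."""
    names = {
        b.id: b.name
        for m in messages if m["role"] == "assistant"
        for b in m["content"] if getattr(b, "type", None) == "tool_use"
    }
    results = [
        p
        for m in messages if m["role"] == "user" and isinstance(m["content"], list)
        for p in m["content"] if isinstance(p, dict) and p.get("type") == "tool_result"
    ]
    for r in results[:-KEEP_RECENT]:  # everything except the newest KEEP_RECENT
        name = names.get(r["tool_use_id"], "unknown")
        content = r["content"]
        if isinstance(content, str) and len(content) > 100 and name not in PRESERVE_RESULT_TOOLS:
            r["content"] = f"[Previous: used {name}]"
    return messages


def turn(i: int, tool: str, output: str) -> list:
    """Hypothetical helper: one assistant tool_use plus the matching user tool_result."""
    call = SimpleNamespace(type="tool_use", id=f"t{i}", name=tool)
    result = {"type": "tool_result", "tool_use_id": f"t{i}", "content": output}
    return [{"role": "assistant", "content": [call]},
            {"role": "user", "content": [result]}]


calls = [("bash", "x" * 500), ("read_file", "y" * 500), ("bash", "ok"),
         ("bash", "z" * 500), ("bash", "w" * 500), ("bash", "v" * 500)]
history = [{"role": "user", "content": "fix the tests"}]
for i, (tool, out) in enumerate(calls):
    history += turn(i, tool, out)

before = estimate_tokens(history)
micro_compact(history)
after = estimate_tokens(history)
for m in history:
    if m["role"] == "user" and isinstance(m["content"], list):
        print(m["content"][0]["content"][:24])
print(before, "->", after)
```

Only the oldest long `bash` result is replaced. The `read_file` result and the short `ok` survive. The token estimate drops accordingly, which is what postpones Layer 2.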

Chapter 7: Task System (s07) - Source Code

File: s07_task_system.py

#!/usr/bin/env python3
# Harness: persistent tasks -- goals that outlive any single conversation.
"""
s07_task_system.py - Tasks

Tasks persist as JSON files in .tasks/ so they survive context compression.
Each task has a dependency graph (blockedBy).

    .tasks/
      task_1.json  {"id":1, "subject":"...", "status":"completed", ...}
      task_2.json  {"id":2, "blockedBy":[1], "status":"pending", ...}
      task_3.json  {"id":3, "blockedBy":[2], ...}

    Dependency resolution:
    +----------+     +----------+     +----------+
    | task 1   | --> | task 2   | --> | task 3   |
    | complete |     | blocked  |     | blocked  |
    +----------+     +----------+     +----------+
         |                ^
         +--- completing task 1 removes it from task 2's blockedBy

Key insight: "State that survives compression -- because it's outside the conversation."
"""

import json
import os
import subprocess
from pathlib import Path

from anthropic import Anthropic
from dotenv import load_dotenv

load_dotenv(override=True)

if os.getenv("ANTHROPIC_BASE_URL"):
    os.environ.pop("ANTHROPIC_AUTH_TOKEN", None)

WORKDIR = Path.cwd()
client = Anthropic(base_url=os.getenv("ANTHROPIC_BASE_URL"))
MODEL = os.environ["MODEL_ID"]
TASKS_DIR = WORKDIR / ".tasks"

SYSTEM = f"You are a coding agent at {WORKDIR}. Use task tools to plan and track work."


# -- TaskManager: CRUD with dependency graph, persisted as JSON files --
class TaskManager:
    def __init__(self, tasks_dir: Path):
        self.dir = tasks_dir
        self.dir.mkdir(exist_ok=True)
        self._next_id = self._max_id() + 1

    def _max_id(self) -> int:
        ids = [int(f.stem.split("_")[1]) for f in self.dir.glob("task_*.json")]
        return max(ids) if ids else 0

    def _load(self, task_id: int) -> dict:
        path = self.dir / f"task_{task_id}.json"
        if not path.exists():
            raise ValueError(f"Task {task_id} not found")
        return json.loads(path.read_text())

    def _save(self, task: dict):
        path = self.dir / f"task_{task['id']}.json"
        path.write_text(json.dumps(task, indent=2, ensure_ascii=False))

    def create(self, subject: str, description: str = "") -> str:
        task = {
            "id": self._next_id, "subject": subject, "description": description,
            "status": "pending", "blockedBy": [], "owner": "",
        }
        self._save(task)
        self._next_id += 1
        return json.dumps(task, indent=2, ensure_ascii=False)

    def get(self, task_id: int) -> str:
        return json.dumps(self._load(task_id), indent=2, ensure_ascii=False)

    def update(self, task_id: int, status: str = None,
               add_blocked_by: list = None, remove_blocked_by: list = None) -> str:
        task = self._load(task_id)
        if status:
            if status not in ("pending", "in_progress", "completed"):
                raise ValueError(f"Invalid status: {status}")
            task["status"] = status
            if status == "completed":
                self._clear_dependency(task_id)
        if add_blocked_by:
            task["blockedBy"] = list(set(task["blockedBy"] + add_blocked_by))
        if remove_blocked_by:
            task["blockedBy"] = [x for x in task["blockedBy"] if x not in remove_blocked_by]
        self._save(task)
        return json.dumps(task, indent=2, ensure_ascii=False)

    def _clear_dependency(self, completed_id: int):
        """Remove completed_id from all other tasks' blockedBy lists."""
        for f in self.dir.glob("task_*.json"):
            task = json.loads(f.read_text())
            if completed_id in task.get("blockedBy", []):
                task["blockedBy"].remove(completed_id)
                self._save(task)

    def list_all(self) -> str:
        tasks = []
        files = sorted(
            self.dir.glob("task_*.json"),
            key=lambda f: int(f.stem.split("_")[1])
        )
        for f in files:
            tasks.append(json.loads(f.read_text()))
        if not tasks:
            return "No tasks."
        lines = []
        for t in tasks:
            marker = {"pending": "[ ]", "in_progress": "[>]", "completed": "[x]"}.get(t["status"], "[?]")
            blocked = f" (blocked by: {t['blockedBy']})" if t.get("blockedBy") else ""
            lines.append(f"{marker} #{t['id']}: {t['subject']}{blocked}")
        return "\n".join(lines)


TASKS = TaskManager(TASKS_DIR)


# -- Base tool implementations --
def safe_path(p: str) -> Path:
    path = (WORKDIR / p).resolve()
    if not path.is_relative_to(WORKDIR):
        raise ValueError(f"Path escapes workspace: {p}")
    return path

def run_bash(command: str) -> str:
    dangerous = ["rm -rf /", "sudo", "shutdown", "reboot", "> /dev/"]
    if any(d in command for d in dangerous):
        return "Error: Dangerous command blocked"
    try:
        r = subprocess.run(command, shell=True, cwd=WORKDIR,
                           capture_output=True, text=True, timeout=120)
        out = (r.stdout + r.stderr).strip()
        return out[:50000] if out else "(no output)"
    except subprocess.TimeoutExpired:
        return "Error: Timeout (120s)"

def run_read(path: str, limit: int = None) -> str:
    try:
        lines = safe_path(path).read_text().splitlines()
        if limit and limit < len(lines):
            lines = lines[:limit] + [f"... ({len(lines) - limit} more)"]
        return "\n".join(lines)[:50000]
    except Exception as e:
        return f"Error: {e}"

def run_write(path: str, content: str) -> str:
    try:
        fp = safe_path(path)
        fp.parent.mkdir(parents=True, exist_ok=True)
        fp.write_text(content)
        return f"Wrote {len(content)} bytes"
    except Exception as e:
        return f"Error: {e}"

def run_edit(path: str, old_text: str, new_text: str) -> str:
    try:
        fp = safe_path(path)
        c = fp.read_text()
        if old_text not in c:
            return f"Error: Text not found in {path}"
        fp.write_text(c.replace(old_text, new_text, 1))
        return f"Edited {path}"
    except Exception as e:
        return f"Error: {e}"


TOOL_HANDLERS = {
    "bash": lambda **kw: run_bash(kw["command"]),
    "read_file": lambda **kw: run_read(kw["path"], kw.get("limit")),
    "write_file": lambda **kw: run_write(kw["path"], kw["content"]),
    "edit_file": lambda **kw: run_edit(kw["path"], kw["old_text"], kw["new_text"]),
    "task_create": lambda **kw: TASKS.create(kw["subject"], kw.get("description", "")),
    "task_update": lambda **kw: TASKS.update(kw["task_id"], kw.get("status"), kw.get("addBlockedBy"), kw.get("removeBlockedBy")),
    "task_list": lambda **kw: TASKS.list_all(),
    "task_get": lambda **kw: TASKS.get(kw["task_id"]),
}

TOOLS = [
    {"name": "bash", "description": "Run a shell command.",
     "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}},
    {"name": "read_file", "description": "Read file contents.",
     "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "limit": {"type": "integer"}}, "required": ["path"]}},
    {"name": "write_file", "description": "Write content to file.",
     "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}},
    {"name": "edit_file", "description": "Replace exact text in file.",
     "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}}, "required": ["path", "old_text", "new_text"]}},
    {"name": "task_create", "description": "Create a new task.",
     "input_schema": {"type": "object", "properties": {"subject": {"type": "string"}, "description": {"type": "string"}}, "required": ["subject"]}},
    {"name": "task_update", "description": "Update a task's status or dependencies.",
     "input_schema": {"type": "object", "properties": {"task_id": {"type": "integer"}, "status": {"type": "string", "enum": ["pending", "in_progress", "completed"]}, "addBlockedBy": {"type": "array", "items": {"type": "integer"}}, "removeBlockedBy": {"type": "array", "items": {"type": "integer"}}}, "required": ["task_id"]}},
    {"name": "task_list", "description": "List all tasks with status summary.",
     "input_schema": {"type": "object", "properties": {}}},
    {"name": "task_get", "description": "Get full details of a task by ID.",
     "input_schema": {"type": "object", "properties": {"task_id": {"type": "integer"}}, "required": ["task_id"]}},
]


def agent_loop(messages: list):
    while True:
        response = client.messages.create(
            model=MODEL, system=SYSTEM, messages=messages,
            tools=TOOLS, max_tokens=8000,
        )
        messages.append({"role": "assistant", "content": response.content})
        if response.stop_reason != "tool_use":
            return
        results = []
        for block in response.content:
            if block.type == "tool_use":
                handler = TOOL_HANDLERS.get(block.name)
                try:
                    output = handler(**block.input) if handler else f"Unknown tool: {block.name}"
                except Exception as e:
                    output = f"Error: {e}"
                print(f"> {block.name}:")
                print(str(output)[:200])
                results.append({"type": "tool_result", "tool_use_id": block.id, "content": str(output)})
        messages.append({"role": "user", "content": results})


if __name__ == "__main__":
    history = []
    while True:
        try:
            query = input("\033[36ms07 >> \033[0m")
        except (EOFError, KeyboardInterrupt):
            break
        if query.strip().lower() in ("q", "exit", ""):
            break
        history.append({"role": "user", "content": query})
        agent_loop(history)
        response_content = history[-1]["content"]
        if isinstance(response_content, list):
            for block in response_content:
                if hasattr(block, "text"):
                    print(block.text)
        print()
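The s07 docstring states a dependency rule: completing task 1 removes it from task 2's `blockedBy`. That rule can be checked in isolation. This is a hedged sketch rather than the `TaskManager` itself. `save`, `load_all` and `complete` are hypothetical helpers that mimic `_save` and `_clear_dependency` on JSON files in a temporary directory.

```python
import json
import tempfile
from pathlib import Path


def save(d: Path, task: dict) -> None:
    # One JSON file per task, as in TaskManager._save.
    (d / f"task_{task['id']}.json").write_text(json.dumps(task))


def load_all(d: Path) -> dict:
    # Map task id -> task dict, read fresh from disk.
    return {t["id"]: t for t in (json.loads(f.read_text()) for f in d.glob("task_*.json"))}


def complete(d: Path, task_id: int) -> None:
    """Mark a task completed, then drop it from every blockedBy list."""
    tasks = load_all(d)
    tasks[task_id]["status"] = "completed"
    save(d, tasks[task_id])
    for t in tasks.values():
        if task_id in t["blockedBy"]:
            t["blockedBy"].remove(task_id)
            save(d, t)


with tempfile.TemporaryDirectory() as tmp:
    d = Path(tmp)
    save(d, {"id": 1, "status": "pending", "blockedBy": []})
    save(d, {"id": 2, "status": "pending", "blockedBy": [1]})
    save(d, {"id": 3, "status": "pending", "blockedBy": [2]})
    complete(d, 1)
    state = load_all(d)  # re-read from disk: nothing is held only in memory

print({i: t["blockedBy"] for i, t in sorted(state.items())})  # {1: [], 2: [], 3: [2]}
```

Task 2 is unblocked, while task 3 still waits on task 2. The graph lives in files rather than in the message list, so a context compression step has nothing to erase.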

Chapter 8: Background Tasks (s08) - Source Code

File: s08_background_tasks.py

#!/usr/bin/env python3
# Harness: background execution -- the model thinks while the harness waits.
"""
s08_background_tasks.py - Background Tasks

Run commands in background threads. A notification queue is drained
before each LLM call to deliver results.

    Main thread                Background thread
    +-----------------+        +-----------------+
    | agent loop      |        | task executes   |
    | ...             |        | ...             |
    | [LLM call] <---+-------- | enqueue(result) |
    |  ^drain queue   |        +-----------------+
    +-----------------+

    Timeline:
    Agent ----[spawn A]----[spawn B]----[other work]----
                 |            |
                 v            v
              [A runs]     [B runs]      (parallel)
                 |            |
                 +-- notification queue --> [results injected]

Key insight: "Fire and forget -- the agent doesn't block while the command runs."
"""

import os
import subprocess
import threading
import uuid
from pathlib import Path

from anthropic import Anthropic
from dotenv import load_dotenv

load_dotenv(override=True)

if os.getenv("ANTHROPIC_BASE_URL"):
    os.environ.pop("ANTHROPIC_AUTH_TOKEN", None)

WORKDIR = Path.cwd()
client = Anthropic(base_url=os.getenv("ANTHROPIC_BASE_URL"))
MODEL = os.environ["MODEL_ID"]

SYSTEM = f"You are a coding agent at {WORKDIR}. Use background_run for long-running commands."


# -- BackgroundManager: threaded execution + notification queue --
class BackgroundManager:
    def __init__(self):
        self.tasks = {}  # task_id -> {status, result, command}
        self._notification_queue = []  # completed task results
        self._lock = threading.Lock()

    def run(self, command: str) -> str:
        """Start a background thread, return task_id immediately."""
        task_id = str(uuid.uuid4())[:8]
        self.tasks[task_id] = {"status": "running", "result": None, "command": command}
        thread = threading.Thread(
            target=self._execute, args=(task_id, command), daemon=True
        )
        thread.start()
        return f"Background task {task_id} started: {command[:80]}"

    def _execute(self, task_id: str, command: str):
        """Thread target: run subprocess, capture output, push to queue."""
        try:
            r = subprocess.run(
                command, shell=True, cwd=WORKDIR,
                capture_output=True, text=True, timeout=300
            )
            output = (r.stdout + r.stderr).strip()[:50000]
            status = "completed"
        except subprocess.TimeoutExpired:
            output = "Error: Timeout (300s)"
            status = "timeout"
        except Exception as e:
            output = f"Error: {e}"
            status = "error"
        self.tasks[task_id]["status"] = status
        self.tasks[task_id]["result"] = output or "(no output)"
        with self._lock:
            self._notification_queue.append({
                "task_id": task_id,
                "status": status,
                "command": command[:80],
                "result": (output or "(no output)")[:500],
            })

    def check(self, task_id: str = None) -> str:
        """Check status of one task or list all."""
        if task_id:
            t = self.tasks.get(task_id)
            if not t:
                return f"Error: Unknown task {task_id}"
            return f"[{t['status']}] {t['command'][:60]}\n{t.get('result') or '(running)'}"
        lines = []
        for tid, t in self.tasks.items():
            lines.append(f"{tid}: [{t['status']}] {t['command'][:60]}")
        return "\n".join(lines) if lines else "No background tasks."

    def drain_notifications(self) -> list:
        """Return and clear all pending completion notifications."""
        with self._lock:
            notifs = list(self._notification_queue)
            self._notification_queue.clear()
        return notifs


BG = BackgroundManager()


# -- Tool implementations --
def safe_path(p: str) -> Path:
    path = (WORKDIR / p).resolve()
    if not path.is_relative_to(WORKDIR):
        raise ValueError(f"Path escapes workspace: {p}")
    return path

def run_bash(command: str) -> str:
    dangerous = ["rm -rf /", "sudo", "shutdown", "reboot", "> /dev/"]
    if any(d in command for d in dangerous):
        return "Error: Dangerous command blocked"
    try:
        r = subprocess.run(command, shell=True, cwd=WORKDIR,
                           capture_output=True, text=True, timeout=120)
        out = (r.stdout + r.stderr).strip()
        return out[:50000] if out else "(no output)"
    except subprocess.TimeoutExpired:
        return "Error: Timeout (120s)"

def run_read(path: str, limit: int = None) -> str:
    try:
        lines = safe_path(path).read_text().splitlines()
        if limit and limit < len(lines):
            lines = lines[:limit] + [f"... ({len(lines) - limit} more)"]
        return "\n".join(lines)[:50000]
    except Exception as e:
        return f"Error: {e}"

def run_write(path: str, content: str) -> str:
    try:
        fp = safe_path(path)
        fp.parent.mkdir(parents=True, exist_ok=True)
        fp.write_text(content)
        return f"Wrote {len(content)} bytes"
    except Exception as e:
        return f"Error: {e}"

def run_edit(path: str, old_text: str, new_text: str) -> str:
    try:
        fp = safe_path(path)
        c = fp.read_text()
        if old_text not in c:
            return f"Error: Text not found in {path}"
        fp.write_text(c.replace(old_text, new_text, 1))
        return f"Edited {path}"
    except Exception as e:
        return f"Error: {e}"


TOOL_HANDLERS = {
    "bash": lambda **kw: run_bash(kw["command"]),
    "read_file": lambda **kw: run_read(kw["path"], kw.get("limit")),
    "write_file": lambda **kw: run_write(kw["path"], kw["content"]),
    "edit_file": lambda **kw: run_edit(kw["path"], kw["old_text"], kw["new_text"]),
    "background_run": lambda **kw: BG.run(kw["command"]),
    "check_background": lambda **kw: BG.check(kw.get("task_id")),
}

TOOLS = [
    {"name": "bash", "description": "Run a shell command (blocking).",
     "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}},
    {"name": "read_file", "description": "Read file contents.",
     "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "limit": {"type": "integer"}}, "required": ["path"]}},
    {"name": "write_file", "description": "Write content to file.",
     "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}},
    {"name": "edit_file", "description": "Replace exact text in file.",
     "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}}, "required": ["path", "old_text", "new_text"]}},
    {"name": "background_run", "description": "Run command in background thread. Returns task_id immediately.",
     "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}},
    {"name": "check_background", "description": "Check background task status. Omit task_id to list all.",
     "input_schema": {"type": "object", "properties": {"task_id": {"type": "string"}}}},
]


def agent_loop(messages: list):
    while True:
        # Drain background notifications and inject them as a user message before the LLM call
        notifs = BG.drain_notifications()
        if notifs and messages:
            notif_text = "\n".join(
                f"[bg:{n['task_id']}] {n['status']}: {n['result']}" for n in notifs
            )
            messages.append({"role": "user", "content": f"<background-results>\n{notif_text}\n</background-results>"})
        response = client.messages.create(
            model=MODEL, system=SYSTEM, messages=messages,
            tools=TOOLS, max_tokens=8000,
        )
        messages.append({"role": "assistant", "content": response.content})
        if response.stop_reason != "tool_use":
            return
        results = []
        for block in response.content:
            if block.type == "tool_use":
                handler = TOOL_HANDLERS.get(block.name)
                try:
                    output = handler(**block.input) if handler else f"Unknown tool: {block.name}"
                except Exception as e:
                    output = f"Error: {e}"
                print(f"> {block.name}:")
                print(str(output)[:200])
                results.append({"type": "tool_result", "tool_use_id": block.id, "content": str(output)})
        messages.append({"role": "user", "content": results})


if __name__ == "__main__":
    history = []
    while True:
        try:
            query = input("\033[36ms08 >> \033[0m")
        except (EOFError, KeyboardInterrupt):
            break
        if query.strip().lower() in ("q", "exit", ""):
            break
        history.append({"role": "user", "content": query})
        agent_loop(history)
        response_content = history[-1]["content"]
        if isinstance(response_content, list):
            for block in response_content:
                if hasattr(block, "text"):
                    print(block.text)
        print()
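The core of s08 has two parts: a thread that pushes results into a lock-protected list, and a loop that drains that list before each model call. The sketch below reproduces that pattern with plain Python callables instead of `subprocess.run`, so it runs without a shell. `MiniBackground` and `slow_build` are hypothetical stand-ins for `BackgroundManager` and a long shell command. The polling loop stands in for `agent_loop`.

```python
import threading
import time
import uuid


class MiniBackground:
    """Hypothetical reduced BackgroundManager that runs callables, not shell commands."""

    def __init__(self):
        self._queue = []               # completed-task notifications
        self._lock = threading.Lock()  # guards _queue across threads

    def run(self, label: str, fn) -> str:
        """Start fn in a daemon thread and return its id without waiting."""
        task_id = uuid.uuid4().hex[:8]
        threading.Thread(target=self._execute, args=(task_id, label, fn), daemon=True).start()
        return task_id

    def _execute(self, task_id: str, label: str, fn):
        try:
            result, status = fn(), "completed"
        except Exception as e:
            result, status = f"Error: {e}", "error"
        with self._lock:
            self._queue.append({"task_id": task_id, "label": label,
                                "status": status, "result": result})

    def drain(self) -> list:
        """Return and clear pending notifications, like drain_notifications()."""
        with self._lock:
            items, self._queue = self._queue, []
        return items


def slow_build() -> str:
    time.sleep(0.2)  # simulate a long-running command
    return "built"


bg = MiniBackground()
bg.run("slow", slow_build)
bg.run("fast", lambda: "tested")
bg.run("broken", lambda: 1 / 0)

collected = []
deadline = time.time() + 5
while len(collected) < 3 and time.time() < deadline:
    # In agent_loop this drain happens once per iteration, before client.messages.create().
    collected += bg.drain()
    time.sleep(0.05)

for n in collected:
    print(f"[bg:{n['task_id']}] {n['status']}: {n['result']}")
```

The caller never joins the threads. It only reads whatever has finished by the time it next drains the list, which is the "fire and forget" behaviour the docstring describes.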

Chapter 9: Agent Teams (s09) - Source Code

File: s09_agent_teams.py

#!/usr/bin/env python3
# Harness: team mailboxes -- multiple models, coordinated through files.
"""
s09_agent_teams.py - Agent Teams

Persistent named agents with file-based JSONL inboxes. Each teammate runs
its own agent loop in a separate thread. Communication via append-only inboxes.

Subagent (s04): spawn -> execute -> return summary -> destroyed
Teammate (s09): spawn -> work -> idle -> work -> ... -> shutdown

    .team/config.json                   .team/inbox/
    +----------------------------+      +------------------+
    | {"team_name": "default",   |      | alice.jsonl      |
    |  "members": [              |      | bob.jsonl        |
    |    {"name":"alice",        |      | lead.jsonl       |
    |     "role":"coder",        |      +------------------+
    |     "status":"idle"}       |
    |  ]}                        |      send_message("alice", "fix bug"):
    +----------------------------+        open("alice.jsonl", "a").write(msg)

                                        read_inbox("alice"):
    spawn_teammate("alice","coder",...)   msgs = [json.loads(l) for l in ...]
         |                                open("alice.jsonl", "w").close()
         v                                return msgs  # drain
    Thread: alice             Thread: bob
    +------------------+      +------------------+
    | agent_loop       |      | agent_loop       |
    | status: working  |      | status: idle     |
    | ... runs tools   |      | ... waits ...    |
    | status -> idle   |      |                  |
    +------------------+      +------------------+

    5 message types (all declared, not all handled here):
    +-------------------------+-----------------------------------+
    | message                 | Normal text message               |
    | broadcast               | Sent to all teammates             |
    | shutdown_request        | Request graceful shutdown (s10)   |
    | shutdown_response       | Approve/reject shutdown (s10)     |
    | plan_approval_response  | Approve/reject plan (s10)         |
    +-------------------------+-----------------------------------+

Key insight: "Teammates that can talk to each other."
"""

import json
import os
import subprocess
import threading
import time
from pathlib import Path

from anthropic import Anthropic
from dotenv import load_dotenv

load_dotenv(override=True)
if os.getenv("ANTHROPIC_BASE_URL"):
    os.environ.pop("ANTHROPIC_AUTH_TOKEN", None)

WORKDIR = Path.cwd()
client = Anthropic(base_url=os.getenv("ANTHROPIC_BASE_URL"))
MODEL = os.environ["MODEL_ID"]
TEAM_DIR = WORKDIR / ".team"
INBOX_DIR = TEAM_DIR / "inbox"

SYSTEM = f"You are a team lead at {WORKDIR}. Spawn teammates and communicate via inboxes."

VALID_MSG_TYPES = {
    "message",
    "broadcast",
    "shutdown_request",
    "shutdown_response",
    "plan_approval_response",
}


# -- MessageBus: JSONL inbox per teammate --
class MessageBus:
    def __init__(self, inbox_dir: Path):
        self.dir = inbox_dir
        self.dir.mkdir(parents=True, exist_ok=True)

    def send(self, sender: str, to: str, content: str,
             msg_type: str = "message", extra: dict = None) -> str:
        if msg_type not in VALID_MSG_TYPES:
            return f"Error: Invalid type '{msg_type}'. Valid: {VALID_MSG_TYPES}"
        msg = {
            "type": msg_type,
            "from": sender,
            "content": content,
            "timestamp": time.time(),
        }
        if extra:
            msg.update(extra)
        inbox_path = self.dir / f"{to}.jsonl"
        with open(inbox_path, "a") as f:
            f.write(json.dumps(msg) + "\n")
        return f"Sent {msg_type} to {to}"

    def read_inbox(self, name: str) -> list:
        inbox_path = self.dir / f"{name}.jsonl"
        if not inbox_path.exists():
            return []
        messages = []
        for line in inbox_path.read_text().strip().splitlines():
            if line:
                messages.append(json.loads(line))
        # Drain: truncate after reading. Read-then-truncate is not atomic, so a
        # message appended by another thread between the two steps is lost.
        inbox_path.write_text("")
        return messages

    def broadcast(self, sender: str, content: str, teammates: list) -> str:
        count = 0
        for name in teammates:
            if name != sender:
                self.send(sender, name, content, "broadcast")
                count += 1
        return f"Broadcast to {count} teammates"


BUS = MessageBus(INBOX_DIR)


# -- TeammateManager: persistent named agents with config.json --
class TeammateManager:
    def __init__(self, team_dir: Path):
        self.dir = team_dir
        self.dir.mkdir(exist_ok=True)
        self.config_path = self.dir / "config.json"
        self.config = self._load_config()
        self.threads = {}

    def _load_config(self) -> dict:
        if self.config_path.exists():
            return json.loads(self.config_path.read_text())
        return {"team_name": "default", "members": []}

    def _save_config(self):
        self.config_path.write_text(json.dumps(self.config, indent=2))

    def _find_member(self, name: str) -> dict:
        for m in self.config["members"]:
            if m["name"] == name:
                return m
        return None

    def spawn(self, name: str, role: str, prompt: str) -> str:
        member = self._find_member(name)
        if member:
            if member["status"] not in ("idle", "shutdown"):
                return f"Error: '{name}' is currently {member['status']}"
            member["status"] = "working"
            member["role"] = role
        else:
            member = {"name": name, "role": role, "status": "working"}
            self.config["members"].append(member)
        self._save_config()
        thread = threading.Thread(
            target=self._teammate_loop,
            args=(name, role, prompt),
            daemon=True,
        )
        self.threads[name] = thread
        thread.start()
        return f"Spawned '{name}' (role: {role})"

    def _teammate_loop(self, name: str, role: str, prompt: str):
        sys_prompt = (
            f"You are '{name}', role: {role}, at {WORKDIR}. "
            f"Use send_message to communicate. Complete your task."
        )
        messages = [{"role": "user", "content": prompt}]
        tools = self._teammate_tools()
        for _ in range(50):
            inbox = BUS.read_inbox(name)
            for msg in inbox:
                messages.append({"role": "user", "content": json.dumps(msg)})
            try:
                response = client.messages.create(
                    model=MODEL,
                    system=sys_prompt,
                    messages=messages,
                    tools=tools,
                    max_tokens=8000,
                )
            except Exception:
                break
            messages.append({"role": "assistant", "content": response.content})
            if response.stop_reason != "tool_use":
                break
            results = []
            for block in response.content:
                if block.type == "tool_use":
                    output = self._exec(name, block.name, block.input)
                    print(f" [{name}] {block.name}: {str(output)[:120]}")
                    results.append({
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": str(output),
                    })
            messages.append({"role": "user", "content": results})
        member = self._find_member(name)
        if member and member["status"] != "shutdown":
            member["status"] = "idle"
            self._save_config()

    def _exec(self, sender: str, tool_name: str, args: dict) -> str:
        # these base tools are unchanged from s02
        if tool_name == "bash":
            return _run_bash(args["command"])
        if tool_name == "read_file":
            return _run_read(args["path"])
        if tool_name == "write_file":
            return _run_write(args["path"], args["content"])
        if tool_name == "edit_file":
            return _run_edit(args["path"], args["old_text"], args["new_text"])
        if tool_name == "send_message":
            return BUS.send(sender, args["to"], args["content"], args.get("msg_type", "message"))
        if tool_name == "read_inbox":
            return json.dumps(BUS.read_inbox(sender), indent=2)
        return f"Unknown tool: {tool_name}"

    def _teammate_tools(self) -> list:
        # these base tools are unchanged from s02
        return [
            {"name": "bash", "description": "Run a shell command.",
             "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}},
            {"name": "read_file", "description": "Read file contents.",
             "input_schema": {"type": "object", "properties": {"path": {"type": "string"}}, "required": ["path"]}},
            {"name": "write_file", "description": "Write content to file.",
             "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}},
            {"name": "edit_file", "description": "Replace exact text in file.",
             "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}}, "required": ["path", "old_text", "new_text"]}},
            {"name": "send_message", "description": "Send message to a teammate.",
             "input_schema": {"type": "object", "properties": {"to": {"type": "string"}, "content": {"type": "string"}, "msg_type": {"type": "string", "enum": list(VALID_MSG_TYPES)}}, "required": ["to", "content"]}},
            {"name": "read_inbox", "description": "Read and drain your inbox.",
             "input_schema": {"type": "object", "properties": {}}},
        ]

    def list_all(self) -> str:
        if not self.config["members"]:
            return "No teammates."
        lines = [f"Team: {self.config['team_name']}"]
        for m in self.config["members"]:
            lines.append(f" {m['name']} ({m['role']}): {m['status']}")
        return "\n".join(lines)

    def member_names(self) -> list:
        return [m["name"] for m in self.config["members"]]


TEAM = TeammateManager(TEAM_DIR)


# -- Base tool implementations (these base tools are unchanged from s02) --
def _safe_path(p: str) -> Path:
    path = (WORKDIR / p).resolve()
    if not path.is_relative_to(WORKDIR):
        raise ValueError(f"Path escapes workspace: {p}")
    return path


def _run_bash(command: str) -> str:
    dangerous = ["rm -rf /", "sudo", "shutdown", "reboot"]
    if any(d in command for d in dangerous):
        return "Error: Dangerous command blocked"
    try:
        r = subprocess.run(
            command, shell=True, cwd=WORKDIR,
            capture_output=True, text=True, timeout=120,
        )
        out = (r.stdout + r.stderr).strip()
        return out[:50000] if out else "(no output)"
    except subprocess.TimeoutExpired:
        return "Error: Timeout (120s)"


def _run_read(path: str, limit: int = None) -> str:
    try:
        lines = _safe_path(path).read_text().splitlines()
        if limit and limit < len(lines):
            lines = lines[:limit] + [f"... ({len(lines) - limit} more)"]
        return "\n".join(lines)[:50000]
    except Exception as e:
        return f"Error: {e}"


def _run_write(path: str, content: str) -> str:
    try:
        fp = _safe_path(path)
        fp.parent.mkdir(parents=True, exist_ok=True)
        fp.write_text(content)
        return f"Wrote {len(content)} bytes"
    except Exception as e:
        return f"Error: {e}"


def _run_edit(path: str, old_text: str, new_text: str) -> str:
    try:
        fp = _safe_path(path)
        c = fp.read_text()
        if old_text not in c:
            return f"Error: Text not found in {path}"
        fp.write_text(c.replace(old_text, new_text, 1))
        return f"Edited {path}"
    except Exception as e:
        return f"Error: {e}"


# -- Lead tool dispatch (9 tools) --
TOOL_HANDLERS = {
    "bash": lambda **kw: _run_bash(kw["command"]),
    "read_file": lambda **kw: _run_read(kw["path"], kw.get("limit")),
    "write_file": lambda **kw: _run_write(kw["path"], kw["content"]),
    "edit_file": lambda **kw: _run_edit(kw["path"], kw["old_text"], kw["new_text"]),
    "spawn_teammate": lambda **kw: TEAM.spawn(kw["name"], kw["role"], kw["prompt"]),
    "list_teammates": lambda **kw: TEAM.list_all(),
    "send_message": lambda **kw: BUS.send("lead", kw["to"], kw["content"], kw.get("msg_type", "message")),
    "read_inbox": lambda **kw: json.dumps(BUS.read_inbox("lead"), indent=2),
    "broadcast": lambda **kw: BUS.broadcast("lead", kw["content"], TEAM.member_names()),
}

# these base tools are unchanged from s02
TOOLS = [
    {"name": "bash", "description": "Run a shell command.",
     "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}},
    {"name": "read_file", "description": "Read file contents.",
     "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "limit": {"type": "integer"}}, "required": ["path"]}},
    {"name": "write_file", "description": "Write content to file.",
     "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}},
    {"name": "edit_file", "description": "Replace exact text in file.",
     "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}}, "required": ["path", "old_text", "new_text"]}},
    {"name": "spawn_teammate", "description": "Spawn a persistent teammate that runs in its own thread.",
     "input_schema": {"type": "object", "properties": {"name": {"type": "string"}, "role": {"type": "string"}, "prompt": {"type": "string"}}, "required": ["name", "role", "prompt"]}},
    {"name": "list_teammates", "description": "List all teammates with name, role, status.",
     "input_schema": {"type": "object", "properties": {}}},
    {"name": "send_message", "description": "Send a message to a teammate's inbox.",
     "input_schema": {"type": "object", "properties": {"to": {"type": "string"}, "content": {"type": "string"}, "msg_type": {"type": "string", "enum": list(VALID_MSG_TYPES)}}, "required": ["to", "content"]}},
    {"name": "read_inbox", "description": "Read and drain the lead's inbox.",
     "input_schema": {"type": "object", "properties": {}}},
    {"name": "broadcast", "description": "Send a message to all teammates.",
     "input_schema": {"type": "object", "properties": {"content": {"type": "string"}}, "required": ["content"]}},
]


def agent_loop(messages: list):
    while True:
        inbox = BUS.read_inbox("lead")
        if inbox:
            messages.append({
                "role": "user",
                "content": f"<inbox>{json.dumps(inbox, indent=2)}</inbox>",
            })
        response = client.messages.create(
            model=MODEL,
            system=SYSTEM,
            messages=messages,
            tools=TOOLS,
            max_tokens=8000,
        )
        messages.append({"role": "assistant", "content": response.content})
        if response.stop_reason != "tool_use":
            return
        results = []
        for block in response.content:
            if block.type == "tool_use":
                handler = TOOL_HANDLERS.get(block.name)
                try:
                    output = handler(**block.input) if handler else f"Unknown tool: {block.name}"
                except Exception as e:
                    output = f"Error: {e}"
                print(f"> {block.name}:")
                print(str(output)[:200])
                results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": str(output),
                })
        messages.append({"role": "user", "content": results})


if __name__ == "__main__":
    history = []
    while True:
        try:
            query = input("\033[36ms09 >> \033[0m")
        except (EOFError, KeyboardInterrupt):
            break
        if query.strip().lower() in ("q", "exit", ""):
            break
        if query.strip() == "/team":
            print(TEAM.list_all())
            continue
        if query.strip() == "/inbox":
            print(json.dumps(BUS.read_inbox("lead"), indent=2))
            continue
        history.append({"role": "user", "content": query})
        agent_loop(history)
        response_content = history[-1]["content"]
        if isinstance(response_content, list):
            for block in response_content:
                if hasattr(block, "text"):
                    print(block.text)
        print()
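In `MessageBus.read_inbox` above, reading the file and truncating it are two separate steps. A message that another teammate thread appends in between is silently dropped. The sketch below shows one way to close that gap. It assumes all agents run as threads of one process, as they do in s09, and guards both the append and the drain with a single `threading.Lock`. The class name `LockedInbox` and its `drain` method are hypothetical; they are not part of the course code.

```python
import json
import tempfile
import threading
import time
from pathlib import Path


class LockedInbox:
    """Hypothetical variant of the s09 MessageBus: one JSONL file per
    recipient, with one lock around both append and drain. The lock only
    covers threads inside a single process."""

    def __init__(self, inbox_dir: Path):
        self.dir = inbox_dir
        self.dir.mkdir(parents=True, exist_ok=True)
        self._lock = threading.Lock()

    def send(self, sender: str, to: str, content: str, msg_type: str = "message") -> None:
        msg = {"type": msg_type, "from": sender, "content": content,
               "timestamp": time.time()}
        with self._lock:
            with open(self.dir / f"{to}.jsonl", "a") as f:
                f.write(json.dumps(msg) + "\n")

    def drain(self, name: str) -> list:
        path = self.dir / f"{name}.jsonl"
        with self._lock:
            if not path.exists():
                return []
            lines = path.read_text().splitlines()
            path.write_text("")  # truncate while still holding the lock
        return [json.loads(line) for line in lines if line]


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        bus = LockedInbox(Path(tmp))
        senders = [
            threading.Thread(target=bus.send, args=("lead", "alice", f"task {i}"))
            for i in range(20)
        ]
        for t in senders:
            t.start()
        for t in senders:
            t.join()
        print(len(bus.drain("alice")), bus.drain("alice"))  # 20 []
```

A single lock is the simplest fix, but it serializes every inbox. A per-recipient lock would be the next refinement. If several processes share `.team/inbox/`, you would need an OS-level file lock instead.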

Chapter 10: Team Protocols (s10) - Source Code

File: s10_team_protocols.py

#!/usr/bin/env python3
# Harness: protocols -- structured handshakes between models.
"""
s10_team_protocols.py - Team Protocols

Shutdown protocol and plan approval protocol, both using the same
request_id correlation pattern. Builds on s09's team messaging.

Shutdown FSM: pending -> approved | rejected

    Lead                             Teammate
    +---------------------+          +---------------------+
    | shutdown_request    |          |                     |
    | {                   | -------> | receives request    |
    |   request_id: abc   |          | decides: approve?   |
    | }                   |          |                     |
    +---------------------+          +---------------------+
                                             |
    +---------------------+          +-------v-------------+
    | shutdown_response   | <------- | shutdown_response   |
    | {                   |          | {                   |
    |   request_id: abc   |          |   request_id: abc   |
    |   approve: true     |          |   approve: true     |
    | }                   |          | }                   |
    +---------------------+          +---------------------+
            |
            v
    status -> "shutdown", thread stops

Plan approval FSM: pending -> approved | rejected

    Teammate                         Lead
    +---------------------+          +---------------------+
    | plan_approval       |          |                     |
    | submit: {plan:"..."}| -------> | reviews plan text   |
    +---------------------+          | approve/reject?     |
                                     +---------------------+
                                             |
    +---------------------+          +-------v-------------+
    | plan_approval_resp  | <------- | plan_approval       |
    | {approve: true}     |          | review: {req_id,    |
    +---------------------+          |   approve: true}    |
                                     +---------------------+

Trackers: {request_id: {"target|from": name, "status": "pending|..."}}

Key insight: "Same request_id correlation pattern, two domains."
"""

import json
import os
import subprocess
import threading
import time
import uuid
from pathlib import Path

from anthropic import Anthropic
from dotenv import load_dotenv

load_dotenv(override=True)
if os.getenv("ANTHROPIC_BASE_URL"):
    os.environ.pop("ANTHROPIC_AUTH_TOKEN", None)

WORKDIR = Path.cwd()
client = Anthropic(base_url=os.getenv("ANTHROPIC_BASE_URL"))
MODEL = os.environ["MODEL_ID"]
TEAM_DIR = WORKDIR / ".team"
INBOX_DIR = TEAM_DIR / "inbox"

SYSTEM = f"You are a team lead at {WORKDIR}. Manage teammates with shutdown and plan approval protocols."

VALID_MSG_TYPES = {
    "message",
    "broadcast",
    "shutdown_request",
    "shutdown_response",
    "plan_approval_response",
}

# -- Request trackers: correlate by request_id --
shutdown_requests = {}
plan_requests = {}
_tracker_lock = threading.Lock()


# -- MessageBus: JSONL inbox per teammate --
class MessageBus:
    def __init__(self, inbox_dir: Path):
        self.dir = inbox_dir
        self.dir.mkdir(parents=True, exist_ok=True)

    def send(self, sender: str, to: str, content: str,
             msg_type: str = "message", extra: dict = None) -> str:
        if msg_type not in VALID_MSG_TYPES:
            return f"Error: Invalid type '{msg_type}'. Valid: {VALID_MSG_TYPES}"
        msg = {
            "type": msg_type,
            "from": sender,
            "content": content,
            "timestamp": time.time(),
        }
        if extra:
            msg.update(extra)
        inbox_path = self.dir / f"{to}.jsonl"
        with open(inbox_path, "a") as f:
            f.write(json.dumps(msg) + "\n")
        return f"Sent {msg_type} to {to}"

    def read_inbox(self, name: str) -> list:
        inbox_path = self.dir / f"{name}.jsonl"
        if not inbox_path.exists():
            return []
        messages = []
        for line in inbox_path.read_text().strip().splitlines():
            if line:
                messages.append(json.loads(line))
        inbox_path.write_text("")
        return messages

    def broadcast(self, sender: str, content: str, teammates: list) -> str:
        count = 0
        for name in teammates:
            if name != sender:
                self.send(sender, name, content, "broadcast")
                count += 1
        return f"Broadcast to {count} teammates"


BUS = MessageBus(INBOX_DIR)


# -- TeammateManager with shutdown + plan approval --
class TeammateManager:
    def __init__(self, team_dir: Path):
        self.dir = team_dir
        self.dir.mkdir(exist_ok=True)
        self.config_path = self.dir / "config.json"
        self.config = self._load_config()
        self.threads = {}

    def _load_config(self) -> dict:
        if self.config_path.exists():
            return json.loads(self.config_path.read_text())
        return {"team_name": "default", "members": []}

    def _save_config(self):
        self.config_path.write_text(json.dumps(self.config, indent=2))

    def _find_member(self, name: str) -> dict:
        for m in self.config["members"]:
            if m["name"] == name:
                return m
        return None

    def spawn(self, name: str, role: str, prompt: str) -> str:
        member = self._find_member(name)
        if member:
            if member["status"] not in ("idle", "shutdown"):
                return f"Error: '{name}' is currently {member['status']}"
            member["status"] = "working"
            member["role"] = role
        else:
            member = {"name": name, "role": role, "status": "working"}
            self.config["members"].append(member)
        self._save_config()
        thread = threading.Thread(
            target=self._teammate_loop,
            args=(name, role, prompt),
            daemon=True,
        )
        self.threads[name] = thread
        thread.start()
        return f"Spawned '{name}' (role: {role})"

    def _teammate_loop(self, name: str, role: str, prompt: str):
        sys_prompt = (
            f"You are '{name}', role: {role}, at {WORKDIR}. "
            f"Submit plans via plan_approval before major work. "
            f"Respond to shutdown_request with shutdown_response."
        )
        messages = [{"role": "user", "content": prompt}]
        tools = self._teammate_tools()
        should_exit = False
        for _ in range(50):
            inbox = BUS.read_inbox(name)
            for msg in inbox:
                messages.append({"role": "user", "content": json.dumps(msg)})
            if should_exit:
                break
            try:
                response = client.messages.create(
                    model=MODEL,
                    system=sys_prompt,
                    messages=messages,
                    tools=tools,
                    max_tokens=8000,
                )
            except Exception:
                break
            messages.append({"role": "assistant", "content": response.content})
            if response.stop_reason != "tool_use":
                break
            results = []
            for block in response.content:
                if block.type == "tool_use":
                    output = self._exec(name, block.name, block.input)
                    print(f" [{name}] {block.name}: {str(output)[:120]}")
                    results.append({
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": str(output),
                    })
                    if block.name == "shutdown_response" and block.input.get("approve"):
                        should_exit = True
            messages.append({"role": "user", "content": results})
        member = self._find_member(name)
        if member:
            member["status"] = "shutdown" if should_exit else "idle"
            self._save_config()

    def _exec(self, sender: str, tool_name: str, args: dict) -> str:
        # these base tools are unchanged from s02
        if tool_name == "bash":
            return _run_bash(args["command"])
        if tool_name == "read_file":
            return _run_read(args["path"])
        if tool_name == "write_file":
            return _run_write(args["path"], args["content"])
        if tool_name == "edit_file":
            return _run_edit(args["path"], args["old_text"], args["new_text"])
        if tool_name == "send_message":
            return BUS.send(sender, args["to"], args["content"], args.get("msg_type", "message"))
        if tool_name == "read_inbox":
            return json.dumps(BUS.read_inbox(sender), indent=2)
        if tool_name == "shutdown_response":
            req_id = args["request_id"]
            approve = args["approve"]
            with _tracker_lock:
                if req_id in shutdown_requests:
                    shutdown_requests[req_id]["status"] = "approved" if approve else "rejected"
            BUS.send(
                sender, "lead", args.get("reason", ""),
                "shutdown_response", {"request_id": req_id, "approve": approve},
            )
            return f"Shutdown {'approved' if approve else 'rejected'}"
        if tool_name == "plan_approval":
            plan_text = args.get("plan", "")
            req_id = str(uuid.uuid4())[:8]
            with _tracker_lock:
                plan_requests[req_id] = {"from": sender, "plan": plan_text, "status": "pending"}
            # VALID_MSG_TYPES has no dedicated plan-request type, so the submission
            # is sent as "plan_approval_response", carrying request_id and plan text.
            BUS.send(
                sender, "lead", plan_text, "plan_approval_response",
                {"request_id": req_id, "plan": plan_text},
            )
            return f"Plan submitted (request_id={req_id}). Waiting for lead approval."
        return f"Unknown tool: {tool_name}"

    def _teammate_tools(self) -> list:
        # these base tools are unchanged from s02
        return [
            {"name": "bash", "description": "Run a shell command.",
             "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}},
            {"name": "read_file", "description": "Read file contents.",
             "input_schema": {"type": "object", "properties": {"path": {"type": "string"}}, "required": ["path"]}},
            {"name": "write_file", "description": "Write content to file.",
             "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}},
            {"name": "edit_file", "description": "Replace exact text in file.",
             "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}}, "required": ["path", "old_text", "new_text"]}},
            {"name": "send_message", "description": "Send message to a teammate.",
             "input_schema": {"type": "object", "properties": {"to": {"type": "string"}, "content": {"type": "string"}, "msg_type": {"type": "string", "enum": list(VALID_MSG_TYPES)}}, "required": ["to", "content"]}},
            {"name": "read_inbox", "description": "Read and drain your inbox.",
             "input_schema": {"type": "object", "properties": {}}},
            {"name": "shutdown_response", "description": "Respond to a shutdown request. Approve to shut down, reject to keep working.",
             "input_schema": {"type": "object", "properties": {"request_id": {"type": "string"}, "approve": {"type": "boolean"}, "reason": {"type": "string"}}, "required": ["request_id", "approve"]}},
            {"name": "plan_approval", "description": "Submit a plan for lead approval. Provide plan text.",
             "input_schema": {"type": "object", "properties": {"plan": {"type": "string"}}, "required": ["plan"]}},
        ]

    def list_all(self) -> str:
        if not self.config["members"]:
            return "No teammates."
        lines = [f"Team: {self.config['team_name']}"]
        for m in self.config["members"]:
            lines.append(f" {m['name']} ({m['role']}): {m['status']}")
        return "\n".join(lines)

    def member_names(self) -> list:
        return [m["name"] for m in self.config["members"]]


TEAM = TeammateManager(TEAM_DIR)


# -- Base tool implementations (these base tools are unchanged from s02) --
def _safe_path(p: str) -> Path:
    path = (WORKDIR / p).resolve()
    if not path.is_relative_to(WORKDIR):
        raise ValueError(f"Path escapes workspace: {p}")
    return path


def _run_bash(command: str) -> str:
    dangerous = ["rm -rf /", "sudo", "shutdown", "reboot"]
    if any(d in command for d in dangerous):
        return "Error: Dangerous command blocked"
    try:
        r = subprocess.run(
            command, shell=True, cwd=WORKDIR,
            capture_output=True, text=True, timeout=120,
        )
        out = (r.stdout + r.stderr).strip()
        return out[:50000] if out else "(no output)"
    except subprocess.TimeoutExpired:
        return "Error: Timeout (120s)"


def _run_read(path: str, limit: int = None) -> str:
    try:
        lines = _safe_path(path).read_text().splitlines()
        if limit and limit < len(lines):
            lines = lines[:limit] + [f"... ({len(lines) - limit} more)"]
        return "\n".join(lines)[:50000]
    except Exception as e:
        return f"Error: {e}"


def _run_write(path: str, content: str) -> str:
    try:
        fp = _safe_path(path)
        fp.parent.mkdir(parents=True, exist_ok=True)
        fp.write_text(content)
        return f"Wrote {len(content)} bytes"
    except Exception as e:
        return f"Error: {e}"


def _run_edit(path: str, old_text: str, new_text: str) -> str:
    try:
        fp = _safe_path(path)
        c = fp.read_text()
        if old_text not in c:
            return f"Error: Text not found in {path}"
        fp.write_text(c.replace(old_text, new_text, 1))
        return f"Edited {path}"
    except Exception as e:
        return f"Error: {e}"


# -- Lead-specific protocol handlers --
def handle_shutdown_request(teammate: str) -> str:
    req_id = str(uuid.uuid4())[:8]
    with _tracker_lock:
        shutdown_requests[req_id] = {"target": teammate, "status": "pending"}
    BUS.send(
        "lead", teammate, "Please shut down gracefully.",
        "shutdown_request", {"request_id": req_id},
    )
    return f"Shutdown request {req_id} sent to '{teammate}' (status: pending)"


def handle_plan_review(request_id: str, approve: bool, feedback: str = "") -> str:
    with _tracker_lock:
        req = plan_requests.get(request_id)
    if not req:
        return f"Error: Unknown plan request_id '{request_id}'"
    with _tracker_lock:
        req["status"] = "approved" if approve else "rejected"
    BUS.send(
        "lead", req["from"], feedback, "plan_approval_response",
        {"request_id": request_id, "approve": approve, "feedback": feedback},
    )
    return f"Plan {req['status']} for '{req['from']}'"


def _check_shutdown_status(request_id: str) -> str:
    with _tracker_lock:
        return json.dumps(shutdown_requests.get(request_id, {"error": "not found"}))


# -- Lead tool dispatch (12 tools) --
TOOL_HANDLERS = {
    "bash": lambda **kw: _run_bash(kw["command"]),
    "read_file": lambda **kw: _run_read(kw["path"], kw.get("limit")),
    "write_file": lambda **kw: _run_write(kw["path"], kw["content"]),
    "edit_file": lambda **kw: _run_edit(kw["path"], kw["old_text"], kw["new_text"]),
    "spawn_teammate": lambda **kw: TEAM.spawn(kw["name"], kw["role"], kw["prompt"]),
    "list_teammates": lambda **kw: TEAM.list_all(),
    "send_message": lambda **kw: BUS.send("lead", kw["to"], kw["content"], kw.get("msg_type", "message")),
    "read_inbox": lambda **kw: json.dumps(BUS.read_inbox("lead"), indent=2),
    "broadcast": lambda **kw: BUS.broadcast("lead", kw["content"], TEAM.member_names()),
    "shutdown_request": lambda **kw: handle_shutdown_request(kw["teammate"]),
    "shutdown_response": lambda **kw: _check_shutdown_status(kw.get("request_id", "")),
    "plan_approval": lambda **kw: handle_plan_review(kw["request_id"], kw["approve"], kw.get("feedback", "")),
}

# these base tools are unchanged from s02
TOOLS = [
    {"name": "bash", "description": "Run a shell command.",
     "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}},
    {"name": "read_file", "description": "Read file contents.",
     "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "limit": {"type": "integer"}}, "required": ["path"]}},
    {"name": "write_file", "description": "Write content to file.",
     "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}},
    {"name": "edit_file", "description": "Replace exact text in file.",
     "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}}, "required": ["path", "old_text", "new_text"]}},
    {"name": "spawn_teammate", "description": "Spawn a persistent teammate.",
     "input_schema": {"type": "object", "properties": {"name": {"type": "string"}, "role": {"type": "string"}, "prompt": {"type": "string"}}, "required": ["name", "role", "prompt"]}},
    {"name": "list_teammates", "description": "List all teammates.",
     "input_schema": {"type": "object", "properties": {}}},
    {"name": "send_message", "description": "Send a message to a teammate.",
     "input_schema": {"type": "object", "properties": {"to": {"type": "string"}, "content": {"type": "string"}, "msg_type": {"type": "string", "enum": list(VALID_MSG_TYPES)}}, "required": ["to", "content"]}},
    {"name": "read_inbox", "description": "Read and drain the lead's inbox.",
     "input_schema": {"type": "object", "properties": {}}},
    {"name": "broadcast", "description": "Send a message to all teammates.",
     "input_schema": {"type": "object", "properties": {"content": {"type": "string"}}, "required": ["content"]}},
    {"name": "shutdown_request", "description": "Request a teammate to shut down gracefully. Returns a request_id for tracking.",
     "input_schema": {"type": "object", "properties": {"teammate": {"type": "string"}}, "required": ["teammate"]}},
    {"name": "shutdown_response", "description": "Check the status of a shutdown request by request_id.",
     "input_schema": {"type": "object", "properties": {"request_id": {"type": "string"}}, "required": ["request_id"]}},
    {"name": "plan_approval", "description": "Approve or reject a teammate's plan. Provide request_id + approve + optional feedback.",
     "input_schema": {"type": "object", "properties": {"request_id": {"type": "string"}, "approve": {"type": "boolean"}, "feedback": {"type": "string"}}, "required": ["request_id", "approve"]}},
]


def agent_loop(messages: list):
    while True:
        inbox = BUS.read_inbox("lead")
        if inbox:
            messages.append({
                "role": "user",
                "content": f"<inbox>{json.dumps(inbox, indent=2)}</inbox>",
            })
        response = client.messages.create(
            model=MODEL,
            system=SYSTEM,
            messages=messages,
            tools=TOOLS,
            max_tokens=8000,
        )
        messages.append({"role": "assistant", "content": response.content})
        if response.stop_reason != "tool_use":
            return
        results = []
        for block in response.content:
            if block.type == "tool_use":
                handler = TOOL_HANDLERS.get(block.name)
                try:
                    output = handler(**block.input) if handler else f"Unknown tool: {block.name}"
                except Exception as e:
                    output = f"Error: {e}"
                print(f"> {block.name}:")
                print(str(output)[:200])
                results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": str(output),
                })
        messages.append({"role": "user", "content": results})


if __name__ == "__main__":
    history = []
    while True:
        try:
            query = input("\033[36ms10 >> \033[0m")
        except (EOFError, KeyboardInterrupt):
            break
        if query.strip().lower() in ("q", "exit", ""):
            break
        if query.strip() == "/team":
            print(TEAM.list_all())
            continue
        if query.strip() == "/inbox":
            print(json.dumps(BUS.read_inbox("lead"), indent=2))
            continue
        history.append({"role": "user", "content": query})
        agent_loop(history)
        response_content = history[-1]["content"]
        if isinstance(response_content, list):
            for block in response_content:
                if hasattr(block, "text"):
                    print(block.text)
        print()
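Both s10 protocols share one shape. The initiator mints a short `request_id` and records it as `pending` in a tracker dict guarded by `_tracker_lock`. The responder's reply then moves it to `approved` or `rejected`. The sketch below factors that pattern into a single hypothetical `RequestTracker` class, which is not part of the course code. Unlike the s10 handlers, it also refuses to resolve a request twice. That is an added assumption, meant to make the `pending -> approved | rejected` FSM explicit.

```python
import threading
import uuid


class RequestTracker:
    """Hypothetical generalization of s10's shutdown_requests / plan_requests:
    each request gets an 8-char request_id and moves
    pending -> approved | rejected exactly once."""

    def __init__(self):
        self._requests = {}
        self._lock = threading.Lock()

    def create(self, **fields) -> str:
        req_id = str(uuid.uuid4())[:8]  # same id shape as s10
        with self._lock:
            self._requests[req_id] = {**fields, "status": "pending"}
        return req_id

    def resolve(self, req_id: str, approve: bool) -> str:
        with self._lock:
            req = self._requests.get(req_id)
            if req is None:
                return "error: unknown request_id"
            if req["status"] != "pending":
                # terminal states are final (an assumption, not s10 behavior)
                return f"error: already {req['status']}"
            req["status"] = "approved" if approve else "rejected"
            return req["status"]

    def status(self, req_id: str) -> str:
        with self._lock:
            req = self._requests.get(req_id)
            return req["status"] if req else "not found"


if __name__ == "__main__":
    shutdowns = RequestTracker()  # one tracker per domain, same pattern
    plans = RequestTracker()
    sid = shutdowns.create(target="alice")
    pid = plans.create(sender="bob", plan="refactor the auth module")
    print(shutdowns.resolve(sid, approve=True))  # approved
    print(plans.resolve(pid, approve=False))     # rejected
    print(plans.resolve(pid, approve=True))      # error: already rejected
```

In s10 terms, `create` plays the role of `handle_shutdown_request` on the lead side or the teammate's `plan_approval` tool. `resolve` plays the role of the teammate's `shutdown_response` tool or the lead's `handle_plan_review`.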

Chapter 11: Autonomous Agents (s11) - Source Code

File: s11_autonomous_agents.py

#!/usr/bin/env python3
# Harness: autonomy -- models that find work without being told.
"""
s11_autonomous_agents.py - Autonomous Agents

Idle cycle with task board polling, auto-claiming unclaimed tasks, and
identity re-injection after context compression. Builds on s10's protocols.

Teammate lifecycle:
    +-------+
    | spawn |
    +---+---+
        |
        v
    +-------+   tool_use   +-------+
    | WORK  | <----------- |  LLM  |
    +---+---+              +-------+
        |
        | stop_reason != tool_use
        v
    +--------+
    |  IDLE  | poll every 5s for up to 60s
    +---+----+
        |
        +---> check inbox -> message? -> resume WORK
        |
        +---> scan .tasks/ -> unclaimed? -> claim -> resume WORK
        |
        +---> timeout (60s) -> shutdown

Identity re-injection after compression:
    messages = [identity_block, ...remaining...]
    "You are 'coder', role: backend, team: my-team"

Key insight: "The agent finds work itself."
"""

import json
import os
import subprocess
import threading
import time
import uuid
from pathlib import Path

from anthropic import Anthropic
from dotenv import load_dotenv

load_dotenv(override=True)
if os.getenv("ANTHROPIC_BASE_URL"):
    os.environ.pop("ANTHROPIC_AUTH_TOKEN", None)

WORKDIR = Path.cwd()
client = Anthropic(base_url=os.getenv("ANTHROPIC_BASE_URL"))
MODEL = os.environ["MODEL_ID"]
TEAM_DIR = WORKDIR / ".team"
INBOX_DIR = TEAM_DIR / "inbox"
TASKS_DIR = WORKDIR / ".tasks"

POLL_INTERVAL = 5
IDLE_TIMEOUT = 60

SYSTEM = f"You are a team lead at {WORKDIR}. Teammates are autonomous -- they find work themselves."

VALID_MSG_TYPES = {
    "message",
    "broadcast",
    "shutdown_request",
    "shutdown_response",
    "plan_approval_response",
}

# -- Request trackers --
shutdown_requests = {}
plan_requests = {}
_tracker_lock = threading.Lock()
_claim_lock = threading.Lock()


# -- MessageBus: JSONL inbox per teammate --
class MessageBus:
    def __init__(self, inbox_dir: Path):
        self.dir = inbox_dir
        self.dir.mkdir(parents=True, exist_ok=True)

    def send(self, sender: str, to: str, content: str,
             msg_type: str = "message", extra: dict = None) -> str:
        if msg_type not in VALID_MSG_TYPES:
            return f"Error: Invalid type '{msg_type}'. Valid: {VALID_MSG_TYPES}"
        msg = {
            "type": msg_type,
            "from": sender,
            "content": content,
            "timestamp": time.time(),
        }
        if extra:
            msg.update(extra)
        inbox_path = self.dir / f"{to}.jsonl"
        with open(inbox_path, "a") as f:
            f.write(json.dumps(msg) + "\n")
        return f"Sent {msg_type} to {to}"

    def read_inbox(self, name: str) -> list:
        inbox_path = self.dir / f"{name}.jsonl"
        if not inbox_path.exists():
            return []
        messages = []
        for line in inbox_path.read_text().strip().splitlines():
            if line:
                messages.append(json.loads(line))
        inbox_path.write_text("")
        return messages

    def broadcast(self, sender: str, content: str, teammates: list) -> str:
        count = 0
        for name in teammates:
            if name != sender:
                self.send(sender, name, content, "broadcast")
                count += 1
        return f"Broadcast to {count} teammates"


BUS = MessageBus(INBOX_DIR)


# -- Task board scanning --
def scan_unclaimed_tasks() -> list:
    TASKS_DIR.mkdir(exist_ok=True)
    unclaimed = []
    for f in sorted(TASKS_DIR.glob("task_*.json")):
        task = json.loads(f.read_text())
        if (task.get("status") == "pending"
                and not task.get("owner")
                and not task.get("blockedBy")):
            unclaimed.append(task)
    return unclaimed


def claim_task(task_id: int, owner: str) -> str:
    with _claim_lock:
        path = TASKS_DIR / f"task_{task_id}.json"
        if not path.exists():
            return f"Error: Task {task_id} not found"
        task = json.loads(path.read_text())
        if task.get("owner"):
            existing_owner = task.get("owner") or "someone else"
            return f"Error: Task {task_id} has already been claimed by {existing_owner}"
        if task.get("status") != "pending":
            status = task.get("status")
            return f"Error: Task {task_id} cannot be claimed because its status is '{status}'"
        if task.get("blockedBy"):
            return f"Error: Task {task_id} is blocked by other task(s) and cannot be claimed yet"
        task["owner"] = owner
        task["status"] = "in_progress"
        path.write_text(json.dumps(task, indent=2))
        return f"Claimed task #{task_id} for {owner}"


# -- Identity re-injection after compression --
def make_identity_block(name: str, role: str, team_name: str) -> dict:
    return {
        "role": "user",
        "content": f"<identity>You are '{name}', role: {role}, team: {team_name}. Continue your work.</identity>",
    }


# -- Autonomous TeammateManager --
class TeammateManager:
    def __init__(self, team_dir: Path):
        self.dir = team_dir
        self.dir.mkdir(exist_ok=True)
        self.config_path = self.dir / "config.json"
        self.config = self._load_config()
        self.threads = {}

    def _load_config(self) -> dict:
        if self.config_path.exists():
            return json.loads(self.config_path.read_text())
        return {"team_name": "default", "members": []}

    def _save_config(self):
        self.config_path.write_text(json.dumps(self.config, indent=2))

    def _find_member(self, name: str) -> dict:
        for m in self.config["members"]:
            if m["name"] == name:
                return m
        return None

    def _set_status(self, name: str, status: str):
        member = self._find_member(name)
        if member:
            member["status"] = status
            self._save_config()

    def spawn(self, name: str, role: str, prompt: str) -> str:
        member = self._find_member(name)
        if member:
            if member["status"] not in ("idle", "shutdown"):
                return f"Error: '{name}' is currently {member['status']}"
            member["status"] = "working"
            member["role"] = role
        else:
            member = {"name": name, "role": role, "status": "working"}
            self.config["members"].append(member)
        self._save_config()
        thread = threading.Thread(
            target=self._loop,
            args=(name, role, prompt),
            daemon=True,
        )
        self.threads[name] = thread
        thread.start()
        return f"Spawned '{name}' (role: {role})"

    def _loop(self, name: str, role: str, prompt: str):
        team_name = self.config["team_name"]
        sys_prompt = (
            f"You are '{name}', role: {role}, team: {team_name}, at {WORKDIR}. "
            f"Use idle tool when you have no more work. You will auto-claim new tasks."
        )
        messages = [{"role": "user", "content": prompt}]
        tools = self._teammate_tools()

        while True:
            # -- WORK PHASE: standard agent loop --
            for _ in range(50):
                inbox = BUS.read_inbox(name)
                for msg in inbox:
                    if msg.get("type") == "shutdown_request":
                        self._set_status(name, "shutdown")
                        return
                    messages.append({"role": "user", "content": json.dumps(msg)})
                try:
                    response = client.messages.create(
                        model=MODEL,
                        system=sys_prompt,
                        messages=messages,
                        tools=tools,
                        max_tokens=8000,
                    )
                except Exception:
                    self._set_status(name, "idle")
                    return
                messages.append({"role": "assistant", "content": response.content})
                if response.stop_reason != "tool_use":
                    break
                results = []
                idle_requested = False
                for block in response.content:
                    if block.type == "tool_use":
                        if block.name == "idle":
                            idle_requested = True
                            output = "Entering idle phase. Will poll for new tasks."
                        else:
                            output = self._exec(name, block.name, block.input)
                        print(f" [{name}] {block.name}: {str(output)[:120]}")
                        results.append({
                            "type": "tool_result",
                            "tool_use_id": block.id,
                            "content": str(output),
                        })
                messages.append({"role": "user", "content": results})
                if idle_requested:
                    break

            # -- IDLE PHASE: poll for inbox messages and unclaimed tasks --
            self._set_status(name, "idle")
            resume = False
            polls = IDLE_TIMEOUT // max(POLL_INTERVAL, 1)
            for _ in range(polls):
                time.sleep(POLL_INTERVAL)
                inbox = BUS.read_inbox(name)
                if inbox:
                    for msg in inbox:
                        if msg.get("type") == "shutdown_request":
                            self._set_status(name, "shutdown")
                            return
                        messages.append({"role": "user", "content": json.dumps(msg)})
                    resume = True
                    break
                unclaimed = scan_unclaimed_tasks()
                if unclaimed:
                    task = unclaimed[0]
                    result = claim_task(task["id"], name)
                    if result.startswith("Error:"):
                        continue
                    task_prompt = (
                        f"<auto-claimed>Task #{task['id']}: {task['subject']}\n"
                        f"{task.get('description', '')}</auto-claimed>"
                    )
                    if len(messages) <= 3:
                        messages.insert(0, make_identity_block(name, role, team_name))
                        messages.insert(1, {"role": "assistant", "content": f"I am {name}. Continuing."})
                    messages.append({"role": "user", "content": task_prompt})
                    messages.append({"role": "assistant", "content": f"Claimed task #{task['id']}. Working on it."})
                    resume = True
                    break

            if not resume:
                self._set_status(name, "shutdown")
                return
            self._set_status(name, "working")

    def _exec(self, sender: str, tool_name: str, args: dict) -> str:
        # these base tools are unchanged from s02
        if tool_name == "bash":
            return _run_bash(args["command"])
        if tool_name == "read_file":
            return _run_read(args["path"])
        if tool_name == "write_file":
            return _run_write(args["path"], args["content"])
        if tool_name == "edit_file":
            return _run_edit(args["path"], args["old_text"], args["new_text"])
        if tool_name == "send_message":
            return BUS.send(sender, args["to"], args["content"], args.get("msg_type", "message"))
        if tool_name == "read_inbox":
            return json.dumps(BUS.read_inbox(sender), indent=2)
        if tool_name == "shutdown_response":
            req_id = args["request_id"]
            with _tracker_lock:
                if req_id in shutdown_requests:
                    shutdown_requests[req_id]["status"] = "approved" if args["approve"] else "rejected"
            BUS.send(
                sender, "lead", args.get("reason", ""),
                "shutdown_response", {"request_id": req_id, "approve": args["approve"]},
            )
            return f"Shutdown {'approved' if args['approve'] else 'rejected'}"
        if tool_name == "plan_approval":
            plan_text = args.get("plan", "")
            req_id = str(uuid.uuid4())[:8]
            with _tracker_lock:
                plan_requests[req_id] = {"from": sender, "plan": plan_text, "status": "pending"}
            BUS.send(
                sender, "lead", plan_text, "plan_approval_response",
                {"request_id": req_id, "plan": plan_text},
            )
            return f"Plan submitted (request_id={req_id}). Waiting for approval."
        if tool_name == "claim_task":
            return claim_task(args["task_id"], sender)
        return f"Unknown tool: {tool_name}"

    def _teammate_tools(self) -> list:
        # these base tools are unchanged from s02
        return [
            {"name": "bash", "description": "Run a shell command.",
             "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}},
            {"name": "read_file", "description": "Read file contents.",
             "input_schema": {"type": "object", "properties": {"path": {"type": "string"}}, "required": ["path"]}},
            {"name": "write_file", "description": "Write content to file.",
             "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}},
            {"name": "edit_file", "description": "Replace exact text in file.",
             "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}}, "required": ["path", "old_text", "new_text"]}},
            {"name": "send_message", "description": "Send message to a teammate.",
             "input_schema": {"type": "object", "properties": {"to": {"type": "string"}, "content": {"type": "string"}, "msg_type": {"type": "string", "enum": list(VALID_MSG_TYPES)}}, "required": ["to", "content"]}},
            {"name": "read_inbox", "description": "Read and drain your inbox.",
             "input_schema": {"type": "object", "properties": {}}},
            {"name": "shutdown_response", "description": "Respond to a shutdown request.",
             "input_schema": {"type": "object", "properties": {"request_id": {"type": "string"}, "approve": {"type": "boolean"}, "reason": {"type": "string"}}, "required": ["request_id", "approve"]}},
            {"name": "plan_approval", "description": "Submit a plan for lead approval.",
             "input_schema": {"type": "object", "properties": {"plan": {"type": "string"}}, "required": ["plan"]}},
            {"name": "idle", "description": "Signal that you have no more work. Enters idle polling phase.",
             "input_schema": {"type": "object", "properties": {}}},
            {"name": "claim_task", "description": "Claim a task from the task board by ID.",
             "input_schema": {"type": "object", "properties": {"task_id": {"type": "integer"}}, "required": ["task_id"]}},
        ]

    def list_all(self) -> str:
        if not self.config["members"]:
            return "No teammates."
        lines = [f"Team: {self.config['team_name']}"]
        for m in self.config["members"]:
            lines.append(f" {m['name']} ({m['role']}): {m['status']}")
        return "\n".join(lines)

    def member_names(self) -> list:
        return [m["name"] for m in self.config["members"]]


TEAM = TeammateManager(TEAM_DIR)


# -- Base tool implementations (these base tools are unchanged from s02) --
def _safe_path(p: str) -> Path:
    path = (WORKDIR / p).resolve()
    if not path.is_relative_to(WORKDIR):
        raise ValueError(f"Path escapes workspace: {p}")
    return path


def _run_bash(command: str) -> str:
    dangerous = ["rm -rf /", "sudo", "shutdown", "reboot"]
    if any(d in command for d in dangerous):
        return "Error: Dangerous command blocked"
    try:
        r = subprocess.run(
            command, shell=True, cwd=WORKDIR,
            capture_output=True, text=True, timeout=120,
        )
        out = (r.stdout + r.stderr).strip()
        return out[:50000] if out else "(no output)"
    except subprocess.TimeoutExpired:
        return "Error: Timeout (120s)"


def _run_read(path: str, limit: int = None) -> str:
    try:
        lines = _safe_path(path).read_text().splitlines()
        if limit and limit < len(lines):
            lines = lines[:limit] + [f"... ({len(lines) - limit} more)"]
        return "\n".join(lines)[:50000]
    except Exception as e:
        return f"Error: {e}"


def _run_write(path: str, content: str) -> str:
    try:
        fp = _safe_path(path)
        fp.parent.mkdir(parents=True, exist_ok=True)
        fp.write_text(content)
        return f"Wrote {len(content)} bytes"
    except Exception as e:
        return f"Error: {e}"


def _run_edit(path: str, old_text: str, new_text: str) -> str:
    try:
        fp = _safe_path(path)
        c = fp.read_text()
        if old_text not in c:
            return f"Error: Text not found in {path}"
        fp.write_text(c.replace(old_text, new_text, 1))
        return f"Edited {path}"
    except Exception as e:
        return f"Error: {e}"


# -- Lead-specific protocol handlers --
def handle_shutdown_request(teammate: str) -> str:
    req_id = str(uuid.uuid4())[:8]
    with _tracker_lock:
        shutdown_requests[req_id] = {"target": teammate, "status": "pending"}
    BUS.send(
        "lead", teammate, "Please shut down gracefully.",
        "shutdown_request", {"request_id": req_id},
    )
    return f"Shutdown request {req_id} sent to '{teammate}'"


def handle_plan_review(request_id: str, approve: bool, feedback: str = "") -> str:
    with _tracker_lock:
        req = plan_requests.get(request_id)
    if not req:
        return f"Error: Unknown plan request_id '{request_id}'"
    with _tracker_lock:
        req["status"] = "approved" if approve else "rejected"
    BUS.send(
        "lead", req["from"], feedback, "plan_approval_response",
        {"request_id": request_id, "approve": approve, "feedback": feedback},
    )
    return f"Plan {req['status']} for '{req['from']}'"


def _check_shutdown_status(request_id: str) -> str:
    with _tracker_lock:
        return json.dumps(shutdown_requests.get(request_id, {"error": "not found"}))


# -- Lead tool dispatch (14 tools) --
TOOL_HANDLERS = {
    "bash": lambda **kw: _run_bash(kw["command"]),
    "read_file": lambda **kw: _run_read(kw["path"], kw.get("limit")),
    "write_file": lambda **kw: _run_write(kw["path"], kw["content"]),
    "edit_file": lambda **kw: _run_edit(kw["path"], kw["old_text"], kw["new_text"]),
    "spawn_teammate": lambda **kw: TEAM.spawn(kw["name"], kw["role"], kw["prompt"]),
    "list_teammates": lambda **kw: TEAM.list_all(),
    "send_message": lambda **kw: BUS.send("lead", kw["to"], kw["content"], kw.get("msg_type", "message")),
    "read_inbox": lambda **kw: json.dumps(BUS.read_inbox("lead"), indent=2),
    "broadcast": lambda **kw: BUS.broadcast("lead", kw["content"], TEAM.member_names()),
    "shutdown_request": lambda **kw: handle_shutdown_request(kw["teammate"]),
    "shutdown_response": lambda **kw: _check_shutdown_status(kw.get("request_id", "")),
    "plan_approval": lambda **kw: handle_plan_review(kw["request_id"], kw["approve"], kw.get("feedback", "")),
    "idle": lambda **kw: "Lead does not idle.",
    "claim_task": lambda **kw: claim_task(kw["task_id"], "lead"),
}

# these base tools are unchanged from s02
TOOLS = [
    {"name": "bash", "description": "Run a shell command.",
     "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}},
    {"name": "read_file", "description": "Read file contents.",
     "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "limit": {"type": "integer"}}, "required": ["path"]}},
    {"name": "write_file", "description": "Write content to file.",
     "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}},
    {"name": "edit_file", "description": "Replace exact text in file.",
     "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}}, "required": ["path", "old_text", "new_text"]}},
    {"name": "spawn_teammate", "description": "Spawn an autonomous teammate.",
     "input_schema": {"type": "object", "properties": {"name": {"type": "string"}, "role": {"type": "string"}, "prompt": {"type": "string"}}, "required": ["name", "role", "prompt"]}},
    {"name": "list_teammates", "description": "List all teammates.",
     "input_schema": {"type": "object", "properties": {}}},
    {"name": "send_message", "description": "Send a message to a teammate.",
     "input_schema": {"type": "object", "properties": {"to": {"type": "string"}, "content": {"type": "string"}, "msg_type": {"type": "string", "enum": list(VALID_MSG_TYPES)}}, "required": ["to", "content"]}},
    {"name": "read_inbox", "description": "Read and drain the lead's inbox.",
     "input_schema": {"type": "object", "properties": {}}},
    {"name": "broadcast", "description": "Send a message to all teammates.",
     "input_schema": {"type": "object", "properties": {"content": {"type": "string"}}, "required": ["content"]}},
    {"name": "shutdown_request", "description": "Request a teammate to shut down.",
     "input_schema": {"type": "object", "properties": {"teammate": {"type": "string"}}, "required": ["teammate"]}},
    {"name": "shutdown_response", "description": "Check shutdown request status.",
     "input_schema": {"type": "object", "properties": {"request_id": {"type": "string"}}, "required": ["request_id"]}},
    {"name": "plan_approval", "description": "Approve or reject a teammate's plan.",
     "input_schema": {"type": "object", "properties": {"request_id": {"type": "string"}, "approve": {"type": "boolean"}, "feedback": {"type": "string"}}, "required": ["request_id", "approve"]}},
    {"name": "idle", "description": "Enter idle state (for lead -- rarely used).",
     "input_schema": {"type": "object", "properties": {}}},
    {"name": "claim_task", "description": "Claim a task from the board by ID.",
     "input_schema": {"type": "object", "properties": {"task_id": {"type": "integer"}}, "required": ["task_id"]}},
]


def agent_loop(messages: list):
    while True:
        inbox = BUS.read_inbox("lead")
        if inbox:
            messages.append({
                "role": "user",
                "content": f"<inbox>{json.dumps(inbox, indent=2)}</inbox>",
            })
        response = client.messages.create(
            model=MODEL,
            system=SYSTEM,
            messages=messages,
            tools=TOOLS,
            max_tokens=8000,
        )
        messages.append({"role": "assistant", "content": response.content})
        if response.stop_reason != "tool_use":
            return
        results = []
        for block in response.content:
            if block.type == "tool_use":
                handler = TOOL_HANDLERS.get(block.name)
                try:
                    output = handler(**block.input) if handler else f"Unknown tool: {block.name}"
                except Exception as e:
                    output = f"Error: {e}"
                print(f"> {block.name}:")
                print(str(output)[:200])
                results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": str(output),
                })
        messages.append({"role": "user", "content": results})


if __name__ == "__main__":
    history = []
    while True:
        try:
            query = input("\033[36ms11 >> \033[0m")
        except (EOFError, KeyboardInterrupt):
            break
        if query.strip().lower() in ("q", "exit", ""):
            break
        if query.strip() == "/team":
            print(TEAM.list_all())
            continue
        if query.strip() == "/inbox":
            print(json.dumps(BUS.read_inbox("lead"), indent=2))
            continue
        if query.strip() == "/tasks":
            TASKS_DIR.mkdir(exist_ok=True)
            for f in sorted(TASKS_DIR.glob("task_*.json")):
                t = json.loads(f.read_text())
                marker = {"pending": "[ ]", "in_progress": "[>]", "completed": "[x]"}.get(t["status"], "[?]")
                owner = f" @{t['owner']}" if t.get("owner") else ""
                print(f" {marker} #{t['id']}: {t['subject']}{owner}")
            continue
        history.append({"role": "user", "content": query})
        agent_loop(history)
        response_content = history[-1]["content"]
        if isinstance(response_content, list):
            for block in response_content:
                if hasattr(block, "text"):
                    print(block.text)
        print()
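In the s11 listing every teammate runs as a thread inside the lead's process, so `_claim_lock` (a plain `threading.Lock`) is what keeps two idle teammates from claiming the same task file: the read, the ownership check, and the write all happen while the lock is held. The sketch below isolates that pattern. It is a minimal illustration under stated assumptions, not the listing's code: the simplified `claim_task` here is hypothetical (it takes the task directory as an argument and returns a bool instead of a message string), the task file lives in a temporary directory, and `threading.Barrier` is used only to release all threads at the same moment.

```python
import json
import tempfile
import threading
from pathlib import Path

# Hypothetical stand-in for s11's module-level _claim_lock.
_claim_lock = threading.Lock()


def claim_task(tasks_dir: Path, task_id: int, owner: str) -> bool:
    """Simplified, hypothetical claim: True if this caller won the task."""
    with _claim_lock:  # read-check-write happens entirely under one lock
        path = tasks_dir / f"task_{task_id}.json"
        task = json.loads(path.read_text())
        if task.get("owner") or task.get("status") != "pending":
            return False  # someone else already claimed it
        task["owner"] = owner
        task["status"] = "in_progress"
        path.write_text(json.dumps(task, indent=2))
        return True


def race(num_workers: int = 8) -> dict:
    """Let several threads try to claim the same pending task at once."""
    with tempfile.TemporaryDirectory() as tmp:
        tasks_dir = Path(tmp)
        (tasks_dir / "task_1.json").write_text(
            json.dumps({"id": 1, "subject": "demo", "status": "pending", "owner": ""})
        )
        start = threading.Barrier(num_workers)  # all workers leave wait() together
        wins = []

        def worker(name: str) -> None:
            start.wait()
            if claim_task(tasks_dir, 1, name):
                wins.append(name)

        threads = [threading.Thread(target=worker, args=(f"w{i}",)) for i in range(num_workers)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        final = json.loads((tasks_dir / "task_1.json").read_text())
        return {"winners": wins, "owner": final["owner"], "status": final["status"]}


if __name__ == "__main__":
    print(race())
```

Each run should report exactly one winner, and that winner is the owner recorded in the file. A `threading.Lock` only serializes threads within one process; if teammates ever ran as separate processes sharing `.tasks/`, an OS-level file lock (for example `fcntl.flock` on POSIX) would be needed instead.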

Chapter 12: Worktree + Task Isolation (s12) - Source Code
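s12's docstring sums the design up as "Isolate by directory, coordinate by task ID": the task file records which worktree it runs in, and the `.worktrees/index.json` entry records which task it serves. The bookkeeping half of that design can be sketched without git. This is a minimal sketch under stated assumptions, not the listing's code: `bind`, `close_out`, and `demo` are hypothetical helpers, the field names follow the JSON shown in the s12 module docstring, and the real `WorktreeManager` additionally runs `git worktree add -b wt/<name> ...` / `git worktree remove` and emits lifecycle events.

```python
import json
import tempfile
from pathlib import Path


def bind(root: Path, task_id: int, name: str) -> None:
    """Hypothetical helper: record the task <-> worktree link on both sides."""
    task_path = root / ".tasks" / f"task_{task_id}.json"
    task = json.loads(task_path.read_text())
    task["worktree"] = name  # task side stores the worktree name
    if task["status"] == "pending":
        task["status"] = "in_progress"
    task_path.write_text(json.dumps(task, indent=2))

    index_path = root / ".worktrees" / "index.json"
    index = json.loads(index_path.read_text())
    index["worktrees"].append({  # index side stores the task ID
        "name": name,
        "path": str(root / ".worktrees" / name),
        "branch": f"wt/{name}",
        "task_id": task_id,
        "status": "active",
    })
    index_path.write_text(json.dumps(index, indent=2))


def close_out(root: Path, name: str) -> dict:
    """Hypothetical helper: resolve worktree -> task, complete it, unbind it."""
    index = json.loads((root / ".worktrees" / "index.json").read_text())
    entry = next(wt for wt in index["worktrees"] if wt["name"] == name)
    task_path = root / ".tasks" / f"task_{entry['task_id']}.json"
    task = json.loads(task_path.read_text())
    task["status"] = "completed"
    task["worktree"] = ""
    task_path.write_text(json.dumps(task, indent=2))
    return task


def demo() -> tuple[dict, dict]:
    """Bind task 12 to 'auth-refactor', then close it out, in a temp directory."""
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        (root / ".tasks").mkdir()
        (root / ".worktrees").mkdir()
        (root / ".worktrees" / "index.json").write_text(json.dumps({"worktrees": []}))
        (root / ".tasks" / "task_12.json").write_text(json.dumps(
            {"id": 12, "subject": "Implement auth refactor", "status": "pending", "worktree": ""}
        ))
        bind(root, 12, "auth-refactor")
        bound = json.loads((root / ".tasks" / "task_12.json").read_text())
        done = close_out(root, "auth-refactor")
        return bound, done


if __name__ == "__main__":
    print(demo())
```

Because each record stores only the other's identifier, either one can be found from the other. In the listing, `remove(..., complete_task=True)` ends with the same two changes to the task file: status set to `completed` and the worktree binding cleared.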

File: s12_worktree_task_isolation.py

#!/usr/bin/env python3
# Harness: directory isolation -- parallel execution lanes that never collide.
"""
s12_worktree_task_isolation.py - Worktree + Task Isolation

Directory-level isolation for parallel task execution.
Tasks are the control plane and worktrees are the execution plane.

    .tasks/task_12.json
      {
        "id": 12,
        "subject": "Implement auth refactor",
        "status": "in_progress",
        "worktree": "auth-refactor"
      }

    .worktrees/index.json
      {
        "worktrees": [
          {
            "name": "auth-refactor",
            "path": ".../.worktrees/auth-refactor",
            "branch": "wt/auth-refactor",
            "task_id": 12,
            "status": "active"
          }
        ]
      }

Key insight: "Isolate by directory, coordinate by task ID."
"""

import json
import os
import re
import subprocess
import time
from pathlib import Path

from anthropic import Anthropic
from dotenv import load_dotenv

load_dotenv(override=True)

if os.getenv("ANTHROPIC_BASE_URL"):
    os.environ.pop("ANTHROPIC_AUTH_TOKEN", None)

WORKDIR = Path.cwd()
client = Anthropic(base_url=os.getenv("ANTHROPIC_BASE_URL"))
MODEL = os.environ["MODEL_ID"]


def detect_repo_root(cwd: Path) -> Path | None:
    """Return git repo root if cwd is inside a repo, else None."""
    try:
        r = subprocess.run(
            ["git", "rev-parse", "--show-toplevel"],
            cwd=cwd,
            capture_output=True,
            text=True,
            timeout=10,
        )
        if r.returncode != 0:
            return None
        root = Path(r.stdout.strip())
        return root if root.exists() else None
    except Exception:
        return None


REPO_ROOT = detect_repo_root(WORKDIR) or WORKDIR

SYSTEM = (
    f"You are a coding agent at {WORKDIR}. "
    "Use task + worktree tools for multi-task work. "
    "For parallel or risky changes: create tasks, allocate worktree lanes, "
    "run commands in those lanes, then choose keep/remove for closeout. "
    "Use worktree_events when you need lifecycle visibility."
)


# -- EventBus: append-only lifecycle events for observability --
class EventBus:
    def __init__(self, event_log_path: Path):
        self.path = event_log_path
        self.path.parent.mkdir(parents=True, exist_ok=True)
        if not self.path.exists():
            self.path.write_text("")

    def emit(
        self,
        event: str,
        task: dict | None = None,
        worktree: dict | None = None,
        error: str | None = None,
    ):
        payload = {
            "event": event,
            "ts": time.time(),
            "task": task or {},
            "worktree": worktree or {},
        }
        if error:
            payload["error"] = error
        with self.path.open("a", encoding="utf-8") as f:
            f.write(json.dumps(payload) + "\n")

    def list_recent(self, limit: int = 20) -> str:
        n = max(1, min(int(limit or 20), 200))
        lines = self.path.read_text(encoding="utf-8").splitlines()
        recent = lines[-n:]
        items = []
        for line in recent:
            try:
                items.append(json.loads(line))
            except Exception:
                items.append({"event": "parse_error", "raw": line})
        return json.dumps(items, indent=2)


# -- TaskManager: persistent task board with optional worktree binding --
class TaskManager:
    def __init__(self, tasks_dir: Path):
        self.dir = tasks_dir
        self.dir.mkdir(parents=True, exist_ok=True)
        self._next_id = self._max_id() + 1

    def _max_id(self) -> int:
        ids = []
        for f in self.dir.glob("task_*.json"):
            try:
                ids.append(int(f.stem.split("_")[1]))
            except Exception:
                pass
        return max(ids) if ids else 0

    def _path(self, task_id: int) -> Path:
        return self.dir / f"task_{task_id}.json"

    def _load(self, task_id: int) -> dict:
        path = self._path(task_id)
        if not path.exists():
            raise ValueError(f"Task {task_id} not found")
        return json.loads(path.read_text())

    def _save(self, task: dict):
        self._path(task["id"]).write_text(json.dumps(task, indent=2))

    def create(self, subject: str, description: str = "") -> str:
        task = {
            "id": self._next_id,
            "subject": subject,
            "description": description,
            "status": "pending",
            "owner": "",
            "worktree": "",
            "blockedBy": [],
            "created_at": time.time(),
            "updated_at": time.time(),
        }
        self._save(task)
        self._next_id += 1
        return json.dumps(task, indent=2)

    def get(self, task_id: int) -> str:
        return json.dumps(self._load(task_id), indent=2)

    def exists(self, task_id: int) -> bool:
        return self._path(task_id).exists()

    def update(self, task_id: int, status: str = None, owner: str = None) -> str:
        task = self._load(task_id)
        if status:
            if status not in ("pending", "in_progress", "completed"):
                raise ValueError(f"Invalid status: {status}")
            task["status"] = status
        if owner is not None:
            task["owner"] = owner
        task["updated_at"] = time.time()
        self._save(task)
        return json.dumps(task, indent=2)

    def bind_worktree(self, task_id: int, worktree: str, owner: str = "") -> str:
        task = self._load(task_id)
        task["worktree"] = worktree
        if owner:
            task["owner"] = owner
        if task["status"] == "pending":
            task["status"] = "in_progress"
        task["updated_at"] = time.time()
        self._save(task)
        return json.dumps(task, indent=2)

    def unbind_worktree(self, task_id: int) -> str:
        task = self._load(task_id)
        task["worktree"] = ""
        task["updated_at"] = time.time()
        self._save(task)
        return json.dumps(task, indent=2)

    def list_all(self) -> str:
        tasks = []
        for f in sorted(self.dir.glob("task_*.json")):
            tasks.append(json.loads(f.read_text()))
        if not tasks:
            return "No tasks."
        lines = []
        for t in tasks:
            marker = {
                "pending": "[ ]",
                "in_progress": "[>]",
                "completed": "[x]",
            }.get(t["status"], "[?]")
            owner = f" owner={t['owner']}" if t.get("owner") else ""
            wt = f" wt={t['worktree']}" if t.get("worktree") else ""
            lines.append(f"{marker} #{t['id']}: {t['subject']}{owner}{wt}")
        return "\n".join(lines)


TASKS = TaskManager(REPO_ROOT / ".tasks")
EVENTS = EventBus(REPO_ROOT / ".worktrees" / "events.jsonl")


# -- WorktreeManager: create/list/run/remove git worktrees + lifecycle index --
class WorktreeManager:
    def __init__(self, repo_root: Path, tasks: TaskManager, events: EventBus):
        self.repo_root = repo_root
        self.tasks = tasks
        self.events = events
        self.dir = repo_root / ".worktrees"
        self.dir.mkdir(parents=True, exist_ok=True)
        self.index_path = self.dir / "index.json"
        if not self.index_path.exists():
            self.index_path.write_text(json.dumps({"worktrees": []}, indent=2))
        self.git_available = self._is_git_repo()

    def _is_git_repo(self) -> bool:
        try:
            r = subprocess.run(
                ["git", "rev-parse", "--is-inside-work-tree"],
                cwd=self.repo_root,
                capture_output=True,
                text=True,
                timeout=10,
            )
            return r.returncode == 0
        except Exception:
            return False

    def _run_git(self, args: list[str]) -> str:
        if not self.git_available:
            raise RuntimeError("Not in a git repository. worktree tools require git.")
        r = subprocess.run(
            ["git", *args],
            cwd=self.repo_root,
            capture_output=True,
            text=True,
            timeout=120,
        )
        if r.returncode != 0:
            msg = (r.stdout + r.stderr).strip()
            raise RuntimeError(msg or f"git {' '.join(args)} failed")
        return (r.stdout + r.stderr).strip() or "(no output)"

    def _load_index(self) -> dict:
        return json.loads(self.index_path.read_text())

    def _save_index(self, data: dict):
        self.index_path.write_text(json.dumps(data, indent=2))

    def _find(self, name: str) -> dict | None:
        idx = self._load_index()
        for wt in idx.get("worktrees", []):
            if wt.get("name") == name:
                return wt
        return None

    def _validate_name(self, name: str):
        if not re.fullmatch(r"[A-Za-z0-9._-]{1,40}", name or ""):
            raise ValueError(
                "Invalid worktree name. Use 1-40 chars: letters, numbers, ., _, -"
            )

    def create(self, name: str, task_id: int = None, base_ref: str = "HEAD") -> str:
        self._validate_name(name)
        if self._find(name):
            raise ValueError(f"Worktree '{name}' already exists in index")
        if task_id is not None and not self.tasks.exists(task_id):
            raise ValueError(f"Task {task_id} not found")

        path = self.dir / name
        branch = f"wt/{name}"
        self.events.emit(
            "worktree.create.before",
            task={"id": task_id} if task_id is not None else {},
            worktree={"name": name, "base_ref": base_ref},
        )
        try:
            self._run_git(["worktree", "add", "-b", branch, str(path), base_ref])

            entry = {
                "name": name,
                "path": str(path),
                "branch": branch,
                "task_id": task_id,
                "status": "active",
                "created_at": time.time(),
            }

            idx = self._load_index()
            idx["worktrees"].append(entry)
            self._save_index(idx)

            if task_id is not None:
                self.tasks.bind_worktree(task_id, name)

            self.events.emit(
                "worktree.create.after",
                task={"id": task_id} if task_id is not None else {},
                worktree={
                    "name": name,
                    "path": str(path),
                    "branch": branch,
                    "status": "active",
                },
            )
            return json.dumps(entry, indent=2)
        except Exception as e:
            self.events.emit(
                "worktree.create.failed",
                task={"id": task_id} if task_id is not None else {},
                worktree={"name": name, "base_ref": base_ref},
                error=str(e),
            )
            raise

    def list_all(self) -> str:
        idx = self._load_index()
        wts = idx.get("worktrees", [])
        if not wts:
            return "No worktrees in index."
        lines = []
        for wt in wts:
            # Compare against None so a task id of 0 is still shown
            suffix = f" task={wt['task_id']}" if wt.get("task_id") is not None else ""
            lines.append(
                f"[{wt.get('status', 'unknown')}] {wt['name']} -> "
                f"{wt['path']} ({wt.get('branch', '-')}){suffix}"
            )
        return "\n".join(lines)

    def status(self, name: str) -> str:
        wt = self._find(name)
        if not wt:
            return f"Error: Unknown worktree '{name}'"
        path = Path(wt["path"])
        if not path.exists():
            return f"Error: Worktree path missing: {path}"
        r = subprocess.run(
            ["git", "status", "--short", "--branch"],
            cwd=path,
            capture_output=True,
            text=True,
            timeout=60,
        )
        text = (r.stdout + r.stderr).strip()
        return text or "Clean worktree"

    def run(self, name: str, command: str) -> str:
        dangerous = ["rm -rf /", "sudo", "shutdown", "reboot", "> /dev/"]
        if any(d in command for d in dangerous):
            return "Error: Dangerous command blocked"

        wt = self._find(name)
        if not wt:
            return f"Error: Unknown worktree '{name}'"
        path = Path(wt["path"])
        if not path.exists():
            return f"Error: Worktree path missing: {path}"

        try:
            r = subprocess.run(
                command,
                shell=True,
                cwd=path,
                capture_output=True,
                text=True,
                timeout=300,
            )
            out = (r.stdout + r.stderr).strip()
            return out[:50000] if out else "(no output)"
        except subprocess.TimeoutExpired:
            return "Error: Timeout (300s)"

    def remove(self, name: str, force: bool = False, complete_task: bool = False) -> str:
        wt = self._find(name)
        if not wt:
            return f"Error: Unknown worktree '{name}'"

        self.events.emit(
            "worktree.remove.before",
            task={"id": wt.get("task_id")} if wt.get("task_id") is not None else {},
            worktree={"name": name, "path": wt.get("path")},
        )
        try:
            args = ["worktree", "remove"]
            if force:
                args.append("--force")
            args.append(wt["path"])
            self._run_git(args)

            if complete_task and wt.get("task_id") is not None:
                task_id = wt["task_id"]
                before = json.loads(self.tasks.get(task_id))
                self.tasks.update(task_id, status="completed")
                self.tasks.unbind_worktree(task_id)
                self.events.emit(
                    "task.completed",
                    task={
                        "id": task_id,
                        "subject": before.get("subject", ""),
                        "status": "completed",
                    },
                    worktree={"name": name},
                )

            idx = self._load_index()
            for item in idx.get("worktrees", []):
                if item.get("name") == name:
                    item["status"] = "removed"
                    item["removed_at"] = time.time()
            self._save_index(idx)

            self.events.emit(
                "worktree.remove.after",
                task={"id": wt.get("task_id")} if wt.get("task_id") is not None else {},
                worktree={"name": name, "path": wt.get("path"), "status": "removed"},
            )
            return f"Removed worktree '{name}'"
        except Exception as e:
            self.events.emit(
                "worktree.remove.failed",
                task={"id": wt.get("task_id")} if wt.get("task_id") is not None else {},
                worktree={"name": name, "path": wt.get("path")},
                error=str(e),
            )
            raise

    def keep(self, name: str) -> str:
        wt = self._find(name)
        if not wt:
            return f"Error: Unknown worktree '{name}'"

        idx = self._load_index()
        kept = None
        for item in idx.get("worktrees", []):
            if item.get("name") == name:
                item["status"] = "kept"
                item["kept_at"] = time.time()
                kept = item
        self._save_index(idx)

        self.events.emit(
            "worktree.keep",
            task={"id": wt.get("task_id")} if wt.get("task_id") is not None else {},
            worktree={
                "name": name,
                "path": wt.get("path"),
                "status": "kept",
            },
        )
        return json.dumps(kept, indent=2) if kept else f"Error: Unknown worktree '{name}'"


WORKTREES = WorktreeManager(REPO_ROOT, TASKS, EVENTS)


# -- Base tools (kept minimal, same style as previous sessions) --
def safe_path(p: str) -> Path:
    path = (WORKDIR / p).resolve()
    if not path.is_relative_to(WORKDIR):
        raise ValueError(f"Path escapes workspace: {p}")
    return path


def run_bash(command: str) -> str:
    dangerous = ["rm -rf /", "sudo", "shutdown", "reboot", "> /dev/"]
    if any(d in command for d in dangerous):
        return "Error: Dangerous command blocked"
    try:
        r = subprocess.run(
            command,
            shell=True,
            cwd=WORKDIR,
            capture_output=True,
            text=True,
            timeout=120,
        )
        out = (r.stdout + r.stderr).strip()
        return out[:50000] if out else "(no output)"
    except subprocess.TimeoutExpired:
        return "Error: Timeout (120s)"


def run_read(path: str, limit: int | None = None) -> str:
    try:
        lines = safe_path(path).read_text().splitlines()
        if limit and limit < len(lines):
            lines = lines[:limit] + [f"... ({len(lines) - limit} more)"]
        return "\n".join(lines)[:50000]
    except Exception as e:
        return f"Error: {e}"


def run_write(path: str, content: str) -> str:
    try:
        fp = safe_path(path)
        fp.parent.mkdir(parents=True, exist_ok=True)
        fp.write_text(content)
        return f"Wrote {len(content)} bytes"
    except Exception as e:
        return f"Error: {e}"


def run_edit(path: str, old_text: str, new_text: str) -> str:
    try:
        fp = safe_path(path)
        c = fp.read_text()
        if old_text not in c:
            return f"Error: Text not found in {path}"
        fp.write_text(c.replace(old_text, new_text, 1))
        return f"Edited {path}"
    except Exception as e:
        return f"Error: {e}"


TOOL_HANDLERS = {
    "bash": lambda **kw: run_bash(kw["command"]),
    "read_file": lambda **kw: run_read(kw["path"], kw.get("limit")),
    "write_file": lambda **kw: run_write(kw["path"], kw["content"]),
    "edit_file": lambda **kw: run_edit(kw["path"], kw["old_text"], kw["new_text"]),
    "task_create": lambda **kw: TASKS.create(kw["subject"], kw.get("description", "")),
    "task_list": lambda **kw: TASKS.list_all(),
    "task_get": lambda **kw: TASKS.get(kw["task_id"]),
    "task_update": lambda **kw: TASKS.update(kw["task_id"], kw.get("status"), kw.get("owner")),
    "task_bind_worktree": lambda **kw: TASKS.bind_worktree(kw["task_id"], kw["worktree"], kw.get("owner", "")),
    "worktree_create": lambda **kw: WORKTREES.create(kw["name"], kw.get("task_id"), kw.get("base_ref", "HEAD")),
    "worktree_list": lambda **kw: WORKTREES.list_all(),
    "worktree_status": lambda **kw: WORKTREES.status(kw["name"]),
    "worktree_run": lambda **kw: WORKTREES.run(kw["name"], kw["command"]),
    "worktree_keep": lambda **kw: WORKTREES.keep(kw["name"]),
    "worktree_remove": lambda **kw: WORKTREES.remove(kw["name"], kw.get("force", False), kw.get("complete_task", False)),
    "worktree_events": lambda **kw: EVENTS.list_recent(kw.get("limit", 20)),
}

TOOLS = [
    {
        "name": "bash",
        "description": "Run a shell command in the current workspace (blocking).",
        "input_schema": {
            "type": "object",
            "properties": {"command": {"type": "string"}},
            "required": ["command"],
        },
    },
    {
        "name": "read_file",
        "description": "Read file contents.",
        "input_schema": {
            "type": "object",
            "properties": {
                "path": {"type": "string"},
                "limit": {"type": "integer"},
            },
            "required": ["path"],
        },
    },
    {
        "name": "write_file",
        "description": "Write content to file.",
        "input_schema": {
            "type": "object",
            "properties": {
                "path": {"type": "string"},
                "content": {"type": "string"},
            },
            "required": ["path", "content"],
        },
    },
    {
        "name": "edit_file",
        "description": "Replace exact text in file.",
        "input_schema": {
            "type": "object",
            "properties": {
                "path": {"type": "string"},
                "old_text": {"type": "string"},
                "new_text": {"type": "string"},
            },
            "required": ["path", "old_text", "new_text"],
        },
    },
    {
        "name": "task_create",
        "description": "Create a new task on the shared task board.",
        "input_schema": {
            "type": "object",
            "properties": {
                "subject": {"type": "string"},
                "description": {"type": "string"},
            },
            "required": ["subject"],
        },
    },
    {
        "name": "task_list",
        "description": "List all tasks with status, owner, and worktree binding.",
        "input_schema": {"type": "object", "properties": {}},
    },
    {
        "name": "task_get",
        "description": "Get task details by ID.",
        "input_schema": {
            "type": "object",
            "properties": {"task_id": {"type": "integer"}},
            "required": ["task_id"],
        },
    },
    {
        "name": "task_update",
        "description": "Update task status or owner.",
        "input_schema": {
            "type": "object",
            "properties": {
                "task_id": {"type": "integer"},
                "status": {
                    "type": "string",
                    "enum": ["pending", "in_progress", "completed"],
                },
                "owner": {"type": "string"},
            },
            "required": ["task_id"],
        },
    },
    {
        "name": "task_bind_worktree",
        "description": "Bind a task to a worktree name.",
        "input_schema": {
            "type": "object",
            "properties": {
                "task_id": {"type": "integer"},
                "worktree": {"type": "string"},
                "owner": {"type": "string"},
            },
            "required": ["task_id", "worktree"],
        },
    },
    {
        "name": "worktree_create",
        "description": "Create a git worktree and optionally bind it to a task.",
        "input_schema": {
            "type": "object",
            "properties": {
                "name": {"type": "string"},
                "task_id": {"type": "integer"},
                "base_ref": {"type": "string"},
            },
            "required": ["name"],
        },
    },
    {
        "name": "worktree_list",
        "description": "List worktrees tracked in .worktrees/index.json.",
        "input_schema": {"type": "object", "properties": {}},
    },
    {
        "name": "worktree_status",
        "description": "Show git status for one worktree.",
        "input_schema": {
            "type": "object",
            "properties": {"name": {"type": "string"}},
            "required": ["name"],
        },
    },
    {
        "name": "worktree_run",
        "description": "Run a shell command in a named worktree directory.",
        "input_schema": {
            "type": "object",
            "properties": {
                "name": {"type": "string"},
                "command": {"type": "string"},
            },
            "required": ["name", "command"],
        },
    },
    {
        "name": "worktree_remove",
        "description": "Remove a worktree and optionally mark its bound task completed.",
        "input_schema": {
            "type": "object",
            "properties": {
                "name": {"type": "string"},
                "force": {"type": "boolean"},
                "complete_task": {"type": "boolean"},
            },
            "required": ["name"],
        },
    },
    {
        "name": "worktree_keep",
        "description": "Mark a worktree as kept in lifecycle state without removing it.",
        "input_schema": {
            "type": "object",
            "properties": {"name": {"type": "string"}},
            "required": ["name"],
        },
    },
    {
        "name": "worktree_events",
        "description": "List recent worktree/task lifecycle events from .worktrees/events.jsonl.",
        "input_schema": {
            "type": "object",
            "properties": {"limit": {"type": "integer"}},
        },
    },
]


def agent_loop(messages: list):
    while True:
        response = client.messages.create(
            model=MODEL,
            system=SYSTEM,
            messages=messages,
            tools=TOOLS,
            max_tokens=8000,
        )
        messages.append({"role": "assistant", "content": response.content})
        if response.stop_reason != "tool_use":
            return

        results = []
        for block in response.content:
            if block.type == "tool_use":
                handler = TOOL_HANDLERS.get(block.name)
                try:
                    output = handler(**block.input) if handler else f"Unknown tool: {block.name}"
                except Exception as e:
                    output = f"Error: {e}"
                print(f"> {block.name}:")
                print(str(output)[:200])
                results.append(
                    {
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": str(output),
                    }
                )
        messages.append({"role": "user", "content": results})


if __name__ == "__main__":
    print(f"Repo root for s12: {REPO_ROOT}")
    if not WORKTREES.git_available:
        print("Note: Not in a git repo. worktree_* tools will return errors.")

    history = []
    while True:
        try:
            query = input("\033[36ms12 >> \033[0m")
        except (EOFError, KeyboardInterrupt):
            break
        if query.strip().lower() in ("q", "exit", ""):
            break
        history.append({"role": "user", "content": query})
        agent_loop(history)
        response_content = history[-1]["content"]
        if isinstance(response_content, list):
            for block in response_content:
                if hasattr(block, "text"):
                    print(block.text)
        print()
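`WorktreeManager` stores each worktree's current lifecycle state in an index file. An entry starts as `active` and later becomes `kept` or `removed`. Every step is also recorded in an event log, which `worktree_events` reads from `.worktrees/events.jsonl`. The sketch below isolates that "current state plus append-only history" pattern without git. It is a simplified illustration under stated assumptions, not the course's code. The class name `LifecycleIndex`, the `worktree.status.*` and `worktree.transition.failed` event names, and the transition table are all hypothetical. The listing above does not reject transitions such as calling `keep` on a worktree that is already `removed`. The guard here is an addition that shows where such a check could live.

```python
import json
import tempfile
import time
from pathlib import Path


class LifecycleIndex:
    """Hypothetical, git-free sketch of the index.json + events.jsonl pattern."""

    # Assumed transition table: "removed" is terminal, "kept" can still be removed.
    TRANSITIONS = {
        "active": {"kept", "removed"},
        "kept": {"removed"},
        "removed": set(),
    }

    def __init__(self, root: Path):
        self.index_path = root / "index.json"     # current state: one entry per worktree
        self.events_path = root / "events.jsonl"  # history: one JSON object per line
        if not self.index_path.exists():
            self.index_path.write_text(json.dumps({"worktrees": []}))

    def _emit(self, event: str, **payload) -> None:
        # Append-only: earlier lines are never rewritten.
        record = {"event": event, "ts": time.time(), **payload}
        with self.events_path.open("a") as f:
            f.write(json.dumps(record) + "\n")

    def _load(self) -> dict:
        return json.loads(self.index_path.read_text())

    def _save(self, data: dict) -> None:
        self.index_path.write_text(json.dumps(data, indent=2))

    def create(self, name: str) -> None:
        data = self._load()
        if any(wt["name"] == name for wt in data["worktrees"]):
            raise ValueError(f"Worktree '{name}' already exists in index")
        data["worktrees"].append({"name": name, "status": "active", "created_at": time.time()})
        self._save(data)
        self._emit("worktree.create.after", name=name, status="active")

    def transition(self, name: str, new_status: str) -> None:
        data = self._load()
        for wt in data["worktrees"]:
            if wt["name"] != name:
                continue
            if new_status not in self.TRANSITIONS[wt["status"]]:
                # Record the rejected attempt in the log, then refuse it.
                self._emit("worktree.transition.failed", name=name,
                           status=wt["status"], target=new_status)
                raise ValueError(f"{wt['status']} -> {new_status} is not allowed")
            wt["status"] = new_status
            wt[f"{new_status}_at"] = time.time()  # kept_at / removed_at, as in the listing
            self._save(data)
            self._emit(f"worktree.status.{new_status}", name=name, status=new_status)
            return
        raise KeyError(f"Unknown worktree '{name}'")

    def events(self) -> list[dict]:
        if not self.events_path.exists():
            return []
        return [json.loads(line) for line in self.events_path.read_text().splitlines()]


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        idx = LifecycleIndex(Path(tmp))
        idx.create("auth-refactor")
        idx.transition("auth-refactor", "kept")
        idx.transition("auth-refactor", "removed")
        print([e["event"] for e in idx.events()])
        # prints ['worktree.create.after', 'worktree.status.kept', 'worktree.status.removed']
```

This split mirrors the listing. The index answers "what exists now" in a single read. The append-only log keeps the full history, including rejected or failed steps, so it can be inspected later.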