Claude Code / s11
Autonomous Agents:自组织团队
s09-s10 的团队需要 Leader 手动分配任务。这一节移除这个限制:让 Agent 自己扫描任务、自己认领工作、自己记忆经验。
Claude Code 在这个方向上已经走得很远:Coordinator Mode 让主 Agent 自动调度 Worker;Dream Task 在后台整理记忆;Memory Extraction 在每轮对话后自动提取经验。
本文分两条线:
- 源码实证 — 直接读 Claude Code 源码,看真实架构怎么做
- 从零实现 — 用 Python 从头搭建一个自治 Agent 系统
一、Coordinator Mode 源码实证
源文件:
src/coordinator/coordinatorMode.ts
1.1 Feature Flag 双重门控
Coordinator Mode 不是默认开启的。它需要两道门:
// coordinatorMode.ts
export function isCoordinatorMode(): boolean {
if (feature('COORDINATOR_MODE')) {
return isEnvTruthy(process.env.CLAUDE_CODE_COORDINATOR_MODE)
}
return false
}
第一道:编译时 feature flag COORDINATOR_MODE(Bun bundle 阶段决定)。第二道:运行时环境变量 CLAUDE_CODE_COORDINATOR_MODE。两道都通过才开启。
这种 编译时 + 运行时 双重门控在大型 Agent 系统中很常见 — 编译时决定代码是否打包进产物,运行时决定是否激活。
1.2 Session Mode 恢复
恢复一个旧 session 时,当前环境变量可能和 session 创建时不同:
export function matchSessionMode(
sessionMode: 'coordinator' | 'normal' | undefined,
): string | undefined {
const currentIsCoordinator = isCoordinatorMode()
const sessionIsCoordinator = sessionMode === 'coordinator'
if (currentIsCoordinator === sessionIsCoordinator) {
return undefined
}
// 翻转环境变量,让 isCoordinatorMode() 实时匹配
if (sessionIsCoordinator) {
process.env.CLAUDE_CODE_COORDINATOR_MODE = '1'
} else {
delete process.env.CLAUDE_CODE_COORDINATOR_MODE
}
// ...
}
关键设计:session 的模式优先于当前环境变量。恢复 session 时直接改 process.env,因为 isCoordinatorMode() 是实时读取的,没有缓存。
1.3 Coordinator 的 System Prompt
getCoordinatorSystemPrompt() 返回一个完整的系统提示词,定义了 Coordinator 的行为规范:
角色定义:
You are a coordinator. Your job is to help the user achieve their goal, direct workers to research/implement/verify code changes, synthesize results and communicate with the user.
核心工具:
| 工具 | 用途 |
|---|---|
Agent |
创建新 Worker |
SendMessage |
给已有 Worker 发后续指令 |
TaskStop |
终止一个 Worker |
Worker 结果如何回来:
Worker 完成后,结果作为 user-role 消息 回来,包裹在 <task-notification> XML 中:
<task-notification>
<task-id>{agentId}</task-id>
<status>completed|failed|killed</status>
<summary>{human-readable status summary}</summary>
<result>{agent's final text response}</result>
<usage>
<total_tokens>N</total_tokens>
<tool_uses>N</tool_uses>
<duration_ms>N</duration_ms>
</usage>
</task-notification>
最重要的一条规则 — “先理解再委派”:
When workers report research findings, you must understand them before directing follow-up work. Never write “based on your findings” or “based on the research.” You never hand off understanding to another worker.
这是区分好 Coordinator 和差 Coordinator 的分水岭。差的 Coordinator 是传话筒(”根据你的发现去修”);好的 Coordinator 先消化结果,再给出精确指令(”修 src/auth/validate.ts:42 的空指针,Session.expired 为 true 时 user 字段是 undefined”)。
1.4 Worker 的工具上下文注入
export function getCoordinatorUserContext(
mcpClients: ReadonlyArray<{ name: string }>,
scratchpadDir?: string,
): { [k: string]: string } {
if (!isCoordinatorMode()) return {}
const workerTools = isEnvTruthy(process.env.CLAUDE_CODE_SIMPLE)
? [BASH_TOOL_NAME, FILE_READ_TOOL_NAME, FILE_EDIT_TOOL_NAME]
.sort().join(', ')
: Array.from(ASYNC_AGENT_ALLOWED_TOOLS)
.filter(name => !INTERNAL_WORKER_TOOLS.has(name))
.sort().join(', ')
let content = `Workers spawned via the Agent tool have access to these tools: ${workerTools}`
if (scratchpadDir && isScratchpadGateEnabled()) {
content += `\n\nScratchpad directory: ${scratchpadDir}\n`
+ 'Workers can read and write here without permission prompts.'
}
return { workerToolsContext: content }
}
三个关键点:
- SIMPLE 模式:只给 Worker 最基础的 Bash + Read + Edit
- 正常模式:给 Worker 全部异步工具,但过滤掉内部工具(TeamCreate、TeamDelete、SendMessage、SyntheticOutput)— Worker 不能自己创建或管理其他 Worker
- Scratchpad:一个共享目录,Worker 可以无权限读写,用于跨 Worker 传递知识
二、Dream Task 源码实证
源文件:
src/tasks/DreamTask/DreamTask.ts
Dream Task 是 Claude Code 的 后台记忆整理 机制 — 像人睡觉时整理白天记忆一样(所以叫 “Dream”)。
2.1 状态模型
export type DreamPhase = 'starting' | 'updating'
export type DreamTaskState = TaskStateBase & {
type: 'dream'
phase: DreamPhase
sessionsReviewing: number // 正在回顾多少个 session
filesTouched: string[] // 被修改的文件路径(不完整)
turns: DreamTurn[] // 最近 30 轮的摘要
abortController?: AbortController // 用于取消
priorMtime: number // 用于 kill 时回滚锁
}
两个阶段非常简单:starting(刚启动)和 updating(第一次 Edit/Write 工具调用落地后切换)。注释明确说了 不解析 dream agent 的 4 阶段内部结构(orient/gather/consolidate/prune),只看有没有实际写文件。
2.2 增量转弯记录
const MAX_TURNS = 30
export function addDreamTurn(
taskId: string,
turn: DreamTurn,
touchedPaths: string[],
setAppState: SetAppState,
): void {
updateTaskState<DreamTaskState>(taskId, setAppState, task => {
const seen = new Set(task.filesTouched)
const newTouched = touchedPaths.filter(p => !seen.has(p) && seen.add(p))
// 空 turn 且没有新文件 → 跳过,避免无意义重渲染
if (turn.text === '' && turn.toolUseCount === 0 && newTouched.length === 0) {
return task
}
return {
...task,
phase: newTouched.length > 0 ? 'updating' : task.phase,
filesTouched: newTouched.length > 0
? [...task.filesTouched, ...newTouched]
: task.filesTouched,
turns: task.turns.slice(-(MAX_TURNS - 1)).concat(turn),
}
})
}
滑动窗口只保留最近 30 轮。filesTouched 用 Set 去重。注释特别强调这只是 “至少碰了这些文件”,因为 Bash 命令里的写操作无法被 pattern-match 捕获。
2.3 Kill 与锁回滚
async kill(taskId, setAppState) {
let priorMtime: number | undefined
updateTaskState<DreamTaskState>(taskId, setAppState, task => {
if (task.status !== 'running') return task
task.abortController?.abort()
priorMtime = task.priorMtime
return { ...task, status: 'killed', endTime: Date.now(), notified: true }
})
// 回滚锁的 mtime,让下次 session 可以重试
if (priorMtime !== undefined) {
await rollbackConsolidationLock(priorMtime)
}
}
Dream 被 kill 时,不仅终止 agent,还要回滚 consolidation lock 的 mtime。这确保了 kill 不会导致记忆整理永久跳过 — 下个 session 检测锁时间戳时,会认为上次整理没完成,重新触发。
2.4 仅 UI 通知
export function completeDreamTask(taskId, setAppState): void {
// notified: true immediately — dream 没有 model-facing 通知路径
// 它是纯 UI 展示,eviction 需要 terminal + notified
updateTaskState<DreamTaskState>(taskId, setAppState, task => ({
...task, status: 'completed', endTime: Date.now(), notified: true,
}))
}
Dream 完成时 不通知模型,只更新 UI。主 agent 不需要知道后台整理的结果 — 下次读 memory 文件时自然会看到更新。
三、Memory Extraction 源码实证
源文件:
src/services/extractMemories/extractMemories.ts+prompts.ts类型定义:src/memdir/memoryTypes.ts
Memory Extraction 是 Claude Code 在 每轮对话结束后 自动提取有价值信息并写入持久化记忆的机制。
3.1 四种记忆类型
// memoryTypes.ts
export const MEMORY_TYPES = ['user', 'feedback', 'project', 'reference'] as const
| 类型 | 内容 | 示例 |
|---|---|---|
user |
用户角色、目标、偏好 | “用户是数据科学家,关注可观测性” |
feedback |
用户对工作方式的指导 | “不要 mock 数据库,上次 mock 通过但线上挂了” |
project |
项目进展、决策、截止日期 | “周四后冻结非关键合并,移动端在切分支” |
reference |
外部系统指针 | “pipeline bug 追踪在 Linear 项目 INGEST 里” |
注意 什么不存:代码模式、架构、文件结构、git 历史 — 这些可以从当前代码直接推导出来。记忆只存 不可推导 的信息。
3.2 Forked Agent 模式
Memory Extraction 使用 runForkedAgent — 主对话的一个完美分叉,共享 prompt cache:
const result = await runForkedAgent({
promptMessages: [createUserMessage({ content: userPrompt })],
cacheSafeParams,
canUseTool,
querySource: 'extract_memories',
forkLabel: 'extract_memories',
skipTranscript: true, // 不记录到 transcript,避免和主线程竞争
maxTurns: 5, // 硬上限,防止验证兔子洞
})
分叉 agent 的关键约束:
- 工具权限:Read/Grep/Glob 不受限;Bash 只允许只读命令;Edit/Write 只允许写 auto-memory 路径
- 轮次上限:最多 5 轮。正常模式是 2-4 轮(第 1 轮并行读所有要改的文件,第 2 轮并行写)
- 不记录 transcript:避免和主线程的写操作竞争
3.3 Cursor-based 增量处理
提取不是每次处理全部消息,而是用游标追踪上次处理到哪:
// 闭包内的可变状态
let lastMemoryMessageUuid: string | undefined
function countModelVisibleMessagesSince(
messages: Message[],
sinceUuid: string | undefined,
): number {
if (sinceUuid === null || sinceUuid === undefined) {
return count(messages, isModelVisibleMessage)
}
let foundStart = false
let n = 0
for (const message of messages) {
if (!foundStart) {
if (message.uuid === sinceUuid) { foundStart = true }
continue
}
if (isModelVisibleMessage(message)) { n++ }
}
// 游标 UUID 不存在(被 context compaction 删除)→ 回退到全量计数
if (!foundStart) {
return count(messages, isModelVisibleMessage)
}
return n
}
游标找不到时(context compaction 后消息被删除),回退到全量计数 — 宁可多处理也不漏。
3.4 互斥机制
主 agent 和提取 agent 不会同时写记忆:
if (hasMemoryWritesSince(messages, lastMemoryMessageUuid)) {
// 主 agent 已经写了 → 跳过,推进游标
lastMemoryMessageUuid = lastMessage.uuid
return
}
hasMemoryWritesSince 扫描游标之后的 assistant 消息,检查是否有 Edit/Write 工具调用写了 auto-memory 路径。如果主 agent 自己写了记忆,后台提取就跳过这个范围。
3.5 Overlap Guard + Trailing Run
防止并发,但不丢消息:
let inProgress = false
let pendingContext: { context: REPLHookContext; ... } | undefined
// 如果正在运行,暂存最新上下文
if (inProgress) {
pendingContext = { context, appendSystemMessage }
return
}
// runExtraction 的 finally 块
finally {
inProgress = false
const trailing = pendingContext
pendingContext = undefined
if (trailing) {
// trailing run 使用推进后的游标,只处理新增消息
await runExtraction({ ...trailing, isTrailingRun: true })
}
}
设计要点:pendingContext 只保留 最新的(覆盖旧的),因为最新上下文包含最多消息。trailing run 用的是更新过的游标,所以只处理两次调用之间新增的消息,不会重复。
3.6 提取 Prompt 的结构
function opener(newMessageCount: number, existingMemories: string): string {
return [
`You are now acting as the memory extraction subagent.`,
`Analyze the most recent ~${newMessageCount} messages above...`,
`Available tools: Read, Grep, Glob, read-only Bash, Edit/Write for memory dir only.`,
`You have a limited turn budget. Efficient strategy:`,
` turn 1 — parallel Read all files you might update`,
` turn 2 — parallel Write/Edit all changes`,
// 预注入已有记忆目录清单
existingMemories.length > 0
? `## Existing memory files\n${existingMemories}\nCheck before writing — update, don't duplicate.`
: '',
].join('\n')
}
预注入 memory 目录清单(文件名 + frontmatter 摘要),省掉一轮 ls 调用。
3.7 记忆存储格式
每条记忆是一个独立 Markdown 文件,带 frontmatter:
---
name: 用户偏好-简洁回复
description: 用户不喜欢冗长总结,直接给 diff
type: feedback
---
不要在回复末尾总结刚做了什么,用户可以自己看 diff。
**Why:** 用户明确说过 "stop summarizing, I can read the diff"
**How to apply:** 所有回复都省略末尾总结段落
存储路径:~/.claude/projects/<sanitized-git-root>/memory/
MEMORY.md 是索引文件(不是记忆本身),每行一个指针:- [Title](file.md) — one-line hook,200 行截断。
四、从零实现:自治 Agent 系统
理解了 Claude Code 的架构后,用 Python 搭一个简化版。核心思路保持一致:WORK/IDLE 双态循环 + 任务板自动认领。
4.1 两阶段生命周期
每个自治 Agent 有两个状态:
WORK 阶段:
像普通 Agent 一样:LLM 调用工具,直到任务完成
IDLE 阶段:
每 5 秒轮询一次:
1. 检查收件箱(有新消息 → 进入 WORK)
2. 扫描任务板(有可认领任务 → claim → 进入 WORK)
3. 60 秒无活动 → SHUTDOWN
4.2 核心工具:idle 和 claim_task
idle 工具:Agent 完成任务后主动调用,告诉框架”我空了,可以接新任务”。
def run_idle(agent_name: str) -> str:
"""进入 IDLE 状态,开始轮询"""
_set_agent_status(agent_name, "IDLE")
return f"Agent {agent_name} is now IDLE. Polling for new tasks..."
claim_task 工具:从任务板认领一个可用任务。
def run_claim_task(agent_name: str) -> str:
"""从任务板认领一个 ready 状态的未认领任务"""
ready_tasks = task_manager.list_ready()
unclaimed = [t for t in ready_tasks if not t.get("claimed_by")]
if not unclaimed:
return "No unclaimed tasks available."
# 认领第一个可用任务
task = unclaimed[0]
task["claimed_by"] = agent_name
task["status"] = "in_progress"
task_manager._save(task)
return (f"Claimed task: {task['id']}\n"
f"Title: {task['title']}\n"
f"Description: {task['description']}")
4.3 IDLE 轮询循环
import time
def idle_loop(agent_name: str, system_prompt: str):
"""IDLE 阶段:轮询收件箱和任务板"""
idle_start = time.time()
IDLE_TIMEOUT = 60 # 60 秒无活动则关机
while True:
# 1. 检查收件箱
new_msgs = team.read_inbox(agent_name)
if new_msgs != "No new messages.":
print(f"[{agent_name}] Got messages, entering WORK")
work_loop(agent_name, system_prompt,
f"收件箱新消息:\n{new_msgs}")
idle_start = time.time() # 重置超时计时
continue
# 2. 扫描任务板
unclaimed = [t for t in task_manager.list_ready() if not t.get("claimed_by")]
if unclaimed:
task = unclaimed[0]
print(f"[{agent_name}] Found unclaimed task {task['id']}, claiming")
result = run_claim_task(agent_name)
work_loop(agent_name, system_prompt,
f"认领到新任务:\n{result}")
idle_start = time.time()
continue
# 3. 检查超时
if time.time() - idle_start > IDLE_TIMEOUT:
print(f"[{agent_name}] Idle timeout, shutting down")
_set_agent_status(agent_name, "SHUTDOWN")
return
time.sleep(5)
4.4 WORK 循环
def work_loop(agent_name: str, system_prompt: str, initial_message: str):
"""WORK 阶段:标准 Agent 循环"""
_set_agent_status(agent_name, "WORKING")
# 注入身份信息(防止 context compact 后遗忘身份)
full_system = f"{system_prompt}\n\n你的名字是 {agent_name}。完成任务后调用 idle 工具进入待机状态。"
messages = [{"role": "user", "content": initial_message}]
for _ in range(50): # 安全限制
# 上下文压缩
micro_compact(messages)
if estimate_tokens(messages) > THRESHOLD:
messages[:] = auto_compact(messages)
# 压缩后重新注入身份
messages.insert(0, {
"role": "user",
"content": f"[身份恢复] 你是 {agent_name}。{system_prompt}"
})
response = client.messages.create(
model=MODEL, system=full_system,
messages=messages, tools=AGENT_TOOLS, max_tokens=8000,
)
messages.append({"role": "assistant", "content": response.content})
if response.stop_reason != "tool_use":
break
results = []
idle_called = False
for block in response.content:
if block.type == "tool_use":
if block.name == "idle":
idle_called = True
results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": run_idle(agent_name)
})
else:
output = TOOL_HANDLERS.get(block.name, lambda **kw: "Unknown")(**block.input)
results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": str(output)
})
messages.append({"role": "user", "content": results})
if idle_called:
break # 退出 WORK,进入 IDLE
4.5 完整 Agent 入口
def autonomous_agent(name: str, system_prompt: str, initial_task: str = None):
"""自治 Agent 的完整生命周期"""
_set_agent_status(name, "WORKING")
if initial_task:
work_loop(name, system_prompt, initial_task)
# 进入 IDLE 循环,等待新任务
idle_loop(name, system_prompt)
4.6 自组织的完整流程
Leader:
创建任务板:
task_manager.create("实现登录 API", blocked_by=[])
task_manager.create("实现注册 API", blocked_by=[])
task_manager.create("集成测试", blocked_by=["task_001", "task_002"])
启动团队:
spawn("coder_1", "专注后端 API 开发")
spawn("coder_2", "专注后端 API 开发")
自动发生:
coder_1 → 认领 task_001(登录 API)→ 开始 WORK
coder_2 → 认领 task_002(注册 API)→ 开始 WORK
coder_1 完成 → 调用 idle() → 扫描任务板
coder_2 完成 → 调用 idle() → 扫描任务板
task_003 被自动解锁(task_001 + task_002 都完成)
coder_1 认领 task_003(集成测试)→ 开始 WORK
coder_2:60 秒无任务 → 自动关机
五、架构对比
| 维度 | Claude Code 真实架构 | 从零实现 |
|---|---|---|
| 调度模型 | Coordinator 主动分派 + Worker 异步回报 | IDLE 轮询 + 自动认领 |
| Worker 管理 | Agent/SendMessage/TaskStop 三件套 | spawn + 任务板 |
| 结果传递 | <task-notification> XML user-role 消息 |
收件箱消息 |
| 记忆提取 | forked agent + cursor-based 增量 + 互斥 | 无(可扩展) |
| 记忆整理 | Dream Task 后台运行 | 无(可扩展) |
| 工具权限 | canUseTool 函数精细控制 | 全量工具 |
| 并发控制 | overlap guard + trailing run | 轮询串行 |
| Prompt 管理 | 完整的合成指令(coordinator 理解后再委派) | 直接传递任务描述 |
Claude Code 的核心设计哲学:Coordinator 不是传话筒,是理解者。它读完 Worker 的研究报告后,要自己理解,然后写出包含具体文件路径、行号、修改方案的精确指令。
从零实现的版本更简单 — Agent 自己扫任务板、自己认领 — 适合理解自治 Agent 的基本机制。但要做到生产级别,需要补上 Claude Code 源码中展示的这些机制:增量处理、互斥写入、权限控制、优雅终止。
六、设计哲学:协调器是指挥家,不是演奏者
设计指南用一个精妙的比喻描述 Coordinator 模式:
协调器是多 Agent 系统的”指挥家”——它不演奏,但决定谁演奏什么。
这个比喻揭示了 Coordinator 的三个核心职责:
- 理解全局:读完所有 Worker 的报告后,Coordinator 自己消化理解,然后生成包含具体文件路径、行号、修改方案的精确指令——而不是简单转发用户的原始请求
- 选择合适的演奏者:Explore Agent 做只读分析,Plan Agent 做方案设计,general-purpose Agent 做实际修改——不同任务分配给不同专业能力的 Agent
- 整合最终结果:收集各 Agent 的输出,综合成用户需要的完整答案
设计指南将这种模式提升为一个通用工程原则:复杂系统的管理本身也需要专门的角色。不是每个团队成员都应该做调度,也不是调度者应该亲自执行。这和软件架构中的”关注点分离”一脉相承。
从源码中还能看到一个重要的设计决策:Coordinator 通过 feature flag 双重门控 启用,而且有 session mode 恢复 机制。这意味着:
- 新功能默认关闭,验证充分后再开放(安全是默认)
- 用户中途切换模式时,系统能优雅地恢复到正确状态(为失败设计)