工作流编排:Routing 与 Parallelization
生产级 Agent 任务很少只是一个 ReAct 循环跑到底。它通常是一个工作流——根据输入选择不同执行路径、把可独立的子任务并行起来、把若干阶段串成有序流水。把"步骤拓扑"从 LLM 的隐性决策提到代码层显式化,是 Agent 工程从个例 demo 走到批量生产的分水岭。下面拆开工作流编排里最基础也最高频的三个模式:Chaining、Routing、Parallelization——它们是 Agent 系统组装多步执行的积木,掌握了这三个,多 Agent 协作就只是把同一套积木从"单 Agent 内"扩展到"多 Agent 间"。
1. 三个模式的核心问题
| 模式 | 解决什么 | 何时第一时间想到它 |
|---|---|---|
| Chaining | 把多个 LLM/工具调用串成有依赖关系的流水 | 任务必须 A 完成后才能做 B,B 完成后才能做 C |
| Routing | 根据输入特征把请求分发到不同处理路径 | "这个问题该用 RAG 还是工具?该用大模型还是小模型?" |
| Parallelization | 把无依赖的子任务同时执行 | 任务能拆成"互不相关的 N 份"或"对同一份数据做 N 种独立处理" |
三者经常组合出现——一个生产 Agent 的典型工作流可能是:"Router 路由 → 大块拆分 → 并行处理 → 顺序汇聚"。但拆开学一遍,才能在组合时不混淆。
底层有一个共同点:工作流模式让步骤拓扑显式化。在纯 ReAct 循环里,所有步骤的连接关系都隐藏在 LLM 的下一步选择里——你只能事后看 trace 知道它做了什么。工作流模式把"先做什么、后做什么、谁和谁能并行"提到代码层显式定义,可观测、可优化、可测试。这是 Agent 工程从"个例 demo"到"可批量生产"的关键转变。
2. Chaining:顺序流水
Chaining 的核心是步骤间显式依赖 + 顺序执行。它和 ReAct 循环的根本区别在于"控制权归谁":
| 维度 | ReAct 循环 | 显式 Chain |
|---|---|---|
| 步骤数 | 不确定,由 LLM 决定 | 固定,代码层声明 |
| 步骤间连接 | 隐式(在 LLM 选择里) | 显式(代码定义"step1 输出 → step2 输入") |
| 失败处理 | LLM 看错误重新选择 | 每步独立 try/except、retry、fallback |
| 适合任务 | 探索性、步骤模糊的 | 流程确定、合规要求高的 |
ReAct 适合做"路径不确定"的探索,Chain 适合编码"已经稳定的业务流程"——比如"采集 → 清洗 → 提取实体 → 入库 → 通知"。
2.1 带 checkpoint 的 Chain 骨架
Chain 在生产里几乎不用"裸版",至少要带 checkpoint——长链路里第 7 步失败时不能从第 1 步重跑:
def run_chain(input_data, steps, run_id) -> ChainState:
"""每步成功后落 checkpoint,失败可从最近 checkpoint 恢复"""
state = load_checkpoint(run_id) or ChainState(input=input_data, outputs={}, cursor=0)
for i in range(state.cursor, len(steps)):
step = steps[i]
try:
state.outputs[step.name] = step.run(state)
state.cursor = i + 1
save_checkpoint(run_id, state) # 每步成功立刻落 checkpoint
except StepError as e:
if step.required:
raise ChainAborted(step.name, e, partial_state=state)
state.outputs[step.name] = step.fallback(state, e)
state.cursor = i + 1
save_checkpoint(run_id, state)
return state
Checkpoint 的代价是每步多一次持久化写入;收益是长链路、贵步骤、长跑任务下重试成本断崖式下降。判断要不要上 checkpoint,看"一次重跑要烧多少"——超过几美元的链路基本都该上。
2.2 不可逆步骤要补 Saga
如果 Chain 中间有不可逆副作用(发邮件、扣款、创建资源),靠 try/except 兜不住——失败时已经发生的副作用没法回滚。这时要补 Saga 补偿模式:每个不可逆步骤声明对应的补偿动作,链路失败时按反向顺序执行补偿:
def run_saga_chain(steps, input_data):
"""每步要么成功并记录补偿动作,要么触发反向补偿链"""
compensations = [] # 已成功步骤的补偿动作栈
state = ChainState(input=input_data, outputs={})
try:
for step in steps:
state.outputs[step.name] = step.run(state)
if step.compensate is not None:
compensations.append((step.name, step.compensate, state.snapshot()))
return state
except StepError as e:
# 反向执行所有已注册的补偿动作
for name, compensate, snapshot in reversed(compensations):
try:
compensate(snapshot)
except Exception as cex:
log_compensation_failure(name, cex) # 补偿失败需要人审
raise ChainAborted(failed_step=e.step, compensations_run=len(compensations))
Saga 模式从分布式事务领域来——Agent 工作流跨多个外部系统时本质就是分布式事务,所以同样的解法在这里继续生效。补偿动作本身也可能失败——补偿失败需要告警 + 人审,因为这意味着系统进入了不一致状态。
实战中常见的形态是Chain 套 ReAct:外层流程显式 Chain 编排,某一步内部跑 ReAct 完成探索性子任务。这样既能保证整体流程的可控性,又能给关键步骤留出 LLM 自由发挥的空间。
3. Routing:根据输入选择路径
3.1 Routing 的本质
Routing 模式回答一个问题:面对一个输入,应该走哪条处理路径? 它把"选择"从 LLM 的隐性决策提升为系统的显式决策。
典型路由场景:
| 输入特征 | 路由到 |
|---|---|
| 短问答类(如"什么是 RAG") | 小模型 + 缓存 |
| 涉及实时数据(如"今天股价") | 工具调用路径 |
| 涉及私有知识(如"我们的产品文档") | RAG 路径 |
| 复杂多步任务(如"做一份竞品分析") | 完整 Agent 循环 |
| 涉及账户操作(如"删除我的订单") | HITL 审批路径 |
不做 Routing 的代价:所有请求都走最重的路径——延迟翻倍、成本翻数倍、简单问题的回答质量反而下降(大模型在简单问题上有时会"过度思考")。Routing 是 Agent 系统的第一个性能与成本杠杆。
3.2 三种 Routing 实现
| 方式 | 决策依据 | 准确率 | 成本 | 适合 |
|---|---|---|---|---|
| 规则路由 | 关键词、正则、参数特征 | 高(在覆盖到的模式上) | 零(不调 LLM) | 类别有限、特征明显 |
| 分类器路由 | 小模型/embedding 相似度 | 中-高 | 低(一次小模型调用 或 一次向量比对) | 类别 5-50 个、有训练数据 |
| LLM 路由 | LLM 读输入 + 路由 prompt 输出 enum | 高 | 中(一次 LLM 调用) | 类别复杂、规则难写 |
生产系统通常三层组合:先规则路由(覆盖 80% 高频明显的)→ 分类器路由(覆盖 15% 中等复杂的)→ LLM 路由(兜底 5% 复杂或新型的)。这种组合的关键是让最便宜的方式覆盖最多流量。
分类器路由通常用一个小型 embedding 模型(如 BGE-small / text-embedding-3-small),把每个路由类别预先 embed 成几条"代表样本向量",新请求过来时算 cosine 相似度:
def classifier_route(user_input: str) -> RouteDecision:
"""用 embedding 相似度做意图分类——零 LLM 调用、毫秒级"""
query_vec = embed_small(user_input)
best_route, best_score = None, -1
for route_name, sample_vecs in ROUTE_PROTOTYPES.items():
# 每个路由 5-10 个代表样本的向量已离线预算好
# 取与代表样本的最大相似度作为该路由的得分
score = max(cosine(query_vec, s) for s in sample_vecs)
if score > best_score:
best_route, best_score = route_name, score
if best_score >= 0.78:
return RouteDecision(route=best_route, confidence=best_score, method="classifier")
return RouteDecision(route=None, confidence=best_score, method="classifier") # 让 LLM 路由兜底
# 离线构建 ROUTE_PROTOTYPES 时,每个路由的"代表样本"从生产日志里挖出 5-10 条最典型的真实请求
分类器路由的工程优势:毫秒级响应、几乎零成本、可解释性强——失败时能直接看是哪个 prototype 离用户问题最近。劣势是新意图加入时要重新挖代表样本和调阈值。
3.3 LLM-based Routing 的 Prompt 设计
LLM Routing 的输出必须是结构化的——不能给一段散文。典型 Schema:
{
"name": "route_decision",
"parameters": {
"type": "object",
"properties": {
"route": {
"type": "string",
"enum": ["qa_simple", "rag", "tool_calling", "agent_full", "hitl_approval", "unsupported"]
},
"confidence": {
"type": "number",
"minimum": 0,
"maximum": 1
},
"reasoning": {
"type": "string",
"description": "一句话解释路由依据"
}
},
"required": ["route", "confidence"]
}
}
注意三个设计点:
route用enum,不让 LLM 自由发挥——避免输出 "qa_simple_v2" 这种系统不认识的路由- 必须有 confidence——低 confidence 时降级到上一层路由器或 fallback
- 必须包含 unsupported 兜底——LLM 经常面对"不在已知类别里"的输入,强迫它选一个会让路由错乱
Router 的 System Prompt 关键段:
你是一个请求分类器,把用户输入分到下列路由:
- qa_simple: 简单事实性问答,1 句话能答的
- rag: 涉及私有知识库的查询
- tool_calling: 需要实时数据或外部系统操作的
- agent_full: 复杂多步骤任务(需要规划、迭代)
- hitl_approval: 涉及不可逆操作(删除、转账、对外发送)
- unsupported: 不属于以上任何类别
规则:
- 一律输出 enum 中的值,不要造新词
- confidence 低于 0.6 时,必须选 unsupported 而非猜测
- 涉及多个动作的,按"最高风险动作"归类
- 用户表达不清时,倾向选 unsupported 触发追问
这段 prompt 里 "倾向选 unsupported" 是关键反直觉点——大多数路由 prompt 会要求"尽量匹配",但生产中宁可"我不知道"也不能"猜错",因为路由错了下游全错。
3.4 Routing 的关键设计点
Confidence 阈值与降级。Routing 必须有 confidence 评分和 fallback 路径:
def smart_route(user_input):
# 第 1 层:规则
if r := rule_router(user_input):
return r
# 第 2 层:embedding 分类器
cls_result = embedding_classifier(user_input)
if cls_result.confidence >= 0.85:
return cls_result.route
# 第 3 层:LLM 路由
llm_result = llm_router(user_input)
if llm_result.confidence >= 0.6:
return llm_result.route
# 都不行,进入 fallback:要么追问用户,要么走最保守路径
return "ask_clarification"
路由错误的代价不对称。把"复杂任务"误判为"简单 QA"的代价是回答不完整;把"简单 QA"误判为"完整 Agent"的代价是延迟从 200ms 变成 30s。前者用户看得见、后者用户感受得到——但通常前者更可接受。所以 Router 应该偏向高复杂度路径——拿不准就走重路径,宁可慢一点也不能漏掉关键信息。
路由的可观测性。每次路由必须记 trace:输入摘要、路由结果、confidence、所走路径的延迟和成本。线上才能回答"哪些请求被路由到了高成本路径?是不是路由错了?"——没有这层数据,路由策略改起来全凭直觉。
Routing 不只在入口。一个常见误解是"Routing 是 Agent 系统的入口分流器"。实际上 Routing 在 Agent 内部到处都是——选哪个工具是 Routing(隐性的)、选哪个 RAG 索引是 Routing、选用哪个 prompt 模板是 Routing。把"内部路由"也显式化,能定位 70% 的 Agent 不稳定问题。
4. Parallelization:把可独立的步骤同时执行
4.1 三种并行场景
| 场景 | 例子 | 关键判断 |
|---|---|---|
| Fan-out 任务并行 | 同时调用 3 个不同工具收集信息 | 子任务彼此无依赖 |
| Map-Reduce 数据并行 | 把 100 页文档分块,每块独立 LLM 总结,再合并 | 数据可独立处理 + 结果可聚合 |
| Scatter-Gather 多源并行 | 同时查 4 个数据源,整合最有用的 | 多源等价但可靠性/速度不同,取胜者 |
判断"能不能并行"的核心问题:子任务的输出是否依赖于其他子任务的输出? 不依赖就能并行;依赖必须串行(这时回到 Chaining)。
4.2 Fan-out 的最小实现
async def fan_out(tasks, input_data, timeout=30):
"""并行执行多个独立子任务,等所有完成或超时"""
coros = [task.run_async(input_data) for task in tasks]
results = await asyncio.gather(*coros, return_exceptions=True)
# 区分成功/失败
success, failed = [], []
for task, result in zip(tasks, results):
if isinstance(result, Exception):
failed.append((task.name, result))
else:
success.append((task.name, result))
return {"success": success, "failed": failed}
# 使用示例:并行调三个信息源
results = await fan_out(
tasks=[search_web, search_kb, search_news],
input_data="2026 年 AI Agent 市场规模",
timeout=30
)
return_exceptions=True 是关键——不让一个子任务失败拖垮整个并行批次。失败的子任务在 failed 中单独处理:可能是 retry、可能是 fallback、可能是直接忽略。
4.3 Map-Reduce 模式
处理超长文档、批量数据的标准模式:
async def map_reduce(items, mapper, reducer, batch_size=10):
"""map: 每个 item 并行处理;reduce: 把所有结果聚合"""
# Map 阶段:并行处理所有 items(分批以控制并发数)
map_results = []
for batch in batched(items, batch_size):
batch_results = await asyncio.gather(*[mapper(item) for item in batch])
map_results.extend(batch_results)
# Reduce 阶段:聚合
return await reducer(map_results)
# 使用示例:长文档总结
chunks = split_document(long_doc, chunk_size=2000)
summary = await map_reduce(
items=chunks,
mapper=lambda c: llm.summarize(c),
reducer=lambda summaries: llm.synthesize("\n\n".join(summaries))
)
Map-Reduce 在 Agent 中最常见的两个用途:
| 用途 | Map 做什么 | Reduce 做什么 |
|---|---|---|
| 长文档处理 | 分块独立总结/提取 | 合并去重 + 重写为统一格式 |
| 多文档比较 | 每个文档独立打分 | 排序 + 综合判断 |
Map-Reduce 在 Agent 中最大的注意点是 Reduce 阶段也是 LLM 调用——Reduce 的输入是所有 Map 结果,可能超过单次上下文窗口。这时要做分层 Reduce:
async def hierarchical_reduce(map_results, reducer, fanout=10):
"""分层 Reduce:每 fanout 个结果先合一次,直到能塞进单次 LLM 调用"""
level = map_results
while len(level) > fanout:
# 把当前层分组、每组并行 reduce
groups = [level[i:i + fanout] for i in range(0, len(level), fanout)]
next_level = await asyncio.gather(*[reducer(g) for g in groups])
level = next_level
# 最后一层一次性 reduce
return await reducer(level)
# 例:100 个 chunk 的总结
# Level 0: 100 个 → 10 组每组 10 个 → 10 个中间总结
# Level 1: 10 个 → 1 组 10 个 → 1 个最终总结
# 总共 10 + 1 = 11 次 reducer 调用,而不是想用 1 次塞 100 个塞不下
判断要不要分层:Map 结果数 × 每个结果长度 > Reduce 模型 context 的 50% 就该分层(留一半给 prompt 和输出)。每层 fanout 一般 5-15——太小会层数过多累积误差,太大会单次 context 接近上限。
4.4 Scatter-Gather:多源择优
async def scatter_gather(sources, query, selector):
"""同时查多源,按 selector 策略选最佳结果"""
coros = [src.query_async(query) for src in sources]
results = await asyncio.gather(*coros, return_exceptions=True)
valid = [r for r in results if not isinstance(r, Exception)]
if not valid:
return None
return selector(valid)
# 使用示例:取最快返回的有效结果
result = await scatter_gather(
sources=[google_search, bing_search, kb_search],
query="GPT-5 发布时间",
selector=lambda rs: max(rs, key=lambda r: r.relevance_score)
)
Scatter-Gather 的关键设计是 selector 策略:
| 策略 | 何时用 |
|---|---|
| 取最快返回 | 容灾场景(任一源回来都行) |
| 取最高质量 | 准确率优先(综合评分挑最优) |
| 全部融合 | 召回优先(合并去重所有结果) |
4.5 并行的关键设计点
并发数控制。无脑 asyncio.gather 100 个任务会被 API 限流或耗尽连接池。生产实践是用 Semaphore 或分批:
semaphore = asyncio.Semaphore(10) # 最多 10 个并发
async def bounded_task(task, input_data):
async with semaphore:
return await task.run_async(input_data)
Token 成本的隐藏面。并行 N 个 LLM 调用意味着 N 倍的 token 消耗。Map-Reduce 处理 100 个 chunk = 100 次 Map LLM 调用 + 至少 1 次 Reduce LLM 调用。比 ReAct 串行处理快 N 倍,但贵也是 N 倍。
部分失败的处理策略。三种典型策略:
| 策略 | 含义 | 何时用 |
|---|---|---|
| All-or-Nothing | 任一失败则整体失败 | 子任务都关键,缺一不可 |
| Best-Effort | 失败的丢弃,成功的继续 | 子任务等价,部分结果可接受 |
| Fail-Open with Retry | 失败的单独 retry,最终 fallback | 关键子任务,但有降级路径 |
确定性 vs 一致性。并行调用 LLM 多次的结果可能不一致(即使温度=0,也可能因 batch 效应有微小差异)。如果 Reduce 阶段依赖 Map 结果的确定性顺序或一致性,要在 Reduce 时做去重和规范化。
5. 三种模式的组合:典型工作流
实际生产系统几乎都是三种模式的组合。一个完整的"信息综合"Agent 的工作流:
这个流里出现了本篇讲的三种模式——Routing(入口 + 子查询路由)、Parallelization(Fan-out + Map-Reduce)、Chaining(顺序综合)——再叠加一层独立的 Reflection 做兜底校验。这是大多数生产 Agent 系统的实际形态——很少是纯 ReAct,几乎都是工作流编排。
工作流编排和 ReAct 不是替代关系。工作流提供"骨架",每个工作流节点内部仍然可以是一个小型 ReAct 循环。骨架决定"步骤拓扑",ReAct 在每个节点内决定"如何完成这一步"。
6. 三种模式的踩坑指南
6.1 Routing 类
反模式:用一个大 LLM 路由器路由所有请求。每次入口都消耗一次 LLM 调用,对简单请求是浪费。应该规则 + 分类器先过一遍。
反模式:路由 prompt 用自由文本输出("你应该用 RAG 还是 Tool?")。LLM 会输出散文,需要正则匹配反解析,准确率低。必须用 enum schema。
反模式:路由没有 confidence 阈值。低 confidence 直接强行路由 = 错误路径执行 = 浪费下游资源 + 用户体验崩溃。
反模式:所有路由都强行匹配最近类别。Router 应该有 unsupported 或 ask_clarification 分支,而不是把 50% 不确定的请求强行塞到错误类别。
反模式:路由策略静态化。不监控路由分布、不分析路由错误率,路由会逐步偏离实际。Router 是需要持续优化的组件,要把 routing trace 接入评估系统。
6.2 Parallelization 类
反模式:把有依赖的步骤强行并行。step2 需要 step1 的输出,但被并行启动——step2 会用上一次的旧数据或空数据,产生静默错误。并行前必须确认依赖图。
反模式:并行任务全部 await asyncio.gather,单点失败拖垮整批。必须用 return_exceptions=True,区分成功失败分别处理。
反模式:并发数无上限。100 个并行 LLM 调用直接打满 rate limit,全部失败重试,比串行还慢。Semaphore 是底线。
反模式:Reduce 阶段直接喂入所有 Map 结果给 LLM。Map 结果加起来很容易超过上下文窗口。必须做分层 Reduce 或递归 Reduce。
反模式:忽视并行的 token 成本。"我把 ReAct 改成 Map-Reduce 加速了 10 倍"——但账单也涨了 10 倍。要权衡延迟与成本。
6.3 Chaining 类
反模式:把 Chain 写成一坨 if-else。维护到 5-6 步就读不懂了。用配置化的 Step 列表 + 通用的 runner,新增步骤只是加一个 Step 对象。
反模式:Chain 失败时整体重跑。第 5 步失败时不应该从第 1 步开始重跑——前 4 步的结果是确定的,应该有 checkpoint 机制,从失败的步骤继续。
反模式:Chain 步骤间用全局变量传状态。要用显式的 state 对象,每步声明读什么、写什么。这样可观测、可单步调试、可单步重试。
7. 工作流与 Agent 的边界
工作流编排做的事情很朴素:把步骤拓扑从 LLM 的隐性决策里提到代码层。这件事看起来不起眼,但它是 Agent 工程从个例 demo 走到批量生产的分水岭——拓扑显式之后,节点边界变成可观测、可测试、可优化的最小单元,问题能定位到具体节点而不是"模型表现不稳定"。
工作流是骨架层,其他所有模式都在它的节点里发生:工作流节点内部通常跑 Agent 控制循环、RAG 是 Fan-out 中常见的数据源、Planner 输出的子任务图直接对应 Fan-out + Chain 的拓扑、流行框架(LangGraph、CrewAI)的本质就是工作流引擎、节点边界天然是 trace span 和 Guardrail/HITL 介入的位置。把这些关系压成一句话:工作流回答步骤拓扑,节点内回答这一步怎么完成——前者是确定性的代码骨架,后者可以是确定性代码也可以是 LLM 推理。
Routing 和 Parallelization 各自解决一个真问题。Routing 解决"成本和质量的路径选择"——所有请求走最重路径是 Agent 成本爆炸最常见的原因,三层组合(规则、分类器、LLM 路由)能把流量精准分到对应成本档位;设计要点是 confidence 阈值和 unsupported 兜底,宁可承认"我不知道",不要硬猜路径。Parallelization 解决"延迟的并行化"——但并行不是免费的,N 个并行 LLM 调用就是 N 倍 token 成本,决策时要同时盯延迟和账单。
生产 Agent 几乎都是 Chain + Routing + Parallelization 的组合,纯 ReAct 只是工作流节点内部的局部实现。多 Agent 协作很大程度上就是把这套模式从"单 Agent 内"放大到"多 Agent 间"——Supervisor 本质是 Router、Peer-to-Peer 本质是 Fan-out。把工作流当骨架、节点内当肌肉,比"一个大循环跑到底"鲁棒得多。