Agent控制循环:运行时的核心抽象
如果说 LLM 是 Agent 的大脑,那么 Control Loop 就是 Agent 的心跳。
大多数教程在讲 Agent 时,上来就接框架、调 API、跑 demo。但如果你不理解 Agent 运行时的核心抽象——控制循环——你永远只是在用别人的黑盒。
本文是 Agentic 系列第 04 篇,整个系列的技术基石。我们会从状态机模型出发,逐层拆解 Agent Control Loop 的每一个阶段,给出完整的 Python 实现,并深入分析实际工程中的 trade-off。
1. Agent 的本质:可中断的控制循环
一个常见的误解是把 Agent 等同于"一次 LLM 调用"。实际上,Agent 和 LLM 的关系,类似于操作系统和 CPU 的关系——LLM 是执行推理的计算单元,而 Agent 是管理整个执行生命周期的运行时系统。
LLM 是一个函数: f(prompt) -> completion,输入文本,输出文本,调用一次就结束。
Agent 是一个循环: 它持续运行,在每一轮中观察环境、调用 LLM 进行推理、执行动作、评估结果,然后决定是否继续。
这个循环有几个关键特性:
- 可中断:循环可以在任何阶段暂停,等待外部输入(用户确认、异步工具返回)后恢复
- 有状态:循环维护上下文信息,每一轮的输出影响下一轮的输入
- 有终止条件:循环不会无限运行,它在满足特定条件时停止
- 可观测:循环的每一步都应该是可追踪、可回溯的
理解了这一点,Agent 编程的核心问题就变成了:如何设计和实现这个控制循环?
2. 状态机模型:形式化定义
要严谨地描述 Control Loop,最自然的方式是用有限状态机(FSM)。
2.1 状态定义
一个 Agent Control Loop 可以用以下状态集合描述:
from enum import Enum
class AgentState(Enum):
OBSERVE = "observe" # 接收并归一化输入
THINK = "think" # LLM 推理,决定下一步行动
ACT = "act" # 执行工具调用或产出结果
REFLECT = "reflect" # 评估执行结果,决定是否继续
DONE = "done" # 终止:任务完成
ERROR = "error" # 终止:不可恢复错误
2.2 状态转移图
状态转移规则:
| 当前状态 | 条件 | 下一状态 |
|---|---|---|
| OBSERVE | 输入就绪 | THINK |
| THINK | LLM 返回 tool_call | ACT |
| THINK | LLM 返回最终回答 | DONE |
| THINK | LLM 调用异常 | ERROR |
| ACT | 工具执行完成 | REFLECT |
| ACT | 工具执行失败 | REFLECT (带错误信息) |
| REFLECT | 需要继续 | OBSERVE (将结果作为新输入) |
| REFLECT | 任务完成 | DONE |
| REFLECT | 超过重试上限 | ERROR |
2.3 与 OODA Loop 的对比
Agent Control Loop 并不是凭空发明的,它和军事决策理论中的 OODA Loop(Observe-Orient-Decide-Act) 有深层的结构对应:
关键区别在于 REFLECT 阶段。传统 OODA Loop 假设决策者能实时感知行动效果并自然融入下一轮 Observe。但 LLM Agent 不具备这种连续感知能力——它需要一个显式的反思步骤来评估工具返回值、判断是否需要修正。这是 Agent Control Loop 相对于经典决策循环的重要改进。
3. 循环中每个阶段的深入分析
3.1 OBSERVE:输入归一化
OBSERVE 阶段的职责是收集并归一化各种来源的输入,将它们统一为 LLM 可理解的格式。
输入来源远不止"用户消息"一种:
输入归一化的核心原则:
所有输入都必须序列化为 message 格式。不管来源是什么,最终都要变成
{"role": ..., "content": ...}的形式,因为 LLM 只理解 message 序列。工具返回值需要结构化包装。不要直接把原始 JSON 甩给 LLM,要附上工具名称、执行状态和必要的摘要信息。
输入需要截断和优先级排序。当多个输入同时到达时,需要决定哪些放进当前轮次的 Context Window,哪些缓存到下一轮。
def observe(self, raw_inputs: list[dict]) -> list[dict]:
"""将原始输入归一化为 LLM message 格式"""
messages = []
for inp in raw_inputs:
match inp["type"]:
case "user_message":
messages.append({"role": "user", "content": inp["text"]})
case "tool_result":
messages.append({
"role": "tool",
"tool_call_id": inp["call_id"],
"content": self._format_tool_result(inp),
})
case "system_event":
messages.append({
"role": "system",
"content": f"[System Event] {inp['event']}",
})
return messages
3.2 THINK:LLM 推理
THINK 阶段是控制循环中最核心的一环——调用 LLM,让它基于当前上下文做出决策。
这个阶段要解决三个问题:
问题一:Context Window 构建
LLM 的输入不是当前轮次的消息,而是从任务开始到现在的完整上下文。构建 Context Window 的典型结构:
问题二:Token 预算控制
Context Window 有上限(4K / 8K / 128K / 200K),而每一轮循环都会增加 message history。如果不加控制,几轮之后就会超限。
常见的预算控制策略:
| 策略 | 实现方式 | 适用场景 |
|---|---|---|
| 硬截断 | 只保留最近 N 条消息 | 简单场景 |
| 滑动窗口 | System Prompt 固定 + 最近 K 轮对话 | 工具调用场景 |
| 摘要压缩 | 将早期对话用 LLM 生成摘要后替换 | 长对话场景 |
| 优先级保留 | 按消息重要性排序,低优先级先丢弃 | 复杂多步任务 |
def _build_context(self, new_messages: list[dict]) -> list[dict]:
"""构建符合 Token 预算的 Context Window"""
self.message_history.extend(new_messages)
context = [self.system_prompt] + self.tool_definitions
remaining_budget = self.max_tokens - self._count_tokens(context)
# 从最新消息开始向前填充,直到预算耗尽
selected = []
for msg in reversed(self.message_history):
msg_tokens = self._count_tokens([msg])
if msg_tokens > remaining_budget:
break
selected.insert(0, msg)
remaining_budget -= msg_tokens
return context + selected
问题三:LLM 输出解析
LLM 的返回可能是纯文本回答(任务完成),也可能是工具调用请求。需要根据返回类型决定下一步状态转移:
def think(self, context: list[dict]) -> ThinkResult:
"""调用 LLM 进行推理"""
response = self.client.chat.completions.create(
model=self.model,
messages=context,
tools=self.tool_schemas,
)
choice = response.choices[0]
if choice.finish_reason == "tool_calls":
return ThinkResult(
action="tool_call",
tool_calls=choice.message.tool_calls,
raw_message=choice.message,
)
else:
return ThinkResult(
action="answer",
content=choice.message.content,
raw_message=choice.message,
)
3.3 ACT:执行层
ACT 阶段负责执行 THINK 阶段决定的动作——通常是调用工具(Tool Calling)。
执行层的核心挑战不是"调用工具"本身,而是以下几个工程问题:
同步 vs 异步执行
同步执行(Simple):
think → call_tool_1 → wait → call_tool_2 → wait → reflect
延迟 = T1 + T2
异步 / 并行执行(Optimized):
think → call_tool_1 ─┬─→ reflect
→ call_tool_2 ─┘
延迟 = max(T1, T2)
当 LLM 在一次返回中请求多个工具调用(parallel tool calling)时,应该并行执行以降低延迟:
import asyncio
async def act(self, tool_calls: list[ToolCall]) -> list[dict]:
"""并行执行多个工具调用"""
tasks = [self._execute_tool(tc) for tc in tool_calls]
results = await asyncio.gather(*tasks, return_exceptions=True)
tool_results = []
for tc, result in zip(tool_calls, results):
if isinstance(result, Exception):
tool_results.append({
"type": "tool_result",
"call_id": tc.id,
"status": "error",
"content": f"Tool '{tc.function.name}' failed: {result}",
})
else:
tool_results.append({
"type": "tool_result",
"call_id": tc.id,
"status": "success",
"content": str(result),
})
return tool_results
执行安全
工具执行不是无条件信任的。需要考虑:
- 超时控制:每个工具调用必须有 timeout,防止阻塞整个循环
- 结果大小限制:工具返回值可能非常大(比如查数据库返回 10 万行),需要截断
- 权限校验:某些工具(文件写入、网络请求、代码执行)需要额外的权限检查
- 沙箱执行:代码执行类工具应该在沙箱中运行
3.4 REFLECT:输出质量评估
REFLECT 阶段回答一个关键问题:上一步的执行结果是否满意?是继续、重试还是停止?
这个阶段有两种实现方式:
方式一:隐式反思——让 LLM 在下一轮 THINK 中自行判断
这是最简单的方式。把工具返回值直接送进下一轮 THINK,让 LLM 自己决定是否需要修正。大多数框架(如 OpenAI Assistants API)默认采用这种方式。
优点:实现简单,不增加额外的 LLM 调用。
缺点:LLM 可能"自信地"忽略错误,特别是在返回值看起来合理但语义错误的情况下。
方式二:显式反思——用独立的 LLM 调用进行自我评估
def reflect(self, action_result: dict, task_goal: str) -> ReflectResult:
"""显式反思:评估执行结果"""
prompt = f"""评估以下工具执行结果是否达成了任务目标。
任务目标: {task_goal}
执行结果: {json.dumps(action_result, ensure_ascii=False)}
请回答:
1. 结果是否正确?(yes/no)
2. 是否需要进一步行动?(yes/no)
3. 如果需要,下一步应该做什么?
"""
response = self.client.chat.completions.create(
model=self.model,
messages=[{"role": "user", "content": prompt}],
)
# 解析反思结果...
return ReflectResult(
is_correct=...,
needs_more_action=...,
next_step_hint=...,
)
Trade-off 分析:
| 维度 | 隐式反思 | 显式反思 |
|---|---|---|
| Token 消耗 | 低 | 高(额外一次 LLM 调用) |
| 质量把控 | 依赖 LLM 自觉 | 有独立的质量评估 |
| 延迟 | 低 | 增加一轮 LLM 延迟 |
| 适用场景 | 简单工具调用 | 复杂推理链、高准确性要求 |
实际工程中,常用的折中方案是:对关键步骤用显式反思,对常规步骤用隐式反思。
3.5 终止条件:什么时候停下来?
一个 Agent 如果不知道什么时候停,就是一个烧钱的死循环。终止条件的设计是 Control Loop 中最容易被忽视、但对生产环境最重要的部分。
def should_stop(self, state: LoopState) -> tuple[bool, str]:
"""判断是否应该终止循环"""
# 1. LLM 认为任务完成
if state.last_think_result.action == "answer":
return True, "task_completed"
# 2. 达到最大轮次
if state.turn_count >= self.max_turns:
return True, "max_turns_exceeded"
# 3. Token 预算耗尽
if state.total_tokens >= self.token_budget:
return True, "token_budget_exceeded"
# 4. 连续错误过多
if state.consecutive_errors >= self.max_consecutive_errors:
return True, "too_many_errors"
# 5. 死循环检测(重复输出相同内容)
if self._detect_loop(state.recent_outputs):
return True, "loop_detected"
return False, ""
各终止条件的设计考量:
- max_turns:硬上限,防止失控。一般设 10-30 轮。过小会导致复杂任务被截断,过大会导致 Token 浪费
- token_budget:成本控制。根据业务场景设定每次交互的 Token 上限
- consecutive_errors:容错阈值。工具偶尔失败是正常的,但连续 3 次以上通常意味着系统性问题
- loop_detected:死循环检测。如果 Agent 连续 N 轮输出相同或高度相似的内容,说明它陷入了无效循环
4. 两种主流 Loop 模式对比
4.1 ReAct 模式
ReAct(Reason + Act) 是目前最主流的 Agent Loop 模式,由 Yao et al. 2022 提出。其核心思想是让 LLM 交替进行推理和行动:
一个典型的 ReAct 执行轨迹(Trace):
Thought: 用户想知道北京今天的天气。我需要调用天气 API。
Action: get_weather(city="北京")
Observation: {"temp": 28, "condition": "晴", "humidity": 45}
Thought: 已经获取到天气数据,我可以直接回答用户。
Answer: 北京今天晴天,气温 28°C,湿度 45%。
ReAct 的优势:
- 每一步都基于最新的观察结果做决策,适应性强
- Thought 过程可见,可解释性好
- 实现简单,与 Tool Calling API 天然契合
ReAct 的劣势:
- 逐步决策,无法全局优化执行顺序
- 每一步都需要一次 LLM 调用,延迟累积
- 对于需要协调多个子任务的复杂场景,容易陷入局部最优
4.2 Plan-then-Execute 模式
与 ReAct 的"走一步看一步"不同,Plan-then-Execute 先生成一个完整的执行计划,然后按计划依次执行:
执行轨迹示例:
Plan:
1. 查询北京天气
2. 查询上海天气
3. 对比两地天气差异
4. 生成出行建议
Execute Step 1: get_weather(city="北京") → {"temp": 28, "condition": "晴"}
Execute Step 2: get_weather(city="上海") → {"temp": 32, "condition": "多云"}
Execute Step 3: (LLM 对比分析)
Execute Step 4: (LLM 生成建议)
Answer: ...
4.3 Trade-off 分析
| 维度 | ReAct | Plan-then-Execute |
|---|---|---|
| 灵活性 | 高。每步实时调整 | 低。偏离计划时需要 Replan |
| LLM 调用次数 | 多(每步一次推理) | 少(规划一次 + 执行时可能不需要 LLM) |
| 可控性 | 低。难以预测执行路径 | 高。计划可审核、可修改 |
| 适合场景 | 工具调用为主、步骤不确定 | 多步骤、有依赖关系、需要全局协调 |
| 错误恢复 | 自然。下一步可以直接修正 | 需要 Replan 机制 |
| 人类干预 | 难以在中途插入 | 容易。可以审核和修改计划 |
实际工程建议: 大多数场景从 ReAct 开始。当你发现 Agent 频繁在多步任务中"迷路"或做出低效的工具调用序列时,再考虑引入 Plan-then-Execute 或混合模式。
5. 状态管理
Control Loop 的状态管理决定了 Agent 的持久性和可恢复性。
5.1 Stateless Agent
Stateless Agent 不维护执行状态,所有上下文通过 message history 传递。
Request 1: [system, user_msg_1] → response_1
Request 2: [system, user_msg_1, response_1, user_2] → response_2
Request 3: [system, user_msg_1, response_1, user_2, response_2, user_3] → response_3
特点:
- 实现最简单,无需持久化
- 每次请求都是自包含的
- message history 不断膨胀,最终超过 Context Window
- 不支持暂停/恢复
这是大多数 "chat completion" 应用的工作方式。适合单轮或短对话场景。
5.2 Stateful Agent
Stateful Agent 维护一个独立的 execution state,它不仅包含 message history,还包含任务进度、中间结果、工具状态等信息。
@dataclass
class ExecutionState:
"""Agent 执行状态"""
session_id: str
status: AgentState
turn_count: int
message_history: list[dict]
# 任务状态
task_goal: str
current_plan: list[str] | None
completed_steps: list[str]
# 资源消耗
total_input_tokens: int
total_output_tokens: int
# 错误追踪
consecutive_errors: int
error_log: list[dict]
# 时间戳
created_at: float
updated_at: float
5.3 状态持久化方案
当 Agent 需要支持暂停/恢复、跨进程执行、或长时间运行时,执行状态必须持久化。
Checkpoint 与恢复 是 Stateful Agent 的核心能力。思路很直接:在每轮循环的关键节点保存一次快照,异常恢复时从最近的快照重新开始。
class CheckpointManager:
def save(self, state: ExecutionState) -> str:
"""保存 checkpoint,返回 checkpoint_id"""
snapshot = {
"state": asdict(state),
"timestamp": time.time(),
}
checkpoint_id = f"{state.session_id}:{state.turn_count}"
self.store.set(checkpoint_id, json.dumps(snapshot))
return checkpoint_id
def restore(self, checkpoint_id: str) -> ExecutionState:
"""从 checkpoint 恢复执行状态"""
snapshot = json.loads(self.store.get(checkpoint_id))
return ExecutionState(**snapshot["state"])
实际系统中,checkpoint 的保存频率需要权衡:
- 每轮都保存:恢复粒度最细,但写入开销大
- 关键节点保存(如每次工具调用前后):开销适中,覆盖最重要的故障场景
- 定时保存:实现简单,但可能丢失最近几轮的状态
6. 完整代码实现
下面是一个最小但完整的 Agent Control Loop 实现。不依赖任何框架,仅使用 Python 标准库 + OpenAI SDK。
"""
Minimal Agent Control Loop
不依赖任何框架,纯 Python + OpenAI SDK
"""
import json
import time
from enum import Enum
from dataclasses import dataclass, field
from openai import OpenAI
class State(Enum):
OBSERVE = "observe"
THINK = "think"
ACT = "act"
REFLECT = "reflect"
DONE = "done"
ERROR = "error"
@dataclass
class LoopContext:
messages: list[dict] = field(default_factory=list)
turn: int = 0
total_tokens: int = 0
consecutive_errors: int = 0
recent_outputs: list[str] = field(default_factory=list)
# ── Tool Registry ────────────────────────────────────
TOOL_FUNCTIONS = {}
def register_tool(name: str, description: str, parameters: dict):
"""装饰器:注册工具函数及其 schema"""
def decorator(fn):
TOOL_FUNCTIONS[name] = {
"fn": fn,
"schema": {
"type": "function",
"function": {
"name": name,
"description": description,
"parameters": parameters,
},
},
}
return fn
return decorator
@register_tool(
name="get_weather",
description="获取指定城市的当前天气",
parameters={
"type": "object",
"properties": {
"city": {"type": "string", "description": "城市名称"},
},
"required": ["city"],
},
)
def get_weather(city: str) -> str:
# 示例实现,实际中调用真实 API
return json.dumps({"city": city, "temp": 28, "condition": "晴"})
# ── Agent Control Loop ───────────────────────────────
class Agent:
def __init__(
self,
system_prompt: str,
model: str = "gpt-4o",
max_turns: int = 15,
token_budget: int = 50_000,
max_consecutive_errors: int = 3,
):
self.client = OpenAI()
self.model = model
self.system_prompt = system_prompt
self.max_turns = max_turns
self.token_budget = token_budget
self.max_errors = max_consecutive_errors
self.tool_schemas = [t["schema"] for t in TOOL_FUNCTIONS.values()]
def run(self, user_input: str) -> str:
ctx = LoopContext()
ctx.messages = [
{"role": "system", "content": self.system_prompt},
{"role": "user", "content": user_input},
]
state = State.THINK # 首轮输入已就绪,直接进入 THINK
while state not in (State.DONE, State.ERROR):
match state:
case State.THINK:
state, ctx = self._think(ctx)
case State.ACT:
state, ctx = self._act(ctx)
case State.REFLECT:
state, ctx = self._reflect(ctx)
ctx.turn += 1
# 提取最终回答
for msg in reversed(ctx.messages):
if msg["role"] == "assistant" and msg.get("content"):
return msg["content"]
return "[Agent finished without a final answer]"
def _think(self, ctx: LoopContext) -> tuple[State, LoopContext]:
"""调用 LLM 推理"""
try:
response = self.client.chat.completions.create(
model=self.model,
messages=ctx.messages,
tools=self.tool_schemas or None,
)
except Exception as e:
ctx.consecutive_errors += 1
ctx.messages.append({
"role": "assistant",
"content": f"[LLM Error] {e}",
})
if ctx.consecutive_errors >= self.max_errors:
return State.ERROR, ctx
return State.THINK, ctx # 重试
# 记录 token 消耗
usage = response.usage
ctx.total_tokens += (usage.prompt_tokens + usage.completion_tokens)
ctx.consecutive_errors = 0
choice = response.choices[0]
assistant_msg = choice.message.model_dump()
ctx.messages.append(assistant_msg)
# 决定下一状态
if choice.message.tool_calls:
return State.ACT, ctx
else:
return State.DONE, ctx
def _act(self, ctx: LoopContext) -> tuple[State, LoopContext]:
"""执行工具调用"""
assistant_msg = ctx.messages[-1]
tool_calls = assistant_msg.get("tool_calls", [])
for tc in tool_calls:
fn_name = tc["function"]["name"]
fn_args = json.loads(tc["function"]["arguments"])
tool_entry = TOOL_FUNCTIONS.get(fn_name)
if not tool_entry:
result = f"Error: unknown tool '{fn_name}'"
else:
try:
result = tool_entry["fn"](**fn_args)
except Exception as e:
result = f"Error: tool '{fn_name}' raised {type(e).__name__}: {e}"
ctx.consecutive_errors += 1
ctx.messages.append({
"role": "tool",
"tool_call_id": tc["id"],
"content": str(result),
})
return State.REFLECT, ctx
def _reflect(self, ctx: LoopContext) -> tuple[State, LoopContext]:
"""反思:检查终止条件"""
# 最大轮次
if ctx.turn >= self.max_turns:
ctx.messages.append({
"role": "assistant",
"content": "[Agent stopped: max turns exceeded]",
})
return State.ERROR, ctx
# Token 预算
if ctx.total_tokens >= self.token_budget:
ctx.messages.append({
"role": "assistant",
"content": "[Agent stopped: token budget exceeded]",
})
return State.ERROR, ctx
# 连续错误
if ctx.consecutive_errors >= self.max_errors:
return State.ERROR, ctx
# 死循环检测:最近 3 次输出相同
tool_results = [
m["content"] for m in ctx.messages[-6:]
if m.get("role") == "tool"
]
if len(tool_results) >= 3 and len(set(tool_results[-3:])) == 1:
ctx.messages.append({
"role": "assistant",
"content": "[Agent stopped: loop detected]",
})
return State.ERROR, ctx
# 继续下一轮推理
return State.THINK, ctx
# ── 使用示例 ─────────────────────────────────────────
if __name__ == "__main__":
agent = Agent(
system_prompt="你是一个天气助手。使用 get_weather 工具回答天气问题。",
max_turns=10,
)
answer = agent.run("北京今天天气怎么样?")
print(answer)
这段代码约 130 行,涵盖了 Control Loop 的所有核心要素:
- 状态机驱动的循环控制
- 工具注册与动态调用
- LLM 异常重试
- Token 消耗追踪
- 多种终止条件(max_turns / token_budget / consecutive_errors / loop_detected)
- 工具执行错误处理
它不是生产级代码,但足以说明 Control Loop 的核心机制。在此基础上增加异步执行、状态持久化、日志追踪,就能逐步演进为生产级实现。
7. 错误处理策略
生产环境中,Agent Control Loop 最常遇到的四类错误:
7.1 Tool 调用失败
工具调用失败是最高频的错误。正确的处理方式不是抛异常终止,而是将错误信息作为 Observation 返回给 LLM,让它决定如何应对。
# 错误的做法:直接终止
try:
result = call_tool(name, args)
except Exception:
raise # Agent 直接崩溃
# 正确的做法:将错误反馈给 LLM
try:
result = call_tool(name, args)
except TimeoutError:
result = "Tool timed out after 30s. Consider using different parameters."
except ValueError as e:
result = f"Invalid arguments: {e}. Please check parameter types."
except Exception as e:
result = f"Tool failed: {type(e).__name__}: {e}"
LLM 在收到错误信息后,通常能自主修正——换一组参数重试、换一个工具、或者告知用户当前无法完成任务。
7.2 LLM 返回格式异常
LLM 偶尔会返回不符合预期的格式:JSON 不合法、tool_call 参数缺失、content 为空等。
def _parse_tool_call_safe(self, tool_call) -> tuple[str, dict]:
"""安全解析工具调用参数"""
name = tool_call.function.name
try:
args = json.loads(tool_call.function.arguments)
except json.JSONDecodeError:
# LLM 返回了非法 JSON,尝试修复或跳过
args = {}
self.logger.warning(
f"Invalid JSON in tool_call arguments: "
f"{tool_call.function.arguments}"
)
return name, args
7.3 超时处理
整个 Agent 执行需要有全局超时,防止无限挂起:
import signal
class TimeoutError(Exception):
pass
def run_with_timeout(fn, timeout_seconds: int, *args, **kwargs):
"""为函数执行添加超时限制"""
def handler(signum, frame):
raise TimeoutError(f"Execution timed out after {timeout_seconds}s")
old_handler = signal.signal(signal.SIGALRM, handler)
signal.alarm(timeout_seconds)
try:
return fn(*args, **kwargs)
finally:
signal.alarm(0)
signal.signal(signal.SIGALRM, old_handler)
7.4 死循环检测
当 Agent 陷入死循环时,它会反复执行相同的操作序列。检测策略:
def _detect_loop(self, messages: list[dict], window: int = 6) -> bool:
"""检测 Agent 是否陷入重复循环"""
recent = messages[-window:]
# 策略 1:完全重复检测
contents = [m.get("content", "") for m in recent if m["role"] == "assistant"]
if len(contents) >= 3 and len(set(contents[-3:])) == 1:
return True
# 策略 2:工具调用序列重复检测
tool_calls = []
for m in recent:
if m.get("tool_calls"):
for tc in m["tool_calls"]:
tool_calls.append(f"{tc['function']['name']}:{tc['function']['arguments']}")
if len(tool_calls) >= 4:
half = len(tool_calls) // 2
if tool_calls[:half] == tool_calls[half:2*half]:
return True
return False
8. 性能考量
8.1 Token 消耗与循环次数的关系
Agent Control Loop 的 Token 消耗不是线性增长,而是二次增长——因为每一轮都要携带之前所有轮次的 message history。
轮次 新增消息 Token 累计 Context Token 本轮总消耗
1 T S + T S + T
2 T S + 2T S + 2T
3 T S + 3T S + 3T
...
N T S + NT S + NT
总消耗 = N*S + T*(1+2+...+N) = N*S + T*N*(N+1)/2
其中 S = System Prompt Token 数,T = 平均每轮消息 Token 数
这意味着 10 轮的 Agent 消耗的 Token 不是 1 轮的 10 倍,而可能是 55 倍。这对成本控制至关重要。
8.2 Context Window 膨胀问题
随着轮次增加,Context Window 持续膨胀,导致:
- 延迟增加:LLM 推理时间与输入 Token 数正相关
- 成本增加:按 Token 计费,输入越长越贵
- 质量下降:过长的 Context 会导致 LLM "注意力分散",关键信息被淹没(lost in the middle 问题)
8.3 消息压缩/摘要策略
应对 Context Window 膨胀的核心策略:
策略一:滑动窗口
只保留最近 K 轮对话,丢弃更早的历史。简单粗暴但有效。
def _sliding_window(self, messages: list[dict], keep_last: int = 10) -> list[dict]:
system_msgs = [m for m in messages if m["role"] == "system"]
non_system = [m for m in messages if m["role"] != "system"]
return system_msgs + non_system[-keep_last:]
策略二:摘要压缩
当 message history 超过阈值时,用 LLM 对早期对话生成摘要,替换原始消息。
def _compress_history(self, messages: list[dict], threshold: int = 20) -> list[dict]:
if len(messages) <= threshold:
return messages
# 将早期消息压缩为摘要
early = messages[1:-threshold] # 跳过 system prompt,保留最近的
summary_prompt = (
"请用 3-5 句话总结以下对话的关键信息和已完成的操作:\n"
+ "\n".join(m.get("content", "") for m in early if m.get("content"))
)
summary = self.client.chat.completions.create(
model="gpt-4o-mini", # 用小模型做摘要,节省成本
messages=[{"role": "user", "content": summary_prompt}],
).choices[0].message.content
return (
[messages[0]] # system prompt
+ [{"role": "system", "content": f"[Earlier conversation summary] {summary}"}]
+ messages[-threshold:]
)
策略三:选择性保留
不是所有消息都同等重要。工具的原始返回值(可能非常长)通常可以只保留摘要:
def _trim_tool_results(self, messages: list[dict], max_len: int = 500) -> list[dict]:
"""截断过长的工具返回值"""
trimmed = []
for m in messages:
if m["role"] == "tool" and len(m.get("content", "")) > max_len:
m = {**m, "content": m["content"][:max_len] + "\n...[truncated]"}
trimmed.append(m)
return trimmed
三种策略的对比:
| 策略 | 信息保留 | 实现成本 | Token 节省 | 适用场景 |
|---|---|---|---|---|
| 滑动窗口 | 低 | 极低 | 高 | 短对话、工具调用为主 |
| 摘要压缩 | 中 | 中(需要额外 LLM 调用) | 高 | 长对话、需要历史上下文 |
| 选择性保留 | 高 | 低 | 中 | 工具返回值较大的场景 |
实际工程中,通常组合使用:先用选择性保留截断大结果,再用滑动窗口控制总长度,在关键节点用摘要压缩保留全局上下文。
9. 异步工具调用的完整错误处理
在实际系统中,工具调用通常不是串行执行,而是需要并行化处理多个工具调用。然而并行执行引入了新的复杂性:如何优雅地处理部分工具失败、全部失败、或部分超时的场景?
9.1 并行工具调用的三种失败模式
模式一:部分失败 — 多个工具中某些成功、某些失败
模式二:全部失败 — 所有工具调用都无法完成
模式三:部分超时 — 某些工具超时、某些成功、某些异常
每种模式都需要不同的重试和降级策略。
9.2 完整的异步工具调用实现
import asyncio
from typing import Any
from dataclasses import dataclass
from enum import Enum
class ToolResultStatus(Enum):
SUCCESS = "success"
TIMEOUT = "timeout"
ERROR = "error"
PARTIAL = "partial"
@dataclass
class ToolResult:
"""工具执行结果的统一表示"""
tool_name: str
tool_call_id: str
status: ToolResultStatus
content: str
duration_ms: float
error_type: str = None
retry_count: int = 0
class AsyncToolExecutor:
"""并行工具调用执行器,支持重试、超时、降级"""
def __init__(
self,
timeout_seconds: float = 30,
max_retries: int = 2,
fallback_on_timeout: bool = True,
):
self.timeout = timeout_seconds
self.max_retries = max_retries
self.fallback_on_timeout = fallback_on_timeout
async def execute_tool_calls(
self,
tool_calls: list[dict],
tool_registry: dict[str, callable],
) -> tuple[list[ToolResult], str]:
"""
并行执行多个工具调用,返回结果列表和错误聚合报告
Args:
tool_calls: 从 LLM 返回的 tool_calls 列表
tool_registry: 工具名称 -> 可调用对象的映射
Returns:
(results, error_report)
"""
tasks = []
for tc in tool_calls:
task = asyncio.create_task(
self._execute_single_with_retry(tc, tool_registry)
)
tasks.append(task)
# 使用 gather with return_exceptions=True
results = await asyncio.gather(*tasks, return_exceptions=True)
tool_results = []
error_messages = []
for i, result in enumerate(results):
if isinstance(result, Exception):
tool_result = ToolResult(
tool_name="[unknown]",
tool_call_id=tool_calls[i].get("id", f"call_{i}"),
status=ToolResultStatus.ERROR,
content=f"Task execution failed: {type(result).__name__}: {result}",
duration_ms=0,
error_type=type(result).__name__,
)
error_messages.append(tool_result.content)
else:
tool_result = result
tool_results.append(tool_result)
if tool_result.status != ToolResultStatus.SUCCESS:
error_messages.append(
f"[{tool_result.tool_name}] {tool_result.status.value}: "
f"{tool_result.content[:100]}"
)
error_report = self._generate_error_report(tool_results, error_messages)
return tool_results, error_report
async def _execute_single_with_retry(
self,
tool_call: dict,
tool_registry: dict[str, callable],
) -> ToolResult:
"""单个工具调用的重试逻辑"""
tool_name = tool_call["function"]["name"]
tool_call_id = tool_call["id"]
if tool_name not in tool_registry:
return ToolResult(
tool_name=tool_name,
tool_call_id=tool_call_id,
status=ToolResultStatus.ERROR,
content=f"Tool not found: {tool_name}",
duration_ms=0,
error_type="ToolNotFound",
)
fn = tool_registry[tool_name]
args = self._parse_args_safe(tool_call)
for attempt in range(self.max_retries + 1):
try:
start = asyncio.get_event_loop().time()
result = await asyncio.wait_for(
self._call_tool_async(fn, args),
timeout=self.timeout,
)
duration_ms = (asyncio.get_event_loop().time() - start) * 1000
return ToolResult(
tool_name=tool_name,
tool_call_id=tool_call_id,
status=ToolResultStatus.SUCCESS,
content=str(result),
duration_ms=duration_ms,
retry_count=attempt,
)
except asyncio.TimeoutError:
duration_ms = self.timeout * 1000
if attempt < self.max_retries and self.fallback_on_timeout:
fallback_args = self._create_fallback_args(args)
args = fallback_args
continue
else:
return ToolResult(
tool_name=tool_name,
tool_call_id=tool_call_id,
status=ToolResultStatus.TIMEOUT,
content=(
f"Tool execution timed out after {self.timeout}s. "
f"Consider retrying with simpler parameters."
),
duration_ms=duration_ms,
error_type="TimeoutError",
retry_count=attempt,
)
except Exception as e:
duration_ms = (asyncio.get_event_loop().time() - start) * 1000 if 'start' in locals() else 0
if attempt < self.max_retries and self._is_retryable_error(e):
await asyncio.sleep(2 ** attempt)
continue
else:
return ToolResult(
tool_name=tool_name,
tool_call_id=tool_call_id,
status=ToolResultStatus.ERROR,
content=f"{type(e).__name__}: {str(e)[:200]}",
duration_ms=duration_ms,
error_type=type(e).__name__,
retry_count=attempt,
)
return ToolResult(
tool_name=tool_name,
tool_call_id=tool_call_id,
status=ToolResultStatus.ERROR,
content="Unexpected: max retries exceeded",
duration_ms=0,
retry_count=self.max_retries + 1,
)
async def _call_tool_async(self, fn: callable, args: dict) -> Any:
"""同步函数转异步"""
if asyncio.iscoroutinefunction(fn):
return await fn(**args)
else:
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, lambda: fn(**args))
def _parse_args_safe(self, tool_call: dict) -> dict:
"""安全解析工具参数"""
import json
try:
return json.loads(tool_call["function"]["arguments"])
except json.JSONDecodeError as e:
raise ValueError(f"Invalid JSON in tool arguments: {e}")
def _is_retryable_error(self, e: Exception) -> bool:
"""判断异常是否值得重试"""
retryable = (ConnectionError, TimeoutError, OSError)
return isinstance(e, retryable) or "temporarily unavailable" in str(e).lower()
def _create_fallback_args(self, args: dict) -> dict:
"""为超时的工具创建简化参数"""
fallback = args.copy()
for key in ["limit", "max_results", "count"]:
if key in fallback and isinstance(fallback[key], int):
fallback[key] = max(1, fallback[key] // 2)
return fallback
def _generate_error_report(
self,
results: list[ToolResult],
error_messages: list[str],
) -> str:
"""生成供 LLM 理解的错误聚合报告"""
if not error_messages:
return ""
summary_by_status = {}
for r in results:
status = r.status.value
if status not in summary_by_status:
summary_by_status[status] = []
summary_by_status[status].append(r.tool_name)
report = "## Tool Execution Summary\n"
for status, tools in summary_by_status.items():
report += f"- **{status.upper()}**: {', '.join(tools)}\n"
if error_messages:
report += "\n### Error Details\n"
for msg in error_messages[:5]:
report += f"- {msg}\n"
return report
这个实现的关键特点:
- 返回异常不中断:用
return_exceptions=True让单个工具失败不导致整个并行执行失败 - 三层重试机制:超时可以用简化参数重试,某些异常可以指数退避重试
- 错误聚合报告:将所有错误汇总为结构化的报告,供 LLM 理解和决策
- 统一结果表示:所有结果都用
ToolResult统一表示,便于下游处理 - 同步/异步兼容:通过
run_in_executor在线程池中执行同步工具
10. 多模态输入的 OBSERVE 归一化
OBSERVE 阶段通常假设输入是纯文本。但在实际应用中,Agent 需要处理图片、PDF、音频等多模态输入。如何在 OBSERVE 阶段统一归一化这些不同格式的输入?
10.1 多模态输入的挑战
- 图片:可能包含图表、截图、照片,需要视觉理解转为文本描述
- PDF:可能有复杂版面、表格、图片混合,需要结构化提取
- 音频:需要转文字(语音识别),可能有背景噪音
- 视频:需要关键帧提取 + 转文本,数据量大
10.2 统一的 ObservationResult 格式
from dataclasses import dataclass
from enum import Enum
from typing import Optional
import base64
class InputModality(Enum):
TEXT = "text"
IMAGE = "image"
PDF = "pdf"
AUDIO = "audio"
VIDEO = "video"
MIXED = "mixed"
@dataclass
class ObservationResult:
"""OBSERVE 阶段的统一输出格式"""
modality: InputModality
normalized_text: str
raw_data: Optional[bytes] = None
metadata: dict = None
extraction_confidence: float = 1.0
class MultimodalObserver:
"""多模态输入的 OBSERVE 处理器"""
def __init__(self, vision_model: str = "gpt-4o"):
self.vision_model = vision_model
self.openai_client = None
async def observe(
self,
input_data: any,
input_type: InputModality = None,
) -> ObservationResult:
"""接收多模态输入,归一化为 ObservationResult"""
if input_type is None:
input_type = self._infer_modality(input_data)
match input_type:
case InputModality.TEXT:
return await self._observe_text(input_data)
case InputModality.IMAGE:
return await self._observe_image(input_data)
case InputModality.PDF:
return await self._observe_pdf(input_data)
case InputModality.AUDIO:
return await self._observe_audio(input_data)
case InputModality.VIDEO:
return await self._observe_video(input_data)
case _:
raise ValueError(f"Unsupported modality: {input_type}")
async def _observe_text(self, text: str) -> ObservationResult:
"""纯文本输入"""
return ObservationResult(
modality=InputModality.TEXT,
normalized_text=text.strip(),
metadata={"length": len(text)},
extraction_confidence=1.0,
)
async def _observe_image(self, image_data: bytes) -> ObservationResult:
"""图片输入:使用 Vision 模型转为文本描述"""
try:
from PIL import Image
import io
img = Image.open(io.BytesIO(image_data))
img_width, img_height = img.size
img_format = img.format
except Exception as e:
return ObservationResult(
modality=InputModality.IMAGE,
normalized_text=f"[Failed to parse image: {e}]",
extraction_confidence=0.0,
)
img_b64 = base64.b64encode(image_data).decode("utf-8")
self._ensure_openai_client()
try:
response = self.openai_client.chat.completions.create(
model=self.vision_model,
messages=[
{
"role": "user",
"content": [
{
"type": "text",
"text": "Please analyze this image and provide: 1) Main objects and their relationships, 2) Any visible text, 3) Key insights, 4) Overall context.",
},
{
"type": "image_url",
"image_url": {
"url": f"data:image/{img_format.lower()};base64,{img_b64}",
},
},
],
}
],
max_tokens=1024,
)
description = response.choices[0].message.content
confidence = 0.9
except Exception as e:
description = f"[Vision analysis failed: {e}]"
confidence = 0.5
return ObservationResult(
modality=InputModality.IMAGE,
normalized_text=description,
raw_data=image_data,
metadata={"width": img_width, "height": img_height, "format": img_format},
extraction_confidence=confidence,
)
async def _observe_pdf(self, pdf_data: bytes) -> ObservationResult:
"""PDF 输入:提取关键段落和信息"""
try:
import PyPDF2
import io
pdf_file = io.BytesIO(pdf_data)
reader = PyPDF2.PdfReader(pdf_file)
num_pages = len(reader.pages)
extracted_text = []
for i, page in enumerate(reader.pages[:min(5, num_pages)]):
text = page.extract_text()
if text.strip():
extracted_text.append(f"[Page {i+1}]\n{text}")
full_text = "\n".join(extracted_text)
if len(full_text) > 2000:
self._ensure_openai_client()
summary_resp = self.openai_client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": f"Summarize:\n\n{full_text[:2000]}"}],
max_tokens=300,
)
normalized = summary_resp.choices[0].message.content
confidence = 0.75
else:
normalized = full_text
confidence = 0.9
except Exception as e:
normalized = f"[PDF extraction failed: {e}]"
confidence = 0.0
return ObservationResult(
modality=InputModality.PDF,
normalized_text=normalized,
raw_data=pdf_data,
metadata={"pages": num_pages},
extraction_confidence=confidence,
)
async def _observe_audio(self, audio_data: bytes) -> ObservationResult:
"""音频输入:转文字(语音识别)"""
try:
import io
self._ensure_openai_client()
transcript = self.openai_client.audio.transcriptions.create(
model="whisper-1",
file=("audio.wav", io.BytesIO(audio_data), "audio/wav"),
)
normalized_text = transcript.text.strip()
confidence = 0.85
except Exception as e:
normalized_text = f"[Audio transcription failed: {e}]"
confidence = 0.0
return ObservationResult(
modality=InputModality.AUDIO,
normalized_text=normalized_text,
raw_data=audio_data,
metadata={"transcription_model": "whisper-1"},
extraction_confidence=confidence,
)
async def _observe_video(self, video_data: bytes) -> ObservationResult:
"""视频输入:提取关键帧并生成描述"""
try:
import cv2
import numpy as np
import io
nparr = np.frombuffer(video_data, np.uint8)
cap = cv2.VideoCapture(io.BytesIO(video_data))
frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
fps = cap.get(cv2.CAP_PROP_FPS)
duration_sec = frame_count / fps if fps > 0 else 0
frame_descriptions = []
frame_indices = []
frame_step = max(1, frame_count // 10)
for frame_idx in range(0, frame_count, frame_step):
cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
ret, frame = cap.read()
if not ret:
continue
ret, buf = cv2.imencode(".jpg", frame)
frame_b64 = base64.b64encode(buf).decode("utf-8")
self._ensure_openai_client()
response = self.openai_client.chat.completions.create(
model=self.vision_model,
messages=[
{
"role": "user",
"content": [
{"type": "text", "text": "Describe what's happening in this frame."},
{
"type": "image_url",
"image_url": {"url": f"data:image/jpeg;base64,{frame_b64}"},
},
],
}
],
max_tokens=100,
)
desc = response.choices[0].message.content
frame_descriptions.append(desc)
frame_indices.append(int(frame_idx / fps))
cap.release()
summary = "Video timeline:\n"
for ts, desc in zip(frame_indices, frame_descriptions):
summary += f"[{ts}s] {desc}\n"
confidence = 0.7
except Exception as e:
summary = f"[Video analysis failed: {e}]"
confidence = 0.0
return ObservationResult(
modality=InputModality.VIDEO,
normalized_text=summary,
raw_data=video_data,
metadata={"duration_sec": duration_sec, "frame_count": frame_count},
extraction_confidence=confidence,
)
def _infer_modality(self, input_data: any) -> InputModality:
"""根据输入数据推断模态类型"""
if isinstance(input_data, str):
if input_data.startswith(("http://", "https://", "/")):
if input_data.endswith(".pdf"):
return InputModality.PDF
elif input_data.endswith((".mp3", ".wav", ".m4a")):
return InputModality.AUDIO
elif input_data.endswith((".mp4", ".mov", ".avi")):
return InputModality.VIDEO
elif input_data.endswith((".jpg", ".png", ".gif")):
return InputModality.IMAGE
return InputModality.TEXT
if isinstance(input_data, bytes):
if input_data.startswith(b"\x89PNG"):
return InputModality.IMAGE
elif input_data.startswith(b"\xff\xd8\xff"):
return InputModality.IMAGE
elif input_data.startswith(b"%PDF"):
return InputModality.PDF
elif input_data.startswith(b"ID3") or input_data.startswith(b"\xff\xfb"):
return InputModality.AUDIO
return InputModality.AUDIO
return InputModality.TEXT
def _ensure_openai_client(self):
"""延迟初始化 OpenAI 客户端"""
if self.openai_client is None:
from openai import AsyncOpenAI
self.openai_client = AsyncOpenAI()
这个设计的优势:
- 模态无关:Agent 只需处理文本,多模态转换在 OBSERVE 层完成
- 质量追踪:每个 ObservationResult 都有 extraction_confidence,帮助 Agent 决定是否需要人工审核
- 可降级:当某种模态的识别失败时,可以回退到原始数据或要求用户提供文本版本
- 可扩展:新增模态只需新增一个 observe* 方法
11. 增强版死循环检测
原有的死循环检测基于"输出重复"。但在复杂任务中,Agent 可能在做有意义的工作,但效率很低——例如,逐个检查潜在的解决方案,每次都获得一点新信息,但总体没有取得进展。这需要基于效率指标的智能检测机制。
11.1 效率指标与 progress_score
关键观察:如果 Agent 在真正取得进展,那么每一轮应该产生新信息增益。反之,如果连续 N 轮都没有新信息,那就是死循环。
定义 progress_score 的几种方式:
- 信息熵增益:每轮消息的平均信息量变化
- 工具调用多样性:最近 N 次工具调用的不同工具比例
- 工具参数变化:工具参数与之前调用的差异程度
- 答案置信度:LLM 在答案中表达的置信程度变化
from dataclasses import dataclass
import hashlib
@dataclass
class ProgressMetrics:
"""单轮的进度指标"""
round_num: int
tool_calls: list[str]
tool_args_hash: str
output_entropy: float
answer_confidence: float
is_retry: bool
class ProgressTracker:
"""基于效率指标的智能进度追踪器"""
def __init__(
self,
window_size: int = 5,
progress_threshold: float = 0.3,
loop_trigger_rounds: int = 3,
):
self.window_size = window_size
self.threshold = progress_threshold
self.trigger_rounds = loop_trigger_rounds
self.history: list[ProgressMetrics] = []
self.seen_tool_hashes: dict[str, int] = {}
def evaluate_round(
self,
messages: list[dict],
round_num: int,
) -> tuple[float, dict]:
"""评估一轮的进度分数"""
last_assistant_msg = None
last_tool_calls = []
for msg in reversed(messages):
if msg["role"] == "assistant" and last_assistant_msg is None:
last_assistant_msg = msg
if msg.get("tool_calls"):
last_tool_calls = msg["tool_calls"]
break
if last_assistant_msg is None:
return 0.5, {}
tool_diversity_score = self._compute_tool_diversity(last_tool_calls, messages)
parameter_novelty_score = self._compute_parameter_novelty(last_tool_calls)
output_entropy = self._compute_entropy(last_assistant_msg.get("content", ""))
confidence = self._estimate_confidence(last_assistant_msg.get("content", ""))
is_retry = self._is_retry(last_tool_calls)
progress_score = (
tool_diversity_score * 0.3 +
parameter_novelty_score * 0.25 +
output_entropy * 0.2 +
confidence * 0.15 +
(0.0 if is_retry else 0.1)
)
metrics = ProgressMetrics(
round_num=round_num,
tool_calls=[tc["function"]["name"] for tc in last_tool_calls],
tool_args_hash=self._hash_tool_calls(last_tool_calls),
output_entropy=output_entropy,
answer_confidence=confidence,
is_retry=is_retry,
)
self.history.append(metrics)
return progress_score, {
"tool_diversity": tool_diversity_score,
"parameter_novelty": parameter_novelty_score,
"entropy": output_entropy,
"confidence": confidence,
"is_retry": is_retry,
"combined_score": progress_score,
}
def detect_loop(self) -> tuple[bool, str]:
"""检测是否陷入死循环"""
if len(self.history) < self.trigger_rounds:
return False, ""
recent_scores = [
self._compute_round_score(m) for m in self.history[-self.trigger_rounds :]
]
avg_score = sum(recent_scores) / len(recent_scores)
if avg_score < self.threshold:
return True, (
f"Low progress detected: average progress_score = {avg_score:.2f} "
f"(threshold = {self.threshold}). Agent appears stuck in inefficient loop."
)
if len(self.history) >= self.window_size:
recent_hashes = [m.tool_args_hash for m in self.history[-self.window_size :]]
if self._is_pattern_loop(recent_hashes):
return True, f"Repetitive tool calling pattern detected. Agent stuck in loop."
return False, ""
def _compute_tool_diversity(self, tool_calls: list[dict], all_messages: list[dict]) -> float:
"""计算工具多样性分数"""
if not tool_calls:
return 0.3
current_tools = set(tc["function"]["name"] for tc in tool_calls)
recent_tools = set()
tool_count = 0
for msg in reversed(all_messages):
if msg["role"] == "assistant" and msg.get("tool_calls"):
for tc in msg["tool_calls"]:
recent_tools.add(tc["function"]["name"])
tool_count += 1
if tool_count >= 3:
break
new_tools = current_tools - recent_tools
diversity_ratio = len(new_tools) / max(len(current_tools), 1)
return min(1.0, 0.3 + diversity_ratio * 0.7)
def _compute_parameter_novelty(self, tool_calls: list[dict]) -> float:
"""计算参数新颖性"""
if not tool_calls:
return 0.5
current_hash = self._hash_tool_calls(tool_calls)
if current_hash in self.seen_tool_hashes:
repeat_count = self.seen_tool_hashes[current_hash]
novelty = max(0.0, 1.0 - repeat_count * 0.25)
else:
novelty = 1.0
self.seen_tool_hashes[current_hash] = self.seen_tool_hashes.get(current_hash, 0) + 1
return novelty
def _compute_entropy(self, text: str) -> float:
"""计算文本的信息熵"""
if not text:
return 0.0
from collections import Counter
import math
char_counts = Counter(text.lower())
total = sum(char_counts.values())
entropy = 0.0
for count in char_counts.values():
p = count / total
entropy -= p * math.log2(p) if p > 0 else 0
normalized_entropy = min(1.0, entropy / 5.5)
return normalized_entropy
def _estimate_confidence(self, text: str) -> float:
"""启发式估计答案置信度"""
high_confidence_words = [
"definitely", "certainly", "absolutely", "confirmed", "verified",
"确定", "肯定", "已验证", "已确认",
]
low_confidence_words = [
"maybe", "possibly", "unclear", "uncertain",
"可能", "不确定", "不清楚", "似乎",
]
text_lower = text.lower()
high_count = sum(1 for w in high_confidence_words if w in text_lower)
low_count = sum(1 for w in low_confidence_words if w in text_lower)
if high_count + low_count == 0:
confidence = 0.5
else:
confidence = (high_count - low_count) / (high_count + low_count + 1)
confidence = (confidence + 1) / 2
return max(0.0, min(1.0, confidence))
def _is_retry(self, tool_calls: list[dict]) -> bool:
"""检测是否在重试"""
if len(self.history) < 1 or not tool_calls:
return False
last_hash = self._hash_tool_calls(tool_calls)
prev_hash = self.history[-1].tool_args_hash if self.history else None
return last_hash == prev_hash
def _hash_tool_calls(self, tool_calls: list[dict]) -> str:
"""为工具调用序列生成哈希"""
import json
call_str = json.dumps(
[{"name": tc["function"]["name"], "args": tc["function"]["arguments"]}
for tc in tool_calls],
sort_keys=True,
)
return hashlib.md5(call_str.encode()).hexdigest()
def _compute_round_score(self, metrics: ProgressMetrics) -> float:
"""从 ProgressMetrics 计算综合得分"""
return (
self._compute_tool_diversity_from_history() * 0.3 +
metrics.output_entropy * 0.2 +
metrics.answer_confidence * 0.15 +
(0.0 if metrics.is_retry else 0.1)
)
def _is_pattern_loop(self, hashes: list[str]) -> bool:
"""检测哈希序列中是否存在循环模式"""
if len(hashes) < 4:
return False
for i in range(len(hashes) - 3):
if (hashes[i] == hashes[i + 2] and hashes[i + 1] == hashes[i + 3]):
return True
return False
def _compute_tool_diversity_from_history(self) -> float:
"""基于历史的工具多样性分数"""
if not self.history:
return 0.5
tools = set()
for m in self.history[-5:]:
tools.update(m.tool_calls)
return min(1.0, len(tools) / 5.0)
def get_diagnostics(self) -> dict:
"""返回诊断信息"""
if not self.history:
return {}
recent = self.history[-self.trigger_rounds :]
scores = [self._compute_round_score(m) for m in recent]
return {
"recent_scores": scores,
"average_score": sum(scores) / len(scores) if scores else 0,
"threshold": self.threshold,
"history_length": len(self.history),
}
这个增强版死循环检测的优势:多维度评估、阈值自适应、可诊断、避免误判。
12. 分布式 Stateful Agent 的考量
到目前为止,我们假设 Agent 运行在单进程中,所有状态存储在内存。但在生产环境中,Agent 经常需要跨多个 Worker、多个地理位置、多个时间段运行——这时候分布式状态一致性成为关键问题。
12.1 单进程假设的局限
在分布式环境中引入的问题:
- 竞态条件:多个 Worker 同时修改同一个 Agent 状态
- 不可见的依赖:Worker A 和 Worker B 可能看到不同版本的状态
- 崩溃恢复:某个 Worker 崩溃,其他 Worker 能否接管?
- 强一致性 vs 最终一致性:如何权衡?
12.2 三种分布式状态管理方案
方案 A:分布式锁(Pessimistic Locking)
核心思路:任何时候只有一个 Worker 可以修改某个 Agent 的状态
import redis
from contextlib import contextmanager
class DistributedStatefulAgent:
"""使用分布式锁的 Stateful Agent"""
def __init__(self, agent_id: str, redis_client: redis.Redis):
self.agent_id = agent_id
self.redis = redis_client
self.lock_key = f"agent:{agent_id}:lock"
self.state_key = f"agent:{agent_id}:state"
@contextmanager
def acquire_lock(self, timeout_seconds: int = 30, blocking: bool = True):
"""获取分布式锁"""
acquired = self.redis.set(
self.lock_key,
"1",
ex=timeout_seconds,
nx=True,
)
if not acquired and not blocking:
raise RuntimeError(f"Cannot acquire lock for agent {self.agent_id}")
while not acquired:
import time
time.sleep(0.1)
acquired = self.redis.set(
self.lock_key,
"1",
ex=timeout_seconds,
nx=True,
)
try:
yield
finally:
self.redis.delete(self.lock_key)
def save_state(self, state: dict) -> None:
"""保存状态到 Redis"""
import json
self.redis.set(self.state_key, json.dumps(state))
def load_state(self) -> dict:
"""从 Redis 加载状态"""
import json
data = self.redis.get(self.state_key)
return json.loads(data) if data else {}
async def run_step(self, user_input: str) -> str:
"""单步执行:获取锁 -> 加载状态 -> 执行 -> 保存状态 -> 释放锁"""
with self.acquire_lock(timeout_seconds=60):
state = self.load_state()
new_state, output = await self._execute_round(state, user_input)
self.save_state(new_state)
return output
优点: 简单直观,不会有并发冲突。缺点: 性能受锁竞争影响,不适合长时间持有锁。
方案 B:乐观并发控制(Optimistic Locking with Versioning)
核心思路:不用锁,但为状态加版本号。如果版本冲突,则拒绝修改
import json
from dataclasses import asdict, dataclass
@dataclass
class VersionedState:
"""带版本号的状态"""
data: dict
version: int
last_modified_by: str
timestamp: float
class OptimisticStatefulAgent:
"""使用版本号的乐观并发控制"""
def __init__(self, agent_id: str, redis_client: redis.Redis, worker_id: str):
self.agent_id = agent_id
self.redis = redis_client
self.worker_id = worker_id
self.state_key = f"agent:{agent_id}:state"
def load_state(self) -> VersionedState:
"""加载带版本号的状态"""
import time
data = self.redis.get(self.state_key)
if not data:
return VersionedState(data={}, version=0, last_modified_by=self.worker_id, timestamp=time.time())
return VersionedState(**json.loads(data))
def save_state_or_conflict(self, new_state: VersionedState) -> bool:
"""尝试保存状态,若版本冲突则返回 False"""
lua_script = """
local current = redis.call('GET', KEYS[1])
if not current then
redis.call('SET', KEYS[1], ARGV[1])
return 1
end
local current_version = cjson.decode(current).version
local new_version = cjson.decode(ARGV[1]).version
if current_version + 1 == new_version then
redis.call('SET', KEYS[1], ARGV[1])
return 1
else
return 0
end
"""
result = self.redis.eval(
lua_script,
1,
self.state_key,
json.dumps(asdict(new_state), default=str),
)
return bool(result)
async def run_step(self, user_input: str) -> str:
"""乐观并发执行"""
max_retries = 3
for attempt in range(max_retries):
state = self.load_state()
new_state, output = await self._execute_round(state, user_input)
new_state.version = state.version + 1
new_state.last_modified_by = self.worker_id
if self.save_state_or_conflict(new_state):
return output
else:
import asyncio
await asyncio.sleep(0.1 * (2 ** attempt))
raise RuntimeError(f"Failed to save state after {max_retries} retries.")
优点: 无锁,并发性能高。缺点: 需要处理冲突重试,冲突率可能较高。
方案 C:事件溯源(Event Sourcing)
核心思路:不直接保存状态,而是保存一序列的不可变事件
from dataclasses import asdict, dataclass
from enum import Enum
class EventType(Enum):
ROUND_STARTED = "round_started"
LLM_CALLED = "llm_called"
TOOL_EXECUTED = "tool_executed"
ROUND_COMPLETED = "round_completed"
@dataclass
class Event:
"""不可变事件"""
event_type: EventType
timestamp: float
worker_id: str
data: dict
class EventSourcedAgent:
"""基于事件溯源的分布式 Agent"""
def __init__(self, agent_id: str, redis_client: redis.Redis, worker_id: str):
self.agent_id = agent_id
self.redis = redis_client
self.worker_id = worker_id
self.events_key = f"agent:{agent_id}:events"
def append_event(self, event: Event) -> None:
"""添加事件(追加写入,无法修改)"""
import json
import time
event.timestamp = time.time()
self.redis.rpush(self.events_key, json.dumps(asdict(event), default=str))
def replay_events(self) -> dict:
"""从头重放所有事件,重构当前状态"""
import json
state = {"turns": 0, "messages": [], "tool_results": [], "status": "running"}
events_data = self.redis.lrange(self.events_key, 0, -1)
for event_json in events_data:
event = Event(**json.loads(event_json))
match event.event_type:
case EventType.ROUND_STARTED:
state["turns"] += 1
case EventType.LLM_CALLED:
state["messages"].append(event.data["response"])
case EventType.TOOL_EXECUTED:
state["tool_results"].append({
"tool": event.data["tool_name"],
"result": event.data["result"],
})
case EventType.ROUND_COMPLETED:
state["status"] = "idle"
return state
async def run_step(self, user_input: str) -> str:
"""事件溯源执行"""
import time
self.append_event(Event(
event_type=EventType.ROUND_STARTED,
timestamp=time.time(),
worker_id=self.worker_id,
data={"user_input": user_input},
))
# 执行控制循环...
self.append_event(Event(
event_type=EventType.ROUND_COMPLETED,
timestamp=time.time(),
worker_id=self.worker_id,
data={},
))
final_state = self.replay_events()
return final_state.get("final_answer", "")
优点: 完整审计日志,天然支持时间旅行调试,易于故障恢复。缺点: 重放开销大,需要更多存储空间。
12.3 三种方案的对比与选型
| 方案 | 并发性 | 一致性 | 恢复能力 | 实现复杂度 | 适用场景 |
|---|---|---|---|---|---|
| 分布式锁 | 低 | 强 | 中 | 低 | 少量并发、需要强一致性 |
| 乐观并发 | 高 | 最终 | 高 | 中 | 多 Worker、冲突低的业务 |
| 事件溯源 | 高 | 强 | 高 | 高 | 审计要求高、需要完整历史 |
选型建议:
- 小规模、单域部署:分布式锁即可
- 多 Worker 并发、对延迟敏感:乐观并发
- 金融、医疗等合规审计严格的场景:事件溯源
13. 小结与进一步思考
本文从状态机模型出发,完整地拆解了 Agent Control Loop 的核心抽象:
- OBSERVE 负责输入归一化——将各种来源的信息统一为 LLM 可理解的 message 格式
- THINK 是核心推理阶段——管理 Context Window、控制 Token 预算、解析 LLM 输出
- ACT 是执行层——处理工具调用的同步/异步执行、超时控制、安全隔离
- REFLECT 负责质量评估——决定是继续、重试还是终止
- 终止条件是成本和安全的兜底——max_turns、token_budget、error_threshold、loop_detection
我们对比了 ReAct 和 Plan-then-Execute 两种主流模式,分析了 Stateless 与 Stateful 两种状态管理策略,并实现了一个不依赖任何框架的完整 Control Loop。
但控制循环只是 Agent 运行时的骨架。它的灵魂在于 Tool Calling——正是工具让 Agent 从"能说会道的语言模型"变成"能做事的智能体"。
在下一篇 《Tool Calling Deep Dive: 让 LLM 成为可编程接口》 中,我们会深入工具调用的设计哲学:JSON Schema 作为契约、Tool Registry 的实现、参数校验、错误传播,以及 Structured Output 为什么优于自由文本。
留几个值得进一步思考的问题:
- Control Loop 的嵌套:当一个 Agent 的工具是另一个 Agent 时,控制循环如何嵌套?外层循环和内层循环的终止条件如何协调?
- 人机协作中的循环:如何在 Control Loop 中优雅地插入人类审批节点?这和 Stateful Agent 的 checkpoint 机制有什么关系?
- 流式输出与控制循环:当 Agent 需要边思考边输出(streaming)时,状态机模型还适用吗?需要做哪些调整?
- 多模态输入的归一化:当 OBSERVE 阶段接收的不只是文本,还有图片、音频、视频时,输入归一化策略如何演化?
系列导航:本文是 Agentic 系列的第 04 篇。