记忆架构:Agent的状态与记忆体系
LLM 是一个纯函数:给定相同的 prompt,产生相同的输出。它没有"昨天",没有"上次",没有"你之前说过"。
但一个合格的 Agent 必须记得:用户的偏好、上一步的结果、三天前那个失败的任务、以及从知识库中检索到的关键事实。
记忆,是 Agent 从"单轮工具"变成"持续助手"的分水岭。本文是 Agentic 系列第 08 篇,将系统拆解 Agent 记忆的四层架构,从认知科学类比到工程实现,给出完整的设计方案。
1. 为什么 Agent 需要记忆
LLM 的本质是一个 stateless function:response = llm(prompt)。每次调用都是一个全新的开始,模型不知道上一次调用发生了什么。
这在单轮问答场景下没有问题。但当我们把 LLM 嵌入 Agent 系统后,无状态就成了致命缺陷:
- 多轮对话:用户说"把上面那个改成蓝色"——"上面那个"在哪里?
- 长任务执行:Agent 执行到第 5 步,需要回顾第 2 步的输出来做决策
- 跨会话连续性:用户昨天让 Agent 分析了一份报告,今天问"上次那份报告的结论是什么?"
- 个性化服务:Agent 需要记住用户偏好("我喜欢简洁的回答"、"输出用 Markdown 表格")
没有记忆的 Agent,每次对话都是一个"失忆症患者"——它可能很聪明,但永远无法建立连续的工作关系。
核心命题:如何为一个 stateless 的 LLM 构建一套 stateful 的记忆体系,使 Agent 在有限的 Context Window 内,获得"无限"的记忆能力?
2. 从认知科学看 Agent 记忆
在设计 Agent 记忆架构之前,先看看人类大脑是怎么处理记忆的。认知心理学中 Atkinson-Shiffrin 模型把人类记忆分为多个层级,这个分层对 Agent 设计有极强的指导意义。
这个类比的价值在于:
- 分层处理:不是所有信息都需要"记住",大部分感觉输入会被丢弃
- 容量约束:工作记忆(Context Window)的容量是硬性限制,必须在这个限制内做信息的取舍
- 编码与检索:信息从短期记忆进入长期记忆需要"编码"(写入),使用时需要"检索"(读取)
- 遗忘是特性:遗忘不是 bug,而是一种必要的信息过滤机制
基于这个认知框架,我们设计 Agent 的四层记忆架构。
3. Agent 记忆的四层架构
Layer 1: Conversation Buffer — 对话历史
本质:保存完整的 message history,让 LLM 能"看到"之前的对话。
这是最直觉的记忆形式:把所有 user 和 assistant 消息按顺序存起来,每次调用 LLM 时全量传入。
class ConversationBuffer:
"""最简单的对话记忆:完整保存消息历史"""
def __init__(self, max_tokens: int = 8000):
self.messages: list[dict] = []
self.max_tokens = max_tokens
def add(self, role: str, content: str):
self.messages.append({"role": role, "content": content})
self._enforce_limit()
def get_messages(self) -> list[dict]:
return list(self.messages)
def _enforce_limit(self):
"""当超出 token 限制时,从最旧的消息开始裁剪"""
while self._estimate_tokens() > self.max_tokens and len(self.messages) > 2:
# 保留第一条(通常包含重要上下文)和最后一条
self.messages.pop(1)
def _estimate_tokens(self) -> int:
# 粗略估算:1 token ≈ 4 chars (英文) 或 1.5 chars (中文)
return sum(len(m["content"]) // 3 for m in self.messages)
问题与解决方案:
| 问题 | 影响 | 解决方案 |
|---|---|---|
| Context Window 有限 | 消息多了装不下 | 滑动窗口:只保留最近 N 条 |
| 旧消息价值不均 | 早期关键信息被丢弃 | 消息摘要:用 LLM 压缩旧消息 |
| Token 成本线性增长 | 每轮调用的 token 越来越多 | 选择性保留:只保留有工具调用或关键决策的消息 |
滑动窗口 + 摘要是最常见的策略:
class SummarizingBuffer:
"""带摘要能力的对话缓冲区"""
def __init__(self, llm_client, window_size: int = 20, max_tokens: int = 8000):
self.llm_client = llm_client
self.window_size = window_size
self.max_tokens = max_tokens
self.messages: list[dict] = []
self.summary: str = "" # 旧消息的压缩摘要
def add(self, role: str, content: str):
self.messages.append({"role": role, "content": content})
if len(self.messages) > self.window_size:
self._compress()
def get_messages(self) -> list[dict]:
result = []
if self.summary:
result.append({
"role": "system",
"content": f"[Previous conversation summary]\n{self.summary}"
})
result.extend(self.messages)
return result
def _compress(self):
"""将窗口外的消息压缩为摘要"""
# 取出要压缩的消息(保留最近 window_size 条)
to_compress = self.messages[:-self.window_size]
self.messages = self.messages[-self.window_size:]
# 用 LLM 生成摘要
old_context = "\n".join(
f"{m['role']}: {m['content']}" for m in to_compress
)
prompt = (
f"Summarize this conversation history concisely, "
f"preserving key decisions, facts, and user preferences:\n\n"
f"Previous summary: {self.summary}\n\n"
f"New messages:\n{old_context}"
)
self.summary = self.llm_client.complete(prompt)
关键决策点:摘要的质量直接决定 Agent 的"记忆保真度"。摘要太短会丢失关键信息,太长又失去压缩的意义。实践中,摘要长度控制在原文的 20%-30% 是比较好的平衡点。
Layer 2: Working Memory — 任务执行状态
本质:当前任务的"草稿纸",记录正在进行的工作的结构化状态。
Conversation Buffer 保存的是"说了什么",Working Memory 保存的是"正在做什么"。两者的核心区别:
Working Memory 的价值在长任务中尤为明显。当 Agent 执行一个需要 10+ 步的任务时,把所有中间结果都塞在对话历史里是低效的。Working Memory 提供了一个结构化的"任务视图"。
from dataclasses import dataclass, field
from typing import Any
from enum import Enum
class StepStatus(Enum):
PENDING = "pending"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
FAILED = "failed"
@dataclass
class TaskStep:
description: str
status: StepStatus = StepStatus.PENDING
result: Any = None
error: str | None = None
@dataclass
class WorkingMemory:
"""当前任务的执行状态"""
goal: str = ""
plan: list[TaskStep] = field(default_factory=list)
scratchpad: dict[str, Any] = field(default_factory=dict)
iteration: int = 0
max_iterations: int = 20
def set_goal(self, goal: str):
self.goal = goal
self.plan = []
self.scratchpad = {}
self.iteration = 0
def add_step(self, description: str) -> int:
self.plan.append(TaskStep(description=description))
return len(self.plan) - 1
def complete_step(self, index: int, result: Any):
self.plan[index].status = StepStatus.COMPLETED
self.plan[index].result = result
def fail_step(self, index: int, error: str):
self.plan[index].status = StepStatus.FAILED
self.plan[index].error = error
def note(self, key: str, value: Any):
"""在 scratchpad 上记录中间发现"""
self.scratchpad[key] = value
def to_context_string(self) -> str:
"""序列化为可注入 prompt 的文本"""
lines = [f"## Current Task State"]
lines.append(f"**Goal**: {self.goal}")
lines.append(f"**Progress**: Step {self.iteration}/{self.max_iterations}")
lines.append("")
lines.append("### Plan:")
for i, step in enumerate(self.plan):
status_icon = {
StepStatus.PENDING: "[ ]",
StepStatus.IN_PROGRESS: "[>]",
StepStatus.COMPLETED: "[x]",
StepStatus.FAILED: "[!]",
}[step.status]
lines.append(f" {status_icon} {i+1}. {step.description}")
if step.result:
lines.append(f" Result: {str(step.result)[:200]}")
if step.error:
lines.append(f" Error: {step.error}")
if self.scratchpad:
lines.append("")
lines.append("### Scratchpad:")
for k, v in self.scratchpad.items():
lines.append(f" - {k}: {str(v)[:300]}")
return "\n".join(lines)
Working Memory 什么时候更新?
- Plan 阶段:Agent 制定计划后,写入
plan - 每步执行后:更新 step 状态和结果
- 发现新信息时:写入
scratchpad(例如发现数据有异常值) - 任务完成/失败时:清空或归档到 Episodic Memory
Layer 3: Episodic Memory — 历史经验
本质:过去交互的结构化记录,用于跨会话的经验积累。
如果说 Working Memory 是"今天的笔记",Episodic Memory 就是"过去的日记"。它回答的问题是:
- "上次用户让我处理类似的任务,我是怎么做的?"
- "用户偏好什么样的输出格式?"
- "上次这个工具调用失败了,原因是什么?"
import time
import json
import hashlib
from dataclasses import dataclass, asdict
@dataclass
class Episode:
"""一次交互的结构化记录"""
episode_id: str
timestamp: float
task_description: str
approach: str # Agent 采用的方法
outcome: str # 成功/失败/部分成功
key_decisions: list[str] # 关键决策点
user_feedback: str | None # 用户反馈(如果有)
tools_used: list[str] # 使用了哪些工具
lessons: list[str] # 经验教训
importance: float # 重要性评分 0-1
embedding: list[float] | None = None # 向量表示
def to_context_string(self) -> str:
return (
f"[Past experience - {self.task_description}]\n"
f"Approach: {self.approach}\n"
f"Outcome: {self.outcome}\n"
f"Lessons: {'; '.join(self.lessons)}"
)
class EpisodicMemory:
"""基于向量检索的情景记忆"""
def __init__(self, embedding_fn, vector_store):
self.embedding_fn = embedding_fn # text -> vector
self.vector_store = vector_store # 向量数据库客户端
self.decay_factor = 0.95 # 时间衰减因子
def store(self, episode: Episode):
"""写入一条记忆"""
# 生成向量表示
text_for_embedding = (
f"{episode.task_description} {episode.approach} "
f"{' '.join(episode.lessons)}"
)
episode.embedding = self.embedding_fn(text_for_embedding)
# 写入向量数据库
self.vector_store.upsert(
id=episode.episode_id,
vector=episode.embedding,
metadata=asdict(episode)
)
def recall(self, query: str, top_k: int = 5) -> list[Episode]:
"""根据当前任务检索相关记忆"""
query_embedding = self.embedding_fn(query)
# 向量相似度检索
results = self.vector_store.query(
vector=query_embedding,
top_k=top_k * 2 # 多检索一些,后面再过滤
)
# 综合评分:相似度 × 时间衰减 × 重要性
scored_episodes = []
now = time.time()
for result in results:
episode = Episode(**result.metadata)
age_days = (now - episode.timestamp) / 86400
# 综合评分公式
time_decay = self.decay_factor ** age_days
final_score = (
result.similarity * 0.5 + # 语义相似度
time_decay * 0.3 + # 时间新鲜度
episode.importance * 0.2 # 重要性
)
scored_episodes.append((episode, final_score))
# 按综合分排序,取 top_k
scored_episodes.sort(key=lambda x: x[1], reverse=True)
return [ep for ep, _ in scored_episodes[:top_k]]
Episodic Memory 的检索策略:
关键决策:什么时候写入 Episodic Memory?
不是每轮对话都值得记住。过度记忆会导致检索噪声。实践中推荐以下策略:
| 触发条件 | 写入内容 | 重要性 |
|---|---|---|
| 任务成功完成 | 完整的任务描述、方法、结果 | 0.7-0.9 |
| 任务失败 | 失败原因、错误信息、教训 | 0.8-1.0 |
| 用户显式反馈 | 用户的表扬/批评/修正 | 0.9-1.0 |
| 发现新的用户偏好 | 偏好描述 | 0.8 |
| 使用了新的工具/方法 | 工具使用经验 | 0.5-0.7 |
Layer 4: Semantic Memory — 知识库
本质:相对稳定的事实性知识,通常通过 RAG (Retrieval-Augmented Generation) 接入。
Semantic Memory 与 Episodic Memory 的区别:
| 维度 | Episodic Memory | Semantic Memory |
|---|---|---|
| 存储内容 | Agent 的经验(做过什么) | 外部知识(世界是什么样) |
| 更新频率 | 每次任务后可能更新 | 相对稳定,定期更新 |
| 来源 | Agent 自身的交互历史 | 文档、数据库、API |
| 检索触发 | 遇到类似任务时 | 需要事实性知识时 |
在本篇中,我们只关注 Agent 如何"使用"Semantic Memory。知识如何构建、如何切分、如何检索——这些 RAG 工程问题留给下一篇文章。
class SemanticMemory:
"""知识库接口(RAG 的消费侧)"""
def __init__(self, retriever):
self.retriever = retriever # RAG 检索器
def query(self, question: str, top_k: int = 3) -> list[dict]:
"""检索相关知识片段"""
results = self.retriever.search(question, top_k=top_k)
return [
{
"content": r.text,
"source": r.metadata.get("source", "unknown"),
"relevance": r.score,
}
for r in results
]
def format_for_context(self, results: list[dict]) -> str:
"""格式化为可注入 prompt 的文本"""
if not results:
return ""
lines = ["## Relevant Knowledge:"]
for i, r in enumerate(results, 1):
lines.append(f"\n### [{i}] (source: {r['source']})")
lines.append(r["content"])
return "\n".join(lines)
4. 记忆的读写操作
记忆系统的核心操作可以概括为四个:Write、Read、Update、Forget。每个操作都有其触发时机和策略选择。
Write:写入记忆
class MemoryWriter:
"""决定什么信息、在什么时候写入哪层记忆"""
def __init__(self, working_memory, episodic_memory, llm_client):
self.working = working_memory
self.episodic = episodic_memory
self.llm = llm_client
def on_step_complete(self, step_index: int, result: Any):
"""每步执行完成后的写入"""
# 更新 Working Memory
self.working.complete_step(step_index, result)
# 重要发现写入 scratchpad
if self._is_notable(result):
key = f"step_{step_index}_finding"
self.working.note(key, self._extract_key_info(result))
def on_task_complete(self, task_description: str, success: bool):
"""任务完成后的写入"""
# 用 LLM 提取经验教训
reflection_prompt = (
f"Task: {task_description}\n"
f"Working Memory:\n{self.working.to_context_string()}\n\n"
f"Extract key lessons learned from this task. "
f"Output as JSON with keys: approach, lessons, importance (0-1)"
)
reflection = self.llm.complete(reflection_prompt, json_mode=True)
parsed = json.loads(reflection)
# 写入 Episodic Memory
episode = Episode(
episode_id=hashlib.md5(
f"{task_description}{time.time()}".encode()
).hexdigest(),
timestamp=time.time(),
task_description=task_description,
approach=parsed.get("approach", ""),
outcome="success" if success else "failure",
key_decisions=[],
user_feedback=None,
tools_used=[],
lessons=parsed.get("lessons", []),
importance=parsed.get("importance", 0.5),
)
self.episodic.store(episode)
def on_user_feedback(self, feedback: str, task_description: str):
"""用户反馈时的写入——高优先级"""
episode = Episode(
episode_id=hashlib.md5(
f"feedback_{time.time()}".encode()
).hexdigest(),
timestamp=time.time(),
task_description=task_description,
approach="",
outcome="user_feedback",
key_decisions=[],
user_feedback=feedback,
tools_used=[],
lessons=[f"User feedback: {feedback}"],
importance=0.9, # 用户反馈总是高重要性
)
self.episodic.store(episode)
def _is_notable(self, result: Any) -> bool:
"""判断结果是否值得特别记录"""
# 简单启发式:结果较长或包含数字时可能重要
text = str(result)
return len(text) > 200 or any(c.isdigit() for c in text)
def _extract_key_info(self, result: Any) -> str:
"""提取关键信息(可以用 LLM,也可以用规则)"""
text = str(result)
if len(text) <= 300:
return text
return text[:300] + "..."
Read:读取记忆
读取操作发生在每次 LLM 调用之前——我们需要从各层记忆中组装 context。
class MemoryReader:
"""从各层记忆中组装 LLM 调用的上下文"""
def __init__(
self,
conversation_buffer,
working_memory,
episodic_memory,
semantic_memory,
):
self.conversation = conversation_buffer
self.working = working_memory
self.episodic = episodic_memory
self.semantic = semantic_memory
def assemble_context(
self,
user_query: str,
system_prompt: str,
token_budget: int = 16000,
) -> list[dict]:
"""组装完整的消息列表"""
messages = []
# 1. System Prompt(固定分配)
messages.append({"role": "system", "content": system_prompt})
# 2. 检索 Episodic Memory(相关历史经验)
relevant_episodes = self.episodic.recall(user_query, top_k=3)
if relevant_episodes:
episode_text = "\n\n".join(
ep.to_context_string() for ep in relevant_episodes
)
messages.append({
"role": "system",
"content": f"## Relevant Past Experience:\n{episode_text}"
})
# 3. 检索 Semantic Memory(相关知识)
knowledge_results = self.semantic.query(user_query, top_k=3)
if knowledge_results:
knowledge_text = self.semantic.format_for_context(knowledge_results)
messages.append({
"role": "system",
"content": knowledge_text
})
# 4. Working Memory(当前任务状态)
if self.working.goal:
messages.append({
"role": "system",
"content": self.working.to_context_string()
})
# 5. Conversation History(对话历史)
messages.extend(self.conversation.get_messages())
# 6. Token 预算检查与裁剪
messages = self._fit_to_budget(messages, token_budget)
return messages
def _fit_to_budget(
self, messages: list[dict], budget: int
) -> list[dict]:
"""确保总 token 数不超过预算"""
total = sum(len(m["content"]) // 3 for m in messages)
if total <= budget:
return messages
# 裁剪策略:优先裁减对话历史中间部分
# 保留: system prompts + 最早2条 + 最近5条
system_msgs = [m for m in messages if m["role"] == "system"]
non_system = [m for m in messages if m["role"] != "system"]
if len(non_system) > 7:
kept = non_system[:2] + non_system[-5:]
messages = system_msgs + kept
return messages
Update:记忆更新
记忆更新有三种模式,适用于不同场景:
class MemoryUpdateStrategy:
"""记忆更新策略"""
@staticmethod
def overwrite(store: dict, key: str, value: Any):
"""覆盖:新值完全替换旧值
适用于:用户偏好(用户说"我改主意了,用英文回复")
"""
store[key] = value
@staticmethod
def append(store: dict, key: str, value: Any):
"""追加:保留历史,添加新记录
适用于:任务历史(每次任务都是新记录)
"""
if key not in store:
store[key] = []
store[key].append(value)
@staticmethod
def merge(store: dict, key: str, value: dict, llm_client=None):
"""合并:智能融合旧信息和新信息
适用于:用户画像(逐步积累,可能有矛盾需要解决)
"""
if key not in store:
store[key] = value
return
old = store[key]
if llm_client:
# 用 LLM 智能合并
prompt = (
f"Merge these two user profiles, resolving conflicts "
f"by preferring newer information:\n"
f"Old: {json.dumps(old)}\nNew: {json.dumps(value)}"
)
merged = json.loads(llm_client.complete(prompt, json_mode=True))
store[key] = merged
else:
# 简单合并:新值覆盖旧值中的同名字段
if isinstance(old, dict) and isinstance(value, dict):
store[key] = {**old, **value}
Forget:记忆遗忘
遗忘是记忆系统的必要组成部分。没有遗忘,记忆库会无限膨胀,检索质量会持续下降。
class MemoryForgetting:
"""记忆遗忘策略"""
def __init__(self, episodic_memory, decay_rate: float = 0.01):
self.episodic = episodic_memory
self.decay_rate = decay_rate
def time_based_decay(self, max_age_days: int = 90):
"""基于时间的遗忘:超过 N 天且重要性低的记忆被清除"""
cutoff = time.time() - (max_age_days * 86400)
all_episodes = self.episodic.vector_store.list_all()
for episode_data in all_episodes:
if (
episode_data["timestamp"] < cutoff
and episode_data["importance"] < 0.7
):
self.episodic.vector_store.delete(episode_data["episode_id"])
def capacity_based_eviction(self, max_episodes: int = 1000):
"""基于容量的驱逐:保留最重要的 N 条记忆"""
all_episodes = self.episodic.vector_store.list_all()
if len(all_episodes) <= max_episodes:
return
# 按综合分排序(重要性 × 时间衰减)
now = time.time()
scored = []
for ep in all_episodes:
age_days = (now - ep["timestamp"]) / 86400
score = ep["importance"] * (0.95 ** age_days)
scored.append((ep["episode_id"], score))
scored.sort(key=lambda x: x[1])
# 删除分数最低的
to_remove = len(all_episodes) - max_episodes
for episode_id, _ in scored[:to_remove]:
self.episodic.vector_store.delete(episode_id)
def explicit_forget(self, episode_id: str):
"""主动遗忘:用户要求或隐私合规"""
self.episodic.vector_store.delete(episode_id)
5. 记忆存储方案对比
不同的记忆层适合不同的存储后端。选择存储方案时需要考虑:数据结构、访问模式、持久化需求和查询能力。
实践建议:
- 原型阶段:全部用内存(dict + list),快速验证
- 单用户产品:SQLite + ChromaDB(本地向量库),零运维
- 多用户产品:PostgreSQL(结构化数据 + pgvector 扩展)+ Redis(会话缓存)
- 大规模系统:PostgreSQL + 专用向量数据库(Pinecone/Qdrant)+ Redis Cluster
6. 完整实现:MemoryManager
将四层记忆整合到一个统一的管理器中,在 Agent Loop 中使用。
import time
import json
import hashlib
from dataclasses import dataclass, field, asdict
from typing import Any, Protocol
class LLMClient(Protocol):
"""LLM 客户端接口"""
def complete(self, prompt: str, json_mode: bool = False) -> str: ...
class VectorStore(Protocol):
"""向量存储接口"""
def upsert(self, id: str, vector: list[float], metadata: dict): ...
def query(self, vector: list[float], top_k: int) -> list: ...
def delete(self, id: str): ...
def list_all(self) -> list[dict]: ...
class Retriever(Protocol):
"""RAG 检索器接口"""
def search(self, query: str, top_k: int) -> list: ...
class MemoryManager:
"""
统一记忆管理器,整合四层记忆架构。
职责:
1. 管理四层记忆的生命周期
2. 在 Agent Loop 中提供 read/write 接口
3. 处理 Context Window 的 token 预算分配
"""
def __init__(
self,
llm_client: LLMClient,
embedding_fn,
vector_store: VectorStore,
retriever: Retriever,
config: dict | None = None,
):
self.llm = llm_client
self.config = config or {}
# Layer 1: Conversation Buffer
self.conversation = SummarizingBuffer(
llm_client=llm_client,
window_size=self.config.get("conversation_window", 20),
max_tokens=self.config.get("conversation_max_tokens", 8000),
)
# Layer 2: Working Memory
self.working = WorkingMemory(
max_iterations=self.config.get("max_iterations", 20)
)
# Layer 3: Episodic Memory
self.episodic = EpisodicMemory(
embedding_fn=embedding_fn,
vector_store=vector_store,
)
# Layer 4: Semantic Memory
self.semantic = SemanticMemory(retriever=retriever)
# Token budget config
self.total_budget = self.config.get("total_token_budget", 16000)
self.budget_allocation = self.config.get("budget_allocation", {
"system_prompt": 0.20, # 20% for system prompt
"memory": 0.30, # 30% for episodic + semantic memory
"history": 0.30, # 30% for conversation history
"reserve": 0.20, # 20% for tool schemas + response
})
# ── Read: 组装 LLM 上下文 ──────────────────────────────────
def build_context(
self, user_query: str, system_prompt: str
) -> list[dict]:
"""在每次 LLM 调用前,组装完整的消息列表"""
messages = []
budget = self.total_budget
# 1. System Prompt
sp_budget = int(budget * self.budget_allocation["system_prompt"])
system_content = self._truncate(system_prompt, sp_budget)
messages.append({"role": "system", "content": system_content})
# 2. Memory injection (Episodic + Semantic + Working)
mem_budget = int(budget * self.budget_allocation["memory"])
memory_parts = []
# 2a. Working Memory
if self.working.goal:
memory_parts.append(self.working.to_context_string())
# 2b. Episodic Memory
episodes = self.episodic.recall(user_query, top_k=3)
if episodes:
ep_text = "\n\n".join(ep.to_context_string() for ep in episodes)
memory_parts.append(f"## Past Experience:\n{ep_text}")
# 2c. Semantic Memory
knowledge = self.semantic.query(user_query, top_k=3)
if knowledge:
memory_parts.append(self.semantic.format_for_context(knowledge))
if memory_parts:
combined = "\n\n---\n\n".join(memory_parts)
combined = self._truncate(combined, mem_budget)
messages.append({"role": "system", "content": combined})
# 3. Conversation History
hist_budget = int(budget * self.budget_allocation["history"])
history = self.conversation.get_messages()
history = self._truncate_messages(history, hist_budget)
messages.extend(history)
return messages
# ── Write: 记忆写入 ─────────────────────────────────────
def on_user_message(self, content: str):
"""用户消息到来时"""
self.conversation.add("user", content)
def on_assistant_message(self, content: str):
"""Agent 回复时"""
self.conversation.add("assistant", content)
def on_tool_result(self, tool_name: str, result: str):
"""工具返回结果时"""
self.conversation.add(
"tool", f"[{tool_name}] {result}"
)
def on_step_complete(self, step_index: int, result: Any):
"""单步完成时更新 Working Memory"""
self.working.complete_step(step_index, result)
def on_task_start(self, goal: str, plan: list[str]):
"""任务开始时初始化 Working Memory"""
self.working.set_goal(goal)
for step_desc in plan:
self.working.add_step(step_desc)
def on_task_complete(self, task_description: str, success: bool):
"""任务完成时归档到 Episodic Memory"""
# 用 LLM 从 Working Memory 中提取经验
reflection_prompt = (
f"Reflect on this completed task.\n"
f"Task: {task_description}\n"
f"State:\n{self.working.to_context_string()}\n\n"
f"Extract: approach (string), lessons (list of strings), "
f"importance (float 0-1). Output JSON."
)
try:
raw = self.llm.complete(reflection_prompt, json_mode=True)
parsed = json.loads(raw)
except (json.JSONDecodeError, Exception):
parsed = {
"approach": "unknown",
"lessons": [],
"importance": 0.5,
}
episode = Episode(
episode_id=hashlib.md5(
f"{task_description}{time.time()}".encode()
).hexdigest(),
timestamp=time.time(),
task_description=task_description,
approach=parsed.get("approach", ""),
outcome="success" if success else "failure",
key_decisions=[],
user_feedback=None,
tools_used=[],
lessons=parsed.get("lessons", []),
importance=parsed.get("importance", 0.5),
)
self.episodic.store(episode)
# 清空 Working Memory
self.working = WorkingMemory(
max_iterations=self.config.get("max_iterations", 20)
)
# ── Forget: 定期维护 ────────────────────────────────────
def maintenance(self, max_age_days: int = 90, max_episodes: int = 1000):
"""定期执行的记忆维护"""
forgetting = MemoryForgetting(self.episodic)
forgetting.time_based_decay(max_age_days)
forgetting.capacity_based_eviction(max_episodes)
# ── 辅助方法 ─────────────────────────────────────────
def _truncate(self, text: str, max_tokens: int) -> str:
max_chars = max_tokens * 3 # 粗略估算
if len(text) <= max_chars:
return text
return text[:max_chars] + "\n...[truncated]"
def _truncate_messages(
self, messages: list[dict], max_tokens: int
) -> list[dict]:
total = sum(len(m["content"]) // 3 for m in messages)
if total <= max_tokens:
return messages
# 保留最早 1 条 + 最近 N 条
if len(messages) > 6:
return messages[:1] + messages[-5:]
return messages[-5:]
在 Agent Loop 中集成
def agent_loop(
user_input: str,
memory: MemoryManager,
llm_client: LLMClient,
tools: dict,
system_prompt: str,
max_steps: int = 10,
):
"""带记忆管理的 Agent 主循环"""
# 记录用户输入
memory.on_user_message(user_input)
for step in range(max_steps):
# ── Read: 从记忆中组装上下文 ──
messages = memory.build_context(user_input, system_prompt)
# ── Think: 调用 LLM ──
response = llm_client.chat(messages, tools=tools)
# ── 判断是否需要调用工具 ──
if response.tool_calls:
for tool_call in response.tool_calls:
tool_name = tool_call.function.name
tool_args = json.loads(tool_call.function.arguments)
# ── Act: 执行工具 ──
result = tools[tool_name](**tool_args)
# ── Write: 记录工具结果 ──
memory.on_tool_result(tool_name, str(result))
memory.on_step_complete(step, result)
else:
# 没有工具调用,Agent 给出了最终回答
final_answer = response.content
memory.on_assistant_message(final_answer)
# 任务完成,归档到 Episodic Memory
memory.on_task_complete(user_input, success=True)
return final_answer
# 超过最大步数
memory.on_task_complete(user_input, success=False)
return "Task exceeded maximum steps."
7. Context Window 管理策略
Context Window 是 Agent 记忆系统中最关键的瓶颈。所有层级的记忆最终都要"挤进"这个有限的空间。
Token 预算分配
这个比例不是固定的。关键在于动态调整:
class TokenBudgetAllocator:
"""根据任务特征动态分配 token 预算"""
def __init__(self, total_budget: int = 16000):
self.total = total_budget
def allocate(
self,
task_complexity: str = "medium",
has_knowledge_need: bool = False,
conversation_length: int = 0,
) -> dict[str, int]:
"""
根据任务特征动态调整各部分预算。
- 简单任务:减少 memory,增加 history(对话上下文更重要)
- 复杂任务:增加 memory,减少 history(需要更多参考信息)
- 知识密集:增加 semantic memory 的比重
"""
if task_complexity == "simple":
allocation = {
"system_prompt": 0.15,
"memory": 0.15,
"history": 0.45,
"reserve": 0.25,
}
elif task_complexity == "complex":
allocation = {
"system_prompt": 0.15,
"memory": 0.40,
"history": 0.25,
"reserve": 0.20,
}
else: # medium
allocation = {
"system_prompt": 0.20,
"memory": 0.30,
"history": 0.30,
"reserve": 0.20,
}
# 如果需要知识检索,从 history 匀一些给 memory
if has_knowledge_need:
allocation["memory"] += 0.10
allocation["history"] -= 0.10
# 对话很长时,给 history 更多空间
if conversation_length > 30:
allocation["history"] += 0.05
allocation["reserve"] -= 0.05
return {
k: int(v * self.total) for k, v in allocation.items()
}
消息压缩策略
当对话历史超出预算时,需要压缩。两种主要方案:
方案 A:LLM 摘要压缩
def llm_summarize(messages: list[dict], llm_client) -> str:
"""用 LLM 压缩对话历史"""
conversation_text = "\n".join(
f"{m['role']}: {m['content'][:500]}" for m in messages
)
prompt = (
"Summarize this conversation, preserving:\n"
"1. Key decisions and their rationale\n"
"2. Important facts and data points\n"
"3. User preferences and corrections\n"
"4. Current task status\n\n"
"Be concise but complete. Do not lose critical information.\n\n"
f"Conversation:\n{conversation_text}"
)
return llm_client.complete(prompt)
方案 B:规则压缩(零成本)
def rule_based_compress(messages: list[dict]) -> list[dict]:
"""基于规则的消息压缩,不需要额外 LLM 调用"""
compressed = []
for msg in messages:
content = msg["content"]
# 规则 1: 截断超长的工具输出
if msg["role"] == "tool" and len(content) > 500:
content = content[:500] + "\n...[output truncated]"
# 规则 2: 移除纯确认消息("好的"、"明白了")
if msg["role"] == "assistant" and len(content) < 20:
continue
# 规则 3: 移除重复的错误消息
if "error" in content.lower() and any(
content == m["content"] for m in compressed
):
continue
compressed.append({"role": msg["role"], "content": content})
return compressed
方案对比:
| 维度 | LLM 摘要 | 规则压缩 |
|---|---|---|
| 压缩质量 | 高,能理解语义 | 中,可能丢失隐含信息 |
| 额外成本 | 需要一次 LLM 调用 | 零 |
| 延迟 | 增加 1-3 秒 | 毫秒级 |
| 适用场景 | 长对话、复杂任务 | 短对话、实时场景 |
实践建议:先用规则压缩兜底(保证不超 budget),当压缩比 > 50% 时再触发 LLM 摘要。两种方案可以组合使用:先规则裁剪,再 LLM 摘要。
8. Trade-off 分析
记忆系统的设计充满权衡。没有银弹,只有适合你场景的平衡点。
Trade-off 1: 记忆丰富度 vs 成本
- 记忆越多 → 上下文越丰富 → 回答质量越高 → 但 Token 成本线性增长,延迟线性增长
- 记忆太少 → Agent "健忘" → 重复劳动、答非所问 → 用户体验差
实践经验:对于大多数 Agent,将 memory injection 控制在 Context Window 的 25-35% 是比较好的区间。超过 40% 时,边际收益急剧下降,但成本继续线性增长。
Trade-off 2: 检索精度 vs 检索召回
不同记忆层的最佳 top_k:
- Episodic Memory:
top_k=3(过去经验不需要太多,2-3 条最相关的就够) - Semantic Memory:
top_k=5(知识检索需要更全面,特别是当问题模糊时)
Trade-off 3: 实时性 vs 一致性
写入记忆的时机也有权衡:
| 写入时机 | 优点 | 缺点 |
|---|---|---|
| 同步写入(每步结束立即写) | 记忆总是最新的 | 增加每步延迟 |
| 异步写入(后台批量写) | 不影响主循环延迟 | 可能丢失最近的记忆 |
| 任务结束后写入 | 只写入"完整"的经验 | 任务中途中断会丢失 |
建议:Working Memory 同步更新(它在 Agent Loop 的关键路径上),Episodic Memory 任务结束后异步写入(不在关键路径上)。
Trade-off 4: 通用记忆 vs 专用记忆
| 设计方向 | 场景 | 优点 | 缺点 |
|---|---|---|---|
| 通用记忆系统 | 平台型 Agent | 一套代码支撑多场景 | 每个场景都不够深入 |
| 专用记忆系统 | 垂直领域 Agent | 为特定任务深度优化 | 迁移成本高 |
建议:先用通用方案(本文的四层架构),在验证了产品方向后,对核心场景做专用优化。
9. 记忆分层决策树
在实践中,最常见的问题是:"这条信息应该存到哪一层记忆?"没有清晰的决策框架,工程师往往会在不同层之间反复修改,导致记忆架构混乱。本节提供一个系统的决策树,帮助快速定位信息的归属。
决策框架
具体示例
示例 1:用户输入 "把上次的数据改成蓝色"
Step 1: 是否为当前对话流的直接产物? → 是
└─> Conversation Buffer (L1)
存储:{
"role": "user",
"content": "把上次的数据改成蓝色",
"timestamp": "2024-03-20T10:15:00Z"
}
同时,Agent 需要在 Working Memory 中记录:
{
"task": "修改数据可视化颜色",
"reference": "上次生成的图表",
"action": "pending"
}
示例 2:执行过程中发现"数据有 15% 的缺失值"
Step 1: 是否为当前任务执行状态信息? → 是
└─> Working Memory (L2)
存储到 Scratchpad:{
"data_quality": {
"missing_percentage": 0.15,
"affected_columns": ["sales", "region"],
"handling_strategy": "forward_fill"
}
}
同时注入到当前 prompt 中,让 Agent 在后续决策时考虑这一发现
示例 3:任务完成,提取"我需要先清理缺失值再做聚合"这一教训
Step 1: 是否为当前任务执行状态信息? → 否
Step 2: 是否需要跨会话保留? → 是
Step 3: 是否可以泛化为结构化知识? → 是
└─> Semantic Memory (L4) 知识库
存储:{
"rule_id": "data_prep_order",
"title": "数据处理的执行顺序",
"content": "清理缺失值必须在聚合操作之前,否则会导致统计不准",
"applicable_scenarios": ["数据分析", "ETL流程"],
"confidence": 0.95
}
示例 4:用户说 "我发现你上次的方法不对,应该用另一种方式"
Step 1: 是否为当前任务执行状态信息? → 否
Step 2: 是否需要跨会话保留? → 是
Step 3: 是否可以泛化为结构化知识? → 否
└─> Episodic Memory (L3)
存储:{
"episode_id": "ep_2024_03_20_001",
"task_description": "处理销售数据分析",
"previous_approach": "使用线性回归预测",
"corrected_approach": "应使用时间序列模型(ARIMA)",
"user_feedback": "线性回归忽略了季节性",
"lesson": "销售数据具有明显季节性,不能用简单线性模型",
"importance": 0.85
}
示例 5:Agent 自动生成的中间结果(如 API 调用返回值)
Step 1: 是否为当前对话流的直接产物? → 是
Step 2: 是当前任务执行状态信息? → 是
└─> Working Memory (L2) + Conversation Buffer (L1)
在 L2 中作为"已执行步骤的输出":
{
"step_id": 2,
"action": "fetch_data",
"result": {
"rows": 50000,
"columns": 12,
"date_range": "2024-01 to 2024-12"
}
}
在 L1 中作为"对话历史"的一部分:
{
"role": "tool",
"content": "Successfully fetched data: 50000 rows, 12 columns"
}
决策树实现
from enum import Enum
from typing import Dict, Any
class MemoryLayer(Enum):
CONVERSATION_BUFFER = 1
WORKING_MEMORY = 2
EPISODIC_MEMORY = 3
SEMANTIC_MEMORY = 4
DISCARD = 0
class MemoryLayerDecisionTree:
"""根据信息特征自动判断应该存储到哪一层"""
@staticmethod
def decide(info: Dict[str, Any]) -> MemoryLayer:
"""
Args:
info: 包含以下关键字段的字典
- content: 信息内容
- is_direct_message: 是否为直接输入/输出
- is_task_state: 是否为任务状态
- is_cross_session: 是否需要跨会话保留
- is_generalizable: 是否可泛化为知识
- importance: 重要性评分 (0-1)
- is_user_feedback: 是否为用户反馈
Returns:
应该存储的记忆层级
"""
# 第一道:是否为当前对话流的直接产物?
if info.get("is_direct_message"):
return MemoryLayer.CONVERSATION_BUFFER
# 第二道:是否为当前任务执行状态?
if info.get("is_task_state"):
return MemoryLayer.WORKING_MEMORY
# 第三道:是否需要跨会话保留?
if not info.get("is_cross_session"):
return MemoryLayer.DISCARD
# 第四道:是否可泛化为知识?
if info.get("is_generalizable"):
# 知识库需要满足:
# 1. 相对稳定(不频繁变化)
# 2. 适用于多个场景
# 3. 重要性较高
if (info.get("stability_score", 0.5) > 0.7 and
info.get("applicability_score", 0.5) > 0.6 and
info.get("importance", 0.5) > 0.6):
return MemoryLayer.SEMANTIC_MEMORY
# 默认:episodic 记忆
return MemoryLayer.EPISODIC_MEMORY
@staticmethod
def get_storage_strategy(layer: MemoryLayer) -> Dict[str, Any]:
"""获取每一层的存储策略"""
strategies = {
MemoryLayer.CONVERSATION_BUFFER: {
"storage": "in-memory queue / Redis",
"max_entries": 100,
"retention_policy": "sliding window (last N messages)",
"access_pattern": "sequential",
"ttl": "single session"
},
MemoryLayer.WORKING_MEMORY: {
"storage": "in-memory dictionary / task cache",
"max_entries": 1, # 每个任务一份
"retention_policy": "task lifecycle",
"access_pattern": "random access by key",
"ttl": "task duration"
},
MemoryLayer.EPISODIC_MEMORY: {
"storage": "vector database (e.g., Pinecone, Weaviate)",
"max_entries": "unlimited",
"retention_policy": "time decay + importance score",
"access_pattern": "semantic similarity search",
"ttl": "days to months"
},
MemoryLayer.SEMANTIC_MEMORY: {
"storage": "structured database / knowledge graph",
"max_entries": "unlimited",
"retention_policy": "version control",
"access_pattern": "exact match / graph traversal",
"ttl": "permanent"
},
}
return strategies.get(layer, {})
10. 系统化遗忘机制
简单的时间衰减已经不够。真实的 Agent 系统需要更精细的遗忘策略:为什么有些记忆应该被遗忘得快一些,有些应该被永久保留?本节介绍三种高级遗忘策略及其组合方案。
三种遗忘策略
策略 1: 重要性评分遗忘
信息的重要性不是固定的。"用户偏好用表格格式展示数据"这条信息很重要(高权重);而"2024-03-15 调用了 API,返回 200 状态码"就没那么重要(低权重)。
import time
import math
from dataclasses import dataclass
from typing import Optional
@dataclass
class MemoryEntry:
"""记忆条目"""
id: str
content: str
timestamp: float # 写入时间
importance_score: float # 初始重要性 (0-1)
access_count: int = 0 # 被检索次数
last_access_time: Optional[float] = None
tags: list[str] = None # 标签(如 user_preference, task_result)
def set_importance(self, score: float):
"""手动设置重要性"""
assert 0 <= score <= 1
self.importance_score = score
def increment_access(self):
"""记录访问"""
self.access_count += 1
self.last_access_time = time.time()
class ImportanceBasedForgetfulness:
"""基于重要性的遗忘管理器"""
def __init__(self, base_forget_rate: float = 0.05):
"""
Args:
base_forget_rate: 基础遗忘率(每天遗忘的比例)
"""
self.base_forget_rate = base_forget_rate
def compute_forget_score(self, entry: MemoryEntry, current_time: float) -> float:
"""
计算遗忘评分:值越高,越应该被遗忘
基础公式:
forget_score = base_rate × time_factor × (1 - importance_adjustment)
where:
- time_factor: 时间越久,遗忘倾向越强
- importance_adjustment: 重要性越高,遗忘倾向越弱
"""
# 计算时间衰减因子
age_days = (current_time - entry.timestamp) / 86400
time_factor = min(1.0, age_days / 30) # 30 天后达到最大衰减
# 重要性调整:重要性越高,遗忘评分越低
# importance_score = 0.9 → importance_adjustment = 0.1(难以遗忘)
# importance_score = 0.1 → importance_adjustment = 0.9(容易遗忘)
importance_adjustment = 1 - entry.importance_score
forget_score = self.base_forget_rate * time_factor * importance_adjustment
return forget_score
def should_forget(self, entry: MemoryEntry, current_time: float, threshold: float = 0.5) -> bool:
"""判断是否应该遗忘这条记忆"""
return self.compute_forget_score(entry, current_time) > threshold
class QueryFrequencyBasedForgetfulness:
"""基于查询频率的热度衰减"""
def __init__(self, lookback_days: int = 30, hot_threshold: int = 5):
"""
Args:
lookback_days: 计算热度的时间窗口
hot_threshold: 判定为"热"的最少访问次数
"""
self.lookback_days = lookback_days
self.hot_threshold = hot_threshold
def compute_hotness(self, entry: MemoryEntry, current_time: float) -> float:
"""
计算记忆的"热度":最近被访问的频率
热度范围 0-1:
- 1.0: 非常热(最近经常被检索)
- 0.5: 中等
- 0.0: 冷(很久没被检索过)
"""
if entry.last_access_time is None:
# 从未被访问
age_days = (current_time - entry.timestamp) / 86400
return max(0, 1 - age_days / 30) # 新记忆默认较热
# 最后访问距今的天数
days_since_last_access = (
(current_time - entry.last_access_time) / 86400
)
# 查询频率(每天访问次数)
days_of_existence = (current_time - entry.timestamp) / 86400
if days_of_existence == 0:
days_of_existence = 1
access_frequency = entry.access_count / days_of_existence
# 综合评分:近期被访问 + 频繁被访问
recency_score = max(0, 1 - days_since_last_access / self.lookback_days)
frequency_score = min(1.0, access_frequency / self.hot_threshold)
hotness = recency_score * 0.6 + frequency_score * 0.4
return hotness
def should_downgrade(self, entry: MemoryEntry, current_time: float) -> bool:
"""
判断是否应该将记忆从高优先级降级到低优先级
降级意味着:
- 从 Episodic Memory 转移到冷存储(如归档库)
- 检索时优先级降低
- 自动清理列表中的排序靠后
"""
hotness = self.compute_hotness(entry, current_time)
return hotness < 0.2 # 热度过低时降级
策略 2: 混合遗忘(时间 × 重要性 × 热度)
上面的两个策略可以结合起来,形成一个更强大的遗忘模型:
class HybridForgettingManager:
"""综合时间、重要性、热度的遗忘管理器"""
def __init__(
self,
base_forget_rate: float = 0.05,
weight_time: float = 0.4,
weight_importance: float = 0.35,
weight_hotness: float = 0.25,
):
self.base_forget_rate = base_forget_rate
self.weight_time = weight_time
self.weight_importance = weight_importance
self.weight_hotness = weight_hotness
self.importance_mgr = ImportanceBasedForgetfulness(base_forget_rate)
self.hotness_mgr = QueryFrequencyBasedForgetfulness()
def compute_comprehensive_forget_score(
self, entry: MemoryEntry, current_time: float
) -> float:
"""
综合评分:越高越应该被遗忘
final_score =
weight_time × time_score +
weight_importance × (1 - importance_score) +
weight_hotness × (1 - hotness_score)
"""
# 时间评分:越久越容易遗忘
age_days = (current_time - entry.timestamp) / 86400
time_score = min(1.0, age_days / 90) # 90 天后达到最大
# 重要性评分:直接使用 entry 中存储的值
importance_penalty = 1 - entry.importance_score
# 热度评分:冷的记忆更容易遗忘
hotness = self.hotness_mgr.compute_hotness(entry, current_time)
hotness_penalty = 1 - hotness
# 加权综合
final_score = (
self.weight_time * time_score +
self.weight_importance * importance_penalty +
self.weight_hotness * hotness_penalty
)
return final_score
def should_forget(
self, entry: MemoryEntry, current_time: float, threshold: float = 0.6
) -> bool:
"""判断是否应该遗忘"""
score = self.compute_comprehensive_forget_score(entry, current_time)
return score > threshold
def get_forget_probability(
self, entry: MemoryEntry, current_time: float
) -> float:
"""
获取遗忘概率(0-1)
可用于:
- 决定是否定期执行遗忘检查
- 生成遗忘日志
- 监控记忆库的健康度
"""
score = self.compute_comprehensive_forget_score(entry, current_time)
# 使用 sigmoid 函数将评分转换为概率
return 1 / (1 + math.exp(-5 * (score - 0.5)))
class ForgettingManager:
"""
记忆遗忘的主管理类
责任:
1. 定期扫描记忆库,识别应该遗忘的条目
2. 执行遗忘操作(删除或降级)
3. 记录遗忘日志(用于调试和分析)
4. 支持撤销遗忘(soft delete)
"""
def __init__(self, memory_store, config: Dict[str, Any] = None):
self.memory_store = memory_store
self.config = config or self._default_config()
self.hybrid_mgr = HybridForgettingManager(
base_forget_rate=self.config["base_forget_rate"],
weight_time=self.config["weight_time"],
weight_importance=self.config["weight_importance"],
weight_hotness=self.config["weight_hotness"],
)
self.forget_log = []
@staticmethod
def _default_config() -> Dict[str, Any]:
return {
"base_forget_rate": 0.05,
"weight_time": 0.4,
"weight_importance": 0.35,
"weight_hotness": 0.25,
"forget_threshold": 0.6,
"batch_size": 100,
"check_interval_seconds": 3600, # 每小时检查一次
}
def scan_and_forget(self, current_time: Optional[float] = None) -> Dict[str, Any]:
"""
扫描整个记忆库,执行遗忘操作
Returns:
{
"deleted": int, # 删除的条目数
"downgraded": int, # 降级的条目数
"preserved": int, # 保留的条目数
"log": [...] # 详细日志
}
"""
if current_time is None:
current_time = time.time()
deleted_count = 0
downgraded_count = 0
preserved_count = 0
# 分批扫描
all_entries = self.memory_store.list_all()
batch_size = self.config["batch_size"]
for i in range(0, len(all_entries), batch_size):
batch = all_entries[i : i + batch_size]
for entry in batch:
should_delete = self.hybrid_mgr.should_forget(
entry, current_time, threshold=self.config["forget_threshold"]
)
if should_delete:
self.memory_store.soft_delete(entry.id)
deleted_count += 1
self.forget_log.append({
"timestamp": current_time,
"action": "delete",
"entry_id": entry.id,
"reason": "forget_threshold_exceeded",
"score": self.hybrid_mgr.compute_comprehensive_forget_score(
entry, current_time
),
})
else:
preserved_count += 1
return {
"deleted": deleted_count,
"downgraded": downgraded_count,
"preserved": preserved_count,
"total_scanned": len(all_entries),
"log_sample": self.forget_log[-10:],
}
def restore_forgotten(self, entry_id: str) -> bool:
"""
撤销遗忘操作(如果支持软删除)
通常在以下场景使用:
- 用户明确要求恢复某条记忆
- 系统发现遗忘错误
"""
return self.memory_store.restore(entry_id)
def batch_set_importance(self, tag: str, importance: float):
"""
批量设置某类记忆的重要性
Args:
tag: 记忆标签(如 "user_preference")
importance: 新的重要性评分 (0-1)
Example:
mgr.batch_set_importance("user_preference", 0.95)
# 所有标记为用户偏好的记忆都变成"难以遗忘"
"""
entries = self.memory_store.query_by_tag(tag)
for entry in entries:
entry.set_importance(importance)
self.memory_store.update(entry)
11. Embedding 成本优化
向量化(Embedding)是检索的关键,但 Token 成本不容忽视。对于一个中等规模的 Episodic Memory 库(10000 条记忆),定期重新 Embedding 所有条目会产生巨大成本。本节介绍三种成本优化策略。
问题分析
假设你有一个 Episodic Memory 库,包含 10,000 条过去的交互记录。每条记录平均 200 tokens。
全量 Embedding 的成本:
10,000 条 × 200 tokens = 2,000,000 tokens
假设 Embedding API 的价格是 $0.02 per 1M tokens(如 OpenAI text-embedding-3-small):
成本 = 2,000,000 × $0.02 / 1,000,000 = $0.04
看起来不贵。但问题是:
1. 如果每天新增 100 条记忆,每周需要重新 Embedding?
2. 如果记忆库增长到 100,000 条?
3. 如果你运行多个 Agent 实例?
实际成本考虑:
周场景:
- 每周全量 Embedding: 10,000 条 × 200 tokens × 52 周 = 104M tokens/年
- 年成本: ~$2,080/年(单个库)
如果有 10 个 Agent 实例,每个都维护自己的记忆库:
- 年成本: $20,800/年
策略 1: 增量 vs 全量
最直接的优化:只 Embedding 新增或修改的记忆,而不是全量重新计算。
from datetime import datetime, timedelta
from typing import List, Optional
class IncrementalEmbeddingStrategy:
"""增量 Embedding 策略"""
def __init__(self, embedding_client, memory_store):
self.embedding_client = embedding_client
self.memory_store = memory_store
self.last_embedding_time = None
def get_records_needing_embedding(
self,
since: Optional[datetime] = None,
include_modified: bool = True
) -> List[MemoryEntry]:
"""
获取需要 Embedding 的记录
Args:
since: 只返回此时间点之后的记录
include_modified: 是否包含修改过的记录
Returns:
待 Embedding 的记录列表
"""
if since is None:
since = self.last_embedding_time or (
datetime.now() - timedelta(days=7)
)
criteria = {
"created_after": since,
}
if include_modified:
criteria["modified_after"] = since
return self.memory_store.query(criteria)
def embed_incremental(self) -> Dict[str, Any]:
"""
执行增量 Embedding
Returns:
{
"newly_embedded": int,
"cost": float,
"tokens_used": int,
"time_elapsed": float
}
"""
records = self.get_records_needing_embedding()
if not records:
return {"newly_embedded": 0, "cost": 0, "tokens_used": 0}
# 批量获取 Embedding
texts = [r.content for r in records]
embeddings, tokens_used = self.embedding_client.embed_batch(texts)
# 写回记忆库
for record, embedding in zip(records, embeddings):
record.embedding = embedding
record.embedding_timestamp = datetime.now()
self.memory_store.update(record)
self.last_embedding_time = datetime.now()
cost = self._estimate_cost(tokens_used)
return {
"newly_embedded": len(records),
"tokens_used": tokens_used,
"cost": cost,
}
@staticmethod
def _estimate_cost(tokens: int, price_per_1m: float = 0.02) -> float:
"""估算成本"""
return tokens / 1_000_000 * price_per_1m
成本对比:
假设每天新增 50 条记忆(200 tokens/条):
- 增量 Embedding (每天): 50 × 200 = 10K tokens/天 = $0.0002/天
- 全量 Embedding (每周): 2M tokens/周 = $0.04/周 = $2.08/年

### 策略 2: 哈希缓存(内容不变则跳过重新 Embedding)
问题:有些记忆的内容可能被修改(如重要性评分改变,但文本内容不变)。此时重新 Embedding 是浪费。
```python
import hashlib
class EmbeddingCacheWithHashVerification:
"""使用内容哈希缓存 Embedding 结果"""
def __init__(self, embedding_client, memory_store):
self.embedding_client = embedding_client
self.memory_store = memory_store
self.content_hash_cache = {} # content_hash -> embedding
@staticmethod
def compute_content_hash(content: str) -> str:
"""计算内容的 SHA256 哈希"""
return hashlib.sha256(content.encode()).hexdigest()
def get_or_embed(self, record: MemoryEntry) -> List[float]:
"""
获取 Embedding,如果内容未变则返回缓存结果
Args:
record: 记忆条目
Returns:
embedding 向量
"""
content_hash = self.compute_content_hash(record.content)
# 情况 1: 记录本身就有 embedding(可能来自之前的计算)
if record.embedding is not None:
# 检查内容是否改变过
if not hasattr(record, "content_hash") or record.content_hash == content_hash:
# 内容未变,直接返回已有的 embedding
return record.embedding
# 情况 2: 哈希缓存中有
if content_hash in self.content_hash_cache:
embedding = self.content_hash_cache[content_hash]
record.embedding = embedding
record.content_hash = content_hash
return embedding
# 情况 3: 需要调用 API 计算新的 embedding
embedding = self.embedding_client.embed(record.content)
self.content_hash_cache[content_hash] = embedding
record.embedding = embedding
record.content_hash = content_hash
return embedding
def batch_get_or_embed(self, records: List[MemoryEntry]) -> Dict[str, List[float]]:
"""
批量获取 Embedding,智能判断是否需要调用 API
Returns:
{record_id: embedding}
"""
to_embed = []
cached_results = {}
for record in records:
content_hash = self.compute_content_hash(record.content)
# 检查缓存
if content_hash in self.content_hash_cache:
cached_results[record.id] = self.content_hash_cache[content_hash]
elif record.embedding is not None and (
not hasattr(record, "content_hash") or record.content_hash == content_hash
):
cached_results[record.id] = record.embedding
else:
to_embed.append((record.id, record.content, content_hash))
# 批量调用 API(只处理需要的)
newly_embedded = {}
if to_embed:
ids, contents, hashes = zip(*to_embed)
embeddings = self.embedding_client.embed_batch(contents)
for record_id, embedding, content_hash in zip(ids, embeddings, hashes):
newly_embedded[record_id] = embedding
self.content_hash_cache[content_hash] = embedding
# 合并结果
return {**cached_results, **newly_embedded}
def clear_cache(self):
"""清空缓存(可选)"""
self.content_hash_cache.clear()
缓存效果示例:
假设 10,000 条记忆中:
- 20% 的记忆内容从未改变(100% 命中缓存)
- 30% 的记忆内容改变过,但现在稳定(70% 命中)
- 50% 的记忆是新的或频繁改变(0% 命中)
实际需要 Embedding 的比例:
= 20% × 0% + 30% × 30% + 50% × 100%
= 0% + 9% + 50%
= 59%
节省成本:41%(与全量 Embedding 相比)
策略 3: 批量 Embedding 的吞吐优化
当需要 Embedding 大量记忆时,批处理和并发可以显著提升吞吐量。
import asyncio
from concurrent.futures import ThreadPoolExecutor, as_completed
class BatchEmbeddingOptimizer:
"""批量 Embedding 的吞吐优化"""
def __init__(
self,
embedding_client,
batch_size: int = 100,
max_workers: int = 4,
):
self.embedding_client = embedding_client
self.batch_size = batch_size
self.max_workers = max_workers
def embed_batch_sync(self, texts: List[str], show_progress: bool = True) -> List[List[float]]:
"""
同步批量 Embedding
Args:
texts: 待 Embedding 的文本列表
show_progress: 是否显示进度
Returns:
Embedding 向量列表
"""
embeddings = []
total_batches = (len(texts) + self.batch_size - 1) // self.batch_size
for i in range(0, len(texts), self.batch_size):
batch = texts[i : i + self.batch_size]
batch_embeddings = self.embedding_client.embed_batch(batch)
embeddings.extend(batch_embeddings)
if show_progress:
current_batch = (i // self.batch_size) + 1
print(f"Progress: {current_batch}/{total_batches} batches")
return embeddings
async def embed_batch_async(self, texts: List[str]) -> List[List[float]]:
"""
异步批量 Embedding(更高效)
Args:
texts: 待 Embedding 的文本列表
Returns:
Embedding 向量列表
"""
loop = asyncio.get_event_loop()
def process_batch(batch):
return self.embedding_client.embed_batch(batch)
embeddings = []
tasks = []
# 将文本分批,创建异步任务
for i in range(0, len(texts), self.batch_size):
batch = texts[i : i + self.batch_size]
task = loop.run_in_executor(None, process_batch, batch)
tasks.append(task)
# 等待所有任务完成
results = await asyncio.gather(*tasks)
for result in results:
embeddings.extend(result)
return embeddings
def estimate_cost_and_time(self, num_texts: int, avg_tokens_per_text: int) -> Dict[str, Any]:
"""
估算成本和时间
Args:
num_texts: 文本数量
avg_tokens_per_text: 平均每条文本的 token 数
Returns:
{
"total_tokens": int,
"estimated_cost": float,
"estimated_time_seconds": float,
"api_calls": int,
"batches": int,
}
"""
total_tokens = num_texts * avg_tokens_per_text
total_batches = (num_texts + self.batch_size - 1) // self.batch_size
# 假设平均响应时间 1 秒/batch,支持 4 个并发
estimated_time = (total_batches / self.max_workers) * 1.0
estimated_cost = total_tokens / 1_000_000 * 0.02 # 以 $0.02/1M 计
return {
"total_texts": num_texts,
"total_tokens": total_tokens,
"estimated_cost": estimated_cost,
"estimated_time_seconds": estimated_time,
"api_calls": total_batches,
"batches": total_batches,
}
class EmbeddingCostOptimizer:
"""综合 Embedding 成本优化管理器"""
def __init__(self, embedding_client, memory_store):
self.embedding_client = embedding_client
self.memory_store = memory_store
self.hash_cache = EmbeddingCacheWithHashVerification(
embedding_client, memory_store
)
self.batch_optimizer = BatchEmbeddingOptimizer(embedding_client)
self.cost_log = []
def optimize_and_embed(self, records: List[MemoryEntry]) -> Dict[str, Any]:
"""
使用所有优化策略进行 Embedding
流程:
1. 使用哈希缓存跳过未变更的内容
2. 使用批量处理提升吞吐
3. 记录成本和统计信息
"""
start_time = time.time()
# 步骤 1: 哈希缓存过滤
result = self.hash_cache.batch_get_or_embed(records)
elapsed_time = time.time() - start_time
# 统计
cache_hits = sum(
1 for r in records if r.id in result and r.embedding is not None
)
newly_embedded = len(records) - cache_hits
# 估算成本(简化估算)
api_calls = (newly_embedded + self.batch_optimizer.batch_size - 1) // self.batch_optimizer.batch_size
avg_tokens = 200 # 假设
estimated_cost = newly_embedded * avg_tokens / 1_000_000 * 0.02
log_entry = {
"timestamp": datetime.now(),
"total_records": len(records),
"cache_hits": cache_hits,
"newly_embedded": newly_embedded,
"cost_saved_by_cache": (cache_hits * avg_tokens / 1_000_000 * 0.02),
"estimated_cost": estimated_cost,
"elapsed_time": elapsed_time,
}
self.cost_log.append(log_entry)
return {
"completed": True,
"embedded_records": result,
"statistics": log_entry,
"cumulative_cost": sum(log["estimated_cost"] for log in self.cost_log),
}
12. 多租户记忆隔离与并发控制
当 Agent 系统支持多用户场景时,记忆隔离和并发控制变成必需品。一个用户的记忆不能泄露给另一个用户,同时多个用户可能并发修改各自的记忆。
隔离方案
方案 1: User ID 命名空间隔离
最直接的方案:所有记忆的 key 都带上 user_id 前缀。
from typing import Optional, Dict, Any
class NamespacedMemoryStore:
"""基于 user_id 命名空间的隔离存储"""
def __init__(self, backend_store):
"""
Args:
backend_store: 底层存储(如 Redis 或数据库)
"""
self.backend = backend_store
@staticmethod
def _make_key(user_id: str, memory_layer: str, entry_id: str) -> str:
"""生成命名空间化的 key"""
return f"user:{user_id}:memory:{memory_layer}:{entry_id}"
def write(
self,
user_id: str,
memory_layer: str,
entry_id: str,
value: Dict[str, Any],
) -> bool:
"""写入记忆"""
key = self._make_key(user_id, memory_layer, entry_id)
return self.backend.set(key, value)
def read(
self,
user_id: str,
memory_layer: str,
entry_id: str,
) -> Optional[Dict[str, Any]]:
"""读取记忆"""
key = self._make_key(user_id, memory_layer, entry_id)
return self.backend.get(key)
def list_user_memories(
self,
user_id: str,
memory_layer: Optional[str] = None,
) -> Dict[str, Any]:
"""列出用户的所有记忆"""
pattern = f"user:{user_id}:memory:{memory_layer or '*'}:*"
keys = self.backend.keys(pattern)
return {key: self.backend.get(key) for key in keys}
def delete(self, user_id: str, memory_layer: str, entry_id: str) -> bool:
"""删除记忆"""
key = self._make_key(user_id, memory_layer, entry_id)
return self.backend.delete(key)
隔离效果验证:
# 用户 A 的数据
store.write("user_a", "episodic", "ep_001", {"content": "..."})
# 用户 B 读取
result = store.read("user_b", "episodic", "ep_001")
# result = None(隔离成功!)
# 列出用户 A 的所有记忆
user_a_memories = store.list_user_memories("user_a")
# 只会返回 user_a 的记忆,不会泄露其他用户的数据
并发控制
方案 2: 乐观锁 + 版本号
当多个 Agent 实例可能同时修改同一条记忆时,需要并发控制。乐观锁是一个轻量级方案。
from dataclasses import dataclass
import threading
@dataclass
class VersionedMemoryEntry:
"""支持版本控制的记忆条目"""
id: str
user_id: str
content: str
version: int = 1 # 版本号,每次更新递增
last_modified_time: float = None
last_modified_by: str = None # 修改者(agent ID)
locked: bool = False
lock_owner: Optional[str] = None
lock_time: Optional[float] = None
class OptimisticLockingMemoryStore:
"""基于乐观锁的并发控制"""
def __init__(self, backend_store):
self.backend = backend_store
self.local_lock = threading.RLock() # 本地锁(保护字典操作)
def read_with_version(
self, user_id: str, entry_id: str
) -> tuple[VersionedMemoryEntry, bool]:
"""
读取记忆并获取版本号
Returns:
(entry, success)
"""
key = f"user:{user_id}:entry:{entry_id}"
entry = self.backend.get(key)
if entry is None:
return None, False
return VersionedMemoryEntry(**entry), True
def write_with_version_check(
self,
user_id: str,
entry_id: str,
new_content: str,
expected_version: int,
agent_id: str,
) -> tuple[bool, str]:
"""
有条件更新:只有版本号匹配才能更新
Args:
user_id: 用户 ID
entry_id: 条目 ID
new_content: 新内容
expected_version: 期望的版本号(乐观锁)
agent_id: 执行修改的 agent
Returns:
(success, message)
"""
key = f"user:{user_id}:entry:{entry_id}"
with self.local_lock:
current = self.backend.get(key)
if current is None:
# 新建条目
self.backend.set(
key,
{
"id": entry_id,
"user_id": user_id,
"content": new_content,
"version": 1,
"last_modified_by": agent_id,
"last_modified_time": time.time(),
},
)
return True, "Created"
current_version = current.get("version", 1)
if current_version != expected_version:
# 版本不匹配,更新失败
return (
False,
f"Version mismatch: expected {expected_version}, "
f"got {current_version}. Concurrent modification detected.",
)
# 版本匹配,执行更新
self.backend.set(
key,
{
**current,
"content": new_content,
"version": current_version + 1,
"last_modified_by": agent_id,
"last_modified_time": time.time(),
},
)
return True, f"Updated to version {current_version + 1}"
def resolve_conflict(
self,
user_id: str,
entry_id: str,
conflict_strategy: str = "latest-write-wins",
) -> tuple[VersionedMemoryEntry, bool]:
"""
当乐观锁失败时,尝试冲突解决
Args:
conflict_strategy: 冲突策略
- "latest-write-wins": 保留最新的版本(默认)
- "merge": 尝试合并两个版本
- "abort": 放弃此次更新
Returns:
(resolved_entry, success)
"""
key = f"user:{user_id}:entry:{entry_id}"
current = self.backend.get(key)
if current is None:
return None, False
# 重新读取最新版本
latest_entry = VersionedMemoryEntry(**current)
if conflict_strategy == "latest-write-wins":
# 直接返回最新版本
return latest_entry, True
elif conflict_strategy == "merge":
# 这里可以实现更复杂的合并逻辑
# 示例:如果内容是 JSON,可以做字段级合并
return latest_entry, True
elif conflict_strategy == "abort":
return None, False
return None, False
租户级配额管理
@dataclass
class TenantQuota:
"""租户级配额"""
user_id: str
max_memory_entries: int # 最多存储条目数
max_embedding_per_day: int # 每天最多 Embedding 次数
max_context_window_tokens: int # 单次推理最多注入 token
storage_limit_gb: float # 存储空间限制
class MultiTenantMemoryStore:
"""支持多租户、隔离、并发控制的记忆存储"""
def __init__(
self,
backend_store,
default_quota: Optional[TenantQuota] = None,
):
self.backend = backend_store
self.namespaced_store = NamespacedMemoryStore(backend_store)
self.locking_store = OptimisticLockingMemoryStore(backend_store)
self.quotas: Dict[str, TenantQuota] = {}
self.default_quota = (
default_quota or self._create_default_quota()
)
@staticmethod
def _create_default_quota() -> TenantQuota:
"""默认配额"""
return TenantQuota(
user_id="default",
max_memory_entries=10000,
max_embedding_per_day=50000,
max_context_window_tokens=32768,
storage_limit_gb=10.0,
)
def register_user(self, user_id: str, quota: Optional[TenantQuota] = None):
"""注册用户及其配额"""
if quota is None:
quota = TenantQuota(
user_id=user_id,
**{
k: v for k, v in self.default_quota.__dict__.items()
if k != "user_id"
},
)
self.quotas[user_id] = quota
def check_quota(self, user_id: str, operation: str) -> tuple[bool, str]:
"""检查用户是否满足配额"""
if user_id not in self.quotas:
return False, f"User {user_id} not registered"
quota = self.quotas[user_id]
if operation == "write_entry":
# 检查条目数量限制
user_memories = self.namespaced_store.list_user_memories(user_id)
if len(user_memories) >= quota.max_memory_entries:
return (
False,
f"Exceeded max entries limit: {quota.max_memory_entries}",
)
elif operation == "embedding":
# 检查每日 Embedding 配额(简化)
# 实际应用中需要记录每日消费
pass
elif operation == "context_injection":
# 检查上下文注入限制
pass
return True, "OK"
def write_memory(
self,
user_id: str,
memory_layer: str,
entry_id: str,
content: str,
agent_id: str,
) -> tuple[bool, str]:
"""
写入记忆(带租户隔离和配额检查)
Args:
user_id: 用户 ID
memory_layer: 记忆层级
entry_id: 条目 ID
content: 内容
agent_id: 执行操作的 agent
Returns:
(success, message)
"""
# 步骤 1: 检查配额
quota_ok, quota_msg = self.check_quota(user_id, "write_entry")
if not quota_ok:
return False, quota_msg
# 步骤 2: 读取旧版本(为了获取版本号)
old_entry, found = self.locking_store.read_with_version(user_id, entry_id)
expected_version = old_entry.version if found else 0
# 步骤 3: 尝试写入(带乐观锁)
success, msg = self.locking_store.write_with_version_check(
user_id=user_id,
entry_id=entry_id,
new_content=content,
expected_version=expected_version,
agent_id=agent_id,
)
return success, msg
def read_memory(self, user_id: str, entry_id: str) -> Optional[Dict[str, Any]]:
"""读取记忆(带租户隔离)"""
entry, found = self.locking_store.read_with_version(user_id, entry_id)
if not found:
return None
return entry.__dict__
def list_user_memories(self, user_id: str, memory_layer: Optional[str] = None) -> Dict[str, Any]:
"""列出用户的所有记忆(租户隔离)"""
return self.namespaced_store.list_user_memories(user_id, memory_layer)
def enforce_quota_limit(self, user_id: str):
"""
强制执行配额限制
在存储空间或条目数超限时触发:
1. 遗忘最不重要的条目
2. 压缩旧的对话历史
3. 发送告警通知
"""
quota = self.quotas.get(user_id, self.default_quota)
user_memories = self.namespaced_store.list_user_memories(user_id)
if len(user_memories) > quota.max_memory_entries:
# 触发遗忘机制
excess = len(user_memories) - quota.max_memory_entries
# ... 删除最不重要的 excess 条记忆
pass
13. 小结与下一步(更新)
本文建立了 Agent 记忆的四层架构,并深入讨论了四个进阶主题:
核心 takeaway:
- 记忆是分层的:不同信息有不同的生命周期和存储需求,不能"一刀切"
- Context Window 是硬约束:所有记忆最终都要在有限的 token 预算内竞争,需要精细的预算分配
- 遗忘是特性:没有遗忘机制的记忆系统最终会被噪声淹没
- 读写时机很关键:什么时候写入、什么时候检索、检索多少条——这些决策直接影响 Agent 的表现
- 从简单开始:先用内存 + 滑动窗口跑通,再逐步引入向量检索和 LLM 摘要
四个进阶主题总结
本文在基础四层架构之上,补充了四个生产级别的优化:
第 9 章 - 记忆分层决策树:解决"这条信息应该存哪里"的问题。提供了从信息特征自动路由到对应记忆层的决策框架,并给出了五个具体的示例及其对应的编码实现。
第 10 章 - 系统化遗忘机制:超越时间衰减的简单模型,介绍了三种高级策略:基于重要性的遗忘、基于查询热度的衰减、以及结合三维因素(时间 × 重要性 × 热度)的混合遗忘。给出了完整的 ForgettingManager 类实现。
第 11 章 - Embedding 成本优化:向量化的隐性成本往往被忽视。本章讨论增量 vs 全量 Embedding、哈希缓存(跳过内容未变的重新计算)、以及批量处理的吞吐优化,可以削减 40-60% 的 Embedding 成本。给出了 EmbeddingCostOptimizer 的完整实现。
第 12 章 - 多租户隔离与并发控制:当 Agent 服务多用户时,记忆隔离和并发一致性变成必需品。本章介绍了命名空间隔离、乐观锁 + 版本号的并发控制、以及租户级配额管理,给出了 MultiTenantMemoryStore 的框架代码。
在四层记忆中,Layer 4 Semantic Memory 的"读取"操作——即如何从大规模知识库中高效检索相关信息——是一个足够深的话题。它涉及 Ingestion、Chunking、Embedding、Hybrid Retrieval、Reranking 等一系列工程决策。
这正是下一篇文章的主题:RAG as Cognitive Memory: 检索增强生成的工程实践。我们将深入 RAG 管线的每一个环节,探讨如何为 Agent 构建高质量的"外部大脑"。
进一步思考
- Memory Consolidation:人类在睡眠中会将短期记忆"固化"为长期记忆。Agent 能否也有类似的机制——在空闲时对 Episodic Memory 做去重、聚合、抽象化?
- Shared Memory:多 Agent 协作场景下,如何设计共享记忆?一个 Agent 的发现如何高效传递给另一个 Agent?
- Memory as Skill:能否让 Agent 从记忆中"学会"新技能,而非仅仅"记住"过去的经验?比如从 10 次类似任务的记录中归纳出一个通用策略。
- Privacy-Aware Memory:用户说"忘记我刚才说的",记忆系统能否真正做到选择性遗忘?在向量数据库中,删除一条记录是否真的消除了它对其他向量的影响?
- Memory Hallucination:当 Episodic Memory 中存储了不准确的信息(比如一次错误的推理结论),它会不会在后续检索中"污染"Agent 的决策?如何设计记忆的"自校正"机制?
系列导航:本文是 Agentic 系列的第 08 篇。