post cover

技术热点落地:构建生产级 AI Agent 记忆系统(2026-04-29)


适用场景与目标

谁需要这套方案

  • 正在用 LangChain / LangGraph / Dify / 自研框架搭建 Agent 系统
  • Agent 多轮对话后期开始”失忆”——早期关键信息消失
  • 希望让 Agent 记住用户偏好、业务上下文,跨 session 保持一致性
  • 需要在长任务(数据分析、代码审查、项目管理)中维持任务状态

核心目标

  1. 短期记忆:在单次对话内管理 Context Window 高效利用
  2. 中期记忆:跨 session 记住用户偏好、项目背景、任务进度
  3. 长期记忆:积累业务知识,可被检索和遗忘

记忆层次架构

用户输入

┌─────────────────────────────────────┐
│  工作记忆 (Working Memory)          │  ← 本次对话上下文(LLM Context Window)
├─────────────────────────────────────┤
│  会话记忆 (Session Memory)          │  ← 本次会话(摘要化后存储)
├─────────────────────────────────────┤
│  项目/用户记忆 (Persistent Memory)  │  ← 跨会话(向量检索 + 结构化存储)
├─────────────────────────────────────┤
│  业务知识库 (Knowledge Base)        │  ← 静态知识(定期更新)
└─────────────────────────────────────┘

最小可行方案(MVP)步骤

Step 1:选型与安装依赖

# 推荐技术栈
pip install langchain langgraph faiss-cpu openai tiktoken pymemory

# 或者轻量替代(不用 LangChain)
pip install faiss-cpu sentence-transformers  # 向量检索
pip install sqlitebrowser  # 结构化存储(可选 PostgreSQL)

工具选型建议:

场景推荐方案原因
向量检索 < 10万条FAISS (免费、本地)零运维,够用
向量检索 > 10万条Qdrant / Milvus分布式、支持混合检索
结构化记忆存储SQLite → PostgreSQL事务保证,成本低
全文 + 向量混合Qdrant / Weaviate内置混合检索

Step 2:实现三层记忆存储

第一层:工作记忆(Context Window 管理)

核心原则:不要把所有历史都塞进 Context,分层才是正解。

# working_memory.py
from langchain.messages import HumanMessage, AIMessage, SystemMessage

class WorkingMemory:
    """单次对话的短期记忆,管理 Context Window 利用率"""
    
    def __init__(self, max_tokens: int = 128_000):
        self.max_tokens = max_tokens
        self.messages = []
        self.system_prompt = ""
    
    def add_user(self, content: str, tokens: int):
        self.messages.append(HumanMessage(content=content))
        self._maybe_summarize(tokens)
    
    def add_ai(self, content: str):
        self.messages.append(AIMessage(content=content))
    
    def set_system(self, prompt: str):
        self.system_prompt = prompt
    
    def _maybe_summarize(self, reserved_tokens: int = 8_000):
        """当使用量超过阈值时,对早期消息做摘要压缩"""
        current_tokens = sum(m.additional_kwargs.get("token_count", 0) 
                             for m in self.messages)
        if current_tokens > self.max_tokens - reserved_tokens:
            early_msgs = self.messages[:-10]  # 保留最近10条
            summary = self._summarize_messages(early_msgs)
            self.messages = [SystemMessage(content=f"对话摘要: {summary}")] \
                           + self.messages[-10:]
    
    def _summarize_messages(self, messages: list) -> str:
        # 调用 LLM 做摘要(可以用便宜的小模型)
        # 实际项目中建议用 GPT-4o-mini 或本地模型
        prompt = f"将以下对话压缩为50字以内的摘要,保留关键信息:\n{messages}"
        # return call_llm(prompt)
        return "[摘要占位,实际调用LLM]"
    
    def get_context(self) -> list:
        ctx = [SystemMessage(content=self.system_prompt)] if self.system_prompt else []
        return ctx + self.messages

第二层:会话记忆(Session Memory)

# session_memory.py
import json
import sqlite3
from datetime import datetime
from typing import Optional

class SessionMemoryStore:
    """跨会话持久化记忆,SQLite 实现"""
    
    def __init__(self, db_path: str = "memory.db"):
        self.conn = sqlite3.connect(db_path, check_same_thread=False)
        self._init_tables()
    
    def _init_tables(self):
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS sessions (
                session_id TEXT PRIMARY KEY,
                user_id TEXT NOT NULL,
                created_at TEXT,
                updated_at TEXT,
                summary TEXT,          -- 会话摘要
                status TEXT DEFAULT 'active'  -- active / archived
            )
        """)
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS session_messages (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                session_id TEXT,
                role TEXT,            -- user / assistant
                content TEXT,
                created_at TEXT,
                FOREIGN KEY (session_id) REFERENCES sessions(session_id)
            )
        """)
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS user_preferences (
                user_id TEXT PRIMARY KEY,
                preferences TEXT,     -- JSON 存储用户偏好
                updated_at TEXT
            )
        """)
        self.conn.commit()
    
    def save_session(self, session_id: str, user_id: str, summary: str):
        now = datetime.now().isoformat()
        self.conn.execute("""
            INSERT OR REPLACE INTO sessions (session_id, user_id, created_at, updated_at, summary)
            VALUES (?, ?, 
                COALESCE((SELECT created_at FROM sessions WHERE session_id=?), ?),
                ?, ?)
        """, (session_id, user_id, session_id, now, now, summary))
        self.conn.commit()
    
    def load_session(self, session_id: str) -> Optional[dict]:
        cur = self.conn.execute(
            "SELECT * FROM sessions WHERE session_id=?", (session_id,))
        row = cur.fetchone()
        if not row:
            return None
        return {"session_id": row[0], "user_id": row[1], "summary": row[4]}
    
    def get_user_preferences(self, user_id: str) -> dict:
        cur = self.conn.execute(
            "SELECT preferences FROM user_preferences WHERE user_id=?", (user_id,))
        row = cur.fetchone()
        return json.loads(row[0]) if row else {}
    
    def update_user_preferences(self, user_id: str, preferences: dict):
        now = datetime.now().isoformat()
        self.conn.execute("""
            INSERT OR REPLACE INTO user_preferences (user_id, preferences, updated_at)
            VALUES (?, ?, ?)
        """, (user_id, json.dumps(preferences, ensure_ascii=False), now))
        self.conn.commit()

第三层:长期记忆(向量检索 + 知识库)

# persistent_memory.py
import faiss
import numpy as np
from sentence_transformers import SentenceTransformer
import json

class PersistentMemory:
    """长期记忆:向量检索 + 可遗忘机制"""
    
    def __init__(self, embedding_model: str = "thenlper/gte-large-zh"):
        self.encoder = SentenceTransformer(embedding_model)
        self.index = faiss.IndexFlatL2(self.encoder.get_sentence_embedding_dimension())
        self.memory_store = []  # [{id, content, metadata, timestamp, access_count}]
        self.next_id = 0
    
    def add(self, content: str, metadata: dict = None):
        vec = self.encoder.encode([content])
        self.index.add(vec)
        self.memory_store.append({
            "id": self.next_id,
            "content": content,
            "metadata": metadata or {},
            "timestamp": datetime.now().isoformat(),
            "access_count": 0,
            "last_access": None
        })
        self.next_id += 1
    
    def retrieve(self, query: str, top_k: int = 5, user_id: str = None) -> list:
        vec = self.encoder.encode([query])
        scores, indices = self.index.search(vec, top_k * 3)  # 多取一些,后面过滤
        
        results = []
        for score, idx in zip(scores[0], indices[0]):
            if idx < len(self.memory_store):
                mem = self.memory_store[idx]
                # 过滤:优先同用户记忆 + 提升高频访问
                if user_id and mem["metadata"].get("user_id") != user_id:
                    continue
                mem["access_count"] += 1
                mem["last_access"] = datetime.now().isoformat()
                mem["_relevance_score"] = float(score)
                results.append(mem)
                if len(results) >= top_k:
                    break
        return results
    
    def forget_low_value(self, threshold_access_count: int = 2, max_age_days: int = 90):
        """遗忘低价值记忆:很少访问 + 超过90天"""
        now = datetime.now()
        self.memory_store = [
            m for m in self.memory_store
            if not (m["access_count"] < threshold_access_count 
                    and (now - datetime.fromisoformat(m["timestamp"])).days > max_age_days)
        ]
        # 注意:删除后需要重建 FAISS 索引(全量重建)
        self._rebuild_index()
    
    def _rebuild_index(self):
        self.index.reset()
        if self.memory_store:
            vecs = self.encoder.encode([m["content"] for m in self.memory_store])
            self.index.add(vecs)

Step 3:组装完整的 Agent 记忆系统

# agent_with_memory.py
from langgraph.graph import StateGraph, END

class AgentWithMemory:
    
    def __init__(self, llm, config: dict):
        self.llm = llm
        self.working = WorkingMemory(max_tokens=config.get("max_context_tokens", 128_000))
        self.session = SessionMemoryStore(config.get("db_path", "memory.db"))
        self.persistent = PersistentMemory(config.get("embedding_model"))
        
        # 加载用户偏好到 System Prompt
        self.user_prefs = self.session.get_user_preferences(config["user_id"])
        self._update_system_prompt()
    
    def _update_system_prompt(self):
        prefs_text = json.dumps(self.user_prefs, ensure_ascii=False, indent=2)
        self.working.set_system(
            f"你是专业助手。用户偏好设置:\n{prefs_text}\n\n"
            "根据用户偏好调整回答风格和重点。"
        )
    
    def query(self, user_input: str) -> str:
        session_id = self._get_or_create_session()
        
        # 1. 从长期记忆检索相关上下文
        relevant_memories = self.persistent.retrieve(user_input, user_id=self.config["user_id"])
        
        # 2. 把检索到的记忆注入 Context
        memory_context = ""
        if relevant_memories:
            memory_lines = [f"[相关记忆 {i+1}] {m['content']}" 
                           for i, m in enumerate(relevant_memories)]
            memory_context = "\n".join(memory_lines)
        
        # 3. 构建完整上下文
        system_content = self.working.system_prompt
        if memory_context:
            system_content += f"\n\n## 相关长期记忆:\n{memory_context}"
        
        # 4. 执行推理
        response = self.llm.invoke(
            [{"role": "system", "content": system_content}]
            + [{"role": m.type.replace("human","user").replace("ai","assistant"), 
                "content": m.content} for m in self.working.messages[-20:]]
            + [{"role": "user", "content": user_input}]
        )
        
        # 5. 存储本次对话
        self.working.add_user(user_input, tokens=0)
        self.working.add_ai(response.content)
        
        # 6. 更新长期记忆(异步,避免阻塞)
        self._maybe_store_memory(user_input, response.content)
        
        return response.content
    
    def _maybe_store_memory(self, user_input: str, response: str):
        """判断是否需要将本次交互存入长期记忆"""
        # 简单策略:显式要求记忆 或 涉及偏好/决策
        if any(kw in user_input for kw in ["记住", "偏好", "以后都", "以后请"]):
            self.persistent.add(
                content=f"用户偏好/决策: {user_input}{response}",
                metadata={"user_id": self.config["user_id"], "type": "preference"}
            )

关键实现细节

1. 记忆压缩策略:不要每次都全量摘要

# 推荐:基于 token 阈值的渐进压缩
# ❌ 错误做法:每 N 条消息强制摘要(会丢失关键细节)
# ✅ 正确做法:保留"锚点信息",选择性摘要

ANCHOR_TYPES = {"preference", "decision", "requirement", "constraint"}

def smart_summarize(messages: list) -> tuple[str, list]:
    """
    返回 (摘要, 保留消息)
    策略:保留所有包含锚点信息的原始消息,压缩其他消息
    """
    anchors = []
    compressible = []
    
    for msg in messages:
        content = msg.content.lower()
        if any(anchor in content for anchor in ANCHOR_TYPES):
            anchors.append(msg)
        else:
            compressible.append(msg)
    
    # 对 compressible 做摘要,anchors 保留原文
    compressed_summary = _batch_summarize(compressible)
    return compressed_summary, anchors

2. 记忆一致性保证:使用版本号 + 乐观锁

# 给每次记忆写入加版本号,防止并发更新覆盖
class VersionedMemory:
    def __init__(self):
        self.locks = {}  # memory_id -> lock
    
    def update_with_version(self, memory_id: str, new_content: str, expected_version: int):
        with self.locks.setdefault(memory_id, asyncio.Lock()):
            current = self.get(memory_id)
            if current["version"] != expected_version:
                raise MemoryConflictError(
                    f"版本冲突: 期望 {expected_version}, 实际 {current['version']}"
                )
            self.save(memory_id, new_content, current["version"] + 1)

3. 记忆召回优化:Query Decomposition

# 避免单一 query 检索不全面
def decomposed_retrieve(memory: PersistentMemory, query: str, user_id: str, top_k: int = 5):
    """将复杂 query 分解为多个子 query,分别检索后合并"""
    sub_queries = [
        query,
        f"{query} 的背景",
        f"{query} 相关的偏好"
    ]
    
    all_results = []
    seen_ids = set()
    
    for sq in sub_queries:
        results = memory.retrieve(sq, user_id=user_id, top_k=top_k)
        for r in results:
            if r["id"] not in seen_ids:
                all_results.append(r)
                seen_ids.add(r["id"])
    
    # 按综合相关性排序
    all_results.sort(key=lambda x: x["_relevance_score"])
    return all_results[:top_k]

常见坑与规避清单

坑 1:Context Window 污染——把什么都往里塞

症状: 模型后期开始重复之前说过的内容、遗忘关键指令 原因: 简单拼接历史消息,没有做信息密度管理 规避:

  • messages.append(user_msg); messages.append(ai_msg) 无限堆积
  • ✅ 实现 token 预算 + 摘要压缩(见 Step 2 第一层代码)
  • ✅ 保留”锚点”(用户偏好、关键决策),其他压缩

坑 2:记忆检索”大海捞针”——向量相似度不精准

症状: 明明存了相关记忆,检索就是不出来 原因: 用户 query 表达和存储内容语义不完全对齐 规避:

  • ✅ 使用 Query Decomposition(分解检索)
  • ✅ 混合检索:向量 + 关键词(Qdrant / Weaviate 内置支持)
  • ✅ 在 metadata 中存储结构化标签,检索时做预过滤

坑 3:记忆写入风暴——每次交互都写,导致成本爆炸

症状: 记忆系统比 Agent 本身调用 LLM 还贵 规避:

  • ❌ 用户说的每句话都存
  • ✅ 只存”值得记住”的内容(用户偏好、关键决策、任务状态变更)
  • ✅ 使用异步批量写入,不阻塞主流程

坑 4:跨用户记忆泄露

症状: A 用户的偏好出现在了 B 用户的对话里 规避:

  • ✅ 检索时强制带 user_id 过滤
  • ✅ 用户记忆和公共知识库严格分开存储和检索
  • ✅ SQLite/PG 中用户记忆表用 user_id 做行级隔离

坑 5:遗忘机制永远不执行

症状: 记忆库越来越大,检索越来越慢,回答质量下降 规避:

  • ✅ 实现定时遗忘任务(每周一次,清理低价值 + 过期记忆)
  • ✅ 遗忘后重建向量索引(FAISS 删除后必须重建)
  • ✅ 设置记忆上限(如单个用户最多 1000 条)

坑 6:Session 摘要质量差

症状: 新 session 恢复后,Agent”失忆”严重 规避:

  • ✅ 摘要 prompt 要包含关键实体(人名、项目名、决策结果)
  • ✅ 多轮对话结束后立即摘要,趁信息新鲜
  • ✅ 重要记忆加 status: pinned,永远不自动遗忘

成本 / 性能 / 维护权衡

成本对比(单用户 / 月)

方案向量存储成本LLM 调用成本适用规模
全本地(FAISS + SQLite)~$0仅推理< 1000 用户
Qdrant 云(3GB)~$25/月仅推理1000~10万用户
Pinecone Serverless按请求计费仅推理变动流量
全 OpenAI API$0$10~$50/月取决于用量

建议: MVP 阶段用全本地方案(FAISS + SQLite),用户量破万后再迁移到 Qdrant。

性能目标

指标目标值优化手段
记忆检索延迟< 100ms向量索引预加载、限制 top_k
上下文构建延迟< 500ms异步加载、缓存热点记忆
记忆写入延迟< 200ms异步批量写入
记忆库规模(单用户)< 5000 条遗忘机制

维护成本

  • 数据备份:定期导出 SQLite/PG,避免丢失用户记忆
  • 索引重建:遗忘后 FAISS 索引需要全量重建(O(n)),大记忆库安排低峰期
  • 版本迁移:记忆 schema 变更时写迁移脚本,不要直接ALTER大表

一周内可执行行动清单

Day 1–2:理解需求,选型

  • 梳理你的 Agent 需要记住什么(用户偏好 / 项目状态 / 业务知识 / 全部?)
  • 评估用户规模:< 100 用户用 SQLite 本地,> 1000 用 Qdrant/PG
  • 决定三层记忆(工作 / 会话 / 长期)中哪些是 MVP 必须的

Day 3–4:实现核心模块

  • 克隆 WorkingMemory 类,调整 token 阈值(128k / 64k / 32k 视模型而定)
  • 搭建 SessionMemoryStore(SQLite),实现 session 创建 / 摘要 / 恢复
  • 部署 PersistentMemory(FAISS),跑通 add + retrieve 流程

Day 5:集成测试

  • 在 LangGraph / Dify 中接入记忆模块
  • 验证:用户说”记住我的偏好是 X” → 新 session 中 Agent 知道 X
  • 压测:连续 50 轮对话,验证 Context 不溢出

Day 6:处理边界情况

  • 实现遗忘机制(定时清理低价值记忆)
  • 测试多用户隔离(防止跨用户记忆泄露)
  • 添加记忆版本号,防止并发更新冲突

Day 7:上线 & 监控

  • 上线记忆系统,设置监控:检索延迟 / 写入成功率 / 记忆库大小
  • 配置备份策略(每日导出记忆库)
  • 写好降级预案:记忆系统挂了 → Agent 降级为无记忆模式(不要因此崩溃)

参考资源


记忆系统是 Agent 从”玩具”走向”生产力工具”的关键基础设施。一周 MVP 可以跑通,三周打磨后才能上生产。核心原则:分层管理、按需记忆、优雅遗忘。