为什么写这篇文章?

市面上很多 AI 对话系统 Demo 代码长这样:

history = [{"role": "user", "content": question}]
response = llm.chat(history)

但当项目走进实际业务场景时,这套逻辑会立即崩溃,原因包括:

  • 用户输入不是单轮,而是多轮多流程
  • 模型不是直接回答,而是分阶段决策:分析 → 拆解 → 生成 → 查询
  • 模型输出不再是一个回复,而是多个节点逐步产出
  • 用户随时刷新页面,需要可恢复的上下文与历史

于是,一个简单的问答系统,需要演进为:

多用户 + 多会话 + 多轮交互 + 多节点流图 + 状态持久化 + 前端可视化还原

本文将基于 LangGraph 构建一个支持上述功能的企业级对话系统,实现以下目标:

✅ 每轮对话可分为多个执行步骤
✅ 每一步执行结果可落库并展示
✅ 流式反馈进度,前端实时渲染
✅ 数据可恢复,还原历史对话

技术栈与架构简介

核心组件

技术 用途
LangGraph 定义每轮对话流程,支持多节点
FastAPI 提供后端 API 服务
SQLAlchemy + MySQL 数据持久化(会话、节点)
SSE(Server-Sent Events) 实时推送执行进度给前端
Redis(可选) 缓存状态或限流
cryptography 加密数据库配置信息

架构流程简图

[用户提问]

[LangGraph 构建流程图(多个节点)]

[每节点执行并通过 SSE 通知前端]

[节点信息保存至数据库]

[轮次结束后写入最终模型回复]

[前端实时渲染 + 后台持久保存]

数据结构设计

数据结构设计动机

在构建一个企业级多轮对话系统时,尤其是结合 LangGraph 多节点执行模型,我们的目标不仅是保存“用户输入”和“模型输出”,还需要支持:

  1. 多节点流式反馈与可视化回放
  2. 会话历史查询与恢复
  3. 节点异常监控与调试
  4. 多人/多线程并发处理的隔离能力
  5. 安全合规(如数据库连接信息加密)

因此,传统的简单“对话表”设计远远不够,最终我们采用了分层式三表设计方案:

为什么要拆成三张表?

核心抽象是:

  • 会话(Session):一段时间内的对话窗口(类似于聊天群)
  • 轮次(Round):用户一次提问和 AI 的一次完整响应过程
  • 节点(Node):LangGraph 中每个具体处理逻辑的执行记录

表结构设计

sessions:全局上下文容器

动机:

  • 为每一个用户的完整会话分配一个唯一的 session_id,方便跟踪;
  • 存储生命周期(创建时间 / 结束时间)、状态标识(是否结束);
  • 为后续可能的扩展(如会话配置信息、权限等)预留空间;
  • 内嵌数据库连接信息(用于动态调用外部数据源),但加密保存,确保安全。
session_id    VARCHAR PRIMARY KEY
user_id INT
start_time DATETIME
end_time DATETIME
status ENUM('active', 'inactive', 'ended')
db_info TEXT -- 加密存储数据库连接配置

每个用户开启一个对话框就对应一个 session

conversation_rounds:对话轮次管理

动机:

  • 多轮对话需要明确的“起止点”,即用户一次输入 + AI 一次主回复;
  • 每轮对话拥有独立索引 (round_index),支持分页显示、导航;
  • 可直接在列表页展示每轮提问和回答(摘要视图);
  • 若任务链失败,也能看到该轮终止在哪一阶段,利于排错。
id            INT PRIMARY KEY AUTO_INCREMENT
session_id VARCHAR
round_index INT
user_input TEXT
ai_reply TEXT
start_time DATETIME
end_time DATETIME

用户提问后,系统会新建一轮 round,并记录本轮最终 AI 的完整回答

round_nodes:LangGraph 节点轨迹追踪器

动机:

  • LangGraph 中每个“节点”都是一个子任务,可能执行耗时、可能失败;
  • 保存每个节点的开始、结束、失败等状态,可用于 实时展示离线回放
  • 支持前端复原“任务流转图”(如计划 -> SQL生成 -> 执行);
  • 支持后续统计分析:哪个节点最耗时?哪个节点最常报错?
id            INT PRIMARY KEY AUTO_INCREMENT
round_id INT
node VARCHAR -- 节点名称
node_type VARCHAR -- 节点类型
content TEXT -- 节点输出内容
node_status ENUM('start', 'finish', 'error')
timestamp DATETIME

LangGraph 中每个节点的执行状态,都完整记录,可供前端回放和审计分析

表结构层级关系图

Session(1)
└── Round(N)
└── Node(N)
  • 一个会话包含多个轮次;
  • 每个轮次包含多个节点处理记录;
  • 数据分层清晰、利于扩展、调试与复用。

与传统对话系统的区别

项目 传统对话系统 当前设计
对话粒度 用户-AI 一问一答 用户问 + 多节点 + AI答复
失败节点排查 不支持 精确到节点、时间、状态
多节点过程展示 支持流式展示 & 历史回放
AI 回复 统一存储 拆分最终回复与中间输出
上下文扩展能力 强,支持权限、配置信息等

节点持久化最佳实践模板

为什么要在每个节点落库?

  1. 前端可见:让用户看到系统在干什么(任务流程透明)
  2. 可追溯:调试时你知道哪个环节出错
  3. 可恢复:中断后用户刷新仍可回显完整流程

节点代码模板


writer = get_stream_writer()

# 通知前端:开始执行该节点
writer({
"event": "message",
"data": make_sse("plan", "str", "开始生成计划...", "start")
})

result = await plan_tool.generate(state.question)

# 持久化中间节点结果
session_manager.append_round_node(
round_id=state.round_id,
node="plan",
node_type="str",
content=result["plan"],
status="finish"
)

writer({
"event": "message",
"data": make_sse("plan", "str", result["plan"], "finish")
})

后端 SSE 实现与流式反馈

FastAPI 中定义 SSE 接口非常直观:

graph = build_db_graph()

async def event_generator():
async for mode, chunk in graph.astream(state, stream_mode=["updates", "custom"]):
if mode == "custom":
yield {"event": "message", "data": chunk}

每个节点向 writer 发送自定义事件 → 传入 custom 通道 → 推送给前端


会话管理的统一入口:SessionManager

封装了统一的 SessionManager

  • 创建会话:create_session(user_id)
  • 新建轮次:create_round(session_id, user_input)
  • 记录节点:append_round_node(round_id, node, node_type, content, status)
  • 结束轮次:finalize_round(round_id, ai_reply)
  • 获取历史:get_rounds_with_nodes(session_id)

数据库配置安全加密

为每个 session 支持绑定数据库连接:

# 设置 DB info(加密后存入 session 表)
session_manager.set_db_info(session_id, db_info)

# 获取 DB info(解密)
session_manager.get_db_info_by_session(session_id)

加密使用 cryptography.Fernet 实现,避免明文密码存储风险


前端如何使用这些数据?

流式渲染场景

  • 用户提问
  • 后端开始执行节点,SSE 推送:{"node": "plan", "nodeProcess": "start", "data": "开始生成计划"}
  • 后端执行完节点,SSE 推送:{"node": "plan", "nodeProcess": "finish", "data": "SELECT xxx FROM..."}

为什么推给前端的数据格式这样设计?

{
"event": "message",
"data": {
"code": 0,
"msg": "ok",
"data": {
"node": "plan",
"nodeProcess": "start",
"nodeType": "str",
"nodeSplitData": "开始生成计划..."
}
}
}

这种格式是专为「前端渲染驱动」设计的结构化事件消息,目标是:

支持实时「流式渲染」

LangGraph 结构决定了每个节点可能有较长的耗时,我们希望前端能逐个节点更新 UI(如流程图、卡片、日志等)而不是等整轮完成。

字段含义如下:

字段 含义
event SSE 必需字段,标明事件类型
node 节点名称,如 plan/sql/execute
nodeProcess 当前节点的状态:start / finish / error
nodeType 渲染类型:字符串、表格、图形(如 "str", "table", "chart"
nodeSplitData 当前节点的实际输出内容

这种结构让前端可以做到:

  • 渐进式渲染(Progressive Rendering)
  • 精细控制每个节点的 UI 更新(start → loading → finish)

如何让前端支持「历史回放」?

用户刷新页面或重新打开旧会话时,需要还原之前的完整执行过程。SSE 是实时流式传输的机制,前端不能从 SSE 本身恢复历史。因此,要做:

数据库完整持久化

后端通过 round_nodes 表记录了每个节点的输出、状态、类型:

round_id node node_type content node_status timestamp
23 plan str 开始生成计划… start 2024-06-25 12:00:01
23 plan str SELECT name FROM… finish 2024-06-25 12:00:04

接口:/sessions/{session_id}/rounds

该接口返回内容结构如下:

{
"session_id": "abc123",
"rounds": [
{
"round_index": 1,
"user_input": "请分析销售数据",
"ai_reply": "好的,以下是分析结果...",
"nodes": [
{
"node": "plan",
"node_type": "str",
"content": "开始生成计划...",
"node_status": "start"
},
{
"node": "plan",
"node_type": "str",
"content": "SELECT * FROM sales ...",
"node_status": "finish"
}
]
}
]
}

前端回放逻辑

  • 页面加载时请求 /rounds 数据
  • 按顺序遍历每一轮和每个节点
  • 模拟 SSE 的渲染逻辑(例如执行 writer({...}) 的等价操作)
  • 渲染卡片 / 节点组件,实现「可回放」体验

总结与建议

模块 建议实践
LangGraph 节点 每节点落库,每步反馈状态
前端展示 用节点状态流式渲染,支持回放
会话设计 多用户 + 多轮次 + 多节点粒度建模
DB info 建议加密存储,避免明文泄露
快速调试 为每步添加日志和异常捕获,便于排查错误

核心优点

通过 SessionRoundNode 的结构化设计,加上 SSE 流式推送与数据库持久化,构建了一套可扩展的「对话交互回放机制」。其主要优势如下:

特性 描述
可追溯 每个 LangGraph 节点的输出、执行状态都持久化,便于调试与溯源。
可回放 前端可基于历史节点还原完整对话路径,支持页面刷新与会话审计。
可扩展 数据结构清晰,适用于对接更复杂的对话系统、多轮问答、多步任务规划。
可复用 节点执行逻辑与存储解耦,后续可迁移至消息队列、异步任务或链式服务架构。
易调试 每个节点状态都有数据库与 SSE 日志双重记录,便于排错和性能分析。
安全性好 db_info 等敏感数据加密存储,符合企业安全要求。

不止 LangGraph,还能用于哪些场景?

虽然设计是基于 LangGraph 的 DAG 流程,但这套机制具有高度通用性:

场景类型 如何适配这套机制
多轮对话系统(如 RAG) 每次检索、生成、评分作为节点记录,支持重放和调试流程。
自动化问卷 / 表单流程 每个步骤节点记录用户输入与提示生成内容,可复用节点日志。
数据分析管道(ETL/ELT) 每个分析 / 转换阶段持久化,结合 SSE 可实时报告执行进度。
多模态对话 / 多 Agent 系统 每个 agent 的响应节点化,流程清晰、记录完整。

下一步

为了进一步提高系统的稳定性、扩展性和企业级能力,可以考虑以下方向:

多用户数据隔离

  • 当前通过 session.user_id 字段支持用户关联
  • 建议增加权限校验、Token 认证、数据库层隔离支持

节点异步执行与分布式调度

  • 将每个节点封装为独立任务(如 Celery / Arq)
  • 当前结构已可支持通过 node_status 标记执行阶段
  • 增强消息队列适配能力,支持更大规模任务并发

审计日志与监控

  • 对节点异常、慢任务、用户输入进行审计分析
  • 添加每轮总耗时、失败率等字段用于监控平台集成(如 Prometheus)

数据脱敏与访问控制

  • 引入字段级脱敏策略,避免日志记录泄漏业务数据
  • 接入统一身份认证系统(如 OAuth、JWT)

可复用模板

模块 功能
SessionManager 封装对话结构操作(session/round/node)
SSEWriter 标准化推送格式和流控制
LangGraphNode 节点执行逻辑模板(支持状态更新 + DB 存储)
NodeReplayEngine 将历史节点恢复为 stream 推送

写在最后

这套基于 LangGraph 的对话系统架构可以迁移到更广泛的智能应用系统中:

  • AI 辅助编程
  • 数据问答
  • 智能分析助手
  • 低代码流程引擎

“如果你重视每一步对话,就必须记住每一个节点的痕迹。”
—— 可观测的智能系统,才是稳定且可控的。

后续继续完善,继续更新~