跳到主要内容

进阶特性与源码探秘

//TODO (待定是否保留)

前面几篇我们学会了搭建Agent、给它配工具、让它有记忆。但在生产环境中,还有一些关键问题需要解决:

  • 敏感操作(如支付、退款)能让Agent自动执行吗?
  • 应用重启后,Agent怎么记住之前的对话?
  • ReactAgent底层到底是怎么运行的?

这篇我们来逐一攻破。

高危操作的安全阀:Human in the Loop

为什么需要人工审批

想象一个场景:用户对Agent说"帮我把这个订单退掉",Agent二话不说就调用退款接口,钱就退了。

这在某些场景下可能是你想要的。但很多时候,我们希望在执行敏感操作前让人类确认一下

  • 涉及资金的操作(支付、退款、转账)
  • 不可逆的操作(删除数据、发送通知)
  • 涉及权限的操作(修改配置、停止服务)
高危操作必须人工审批

涉及资金的操作(支付、退款、转账)、不可逆的操作(删除数据、发送通知)、涉及权限的操作(修改配置、停止服务),不应允许 Agent 全自动执行,必须引入 HITL 机制,在关键节点让人类介入审批。

HITL工作流程

HITL执行流程
HITL执行流程

Spring AI Alibaba实现

Spring AI Alibaba通过HumanInTheLoopHook实现HITL,整体分三步:配置中断、响应中断、恢复执行。

第一步:配置哪些工具需要审批

// 创建HITL Hook,指定哪些工具需要审批
HumanInTheLoopHook hitlHook = HumanInTheLoopHook.builder()
.approvalOn("process_refund", ToolConfig.builder()
.description("退款操作需要人工确认")
.build())
.approvalOn("process_payment", ToolConfig.builder()
.description("支付操作需要人工确认")
.build())
.build();

// 持久化Saver(HITL需要检查点来处理中断)
MemorySaver saver = new MemorySaver();

// 创建Agent
ReactAgent agent = ReactAgent.builder()
.name("payment_agent")
.model(chatModel)
.tools(refundTool, paymentTool, queryTool)
.hooks(List.of(hitlHook))
.saver(saver)
.build();

第二步:调用Agent,处理中断

String threadId = "order_session_001";
RunnableConfig config = RunnableConfig.builder()
.threadId(threadId)
.build();

// 第一次调用
Optional<NodeOutput> result = agent.invokeAndGetOutput(
"帮我退掉订单ORD123的款,金额299元",
config
);

// 检查是否返回中断
if (result.isPresent() && result.get() instanceof InterruptionMetadata) {
InterruptionMetadata interruption = (InterruptionMetadata) result.get();

// 获取待审批的工具调用信息
for (InterruptionMetadata.ToolFeedback feedback : interruption.toolFeedbacks()) {
System.out.println("待审批工具:" + feedback.getName());
System.out.println("调用参数:" + feedback.getArguments());
System.out.println("审批说明:" + feedback.getDescription());
// 输出示例:
// 待审批工具:process_refund
// 调用参数:{"orderId": "ORD123", "amount": 299}
// 审批说明:退款操作需要人工确认
}

// 这里可以把信息展示给用户,让用户决定
// 实际场景中可能是弹窗、发通知、进审批流程等
}

第三步:人工决策后恢复执行

// 假设人工批准了退款
InterruptionMetadata.Builder feedbackBuilder = InterruptionMetadata.builder()
.nodeId(interruption.node())
.state(interruption.state());

// 构造审批结果
for (InterruptionMetadata.ToolFeedback toolFeedback : interruption.toolFeedbacks()) {
InterruptionMetadata.ToolFeedback approved = InterruptionMetadata.ToolFeedback
.builder(toolFeedback)
.result(InterruptionMetadata.ToolFeedback.FeedbackResult.APPROVED)
.build();
feedbackBuilder.addToolFeedback(approved);
}

InterruptionMetadata approvalMetadata = feedbackBuilder.build();

// 带上审批结果恢复执行
RunnableConfig resumeConfig = RunnableConfig.builder()
.threadId(threadId)
.addMetadata(RunnableConfig.HUMAN_FEEDBACK_METADATA_KEY, approvalMetadata)
.build();

Optional<NodeOutput> finalResult = agent.invokeAndGetOutput("", resumeConfig);
System.out.println("执行结果:" + finalResult.get());

三种审批结果

人工审批可以返回三种结果:

结果说明后续行为
APPROVED批准使用原参数执行工具
EDITED修改使用修改后的参数执行
REJECTED拒绝不执行工具,告知Agent被拒绝原因

修改参数示例

// 人工审核时发现金额有误,修改参数后批准
InterruptionMetadata.ToolFeedback edited = InterruptionMetadata.ToolFeedback
.builder(toolFeedback)
.result(InterruptionMetadata.ToolFeedback.FeedbackResult.EDITED)
.arguments("{\"orderId\": \"ORD123\", \"amount\": 199}") // 修改金额
.build();

拒绝示例

// 人工拒绝退款
InterruptionMetadata.ToolFeedback rejected = InterruptionMetadata.ToolFeedback
.builder(toolFeedback)
.result(InterruptionMetadata.ToolFeedback.FeedbackResult.REJECTED)
.rejectionReason("该订单已超过退款期限,无法退款")
.build();

完整示例

@RestController
@RequestMapping("/payment")
public class PaymentAgentController {

@Autowired
private ChatModel chatModel;

private MemorySaver saver = new MemorySaver();

@PostMapping("/chat")
public ResponseEntity<?> chat(@RequestBody ChatRequest request) {
ReactAgent agent = buildAgent();
RunnableConfig config = RunnableConfig.builder()
.threadId(request.getSessionId())
.build();

Optional<NodeOutput> result = agent.invokeAndGetOutput(request.getMessage(), config);

if (result.isPresent() && result.get() instanceof InterruptionMetadata) {
// 返回需要审批的信息
InterruptionMetadata meta = (InterruptionMetadata) result.get();
return ResponseEntity.ok(Map.of(
"status", "PENDING_APPROVAL",
"pendingTools", meta.toolFeedbacks()
));
}

// 正常返回结果
return ResponseEntity.ok(Map.of(
"status", "COMPLETED",
"response", result.map(Object::toString).orElse("")
));
}

@PostMapping("/approve")
public ResponseEntity<?> approve(@RequestBody ApprovalRequest request) {
ReactAgent agent = buildAgent();

// 构造审批结果
InterruptionMetadata approvalMetadata = buildApprovalMetadata(request);

RunnableConfig config = RunnableConfig.builder()
.threadId(request.getSessionId())
.addMetadata(RunnableConfig.HUMAN_FEEDBACK_METADATA_KEY, approvalMetadata)
.build();

Optional<NodeOutput> result = agent.invokeAndGetOutput("", config);

return ResponseEntity.ok(Map.of(
"status", "COMPLETED",
"response", result.map(Object::toString).orElse("")
));
}

private ReactAgent buildAgent() {
HumanInTheLoopHook hitlHook = HumanInTheLoopHook.builder()
.approvalOn("process_refund", ToolConfig.builder()
.description("退款需要确认").build())
.build();

return ReactAgent.builder()
.name("payment_agent")
.model(chatModel)
.tools(ToolCallbacks.from(new PaymentTools()))
.hooks(List.of(hitlHook))
.saver(saver)
.build();
}
}

让Agent拥有长期记忆

上一篇我们简单提了Memory,这里详细聊聊持久化方案。

持久化记忆的必要性

MemorySaver是基于内存的,应用一重启,所有对话历史都没了。生产环境肯定不能这样。持久化记忆的好处:

  • 应用重启后对话可继续
  • 支持分布式部署(多实例共享记忆)
  • 可做数据分析和审计

四种内置 Saver 实现

Spring AI Alibaba 内置了四种 Saver 实现,覆盖了从开发到生产的各种场景:

Saver存储方式特点适用场景
MemorySaver内存(ConcurrentHashMap)速度快,重启丢失开发测试、短期对话、演示
FileSystemSaver文件系统(JSON 文件)简单持久化,单机有效单机部署、数据量小
MysqlSaverMySQL 数据库关系型数据库,支持事务生产环境、已有 MySQL
PostgresSaverPostgreSQL 数据库功能强大,JSON 支持好生产环境、已有 PostgreSQL
截图提示

这里可以补充 Spring AI Alibaba 源码中 Saver 实现类的截图,展示 com.alibaba.cloud.ai.graph.store 包下的类结构。

MySQL持久化方案

@Configuration
public class AgentMemoryConfig {

@Bean
public MysqlSaver mysqlSaver(DataSource dataSource) {
return new MysqlSaver.Builder()
.dataSource(dataSource)
.build();
}
}

@Service
public class PersistentAgent {

@Autowired
private ChatModel chatModel;

@Autowired
private MysqlSaver mysqlSaver;

public String chat(String message, String userId) {
ReactAgent agent = ReactAgent.builder()
.name("persistent_agent")
.model(chatModel)
.saver(mysqlSaver)
.build();

RunnableConfig config = RunnableConfig.builder()
.threadId("user_" + userId)
.build();

return agent.call(message, config).getText();
}
}

框架会自动创建两张表:

  • graph_thread:存储会话线程信息
  • graph_checkpoint:存储对话状态检查点
表结构说明

graph_thread 表:保存线程基础信息

  • thread_id:线程唯一标识(对应你传的 sessionId/userId)
  • created_at:创建时间
  • updated_at:最后更新时间

graph_checkpoint 表:保存对话状态的检查点

  • thread_id:关联的线程 ID
  • checkpoint_id:检查点 ID
  • state:序列化后的状态数据(包含消息历史)
  • parent_checkpoint_id:父检查点,用于回溯

每次 Agent 对话完成后,框架会自动写入一个新的 checkpoint。下次对话时,通过 threadId 读取最新的 checkpoint 恢复上下文。

不同Saver对比

Saver特点适用场景
MemorySaver内存存储,速度快开发测试、短期对话
MysqlSaverMySQL持久化生产环境、需要关系型数据库
PostgresSaverPostgreSQL持久化生产环境、需要PostgreSQL
FileSystemSaver文件系统存储单机部署、简单持久化

自定义Saver

如果内置的Saver不满足需求,可以自己实现:

public class RedisSaver implements BaseSaver {

private RedisTemplate<String, Object> redisTemplate;

@Override
public void save(String threadId, OverAllState state) {
redisTemplate.opsForValue().set("agent:state:" + threadId, state);
}

@Override
public Optional<OverAllState> load(String threadId) {
Object state = redisTemplate.opsForValue().get("agent:state:" + threadId);
return Optional.ofNullable((OverAllState) state);
}

// ... 其他方法
}

ReactAgent源码原理揭秘

了解原理能帮助你更好地使用框架,遇到问题也知道从哪儿下手排查。

整体架构

ReactAgent 底层架构

ReactAgent底层基于**Graph(图)实现。Graph由Node(节点)Edge(边)**组成:

  • Node:执行具体逻辑的单元
  • Edge:定义节点之间的流转规则

核心节点:Model Node 负责调用大模型进行推理,Tool Node 负责执行模型决定调用的工具。

ReactAgent底层架构
ReactAgent底层架构

核心节点:

  • Model Node:调用大模型进行推理
  • Tool Node:执行模型决定调用的工具

执行流程

当你调用agent.call(message)时,发生了什么?

ReactAgent执行流程
ReactAgent执行流程

简单说:

  1. 用户输入进来,构建成Prompt
  2. 调用LLM,看看返回的是普通文本还是Tool Call
  3. 如果有Tool Call,执行工具,把结果加到上下文,再调LLM
  4. 如果没有Tool Call,说明任务完成,返回结果
  5. 这个循环就是ReAct的"思考→行动→观察"

关键源码解读

1. ReactAgent初始化Graph

ReactAgent.initGraph()方法中,框架构建了Graph的结构:

// 伪代码,展示核心逻辑
protected void initGraph() {
// 添加Model节点
graph.addNode("model", node_async(this::llmNode));

// 添加Tool节点
if (hasTools) {
graph.addNode("tool", node_async(this::toolNode));
}

// 添加边(路由规则)
graph.addConditionalEdges("model",
state -> hasToolCalls(state) ? "tool" : END);

graph.addConditionalEdges("tool",
state -> shouldContinue(state) ? "model" : END);
}

2. Model Node执行逻辑

AgentLlmNode的核心是调用大模型:

// 伪代码
public Map<String, Object> apply(OverAllState state, RunnableConfig config) {
// 构建消息列表
List<Message> messages = buildMessages(state);

// 调用模型
ChatResponse response = chatModel.call(new Prompt(messages, chatOptions));

// 更新状态
state.updateMessage(response.getResult().getOutput());

return state.data();
}

3. Tool Node执行逻辑

AgentToolNode负责执行工具调用:

// 伪代码
public Map<String, Object> apply(OverAllState state, RunnableConfig config) {
// 获取上一次模型输出中的Tool Calls
AssistantMessage lastMessage = getLastAssistantMessage(state);
List<ToolCall> toolCalls = lastMessage.getToolCalls();

// 执行每个工具调用
List<ToolResponseMessage> responses = new ArrayList<>();
for (ToolCall call : toolCalls) {
String result = executeToolCall(call);
responses.add(new ToolResponseMessage(call.id(), result));
}

// 将工具结果加入状态
state.addMessages(responses);

return state.data();
}

4. 边的路由逻辑

路由决定了节点之间的跳转:

// Model节点后的路由
private String routeAfterModel(OverAllState state) {
AssistantMessage lastMessage = getLastAssistantMessage(state);

if (lastMessage.hasToolCalls()) {
return "tool"; // 有Tool Call,去Tool节点
} else {
return END; // 没有Tool Call,结束
}
}

// Tool节点后的路由
private String routeAfterTool(OverAllState state) {
// 检查是否有工具标记了return_direct
if (allToolsReturnDirect(state)) {
return END; // 直接返回结果
}
return "model"; // 回到Model节点继续推理
}

Hooks是怎么工作的

Hooks通过在Graph中注入额外的节点实现:

// 伪代码
if (hasHooks) {
// 在Model节点前后插入Hook节点
graph.addNode("before_model_hook", hookNode);
graph.addNode("after_model_hook", hookNode);

// 修改边
graph.addEdge("before_model_hook", "model");
graph.addEdge("model", "after_model_hook");
}

HITL是怎么实现的

HumanInTheLoopHook实现了InterruptableAction接口,关键方法是interrupt()

// 伪代码
public Optional<InterruptionMetadata> interrupt(OverAllState state, RunnableConfig config) {
// 检查是否已有人工反馈
if (hasHumanFeedback(config)) {
// 验证反馈是否合法
if (validateFeedback(config)) {
return Optional.empty(); // 放行,继续执行
}
}

// 检查是否有受控工具被调用
List<ToolCall> controlledCalls = findControlledToolCalls(state);
if (!controlledCalls.isEmpty()) {
// 构造中断元数据
return Optional.of(buildInterruptionMetadata(controlledCalls));
}

return Optional.empty(); // 不中断
}
HITL 中断原理

interrupt()方法在每个节点执行前被调用。如果返回了InterruptionMetadata,Graph执行引擎会立即暂停,保存当前状态,等待人工反馈后再恢复。

小结

这篇我们深入了Agent的进阶特性:

Human in the Loop

  • 用于敏感操作的人工审批
  • 三步流程:配置中断→响应中断→恢复执行
  • 三种审批结果:APPROVED、EDITED、REJECTED

长期记忆持久化

  • MysqlSaver/PostgresSaver用于生产环境
  • 框架自动维护graph_threadgraph_checkpoint
  • 可自定义Saver实现特殊需求

ReactAgent源码原理

  • 基于Graph(图)实现
  • Model Node负责调用LLM
  • Tool Node负责执行工具
  • 通过Edge(边)的路由规则控制流转
  • Hooks通过在Graph中注入节点实现

理解了这些原理,你就能更自如地使用框架,遇到问题也知道从哪儿排查。

至此,Agent开发实战系列就告一段落了。从概念到架构,从协作到实战,从入门到进阶,希望这几篇能帮你建立起对AI Agent开发的整体认知,在实际项目中能派上用场。

🎁优惠