RagChatExecutor执行器的主流程
上一篇我们搞清楚了执行器是怎么被选中的。当用户选择"当前文档问答"模式时,系统会通过注册表拿到 RagChatExecutor 实例,然后调用它的 execute 方法。这篇就来拆解这个方法到底做了什么。
execute 方法的整体结构
先看 RagChatExecutor 的完整源码:
/**
* 知识问答执行器。
* 该执行器负责标准的 RAG 路径,主流程为:
* 1. 根据执行计划发起检索
* 2. 整理检索结果和引用
* 3. 组装系统 Prompt / 用户 Prompt
* 4. 调用模型基于证据流式生成答案
*/
@Component
public class RagChatExecutor implements ConversationExecutor {
private final RagRetrievalEngine ragRetrievalEngine;
private final RagPromptAssemblyService ragPromptAssemblyService;
private final StreamEventWriter streamEventWriter;
private final ObservedChatModelService observedChatModelService;
public RagChatExecutor(RagRetrievalEngine ragRetrievalEngine,
RagPromptAssemblyService ragPromptAssemblyService,
StreamEventWriter streamEventWriter,
ObservedChatModelService observedChatModelService) {
this.ragRetrievalEngine = ragRetrievalEngine;
this.ragPromptAssemblyService = ragPromptAssemblyService;
this.streamEventWriter = streamEventWriter;
this.observedChatModelService = observedChatModelService;
}
@Override
public ExecutionMode mode() {
return ExecutionMode.RETRIEVAL;
}
/**
* 执行 RAG 知识问答主流程。
*
* @param taskInfo 当前任务
* @return 模型输出的流式文本结果
*/
@Override
public Flux<String> execute(TaskInfo taskInfo) {
ConversationExecutionPlan plan = taskInfo.executionPlan();
// 先通知前端:当前正在进入知识检索规划阶段。
ExecutorEventSupport.publishThinking(taskInfo, streamEventWriter, "正在根据问题规划知识检索范围。");
ConversationTraceRecorder.StageHandle retrieveStage = taskInfo.traceRecorder() == null
? null
: taskInfo.traceRecorder().startStage(ConversationTraceStageCode.RAG_RETRIEVE, mode().name(), "正在执行双通道混合检索。", null);
// 检索通常涉及数据库、向量索引或外部 IO,因此切到弹性线程池执行,避免阻塞响应链路。
return Mono.fromCallable(() -> ragRetrievalEngine.retrieve(plan, taskInfo.traceRecorder()))
.subscribeOn(Schedulers.boundedElastic())
.doOnError(error -> {
if (taskInfo.traceRecorder() != null) {
// 检索阶段失败时,优先把阶段状态标记为失败,便于后续调试。
taskInfo.traceRecorder().failStage(retrieveStage, "RAG 检索失败。", error.getMessage(), null);
}
})
.doOnSuccess(context -> {
if (taskInfo.traceRecorder() != null && context != null) {
// 检索成功后,把使用的通道、子问题、引用情况等埋点信息完整记录下来。
taskInfo.traceRecorder().completeStage(retrieveStage, "RAG 检索完成。", Map.of(
"retrievalQuestion", StrUtil.blankToDefault(context.getRetrievalQuestion(), ""),
"usedChannels", context.getUsedChannels() == null ? List.of() : context.getUsedChannels(),
"retrievalNotes", context.getRetrievalNotes() == null ? List.of() : context.getRetrievalNotes(),
"referenceCount", context.flattenReferences().size(),
"subQuestionCount", context.getSubQuestionEvidenceList() == null ? 0 : context.getSubQuestionEvidenceList().size(),
// ... 省略详细的子问题和引用信息
));
}
})
// 检索完成后,继续进入"证据整理 -> Prompt 组装 -> 模型回答"的后续链路。
.flatMapMany(context -> streamFromRetrievalContext(taskInfo, plan, context));
}
}
这个方法看起来不长,但其实包含了 RAG 知识问答的核心逻辑。我们一步步拆解。
执行流程的三个阶段
整个 execute 方法可以分成三个阶段:
阶段一:发送 thinking 状态
// 先通知前端:当前正在进入知识检索规划阶段。
ExecutorEventSupport.publishThinking(taskInfo, streamEventWriter, "正在根据问题规划知识检索范围。");
这行代码会通过 SSE 向前端推送一个 thinking 事件,告诉用户"我正在思考"。这样用户就不会看到一片空白,体验会好很多。
阶段二:启动检索阶段追踪
ConversationTraceRecorder.StageHandle retrieveStage = taskInfo.traceRecorder() == null
? null
: taskInfo.traceRecorder().startStage(ConversationTraceStageCode.RAG_RETRIEVE, mode().name(), "正在执行双通道混合检索。", null);
这里启动了一个"阶段追踪句柄"。系统会记录这个检索阶段的开始时间、状态、输入输出等信息,最终落库到调试表中。这样后续可以在调试页面看到每个阶段的耗时和执行情况。
为什么要追踪阶段?
RAG 问答链路比较长,包括检索、Prompt 组装、模型回答等多个阶段。如果某次回答质量不好或者速度慢,我们需要知道是哪个阶段出了问题。阶段追踪就是为了解决这个问题——每个阶段都会记录开始时间、结束时间、输入输出、成功失败状态,方便后续排查。
付费内容提示
该文档的全部内容仅对「JavaUp项目实战&技术讲解」知识星球用户开放
加入星球后,你可以获得:
- 超级八股文:100万+字的全栈技术知识库,涵盖技术核心、数据库、中间件、分布式等深度剖析的讲解
- 讲解文档:超级AI智能体、黑马点评Plus、大麦、大麦pro、大麦AI、流量切换、数据中台的从0到1的详细文档
- 讲解视频:超级AI智能体、黑马点评Plus、大麦、大麦pro、大麦AI、流量切换、数据中台的核心业务详细讲解
- 1 对 1 解答:可以对我进行1对1的问题提问,而不仅仅只限于项目
- 针对性服务:有没理解的地方,文档或者视频还没有讲到可以提出,本人会补充
- 面试与简历指导:提供面试回答技巧,项目怎样写才能在简历中具有独特的亮点
- 中间件环境:对于项目中需要使用的中间件,可直接替换成我提供的云环境
- 面试后复盘:小伙伴去面试后,如果哪里被面试官问住了,可以再找我解答
- 远程的解决:如果在启动项目遇到问题,本人可以帮你远程解决
进入星球后,即可享受上述所有服务,保证不会再有其他隐藏费用。
