跳到主要内容

单个子问题的检索流程

上一篇我们看到 retrieve 方法会并发调用 retrieveSingleSubQuestion 来检索每个子问题。这个方法是整个检索引擎的核心,负责把一个子问题转换成最终可用的证据文档。这篇就来详细拆解这个方法的完整流程。

retrieveSingleSubQuestion 方法签名

先看方法签名和注释:

/**
* 检索单个子问题。
* 该方法会并发调度所有可用检索通道,然后执行闸门过滤、RRF 融合、父块提升、rerank 和最终裁剪。
*
* @param subQuestionIndex 子问题序号,从 1 开始
* @param subQuestion 子问题文本
* @param plan 执行计划
* @param usedChannels 全局已使用通道集合
* @param notes 全局检索备注
* @param traceRecorder 检索追踪器
* @return 当前子问题对应的证据对象
*/
private SubQuestionEvidence retrieveSingleSubQuestion(int subQuestionIndex,
String subQuestion,
ConversationExecutionPlan plan,
List<String> usedChannels,
List<String> notes,
ConversationTraceRecorder traceRecorder) {
// ... 方法实现
}

这个方法接收 6 个参数:

  • subQuestionIndex:子问题序号(从 1 开始),用于日志和调试
  • subQuestion:子问题文本,比如"Java 的特点是什么?"
  • plan:执行计划,包含检索范围、文档 ID 等信息
  • usedChannels:全局已使用通道集合(同步列表),用于记录哪些通道被使用了
  • notes:全局检索备注(同步列表),用于记录检索过程中的提示信息
  • traceRecorder:检索追踪器,用于记录详细的检索观测数据

完整的方法实现

让我们看完整的源码:

private SubQuestionEvidence retrieveSingleSubQuestion(int subQuestionIndex,
String subQuestion,
ConversationExecutionPlan plan,
List<String> usedChannels,
List<String> notes,
ConversationTraceRecorder traceRecorder) {

// 针对每个子问题,把所有"支持当前 plan 的通道"并发跑起来,最大化利用不同召回能力。
List<CompletableFuture<RetrievalChannelResult>> futures = retrievalChannels.stream()

.filter(channel -> channel.supports(plan))
.map(channel -> CompletableFuture.supplyAsync(() -> channel.retrieve(subQuestion, plan), executorService)
// 每个通道同样有独立超时,避免某一路检索长期阻塞。
.orTimeout(Math.max(properties.getChannelTimeoutMs(), 1L), TimeUnit.MILLISECONDS)
.exceptionally(throwable -> {

// 通道异常时记录根因并自动降级为空结果,避免单通道失败拖垮当前子问题。
Throwable rootCause = unwrapThrowable(throwable);
log.warn("检索通道失败: subQuestionIndex={}, subQuestion='{}', channel='{}', exceptionType={}, message={}",
subQuestionIndex,
subQuestion,
channel.channelName(),
rootCause == null ? "" : rootCause.getClass().getName(),
rootCause == null ? "" : rootCause.getMessage(),
throwable);
notes.add("子问题" + subQuestionIndex + "通道[" + channel.channelName() + "]检索失败或超时,已自动降级。");
return new RetrievalChannelResult(channel.channelName(), List.of());
}))
.toList();
if (futures.isEmpty()) {
// 如果没有任何通道支持当前计划,直接返回空证据,并写入提示说明。
notes.add("子问题" + subQuestionIndex + "没有可用的检索通道。");
return new SubQuestionEvidence(subQuestionIndex, subQuestion, List.of(), new ArrayList<>(), List.of(), 0, 0, 0);
}

// 原始结果保留的是通道直接召回的候选,用于后续观测记录和过滤前后对比。
List<RetrievalChannelResult> rawChannelResults = futures.stream()
.map(CompletableFuture::join)
.filter(result -> result.getDocuments() != null)
.toList();

// 对不同通道应用对应的证据闸门,过滤掉分数过低的候选文档。
List<RetrievalChannelResult> channelResults = rawChannelResults.stream()
.map(this::applyEvidenceGate)
.toList();

// 构建每个通道"召回数 / 通过闸门数"的观察轨迹,供调试界面展示。
List<SubQuestionChannelTrace> channelTraces = buildChannelTraces(rawChannelResults, channelResults);

channelResults.stream()
.filter(result -> !result.getDocuments().isEmpty())
// 只要某个通道最终贡献了有效候选,就把它标记为"本轮实际使用过"。
.forEach(result -> markUsedChannel(usedChannels, result.getChannelName()));

// 用 RRF 融合多个通道的候选,减少单一通道偏差,并让多通道共同命中的文档获得更高优先级。
List<Document> mergedCandidates = fuseByRrf(channelResults);

// 将分块级证据尽量提升回父块,避免最终回答只看到过短的孤立片段。
List<Document> parentCandidates = documentKnowledgeService.elevateToParentBlocks(
mergedCandidates,
properties.getParentEvidenceMaxChars()
);

// 对父块候选再做 rerank,把与当前子问题最相关的内容排到更前面。
List<Document> rerankedCandidates = applyRerank(subQuestion, parentCandidates, usedChannels);

// 最终只保留 topK 证据,控制后续 Prompt 的上下文体积。
List<Document> finalDocuments = rerankedCandidates.stream()
.limit(properties.getFinalTopK())
.toList();

// 写入子问题检索摘要,便于前端或日志快速查看各通道命中情况。
notes.add("子问题" + subQuestionIndex + "检索完成:"
+ summarizeChannelResults(channelResults)
+ ",final=" + finalDocuments.size());

if (traceRecorder != null) {
try {
// 记录通道执行情况与候选文档选择情况,供后续调试页回放完整检索过程。
recordChannelObservations(traceRecorder, subQuestionIndex, subQuestion,
rawChannelResults, channelResults, channelTraces);
recordRetrievalResultObservations(traceRecorder, subQuestionIndex, subQuestion,
rawChannelResults, channelResults, mergedCandidates, rerankedCandidates, finalDocuments);
} catch (RuntimeException exception) {
log.warn("记录检索观测数据失败, subQuestionIndex={}", subQuestionIndex, exception);
}
}

// 当前子问题的最终输出是"最终文档 + 引用占位 + 通道轨迹 + 过程统计"。
return new SubQuestionEvidence(
subQuestionIndex,
subQuestion,
finalDocuments,
new ArrayList<>(),
channelTraces,
mergedCandidates.size(),
parentCandidates.size(),
rerankedCandidates.size()
);
}

这个方法的流程比较长,我们分步骤详细讲解。

付费内容提示

该文档的全部内容仅对「JavaUp项目实战&技术讲解」知识星球用户开放

加入星球后,你可以获得:

  • 超级八股文:100万+字的全栈技术知识库,涵盖技术核心、数据库、中间件、分布式等深度剖析的讲解
  • 讲解文档:超级AI智能体、黑马点评Plus、大麦、大麦pro、大麦AI、流量切换、数据中台的从0到1的详细文档
  • 讲解视频:超级AI智能体、黑马点评Plus、大麦、大麦pro、大麦AI、流量切换、数据中台的核心业务详细讲解
  • 1 对 1 解答:可以对我进行1对1的问题提问,而不仅仅只限于项目
  • 针对性服务:有没理解的地方,文档或者视频还没有讲到可以提出,本人会补充
  • 面试与简历指导:提供面试回答技巧,项目怎样写才能在简历中具有独特的亮点
  • 中间件环境:对于项目中需要使用的中间件,可直接替换成我提供的云环境
  • 面试后复盘:小伙伴去面试后,如果哪里被面试官问住了,可以再找我解答
  • 远程的解决:如果在启动项目遇到问题,本人可以帮你远程解决
进入星球后,即可享受上述所有服务,保证不会再有其他隐藏费用。
知识星球二维码

1. 打开微信 -> 扫描左侧二维码 -> 加入「JavaUp项目实战&技术讲解」知识星球

2. 查看星球使用指导,获取完整项目讲解资料索引

👉 点击解锁全部付费内容
🎁优惠