异步索引构建:初始化与切块执行
上一篇我们看到,同步链路最后把消息丢进了 Kafka。这篇我们来看消费端拿到消息之后,到底做了什么——这才是索引构建的真正核心。
handleIndexBuild() 方法是整个异步链路的主控方法,它会串行推进多个阶段。这篇先讲前两个阶段:初始化和切块执行。先看一张全景图:
异步执行全景图
阶段一:初始化
方法一进来,先把三个核心数据拿到手:文档、任务、策略方案。任何一个缺失都直接退出。同时还会预先读取并排序策略步骤:
DocumentAsyncProcessServiceImpl.java — handleIndexBuild()
// 先取出索引构建链路必需的三类核心数据:文档、任务、策略方案。
// 任意一个缺失都说明这条异步消息已经失去执行基础,直接记录告警并退出。
SuperAgentDocument document = documentMapper.selectById(documentId);
SuperAgentDocumentTask task = taskMapper.selectById(taskId);
SuperAgentDocumentStrategyPlan plan = planMapper.selectById(planId);
if (document == null || task == null || plan == null) {
log.warn("索引任务对应的数据不存在,documentId={}, taskId={}, planId={}",
documentId, taskId, planId);
return;
}
然后把任务推进到运行态,同时更新文档和策略步骤的状态:
DocumentAsyncProcessServiceImpl.java — listSteps()
private List<SuperAgentDocumentStrategyStep> listSteps(Long planId) {
List<SuperAgentDocumentStrategyStep> stepList = stepMapper.selectList(
new LambdaQueryWrapper<SuperAgentDocumentStrategyStep>()
.eq(SuperAgentDocumentStrategyStep::getPlanId, planId)
.eq(SuperAgentDocumentStrategyStep::getStatus, BusinessStatus.YES.getCode()));
return stepList.stream()
.sorted(Comparator
.comparingInt((SuperAgentDocumentStrategyStep step) -> pipelineOrder(step.getPipelineType()))
.thenComparing(SuperAgentDocumentStrategyStep::getStepNo)
.thenComparing(SuperAgentDocumentStrategyStep::getId))
.toList();
}
排序规则是:先父流水线、再子流水线;同一流水线内按 stepNo 升序,最后再按主键兜底。这样保证后续切块执行时步骤顺序是稳定的。
付费内容提示
该文档的全部内容仅对「JavaUp项目实战&技术讲解」知识星球用户开放
加入星球后,你可以获得:
- 超级八股文:100万+字的全栈技术知识库,涵盖技术核心、数据库、中间件、分布式等深度剖析的讲解
- 讲解文档:超级AI智能体、黑马点评Plus、大麦、大麦pro、大麦AI、流量切换、数据中台的从0到1的详细文档
- 讲解视频:超级AI智能体、黑马点评Plus、大麦、大麦pro、大麦AI、流量切换、数据中台的核心业务详细讲解
- 1 对 1 解答:可以对我进行1对1的问题提问,而不仅仅只限于项目
- 针对性服务:有没理解的地方,文档或者视频还没有讲到可以提出,本人会补充
- 面试与简历指导:提供面试回答技巧,项目怎样写才能在简历中具有独特的亮点
- 中间件环境:对于项目中需要使用的中间件,可直接替换成我提供的云环境
- 面试后复盘:小伙伴去面试后,如果哪里被面试官问住了,可以再找我解答
- 远程的解决:如果在启动项目遇到问题,本人可以帮你远程解决
进入星球后,即可享受上述所有服务,保证不会再有其他隐藏费用。
