Kafka 消费与文本内容解析
上一篇讲完了文档上传的同步链路——文件校验、MinIO 上传、数据库入库,最后把消息丢进了 Kafka。那 Kafka 消费方拿到消息之后到底做了什么?这篇就来拆这条异步链路的前半段:从消费消息开始,到文本提取与清洗完成为止。
先上总览流程图。
异步解析链路总览
Kafka 消费者:消息入口
消费端的入口在 DocumentKafkaConsumer,它的职责很单一——把 JSON 消息反序列化,然后转交给异步处理服务。
/**
* 消费"解析路由"消息。
* <p>
* 这一步是上传完成后的异步链入口:收到消息后,会把 documentId 和 taskId 交给异步处理服务,
* 继续执行文档下载、正文解析、结构节点生成、策略推荐等步骤。
* </p>
*/
@KafkaListener(
topics = SPRING_INJECT_PREFIX_DISTINCTION_NAME + "-" + "${app.manage.kafka.parse-topic}",
groupId = "${app.manage.kafka.group-id}-parse")
public void consumeParseRoute(String payload) {
try {
// 先把 JSON 还原成强类型消息对象,避免后续处理层直接面对原始字符串。
DocumentParseRouteMessage message = objectMapper.readValue(payload,
DocumentParseRouteMessage.class);
// 真正的业务推进放到异步处理服务中,这里只承担"消费并转发"的职责。
asyncProcessService.handleParseRoute(message.getDocumentId(), message.getTaskId());
}
catch (Exception exception) {
// 消费失败只记录日志,不让异常继续向外冒泡破坏监听线程。
log.error("消费解析路由消息失败,payload={}", payload, exception);
}
}
几个要点:
- topic 名称是
环境前缀-配置的parseTopic,和上传端发送时用的是同一个 topic groupId带了-parse后缀,和索引构建的消费组区分开- 整个方法用 try-catch 包住,消费失败只打日志不抛异常——这是为了防止一条坏消息把整个消费线程搞挂
- Consumer 本身不做任何业务逻辑,纯粹是"反序列化 + 转发"
付费内容提示
该文档的全部内容仅对「JavaUp项目实战&技术讲解」知识星球用户开放
加入星球后,你可以获得:
- 超级八股文:100万+字的全栈技术知识库,涵盖技术核心、数据库、中间件、分布式等深度剖析的讲解
- 讲解文档:超级AI智能体、黑马点评Plus、大麦、大麦pro、大麦AI、流量切换、数据中台的从0到1的详细文档
- 讲解视频:超级AI智能体、黑马点评Plus、大麦、大麦pro、大麦AI、流量切换、数据中台的核心业务详细讲解
- 1 对 1 解答:可以对我进行1对1的问题提问,而不仅仅只限于项目
- 针对性服务:有没理解的地方,文档或者视频还没有讲到可以提出,本人会补充
- 面试与简历指导:提供面试回答技巧,项目怎样写才能在简历中具有独特的亮点
- 中间件环境:对于项目中需要使用的中间件,可直接替换成我提供的云环境
- 面试后复盘:小伙伴去面试后,如果哪里被面试官问住了,可以再找我解答
- 远程的解决:如果在启动项目遇到问题,本人可以帮你远程解决
进入星球后,即可享受上述所有服务,保证不会再有其他隐藏费用。
