跳到主要内容

采集数据的任务-维度数据的执行

本章将介绍数据采集任务的执行过程是怎样的,从此篇开始,会设计到大量的设计模式,代码会有点绕,但不用担心,星哥会把每一个步骤都写的非常的清晰,让小伙伴肯定能够容易得理解

采集数据的升级注意

项目中的数据采集是有维度升级的,具体的维度升级内容,可以查看:项目采集维度的升级

采集数据任务执行入口

@RestController
@RequestMapping("/api/job")
@Tag(name = "job", description = "任务")
public class JobController {

@Autowired
private DataJob dataJob;

@Operation(summary = "采集数据任务执行")
@PostMapping(value = "/execute")
public ApiResponse<Void> execute(@Valid @RequestBody JobDto jobDto) {
dataJob.job(jobDto.getRuleId());
return ApiResponse.ok();
}

}

这是一个API,可以通过请求来调用,也可以定时任务来执行

执行任务

org.javaup.job.DataJob#job

public void job(Long ruleId){
//删除消息记录
proxyDataJob.deleteToDayMessageRecord();
//消息的父级链路id
long messageParentTraceId = uidGenerator.getUid();
Date yesterdayDate = DateUtils.addDay(DateUtils.now(), -1);
//根据时间维度来创建出开始和结束时间
RequestTime requestTime = DateTimeFunc.createRequestTime(DateType.DAY, yesterdayDate);
//根据规则id和时间查询维度信息
//DimensionTransfers结构:视频id集合、视频分类id,父级的视频分类id
List<DimensionTransfers> dimensionTransfers = dimensionHandler.handleDimensionData(ruleId, requestTime);
for (DimensionTransfers dimensionTransfer : dimensionTransfers) {
//构建参数信息
ParamTransfers paramTransfers = new ParamTransfers();
//消息的父级链路id
paramTransfers.setMessageParentTraceId(messageParentTraceId);
//消息的链路id
paramTransfers.setMessageTraceId(uidGenerator.getUid());
paramTransfers.setRuleId(ruleId);
//采集的类型
paramTransfers.setRuleType(RuleType.GATHER);
//视频维度是videoId
paramTransfers.setVideoDimensionType(VideoDimensionType.VIDEO);
paramTransfers.setDimensionTransfers(dimensionTransfer);
//请求时间
paramTransfers.setRequestTime(requestTime);
paramTransfers.setCollectType(CollectType.SQL);
// 处理规则业务逻辑
ruleHandler.handle(paramTransfers);
}
}

在讲解流程时,为了不混淆,就先不讲解消息记录的相关内容,后续会有相应的篇幅来详细的讲解

背景与角色

  • DataJob: 数据采集任务入口,负责任务清理、时间窗口构建、维度拆解、参数装配与规则执行。
  • DimensionHandler: 根据规则和时间窗口,产出“维度传输对象”集合(如视频ID集合、分类ID等)。
  • RuleHandler: 执行具体的采集/处理规则(如 SQL 采集、下游入库或投递)。

根据时间维度来创建出开始和结束时间

RequestTime requestTime = DateTimeFunc.createRequestTime(DateType.DAY, yesterdayDate);
public static RequestTime createRequestTime(DateType dateType, Date date) {
RequestTime requestTime = new RequestTime();
Date startTime;
Date endTime;
switch (dateType) {
case DAY -> {
startTime = DateUtils.obtainStartDay(date);
endTime = DateUtils.obtainEndDay(date);
}
case WEEK -> {
startTime = DateUtils.obtainStartWeek(date);
endTime = DateUtils.obtainEndWeek(date);
}
case MONTH -> {
startTime = DateUtils.obtainStartMonth(date);
endTime = DateUtils.obtainEndMonth(date);
}
case YEAR -> {
startTime = DateUtils.obtainStartYear(date);
endTime = DateUtils.obtainEndYear(date);
}
default -> {
throw new DockDataCenterFrameException("Unsupported date type: " + dateType);
}
}
requestTime.setGatherDate(date);
requestTime.setDateType(dateType);
requestTime.setStartTime(startTime);
requestTime.setEndTime(endTime);
return requestTime;
}

首先要构建出要查询的时间参数,包括:开始时间、结束时间。一开始执行肯定是 的维度,所以执行的是DateTimeFunc.createRequestTime(DateType.DAY, yesterdayDate)

通过此方法内部的逻辑能看到,可以通过 天、周、月、年的时间类型来获得对应的开始时间、结束时间