跳到主要内容

采集数据的任务-处理规则业务逻辑

本章节继续上一章节的讲解,在上一章中讲到了任务的处理,根据规则id和时间查询维度信息逻辑 的执行

org.javaup.job.DataJob#job

public void job(Long ruleId){
//删除消息记录
proxyDataJob.deleteToDayMessageRecord();
//消息的父级链路id
long messageParentTraceId = uidGenerator.getUid();
Date yesterdayDate = DateUtils.addDay(DateUtils.now(), -1);
//根据时间维度来创建出开始和结束时间
RequestTime requestTime = DateTimeFunc.createRequestTime(DateType.DAY, yesterdayDate);
//根据规则id和时间查询维度信息
//DimensionTransfers结构:视频id集合、视频分类id,父级的视频分类id
List<DimensionTransfers> dimensionTransfers = dimensionHandler.handleDimensionData(ruleId, requestTime);
for (DimensionTransfers dimensionTransfer : dimensionTransfers) {
//构建参数信息
ParamTransfers paramTransfers = new ParamTransfers();
//消息的父级链路id
paramTransfers.setMessageParentTraceId(messageParentTraceId);
//消息的链路id
paramTransfers.setMessageTraceId(uidGenerator.getUid());
paramTransfers.setRuleId(ruleId);
//采集的类型
paramTransfers.setRuleType(RuleType.GATHER);
//视频维度是videoId
paramTransfers.setVideoDimensionType(VideoDimensionType.VIDEO);
paramTransfers.setDimensionTransfers(dimensionTransfer);
//请求时间
paramTransfers.setRequestTime(requestTime);
paramTransfers.setCollectType(CollectType.SQL);
// 处理规则业务逻辑
ruleHandler.handle(paramTransfers);
}
}

那么继续分析获取到维度信息后的执行过程,也就是循环dimensionTransfers的过程

先不关乎消息记录的相关参数,只看和业务有关的,分别设置了以下参数:

paramTransfers.setRuleId(ruleId);
//采集的类型
paramTransfers.setRuleType(RuleType.GATHER);
//视频维度是videoId
paramTransfers.setVideoDimensionType(VideoDimensionType.VIDEO);
paramTransfers.setDimensionTransfers(dimensionTransfer);
//请求时间
paramTransfers.setRequestTime(requestTime);
paramTransfers.setCollectType(CollectType.SQL);

paramTransfers.setDimensionTransfers(dimensionTransfer) 这行将维度数据设置到了paramTransfers参数中,到此paramTransfers有了维度数据、时间数据了,可以看到paramTransfers的结构:

@Data
public class ParamTransfers {

private Long ruleId;

private RuleType ruleType;

private VideoDimensionType videoDimensionType;

private DimensionTransfers dimensionTransfers;

private RequestTime requestTime;

private CollectType collectType;
}

接下来就是真正处理规则业务逻辑了,也就是执行ruleHandler.handle(paramTransfers)

处理规则业务逻辑

org.javaup.handler.RuleHandler#handle

/**
* 处理规则业务逻辑
* */
public RuleHandleOutput handle(ParamTransfers paramTransfers){
//根据规则id查询指标信息
List<Metric> metricList = metricService.selectByRuleId(paramTransfers.getRuleId());
if (CollectionUtil.isEmpty(metricList)) {
return null;
}
TotalParamTransfers totalParamTransfers = new TotalParamTransfers(metricList, paramTransfers, null);
//根据规则的类型(采集 或者 查询)获取对应的数据处理器
DataHandler dataHandler =
dataHandlerContext.getDataHandler(paramTransfers.getRuleType().getCode());
if (Objects.isNull(dataHandler)){
return null;
}
//数据处理器执行
return dataHandler.dataHandle(totalParamTransfers);
}

根据规则id查询指标信息

List<Metric> metricList = metricService.selectByRuleId(paramTransfers.getRuleId());
public List<Metric> selectByRuleId(Long ruleId) {
LambdaQueryWrapper<Metric> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(Metric::getRuleId, ruleId);
return metricMapper.selectList(wrapper);
}

这里的流程很简单,就是根据规则id从指标表中查询所有的此规则下的指标数据了

组装参数

TotalParamTransfers totalParamTransfers = new TotalParamTransfers(metricList, paramTransfers, null);

查询到了指标数据metricList后,设置到totalParamTransfers参数中,到此totalParamTransfers中的参数就组装完毕了,可以看下totalParamTransfers的结构

@AllArgsConstructor
@Data
public class TotalParamTransfers {

private List<Metric> metricList;

private ParamTransfers paramTransfers;

//省略...
}

数据处理的策略

//根据规则的类型(采集 或者 查询)获取对应的数据处理器
DataHandler dataHandler =
dataHandlerContext.getDataHandler(paramTransfers.getRuleType().getCode());

这行是根据规则类型获取对应的数据处理器,这里还是个策略模式

数据处理器接口

public interface DataHandler {

/**
* 处理业务逻辑
* @param totalParamTransfers 总参数传输对象
* @return 规则处理输出对象
* */
RuleHandleOutput dataHandle(TotalParamTransfers totalParamTransfers);

/**
* 获取规则类型(采集 或者 查询)
* @return 规则类型
* */
Integer getRuleType();
}

DataHandler定义了数据执行的方法dataHandle,和数据处理的策略getRuleType