跳到主要内容

XXL-JOB分片任务与并行调度

分片任务的背景与价值

在实际业务场景中,经常会遇到需要处理大量数据的定时任务,例如:每日对账、批量发送通知、数据清洗等。如果这些任务只在单个执行器上运行,可能需要很长时间才能完成,无法满足业务的时效性要求。

XXL-JOB的分片任务功能允许将一个大任务拆分成多个子任务,分配给不同的执行器并行处理,从而大幅提升任务执行效率。

分片任务的核心原理

分片参数传递机制

当调度中心触发分片任务时,会向每个执行器实例传递两个关键参数:

  • 分片索引(shardIndex):当前执行器的编号,从0开始
  • 分片总数(shardTotal):参与执行的执行器总数

路由策略配置

在XXL-JOB管理后台配置任务时,需要将路由策略设置为"分片广播",这样调度中心会将任务广播到所有在线的执行器实例。

分片任务实现详解

基于取模的数据分片

最常用的分片策略是对数据的唯一标识(如ID)进行取模运算,确保数据被均匀分配到各个执行器:

/**
* 商品库存同步任务
* 演示基于商品ID取模的分片处理
*/
@Component
public class ProductStockSyncHandler {

@Autowired
private ProductService productService;

@Autowired
private InventoryService inventoryService;

@XxlJob("productStockSyncHandler")
public void syncProductStock() {
// 获取分片参数
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();

XxlJobHelper.log("分片参数: 当前分片={}, 总分片数={}",
shardIndex, shardTotal);

// 分页查询待同步的商品
int pageSize = 500;
int pageNum = 0;

while (true) {
List<Product> products = productService.queryProducts(pageNum, pageSize);

if (products.isEmpty()) {
break;
}

for (Product product : products) {
// 通过商品ID取模判断是否由当前分片处理
if (product.getId() % shardTotal == shardIndex) {
try {
// 同步商品库存到缓存
syncToCache(product);
XxlJobHelper.log("同步成功: 商品ID={}", product.getId());
} catch (Exception e) {
XxlJobHelper.log("同步失败: 商品ID={}, 错误={}",
product.getId(), e.getMessage());
}
}
}

pageNum++;
}

XxlJobHelper.log("分片{}处理完成", shardIndex);
}

private void syncToCache(Product product) {
// 查询实时库存
Integer stock = inventoryService.getStock(product.getId());
// 更新缓存
redisTemplate.opsForValue().set(
"stock:" + product.getId(),
stock,
Duration.ofMinutes(30)
);
}
}

分片执行流程

高级分片策略

基于数据库分片查询

直接在数据库查询时进行分片过滤,避免查询全量数据:

/**
* 会员积分过期清理任务
* 基于数据库分片查询的高效实现
*/
@Component
public class MemberPointsExpireHandler {

@Autowired
private MemberPointsMapper memberPointsMapper;

@XxlJob("memberPointsExpireHandler")
public void clearExpiredPoints() {
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();

// 查询当前分片负责的过期积分记录
// SQL: WHERE member_id % #{shardTotal} = #{shardIndex}
// AND expire_time < NOW()
List<MemberPoints> expiredPoints = memberPointsMapper.queryExpiredBySharding(
shardIndex,
shardTotal,
LocalDateTime.now()
);

XxlJobHelper.log("当前分片查询到{}条过期积分记录", expiredPoints.size());

for (MemberPoints points : expiredPoints) {
try {
// 记录积分变动日志
logPointsChange(points);

// 清零过期积分
memberPointsMapper.clearExpiredPoints(points.getId());

// 发送过期通知
sendExpireNotification(points.getMemberId());

} catch (Exception e) {
XxlJobHelper.log("处理失败: 记录ID={}, 错误={}",
points.getId(), e.getMessage());
}
}
}
}

对应的Mapper查询:

/**
* 会员积分Mapper
*/
@Mapper
public interface MemberPointsMapper {

/**
* 按分片查询过期积分
*/
@Select("SELECT * FROM member_points " +
"WHERE member_id % #{shardTotal} = #{shardIndex} " +
"AND expire_time < #{expireTime} " +
"AND status = 'ACTIVE'")
List<MemberPoints> queryExpiredBySharding(
@Param("shardIndex") int shardIndex,
@Param("shardTotal") int shardTotal,
@Param("expireTime") LocalDateTime expireTime
);
}

基于业务规则的分片

根据业务特性设计分片规则,实现更灵活的任务分配:

/**
* 多地区物流状态同步任务
* 基于地区编码的业务分片
*/
@Component
public class RegionalLogisticsSyncHandler {

// 地区编码与分片的映射
private static final Map<String, Integer> REGION_SHARD_MAP = new HashMap<>();

static {
// 华东地区 -> 分片0
REGION_SHARD_MAP.put("SH", 0);
REGION_SHARD_MAP.put("JS", 0);
REGION_SHARD_MAP.put("ZJ", 0);

// 华北地区 -> 分片1
REGION_SHARD_MAP.put("BJ", 1);
REGION_SHARD_MAP.put("TJ", 1);
REGION_SHARD_MAP.put("HB", 1);

// 华南地区 -> 分片2
REGION_SHARD_MAP.put("GD", 2);
REGION_SHARD_MAP.put("GX", 2);
REGION_SHARD_MAP.put("HN", 2);
}

@XxlJob("regionalLogisticsSyncHandler")
public void syncLogisticsStatus() {
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();

// 获取当前分片负责的地区
List<String> myRegions = REGION_SHARD_MAP.entrySet().stream()
.filter(entry -> entry.getValue() == shardIndex)
.map(Map.Entry::getKey)
.collect(Collectors.toList());

XxlJobHelper.log("分片{}负责的地区: {}", shardIndex, myRegions);

for (String regionCode : myRegions) {
syncRegionLogistics(regionCode);
}
}

private void syncRegionLogistics(String regionCode) {
// 查询该地区的待更新物流单
List<LogisticsOrder> orders = logisticsService.queryByRegion(regionCode);

for (LogisticsOrder order : orders) {
// 调用物流API查询最新状态
LogisticsStatus status = logisticsApi.queryStatus(order.getTrackingNo());

// 更新物流状态
logisticsService.updateStatus(order.getId(), status);
}
}
}

分片任务的数据一致性保障

避免数据遗漏与重复

在分片任务中,必须确保每条数据恰好被一个执行器处理:

处理执行器动态扩缩容

当执行器数量发生变化时,需要注意分片参数的变化:

/**
* 消息推送任务
* 处理执行器扩缩容场景
*/
@Component
public class MessagePushHandler {

@XxlJob("messagePushHandler")
public void pushMessages() {
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();

// 记录本次执行的分片配置
XxlJobHelper.log("本次执行分片配置: index={}, total={}",
shardIndex, shardTotal);

// 使用数据库记录处理进度,支持断点续传
String progressKey = "message_push_progress_" + shardIndex + "_" + shardTotal;
Long lastProcessedId = getProgress(progressKey);

while (true) {
// 查询待推送消息(增量查询)
List<PushMessage> messages = messageMapper.queryUnpushed(
shardIndex,
shardTotal,
lastProcessedId,
1000 // 每批处理1000条
);

if (messages.isEmpty()) {
break;
}

for (PushMessage message : messages) {
// 推送消息
pushService.push(message);
lastProcessedId = message.getId();
}

// 更新处理进度
saveProgress(progressKey, lastProcessedId);
}
}
}

分片任务最佳实践

分片数量规划

任务幂等性设计

分片任务应当设计为幂等操作,即使同一数据被重复处理也不会产生副作用:

/**
* 账户余额对账任务
* 幂等性设计示例
*/
@Component
public class AccountReconciliationHandler {

@Autowired
private ReconciliationService reconciliationService;

@XxlJob("accountReconciliationHandler")
public void reconcile() {
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();

// 获取今日对账批次号
String batchNo = generateDailyBatchNo();

List<Account> accounts = accountMapper.queryBySharding(shardIndex, shardTotal);

for (Account account : accounts) {
// 幂等检查:判断该账户今日是否已对账
if (reconciliationService.isReconciled(account.getId(), batchNo)) {
XxlJobHelper.log("账户{}今日已对账,跳过", account.getId());
continue;
}

try {
// 执行对账逻辑
ReconciliationResult result = doReconcile(account);

// 记录对账结果(包含批次号,保证幂等)
reconciliationService.saveResult(account.getId(), batchNo, result);

} catch (Exception e) {
XxlJobHelper.log("账户{}对账失败: {}", account.getId(), e.getMessage());
}
}
}

private String generateDailyBatchNo() {
return "RECON_" + LocalDate.now().format(DateTimeFormatter.BASIC_ISO_DATE);
}
}

异常处理与告警

/**
* 分片任务异常处理模板
*/
@Component
public class ShardingTaskTemplate {

@Autowired
private AlertService alertService;

public void executeWithMonitoring(ShardingTask task) {
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();

long startTime = System.currentTimeMillis();
int successCount = 0;
int failCount = 0;
List<String> errors = new ArrayList<>();

try {
// 执行分片任务
ShardingResult result = task.execute(shardIndex, shardTotal);
successCount = result.getSuccessCount();
failCount = result.getFailCount();
errors = result.getErrors();

} catch (Exception e) {
// 任务整体异常
alertService.sendAlert(
"分片任务执行异常",
String.format("分片%d/%d执行失败: %s", shardIndex, shardTotal, e.getMessage())
);
throw e;

} finally {
long duration = System.currentTimeMillis() - startTime;

// 记录执行统计
XxlJobHelper.log("分片{}/{} 执行完成: 耗时={}ms, 成功={}, 失败={}",
shardIndex, shardTotal, duration, successCount, failCount);

// 失败率告警
if (failCount > 0 && failCount * 1.0 / (successCount + failCount) > 0.1) {
alertService.sendAlert(
"分片任务失败率过高",
String.format("分片%d失败率超过10%%,失败数: %d", shardIndex, failCount)
);
}
}
}
}

分片任务与普通任务对比

特性普通任务分片任务
执行器选择单个执行器所有在线执行器
处理能力受限于单机性能可水平扩展
适用场景小数据量任务大数据量批处理
实现复杂度简单需要设计分片逻辑
容错能力单点故障影响大局部故障影响小

总结

XXL-JOB的分片任务功能为大数据量的定时任务处理提供了高效的解决方案:

  • 核心机制:调度中心广播任务到所有执行器,每个执行器根据分片参数处理对应的数据切片
  • 分片策略:支持ID取模、数据库分片查询、业务规则分片等多种实现方式
  • 最佳实践:合理规划分片数量、设计幂等性逻辑、完善异常处理机制

通过分片任务,原本需要数小时才能完成的大批量数据处理,可以在多个执行器的并行协作下大幅缩短执行时间,有效提升系统的数据处理能力。