跳到主要内容

Redis消息队列与通信模式

Redis消息通信机制概览

Redis作为高性能的内存数据库,不仅可以用于数据缓存,还提供了多种消息通信和队列机制。从早期的Pub/Sub模式,到后来引入的Stream数据结构,Redis在消息队列领域逐渐完善。

消息通信方案对比

发布订阅模式详解

Pub/Sub基本原理

Redis的发布订阅(Publish/Subscribe)是一种消息传递模式,发布者将消息发送到指定频道,所有订阅该频道的客户端都会实时收到消息。

核心概念:

  • 发布者(Publisher):向频道发送消息的客户端
  • 订阅者(Subscriber):订阅频道并接收消息的客户端
  • 频道(Channel):消息传递的逻辑通道

实战应用场景

场景1:实时聊天系统

/**
* 聊天室实现:多人在线聊天
*/
public class ChatRoomService {
private Jedis jedis = new Jedis("localhost", 6379);

// 发布消息到聊天室
public void sendMessage(String roomId, String username, String message) {
String channel = "chatroom:" + roomId;
String msg = username + ": " + message;

// 发布消息到频道
jedis.publish(channel, msg);

// 同时保存到历史记录(用LIST存储最近100条)
String historyKey = "chatroom:" + roomId + ":history";
jedis.lpush(historyKey, msg);
jedis.ltrim(historyKey, 0, 99);
}

// 订阅聊天室消息
public void subscribeRoom(String roomId) {
JedisPubSub listener = new JedisPubSub() {
@Override
public void onMessage(String channel, String message) {
System.out.println("[" + channel + "] " + message);
// 在实际应用中,这里会通过WebSocket推送给客户端
}

@Override
public void onSubscribe(String channel, int subscribedChannels) {
System.out.println("成功订阅频道: " + channel);
}
};

// 阻塞订阅(通常在单独的线程中执行)
jedis.subscribe(listener, "chatroom:" + roomId);
}
}

场景2:系统日志监控

/**
* 日志聚合系统:实时收集各服务的错误日志
*/
public class LogMonitorService {

// 日志级别频道
private static final String ERROR_CHANNEL = "logs:error";
private static final String WARNING_CHANNEL = "logs:warning";
private static final String INFO_CHANNEL = "logs:info";

// 发布日志
public void publishLog(String level, String serviceName, String message) {
Jedis jedis = new Jedis("localhost", 6379);
String channel = "logs:" + level.toLowerCase();

// 构造日志消息
String logMessage = String.format("[%s] %s - %s - %s",
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()),
serviceName,
level,
message
);

jedis.publish(channel, logMessage);
}

// 监控错误日志
public void monitorErrors() {
Jedis jedis = new Jedis("localhost", 6379);

JedisPubSub monitor = new JedisPubSub() {
@Override
public void onMessage(String channel, String message) {
// 收到错误日志,触发告警
System.err.println("⚠️ 错误告警: " + message);
sendAlertToAdmin(message);
}
};

// 订阅错误和警告日志
new Thread(() -> {
jedis.subscribe(monitor, ERROR_CHANNEL, WARNING_CHANNEL);
}).start();
}

private void sendAlertToAdmin(String message) {
// 发送邮件、短信或钉钉通知
}
}

场景3:缓存更新通知

/**
* 分布式缓存同步:缓存更新时通知其他节点
*/
public class CacheInvalidationService {
private Jedis jedis = new Jedis("localhost", 6379);
private LocalCache localCache = new LocalCache();

// 更新数据并通知其他节点
public void updateProduct(Product product) {
// 1. 更新数据库
productDao.update(product);

// 2. 更新本地缓存
localCache.put("product:" + product.getId(), product);

// 3. 发布缓存失效消息
String channel = "cache:invalidate:product";
jedis.publish(channel, String.valueOf(product.getId()));
}

// 订阅缓存失效消息
public void subscribeCacheInvalidation() {
JedisPubSub listener = new JedisPubSub() {
@Override
public void onMessage(String channel, String productId) {
// 收到通知,清除本地缓存
localCache.remove("product:" + productId);
System.out.println("已清除商品缓存: " + productId);
}
};

new Thread(() -> {
jedis.subscribe(listener, "cache:invalidate:product");
}).start();
}
}

场景4:实时数据推送

/**
* 股票行情推送:实时推送股票价格变动
*/
public class StockPriceService {

// 发布股票价格更新
public void publishPriceUpdate(String stockCode, double price, double change) {
Jedis jedis = new Jedis("localhost", 6379);
String channel = "stock:price:" + stockCode;

// 构造价格数据(JSON格式)
String priceData = String.format(
"{\"code\":\"%s\",\"price\":%.2f,\"change\":%.2f,\"time\":\"%s\"}",
stockCode, price, change,
new SimpleDateFormat("HH:mm:ss").format(new Date())
);

jedis.publish(channel, priceData);
}

// 订阅自选股价格变动
public void subscribeMyStocks(List<String> stockCodes) {
Jedis jedis = new Jedis("localhost", 6379);

JedisPubSub listener = new JedisPubSub() {
@Override
public void onMessage(String channel, String message) {
// 解析价格数据
System.out.println("价格更新: " + message);
// 实际应用中通过WebSocket推送到前端
notifyWebSocketClient(message);
}
};

// 订阅多只股票
String[] channels = stockCodes.stream()
.map(code -> "stock:price:" + code)
.toArray(String[]::new);

new Thread(() -> {
jedis.subscribe(listener, channels);
}).start();
}
}

模式匹配订阅

Redis支持通配符订阅,可以同时订阅多个符合模式的频道:

/**
* 使用模式匹配订阅多个频道
*/
public void subscribeWithPattern() {
Jedis jedis = new Jedis("localhost", 6379);

JedisPubSub listener = new JedisPubSub() {
@Override
public void onPMessage(String pattern, String channel, String message) {
System.out.printf("模式[%s] 频道[%s]: %s%n", pattern, channel, message);
}
};

// 订阅所有订单相关的频道
// 可以匹配: order:created, order:paid, order:shipped 等
jedis.psubscribe(listener, "order:*");

// 订阅所有用户操作频道
// 可以匹配: user:login, user:logout, user:register 等
jedis.psubscribe(listener, "user:*");
}

Pub/Sub的优缺点

优点:

  1. 实时性高:消息发布后立即推送给所有订阅者,延迟极低
  2. 灵活性强:订阅者可以随时订阅或取消订阅,支持模式匹配
  3. 解耦合:发布者和订阅者不需要知道对方的存在
  4. 支持多订阅者:一个消息可以被多个订阅者同时接收

缺点:

  1. 消息不持久化:消息发布后如果没有订阅者在线,消息会丢失
  2. 可靠性低:订阅者断线期间的消息无法恢复
  3. 无消息确认机制:发布者不知道消息是否被成功接收
  4. 不适合高频场景:大量消息会增加订阅者的处理压力

使用建议

Pub/Sub适合以下场景:

  • ✓ 实时通知、即时消息
  • ✓ 缓存同步、配置更新
  • ✓ 实时数据推送、监控告警
  • ✗ 不适合可靠性要求高的业务
  • ✗ 不适合需要消息持久化的场景

对于需要可靠消息传递的场景,建议使用Redis Stream。

Redis Stream深度剖析

Stream的诞生背景

在Redis 5.0之前,开发者常用Pub/Sub实现消息队列,但存在消息易丢失的问题。Redis Stream应运而生,提供了一个功能完善、持久化的消息流数据结构。

Stream核心特性

Stream数据结构

Stream可以看作是一个有序的消息日志,每个消息都有一个唯一的ID(时间戳+序列号):

Stream: task_queue
├── 1701234567890-0: {type: "email", to: "user@example.com", subject: "Welcome"}
├── 1701234568123-0: {type: "sms", phone: "13800138000", content: "验证码"}
├── 1701234569456-0: {type: "push", userId: "1001", message: "新订单通知"}
└── 1701234570789-0: {type: "email", to: "admin@example.com", subject: "Alert"}

核心命令详解

XADD - 添加消息

/**
* 向Stream中添加消息
*/
public String addMessage(Jedis jedis, String streamKey, Map<String, String> message) {
// 使用 * 让Redis自动生成消息ID(当前时间戳)
String messageId = jedis.xadd(streamKey, null, message);
return messageId;
}

// 使用示例:发送邮件任务
Map<String, String> emailTask = new HashMap<>();
emailTask.put("type", "email");
emailTask.put("to", "customer@example.com");
emailTask.put("subject", "订单确认");
emailTask.put("content", "您的订单已确认");

String msgId = addMessage(jedis, "task:queue", emailTask);
System.out.println("消息ID: " + msgId); // 输出: 1701234567890-0

XREAD - 读取消息

/**
* 从Stream中读取消息
*/
public void readMessages(Jedis jedis, String streamKey) {
// 从头开始读取5条消息
List<Entry<String, List<StreamEntry>>> results =
jedis.xread(5, 0, new AbstractMap.SimpleEntry<>(streamKey, "0-0"));

for (Entry<String, List<StreamEntry>> entry : results) {
String stream = entry.getKey();
List<StreamEntry> messages = entry.getValue();

for (StreamEntry msg : messages) {
System.out.println("消息ID: " + msg.getID());
System.out.println("消息内容: " + msg.getFields());
}
}
}

// 阻塞读取新消息
public void readNewMessages(Jedis jedis, String streamKey, String lastId) {
// BLOCK 5000 表示最多阻塞5秒
List<Entry<String, List<StreamEntry>>> results =
jedis.xread(
XReadParams.xReadParams().block(5000),
Collections.singletonMap(streamKey, lastId)
);

// 处理新消息
if (results != null && !results.isEmpty()) {
// ...
}
}

XGROUP - 消费者组

消费者组允许多个消费者协作处理消息流,实现负载均衡:

/**
* 消费者组实战:任务分发系统
*/
public class TaskDistributionSystem {
private Jedis jedis = new Jedis("localhost", 6379);
private static final String STREAM_KEY = "task:queue";
private static final String GROUP_NAME = "task_workers";

// 创建消费者组
public void createConsumerGroup() {
try {
// 从Stream的起始位置创建消费者组
jedis.xgroupCreate(STREAM_KEY, GROUP_NAME, "0", false);
System.out.println("消费者组创建成功");
} catch (Exception e) {
System.out.println("消费者组已存在");
}
}

// 消费者从组中读取消息
public void consumeMessages(String consumerName) {
while (true) {
// 从组中读取消息,每次最多读取10条
List<Entry<String, List<StreamEntry>>> results = jedis.xreadGroup(
GROUP_NAME, // 消费者组名
consumerName, // 消费者名称(如worker-1, worker-2)
10, // 读取消息数量
5000, // 阻塞时间(毫秒)
false, // 不读取pending消息
new AbstractMap.SimpleEntry<>(STREAM_KEY, ">") // > 表示读取新消息
);

if (results != null && !results.isEmpty()) {
for (Entry<String, List<StreamEntry>> entry : results) {
for (StreamEntry message : entry.getValue()) {
// 处理消息
processTask(message);

// 确认消息已处理
jedis.xack(STREAM_KEY, GROUP_NAME, message.getID());
}
}
}
}
}

private void processTask(StreamEntry message) {
Map<String, String> task = message.getFields();
String taskType = task.get("type");

System.out.printf("[%s] 处理任务 %s: %s%n",
Thread.currentThread().getName(),
message.getID(),
taskType);

// 模拟任务处理
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

// 查看待处理消息(Pending List)
public void checkPendingMessages() {
// 查看组中所有待处理的消息
List<StreamPendingEntry> pending =
jedis.xpending(STREAM_KEY, GROUP_NAME, null, null, 10, null);

for (StreamPendingEntry entry : pending) {
System.out.println("待处理消息ID: " + entry.getID());
System.out.println("消费者: " + entry.getConsumerName());
System.out.println("未确认时长: " + entry.getIdleTime() + "ms");
}
}
}

// 启动多个消费者
public static void main(String[] args) {
TaskDistributionSystem system = new TaskDistributionSystem();
system.createConsumerGroup();

// 启动3个工作线程
for (int i = 1; i <= 3; i++) {
final String workerName = "worker-" + i;
new Thread(() -> {
system.consumeMessages(workerName);
}, workerName).start();
}
}

消费者组工作流程

Stream实战案例

案例1:订单处理系统

/**
* 订单异步处理:将订单处理任务放入Stream
*/
public class OrderProcessingSystem {
private Jedis jedis = new Jedis("localhost", 6379);
private static final String ORDER_STREAM = "orders:processing";
private static final String GROUP = "order_processors";

// 提交订单处理任务
public void submitOrder(Order order) {
Map<String, String> orderData = new HashMap<>();
orderData.put("orderId", order.getId());
orderData.put("userId", order.getUserId());
orderData.put("amount", String.valueOf(order.getAmount()));
orderData.put("status", "PENDING");
orderData.put("createTime", String.valueOf(System.currentTimeMillis()));

String messageId = jedis.xadd(ORDER_STREAM, null, orderData);
System.out.println("订单已提交到处理队列: " + messageId);
}

// 订单处理器
public void startOrderProcessor(String processorId) {
// 确保消费者组存在
try {
jedis.xgroupCreate(ORDER_STREAM, GROUP, "0", false);
} catch (Exception ignored) {}

System.out.println("订单处理器启动: " + processorId);

while (true) {
List<Entry<String, List<StreamEntry>>> results = jedis.xreadGroup(
GROUP, processorId, 1, 5000, false,
new AbstractMap.SimpleEntry<>(ORDER_STREAM, ">")
);

if (results != null) {
for (Entry<String, List<StreamEntry>> entry : results) {
for (StreamEntry message : entry.getValue()) {
processOrder(message);
jedis.xack(ORDER_STREAM, GROUP, message.getID());
}
}
}
}
}

private void processOrder(StreamEntry message) {
Map<String, String> order = message.getFields();
String orderId = order.get("orderId");

try {
// 1. 验证库存
checkInventory(orderId);

// 2. 扣减库存
deductInventory(orderId);

// 3. 创建物流单
createShipment(orderId);

// 4. 发送通知
sendNotification(order.get("userId"), orderId);

System.out.println("✓ 订单处理完成: " + orderId);

} catch (Exception e) {
System.err.println("✗ 订单处理失败: " + orderId + ", " + e.getMessage());
// 可以将失败的订单放入死信队列
jedis.xadd("orders:failed", null, order);
}
}
}

案例2:日志收集系统

/**
* 分布式日志收集:各服务将日志发送到Stream
*/
public class LogCollectionSystem {
private Jedis jedis = new Jedis("localhost", 6379);
private static final String LOG_STREAM = "logs:application";

// 发送日志
public void sendLog(String service, String level, String message) {
Map<String, String> log = new HashMap<>();
log.put("service", service);
log.put("level", level);
log.put("message", message);
log.put("timestamp", String.valueOf(System.currentTimeMillis()));
log.put("hostname", getHostname());

jedis.xadd(LOG_STREAM, null, log);
}

// 日志消费者:写入Elasticsearch
public void startLogConsumer() {
String consumerGroup = "log_indexers";
String consumerName = "indexer-" + UUID.randomUUID().toString();

try {
jedis.xgroupCreate(LOG_STREAM, consumerGroup, "0", false);
} catch (Exception ignored) {}

while (true) {
List<Entry<String, List<StreamEntry>>> results = jedis.xreadGroup(
consumerGroup, consumerName, 100, 1000, false,
new AbstractMap.SimpleEntry<>(LOG_STREAM, ">")
);

if (results != null) {
List<String> processedIds = new ArrayList<>();
List<Map<String, String>> logs = new ArrayList<>();

for (Entry<String, List<StreamEntry>> entry : results) {
for (StreamEntry message : entry.getValue()) {
logs.add(message.getFields());
processedIds.add(message.getID().toString());
}
}

// 批量写入Elasticsearch
if (!logs.isEmpty()) {
batchIndexToES(logs);

// 批量确认
for (String id : processedIds) {
jedis.xack(LOG_STREAM, consumerGroup, id);
}
}
}
}
}

private void batchIndexToES(List<Map<String, String>> logs) {
// 实际实现中调用Elasticsearch API
System.out.println("批量写入ES: " + logs.size() + " 条日志");
}
}

Stream与Pub/Sub对比

特性Pub/SubStream
消息持久化✗ 不持久化✓ 持久化到磁盘
消息丢失✓ 订阅者离线会丢失✗ 可重新消费历史消息
消费者组✗ 不支持✓ 支持负载均衡
消息确认✗ 无ACK机制✓ 支持ACK确认
消息顺序✓ 发布顺序✓ 时间戳顺序
适用场景实时通知、广播任务队列、日志收集

Redis延迟消息实现

业务场景

延迟消息在许多业务场景中都有应用:

  • 订单超时关闭:下单30分钟未支付自动取消
  • 定时提醒:会议前15分钟提醒
  • 延迟任务:用户注册后3天发送关怀邮件
  • 限时优惠:优惠券到期自动失效

方案对比

方案一:键过期监听(不推荐)

/**
* 基于键过期事件的延迟消息(存在严重问题)
*/
public class KeyExpirationListener extends JedisPubSub {

@Override
public void onPMessage(String pattern, String channel, String message) {
// message 是过期的key
if (message.startsWith("order:timeout:")) {
String orderId = message.replace("order:timeout:", "");
closeOrder(orderId);
}
}

private void closeOrder(String orderId) {
System.out.println("关闭超时订单: " + orderId);
}

public void subscribe() {
Jedis jedis = new Jedis("localhost", 6379);
// 订阅键过期事件(需要配置 notify-keyspace-events Ex)
jedis.psubscribe(this, "__keyevent@0__:expired");
}
}

// 设置延迟任务
public void createDelayedTask(String orderId, int delaySeconds) {
Jedis jedis = new Jedis("localhost", 6379);
String key = "order:timeout:" + orderId;
// 设置键和过期时间
jedis.setex(key, delaySeconds, orderId);
}

严重问题:

  1. Redis不保证立即触发过期事件,延迟可能达到数分钟
  2. 基于Pub/Sub,消息不持久化,客户端离线会丢失
  3. 过期删除策略是惰性的,依赖访问或定期扫描

方案二:Zset实现(推荐)

/**
* 基于Zset实现的延迟队列
*/
public class ZsetDelayQueue {
private Jedis jedis = new Jedis("localhost", 6379);
private static final String DELAY_QUEUE_KEY = "delay:queue:orders";

// 添加延迟任务
public void addDelayTask(String orderId, long delayMillis) {
long executeTime = System.currentTimeMillis() + delayMillis;

// 将订单ID作为member,执行时间作为score
jedis.zadd(DELAY_QUEUE_KEY, executeTime, orderId);

System.out.println("添加延迟任务: " + orderId +
", 执行时间: " + new Date(executeTime));
}

// 扫描并执行到期任务
public void scanAndExecute() {
while (true) {
long now = System.currentTimeMillis();

// 获取当前时间之前的所有任务(score <= now)
Set<String> tasks = jedis.zrangeByScore(
DELAY_QUEUE_KEY,
0,
now,
0, // offset
10 // 每次最多取10个
);

if (tasks.isEmpty()) {
try {
Thread.sleep(1000); // 没有任务,休眠1秒
} catch (InterruptedException e) {
e.printStackTrace();
}
continue;
}

for (String orderId : tasks) {
// 尝试删除任务(防止重复执行)
Long removed = jedis.zrem(DELAY_QUEUE_KEY, orderId);

if (removed > 0) {
// 删除成功,执行任务
executeTask(orderId);
}
}
}
}

private void executeTask(String orderId) {
System.out.println("执行延迟任务: " + orderId);

// 执行订单关闭逻辑
closeOrder(orderId);
}

private void closeOrder(String orderId) {
// 1. 查询订单状态
// 2. 如果未支付,关闭订单
// 3. 释放库存
// 4. 发送通知
System.out.println("订单已关闭: " + orderId);
}
}

// 使用示例
public class OrderService {
private ZsetDelayQueue delayQueue = new ZsetDelayQueue();

public void createOrder(Order order) {
// 保存订单
orderDao.save(order);

// 添加30分钟后的关闭任务
delayQueue.addDelayTask(order.getId(), 30 * 60 * 1000);

System.out.println("订单创建成功,30分钟后自动关闭");
}

// 启动延迟队列扫描线程
public void startDelayQueueScanner() {
new Thread(() -> {
delayQueue.scanAndExecute();
}, "delay-queue-scanner").start();
}
}

优化版本:分布式锁防止重复消费

public class ZsetDelayQueueWithLock {
private Jedis jedis = new Jedis("localhost", 6379);
private static final String DELAY_QUEUE_KEY = "delay:queue:orders";

public void scanAndExecute() {
while (true) {
long now = System.currentTimeMillis();
Set<String> tasks = jedis.zrangeByScore(DELAY_QUEUE_KEY, 0, now, 0, 10);

if (tasks.isEmpty()) {
sleep(1000);
continue;
}

for (String orderId : tasks) {
// 使用分布式锁防止多个消费者同时处理
String lockKey = "lock:order:" + orderId;
String lockValue = UUID.randomUUID().toString();

// 尝试获取锁(SET NX EX)
String result = jedis.set(lockKey, lockValue,
SetParams.setParams().nx().ex(10));

if ("OK".equals(result)) {
try {
// 获取锁成功,从队列中移除并执行
jedis.zrem(DELAY_QUEUE_KEY, orderId);
executeTask(orderId);
} finally {
// 释放锁(Lua脚本保证原子性)
releaseLock(lockKey, lockValue);
}
}
}
}
}

private void releaseLock(String lockKey, String lockValue) {
String script =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" return redis.call('del', KEYS[1]) " +
"else " +
" return 0 " +
"end";

jedis.eval(script, Collections.singletonList(lockKey),
Collections.singletonList(lockValue));
}
}

方案三:Redisson延迟队列(最佳)

Redisson是基于Redis的Java客户端,提供了开箱即用的延迟队列实现:

/**
* 使用Redisson实现延迟队列
*/
public class RedissonDelayQueueExample {

public static void main(String[] args) {
// 配置Redisson
Config config = new Config();
config.useSingleServer().setAddress("redis://localhost:6379");
RedissonClient redisson = Redisson.create(config);

// 获取延迟队列
RBlockingQueue<String> blockingQueue = redisson.getBlockingQueue("order_queue");
RDelayedQueue<String> delayedQueue = redisson.getDelayedQueue(blockingQueue);

// 生产者:添加延迟任务
String orderId = "ORDER_20231201_001";
delayedQueue.offer(orderId, 30, TimeUnit.MINUTES);
System.out.println("订单已创建,30分钟后自动关闭");

// 消费者:处理到期任务
new Thread(() -> {
while (true) {
try {
// 阻塞获取到期的任务
String order = blockingQueue.take();
System.out.println("处理超时订单: " + order);
closeOrder(order);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}

private static void closeOrder(String orderId) {
System.out.println("关闭订单: " + orderId);
// 执行关单逻辑
}
}

Redisson延迟队列完整案例:

/**
* 订单超时自动关闭系统
*/
@Service
public class OrderTimeoutService {

@Autowired
private RedissonClient redissonClient;

// 订单任务数据结构
@Data
public static class OrderTask implements Serializable {
private String orderId;
private String userId;
private Long createTime;
}

private RBlockingQueue<OrderTask> blockingQueue;
private RDelayedQueue<OrderTask> delayedQueue;

@PostConstruct
public void init() {
blockingQueue = redissonClient.getBlockingQueue("order:timeout:queue");
delayedQueue = redissonClient.getDelayedQueue(blockingQueue);

// 启动消费者线程
startConsumer();
}

// 创建订单时添加延迟任务
public void addOrderTimeoutTask(String orderId, String userId) {
OrderTask task = new OrderTask();
task.setOrderId(orderId);
task.setUserId(userId);
task.setCreateTime(System.currentTimeMillis());

// 30分钟后执行
delayedQueue.offer(task, 30, TimeUnit.MINUTES);

log.info("订单超时任务已添加: {}", orderId);
}

// 支付成功后取消延迟任务
public void cancelOrderTimeoutTask(String orderId) {
// Redisson延迟队列不支持直接移除,可以通过标记位实现
// 或者在Redis中记录已支付订单,消费时检查
redissonClient.getBucket("order:paid:" + orderId)
.set("true", 31, TimeUnit.MINUTES);
}

// 启动消费者
private void startConsumer() {
// 可以启动多个消费者线程
for (int i = 0; i < 3; i++) {
new Thread(() -> {
while (true) {
try {
// 阻塞获取到期任务
OrderTask task = blockingQueue.take();
processTimeoutOrder(task);
} catch (Exception e) {
log.error("处理订单超时任务失败", e);
}
}
}, "order-timeout-consumer-" + i).start();
}
}

private void processTimeoutOrder(OrderTask task) {
String orderId = task.getOrderId();

// 检查订单是否已支付
RBucket<String> paidFlag = redissonClient.getBucket("order:paid:" + orderId);
if (paidFlag.isExists()) {
log.info("订单已支付,跳过关闭: {}", orderId);
return;
}

// 查询订单状态
Order order = orderMapper.selectById(orderId);

if (order == null) {
log.warn("订单不存在: {}", orderId);
return;
}

if (!"PENDING".equals(order.getStatus())) {
log.info("订单状态不是待支付,跳过关闭: {}", orderId);
return;
}

// 关闭订单
order.setStatus("CLOSED");
order.setCloseTime(new Date());
orderMapper.updateById(order);

// 恢复库存
inventoryService.restoreStock(order.getProductId(), order.getQuantity());

// 发送通知
notificationService.sendOrderClosedNotification(task.getUserId(), orderId);

log.info("订单已自动关闭: {}", orderId);
}

@PreDestroy
public void destroy() {
// 销毁延迟队列
if (delayedQueue != null) {
delayedQueue.destroy();
}
}
}

三种方案对比

方案优点缺点推荐度
键过期监听实现简单延迟不可控、消息易丢失
Zset方案可控性强、支持持久化需要自己实现、可能重复消费⭐⭐⭐
Redisson开箱即用、稳定可靠、支持集群依赖第三方库⭐⭐⭐⭐⭐

总结

Redis提供了多种消息通信和队列机制,各有特点:

发布订阅(Pub/Sub)

  • 适用场景:实时通知、缓存同步、实时数据推送
  • 优点:实时性高、使用简单
  • 缺点:消息不持久化、可靠性低
  • 典型应用:聊天室、系统监控、配置更新通知

Stream消息流

  • 适用场景:任务队列、日志收集、可靠消息传递
  • 优点:持久化、支持消费者组、消息确认机制
  • 缺点:相对复杂、需要管理消费者组
  • 典型应用:订单处理、数据同步、事件溯源

延迟队列

  • 适用场景:定时任务、订单超时、延迟提醒
  • 推荐方案:Redisson延迟队列(生产环境)、Zset方案(简单场景)
  • 不推荐:键过期监听(不可靠)

选择建议:

  • 实时通知、广播消息 → Pub/Sub
  • 可靠任务队列、需要确认机制 → Stream
  • 延迟任务处理 → Redisson延迟队列
  • 核心业务数据处理 → 考虑使用专业MQ(如RabbitMQ、Kafka)