跳到主要内容

Redis数据类型详解与应用场景

概述

Redis之所以被广泛使用,很大程度上得益于它丰富的数据类型。不同于传统的键值存储只支持简单的字符串,Redis提供了五种基础数据类型多种高级数据类型,让我们能够优雅地解决各种业务场景。


五种基础数据类型

String(字符串)

是什么

String是Redis最基础、最常用的数据类型。它是二进制安全的,意味着你可以存储任何数据:普通字符串、数字、JSON、甚至是图片的二进制数据。单个String值最大可以存储512MB

能干什么

应用场景说明示例
缓存缓存热点数据,减少数据库压力用户信息、商品详情
计数器原子递增/递减操作页面PV、点赞数、库存
分布式锁SETNX实现互斥锁防止重复提交、资源竞争
Session共享分布式系统Session存储登录状态、用户会话
限流配合过期时间实现接口调用频率限制

常用命令

# 基本操作
SET key value # 设置值
GET key # 获取值
DEL key # 删除

# 带过期时间
SET key value EX 3600 # 设置并指定过期时间(秒)
SETEX key 3600 value # 等价写法
TTL key # 查看剩余过期时间

# 原子操作
INCR counter # 自增1
INCRBY counter 10 # 自增指定值
DECR counter # 自减1

# 分布式锁
SETNX lock_key value # 不存在时才设置(原子操作)
SET lock_key value NX EX 30 # 推荐:不存在时设置,30秒过期

实战案例

场景1:商品库存扣减

// 商品库存key
String stockKey = "stock:product:10086";

// 初始化库存
redisTemplate.opsForValue().set(stockKey, "1000");

// 下单时扣减库存(原子操作)
Long stock = redisTemplate.opsForValue().decrement(stockKey);
if (stock < 0) {
// 库存不足,回滚
redisTemplate.opsForValue().increment(stockKey);
throw new RuntimeException("库存不足");
}

场景2:接口限流

public boolean isAllowed(String userId, int maxRequests, int timeWindow) {
String key = "rate_limit:" + userId;
Long count = redisTemplate.opsForValue().increment(key);

if (count == 1) {
// 第一次请求,设置过期时间
redisTemplate.expire(key, timeWindow, TimeUnit.SECONDS);
}

return count <= maxRequests;
}

Hash(哈希)

是什么

Hash是一个键值对集合,类似于Java中的HashMap<String, String>。它特别适合存储对象,因为可以只修改对象的某个字段,而不用读取整个对象再写回。

Hash vs String存储对象

能干什么

应用场景说明示例
对象缓存存储用户、商品等对象用户信息、购物车
购物车商品ID作为field,数量作为value电商购物车
计数器组一个key存多个计数器文章的阅读、点赞、评论数
配置信息存储配置项系统配置、功能开关

常用命令

# 基本操作
HSET user:1001 name "张三" # 设置单个字段
HGET user:1001 name # 获取单个字段
HDEL user:1001 name # 删除字段

# 批量操作
HMSET user:1001 name "张三" age 25 city "北京" # 设置多个字段
HMGET user:1001 name age # 获取多个字段
HGETALL user:1001 # 获取所有字段和值

# 其他操作
HKEYS user:1001 # 获取所有field
HVALS user:1001 # 获取所有value
HLEN user:1001 # 获取字段数量
HEXISTS user:1001 name # 判断字段是否存在
HINCRBY user:1001 age 1 # 字段值自增

实战案例:购物车

// 购物车key = cart:用户ID,field = 商品ID,value = 数量
String cartKey = "cart:" + userId;

// 添加商品
public void addToCart(Long userId, Long productId, int quantity) {
String cartKey = "cart:" + userId;
redisTemplate.opsForHash().increment(cartKey, productId.toString(), quantity);
}

// 获取购物车
public Map<String, Integer> getCart(Long userId) {
String cartKey = "cart:" + userId;
return redisTemplate.opsForHash().entries(cartKey);
}

// 修改数量
public void updateQuantity(Long userId, Long productId, int quantity) {
String cartKey = "cart:" + userId;
redisTemplate.opsForHash().put(cartKey, productId.toString(), quantity);
}

// 删除商品
public void removeFromCart(Long userId, Long productId) {
String cartKey = "cart:" + userId;
redisTemplate.opsForHash().delete(cartKey, productId.toString());
}

// 清空购物车
public void clearCart(Long userId) {
String cartKey = "cart:" + userId;
redisTemplate.delete(cartKey);
}

List(列表)

是什么

List是一个有序的字符串列表,按照插入顺序排序。你可以从头部(左边)或尾部(右边)添加元素。底层是双向链表,所以在两端插入/删除操作的时间复杂度是O(1),但按索引访问是O(N)。

能干什么

应用场景说明实现方式
消息队列简单的队列实现LPUSH + RPOP(或BRPOP阻塞)
最新消息展示最新N条数据LPUSH + LRANGE + LTRIM
排行榜固定数量排名(简单场景)配合LTRIM保持长度
时间线微博、朋友圈时间线LPUSH最新消息

常用命令

# 插入操作
LPUSH mylist "a" "b" "c" # 从左边插入,结果:c b a
RPUSH mylist "x" "y" "z" # 从右边插入

# 弹出操作
LPOP mylist # 从左边弹出
RPOP mylist # 从右边弹出
BLPOP mylist 10 # 阻塞式弹出,最多等10秒

# 查询操作
LRANGE mylist 0 -1 # 获取全部元素
LRANGE mylist 0 9 # 获取前10个元素
LINDEX mylist 0 # 获取指定位置元素
LLEN mylist # 获取列表长度

# 修剪操作
LTRIM mylist 0 99 # 只保留前100个元素

实战案例:最新消息列表

// 用户动态时间线
public void addNews(Long userId, String newsId) {
String key = "timeline:" + userId;

// 1. 新消息插入头部
redisTemplate.opsForList().leftPush(key, newsId);

// 2. 只保留最新1000条
redisTemplate.opsForList().trim(key, 0, 999);
}

// 获取最新动态
public List<String> getLatestNews(Long userId, int page, int size) {
String key = "timeline:" + userId;
int start = page * size;
int end = start + size - 1;

return redisTemplate.opsForList().range(key, start, end);
}

实战案例:简单消息队列

// 生产者:发送消息
public void sendMessage(String queue, String message) {
redisTemplate.opsForList().leftPush(queue, message);
}

// 消费者:接收消息(阻塞方式)
public String receiveMessage(String queue, long timeout) {
return redisTemplate.opsForList()
.rightPop(queue, timeout, TimeUnit.SECONDS);
}

Set(集合)

是什么

Set是一个无序的字符串集合,最大的特点是自动去重。集合中的元素是唯一的,不会有重复值。Redis的Set支持交集、并集、差集等集合运算,这使得它在某些场景下非常强大。

Set集合运算

能干什么

应用场景说明命令
标签系统文章/用户标签SADD, SMEMBERS
共同好友两个用户的共同好友SINTER
可能认识好友推荐SDIFF
抽奖活动随机抽取中奖用户SRANDMEMBER, SPOP
点赞/收藏判断是否点赞、点赞列表SADD, SISMEMBER
UV统计去重统计访问用户SADD, SCARD

常用命令

# 基本操作
SADD myset "a" "b" "c" # 添加元素
SREM myset "a" # 删除元素
SMEMBERS myset # 获取所有元素
SCARD myset # 获取元素数量
SISMEMBER myset "a" # 判断元素是否存在

# 随机操作
SRANDMEMBER myset 3 # 随机获取3个元素(不删除)
SPOP myset 3 # 随机弹出3个元素(删除)

# 集合运算
SINTER set1 set2 # 交集
SUNION set1 set2 # 并集
SDIFF set1 set2 # 差集(set1有但set2没有)

实战案例:点赞功能

// 点赞key = like:文章ID
public void like(Long articleId, Long userId) {
String key = "like:" + articleId;
redisTemplate.opsForSet().add(key, userId.toString());
}

// 取消点赞
public void unlike(Long articleId, Long userId) {
String key = "like:" + articleId;
redisTemplate.opsForSet().remove(key, userId.toString());
}

// 是否已点赞
public boolean isLiked(Long articleId, Long userId) {
String key = "like:" + articleId;
return redisTemplate.opsForSet().isMember(key, userId.toString());
}

// 点赞数
public Long getLikeCount(Long articleId) {
String key = "like:" + articleId;
return redisTemplate.opsForSet().size(key);
}

// 点赞用户列表
public Set<String> getLikeUsers(Long articleId) {
String key = "like:" + articleId;
return redisTemplate.opsForSet().members(key);
}

实战案例:共同关注

// 获取共同关注
public Set<String> getCommonFollows(Long userId1, Long userId2) {
String key1 = "follow:" + userId1;
String key2 = "follow:" + userId2;

return redisTemplate.opsForSet().intersect(key1, key2);
}

// 推荐关注(我关注的人关注了,但我没关注)
public Set<String> getRecommendFollows(Long myId, Long friendId) {
String myKey = "follow:" + myId;
String friendKey = "follow:" + friendId;

// 朋友关注的 - 我关注的 = 推荐给我的
return redisTemplate.opsForSet().difference(friendKey, myKey);
}

Sorted Set(有序集合)

是什么

Sorted Set(简称ZSet)是一个有序的集合,每个元素关联一个分数(score),Redis根据分数对元素进行从小到大排序。与Set一样,元素不能重复,但分数可以相同。

能干什么

应用场景说明score含义
排行榜游戏排名、销量排名分数/销量
热搜榜微博热搜、新闻热度热度值
延迟队列定时任务调度执行时间戳
范围查询价格区间、时间范围价格/时间
优先级队列按优先级处理任务优先级值

常用命令

# 添加/更新
ZADD leaderboard 9500 "玩家A" # 添加元素及分数
ZINCRBY leaderboard 100 "玩家A" # 增加分数

# 查询排名(从小到大)
ZRANK leaderboard "玩家A" # 获取排名(从0开始)
ZRANGE leaderboard 0 9 # 获取前10名(按分数升序)
ZRANGE leaderboard 0 9 WITHSCORES # 同时返回分数

# 查询排名(从大到小)
ZREVRANK leaderboard "玩家A" # 逆序排名
ZREVRANGE leaderboard 0 9 # 前10名(按分数降序)

# 按分数范围查询
ZRANGEBYSCORE leaderboard 1000 5000 # 分数在1000-5000的成员
ZCOUNT leaderboard 1000 5000 # 该范围内的成员数量

# 其他操作
ZSCORE leaderboard "玩家A" # 获取成员分数
ZCARD leaderboard # 获取成员总数
ZREM leaderboard "玩家A" # 删除成员

实战案例:游戏排行榜

// 排行榜key
private static final String LEADERBOARD = "game:leaderboard";

// 更新玩家分数
public void updateScore(String playerId, double score) {
redisTemplate.opsForZSet().add(LEADERBOARD, playerId, score);
}

// 增加分数
public void addScore(String playerId, double delta) {
redisTemplate.opsForZSet().incrementScore(LEADERBOARD, playerId, delta);
}

// 获取前N名(降序)
public List<PlayerRank> getTopN(int n) {
Set<ZSetOperations.TypedTuple<String>> tuples =
redisTemplate.opsForZSet().reverseRangeWithScores(LEADERBOARD, 0, n - 1);

List<PlayerRank> result = new ArrayList<>();
int rank = 1;
for (ZSetOperations.TypedTuple<String> tuple : tuples) {
result.add(new PlayerRank(rank++, tuple.getValue(), tuple.getScore()));
}
return result;
}

// 获取玩家排名(从1开始)
public Long getPlayerRank(String playerId) {
Long rank = redisTemplate.opsForZSet().reverseRank(LEADERBOARD, playerId);
return rank != null ? rank + 1 : null;
}

// 获取玩家分数
public Double getPlayerScore(String playerId) {
return redisTemplate.opsForZSet().score(LEADERBOARD, playerId);
}

实战案例:延迟队列

// 添加延迟任务(score = 执行时间戳)
public void addDelayTask(String taskId, long executeTime) {
redisTemplate.opsForZSet().add("delay:queue", taskId, executeTime);
}

// 轮询获取到期任务
public List<String> pollExpiredTasks() {
long now = System.currentTimeMillis();

// 获取所有到期任务(score <= 当前时间)
Set<String> tasks = redisTemplate.opsForZSet()
.rangeByScore("delay:queue", 0, now);

if (tasks != null && !tasks.isEmpty()) {
// 删除这些任务
redisTemplate.opsForZSet().remove("delay:queue", tasks.toArray());
}

return new ArrayList<>(tasks);
}

五种基础类型对比

类型存储结构是否有序是否唯一典型场景
String单一值--缓存、计数器、分布式锁
Hashfield-value无序field唯一对象存储、购物车
List链表有序可重复消息队列、时间线
Set哈希表无序唯一点赞、共同好友、抽奖
ZSet跳表+哈希有序唯一排行榜、延迟队列

高级数据类型

除了五种基础类型,Redis还提供了几种高级数据结构,它们针对特定场景进行了高度优化,能够以极低的内存占用解决特定问题。

Bitmap(位图)

是什么

Bitmap本质上不是一种新的数据类型,而是String类型的位操作扩展。它将一个String看作是一个由二进制位组成的数组,每个位只能是0或1。

核心优势:极致的空间效率

  • 存储1亿用户的签到状态,只需要 1亿 / 8 = 12.5MB
  • 如果用String或Set存储,可能需要几个GB

能干什么

应用场景说明位的含义
用户签到记录每日签到情况偏移量=日期,值=是否签到
活跃用户统计日/周/月活跃用户偏移量=用户ID,值=是否活跃
功能开关用户功能权限控制偏移量=功能ID,值=是否开启
在线状态用户是否在线偏移量=用户ID,值=是否在线
布隆过滤器Bitmap是布隆过滤器的底层实现哈希位置标记

常用命令

# 基本操作
SETBIT sign:202312:1001 5 1 # 用户1001在12月第5天签到
GETBIT sign:202312:1001 5 # 查询第5天是否签到

# 统计操作
BITCOUNT sign:202312:1001 # 统计12月签到总天数
BITCOUNT sign:202312:1001 0 3 # 统计前4个字节的签到天数

# 位运算(多个Bitmap)
BITOP AND result sign:day1 sign:day2 # 两天都签到的用户
BITOP OR result sign:day1 sign:day2 # 任一天签到的用户
BITOP XOR result sign:day1 sign:day2 # 只签到一天的用户

# 查找第一个指定位
BITPOS sign:202312:1001 1 # 第一次签到是哪天
BITPOS sign:202312:1001 0 # 第一次未签到是哪天

实战案例:用户签到系统

/**
* 签到服务
* key设计: sign:{年月}:{用户ID}
* offset: 当月第几天(从0开始)
*/
public class SignService {

// 签到
public boolean sign(Long userId, LocalDate date) {
String key = buildKey(userId, date);
int offset = date.getDayOfMonth() - 1;

// 设置对应位为1
return redisTemplate.opsForValue().setBit(key, offset, true);
}

// 查询某天是否签到
public boolean isSign(Long userId, LocalDate date) {
String key = buildKey(userId, date);
int offset = date.getDayOfMonth() - 1;

return redisTemplate.opsForValue().getBit(key, offset);
}

// 统计本月签到天数
public Long countMonthSign(Long userId, LocalDate date) {
String key = buildKey(userId, date);

// BITCOUNT命令
return redisTemplate.execute((RedisCallback<Long>) conn ->
conn.bitCount(key.getBytes())
);
}

// 获取连续签到天数
public int getConsecutiveDays(Long userId, LocalDate date) {
String key = buildKey(userId, date);
int dayOfMonth = date.getDayOfMonth();

// 获取本月到今天为止的签到数据
List<Long> result = redisTemplate.opsForValue()
.bitField(key, BitFieldSubCommands.create()
.get(BitFieldSubCommands.BitFieldType.unsigned(dayOfMonth))
.valueAt(0));

if (result == null || result.isEmpty()) {
return 0;
}

long bits = result.get(0);
int count = 0;

// 从最低位开始检查连续的1
while ((bits & 1) == 1) {
count++;
bits >>>= 1;
}

return count;
}

private String buildKey(Long userId, LocalDate date) {
return String.format("sign:%d%02d:%d",
date.getYear(), date.getMonthValue(), userId);
}
}

实战案例:统计活跃用户

// 记录用户活跃(用户ID作为offset)
public void recordActive(Long userId) {
String key = "active:" + LocalDate.now().toString();
redisTemplate.opsForValue().setBit(key, userId, true);
}

// 统计日活跃用户数
public Long getDailyActiveUsers(LocalDate date) {
String key = "active:" + date.toString();
return redisTemplate.execute((RedisCallback<Long>) conn ->
conn.bitCount(key.getBytes())
);
}

// 统计连续N天都活跃的用户数
public Long getConsecutiveActiveUsers(LocalDate endDate, int days) {
List<String> keys = new ArrayList<>();
for (int i = 0; i < days; i++) {
keys.add("active:" + endDate.minusDays(i).toString());
}

String destKey = "active:consecutive:" + days;
// AND运算:所有天都活跃
redisTemplate.execute((RedisCallback<Long>) conn -> {
byte[][] keyBytes = keys.stream()
.map(String::getBytes)
.toArray(byte[][]::new);
return conn.bitOp(RedisStringCommands.BitOperation.AND,
destKey.getBytes(), keyBytes);
});

return redisTemplate.execute((RedisCallback<Long>) conn ->
conn.bitCount(destKey.getBytes())
);
}

HyperLogLog(基数统计)

是什么

HyperLogLog是一种概率性数据结构,用于统计集合中不重复元素的数量(基数)。它的神奇之处在于:

  • 固定占用12KB内存
  • 可以统计2^64个不同元素
  • 标准误差仅0.81%

适用场景:允许一定误差的大规模去重计数

能干什么

应用场景说明为什么用HyperLogLog
UV统计页面独立访客数用户量大,允许误差
搜索词统计搜索关键词去重计数关键词极多,精确计数成本高
IP统计独立IP数量IP数量庞大
在线用户在线用户数估算实时性要求高

常用命令

# 添加元素
PFADD page:uv:20231201 user1 user2 user3
PFADD page:uv:20231201 user1 user4 # user1重复,不会重复计数

# 获取基数估计值
PFCOUNT page:uv:20231201 # 返回约4

# 合并多个HyperLogLog
PFMERGE page:uv:week page:uv:20231201 page:uv:20231202 page:uv:20231203
PFCOUNT page:uv:week # 获取一周的UV

实战案例:页面UV统计

/**
* UV统计服务
* 适用于大流量页面的访客统计,允许约1%的误差
*/
public class UVStatisticsService {

// 记录页面访问
public void recordPageView(String pageId, String visitorId) {
String key = "uv:" + pageId + ":" + LocalDate.now();
redisTemplate.opsForHyperLogLog().add(key, visitorId);
}

// 获取今日UV
public Long getTodayUV(String pageId) {
String key = "uv:" + pageId + ":" + LocalDate.now();
return redisTemplate.opsForHyperLogLog().size(key);
}

// 获取日期范围内的UV(去重合并)
public Long getUVInRange(String pageId, LocalDate start, LocalDate end) {
List<String> keys = new ArrayList<>();
LocalDate current = start;

while (!current.isAfter(end)) {
keys.add("uv:" + pageId + ":" + current);
current = current.plusDays(1);
}

String destKey = "uv:" + pageId + ":range:" + start + ":" + end;

// 合并多天数据
redisTemplate.opsForHyperLogLog()
.union(destKey, keys.toArray(new String[0]));

Long count = redisTemplate.opsForHyperLogLog().size(destKey);

// 删除临时key
redisTemplate.delete(destKey);

return count;
}
}

HyperLogLog vs Set 对比

维度HyperLogLogSet
内存占用固定12KB随元素增加而增长
精确度约0.81%误差100%精确
支持操作添加、计数、合并丰富的集合操作
适用场景大规模UV统计精确去重、集合运算
1亿用户12KB约1.2GB(假设每个ID 12字节)

Geo(地理位置)

是什么

Geo是Redis 3.2引入的地理位置数据类型,底层使用Sorted Set实现。它可以存储地理坐标(经度、纬度),并支持计算两点距离、查找附近的点等操作。

能干什么

应用场景说明
附近的人社交App查找附近用户
附近门店查找最近的门店/餐厅
打车服务查找附近的司机
外卖配送骑手位置、配送范围
签到功能基于位置的签到打卡

常用命令

# 添加地理位置
GEOADD restaurants 116.403963 39.915119 "海底捞"
GEOADD restaurants 116.405285 39.920328 "肯德基" 116.410872 39.918473 "麦当劳"

# 获取坐标
GEOPOS restaurants "海底捞"

# 计算两点距离
GEODIST restaurants "海底捞" "肯德基" km # 返回千米
GEODIST restaurants "海底捞" "肯德基" m # 返回米

# 查找附近(圆形范围)
GEORADIUS restaurants 116.405 39.918 1 km WITHDIST COUNT 10 ASC
# 以坐标为中心,1公里内,返回距离,最多10个,按距离升序

# 以某成员为中心查找
GEORADIUSBYMEMBER restaurants "海底捞" 2 km WITHDIST

# Redis 6.2+ 新命令
GEOSEARCH restaurants FROMMEMBER "海底捞" BYRADIUS 2 km
GEOSEARCH restaurants FROMLONLAT 116.405 39.918 BYBOX 2 2 km

实战案例:附近的餐厅

/**
* 位置服务
*/
public class LocationService {

private static final String GEO_KEY = "geo:restaurants";

// 添加餐厅位置
public void addRestaurant(String restaurantId, double lng, double lat) {
redisTemplate.opsForGeo().add(GEO_KEY,
new Point(lng, lat), restaurantId);
}

// 批量添加
public void addRestaurants(List<Restaurant> restaurants) {
List<RedisGeoCommands.GeoLocation<String>> locations =
restaurants.stream()
.map(r -> new RedisGeoCommands.GeoLocation<>(
r.getId(), new Point(r.getLng(), r.getLat())))
.collect(Collectors.toList());

redisTemplate.opsForGeo().add(GEO_KEY, locations);
}

// 查找附近餐厅
public List<NearbyRestaurant> findNearby(double lng, double lat,
double radiusKm, int limit) {
// 构建圆形查询条件
Circle circle = new Circle(
new Point(lng, lat),
new Distance(radiusKm, Metrics.KILOMETERS));

// 查询参数:返回距离、按距离升序、限制数量
RedisGeoCommands.GeoRadiusCommandArgs args =
RedisGeoCommands.GeoRadiusCommandArgs.newGeoRadiusArgs()
.includeDistance()
.sortAscending()
.limit(limit);

GeoResults<RedisGeoCommands.GeoLocation<String>> results =
redisTemplate.opsForGeo().radius(GEO_KEY, circle, args);

return results.getContent().stream()
.map(r -> new NearbyRestaurant(
r.getContent().getName(),
r.getDistance().getValue()))
.collect(Collectors.toList());
}

// 计算两点距离
public Double getDistance(String id1, String id2) {
Distance distance = redisTemplate.opsForGeo()
.distance(GEO_KEY, id1, id2, Metrics.KILOMETERS);
return distance != null ? distance.getValue() : null;
}

// 获取坐标
public Point getPosition(String restaurantId) {
List<Point> positions = redisTemplate.opsForGeo()
.position(GEO_KEY, restaurantId);
return positions != null && !positions.isEmpty()
? positions.get(0) : null;
}
}

实战案例:司机位置上报与匹配

/**
* 打车服务 - 司机位置管理
*/
public class DriverLocationService {

private static final String ONLINE_DRIVERS = "geo:drivers:online";

// 司机上报位置(通常每隔几秒上报一次)
public void updateDriverLocation(String driverId, double lng, double lat) {
redisTemplate.opsForGeo().add(ONLINE_DRIVERS,
new Point(lng, lat), driverId);

// 设置位置过期时间(司机掉线自动清除)
// 注意:Geo底层是ZSet,需要额外维护过期
redisTemplate.opsForValue().set(
"driver:heartbeat:" + driverId, "1", 30, TimeUnit.SECONDS);
}

// 为乘客匹配最近的司机
public List<String> findNearestDrivers(double lng, double lat,
double radiusKm, int count) {
Circle circle = new Circle(
new Point(lng, lat),
new Distance(radiusKm, Metrics.KILOMETERS));

RedisGeoCommands.GeoRadiusCommandArgs args =
RedisGeoCommands.GeoRadiusCommandArgs.newGeoRadiusArgs()
.includeDistance()
.sortAscending()
.limit(count);

GeoResults<RedisGeoCommands.GeoLocation<String>> results =
redisTemplate.opsForGeo().radius(ONLINE_DRIVERS, circle, args);

// 过滤掉已掉线的司机
return results.getContent().stream()
.map(r -> r.getContent().getName())
.filter(this::isDriverOnline)
.collect(Collectors.toList());
}

private boolean isDriverOnline(String driverId) {
return redisTemplate.hasKey("driver:heartbeat:" + driverId);
}
}

Stream(消息流)

是什么

Stream是Redis 5.0引入的数据类型,专为消息队列场景设计。它结合了List的有序性和Pub/Sub的消息广播能力,还支持消费者组消息确认消息回溯等高级特性。

Stream vs List vs Pub/Sub

特性ListPub/SubStream
消息持久化支持不支持支持
消息回溯不支持不支持支持
消费者组不支持不支持支持
消息确认不支持不支持支持
广播消费不支持支持支持
阻塞读取支持支持支持

能干什么

应用场景说明
消息队列替代简单的List队列
事件驱动微服务间事件传递
日志收集日志流式处理
实时数据实时数据管道
任务分发多消费者任务处理

常用命令

# 添加消息(*表示自动生成ID)
XADD mystream * name "order" action "create" orderId "10086"
# 返回: 1701388800000-0 (时间戳-序号)

# 读取消息
XRANGE mystream - + # 读取所有消息
XRANGE mystream - + COUNT 10 # 读取10条
XREVRANGE mystream + - COUNT 10 # 逆序读取10条
XLEN mystream # 消息数量

# 阻塞读取
XREAD BLOCK 5000 STREAMS mystream 0 # 从头开始读,阻塞5秒
XREAD BLOCK 0 STREAMS mystream $ # 只读取新消息,永久阻塞

# 消费者组
XGROUP CREATE mystream mygroup $ MKSTREAM # 创建消费者组
XREADGROUP GROUP mygroup consumer1 COUNT 1 BLOCK 5000 STREAMS mystream >
# 以consumer1身份从mygroup读取1条新消息

# 确认消息
XACK mystream mygroup 1701388800000-0 # 确认消息已处理

# 查看待确认消息
XPENDING mystream mygroup # 查看待处理消息概况
XPENDING mystream mygroup - + 10 # 查看详细待处理消息

# 消息转移(处理超时消息)
XCLAIM mystream mygroup consumer2 60000 1701388800000-0
# 将消息转给consumer2(超时60秒的消息)

实战案例:订单事件处理

/**
* 基于Stream的订单事件队列
*/
public class OrderEventService {

private static final String STREAM_KEY = "stream:order:events";
private static final String GROUP_NAME = "order-processor";

// 初始化消费者组
@PostConstruct
public void initConsumerGroup() {
try {
redisTemplate.opsForStream().createGroup(STREAM_KEY, GROUP_NAME);
} catch (Exception e) {
// 组已存在,忽略
}
}

// 发布订单事件
public String publishEvent(OrderEvent event) {
Map<String, String> message = new HashMap<>();
message.put("orderId", event.getOrderId());
message.put("action", event.getAction());
message.put("timestamp", String.valueOf(System.currentTimeMillis()));
message.put("data", JSON.toJSONString(event.getData()));

RecordId recordId = redisTemplate.opsForStream()
.add(STREAM_KEY, message);

return recordId.getValue();
}

// 消费消息
public void consumeEvents(String consumerName) {
while (true) {
try {
// 阻塞读取新消息
List<MapRecord<String, String, String>> records =
redisTemplate.opsForStream().read(
Consumer.from(GROUP_NAME, consumerName),
StreamReadOptions.empty()
.count(10)
.block(Duration.ofSeconds(5)),
StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed())
);

if (records == null || records.isEmpty()) {
continue;
}

for (MapRecord<String, String, String> record : records) {
try {
// 处理消息
processEvent(record.getValue());

// 确认消息
redisTemplate.opsForStream()
.acknowledge(STREAM_KEY, GROUP_NAME, record.getId());

} catch (Exception e) {
log.error("处理消息失败: {}", record.getId(), e);
// 不确认,消息会保留在pending列表
}
}

} catch (Exception e) {
log.error("读取Stream异常", e);
Thread.sleep(1000);
}
}
}

// 处理死信(长时间未确认的消息)
public void handleDeadLetters(long idleTimeMs) {
// 获取待处理消息
PendingMessagesSummary summary = redisTemplate.opsForStream()
.pending(STREAM_KEY, GROUP_NAME);

if (summary.getTotalPendingMessages() == 0) {
return;
}

// 获取详细的待处理消息
PendingMessages pending = redisTemplate.opsForStream()
.pending(STREAM_KEY, GROUP_NAME, Range.unbounded(), 100);

for (PendingMessage pm : pending) {
// 检查消息是否超时
if (pm.getElapsedTimeSinceLastDelivery().toMillis() > idleTimeMs) {
// 转移给当前消费者重新处理
redisTemplate.opsForStream().claim(
STREAM_KEY, GROUP_NAME, "dead-letter-handler",
Duration.ofMillis(idleTimeMs), pm.getId());
}
}
}

private void processEvent(Map<String, String> eventData) {
String action = eventData.get("action");
String orderId = eventData.get("orderId");

switch (action) {
case "CREATE":
handleOrderCreate(orderId, eventData);
break;
case "PAY":
handleOrderPay(orderId, eventData);
break;
case "CANCEL":
handleOrderCancel(orderId, eventData);
break;
}
}
}

高级类型对比

类型核心特点内存占用精确度典型场景
Bitmap位级别操作极低(1亿=12.5MB)精确签到、活跃统计
HyperLogLog概率去重计数固定12KB约0.81%误差UV统计
Geo地理位置计算中等(ZSet)精确附近的人/店
Stream可靠消息队列随消息增长精确事件驱动、消息队列

总结:如何选择数据类型

选型建议:

  1. 缓存对象:小对象用String(JSON),频繁修改字段用Hash
  2. 计数器:String的INCR系列命令
  3. 排行榜:Sorted Set
  4. 消息队列:简单场景用List,可靠消息用Stream
  5. 去重统计:精确用Set,大规模允许误差用HyperLogLog
  6. 签到/在线:Bitmap
  7. 位置服务:Geo