前言

在《消息队列入门》一文中简要介绍了分布式架构中消息队列的重要性,以及引入消息队列能带来哪些好处。消息队列支持组件通信的快速读写,很容易联想到Redis这种缓存中间件,因为Redis本身支持数据的高速访问,很符合消息队列对于读写性能需求。在轻量级应用的场景中,直接通过Redis来实现消息队列似乎是个不错的选择,无须引入更多的中间件设施,降低了运维成本……那么Redis适合做消息队列吗?

一般来说消息队列对外要提供提供三个保证:1.消息保序 2.处理重复消息 3.保证消息的可靠性。

在Redis 5.0版本前实现消息队列有多种方案,但无法完美实现上述保证,比如PUB/SUB(发布/订阅模式),基于List的消息队列解决方案,基于Sorted Set的实现。

Redis中Pub/Sub消息无法持久化,如果出现网络问题或中间件宕机,消息会直接丢失,无法保证消息的可靠性;Redis list没有消息多播,没有Ack机制,如果出现网络问题则会出现重复消费的问题;Redis Sorted Set不支持阻塞式获取消息,不支持分组消费。

Redis Streams

在官方文档中,Redis Stream被形容成一种”仅追加日志(append only log)”,又提供丰富的扩展命令操作来弥补日志局限性的数据结构。它不是Redis专门为消息队列设计的数据类型,但是它的特性满足了那些想要使用Redis作为消息队列的诉求。

XADD是Redis提供向流添加一条新数据的命令。每条流数据由一个或多个字段组成,这有点像字典或Redis中的哈希。不同的是它保证了有序,而且可以自动生成全局唯一的ID。这个全局唯一的ID由两部分组成,第一部分是数据插入时,以毫秒为单位计算的时间戳,第二部分表示插入消息在当前毫秒内的消息序号,以0编号开始。

1
2
3
4
127.0.0.1:6379> XADD mystream * name "Alice" action "login"
"1757662221963-0"
127.0.0.1:6379> XADD mystream * name1 "Alice1" action1 "login1"
"1757662226486-0"

当执行上述代码,就可以往名为 mystream 的消息队列中插入一条数据,name "Alice" action "login" 是一个字段/键值对,可以任意扩展。

1
2
3
4
5
6
7
8
9
10
11
12
127.0.0.1:6379>  XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
2) 1) 1) "1757662221963-0"
2) 1) "name"
2) "Alice"
3) "action"
4) "login"
2) 1) "1757662226486-0"
2) 1) "name1"
2) "Alice1"
3) "action1"
4) "login1"

XREAD命令在读取消息时,XREAD COUNT 1 STREAMS mystream 0表示从最新的消息读一条信息。此外还可以选填block配置项,实现类似于BRPOP的阻塞读取操作。如果设置了block,再次执行XREAD命令就会阻塞:

1
2
3
127.0.0.1:6379> XREAD block 10000 streams mystream $
(nil)
(10.05s)

Streams中可以使用XGROUP创建消费组(Consumer groups),与Kafka中的consumer Group术语相同但实现不同。消息队列中的消息一旦被消费组里面的某一个消费者读取了,就不能再被该消费组内的其他消费者读取了。

使用消费组的目的是让组内多个消费者共同承担读取消息的任务,XREAD虽然能实现广播,但是在不想广播的情况下,可以通过消费组将消息划分给多个客户端,每个消费组只是消费组中的一个子集,这种方式可以保证消息读取负载在多个消费者间是均衡分布的。

1
2
127.0.0.1:6379> XGROUP CREATE mystream group1 0 MKSTREAM
OK

为了保证消费者在发生故障或宕机后再次重启,依然可以读取未处理完的消息,Streams会自动使用内部队列(PENDING List)留存消费组里面每个消费者读取的消息,直到消费者使用XACK命令通知Streams,如果消费者没有发送XACK命令,那么消息依然会留存。使用XPENDING命令再次查看消费组中等待消费的消息时,可以看到(integer)的数量从2变成了1。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#消费者 c1 使用XREADGROUP命令拉取两条信息
127.0.0.1:6379> XREADGROUP GROUP group1 c1 COUNT 2 STREAMS mystream >
1) 1) "mystream"
2) 1) 1) "1757662221963-0"
2) 1) "name"
2) "Alice"
3) "action"
4) "login"
2) 1) "1757662226486-0"
2) 1) "name1"
2) "Alice1"
3) "action1"
4) "login1"

# 表示有两条正在等待被确认的信息:(integer) 2
127.0.0.1:6379> XPENDING mystream group1
1) (integer) 2
2) "1757662221963-0"
3) "1757662226486-0"
4) 1) 1) "c1"
2) "2"

#使用XACK来确定1757662221963-0
127.0.0.1:6379> XACK mystream group1 1757662221963-0
(integer) 1

#再次查看group1等待消费的消息
127.0.0.1:6379> XPENDING mystream group1
1) (integer) 1
2) "1757662226486-0"
3) "1757662226486-0"
4) 1) 1) "c1"
2) "1"

Redis为Streams提供了多个命令来适应不同的场景,例如XTRIM来”裁剪”Stream来避免它无限增长,占满内存或磁盘。官方文档中包含了这部分命令的详细使用,这里不再过多赘述。

通过上述示例可以看出Streams已经提供了消息队列的保证,全局唯一ID保证了队列的顺序,XACK保证了重试,可靠性方面Redis本身也支持持久化和高可用。相比Kafka等专用做消息队列的中间件,Kafka和RabbitMQ被认为是更重量级的消息队列,在直觉上要显得更为稳妥。

如果要选用Redis作为消息队列,大部分原因在于它轻量级场景下的执行效率,有限的内存空间就能满足消息存储的需求,高性能特性支持快速的消息读写……它更适合一些比如发送邮件,导出报表,重试等轻量级消息队列的任务。这种任务生命周期较短,延迟性较低但又不要求长期存放。

不仅仅只是消息队列

Redis的作者Salvatore Sanfilippo在谈论他设计的Streams时发表了一篇博客,表示他并不希望提到Streams时就直接把它与消息队列划等号,所以在本篇开头部分就表明了他的立场:Streams可以用来作为消息队列,但不仅限于消息队列。

作者认为“流(Streaming)”本身就是一种极好的模式和“思维模型”,它具有更强的通用性,消息队列的使用场景只不过是它其中一个部分。

Streams可以被认为是CSV的超集,在记录一些轻量级的数据时,很多开发者会直接生成一个CSV文件,并把这些数据在这个文件中追加进去。

那么为什么不把这些数据放入到内存中?

CSV文件的局限性:

  1. 做范围查询很难(效率低)。
  2. 存在大量冗余信息:时间几乎每条都一样,字段也重复。去掉的话又会失去灵活性,比如换一组字段就不方便。
  3. 偏移量只是文件的字节位置:只要文件结构变了,偏移就失效了,根本没有真正的主 ID 概念。条目没有唯一地址。
  4. 不能删除条目,只能标记无效,而且无法垃圾回收,除非重写日志。但日志重写通常很糟糕,能避免就避免。

CSV文件的好处:

1.没有固定结构,字段可以变

2.生成简单

3.相当紧凑

Redis Streams的设计思路就是为了保留这些优点的情况下,解决掉它的缺陷。所以它最终变成一个混合型数据结构,看起来像Sorted Set但是内部使用了多种实现方式:

  • 宏节点里包含多个元素
  • 数据结构使用 listpack,非常紧凑(整数即使作为字符串语义,仍以二进制存)
  • 进一步应用了 增量压缩相同字段压缩
  • 节点通过基数树链接,既省内存又支持高效按 ID/时间检索

增量压缩的宏节点+基数树的方式能让顺序遍历达到O(M)级别,ID有序的方式更适合时间排序,相当于兼顾了List的顺序高效Sorted Set 的范围检索能力,内存的占用也极低,使用Sorted Set + Hash存储相同数量级的数据,内存占用率要比Stream要高13倍!极端情况下使用18GB内存可以让Streams存放约10亿条记录。