使用 Redis 作为消息队列 - Pub/Sub, List, SortedSet

有好长一段时间没使用 Redis 了,之前用的都是 AWS 上的 Elastic Cache 的 Redis, 那时候还是用的版本还是 4 和 5。在新的项目由于觉得 Elastic Cache Redis 太贵而未曾使用,在去年的 AWS re:Invent 2023 上发布了 Elastic Cache Serverless,对于非长时间大数据的缓存可以考虑使用。而且还可以使用 Redis 的功能对服务进行解耦合,或作为一致性的协调中心。

此文是作为正式研究使用 Redis Stream (Redis 5.0 新特性)  的一个铺垫,探索在 Redis Stream 之前,可以何种方式把 Redis 当成一个消息队列,后面将会具体讲到如何用 Redis Stream 作为一个支持消费者组,能确认消息的更为完备的消息队列。

如以下几种方式

  1. Pub/Sub 订阅模式
  2. 基于 List 的 LPUSH + BRPOP 的实现
  3. SortedSet, 使用消息 Score 排序功能进行消费

Pub/Sub 订阅模式

它就像 AWS 的 SNS 一样,只有在线的订阅者才能收到消息,每个消费者会收到相同的消息。相关的 Redis 命令是 *sub*, *pub*, 参考 Redis Command group Pub/Sub。订阅的时候可以直接指定 Channel 名称,或用模式匹配,还可关注某些 Key 的事件。

下面用到的 Redis 命令是

psubscript news
publish news "message"

用 Docker 容器启动的 Redis 服务器

docker run -it -p 6379:6379 redis

测试时右上容器先用 psubscribe news 订阅 news 频道,然后在左下窗口用 publish news "message 1"news 频道中发送一条消息,右上窗口收到该消息。再右下窗口用 psubscribe news 订阅同一频道,它只能收到后续的消息。

Pub/Sub 模式消息支持多播,每一个在线的消费者都会收到同样的消息,消息也就会被重复处理。消息不被持久化,也不保存在 Redis 内存中,Redis 宕机就没了,没有在线的订阅者消息也是白发。Pub/Sub 是在 Redis 2.0 就有的功能,看来没怎么用开来是有原因的。

Pub/Sub 模式的功能非常像 AWS SNS, 消息只发送到在线的 SNS 订阅者,SNS 只是作为一个消息通道,没有在线订阅者 SNS 主题发送了消息只是无用功,消息不会持久化。

List 的 LPUSH/BRPOP 操作

之所以选择 list 的这两个操作是因为它们的特殊性

  1. LPUSH:  往列表后添加一个元素,如果 key 不存在就创建它
  2. BRPOP:  取走列表的最后一个元素,没有消息的话会进行等候(timeout 可设置),取到消息则退出,并在列表为空时删除该 key

由于 Redis 服务端执行 Redis 命令是单线程的, 所以即使多个 Redis 客户端用了 brpop news 9999999 命令进行阻塞等待,一个元素也只会被其中一个客户端收到(在一端取走便没有了)。再就是因为是 list,所以消息会保留在 Redis 内存当中。

BRPOP 取到消息退出后还需重新监听,一次只能取一条消息。另一个命令 RPOP key [count] 一次可以取多条消息, 取到 key 中没有元素时删除该 key, RPOP 不会进行阻塞等待,没消息则直接退出, 用 BRPOP 在消息为空时可减少请求的次数,但一条一条取消息的效率较低。两个命令可以配合使用,有消息时用 RPOP, RPOP 取不到消息再转用 BRPOP 监听,再适当的用 LLEN 检查列表的长度, key 不存在时 LLEN 返回 0。 

List 的 LPUSH/BRPOP/RPOP 比 Pub/Sub 用作消息队列还更靠点谱。

SortedSet,依据 Score 排序

使用 SortedSet 作为消息队列的原理是

  1. ZADD 添加消息时以时间戳作为 Score 值, SortedSet 将以 Score 值进行排序,后添加的值总是被追加到后端。Score 值实质充当了像 Kafka 消息的 Offset 概念
  2. 消费端用 ZRANGEBYSCORE 或 ZPOPMIN 从头端取一定数量的消息
  3. ZREM 删除已消费的消息
  4. 重复 #2 和 #3 便能处理所有的消息

示范

ZADD 添加消息,用当前时间戳作为 score

$ redis-cli zadd news $(date +%s%3) message1
$ redis-cli zadd news $(date +%s%3) message2
$ redis-cli zadd news $(date +%s%3) message3
$ redis-cli zadd news $(date +%s%3) message4
$ redis-cli zadd news $(date +%s%3) message5

不连接到 redis-cli 提示符下操作是因为可以 $(date +%s%s) 获得系统时间戳

我们可以列出上面添加的消息和相应的 Score 值

127.0.0.1:6379> ZRANGEBYSCORE news -inf +inf WITHSCORES
1) "message1"
2) "17042588773"
3) "message2"
4) "17042588843"
5) "message3"
6) "17042588873"
7) "message4"
8) "17042588883"
9) "message5"
10) "17042588903"

+inf, -inf 是两个特殊的 Score 值,最大和最小

消息是有序的,只要用 ZRANGEBYSCORE 从头端开始消费

先取两条

127.0.0.1:6379> ZRANGEBYSCORE news -inf +inf WITHSCORES LIMIT 0 2
1) "message1"
2) "17042588773"
3) "message2"
4) "17042588843"

如果此时想要把取到的这两条消息用 ZREM 删除掉,执行命令

127.0.0.1:6379> ZREM news message1 message2

或者用 ZREMRANGEBYSCORE 来删除已收到的消息也行

127.0.0.1:6379> ZREMRANGEBYSCORE news 17042588773 17042588843

为保证 ZREMRANGEBYSCORE 操作成功准确,Score 值不能重复,必须能与消息一一对应,ZADD 的 Score 总是要递增的。

再用相同的  ZRANGEBYSCORE 就能取到下两条消息

127.0.0.1:6379> ZRANGEBYSCORE news -inf +inf WITHSCORES LIMIT 0 2
1) "message3"
2) "17042588873"
3) "message4"
4) "17042588883"

或者不想删除原来的消息,可以用取到消息的 Score 值 + 1 往后取消息,如用 message4 的  Score 17042588883 + 1 = 17042588884

127.0.0.1:6379> ZRANGEBYSCORE news 17042588884 +inf WITHSCORES LIMIT 0 2
1) "message5"
2) "17042588903"

不用 ZRMRANGEBYSCORE/ZREM, 我们还能用直接 ZPOPMIN 从头端取走若干条消息,取后即从中删除,这能避免多客户端重复消息相同的消息。像上面如果没有执行 ZREM 或 ZREMRANGEBYSCORE 的话 , 后三条消息仍然在 Redis 中,这时我们用 ZPOPMIN 试下

127.0.0.1:6379> ZPOPMIN news 2
1) "message3"
2) "17042588873"
3) "message4"
4) "17042588883"
127.0.0.1:6379> ZPOPMIN news 2
1) "message5"
2) "17042588903"

第一次得到两条消息,后一次只获得一条消息,news 中的消息最后被清空

使用 SortedSet 的方式要注意的几点

  1. 尽量不要重复的 Score 值,时间戳加序号,如果多个节点执行 ZADD 操作可使用一个中央自增 ID 产生器
  2. SortedSet 不允许重复的消息,支持重复消息的话可以消息中混入 GUID 或其他的唯一 ID, 如重用不重复的 Score 值
  3. ZRANGEBYSCORE 无法避免消息被多客户端重复消费
  4. 用 ZPOPMIN 命令,消息不会在多客户端中重复消费

本文链接 https://yanbin.blog/use-redis-as-message-queue-pub-sub-list-sortedset/, 来自 隔叶黄莺 Yanbin Blog

[版权声明] Creative Commons License 本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。

Subscribe
Notify of
guest

1 Comment
Inline Feedbacks
View all comments
trackback

[…] Redis 之前如何曲线的方式用作消息队列 使用 Redis 作为消息队列 - Pub/Sub, List, SortedSet. […]