10 minutes reading time
使用Redis实现消息队列主要有三种方法:
下面分别对这三种方法进行介绍,并编写简单例子。
看到队列,你会想到Redis有个数据类型List,List能很好符合队列的要求。List的底层是一个链表,在头部和尾部进行操作的时间复杂度都是O(1)。 使用List进行队列操作,你可以这样使用。 生产者使用LPUSH进行消息发布 消费者使用RPOP对消息进行消费 这里存在着一个问题,如果LIST没有消息时,消费者执行RPOP时,会返回null(nil)。 我们在编写消费者逻辑时,一般是循环不断从队列中消费数据进行处理,如果此时队列为空,那消费者依旧会频繁拉取消息,这会造成「CPU 空转」,不仅浪费 CPU 资源,还会对 Redis 造成压力。Redis提供了阻塞式拉起命令BRPOP / BLPOP,使用 BRPOP 这种阻塞式方式拉取消息时,还支持传入一个「超时时间」,如果设置为 0,则表示不设置超时,直到有新消息才返回,否则会在指定的超时时间后返回 null,既兼顾了效率还避免了CPU空转问题。 这是List队列的代码例子:
// 生产者
package main
import (
"context"
"github.com/go-redis/redis/v8"
)
func main()
// 消费者
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/go-redis/redis/v8"
)
func main()
List队列的缺点:
从名字就能看出来,这个模块是 Redis 专门是针对「发布/订阅」这种队列模型设计的。 它正好可以解决前面提到的第一个问题:重复消费。 即多组生产者、消费者的场景,我们来看它是如何做的。 Redis 提供了 PUBLISH / SUBSCRIBE 命令,来完成发布、订阅的操作。 使用Pub/Sub就能很好的解决不能重复消费的问题,多个消费者能够消费同一个生产的消息。 除此之外,Pub/Sub 还提供了「匹配订阅」模式,允许消费者根据一定规则,订阅「多个」自己感兴趣的队列。 代码:
// 订阅
package main
import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
)
func main()
// 发布
package main
import (
"context"
"github.com/go-redis/redis/v8"
)
func main()
Pub/Sub 最大问题是:丢数据。 如果发生以下场景,就有可能导致数据丢失:
Pub/Sub的优缺点:
首先,Stream 通过 XADD 和 XREAD 完成最简单的生产、消费模型:
生产者发布 2 条消息:
// *表示让Redis自动生成消息ID
127.0.0.1:6379> XADD queue * name zhangsan
"1618469123380-0"
127.0.0.1:6379> XADD queue * name lisi
"1618469127777-0"
使用 XADD 命令发布消息,其中的「*」表示让 Redis 自动生成唯一的消息 ID。 这个消息 ID 的格式是「时间戳-自增序号」。 消费者拉取消息:
// 从开头读取5条消息,0-0表示从开头读取
127.0.0.1:6379> XREAD COUNT 5 STREAMS queue 0-0
1) 1) "queue"
2) 1) 1) "1618469123380-0"
2) 1) "name"
2) "zhangsan"
2) 1) "1618469127777-0"
2) 1) "name"
2) "lisi"
如果想继续拉取消息,需要传入上一条消息的 ID:
127.0.0.1:6379> XREAD COUNT 5 STREAMS queue 1618469127777-0
(nil)
没有消息,Redis 会返回 NULL。
以上就是 Stream 最简单的生产、消费。
// BLOCK 0 表示阻塞等待,不设置超时时间
127.0.0.1:6379> XREAD COUNT 5 BLOCK 0 STREAMS queue 1618469127777-0
这时,消费者就会阻塞等待,直到生产者发布新的消息才会返回。 2) Stream 是否支持发布 / 订阅模式? 也没问题,Stream 通过以下命令完成发布订阅:
下面我们来看具体如何做? 首先,生产者依旧发布 2 条消息:
127.0.0.1:6379> XADD queue * name zhangsan
"1618470740565-0"
127.0.0.1:6379> XADD queue * name lisi
"1618470743793-0"
之后,我们想要开启 2 组消费者处理同一批数据,就需要创建 2 个消费者组:
// 创建消费者组1,0-0表示从头拉取消息
127.0.0.1:6379> XGROUP CREATE queue group1 0-0
OK
// 创建消费者组2,0-0表示从头拉取消息
127.0.0.1:6379> XGROUP CREATE queue group2 0-0
OK
消费者组创建好之后,我们可以给每个「消费者组」下面挂一个「消费者」,让它们分别处理同一批数据。 第一个消费组开始消费:
// group1的consumer开始消费,>表示拉取最新数据
127.0.0.1:6379> XREADGROUP GROUP group1 consumer COUNT 5 STREAMS queue >
1) 1) "queue"
2) 1) 1) "1618470740565-0"
2) 1) "name"
2) "zhangsan"
2) 1) "1618470743793-0"
2) 1) "name"
2) "lisi"
同样地,第二个消费组开始消费:
// group2的consumer开始消费,>表示拉取最新数据
127.0.0.1:6379> XREADGROUP GROUP group2 consumer COUNT 5 STREAMS queue >
1) 1) "queue"
2) 1) 1) "1618470740565-0"
2) 1) "name"
2) "zhangsan"
2) 1) "1618470743793-0"
2) 1) "name"
2) "lisi"
我们可以看到,这 2 组消费者,都可以获取同一批数据进行处理了。 这样一来,就达到了多组消费者「订阅」消费的目的。
// group1下的 1618472043089-0 消息已处理完成
127.0.0.1:6379> XACK queue group1 1618472043089-0
如果消费者异常宕机,肯定不会发送 XACK,那么 Redis 就会依旧保留这条消息。 待这组消费者重新上线后,Redis 就会把之前没有处理成功的数据,重新发给这个消费者。这样一来,即使消费者异常,也不会丢失数据了。
// 消费者重新上线,0-0表示重新拉取未ACK的消息
127.0.0.1:6379> XREADGROUP GROUP group1 consumer1 COUNT 5 STREAMS queue 0-0
// 之前没消费成功的数据,依旧可以重新消费
1) 1) "queue"
2) 1) 1) "1618472043089-0"
2) 1) "name"
2) "zhangsan"
2) 1) "1618472045158-0"
2) 1) "name"
2) "lisi"
而 Redis 在实现 Stream 时,采用了第 2 个方案。 在发布消息时,你可以指定队列的最大长度,防止队列积压导致内存爆炸。
// 队列长度最大10000
127.0.0.1:6379> XADD queue MAXLEN 10000 * name zhangsan
"1618473015018-0"
当队列长度超过上限后,旧消息会被删除,只保留固定长度的新消息。 这么来看,Stream 在消息积压时,如果指定了最大长度,还是有可能丢失消息的。 代码:
package main
import (
"context"
"github.com/go-redis/redis/v8"
)
func main()
package main
import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
)
func main()
[1] https://redis.uptrace.dev/
[2] https://redis.io/commands
[3] https://mp.weixin.qq.com/s/RthQvzLHZRGNo-z6X_7jQQ
[4] https://github.com/overstarry/queue