使用 Redis 实现消息队列主要有三种方法:
List 队列 发布/订阅模型 Pub/Sub Stream (Redis5+) 下面分别对这三种方法进行介绍,并编写简单例子。
List 队列 看到队列,你会想到 Redis 有个数据类型 List,List 能很好符合队列的要求。List 的底层是一个链表,在头部和尾部进行操作的时间复杂度都是 O(1)。 使用 List 进行队列操作,你可以这样使用。 生产者使用 LPUSH 进行消息发布 消费者使用 RPOP 对消息进行消费 这里存在着一个问题,如果 LIST 没有消息时,消费者执行 RPOP 时,会返回 null(nil)。 我们在编写消费者逻辑时,一般是循环不断从队列中消费数据进行处理,如果此时队列为空,那消费者依旧会频繁拉取消息,这会造成「CPU 空转」,不仅浪费 CPU 资源,还会对 Redis 造成压力。Redis 提供了阻塞式拉起命令 BRPOP / BLPOP,使用 BRPOP 这种阻塞式方式拉取消息时,还支持传入一个「超时时间」,如果设置为 0,则表示不设置超时,直到有新消息才返回,否则会在指定的超时时间后返回 null,既兼顾了效率还避免了 CPU 空转问题。 这是 List 队列的代码例子:
// 生产者 package main import ( "context" "github.com/go-redis/redis/v8" ) func main() { rdb := redis.NewClient(&redis.Options{ Addr: "localhost:6379", Password: "", DB: 1, }) data :=[]string{`{"name": "jinzhu11221212321", "age": 18, "tags": ["tag211", "tag2"], "orgs": {"orga": "orga"}`,`{"name": "asd", "age": 18, "tags": ["tag211", "tag2"], "orgs": {"orga": "orga"}`} rdb.LPush(context.Background(), "queue", data) } // 消费者 package main import ( "context" "fmt" "log" "time" "github.com/go-redis/redis/v8" ) func main() { rdb := redis.NewClient(&redis.Options{ Addr: "localhost:6379", Password: "", DB: 1, }) for true { //result, err := rdb.RPop(context.Background(), "queue").Result() result, err := rdb.BRPop(context.Background(), 0, "queue").Result() if err == redis.Nil { continue } else if err != nil { log.Println(err) } fmt.Printf("consumer msg %v\n", result) } } List 队列的缺点:
...