Redis核心技术12-消息队列
# Redis核心技术12-消息队列
消息队列要支持组件通信消息的快速读写,而Redis本身支持数据的高速访问,正好可以满足消息队列的读写性能需求。本文主要从以下两个方面进行展开:
- 消息队列的消息存取需求是什么?
- Redis如何实现消息队列的需求?
# 消息队列的消息存取需求
当两个组件要基于消息队列进行通信时,一个组件会把要处理的数据以消息的形式传递给消息队列,然后,这个组件就可以继续执行其他操作了;远端的另一个组件从消息队列中把消息读取出来,再在本地进行处理。其中,发送消息称为生产者
,消费消息称为消费者
。
在消息队列中,消费者可以异步读取生产者消息,然后再处理。这样就能避免因为消费者消费速度小于生产者生产速度而造成阻塞。这也是消息队列作为分布式组件通信的一大优势。
不过,消息队列在存取消息时,必须要满足三个需求,分别是消息保序、处理重复的消息和保证消息可靠性。
# 消息保存
虽然消费者是异步处理消息,但是,消费者仍然需要按照生产者发送消息的顺序来处理消息,避免后发送的消息被先处理了。对于要求消息保序的场景来说,一旦出现这种消息被乱序处理的情况,就可能会导致业务逻辑被错误执行,从而给业务方造成损失。
# 处理重复的消息
因为网络原因导致生产者以为消费者没有接收到信息,因此生产者会额外发送一条信息,当消费者接收到两条相同的消息的时候,需要具备处理重复消息的能力。
# 保证消息可靠性
消费者在处理消息的时候,还可能出现因为故障或宕机导致消息没有处理完成的情况。此时,消息队列需要能提供消息可靠性的保证,也就是说,当消费者重启后,可以重新读取消息再次进行处理,否则,就会出现消息漏处理的问题了。
# 基于List的消息队列解决方案
消息保存
由于List本身就是按照先进先出的顺序对数据进行存取的,所以已经满足了消息保存的需求。具体来说,生产者可以使用 LPUSH 命令把要发送的消息依次写入 List,而消费者则可以使用 RPOP 命令,从 List 的另一端按照消息的写入顺序,依次读取消息并进行处理。
但是这种方案存在一个问题,那就是消费者并不知道何时去取消息,就需要再程序中不停调用RPOP 命令(比如使用一个 while(1) 循环)。如果有新消息写入,RPOP 命令就会返回结果,否则,RPOP 命令返回空值,再继续循环。这会导致消费者的程序CPU一直消耗在执行RPOP命令上,带来不必要的性能损失。
对于这个问题,Redis 提供了 BRPOP 命令。BRPOP 命令也称为阻塞式读取,客户端在没有读到队列数据时,自动阻塞,直到有新的数据写入队列,再开始读取新数据。这种方式更能节省CPU开销。
处理重复消息
一方面,消息队列要能给每一个消息提供全局唯一的 ID 号;另一方面,消费者程序要把已经处理过的消息的 ID 号记录下来。
对于消费者:消费者程序就可以对比收到的消息 ID 和记录的已处理过的消息 ID,来判断当前收到的消息有没有经过处理。如果已经处理过,那么,消费者程序就不再进行处理了。
对于生产者:由于List本身是不会为每个消息生成ID号的。所以,消息的全局唯一 ID 号就需要生产者程序在发送消息前自行生成。生成之后,我们在用 LPUSH 命令把消息插入 List 时,需要在消息中包含这个全局唯一 ID。
消息可靠性
当消费者程序从 List 中读取一条消息后,List 就不会再留存这条消息了。所以,如果消费者程序在处理消息的过程出现了故障或宕机,就会导致消息没有处理完成,那么,消费者程序再次启动后,就没法再次从 List 中读取消息了。
为了留存消息,List 类型提供了 BRPOPLPUSH 命令,这个命令的作用是让消费者程序从一个 List 中读取消息,同时,Redis 会把这个消息再插入到另一个 List(可以叫作备份 List)留存。这样一来,如果消费者程序读了消息但没能正常处理,等它重启后,就可以从备份 List 中重新读取消息并进行处理了。
List做消息队列的时候,三个基本业务都能够保证,但是如果生产者消息发送很快,而消费者处理消息的速度比较慢,这就导致 List 中的消息越积越多,给 Redis 的内存带来很大压力。所以我们希望启动多个消费者组成一个消费组,来一起处理List中的消息,但是List类型并不支持消费组的实现。
# 基于Streams的消息队列解决方案
Streams 是 Redis 专门为消息队列设计的数据类型,它提供了丰富的消息队列操作命令。
- XADD:插入消息,保证有序,可以自动生成全局唯一 ID;
- XREAD:用于读取消息,可以按 ID 读取数据;
- XREADGROUP:按消费组形式读取消息;
- XPENDING 和 XACK:XPENDING 命令可以用来查询每个消费组内所有消费者已读取但尚未确认的消息,而 XACK 命令用于向消息队列确认消息处理已完成。
插入
XADD mqstream * repo 5
"1599203861727-0"
往名称为 mqstream 的消息队列中插入一条消息,消息的键是 repo,值是 5。消息队列名称后面的*,表示让 Redis 为插入的数据自动生成一个全局唯一的 ID。
消息的全局唯一 ID 由两部分组成,第一部分“1599203861727”是数据插入时,以毫秒为单位计算的当前服务器时间,第二部分表示插入消息在当前毫秒内的消息序号,这是从 0 开始编号的。
读取
XREAD 在读取消息时,可以指定一个消息 ID,并从这个消息 ID 的下一条消息开始进行读取。
XREAD BLOCK 100 STREAMS mqstream 1599203861727-0
消费者也可以在调用 XRAED 时设定 block 配置项,实现类似于 BRPOP 的阻塞读取操作。当消息队列中没有消息时,一旦设置了 block 配置项,XREAD 就会阻塞,阻塞的时长可以在 block 配置项进行设置。Block后面的数字单位为毫秒,表明 XREAD 在读取最新消息时,如果没有消息到来,XREAD 将阻塞 100毫秒,如果一直没有消息,100毫秒之后返回空。
消费组
Streams 本身可以使用 XGROUP 创建消费组,创建消费组之后,Streams 可以使用 XREADGROUP 命令让消费组内的消费者读取消息。
XGROUP create mqstream group1 0
使用消费组的目的是让组内的多个消费者共同分担读取消息,所以,我们通常会让每个消费者读取部分消息,从而实现消息读取负载在多个消费者间是均衡分布的。
XREADGROUP group group2 consumer1 count 1 streams mqstream >
XREADGROUP group group2 consumer2 count 1 streams mqstream >
XREADGROUP group group2 consumer3 count 1 streams mqstream >
可以看到Streams的基本操作就能满足消息保存
、处理重复消息
的业务。对于保证消息可靠性
,Streams 会自动使用内部队列(也称为 PENDING List)留存消费组里每个消费者读取的消息,直到消费者使用 XACK 命令通知 Streams“消息已经处理完成”。如果消费者没有成功处理消息,它就不会给 Streams 发送 XACK 命令,消息仍然会留存。此时,消费者可以在重启后,用 XPENDING 命令查看已读取、但尚未确认处理完成的消息。