技术饭

redis队列详解

copylian    0 评论    13953 浏览    2023.05.01

现如今的互联网应用大都是采用分布式系统架构设计的,所以消息队列已经逐渐成为企业应用系统内部通信的核心手段,它具有低耦合、可靠投递、广播、流量控制、最终一致性 等一系列功能。当前使用较多的消息队列有 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMQ 等,而部分数据库 如 Redis、MySQL 以及 phxsql ,如果硬搞的话,其实也可实现消息队列的功能。

可能有人觉得,各种开源的 MQ 已经足够使用了,为什么需要用 Redis 实现 MQ 呢?有些简单的业务场景,可能不需要重量级的 MQ 组件(相比 Redis 来说,Kafka 和 RabbitMQ 都算是重量级的消息队列)。

一、List 实现消息队列:

定义原理:Redis 列表是简单的字符串列表,按照插入顺序排序,你可以添加一个元素到列表的头部(左边)或者尾部(右边)。所以常用来做异步队列使用。将需要延后处理的任务结构体序列化成字符串塞进 Redis 的列表,另一个线程从这个列表中轮询数据进行处理。

380b2a317eda4a6297199b30f6b5d313.png


1)、LIST列表的基础命令:

set_time_limit(0);

$this->queue="queue"; //消息队里非key

$this->group='pin-group'  //消费者组

$consumer='consumerA';  //消费者

if(!$this->redis->exists($this->queue)){  /*队列不存在需要初始化*/

$this->redis->xadd($this->queue,'*',['type'=>0,'dat'=>'']);

}

$res=$this->redis->xinfo('groups',$this->queue);  /*获取strarm消费组信息*/

if(!$res){

$res=$this->redis->xgroup('create',$this->queue,$this->group,'0');  /*创建消费组*/

var_dump($res);

}

while (1) {

echo 'start============'.microtime(true)." \n";

//从最后读取一条,阻塞5秒

$read = $this->redis->rawCommand('xreadgroup','group',$this->group,$consumer,'block','5000', 'count', '1' ,'streams',$this->queue,'>');

//$res = $this->redis->rawCommand('xpending',$this->queue,$this->group,'-','+','10',$consumer);  //消费者的待处理消息

$info = $read[0][1] ?? [];

if (empty($info)) {

continue;

}

$msgCount = count($info);

for ($a = 0; $a < $msgCount; $a ++) {

$msgId = $info[$a][0] ?? 0;  //每条消息的id

$msg=$info[$a][1];

$type=$msg[1];

$dat=json_decode($msg[3],true);

print_r([$type,$dat]);

$xack = $this->redis->rawCommand('xack',$this->queue,$this->group,$msgId);  //确认消息已经处理

echo $msgId.':'.$xack.PHP_EOL;

}

//sleep(1);

echo "end============".microtime(true)." \n";

}

v2-d0be75099f03dbe4c7b2fc0bb0ceeb3f_r.jpg

2)、即时消费问题:

通过 LPUSH,RPOP 这样的方式,会存在一个性能风险点,就是消费者如果想要及时的处理数据,就要在程序中写个类似 while(true) 这样的逻辑,不停的去调用 RPOP 或 LPOP 命令,这就会给消费者程序带来些不必要的性能损失

所以,Redis 还提供了 BLPOP、BRPOP 这种阻塞式读取的命令(带 B-Bloking的都是阻塞式),客户端在没有读到队列数据时,自动阻塞,直到有新的数据写入队列,再开始读取新数据。这种方式就节省了不必要的 CPU 开销。

while true:

        // msg = redis.rpop("queue")

        // 没消息阻塞等待,0表示不设置超时时间    

        msg = redis.brpop("queue", 0)

        // 没有消息,继续循环

        if msg == null:

                continue

        // 处理消息

        handle(msg)


LPUSH、BRPOP 左进右阻塞出RPUSH、BLPOP 右进左阻塞出

127.0.0.1:6379> lpush yourlist c d

(integer) 4

127.0.0.1:6379> blpop yourlist 10

1) "yourlist"

2) "d"

127.0.0.1:6379> blpop yourlist 10

1) "yourlist"

2) "c"

如果将超时时间设置为 0 时,即可无限等待,直到弹出消息,因为 Redis 单线程的特点,所以在消费数据时,同一个消息会不会同时被多个 consumer 消费掉,但是需要我们考虑消费不成功的情况。


3)、可靠队列模式 | ack 机制

以上方式中, List 队列中的消息一经发送出去,便从队列里删除。如果由于网络原因消费者没有收到消息,或者消费者在处理这条消息的过程中崩溃了,就再也无法还原出这条消息。究其原因,就是缺少消息确认机制。

为了保证消息的可靠性,消息队列都会有完善的消息确认机制(Acknowledge),即消费者向队列报告消息已收到或已处理的机制。

再看上边的表格中,有两个命令, RPOPLPUSH、BRPOPLPUSH (阻塞)从一个 list 中获取消息的同时把这条消息复制到另一个 list 里(可以当做备份),而且这个过程是原子的。这样我们就可以在业务流程安全结束后,再删除队列元素,实现消息确认机制。

127.0.0.1:6379> rpush myqueue one

(integer) 1

127.0.0.1:6379> rpush myqueue two

(integer) 2

127.0.0.1:6379> rpush myqueue three

(integer) 3

127.0.0.1:6379> rpoplpush myqueue queuebak

"three"

127.0.0.1:6379> lrange myqueue 0 -11) "one"

2) "two"

127.0.0.1:6379> lrange queuebak 0 -1

1) "three"


二、订阅与发布实现消息队列

发布/订阅模式包含两种角色,分别是发布者和订阅者。订阅者可以订阅一个或者多个频道(channel),而发布者可以向指定的频道(channel)发送消息,所有订阅此频道的订阅者都会收到此消息。

Redis 通过 PUBLISH 、 SUBSCRIBE 等命令实现了订阅与发布模式, 这个功能提供两种信息机制, 分别是订阅/发布到频道和订阅/发布到模式。

这个 频道 和 模式 有什么区别呢?

频道我们可以先理解为是个 Redis 的 key 值,而模式,可以理解为是一个类似正则匹配的 Key,只是个可以匹配给定模式的频道。这样就不需要显式的去订阅多个名称了,可以通过模式订阅这种方式,一次性关注多个频道。

1682932707394155.gif

1682932752772519.png

1682932764479024.png

参考:https://www.copylian.com/technology/513.html


三、Streams 实现消息队列

1)、定义:

Redis 5.0 版本新增了一个更强大的数据结构——Stream。它提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。它就像是个仅追加内容的消息链表,把所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容。而且消息是持久化的。每个 Stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用 xadd 指令追加消息时自动创建。Streams 是 Redis 专门为消息队列设计的数据类型,所以提供了丰富的消息队列操作命令。

v2-25408c9f2b3bb6cdb96bf1afc5de0125_r.jpg

2)、基础命令:

v2-05d16f254563d5bf5dba40b731787150_r.jpg

3)、基础操作:

# * 号表示服务器自动生成 ID,后面顺序跟着一堆 key/value

127.0.0.1:6379> xadd mystream * f1 v1 f2 v2 f3 v3

"1609404470049-0"  ## 生成的消息 ID,有两部分组成,毫秒时间戳-该毫秒内产生的第1条消息


# 消息ID 必须要比上个 ID 大

127.0.0.1:6379> xadd mystream 123 f4 v4  

(error) ERR The ID specified in XADD is equal or smaller than the target stream top item


# 自定义ID

127.0.0.1:6379> xadd mystream 1609404470049-1 f4 v4

"1609404470049-1"


# -表示最小值 , + 表示最大值,也可以指定最大消息ID,或最小消息ID,配合 -、+ 使用

127.0.0.1:6379> xrange mystream - +

1) 1) "1609404470049-0"

    2) 1) "f1"

        2) "v1"

        3) "f2"

        4) "v2"

        5) "f3"

        6) "v3"

2) 1) "1609404470049-1"

    2) 1) "f4"

        2) "v4"


127.0.0.1:6379> xdel mystream 1609404470049-1

(integer) 1


127.0.0.1:6379> xlen mystream

(integer) 1


# 删除整个 stream

127.0.0.1:6379> del mystream

(integer) 1


4)、独立消费:xread 以阻塞或非阻塞方式获取消息列表,指定 BLOCK 选项即表示阻塞,超时时间 0 毫秒(意味着永不超时)

# 从ID是0-0的开始读前2条

127.0.0.1:6379> xread count 2 streams mystream 0

1) 1) "mystream"

    2) 1) 1) "1609405178536-0"

        2) 1) "f5"

        2) "v5"

2) 1) "1609405198676-0"

    2) 1) "f1"

        2) "v1"

        3) "f2"

        4) "v2"


# 阻塞的从尾部读取流,开启新的客户端xadd后发现这里就读到了,block 0 表示永久阻塞

127.0.0.1:6379> xread block 0 streams mystream $

1) 1) "mystream"

    2) 1) 1) "1609408791503-0"

    2) 1) "f6"

        2) "v6"

(42.37s)

可以看到,我并没有给流 mystream 传入一个常规的 ID,而是传入了一个特殊的 ID $这个特殊的 ID 意思是 XREAD 应该使用流 mystream 已经存储的最大 ID 作为最后一个 ID。以便我们仅接收从我们开始监听时间以后的新消息。这在某种程度上相似于 Unix 命令tail -f。当然,也可以指定任意有效的 ID。而且, XREAD 的阻塞形式还可以同时监听多个 Strema,只需要指定多个键名即可。

127.0.0.1:6379> xread block 0 streams mystream yourstream $ $


5)、消费组

xread 虽然可以扇形分发到 N 个客户端,然而,在某些问题中,我们想要做的不是向许多客户端提供相同的消息流,而是从同一流向许多客户端提供不同的消息子集。比如下图这样,三个消费者按轮训的方式去消费一个 Stream。

v2-113f57e6e0fdcbdfd9f990b280cd0548_r.jpg

Redis Stream 借鉴了很多 Kafka 的设计。

Consumer Group:有了消费组的概念,每个消费组状态独立,互不影响,一个消费组可以有多个消费者

last_delivered_id :每个消费组会有个游标 last_delivered_id 在数组之上往前移动,表示当前消费组已经消费到哪条消息了

pending_ids :消费者的状态变量,作用是维护消费者的未确认的 id。pending_ids 记录了当前已经被客户端读取的消息,但是还没有 ack。如果客户端没有 ack,这个变量里面的消息 ID 会越来越多,一旦某个消息被 ack,它就开始减少。这个 pending_ids 变量在 Redis 官方被称之为 PEL,也就是 Pending Entries List,这是一个很核心的数据结构,它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了没处理。

v2-0773322051d599ab1b027dfe40af09ad_r.jpg

Stream 不像 Kafak 那样有分区的概念,如果想实现类似分区的功能,就要在客户端使用一定的策略将消息写到不同的 Stream。

xgroup create:创建消费者组

xgreadgroup:读取消费组中的消息

xack:ack 掉指定消息

# 创建消费者组的时候必须指定 ID, ID 为 0 表示从头开始消费,为 $ 表示只消费新的消息,也可以自己指定

127.0.0.1:6379> xgroup create mystream mygroup $

OK

# 查看流和消费者组的相关信息,可以查看流、也可以单独查看流下的某个组的信息

127.0.0.1:6379> xinfo stream mystream

1) "length"

2) (integer) 4  # 共 4 个消息

3) "radix-tree-keys"

4) (integer) 1

5) "radix-tree-nodes"

6) (integer) 2

7) "last-generated-id"

8) "1609408943089-0"

9) "groups"

10) (integer) 1  # 一个消费组

11) "first-entry" # 第一个消息

12) 1) "1609405178536-0"

    2) 1) "f5"

        2) "v5"

13) "last-entry"  # 最后一个消息

14) 1) "1609408943089-0"

    2) 1) "f6"

        2) "v6"

127.0.0.1:6379>


按消费组消费

Stream 提供了 xreadgroup 指令可以进行消费组的组内消费,需要提供消费组名称、消费者名称和起始消息 ID。它同 xread 一样,也可以阻塞等待新消息。读到新消息后,对应的消息 ID 就会进入消费者的 PEL(正在处理的消息) 结构里,客户端处理完毕后使用 xack 指令通知服务器,本条消息已经处理完毕,该消息 ID 就会从 PEL 中移除。

#  消费组 mygroup1 中的 消费者 c1 从 mystream 中 消费组数据

# > 号表示从当前消费组的 last_delivered_id 后面开始读

# 每当消费者读取一条消息,last_delivered_id 变量就会前进

127.0.0.1:6379> xreadgroup group mygroup1 c1 count 1 streams mystream >

1) 1) "mystream"

2) 1) 1) "1609727806627-0"

    2) 1) "f1"

        2) "v1"

        3) "f2"

        4) "v2"

        5) "f3"

        6) "v3"


127.0.0.1:6379> xreadgroup group mygroup1 c1 count 1 streams mystream >

1) 1) "mystream"

2) 1) 1) "1609727818650-0"

    2) 1) "f4"

        2) "v4"


# 已经没有消息可读了            

127.0.0.1:6379> xreadgroup group mygroup1 c1 count 2 streams mystream >

(nil)


# 还可以阻塞式的消费

127.0.0.1:6379> xreadgroup group mygroup1 c2 block 0 streams mystream >

µ1) 1) "mystream"

2) 1) 1) "1609728270632-0"

    2) 1) "f5"

        2) "v5"

(89.36s)


# 观察消费组信息

127.0.0.1:6379> xinfo groups mystream

1) 1) "name"

    2) "mygroup1"

    3) "consumers"

    4) (integer) 2  # 2个消费者

    5) "pending"

    6) (integer) 3   # 共 3 条正在处理的信息还没有 ack

    7) "last-delivered-id"

    8) "1609728270632-0"


127.0.0.1:6379> xack mystream mygroup1 1609727806627-0  # ack掉指定消息

(integer) 1

就目前来说,Stream 还是不能当做主流的 MQ 来使用的,而且使用案例也比较少,慎用。


redis stream php案例代码:https://blog.csdn.net/qq_18743819/article/details/107276380

set_time_limit(0);

$this->queue="queue"; //消息队里非key

$this->group='pin-group'  //消费者组

$consumer='consumerA';  //消费者

if(!$this->redis->exists($this->queue)){  /*队列不存在需要初始化*/

        $this->redis->xadd($this->queue,'*',['type'=>0,'dat'=>'']);

}

$res=$this->redis->xinfo('groups',$this->queue);  /*获取strarm消费组信息*/

if(!$res){

        $res=$this->redis->xgroup('create',$this->queue,$this->group,'0');  /*创建消费组*/

        var_dump($res);

}


while (1) {

        echo 'start============'.microtime(true)." \n";

        //从最后读取一条,阻塞5秒

        $read = $this->redis->rawCommand('xreadgroup','group',$this->group,$consumer,'block','5000', 'count', '1' ,'streams',$this->queue,'>');

        //$res = $this->redis->rawCommand('xpending',$this->queue,$this->group,'-','+','10',$consumer);  //消费者的待处理消息

        $info = $read[0][1] ?? [];

        if (empty($info)) {

                continue;

        }

        $msgCount = count($info);

        for ($a = 0; $a < $msgCount; $a ++) {

                $msgId = $info[$a][0] ?? 0;  //每条消息的id

                $msg=$info[$a][1];

                $type=$msg[1];

                $dat=json_decode($msg[3],true);

                print_r([$type,$dat]);

                $xack = $this->redis->rawCommand('xack',$this->queue,$this->group,$msgId);  //确认消息已经处理

                echo $msgId.':'.$xack.PHP_EOL;

        }

        //sleep(1);

        echo "end============".microtime(true)." \n";

}


参考:

Redis 消息队列的三种方案(List、Streams、Pub/Sub):https://zhuanlan.zhihu.com/p/343632121https://mp.weixin.qq.com/s/_q0bI62iFrG8h-gZ-bCvNQ

Redis队列:https://blog.csdn.net/xiaodong_526/article/details/12042232

Redis实现消息队列的几种方式及其优劣:https://blog.csdn.net/le_17_4_6/article/details/124457648

Redis实现简单消息队列:https://www.jianshu.com/p/9c04890615b

redis的详解和项目应用之PHP操作总结:https://blog.csdn.net/m0_68949064/article/details/126541922

只袄早~~~
感谢你的支持,我会继续努力!
扫码打赏,感谢您的支持!

文明上网理性发言!

  • 还没有评论,沙发等你来抢