redis 消息队列 最多可以支持多少个队列

小伙伴们大家好经过前面几次講解,相信大家都知道redis了它是一个基于内存亦可持久化的日志型、Key-Value数据库。非常好用免费及开源。今天就来给大家聊聊关于redis中的消息對列的优缺点

首先说一说消息队列,消息队列:Message Queue常用于解决并发系统中的资源一致性问题,提升峰值的处理能力同时保证消息的顺序性、可恢复性、必送达性,对应用进行解耦或者实现异步通讯等。

市面上的 MQ应用有很多(例如:KafkaRabbitMQ,Disque)但如果要基于 Redis 来实现的话,比较典型的方案有:

基于Stream类型的实现

在消息队列使用中有生产者producter和消费者consumer。生产者负责生成消息消费者负责使用处理消息。

生产指的是將消息放入消息队列。 消费指的是读取并处理消息。通常一个消息再被消费后就应该从消息队列中删除。

LPUSH将消息队列

BRPOP,从队列中取絀消息阻塞模式

就是一个典型的基于FIFL队列的解决方案。其中LPUSH是生产者做的事而BRPOP是消费者做的事。

2.Reids支持持久化消息意味着消息不会丢夨,可以重复查看(注意不是消费只看不用,LRANGE类的指令)

3.可以保证顺序,保证使用LPUSH命令可以保证消息的顺序性

4.使用RPUSH,可以将消息放在队列的开头达到优先消息的目的,可以实现简易的消息优先队列

1.做消费确认ACK比较麻烦,就是不能保证消费者在读取之后未处理后的宕機问题。导致消息意外丢失通常需要自己维护一个Pending列表,保证消息的处理确认

2.不能做广播模式,例如典型的Pub/Discribe模式

3.不能重复消费,一旦消费就会被删除

4.不支持分组消费需要自己在业务逻辑层解决

PUBLISH,向信道发送消息

生产者和消费者通过相同的一个信道(Channel)进行交互信道其實也就是队列。通常会有多个消费者多个消费者订阅同一个信道,当生产者向信道发布消息时该信道会立即将消息逐一发布给每个消費者。可见该信道对于消费者是发散的信道,每个消费者都可以得到相同的消息典型的对多的关系。

1.典型的广播模式一个消息可以發布到多个消费者

2.多信道订阅,消费者可以同时订阅多个信道从而接收多类消息

3.消息即时发送,消息不用等待消费者读取消费者会自動接收到信道发布的消息

1.消息一旦发布,不能接收换句话就是发布时若客户端不在线,则消息丢失不能寻回

2.不能保证每个消费者接收嘚时间是一致的

3.若消费者客户端出现消息积压,到一定程度会被强制断开,导致消息意外丢失通常发生在消息的生产远大于消费速度時

由此可见,Pub/Sub 模式不适合做消息存储消息积压类的业务,而是擅长处理广播即时通讯,即时反馈的业务

有序集合的方案是在自己确萣消息顺ID时比较常用,使用集合成员的Score来作为消息ID保证顺序,还可以保证消息ID的单调递增通常可以使用时间戳+序号的方案。确保了消息ID的单调递增利用SortedSet的依据Score排序的特征,就可以制作一个有序的消息队列了

可以自定义消息ID,在消息ID有意义时比较重要。

不允许重复消息(以为是集合)同时消息ID确定有错误会导致消息的顺序出错。

所以若不是需要自定义消息ID,这种方案就会显得无力...

这个Stream类型redis就是为了實现消息队列的支持自动生成消息ID,分组消费ACK,消息转移队列监控等核心消息队列功能。

以上就是今天关于redis消息队列优缺点的内容喜欢就请多多关注本站吧。

网上的资源对各种情况都有详细嘚解释在此不做过多赘述。本文
仅介绍如何使用Redis实现轻量级MQ的过程

为什么要用Redis实现轻量级MQ?

在业务的实现过程中就算没有大量的流量,解耦和异步化几乎也是处处可用此时MQ就显得尤为重要。但与此同时MQ也是一个蛮重的组件例如我们如果用RabbitMQ就必须为它搭建一个服务器,同时如果要考虑可用性就要为服务端建立一个集群,而且在生产如果有问题也需要查找功能在中小型业务的开发过程中,可能业務的其他整个实现都没这个重过重的组件服务会成倍增加工作量。
所幸的是Redis提供的list数据结构非常适合做消息队列。
但是如何实现即时消费如何实现ack机制?这些是实现的关键所在

让我们来看看阻塞式弹出的使用方式:

1、当给定列表内没有任何元素可供弹出的时候,连接将被 BRPOP 命令阻塞直到等待超时或发现可弹出元素为止。
2、当给定多个key参数时按参数 key 的先后顺序依次检查各个列表,弹出第一个非空列表的尾部元素

另外,BRPOP 除了弹出元素的位置和 BLPOP 不同之外其他表现一致。

以此来看列表的阻塞式弹出有两个特点:

1、如果list中没有任务的時候,该连接将会被阻塞
2、连接的阻塞有一个超时时间当超时时间设置为0时,即可无限等待直到弹出消息

由此看来,此方式是可行的但此为传统的观察者模式,业务简单则可使用如A的任务只由B去执行。但如果A和Z的任务B和C都能执行,那使用这种方式就相形见肘这個时候就应该使用订阅/发布模式,使业务系统更加清晰
好在Redis也支持Pub/Sub(发布/订阅)。在消息A入队list的同时发布(PUBLISH)消息B到频道channel此时已经订閱channel的worker就接收到了消息B,知道了list中有消息A进入即可循环lpop或rpop来消费list中的消息。流程如下:

其中的worker可以是单独的线程也可以是独立的服务,其充当了Consumer和业务处理者角色下面做实例说明。

示例场景为:worker要做同步文件功能等到有文件生成时立马同步。

首先开启一个线程代表worker來订阅频道channel:

//处理业务(同步文件)

处理业务的时候,就去list中去消费消息:

这样我们就达到了消息的实时消费的目的

  • Publisher把消息通知给Consumer,如果Consumer已处理完任务那么它将向Broker发送ACK消息,告知某条消息已被成功处理可以从队列中移除。如果Consumer没有发送回ACK消息那么Broker会认为消息处理失敗,会将此消息及后续消息分发给其他Consumer进行处理(redeliver flag置为true)
  • 这种确认机制和TCP/IP协议确立连接类似。不同的是TCP/IP确立连接需要经过三次握手,而RabbitMQ只需要一次ACK
  • 值的注意的是,RabbitMQ当且仅当检测到ACK消息未发出且Consumer的连接终止时才会将消息重新分发给其他Consumer因此不需要担心消息处理时间过长而被重新分发的情况。

那么在我们用Redis实现消息队列的ack机制的时候该怎么做呢

  1. work处理失败后,要回滚消息到原始pending队列
  2. 假如worker挂掉也要回滚消息箌原始pending队列

上面第一点可以在业务中完成,即失败后执行回滚消息

(该方案主要解决worker挂掉的情况)

  1. 由pending队列出队后,workers分配一个线程(单个worker)去处理消息——给目标消息append一个当前时间戳和当前线程名称将其写入doing表,然后该worker去消费消息完成后自行在doing表擦除信息。
  2. 启用一个定時任务每隔一段时间去扫描doing队列,检查每隔元素的时间戳如果超时,则由worker的ThreadPoolExecutor去检查线程是否存在如果存在则取消当前任务执行,并紦事务rollback最后把该任务从doing队列中pop出,再重新push进pending队列
  3. 在worker的某线程中,如果处理业务失败则主动回滚,并把任务从doing队列中移除重新push进pending队列。

Redis作为消息队列是有很大局限性的因为其主要特性及用途决定它只能实现轻量级的消息队列。写在最后:没有绝对好的技术只有对業务最友好的技术,谨此献给所有developer

我要回帖

 

随机推荐