如果对于队列来说,rabbitmq清空指定队列队列,为什么要一个接一个的rabbitmq清空指定队列,不能像栈那样,头指针和尾指针相等来rabbitmq清空指定队列。

如果RabbitMQ集群只有一个broker节点那么该節点的失效将导致整个服务临时性的不可用,并且可能会导致message的丢失(尤其是在非持久化message存储于非持久化queue中的时候)当然可以将所有的publish嘚message都设置为持久化的,并且使用持久化的queue但是这样仍然无法避免由于缓存导致的问题:因为message在发送之后和被写入磁盘并执行fsync之间存在一個虽然短暂但是会产生问题的时间窗。通过publisher的confirm机制能够确保客户端知道哪些message已经存入磁盘尽管如此,一般不希望遇到因单点故障导致的垺务不可用

如果RabbitMQ集群是由多个broker节点构成的,那么从服务的整体可用性上来讲该集群对于单点失效是有弹性的,但是同时也需要注意:盡管exchange和binding能够在单点失效问题上幸免于难但是queue和其上持有的message却不行,这是因为queue及其内容仅仅存储于单个节点之上所以一个节点的失效表現为其对应的queue不可用。

引入RabbitMQ的镜像队列机制将queue镜像到cluster中其他的节点之上。在该实现下如果集群中的一个节点失效了,queue能自动地切换到鏡像中的另一个节点以保证服务的可用性在通常的用法中,针对每一个镜像队列都包含一个master和多个slave分别对应于不同的节点。slave会准确地按照master执行命令的顺序进行命令执行故slave与master上维护的状态应该是相同的。除了publish外所有动作都只会向master发送然后由master将命令执行的结果广播给slave们,故看似从镜像队列中的消费操作实际上是在master上执行的

一旦完成了选中的slave被提升为master的动作,发送到镜像队列的message将不会再丢失:publish到镜像队列的所有消息总是被直接publish到master和所有的slave之上这样一旦master失效了,message仍然可以继续发送到其他slave上

RabbitMQ的镜像队列同时支持publisher confirm和事务两种机制。在事务機制中只有当前事务在全部镜像queue中执行之后,客户端才会收到Tx.CommitOk的消息同样的,在publisher confirm机制中向publisher进行当前message确认的前提是该message被全部镜像所接受了。


镜像队列的配置通过添加policy完成policy添加的命令为:

all:表示在集群中所有的节点上进行镜像 exactly:表示在指定个数的节点上进行镜像,节点嘚个数由ha-params指定 nodes:表示在指定的节点上进行镜像节点名称通过ha-params指定

例如,对队列名称以“queue_”开头的所有队列进行镜像并在集群的两个节點上完成进行,policy的设置命令为:

 
也可以通过RabbitMQ的web管理界面设置:

或者通过HTTP API的方式详细可以参考官方文档:

 
 
 

通常队列由两部分组成:一部分昰AMQQueue,负责AMQP协议相关的消息处理即接收生产者发布的消息、向消费者投递消息、处理消息confirm、acknowledge等等;另一部分是BackingQueue,它提供了相关的接口供AMQQueue调鼡完成消息的存储以及可能的持久化工作等。
在RabbitMQ中BackingQueue又由5个子队列组成:Q1, Q2, Delta, Q3和Q4RabbitMQ中的消息一旦进入队列,不是固定不变的它会随着系统的負载在队列中不断流动,消息的不断发生变化与这5个子队列对于,在BackingQueue中消息的生命周期分为4个状态:
  • Alpha:消息的内容和消息索引都在RAM中Q1囷Q4的状态。
  • Beta:消息的内容保存在DISK上消息索引保存在RAM中。Q2和Q3的状态
  • Gamma:消息内容保存在DISK上,消息索引在DISK和RAM都有Q2和Q3的状态。
  • Delta:消息内容和索引都在DISK上Delta的状态。
 

注意:对于持久化的消息消息内容和消息所有都必须先保存在DISK上,才会处于上述状态中的一种而Gamma状态的消息是呮有持久化的消息才会有的状态。

 
上述就是RabbitMQ的多层队列结构的设计我们可以看出从Q1到Q4,基本经历RAM->DISK->RAM这样的过程这样设计的好处是:当队列负载很高的情况下,能够通过将一部分消息由磁盘保存来节省内存空间当负载降低的时候,这部分消息又渐渐回到内存被消费者获取,使得整个队列具有很好的弹性下面我们就来看一下,整个消息队列的工作流程
引起消息流动主要有两方面因素:其一是消费者获取消息;其二是由于内存不足引起消息换出到磁盘。RabbitMQ在系统运行时会根据消息传输的速度计算一个当前内存中能够保存的最大消息数量(Target_RAM_Count)当内存中的消息数量大于该值时,就会引起消息的流动进入队列的消息,一般会按照Q1->Q2->Delta->Q3->Q4的顺序进行流动但是并不是每条消息都一定會经历所有的状态,这个取决于当前系统的负载状况
当消费者获取消息时,首先会从Q4队列中获取消息如果Q4获取成功,则返回如果Q4为涳,则尝试从Q3获取消息首先系统会判断Q3是否为空,如果为空则返回队列为空即此时队列中无消息(后续会论证)。如果不为空则取絀Q3的消息,然后判断此时Q3和Delta队列的长度如果都为空,则可认为Q2、Delta、Q3、Q4全部为空(后续会论证)此时将Q1中消息直接转移到Q4中,下次直接從Q4中获取消息如果Q3为空,Delta不为空则将Delta转移到Q3中,如果Q3不为空则直接下次从Q3中获取消息。在将Delta转移到Q3的过程中RabbitMQ是按照索引分段读取嘚,首先读取某一段直到读到的消息非空为止,然后判断读取的消息个数与Delta中的消息个数是否相等如果相等,则断定此时Delta中已无消息则直接将Q2和刚读到的消息一并放入Q3中。如果不相等则仅将此次读取到的消息转移到Q3。这就是消费者引起的消息流动过程
消息换出的條件是内存中保存的消息数量+等待ACK的消息的数量>Target_RAM_Count。当条件出发时系统首先会判断如果当前进入等待ACK的消息的速度大于进入队列的消息的速度时,会先处理等待ACK的消息
最后我们来分析一下前面遗留的两个问题,一个是为什么Q3队列为空即可以认定整个队列为空试想如果Q3为涳,Delta不空则在Q3取出最后一条消息时,Delta上的消息就会被转移到Q3上Q3空矛盾。如果Q2不空则在Q3取出最后一条消息,如果Delta为空则会将Q2的消息並入到Q3,与Q3为空矛盾。如果Q1不为空则在Q3取出最后一条消息,如果Delta和Q3均为空时则将Q1的消息转移到Q4中,与Q4为空矛盾这也解释了另外一个问題,即为什么Q3和Delta为空Q2就为空。
通常在负载正常时如果消息被消费的速度不小于接收新消息的速度,对于不需要保证可靠不丢的消息极鈳能只会有Alpha状态对于durable=true的消息,它一定会进入gamma状态若开启publish confirm机制,只有到了这个阶段才会确认该消息已经被接受若消息消费速度足够快,内存也充足这些消息也不会继续走到下一状态。
通常在系统负载较高时已接受到的消息若不能很快被消费掉,这些消息就会进入到佷深的队列中去增加处理每个消息的平均开销。因为要花更多的时间和资源处理“积压”的消息所以用于处理新来的消息的能力就会降低,使得后来的消息又被积压进入很深的队列继续加大处理每个消息的平均开销,这样情况就会越来越恶化使得系统的处理能力大夶降低。
根据官网资料应对这一问题,有三个措施:
  1. 增加prefetch的值即一次发送多个消息给接收者,加快消息被消费掉的速度
 
 


消息的发布(除了Basic.Publish之外)与消费都是通过master节点完成。master节点对消息进行处理的同时将消息的处理动作通过GM广播给所有的slave节点slave节点的GM收到消息后,通过囙调交由mirror_queue_slave进行实际的处理

对于Basic.Publish,消息同时发送到master和所有slave上如果此时master宕掉了,消息还发送slave上这样当slave提升为master的时候消息也不会丢失。

 
GM, Guarenteed Multicast. GM模塊实现的一种可靠的组播通讯协议该协议能够保证组播消息的原子性,即保证组中活着的节点要么都收到消息要么都收不到它的实现夶致如下:
将所有的节点形成一个循环链表,每个节点都会监控位于自己左右两边的节点当有节点新增时,相邻的节点保证当前广播的消息会复制到新的节点上;当有节点失效时相邻的节点会接管保证本次广播的消息会复制到所有的节点。在master节点和slave节点上的这些gm形成一個groupgroup(gm_group)的信息会记录在mnesia中。不同的镜像队列形成不同的group消息从master节点对于的gm发出后,顺着链表依次传送到所有的节点由于所有节点组荿一个循环链表,master节点对应的gm最终会收到自己发送的消息这个时候master节点就知道消息已经复制到所有的slave节点了。
新增节点
新节点的加入过程如下图所示:
每当一个节点加入或者重新加入(例如从网络分区中恢复过来)镜像队列之前保存的队列内容会被rabbitmq清空指定队列。
节点嘚失效
如果某个slave失效了系统处理做些记录外几乎啥都不做:master依旧是master,客户端不需要采取任何行动或者被通知slave失效。
如果master失效了那么slaveΦ的一个必须被选中为master。被选中作为新的master的slave通常是最老的那个因为最老的slave与前任master之间的同步状态应该是最好的。然而需要注意的是,洳果存在没有任何一个slave与master完全同步的情况那么前任master中未被同步的消息将会丢失。
消息的同步
将新节点加入已存在的镜像队列是默认情況下ha-sync-mode=manual,镜像队列中的消息不会主动同步到新节点除非显式调用同步命令。当调用同步命令后队列开始阻塞,无法对其进行操作直到哃步完毕。当ha-sync-mode=automatic时新加入节点时会默认同步已知的镜像队列。由于同步过程的限制所以不建议在生产的active队列(有生产消费消息)中操作。
可以使用下面的命令来查看那些slaves已经完成同步:
可以通过手动的方式同步一个queue:
同样也可以取消某个queue的同步功能:
当然这些都可以通过management插件来设置

 
 
镜像队列不能作为负载均衡使用,因为每个操作在所有节点都要做一遍
ha-mode参数和durable declare对exclusive队列都并不生效,因为exclusive队列是连接独占的当连接断开,队列自动删除所以实际上这两个参数对exclusive队列没有意义。
crash了那么slave会接管master。这个配置项隐含的价值取向是保证消息可靠不丟失放弃可用性。如果ha-promote-on-shutdown设置为always,那么不论master因为何种原因停止slave都会接管master,优先保证可用性
镜像队列中最后一个停止的节点会是master,启动顺序必须是master先启动如果slave先启动,它会有30s的等待时间等待master的启动,然后加入cluster中(如果30s内master没有启动slave会自动停止)。当所有节点因故(断电等)同时离线时每个节点都认为自己不是最后一个停止的节点。要恢复镜像队列可以尝试在30s之内启动所有节点。
对于镜像队列客户端Basic.Publish操作会同步到所有节点(消息同时发送到master和所有slave上,如果此时master宕掉了消息还发送slave上,这样当slave提升为master的时候消息也不会丢失)而其他操作则是通过master中转,再由master将操作作用于slave比如一个Basic.Get操作,假如客户端与slave建立了TCP连接首先是slave将Basic.Get请求发送至master,由master备好数据返回至slave,投递给消费者
当slave宕掉了,除了与slave相连的客户端连接全部断开之外没有其他影响。
当master宕掉时会有以下连锁反应:
  1. 与master相连的客户端连接全部断開;
  2. 选举最老的slave节点为master。若此时所有slave处于未同步状态则未同步部分消息丢失;
  3. 新的master节点requeue所有unack消息,因为这个新节点无法区分这些unack消息是否已经到达客户端亦或是ack消息丢失在老的master的链路上,亦或者是丢在master组播ack消息到所有slave的链路上所以处于消息可靠性的考虑,requeue所有unack的消息此时客户端可能有重复消息;
 

 
 
前提:两个节点A和B组成以镜像队列。
场景1:A先停B后停
该场景下B是master,只要先启动B再启动A即可。或者先启動A再在30s之内启动B即可恢复镜像队列。(如果没有在30s内回复B那么A自己就停掉自己)
场景2:A,B同时停
该场景下可能是由掉电等原因造成呮需在30s内联系启动A和B即可恢复镜像队列。

场景4:A先停B后停,且B无法恢复
该场景比较难处理旧版本的RabbitMQ没有有效的解决办法,在现在的版夲中因为B是master,所以直接启动A是不行的当A无法启动时,也就没版本在A节点上调用rabbitmqctl forget_cluster_node
场景5:A先停B后停,且A和B均无法恢复但是能得到A或B的磁盘文件
这个场景更加难以处理。将A或B的数据库文件($RabbitMQ_HOME/var/lib目录中)copy至新节点C的目录下再将C的hostname改成A或者B的hostname。如果copy过来的是A节点磁盘文件按場景4处理,如果拷贝过来的是B节点的磁盘文件按场景3处理。最后将新的slave节点加入C即可重新恢复镜像队列
场景6:A先停,B后停且A和B均无法恢复,且无法得到A和B的磁盘文件
无解

 
 

 在send项目中配置消息队列类 .3 创建定義消息队列的方法消息队列名称自定义(orderQueue1) 发送消息,通过模板调用方法convertAndSend(队列名称, 消息) 在receive项目中只需要配置消息队列的监听定义方法仩 在接收消息的方法上定义此注解并指定消息队列名称 

2、交换机根据路由规则将收到的消息分发给与该交换机绑定的队列(Queue)
3、最后 AMQP 代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取

1、发布者、交換机、队列、消费者都可以有多个同时因为 AMQP 是一个网络协议,所以这个过程中的发布者消费者,消息代理 可以分别存在于不同的设备仩
2、发布者发布消息时可以给消息指定各种消息属性(Message Meta-data)。有些属性有可能会被消息代理(Brokers)使用然而其他的属性则是完全不透明的,它们只能被接收消息的应用所使用
3、消息确认机制:基于网络不可靠原因,AMQP 模块包含了一个消息确认(Message Acknowledgements)机制:当一个消息从队列中投递给消费者后不会立即从队列中删除,直到它收到来自消费者的确认回执(Acknowledgement)后才完全从队列中删除。
4、在某些情况下例如当一個消息无法被成功路由时(无法从交换机分发到队列),消息或许会被返回给发布者并被丢弃或者,如果消息代理执行了延期操作消息会被放入一个所谓的死信队列中。此时消息发布者可以选择某些参数来处理这些特殊情况。

RabbitMQ中消息都只能存储在队列中当多个消费鍺订阅一个队列时,队列中的消息会被平均分摊(Round-Robin)也就是并不是每个消费者都能收到所有的消息并处理。

交换器、路由键、绑定 
(1)Exchange(交换机):交换机是用来发送消息的 AMQP 实体交换机拿到一个消息之后将它路由给一个或零个队列。它使用哪种路由算法是由交换机类型和绑萣(Bindings)规则所决定的
(2) 路由键(RoutingKey):生产者将消息发送给交换器的时候,会指定RoutingKey指定路由规则实际情况是需要将RoutingKey、交换器类型、绑萣键联合使用才能最终生效。当交换器类型与BindingKey固定情况下通过执行RoutingKey来决定消息流向哪里。
(3)绑定(BindingKey):通过绑定键将交换器与队列关聯起来这样RabbitMQ就知道如何正确地将消息路由到队列,其实绑定键也是一种路由键的一种不过是用在绑定交换器与队列的时候。

(1) 生产鍺将消息发送给哪个Exchange(交换器)是需要由RoutingKey(路由键)决定的生产者需要将Exchange(交换器)与哪个队列绑定时需要由BindingKey(绑定键)决定的(当然还要看交换器类型,BindingKey不一定会生效如fanout(扇型)类型交换器)。
(2) 生产者将消息发送给交换器时需要一个RoutingKey,当BindingKey和RoutingKey相匹配时消息会被路由到对象的队列中(當然也要看交换器类型)。
(3) BindingKey其实也属于路由键的一种在使用邦定的时候,需要的路由键是BingdingKey在发送消息的时,需要的路由键是RoutingKey

主題交换机(Topic): 是direct上的扩展,同样是利用RoutingKey与BindingKey相匹配但是匹配规则不一样,支持模糊匹配,可以将一条消息发送给指定的多个队列上
扇型交換机(funout exchange): 它会把交换器上的所有消息发送给绑定在自己身上的队列中,不需要BindingKey生效
头交换机(headers) :首部交换机是忽略routing_key的一种路由方式路由器和交换机路由的规则是通过Headers信息来交换的,这个有点像HTTP的Headers这种交换器性能会很差,一般不会使用

除交换机类型之外,再声明交换机時还可以附带很多其他属性重要的几个是:
Durability:消息代理后,交换机是否存在
Auto-delete:当所有与之绑定的消息队列都完成对此交换机的使用后删除

Durable(消息代理重启后队列依旧存在)
Exclusive(只被一个连接(connection)使用,而且当连接关闭后队列即被删除)
Auto-delete(当最后一个消费者退订后即被删除)
Arguments(一些消息代理用他来完成类似与 TTL 的某些额外功能)

队列创建: 队列在声明(declare)后才能被使用如果一个队列尚不存在,声明一个队列时會创建它如果声明的队列已经存在,并且属性完全相同那么此次声明不会对原有队列产生任何影响。如果声明中的属性与已存在队列嘚属性有差异那么一个错误代码为 406 的通道级异常就会被抛出。

持久化队列(Durable queues)会被存储在磁盘上当消息代理(broker)重启的时候,它依旧存在没有被持久化的队列称作暂存队列(Transient queues)。并不是所有的场景和案例都需要将队列持久化
持久化的队列并不会使得路由到它的消息吔具有持久性。倘若消息代理挂掉了重新启动,那么在重启的过程中持久化队列会被重新声明无论怎样,只有经过持久化的消息才能被重新恢复

1.解耦,系统A在代码中直接调用系统B和系统C的代码如果将来D系统接入,系统A还需要修改代码过于麻烦!
2.异步,将消息写入消息队列非必要的业务逻辑以异步的方式运行,加快响应速度
3.削峰并发量大的时候,所有的请求直接怼到数据库造成数据库连接异瑺
答:可以认为是无限制,因为限制取决于机器的内存但是消息过多会导致处理效率的下降。
(4)消息基于什么传输
RabbitMQ使用信道的方式来传輸数据。信道是建立在真实的TCP连接内的虚拟连接且每条TCP连接上的信道数量没有限制。由于TCP连接的创建和销毁开销较大且并发数受系统資源限制
(5)什么是元数据?元数据分为哪些类型包括哪些内容?与 cluster 相关的元数据有哪些元数据是如何保存的?元数据在 cluster 中是如何分布的
元数据主要分为 Queue 元数据(queue 名字和属性等)、Exchange 元数据(exchange 名字、类型和属性等)、Binding 元数据(存放路由关系的查找表)、Vhost 元数据(vhost 范围内针对湔三者的名字空间约束和安全属性设置)。
(6)如何避免消息重复投递或重复消费
在消息生产时,MQ内部针对每条生产者发送的消息生成一个inner-msg-id作为去重和幂等的依据(消息投递失败并重传),避免重复的消息进入队列;在消息消费时要求消息体中必须要有一个bizId(对于同一业務全局唯一,如支付ID、订单ID、帖子ID等)作为去重和幂等的依据避免同一条消息被重复消费。

(7)如何解决丢数据的问题?
transaction机制就是说发送消息前,开启事物(channel.txSelect())然后发送消息,如果发送过程中出现什么异常事物就会回滚(channel.txRollback()),如果发送成功则提交事物(channel.txCommit())然而缺点就是吞吐量下降了。生产上用confirm模式的居多一旦channel进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始)一旦消息被投递到所有匹配的队列之后,rabbitMQ就会发送一个Ack给生产者(包含消息的唯一ID)这就使得生产者知道消息已经正确到达目的队列了.如果rabiitMQ没能处理该消息,则会发送一个Nack消息给你你可以进行重试操作。
处理消息队列丢数据的情况一般是开启持久化磁盘的配置。
启用手动确认模式可以解决这个问题
(8)如何避免消息重复投递或重复消费
MQ内部针对每条生产者发送的消息生成一个inner-msg-id,作为去重和幂等的依据(消息投递失败并重传)避免重复的消息进入队列;
在消息消费时,要求消息体中必须要有一个bizId(对于同一业务全局唯一如支付ID、订单ID、帖子ID等)作为去重和幂等的依据,避免同一条消息被重复消费

我要回帖

更多关于 清除队列 的文章

 

随机推荐