使用C#如何rabbitmq取消所有rabbitmq 消费者数量连接

RabbitMQ学习系列三:.net 环境下 C#代码订阅 RabbitMQ 消息并处理
上一篇已经讲了Rabbitmq如何在Windows平台安装
不懂请移步:
.net环境下,C#代码订阅RabbitMQ消息队列,本文用easynetq开源的.net Rabbitmq api来实现,EasyNetQ 是一个易于使用的RabbitMQ的.Net客户端API,本文win服务基于topshelf实现,不懂请移步:
二、项目搭建
1、此处省略topshelf服务搭建步骤
2、服务项目结构
3、结构简要说明
NoticeProcess.cs 获取消息体后处理类
BusBuilder.cs 消息服务器连接器
Installer.cs 消息服务安装实现类
ServiceMain.cs 消息服务执行类
VaultService.cs C#订阅消息服务后的处理类,进行routingkey的约束等
Program.cs 程序启动类
三、测试发布和订阅
1、发布消息(此处省略,具体实现参照第二篇文章)
2、C#订阅服务启动测试
3、C#服务调试获取消息
可以看到这里启动后,获取到了1步骤当中的消息routingkey
4、c#处理代码
如果队列有其他的pcm.notice.xxxxx消息,同样会被c#服务获取到,都是基于pcm.notice.#这样的#完全匹配原则。
这样基本简单的c#订阅Rabbitmq消息,获取信息就完成了。
问题整理:
如果C#读取的Rabbitmq中message消息实体和你c#程序中的不一样,会报错如下:
EasyNetQ.EasyNetQInvalidMessageTypeExcepion:Message type is incorrect.Expected 'RabbitMQ_Message:RabbitMQ',but was ''
由最后编辑于:2年前
内容均为作者独立观点,不代表八零IT人立场,如涉及侵权,请及时告知。
评论努力加载中...
是否采纳当前回复为最佳答案?采纳后不可再次编辑。
下载附件将扣除您个80币,是否继续下载?C#操作RabbitMQ示例
时间: 10:38:46
&&&& 阅读:35
&&&& 评论:
&&&& 收藏:0
标签:&&&&&&&&&&&&&&&&&&&&&&&&&&&
using RabbitMQ.C
using RabbitMQ.Client.E
using System.T
using System.T
namespace RabbitMQ
class Program
static void Main(string[] args)
var type = Console.ReadLine();
if (type == "PRODUCER")
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
channel.ExchangeDeclare(exchange: "RabbitMQ_Exchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
channel.QueueDeclare(queue: "RabbitMQ_Queue", durable: true, exclusive: false, autoDelete: false, arguments: null);
channel.QueueBind(queue: "RabbitMQ_Queue", exchange: "RabbitMQ_Exchange", routingKey: "RabbitMQ_RoutingKey", arguments: null);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
for (var i = <span style="color: #; i & <span style="color: #00; i++)
var message = $"RabbitMQ_Exchange # RabbitMQ_Queue # RabbitMQ_RoutingKey # {i}";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "RabbitMQ_Exchange", routingKey: "RabbitMQ_RoutingKey", basicProperties: properties, body: body);
Console.WriteLine(" [x] Sent {0}", message);
Console.ReadLine();
if (type == "CONSUMER")
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
channel.ExchangeDeclare(exchange: "RabbitMQ_Exchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
channel.QueueDeclare(queue: "RabbitMQ_Queue", durable: true, exclusive: false, autoDelete: false, arguments: null);
channel.QueueBind(queue: "RabbitMQ_Queue", exchange: "RabbitMQ_Exchange", routingKey: "RabbitMQ_RoutingKey", arguments: null);
channel.BasicQos(prefetchSize: <span style="color: #, prefetchCount: <span style="color: #, global: false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =&
var body = ea.B
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
Thread.Sleep(<span style="color: #00);
Console.WriteLine(" [x] Done");
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
channel.BasicConsume(queue: "RabbitMQ_Queue", autoAck: false, consumer: consumer);
Console.ReadLine();
&标签:&&&&&&&&&&&&&&&&&&&&&&&&&&&原文地址:https://www.cnblogs.com/xiaowangzhi/p/8642691.html
&&国之画&&&& &&&&chrome插件&&
版权所有 京ICP备号-2
迷上了代码!本系列文章均来自官网原文,属于个人翻译,如有雷同,权当个人归档,忽喷.
RabitMQ 是一个消息中间件,其实就是从消息生产者那里接受消息,然后发送给消息消费者.在这个传输过程中,可以定义一些缓存,持久化,路由的规则。
相关对象的术语简介:
1:生产者(producters)---发送消息的程序叫做生产者,使用带字母P的图来表示
2:队列(queue)--存储消息的邮箱名,存在于RabbitMQ内部,虽然消息流在RabbitMQ和应用程序之间流转,但消息存储的地方只能是队列,队列的绑定使用不受任何限制,它可以存储尽可能多的消息--事实上,它的缓冲大小是不受限制的。
许多生产者可以发送消息被路由到同一个队列,许多消费者也可以从一个队列接受消息,可以使用如下顶部带"queue_name"的图片表示.
3:消费者(consuming)--消费者比较类似接收者的概念,消费者实际上就是一个持续接受消息的程序,可以用带"C"的图片表示
本系列文章均来自官网原文,属于个人翻译,如有雷同,权当个人归档,忽喷.
.NET/C# RabbitMQ 客户端下载地址:
关于RabbitMQ在windows 平台的安装和管理配置请参考:
确保安装成功:
这部分会写两个程序,一个消息生产者发送一个消息;一个消费者接受消息然后输出到控制台,在这个过程中我会忽略一些.Net的细节,把注意力放在这个简单的"hello word" 消息程序上。
在下图中"P" 是我们的生产者,"C" 是我们的消费者,两者之间的中间这是我们的消息队列--一个隐藏在消费者后面的消息缓冲区.
创建一个Send.cs 来写发送程序,发送方会连接RabbitMQ 服务器,发送消息,然后退出.
class Send
public static void Main()
var factory = new ConnectionFactory() { HostName = "192.168.15.128" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
首先需要创建一个连接工厂去连接我们的RabbitMQ服务器,这里我们使用RabbitMQ-dotnet-client 提供的类库来进行回话的创建.
这里的这里的connection 连接已经为我们把socket 连接, 版本协议和认证会话,都已经为我们做了.这里我连接的服务器是"192.168.15.128"(由于我把RabbitMQ的测试环境搭栽了一台虚拟机上,如果是本机可写成"localhost"),直接指定服务器IP即可.
然后我们在这个连接上创建了一次回话(channel),我们所做的大部分Api操作都要基于会话进行。
为了发送消息,我们需要创建一个队列用来存储消息,然后可以把我们的消息发送到该队列上,创建队列代码:
channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false,arguments: null);
//queue:队名名
//durable:是否持久化
//exclusive:是否排他
//autoDelete:自动删除
注:创建队列的API是具有幂等性的--即只有当所指定的队列不存在时才会去创建.
然后发送消息:
string message = "Hello World!";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",routingKey: "hello", basicProperties: null, body: body);
发送的消息必须是字节数组,我们可以自己指定所需的编码
完整代码如下:
&View Code
运行代码,可以通过客户端管理工具看到结果.
可以看到名字为"hello"的队列被创建,并且有一个消息一经存储在队列当中.
注:正常来说我们的消息是需经过交换机(exchange)进行路由(route)才能到达队列的,这里创建完队列然后直接(没有手动绑定exchange和Queue)发送&routingKey为"hello"的消息到名为""的交换机上之所以成功,是因为当我们创建一个队列的时候,RabbitMQ会自动把我们把新建的队列和RoutingKey为该队列名绑定到一个默认名为""的的交换机上。
接收消息:
RabbitMQ会主动把消息推送给我们的消息接收者,不像消息发送者发送单个消息,我们会让消息接收者持续化的监听消息并且打印出来.
创建一个Receive.cs 来写接收消息的代码
class Receive
private static void Main(string[] args)
var factory = new ConnectionFactory() { HostName = "192.168.15.128" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
channel.QueueDeclare(queue: "hello",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
这里的初始代码和Send.cs 基本上是一行的,创先连接,创建会话,这里之所以同样进行名为"hello"的队列的创建,是为了防止客户端先启动,找不到求请求的目标队列.
连接服务器后,我们告诉RabbitMQ主动把消息推送给我们,由于RabbitMQ推送消息是异步(asynchronously)进行的,所以我们使用EventingBasicConsumer.Received &来进行消息的接受.
var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =&
var body = ea.B
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
channel.BasicConsume(queue: "hello",noAck: true,consumer: consumer);//noAck(no manual acks):ack的概念:当Consumer接收到消息、处理任务完成之后,会发送带有这个消息标示符的ack,来告诉server这个消息接收到并处理完成.//如果设置为true,这个Consumer在收到消息之后会马上返回ack(由程序自动完成 noAck=true)//设置为 false:需要手动发送,否者RabbitMQ会一直等到处理某个消息的Consumer的链接失去之后,才确定这个消息没有正确处理,从而RabbitMQ重发这个消息
完整代码如下:
&View Code
class Receive
private static void Main(string[] args)
var factory = new ConnectionFactory() { HostName = "192.168.15.128" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
channel.QueueDeclare(queue: "hello",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =&
var body = ea.B
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
channel.BasicConsume(queue: "hello",
noAck: true,
consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
运行代码Send:
成功发送。
运行代码Reveive:
消息成功接收.
阅读(...) 评论()
随笔 - 16523
评论 - 1558RabbitMQ能否实现一个消费者监听多个队列的消息,任何一个队列有消息立即读出
RabbitMQ能否实现一个消费者监听多个队列的消息,任何一个队列有消息立即读出
看了帮助,consume都是阻塞监听一个指定队列的,有没有办法监听多个队列呢?
完全可以,要不然玩不转
引用来自“大竹叶青”的答案完全可以,要不然玩不转 请问,哪如何实现呢?
/** &* PHP amqp(RabbitMQ) Demo-2 &*
&yuansir &/yuansir-web.com& &*/ $exchangeName = 'demo'; $queueName = 'task_queue'; $routeKey = 'task_queue';
$connection = new AMQPConnection(array('host' =& '127.0.0.1', 'port' =& '5672', 'vhost' =& '/', 'login' =& 'guest', 'password' =& 'guest')); $connection-&connect() or die(&Cannot connect to the broker!\n&); $channel = new AMQPChannel($connection); $exchange = new AMQPExchange($channel); $exchange-&setName($exchangeName); $exchange-&setType(AMQP_EX_TYPE_DIRECT); $exchange-&declare(); $queue = new AMQPQueue($channel); //$queue-&setName($queueName); $queue-&setFlags(AMQP_DURABLE); $queue-&declare(); $queue-&bind($exchangeName, $routeKey);
var_dump('[*] Waiting for messages. To exit press CTRL+C'); while (TRUE) { & & & & $queue-&consume('callback'); & & & & $channel-&qos(0,1); } $connection-&disconnect();
function callback($envelope, $queue) { & & & & $msg = $envelope-&getBody(); & & & & var_dump(& [x] Received:& . $msg); & & & & sleep(substr_count($msg,'.')); & & & & $queue-&ack($envelope-&getDeliveryTag()); }
比如说,象上面这个生产者的PHP代码,就阻塞在了一个队列的读取上,怎样同时监听多个队列的消息呢,能说明的详细点吗?
consume的阻塞监听可以设置timeout,通过设置一个较小的timeout,可以轮流监听几个channel。变相实现监听多个queue
引用来自“陆吾科技”的答案 consume的阻塞监听可以设置timeout,通过设置一个较小的timeout,可以轮流监听几个channel。变相实现监听多个queue 这样读取效率会有所降低,咨询了其它人,建议用线程池,多个纯程一起监听处理
--- 共有 2 条评论 ---
: 是的,就是多个消费者监听多个队列
还有个办法就是先取出一个队列的消息数,我后循环的都读出后,转去读另一个队列,所以队列如果都没有消息了,就这样循环等待着
多个线程一起监听,就不算是一个consumer了吧。
我是这么认为的:消费者(consumer)这是个业务层的概念,而消费或者说订阅(也就是 consume)是 AMQP 协议层的东西,所以,你问一个消费者能否订阅多个queue,答案是当然可以。方案也就一种,按照协议的流程分别向不同的 queue 进行 consume。至于是使用多线程方式来处理,还是使用事件驱动的方式(单线程)来处理这就取决于实现了。
引用来自“摩云飞”的答案我是这么认为的:消费者(consumer)这是个业务层的概念,而消费或者说订阅(也就是 consume)是 AMQP 协议层的东西,所以,你问一个消费者能否订阅多个queue,答案是当然可以。方案也就一种,按照协议的流程分别向不同的 queue 进行 consume。至于是使用多线程方式来处理,还是使用事件驱动的方式(单线程)来处理这就取决于实现了。谢谢你的答复,很专业,学习了
但是代码里面,一个listener,只能监听一个queue
有个东西叫文档知道不 不说 直接看代码
$exchangeName = 'demo';
$queueName = 'task_queue';
$routeKey = 'task_queue';
$connection = new AMQPConnection(array('host' =& '127.0.0.1', 'port' =& '5672', 'vhost' =& '/', 'login' =& 'guest', 'password' =& 'guest'));
$connection-&connect() or die("Cannot connect to the broker!\n");
$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange-&setName($exchangeName);
$exchange-&setType(AMQP_EX_TYPE_DIRECT);
$exchange-&declare();
$queue = new AMQPQueue($channel);
$queue-&setName($queueName);
$queue-&setFlags(AMQP_DURABLE);
$queue-&declare();
$queue-&bind($exchangeName, $routeKey);
$queue2 = new AMQPQueue($channel);
$queue2-&setName($queueName.'_2');
$queue2-&setFlags(AMQP_DURABLE);
$queue2-&declare();
$queue2-&bind($exchangeName, $routeKey.'_2');
function do_callback($envelope, $queue) {
&&&&&&& $msg = $envelope-&getBody();
&&&&&&& var_dump(" [x] Received:" . $msg);
&&&&&&& sleep(substr_count($msg,'.'));
&&&&&&& $queue-&ack($envelope-&getDeliveryTag());
var_dump('[*] Waiting for messages. To exit press CTRL+C');
while (TRUE) {
&& &$msg1 = $queue-&get();
&& &if (!$msg1){
&& &&& &//queue 没消息了
&& &&& &do_callbak($msg1, $queue)
&& &$msg2 = $queue2-&get();
&& &if (!$msg2){
&& &&& &//queue2 没消息了
&& &&& &do_callbak($msg2, $queue2)
$connection-&disconnect();
我不知道我想的对不对:我使用一个channel声明了多个queues,然后消费者直接消费这些queue,最后消息的消费顺序是与其到达消费端的顺序相同的,哪个queue中的消息先到消费哪个
消费者能不能收到几个消息,不是看它绑定了什么队列,而是看它绑定了什么exchange和routekey。 换句话说,不管你绑定了什么队列,只要exchange和routekey一致的话,你都可以收到消息  我们知道rabbitmq是一个专业的MQ产品,而且它也是一个严格遵守AMQP协议的玩意,但是要想骚,一定需要拿出高可用的东西出来,这不本篇就跟大家说
一下cluster的概念,rabbitmq是erlang写的一个成品,所以知道如何构建erlang的node集群就ok了,他需要一个统一的cookie机制。。。本篇的测试环境如下:
centos1:192.168.23.147
centos2:192.168.23.145
截图如下:
一:cookie机制
& & & 刚才也说了,要想实现cluster集群,必须保证各台机器上的cookie文件内容一致,那问题来了。。。cookie在哪呢?从rabbitmq的官网上可以找到这么
一句话,如下图:
ok,官网说的非常清楚了,那接下来我们看一下$HOME变量指向的是哪里。。。
[root@rabbitmq1 Desktop]# echo $HOME
[root@rabbitmq1 Desktop]#
那接下来我就去看看(Centos1 .147)这台的/root 文件下可否能够找到,如下图:
牛逼了吧,嘿嘿,现在我们要做的事情,就是把Centos2的cookie文件内容替换成Centos1的cookie内容。
二:使用host映射erlang节点
& &现在cookie值是一样的了,然后需要在/etc/hosts中追加一下host影射,方便erlang节点之间相互发现,接下来就是在2台centos上追加同样的host地址:
三:rabbitmqctl cluster命令
& & 好了,准备工作我们都做好了,大家可以重启一下机器,开启我们的rabbitmq,这时候会有惊喜发现的。。。
由原来的localhost改成现在的rabbitmq2了,看到了吧~~~ 接下来大家可以把两台rabbitmq开启了。
1. 在centos1上使用rabbitmqctl cluster_status看看集群现在的状况
[root@rabbitmq1 Desktop]# rabbitmqctl cluster_status
Cluster status of node rabbit@rabbitmq1 ...
[{nodes,[{disc,[rabbit@rabbitmq1]}]},
{running_nodes,[rabbit@rabbitmq1]},
{cluster_name,&&"rabbit@rabbitmq1"&&},
{partitions,[]},
{alarms,[{rabbit@rabbitmq1,[]}]}]
[root@rabbitmq1 Desktop]#
可以看到,当前的running-nodes中只有一台,刚好就是本机的erlang节点本身,接下来我们看一下是否能够连接到rabbit@rabbitmq2上去。。。
2.&join_cluster命令
& & 这个命令之前,需要将本机的rabbitmq关闭,然后进行join操作,从下图中可以看到,我们已经连接到了centos2上的rabbitmq了。。。
[root@rabbitmq1 Desktop]# rabbitmqctl stop_app
Stopping node rabbit@rabbitmq1 ...
[root@rabbitmq1 Desktop]#
rabbitmqctl join_cluster rabbit@rabbitmq2
Clustering node rabbit@rabbitmq1 with rabbit@rabbitmq2 ...
[root@rabbitmq1 Desktop]# rabbitmqctl start_app
Starting node rabbit@rabbitmq1 ...
[root@rabbitmq1 Desktop]#
3. 使用webui看一下最后的效果
看到没有,现在我们的rabbitmq集群已经搭建成功了,如果你有更多的机器,都可以使用这个join命令加入吧,很简单吧~~~
四:mirror queue
& & 从名字上可以看出,就是镜像队列的意思,也就是说queue能在我们多台机器中同步,设置的方式也能简单,只需要在webui的policy上面设置即可。。。
这段设置表示当前如果是mytest开头的队列都是&镜像队列&,当然也可以用代码来实现,并且实现自动同步的功能,如下:
rabbitmqctl set_policy ha-all "^mytest" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
五:使用C#驱动连接
  再好的cluster最后都需要用语言驱动连接,这样才能真正的落地,我选择的驱动是官方的,大家可以在nuget上面下载一下:
接下来我需要演示向 queue=mytest1队列中推送数据,亮点在于我在CreateConnection方法中塞入了多个ip地址。。。如下代码:
class Program
static void Main(string[] args)
ConnectionFactory factory = new ConnectionFactory()
UserName = "datamip",
Password = "datamip",
AutomaticRecoveryEnabled = true,
TopologyRecoveryEnabled = true
//第一步:创建connection
var connection = factory.CreateConnection(new string[2] { "192.168.23.147", "192.168.23.145" });
//第二步:创建一个channel
var channel = connection.CreateModel();
var result = channel.QueueDeclare("mytest1", true, false, false, null);
for (int i = 0; i & int.MaxV i++)
channel.BasicPublish(string.Empty, "mytest1", null, new byte[10]);
Console.WriteLine("{0} 推送成功", i);
Thread.Sleep(1000);
Console.Read();
最后我们看一下webui,可以清清楚楚的看到消息已经进入了rabbitmq集群啦。。。
好了,本篇就说这么多了,希望对您有帮助~~~
阅读(...) 评论()

我要回帖

更多关于 rabbitmq 没有消费者 的文章

 

随机推荐