java rabitmq 环境搭建channel 什么时候关闭

Producer.java
import java.io.IOE
import java.util.concurrent.TimeoutE
import com.rabbitmq.client.C
import com.rabbitmq.client.C
import com.rabbitmq.client.ConnectionF
public class Producer {
public final static String QUEUE_NAME="rabbitMQ_test2";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置RabbitMQ相关信息
factory.setHost("100.51.15.10");
factory.setUsername("admin");
factory.setPassword("admin");
factory.setPort(5672);
//创建一个新的连接
Connection connection = factory.newConnection();
//创建一个通道
Channel channel = connection.createChannel();
// 声明一个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//发送消息到队列中
String message = "Hello RabbitMQ";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println("Producer Send +'" + message + "'");
//关闭通道和连接
channel.close();
connection.close();
Consumer.java
import java.io.IOE
import java.util.concurrent.TimeoutE
import com.rabbitmq.client.C
import com.rabbitmq.client.C
import com.rabbitmq.client.ConnectionF
import com.rabbitmq.client.C
import com.rabbitmq.client.DefaultC
import com.rabbitmq.client.E
import com.rabbitmq.client.AMQP;
public class Customer {
private final static String QUEUE_NAME = "rabbitMQ_test2";
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置RabbitMQ地址
factory.setHost("100.51.15.10");
factory.setUsername("admin");
factory.setPassword("admin");
factory.setPort(5672);
//创建一个新的连接
Connection connection = factory.newConnection();
//创建一个通道
Channel channel = connection.createChannel();
//声明要关注的队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println("Customer Waiting Received messages");
//DefaultConsumer类实现了Consumer接口,通过传入一个频道,
// 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery
Consumer consumer = new DefaultConsumer(channel) {
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Customer Received '" + message + "'");
//自动回复队列应答 -- RabbitMQ中的消息确认机制
channel.basicConsume(QUEUE_NAME, true, consumer);
Producer.java
Producer Send +'Hello RabbitMQ'
Producer Send +'Hello RabbitMQ'
Consumer.java
Customer Received 'Hello RabbitMQ'
Customer Received 'Hello RabbitMQ'
首先写一个类,将产生产者和消费者统一为 EndPoint类型的队列。不管是生产者还是消费者,连接队列的代码都是一样的,这样可以通用一些。
EndPoint.java
//package co.syntx.examples.
import java.io.IOE
import java.util.concurrent.TimeoutE
import com.rabbitmq.client.C
import com.rabbitmq.client.C
import com.rabbitmq.client.ConnectionF
* Represents a connection with a queue
* @author syntx
public abstract class EndPoint{
protected C
protected C
protected String endPointN
public EndPoint(String endpointName) throws IOException{
this.endPointName = endpointN
//Create a connection factory
ConnectionFactory factory = new ConnectionFactory();
//hostname of your rabbitmq server
factory.setHost("100.51.15.10");
factory.setUsername("admin");
factory.setPassword("admin");
factory.setPort(5672);
//getting a connection
connection = factory.newConnection();
}catch (TimeoutException ex) {
System.out.println(ex);
connection = null;
//creating a channel
channel = connection.createChannel();
//declaring a queue for this channel. If queue does not exist,
//it will be created on the server.
channel.queueDeclare(endpointName, false, false, false, null);
* 关闭channel和connection。并非必须,因为隐含是自动调用的。
* @throws IOException
public void close() throws IOException{
this.channel.close();
} catch (TimeoutException ex){
System.out.println("ex" + ex);
this.connection.close();
Producer2.java
import java.io.IOE
import java.io.S
import mons.lang.SerializationU
public class Producer2 extends EndPoint{
public Producer2(String endPointName) throws IOException{
super(endPointName);
public void sendMessage(Serializable object) throws IOException {
channel.basicPublish("",endPointName, null, SerializationUtils.serialize(object));
QueueConsumer.java
import java.io.IOE
import java.util.HashM
import java.util.M
import mons.lang.SerializationU
import com.rabbitmq.client.AMQP.BasicP
import com.rabbitmq.client.C
import com.rabbitmq.client.E
import com.rabbitmq.client.ShutdownSignalE
public class QueueConsumer extends EndPoint implements Runnable, Consumer{
public QueueConsumer(String endPointName) throws IOException{
super(endPointName);
public void run() {
//start consuming messages. Auto acknowledge messages.
channel.basicConsume(endPointName, true,this);
} catch (IOException e) {
e.printStackTrace();
* Called when consumer is registered.
public void handleConsumeOk(String consumerTag) {
System.out.println("Consumer "+consumerTag +" registered");
* Called when new message is available.
public void handleDelivery(String consumerTag, Envelope env,
BasicProperties props, byte[] body) throws IOException {
Map map = (HashMap)SerializationUtils.deserialize(body);
System.out.println("Message Number "+ map.get("message number") + " received.");
public void handleCancel(String consumerTag) {}
public void handleCancelOk(String consumerTag) {}
public void handleRecoverOk(String consumerTag) {}
public void handleShutdownSignal(String consumerTag, ShutdownSignalException arg1) {}
import java.io.IOE
import java.sql.SQLE
import java.util.HashM
public class Main {
public Main() throws Exception{
QueueConsumer consumer = new QueueConsumer("queue");
Thread consumerThread = new Thread(consumer);
consumerThread.start();
Producer2 producer = new Producer2("queue");
for (int i = 0; i & 5; i++) {
HashMap message = new HashMap();
message.put("message number", i);
producer.sendMessage(message);
System.out.println("Message Number "+ i +" sent.");
public static void main(String[] args) throws Exception{
new Main();
System.out.println("##############end...");
阅读(...) 评论()博客分类:
工作队列:为了避免等待一些占用大量资源、时间的操作。当我们把任务(Task)当作消息发送到队列中,一个运行在后台的工作者(worker)进程就会取出任务然后处理。当你运行多个工作者(workers),任务就会在它们之间共享。
消费者1输出
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'hi hi. hi.. hi...1'
[x] Received 'hi hi. hi.. hi...3'
[x] Received 'hi hi. hi.. hi...5'
[x] Received 'hi hi. hi.. hi...7'
[x] Received 'hi hi. hi.. hi...9'
消费者2输出
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'hi hi. hi.. hi...0'
[x] Received 'hi hi. hi.. hi...2'
[x] Received 'hi hi. hi.. hi...4'
[x] Received 'hi hi. hi.. hi...6'
[x] Received 'hi hi. hi.. hi...8'
默认来说,RabbitMQ会按顺序得把消息发送给每个消费者(consumer)。平均每个消费者都会收到同等数量得消息。这种发送消息得方式叫做——轮询(round-robin)。试着添加三个或更多得工作者(workers)。
生产者代码
package com.duowan.rabbit.
import java.io.IOE
import com.rabbitmq.client.C
import com.rabbitmq.client.C
import com.rabbitmq.client.ConnectionF
public class MultiMQClient {
private final static String QUEUE_NAME="hello";
public static void main( String[] args ) throws IOException
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
args = new String[]{"hi","hi.","hi..","hi..."};
for (int i = 0; i & 10; i++) {
String message = getMessage(args)+i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
private static String getMessage(String[] strings){
if (strings.length & 1)
return "Hello World!";
return joinStrings(strings, " ");
private static String joinStrings(String[] strings, String delimiter) {
int length = strings.
if (length == 0) return "";
StringBuilder words = new StringBuilder(strings[0]);
for (int i = 1; i & i++) {
words.append(delimiter).append(strings[i]);
return words.toString();
消费者代码
package com.duowan.rabbit.
import java.io.IOE
import com.rabbitmq.client.C
import com.rabbitmq.client.C
import com.rabbitmq.client.ConnectionF
import com.rabbitmq.client.ConsumerCancelledE
import com.rabbitmq.client.QueueingC
import com.rabbitmq.client.ShutdownSignalE
public class MultiMQServer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
QueueingConsumer consumer = new QueueingConsumer(channel);
boolean autoAck =
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
//确认消息已经收到
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
System.out.println(" [x] Received '" + message + "'");
doWork(message);
System.out.println(" [x] Done");
private static void doWork(String message) throws InterruptedException {
for (char ch : message.toCharArray()) {
if (ch == '.') {
Thread.sleep(1000);
我们不想丢失任何任务消息。如果一个工作者(worker)挂掉了,我们希望任务会重新发送给其他的工作者(worker)。
为了防止消息丢失,RabbitMQ提供了消息[i]响应(acknowledgments)[/i]。消费者会通过一个ack(响应),告诉RabbitMQ已经收到并处理了某条消息,然后RabbitMQ就会释放并删除这条消息。
如果消费者(consumer)挂掉了,没有发送响应,RabbitMQ就会认为消息没有被完全处理,然后重新发送给其他消费者(consumer)。这样,及时工作者(workers)偶尔的挂掉,也不会丢失消息。
消息是没有超时这个概念的;当工作者与它断开连的时候,RabbitMQ会重新发送消息。这样在处理一个耗时非常长的消息任务的时候就不会出问题了。
消息响应默认是开启的。之前的例子中我们可以使用no_ack=True标识把它关闭。是时候移除这个标识了,当工作者(worker)完成了任务,就发送一个响应。
设置消息响应
boolean autoAck =
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
运行两个消费者进程,运行生产者,关闭其中一个消费者进程,输出如下:
[*] Waiting for messages. To exit press CTRL+C
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'hi hi. hi.. hi...0'
[x] Received 'hi hi. hi.. hi...2'
运行到此,kill掉进程
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'hi hi. hi.. hi...1'
[x] Received 'hi hi. hi.. hi...3'
[x] Received 'hi hi. hi.. hi...5'
[x] Received 'hi hi. hi.. hi...7'
[x] Received 'hi hi. hi.. hi...9'
[x] Received 'hi hi. hi.. hi...0'
[x] Received 'hi hi. hi.. hi...2'
[x] Received 'hi hi. hi.. hi...4'
[x] Received 'hi hi. hi.. hi...6'
[x] Received 'hi hi. hi.. hi...8'
运行上面的代码,我们发现即使使用CTRL+C杀掉了一个工作者(worker)进程,消息也不会丢失。当工作者(worker)挂掉这后,所有没有响应的消息都会重新发送。
消息持久化
那么在它退出或者崩溃的时候,它将会流失所有的队列和消息。为了确保信息不会丢失,有两个事情是需要注意的:我们必须把“队列”和“消息”设为持久化。
boolean durable =
channel.queueDeclare("hello", durable , false, false, null);
尽管这行代码本身是正确的,但是仍然不会正确运行。因为我们已经定义过一个叫hello的非持久化队列。RabbitMq不允许你使用不同的参数重新定义一个队列,它会返回一个错误。但我们现在使用一个快捷的解决方法——用不同的名字,例如task_queue
boolean durable =
channel.queueDeclare("task_queue", durable , false, false, null);
这个queue_declare必须在生产者(producer)和消费者(consumer)对应的代码中修改。
这时候,我们就可以确保在RabbitMq重启之后queue_declare队列不会丢失。另外,我们需要把我们的消息也要设为持久化——将delivery_mode的属性设为PERSISTENT_TEXT_PLAIN。
channel.basicPublish("", "task_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
将消息设为持久化并不能完全保证不会丢失。以上代码只是告诉了RabbitMq要把消息存到硬盘,但从RabbitMq收到消息到保存之间还是有一个很小的间隔时间。因为RabbitMq并不是所有的消息都使用fsync(2)——它有可能只是保存到缓存中,并不一定会写到硬盘中。并不能保证真正的持久化,但已经足够应付我们的简单工作队列。如果你一定要保证持久化,你需要改写你的代码来支持事务(transaction)。
以上代码,rabbitmq没有按照我们期望的那样进行分发。比如有两个工作者(workers),处理奇数消息的比较繁忙,处理偶数消息的比较轻松。然而RabbitMQ并不知道这些,它仍然一如既往的派发消息。
这时因为RabbitMQ只管分发进入队列的消息,不会关心有多少消费者(consumer)没有作出响应。它盲目的把第n-th条消息发给第n-th个消费者。
我们可以使用basic.qos方法,并设置prefetch_count=1。这样是告诉RabbitMQ,再同一时刻,不要发送超过1条消息给一个工作者(worker),直到它已经处理了上一条消息并且作出了响应。这样,RabbitMQ就会把消息分发给下一个空闲的工作者(worker)。
channel.basicPublish("", "task_queue",
int prefetchCount = 1;
channel.basicQos(prefetchCount);
浏览: 302530 次
来自: 深圳
哥们你帮我大忙了,谢啦。一直在尝试使用内嵌的disco ...
yugouai 写道wxcking 写道请问,Windows下 ...
wxcking 写道请问,Windows下怎么配置呢?Data ...
请问,Windows下怎么配置呢?
总结的非常好,每次都来看RabbitMQ远程不能访问问题的解决_服务器应用_Linux公社-Linux系统门户网站
你好,游客
RabbitMQ远程不能访问问题的解决
来源:Linux社区&
作者:dwf07223
刚刚安装的RabbitMQ-Server-3.3.5,并且也已经开启了Web管理功能,但是现在存在一个问题:
出于安全的考虑,guest这个默认的用户只能通过http://localhost:15672 来登录,不能使用IP地址登录,也就是不能远程访问,这对于服务器上没有安装桌面的情况是无法管理维护的。
要解决这个问题需要配置远程登录权限,这里通过配置文件来实现远程访问。
5.6 安装RabbitMQ
RabbitMQ客户端C++安装详细记录
用Python尝试RabbitMQ
RabbitMQ集群环境生产实例部署
下PHP + RabbitMQ使用
在CentOS上安装RabbitMQ流程
这里主要介绍Unix和Windows的配置文件修改。
一、Windows
& & Windows环境下默认配置文件为目录/%RabbitMQ Server%/rabbitmq_server-3.3.5/etc下的rabbitmq.config.example文件,我们可以直接在这个文件中修改(可以不用去设置环境变量了),也可以自己再新建一个rabbitmq.config文件,然后把这个文件路径配置到环境变量中,这里介绍就直接修改rabbitmq.config.example文件。
& & 注意:修改之前,需要先停止RabbitMQ服务!!不然是不能保存的!!!
配置之前需要先添加用户,用于外网的访问,可以使用命令行来实现添加用户,需要在RabbitMQ的安装目录sbin目录下执行:
duanwf@master:/opt/rabbitmq_server-3.3.5/sbin$ rabbitmqctl add_user admin admin
& & 也可以通过web管理页面来添加用户和密码,使用guest登录web管理页面http://localhost:15672,进入&admin&标签页,然后点击&Add a user &,输入对用的帐号密码,然后选择用户角色(一定要选择):
& & 为了授权该用户对VirtualHost"/" 的访问,用户添加之后,需要对该用户进行授权,不然运行会出现错误:
1 Caused by: com.rabbitmq.client.ShutdownSignalException: reason: {#method&channel.close&(reply-code=403, reply-text=ACCESS_REFUSED - access to queue 'hello' in vhost '/' refused for user 'admin', class-id=50, method-id=10), null, ""}
详细错误日志为:
&java.io.IOException&at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)&at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)&at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)&at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:766)&at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)&at com.asiainfo.mq.rabbitmq.rabbitmqtest.SendTest.main(SendTest.java:29)Caused by: com.rabbitmq.client.ShutdownSignalException: reason: {#method&channel.close&(reply-code=403, reply-text=ACCESS_REFUSED - access to queue 'hello' in vhost '/' refused for user 'admin', class-id=50, method-id=10), null, ""}&at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)&at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)&at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)&at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)&at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)&... 3 moreCaused by: com.rabbitmq.client.ShutdownSignalException: reason: {#method&channel.close&(reply-code=403, reply-text=ACCESS_REFUSED - access to queue 'hello' in vhost '/' refused for user 'admin', class-id=50, method-id=10), null, ""}&at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:473)&at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:313)&at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)&at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)&at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:533)
& 操作过程为:在Admin标签页下点击新增的用户"admin",进入授权页面,默认直接点击"set permission"即可:
用户以及授权添加完成之后,在rabbitmq.config.example文件中,添加以下内容,保存后重启RabbitMQ服务:
&&[&{rabbit,& [%%& %% Network Connectivity& %% ====================& %%& %% By default, RabbitMQ will listen on all interfaces, using& %% the standard (reserved) AMQP port.& %%& {tcp_listeners, [5672]},& {loopback_users, ["admin"]},&&& ]}].
在浏览器中输入http://192.168.0.124:15672实现通过IP地址访问,成功登录:
测试用例见博文&RabbitMQ远程调用测试用例 &
更多详情见请继续阅读下一页的精彩内容:
相关资讯 & & &
& (12/26/:47)
& (11/07/:42)
& (02月24日)
& (12/10/:52)
& (11/04/:46)
   同意评论声明
   发表
尊重网上道德,遵守中华人民共和国的各项有关法律法规
承担一切因您的行为而直接或间接导致的民事或刑事法律责任
本站管理人员有权保留或删除其管辖留言中的任意内容
本站有权在网站内转载或引用您的评论
参与本评论即表明您已经阅读并接受上述条款关于断开 Rabbit MQ 连接的问题 - ITeye问答
while (falg)
("find msg of: " + acid);
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String text = new String(delivery.getBody());
// 解析消息
String from = "";
String to = "";
。。。。。。。。
当我试着去关闭 channel 通道的时候。。报以下错误:
com.rabbitmq.client.ShutdownSignalException: cl reason: #method&channel.close&(reply-code=200, reply-text=OK, class-id=0, method-id=0)
at com.rabbitmq.client.QueueingConsumer.handle(QueueingConsumer.java:198)
at com.rabbitmq.client.QueueingConsumer.nextDelivery(QueueingConsumer.java:214)
at com.handbb.dating.mq.MQMsgMrg.recmsg(MQMsgMrg.java:251)
at com.handbb.dating.action.UserAction$1.run(UserAction.java:156)
Caused by: com.rabbitmq.client.ShutdownSignalException: cl reason: #method&channel.close&(reply-code=200, reply-text=OK, class-id=0, method-id=0)
at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:521)
at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:480)
at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:473)
at com.handbb.dating.mq.MQMsgMrg.closeMq(MQMsgMrg.java:519)
at com.handbb.dating.action.UserAction.userlogout(UserAction.java:194)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:585)
at com.opensymphony.xwork2.DefaultActionInvocation.invokeAction(DefaultActionInvocation.java:452)
at com.opensymphony.xwork2.DefaultActionInvocation.invokeActionOnly(DefaultActionInvocation.java:291)
at com.opensymphony.xwork2.DefaultActionInvocation.invoke(DefaultActionInvocation.java:254)
原因可能能为我还在不停的从队列中去接受消息,现在去关闭channel的时候。发生异常。。
现在问题是。我要怎么去合理的关闭channel,不会发生异常,或者理解为。怎么样去更好的接受消息??
求好心人指点。。谢谢。。。
请问您这问题解决了吗,我也遇到了和您一样的问题却不知如何解决
已解决问题
未解决问题&前言:在这里我将用java来简单的实现rabbitMQ。下面我们带着下面问题来一步步的了解和学习rabbitMQ。
1:如果消费者连接中断,这期间我们应该怎么办
2:如何做到负载均衡
3:如何有效的将数据发送到相关的接收者?就是怎么样过滤
4:如何保证消费者收到完整正确的数据
5:如何让优先级高的接收者先收到数据
一:"Hello RabbitMQ"
下面有一幅图,其中P表示生产者,C表示消费者,红色部分为消息队列
&二:项目开始
2.1:首先引入rabbitMQ jar包
&dependency&
&groupId&com.rabbitmq&/groupId&
&artifactId&amqp-client&/artifactId&
&version&3.6.5&/version&
&/dependency&
2.2:创建消费者Producer
* 消息生成者
public class Producer {
public final static String QUEUE_NAME="rabbitMQ.test";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置RabbitMQ相关信息
factory.setHost("localhost");
//factory.setUsername("lp");
//factory.setPassword("");
// factory.setPort(2088);
//创建一个新的连接
Connection connection = factory.newConnection();
//创建一个通道
Channel channel = connection.createChannel();
声明一个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello RabbitMQ";
//发送消息到队列中
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println("Producer Send +'" + message + "'");
//关闭通道和连接
channel.close();
connection.close();
注1:queueDeclare第一个参数表示队列名称、第二个参数为是否持久化(true表示是,队列将在服务器重启时生存)、第三个参数为是否是独占队列(创建者可以使用的私有队列,断开后自动删除)、第四个参数为当所有消费者客户端连接断开时是否自动删除队列、第五个参数为队列的其他参数
注2:basicPublish第一个参数为交换机名称、第二个参数为队列映射的路由key、第三个参数为消息的其他属性、第四个参数为发送信息的主体
2.3:创建消费者
public class Customer {
private final static String QUEUE_NAME = "rabbitMQ.test";
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置RabbitMQ地址
factory.setHost("localhost");
//创建一个新的连接
Connection connection = factory.newConnection();
//创建一个通道
Channel channel = connection.createChannel();
//声明要关注的队列
channel.queueDeclare(QUEUE_NAME, false, false, true, null);
System.out.println("Customer Waiting Received messages");
//DefaultConsumer类实现了Consumer接口,通过传入一个频道,
// 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery
Consumer consumer = new DefaultConsumer(channel) {
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Customer Received '" + message + "'");
//自动回复队列应答 -- RabbitMQ中的消息确认机制
channel.basicConsume(QUEUE_NAME, true, consumer);
前面代码我们可以看出和生成者一样的,后面的是获取生产者发送的信息,其中envelope主要存放生产者相关信息(比如交换机、路由key等)body是消息实体。
2.4:运行结果
&三:实现任务分发
一个队列的优点就是很容易处理并行化的工作能力,但是如果我们积累了大量的工作,我们就需要更多的工作者来处理,这里就要采用分布机制了。
我们新创建一个生产者NewTask
public class NewTask {
private static final String TASK_QUEUE_NAME="task_queue";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("localhost");
Connection connection=factory.newConnection();
Channel channel=connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);
//分发信息
for (int i=0;i&10;i++){
String message="Hello RabbitMQ"+i;
channel.basicPublish("",TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
System.out.println("NewTask send '"+message+"'");
channel.close();
connection.close();
然后创建2个工作者Work1和Work2代码一样
public class Work1 {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] args) throws IOException, TimeoutException {
final ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println("Worker1
Waiting for messages");
//每次从队列获取的数量
channel.basicQos(1);
final Consumer consumer = new DefaultConsumer(channel) {
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Worker1
Received '" + message + "'");
new Exception();
//doWork(message);
}catch (Exception e){
channel.abort();
}finally {
System.out.println("Worker1 Done");
channel.basicAck(envelope.getDeliveryTag(),false);
boolean autoAck=false;
//消息消费完成确认
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
private static void doWork(String task) {
Thread.sleep(1000); // 暂停1秒钟
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
注:channel.basicQos(1);autoAck是否自动回复,如果为true的话,每次生产者只要发送信息就会从内存中删除,那么如果消费者程序异常退出,那么就无法获取数据,我们当然是不希望出现这样的情况,所以才去手动回复,每当消费者收到并处理信息然后在通知生成者。最后从队列中删除这条信息。如果消费者异常退出,如果还有其他消费者,那么就会把队列中的消息发送给其他消费者,如果没有,等消费者启动时候再次发送。
关于上面我们遗留问题在下一篇继续讲解
阅读(...) 评论()

我要回帖

更多关于 rabitmq 的文章

 

随机推荐