jmeter 测试rabbitmqq官网中的php测试helloworld的代码是否有问题

RabbitMQ与PHP - 推酷
RabbitMQ与PHP
你是否遇到过两个(多个)系统间需要通过定时任务来同步某些数据?你是否在为异构系统的不同进程间相互调用、通讯的问题而苦恼、挣扎?如果是,那么恭喜你,消息服务让你可以很轻松地解决这些问题。消息服务擅长于解决多系统、异构系统间的数据交换(消息通知/通讯)问题,你也可以把它用于系统间服务的相互调用(RPC)。本文将要介绍的RabbitMQ就是当前最主流的消息中间件之一。
RabbitMQ简介
AMQP ,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ 是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。网站在:
上面有各种语言教程和实例代码
AMPQ协议为了能够满足各种消息队列需求,在概念上比较复杂,了解了这些概念,是使用好RabbitMQ的基础。
vhosts : 虚拟主机
虚拟主机( virtual host ):一个虚拟主机持有一组交换机、队列和绑定。为什么需要多个虚拟主机呢? RabbitMQ 当中,用户只能在虚拟主机的粒度进行权限控制。因此,如果需要禁止 A 组访问 B 组的交换机 / 队列 / 绑定,必须为 A 和 B 分别创建一个虚拟主机。每一个 RabbitMQ 服务器都有一个默认的虚拟主机 “/” 。
一个RabbitMQ的Server上可以有多个vhosts,用户与权限设置就是依附于vhosts。对一般PHP应用,不需要用户权限设定,直接使用默认就存在的”/”就可以了,用户可以使用默认就存在的”guest”。一个简单的配置示例:
$conn_args = array(
'host' =& '127.0.0.1',
'port' =& '5672',
'login' =& 'guest',
'password' =& 'guest',
'vhost'=&'/'
connection 与 channel : 连接与信道
connection是指物理的连接,一个client与一个server之间有一个连接;一个连接上可以建立多个channel,可以理解为逻辑上的连接。一般应用的情况下,有一个channel就够用了,不需要创建更多的channel。示例代码:
//创建连接和channel
$conn = new AMQPConnection($conn_args);
if (!$conn-&connect()) {
die(&Cannot connect to the broker!\n&);
$channel = new AMQPChannel($conn);
Exchange 与 routingkey : 交换机 与 路由键
为了将不同类型的message进行区分,设置了Exchange交换机与Route路由两个概念。比如,将A类型的message发送到名为‘C1’的交换机,将类型为B的发送到’C2′的交换机。当客户端连接C1处理队列消息时,取到的就只是A类型message。进一步的,如果A类型message也非常多,需要进一步细化区分,比如某个客户端只处理A类型message中针对K用户的message,routingkey就是来做这个用途的。
$e_name = 'e_linvo'; //交换机名
$k_route = array(0=& 'key_1', 1=& 'key_2'); //路由key
//创建交换机
$ex = new AMQPExchange($channel);
$ex-&setName($e_name);
$ex-&setType(AMQP_EX_TYPE_DIRECT); //direct类型
$ex-&setFlags(AMQP_DURABLE); //持久化
echo &Exchange Status:&.$ex-&declare().&\n&;
for($i=0; $i&5; ++$i){
echo &Send Message:&.$ex-&publish($message . date('H:i:s'), $k_route[i%2]).&\n&;
由以上代码可以看到,发送消息时,只要有“交换机”就够了。至于交换机后面有没有对应的处理队列,发送方是不用管的。routingkey可以是空的字符串。在示例中,我使用了两个key交替发送消息,是为了下面更便于理解routingkey的作用。
对于交换机,有两个重要的概念:
交换机( Exchange ):可以理解成具有路由表的路由程序。每个消息都有一个路由键( routing key ),就是一个简单的字符串。交换机中有一系列的绑定( binding ),即路由规则( routes )。交换机可以有多个。多个队列可以和同一个交换机绑定,同时多个交换机也可以和同一个队列绑定。(多对多的关系)
A,类型。有三种类型:
1. Fanout Exchange (不处理路由键):一个发送到交换机上的消息都会被转发到与该交换机绑定的所有队列上。 Fanout 交换机发消息是最快的。
2. Direct Exchange (处理路由键):如果一个队列绑定到该交换机上,并且当前要求路由键为 X ,只有路由键是 X 的消息才会被这个队列转发。
3. Topic Exchange (将路由键和某模式进行匹配,可以理解成模糊处理):路由键的词由 “.” 隔开,符号 “#” 表示匹配 0 个或多个词,符号 “*” 表示匹配不多不少一个词。
类型总结:Fanout类型最简单,这种模型忽略routingkey;Direct类型是使用最多的,使用确定的routingkey。这种模型下,接收消息时绑定’key_1′则只接收key_1的消息;最后一种是Topic,这种模式与Direct类似,但是支持通配符进行匹配,比如: ‘key_*’,就会接受key_1和key_2。Topic貌似美好,但是有可能导致不严谨,所以还是推荐使用Direct。
B,持久化。指定了持久化的交换机,在重新启动时才能重建,否则需要客户端重新声明生成才行。
需要特别明确的概念:交换机的持久化,并不等于消息的持久化。只有在持久化队列中的消息,才能持久化;如果没有队列,消息是没有地方存储的;消息本身在投递时也有一个持久化标志的,PHP中默认投递到持久化交换机就是持久的消息,不用特别指定。
4,queue: 队列
讲了这么多,才讲到队列呀。事实上,队列仅是针对接收方(consumer)的,由接收方根据需求创建的。只有队列创建了,交换机才会将新接受到的消息送到队列中,交换机是不会在队列创建之前的消息放进来的。换句话说,在建立队列之前,发出的所有消息都被丢弃了。下面这个图比RabbitMQ官方的图更清楚——Queue是属于ReceiveMessage的一部分。
接下来看一下创建队列及接收消息的示例:
$e_name = 'e_linvo'; //交换机名
$q_name = 'q_linvo'; //队列名
$k_route = ''; //路由key
//创建连接和channel
$conn = new AMQPConnection($conn_args);
if (!$conn-&connect()) {
die(&Cannot connect to the broker!\n&);
$channel = new AMQPChannel($conn);
//创建交换机
$ex = new AMQPExchange($channel);
$ex-&setName($e_name);
$ex-&setType(AMQP_EX_TYPE_DIRECT); //direct类型
$ex-&setFlags(AMQP_DURABLE); //持久化
echo &Exchange Status:&.$ex-&declare().&\n&;
//创建队列
$q = new AMQPQueue($channel);
$q-&setName($q_name);
$q-&setFlags(AMQP_DURABLE); //持久化
//绑定交换机与队列,并指定路由键
echo 'Queue Bind: '.$q-&bind($e_name, $k_route).&\n&;
//阻塞模式接收消息
echo &Message:\n&;
$q-&consume('processMessage', AMQP_AUTOACK); //自动ACK应答
$conn-&disconnect();
* 消费回调函数
* 处理消息
function processMessage($envelope, $queue) {
var_dump($envelope-&getRoutingKey);
$msg = $envelope-&getBody();
echo $msg.&\n&; //处理消息
从上述示例中可以看到,交换机既可以由消息发送端创建,也可以由消息消费者创建。
创建一个队列(line:20)后,需要将队列绑定到交换机上(line:25)队列才能工作,routingkey也是在这里指定的。有的资料上写成bindingkey,其实一回事儿,弄两个名词反倒容易混淆。
消息的处理,是有两种方式:
A,一次性。用 $q-&get([...]),不管取到取不到消息都会立即返回,一般情况下使用轮询处理消息队列就要用这种方式;
B,阻塞。用 $q-&consum( callback, [...] ) 程序会进入持续侦听状态,每收到一个消息就会调用callback指定的函数一次,直到某个callback函数返回FALSE才结束。
关于callback,这里多说几句: PHP的call_back是支持使用数组的,比如: $c = new MyClass(); $c-&counter = 100; $q-&consume( array($c,’myfunc’) ) 这样就可以调用自己写的处理类。MyClass中myfunc的参数定义,与上例中processMessage一样就行。
在上述示例中,使用的$routingkey = ”, 意味着接收全部的消息。我们可以将其改为 $routingkey = ‘key_1′,可以看到结果中仅有设置routingkey为key_1的内容了。
注意: routingkey = ‘key_1′ 与 routingkey = ‘key_2′ 是两个不同的队列。假设: client1 与 client2 都连接到 key_1 的队列上,一个消息被client1处理之后,就不会被client2处理。而 routingkey = ” 是另类,client_all绑定到 ” 上,将消息全都处理后,client1和client2上也就没有消息了。
在程序设计上,需要规划好exchange的名称,以及如何使用key区分开不同类型的标记,在消息产生的地方插入发送消息代码。后端处理,可以针对每一个key启动一个或多个client,以提高消息处理的实时性。如何使用PHP进行多线程的消息处理,将在下一节中讲述。
更多消息模型,可以参考:
安装erlang依赖的基本环境
#操作系统:CentOS release 6.2
yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel java-devel
Erlang安装方式一:源码编译
wget http://www.erlang.org/download/otp_src_R16B03.tar.
tar -zxvf otp_src_R16B03.tar.
cd otp_src_R16B03;
./configure --prefix=/usr/local/erlang --with-ssl -enable-threads -enable-smmp-support -enable-kernel-poll --enable-hipe --without-#不用java编译,故去掉java避免错误
配置erlang环境
#vi /etc/profile
在文件最后加入:
PATH=$PATH:/usr/local/erlang/bin
export PATH
#source /etc/profile
Erlang安装方式二:YUM安装
安装erlang的YUM源
#自动安装erlang的YUM源
wget http://packages./erlang-solutions-1.0-1.noarch.rpm
rpm -Uvh erlang-solutions-1.0-1.noarch.rpm
#或手动安装YUM源
rpm --import http://packages./rpm/erlang_solutions.asc
Add the following lines to some file in /etc/yum.repos.d/:
[erlang-solutions]
name=Centos $releasever - $basearch - Erlang Solutions
baseurl=http://packages./rpm/centos/$releasever/$basearch
gpgcheck=1
gpgkey=http://packages./rpm/erlang_solutions.asc
yum erlang
安装成功检测
安装完后输入“erl”以下提示即为安装成功:
[root@localhost ~]# erl
Erlang/OTP 18 [erts-7.2] [source-e6dd627] [64-bit] [async-threads:10] [hipe] [kernel-poll:false]
Eshell V7.2
(abort with ^G)
已发表评论数()
请填写推刊名
描述不能大于100个字符!
权限设置: 公开
仅自己可见
正文不准确
标题不准确
排版有问题
主题不准确
没有分页内容
图片无法显示
视频无法显示
与原文不一致javaweb(8)
背景:项目中一个场景需要用java端的处理代码获取php端放到rabbitmq内的消息,然后做相应业务的处理。
前提:rabbitmq服务器已经搭建好,php端的消息发布正常运行。
首先:下载rabbitmq-client对应的java版jar包(spring好像有相应的支持)
开始代码coding的工作,上代码
package com.
import com.rabbitmq.client.C
import com.rabbitmq.client.C
import com.rabbitmq.client.ConnectionF
import com.rabbitmq.client.GetR
public class RabbitMqControll {
* 读取RabbitMq中的存储信息
* @param queue_name 队列名
* @param exchange_name 交换机名
* @param route_key 绑定用到的route_key
* @param durable 是否持久化
public void readRabbitMqInfo (String queue_name,
String exchange_name, String route_key, boolean durable)
ConnectionFactory factory = new ConnectionFactory();
// 设置服务器ip
factory.setHost(&172.18.107.66&);
// 设置rabbitmq服务器运行的端口
factory.setPort(5672);
// 设置rabbitmq服务器连接用户
factory.setUsername(&guest&);
// 设置rabbitmq服务器连接用户密码
factory.setPassword(&guest&);
// 设置rabbitmq服务器节点目录(个人理解)
factory.setVirtualHost(&/&);
// 创建工厂连接
Connection connection = factory.newConnection();
// 创建通道
Channel channel = connection.createChannel();
// 声明交换机(设置相关属性时需要和php端的一致)
channel.exchangeDeclare(exchange_name, &direct&, durable);
// 声明消息队列(设置相关属性时需要和php端的一致)
channel.queueDeclare(queue_name, durable, false, true, null);
// 绑定消息队列(设置相关属性时需要和php端的一致)
channel.queueBind(queue_name, exchange_name, route_key);
System.out.println(& [*] Waiting for messages. To exit press CTRL+C&);
// basicConsume消费模式
/*channel.basicQos(1);//消息分发处理
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queue_name, false, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(& [x] Received '& + message + &'&);
// 提交消息处理完成回复
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
// basicGet消费模式
while (true)
// get方式主动消费
GetResponse res=channel.basicGet(queue_name, false);
if (res != null && res.getMessageCount() &= 0)
System.out.println(res.getMessageCount());
String message = &&;
message = new String(res.getBody());
channel.basicAck(res.getEnvelope().getDeliveryTag(), false);
System.out.println(& [x] Received '& + message + &'&);
System.out.println(&消息队列中没有可消费的信息!&);
channel.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}在开发的过程中,主要报的异常是:
1.创建交换机和消息队列时,设置的属性和消息产生端的php代码设置的不一样,导致不匹配和一直重写属性
2.在调用时一直没有确定到底是用basicConsume的消费模式还是basicGet消费模式(前者带有监控效果,后者没有,不知道是不是因为一者有跳出while循环,一者没有的原因)
参考知识库
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:82033次
积分:1863
积分:1863
排名:第18754名
原创:105篇
评论:10条
(7)(2)(2)(3)(7)(4)(5)(10)(6)(8)(5)(12)(1)(4)(13)(13)(7)(4)(2)The page is temporarily unavailable
nginx error!
The page you are looking for is temporarily unavailable.
Please try again later.
Website Administrator
Something has triggered an error on your
This is the default error page for
nginx that is distributed with
It is located
/usr/share/nginx/html/50x.html
You should customize this error page for your own
site or edit the error_page directive in
the nginx configuration file
/etc/nginx/nginx.conf.

我要回帖

更多关于 rabbitmq集群测试 的文章

 

随机推荐