apache mq 网络公司中转

是目前最流行功能最强大的开源消息和服务。 Apache ActiveMQ不仅速度快而且支持众多的,同时拥有非常易用的以及支持和J2EE1.4等众多。Apache ActiveMQ基于发行。
1.支持像Java、C、C++、C#、Ruby、Perl、Python和PHP等大量跨语言平台和协议。
2.支持Java、C、C++、C#中的高性能客户端。
支持:这样就能像其他流行的消息代理一样很容易的用C、Ruby、Perl、Python、PHP、ActionScript(Flash)、Smalltalk编写能与ActiveMQ通信的客户端了。
同时支持JMS客户端和消息代理两种企业集成模式。
3.支持众多高级特性,比如:、、、。
4.全面支持JMS 1.1和J2EE 1.4的瞬时消息、持久化消息、传统消息和XA消息的功能。
5.支持Spring框架:ActiveMQ可以轻易地嵌入到Spring应用中并采用Spring的XML配置机制来对ActiveMQ进行配置。
6.通过了TomEE、Geronimo、JBoss、GlassFish和WebLogic等流行的J2EE服务器的测试。
内置了以支持内外双向的消息传递。这样ActiveMQ就可以被自动部署到任何一个兼容J2EE1.4的服务器上。
支持可插拔:例如、TCP、SSL、NIO、UDP、multicast、JGroups和JXTA传输协议。
搭配高性能日志和JDBC能实现快速的。
7.为高性能集群、客户端-服务器端、点对点通信而设计。
8.API提供了技术、语言无关的web API 用来消息传递。
9.支持:支持web流从而让web浏览器能够使用纯DHTML并将浏览器作为了消息通信结构的一部分。
10.支持:因此ActiveMQ可以很容易的在这些web service项目中提供可靠的消息传递服务。
11.可以被用作存活在内存中的JMS提供者,是的理想实现。
资源整理者简介:
可能感兴趣的文章
按分类快速查找
关于资源导航
伯乐在线资源导航收录优秀的工具资源。内容覆盖开发、设计、产品和管理等IT互联网行业相关的领域。目前已经收录 1314 项工具资源。
关于资源导航
伯乐在线资源导航收录优秀的工具资源。内容覆盖开发、设计、产品和管理等IT互联网行业相关的领域。
新浪微博:
推荐微信号
(加好友请注明来意)
- 好的话题、有启发的回复、值得信赖的圈子
- 分享和发现有价值的内容与观点
- 为IT单身男女服务的征婚传播平台
- 优秀的工具资源导航
- 翻译传播优秀的外文文章
- 国内外的精选博客文章
- UI,网页,交互和用户体验
- 专注iOS技术分享
- 专注Android技术分享
- JavaScript, HTML5, CSS
- 专注Java技术分享
- 专注Python技术分享
& 2016 伯乐在线4028人阅读
ActiveMQ(8)
前面文章《 》在最后有提到一个场景,就是当AMQ的节点数大于2个的时候(HA + LB),且配置了消息回流的情况下的一些问题。
HA + LB的基本结构如下图:
问题即发生在当生产者将消息投递到Master节点后(AMQ SERVER),消费者与A节点建立连接(Broker),根据AMQ的“预先消费”策略预先消费了一定数量的消息,即A节点消费了Master节点的一部分消息,A节点在将消息转发至消费者Consumer。
消费者在消费过程中,A节点意外宕机,消费者根据failover机制会自动连接至B或C节点,想继续消费剩余的消息(A节点未消费完成的消息)。
那么我们可以按如下的配置方式即可解决该场景下的问题:
xmlns=&http://www.springframework.org/schema/beans&
xmlns:xsi=&http://www.w3.org/2001/XMLSchema-instance&
xsi:schemaLocation=&http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd&&
&!-- Allows us to use system properties as variables in this configuration file --&
&bean class=&org.springframework.beans.factory.config.PropertyPlaceholderConfigurer&&
&property name=&locations&&
&value&file:${activemq.conf}/credentials.properties&/value&
&/property&
The &broker& element is used to configure the ActiveMQ broker.
&broker xmlns=&http://activemq.apache.org/schema/core& brokerName=&broker129& dataDirectory=&${activemq.data}&&
&!-- Destination specific policies using destination names or wildcards --&
&destinationPolicy&
&policyMap&
&policyEntries&
&policyEntry topic=&&&&
&/policyEntry&
&policyEntry queue=&&& enableAudit=&false&&
&deadLetterStrategy&
&individualDeadLetterStrategy queuePrefix=&DLQ.& useQueueForQueueMessages=&true& /&
&/deadLetterStrategy&
&networkBridgeFilterFactory&
&conditionalNetworkBridgeFilterFactory replayWhenNoConsumers=&true&/&
&/networkBridgeFilterFactory&
&/policyEntry&
&/policyEntries&
&/policyMap&
&/destinationPolicy&
The managementContext is used to configure how ActiveMQ is exposed in
JMX. By default, ActiveMQ uses the MBean server that is started by
the JVM. For more information, see:
http://activemq.apache.org/jmx.html
&managementContext&
&managementContext createConnector=&false&/&
&/managementContext&
duplex Broker双工模式,即:Broker可以是消费者也可以是发布者。
该参数同“消息回流”(replayWhenNoConsumers)不同,注意区分。
&!-- networkTTL 即:信息和订阅在网络可以通过的Broker数量。该参数需要根据LB数量合理设置 --&
&networkConnectors&
&networkConnector duplex=&true& networkTTL=&3&
uri=&static:(tcp://192.168.137.200:61616)&/&
&/networkConnectors&
Configure message persistence for the broker. The default persistence
mechanism is the KahaDB store (identified by the kahaDB tag).
For more information, see:
http://activemq.apache.org/persistence.html
&persistenceAdapter&
&kahaDB directory=&${activemq.data}/kahadb&/&
&/persistenceAdapter&
The systemUsage controls the maximum amount of space the broker will
use before disabling caching and/or slowing down producers. For more information, see:
http://activemq.apache.org/producer-flow-control.html
&systemUsage&
&systemUsage&
&memoryUsage&
&memoryUsage percentOfJvmHeap=&70& /&
&/memoryUsage&
&storeUsage&
&storeUsage limit=&100 gb&/&
&/storeUsage&
&tempUsage&
&tempUsage limit=&50 gb&/&
&/tempUsage&
&/systemUsage&
&/systemUsage&
The transport connectors expose ActiveMQ over a given protocol to
clients and other brokers. For more information, see:
http://activemq.apache.org/configuring-transports.html
&transportConnectors&
&!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --&
&transportConnector name=&openwire& uri=&nio://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=&/&
&/transportConnectors&
&!-- destroy the spring context on shutdown to stop jetty --&
&shutdownHooks&
&bean xmlns=&http://www.springframework.org/schema/beans& class=&org.apache.activemq.hooks.SpringContextHook& /&
&/shutdownHooks&
Enable web consoles, REST and Ajax APIs and demos
The web consoles requires by default login, you can disable this in the jetty.xml file
Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
&import resource=&jetty.xml&/&
参考知识库
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:226765次
积分:3299
积分:3299
排名:第7088名
原创:104篇
转载:32篇
评论:32条
(2)(1)(5)(4)(2)(2)(4)(5)(2)(2)(4)(6)(2)(1)(11)(1)(1)(1)(7)(4)(2)(2)(1)(2)(20)(1)(1)(6)(6)(4)(8)(15)架构设计:系统间通信(21)――ActiveMQ的安装与使用浅析
之前我们通过两篇文章(架构设计:系统间通信(19)&&MQ:消息协议(上)、架构设计:系统间通信(20)&&MQ:消息协议(下))从理论层面上为大家介绍了消息协议的基本定义,并花了较大篇幅向读者介绍了三种典型的消息协议:XMPP协议、Stomp协议和AMQP协议。本小节开始,我们基于之前的知识点讲解这些协议在具体的&消息队列中间件&中是如何被我们操作的。由于本人在实际工作中经常使用ActiveMQ和RabbitMQ,所以就选取这两个&消息队列中间件&进行讲解。如果读者可以补充其他&消息队列中间件&的使用,那当然是再好不过了。
2、ActiveMQ的安装和使用
ActiveMQ是Apache软件基金会的开源产品,支持AMQP协议、MQTT协议(和XMPP协议作用类似)、Openwire协议和Stomp协议等多种消息协议。并且ActiveMQ完整支持JMS API接口规范(当然Apache也提供多种其他语言的客户端,例如:C、C++、C#、Ruby、Perl)。
2-1、ActiveMQ的安装
在本文发布之时,ActiveMQ最新的版本号是5.13.2(版本号升级很快,不过并不推荐使用最新的版本)。由ActiveMQ的安装是很简单,所以这个过程并不值得我们花很大篇幅进行讨论。具体的过程就是:下载-&解压-&配置环境变量-&运行:
您可以Apache ActiveMQ的官网下载安装包:。这里我们示例在CentOS下的安装过程,所以下载Linux下的压缩包即可()。
将下载的安装包放置在root用户的home目录内,解压即可(当然您可以根据自己的需要加压到不同的文件路径下)。如下所示:
[root@localhost ~]# tar -zxvf ./apache-activemq-5.13.2-bin.tar.gz
以上解压使用的是root用户,这是为了演示方便。正式环境中还是建议禁用root用户,为activeMQ的运行专门创建一个用户和用户组。
配置环境变量(不是必须的)
如果您只是在测试环境使用Apache ActiveMQ,以便熟悉消息中间件本身的特性和使用方式。那么您无需对解压后的软件进行任何配置,所有可运行的命令都在软件安装目录的./bin目录下。为了使用方便,最好配置一下环境变量,如下所示(注意,根据您自己的软件安装位置,环境变量的设置是不一样的,请不要盲目粘贴复制):
设置该次会话的环境变量:
[root@localhost ~]# export PATH=/usr/apache-activemq-5.13.1/bin/linux-x86-64:$PATH;
永久设置环境变量:
[root@localhost ~]# echo &export PATH=/usr/apache-activemq-5.13.1/bin/linux-x86-64:$PATH;& && /etc/profile
在ActiveMQ Version 5.9+的版本中,Apache ActiveMQ 针对操作系统进行了更深入的优化,所以您可以看到./bin目录下,有一个针对32位Linux运行命令的./linux-x86-32目录,和针对64位Linux运行命令的./linux-x86-64目录。请按照您自己的情况进行环境变量设置和命令运行。
现在您可以在任何目录,运行activemq命令了。注意activemq命令一共有6个参数(console | start | stop | restart | status | dump),启动Apache ActiveMQ使用的命令是activemq start:
[root@localhost ~]# activemq start
如果启动成功,就可以在浏览器上访问服务节点在8161端口的管理页面了(例如):
点击&manage ActiveMQ broker&连接,可以进入管理主界面(默认的用户和密码都是admin)。以上就是Apache ActiveMQ消息中间件最简的安装和运行方式。在后续的文章中,我们会陆续讨论ActiveMQ的集群和高性能优化,那时会介绍对应的ActiveMQ的配置问题。
2-2、ActiveMQ的其他命令参数
如同上文讲到的,activemq命令除了start参数用于启动activemq程序以外,还有另外5个参数可以使用:console | stop | restart | status | dump。他们代表的使用意义是:
stop:停止当前ActiveMQ节点的运行。
restart:重新启动当前ActiveMQ节点。
status:查看当前ActiveMQ节点的运行状态。如果当前ActiveMQ节点没有运行,那么将返回&ActiveMQ Broker is not running&的提示信息。注意,status命令只能告诉开发人员当前节点时停止的还是运行的,除此之外不能从status命令获取更多的信息。例如,ActiveMQ为什么创建Queue失败?当前ActiveMQ使用了多少内存?而要获取这些信息,需要使用以下参数启动ActiveMQ节点。
console:使用控制台模式启动ActiveMQ节点;在这种模式下,开发人员可以调试、监控当前ActivieMQ节点的实时情况,并获取实时状态。
dump:如果您采用console模式运行ActiveMQ,那么就可以使用dump参数,在console控制台上获取当前ActiveMQ节点的线程状态快照。
2-3、在ActiveMQ中传递Stomp消息
好吧,既然我们已经讨论过如何安装和运行ActiveMQ,也讨论了Stomp协议的组织结构,为什么我们不立即动手试一试操作ActiveMQ承载Stomp协议的消息呢?
下面我们使用ActiveMQ提供的JAVA 客户端(实际上就是ActiveMQ对JMS规范的实现),向ActiveMQ中的Queue(示例代码中将这个Queue命名为&test&)发送一条Stomp协议消息,然后再使用JAVA语言的客户端,从ActiveMQ上接受这条消息:
使用ActiveMQ的API发送Stomp协议消息:
package mq.test.
import java.net.S
import java.util.D
import org.apache.activemq.transport.stomp.StompC
// 消息生产者
public class TestProducer {
public static void main(String[] args) {
// 建立Stomp协议的连接
StompConnection con = new StompConnection();
Socket so = new Socket(&192.168.61.138&, 61613);
con.open(so);
// 注意,协议版本可以是1.2,也可以是1.1
con.setVersion(&1.2&);
// 用户名和密码,这个不必多说了
con.connect(&admin&, &admin&);
// 以下发送一条信息(您也可以使用&事务&方式)
con.send(&/test&, &234543& + new Date().getTime());
} catch(Exception e) {
e.printStackTrace(System.out);
使用ActiveMQ的API接收Stomp协议消息:
package mq.test.
import java.net.S
import java.net.SocketTimeoutE
import java.util.M
import org.apache.activemq.transport.stomp.StompC
import org.apache.activemq.transport.stomp.StompF
public class TestConsumer {
public static void main(String[] args) throws Exception {
// 建立连接
StompConnection con = new StompConnection();
Socket so = new Socket(&192.168.61.138&, 61613);
con.open(so);
con.setVersion(&1.2&);
con.connect(&admin&, &admin&);
String ack = &client&;
con.subscribe(&/test&, &client&);
// 接受消息(使用循环进行)
StompFrame frame =
// 注意,如果没有接收到消息,
// 这个消费者线程会停在这里,直到本次等待超时
frame = con.receive();
} catch(SocketTimeoutException e) {
// 打印本次接收到的消息
System.out.println(&frame.getAction() = & + frame.getAction());
Map headers = frame.getHeaders();
String meesage_id = headers.get(&message-id&);
System.out.println(&frame.getBody() = & + frame.getBody());
System.out.println(&frame.getCommandId() = & + frame.getCommandId());
// 在ack是client标记的情况下,确认消息
if(&client&.equals(ack)) {
con.ack(meesage_id);
以上分别是使用Activie提供的Stomp协议的消息生产端和Stomp协议的消息消费端的代码(如果您不清楚Stomp协议的细节,可以参考我另一篇文章:《架构设计:系统间通信(19)&&MQ:消息协议(上)》)。请注意在代码片段中,并没有出现任何一个带有jms名称的包或者类&&这是因为ActiveMQ为Stomp协议提供的JAVA API在内部进行了JMS规范的封装。
您可以查看activemq-stomp中关于协议转换部分的源代码:org.apache.activemq.transport.stomp.JmsFrameTranslator和其父级接口:org.apache.activemq.transport.stomp.FrameTranslator来验证这件事情(关于ActiveMQ对JMS规范的实现设计,如果后续有时间再回头进行讲解)。
以下是Stomp协议的消费者端的运行效果(在生产者端已经向ActiveMQ插入了一条消息之后):
frame.getAction() = MESSAGE
frame.getBody() = 0073204
frame.getCommandId() = 0
注意,由于消息体中插入了一个时间戳,所以您复制粘贴代码后运行效果并不会和我的演示程序完全一致。
2-4、ActiveMQ中的Queue和Topics
如果您细心的话,在ActiveMQ提供的管理页面上已经看到有两个功能页面:Queue和Topic。Queue和Topic是JMS为开发人员提供的两种不同工作机制的消息队列。 在ActiveMQ官方的解释是:
In JMS a Topic implements publish and subscribe semantics. When you publish a message it goes to all the subscribers who are interested - so zero to many subscribers will receive a copy of the message. Only subscribers who had an active subscription at the time the broker receives the message will get a copy of the message.
中文的可以译做:JMS-Topic 队列基于&订阅-发布&模式,当操作者发布一条消息后,所有对这条消息感兴趣的订阅者都可以收到它&&也就是说这条消息会被拷贝成多份,进行分发。只有当前&活动的&订阅者能够收到消息(换句话说,如果当前JMS-Topic队列中没有订阅者,这条消息将被丢弃)。
A JMS Queue implements load balancer semantics. A single message will be received by exactly one consumer. If there are no consumers available at the time the message is sent it will be kept until a consumer is available that can process the message. If a consumer receives a message and does not acknowledge it before closing then the message will be redelivered to another consumer. A queue can have many consumers with messages load balanced across the available consumers.
So Queues implement a reliable load balancer in JMS.
中文的可以译做:JMS-Queue是一种&负载均衡模式&的实现。一个消息能且只能被一个消费者接受。如果当前JMS-Queue中没有任何的消费者,那么这条消息将会被Queue存储起来(实际应用中可以存储在磁盘上,也可以存储在中,看软件的配置),直到有一个消费者连接上。另外,如果消费者在接受到消息后,在他断开与JMS-Queue连接之前,没有发送ack信息(可以是客户端手动发送,也可以是自动发送),那么这条消息将被发送给其他消费者。
以下表格摘自互联网上的资料,基本上把Queue和Topic这两种队列的不同特性说清楚了:
Topic 模式队列
Queue 模式队列
&订阅-发布&模式,如果当前没有订阅者,消息将会被丢弃。如果有多个订阅者,那么这些订阅者都会收到消息
&负载均衡&模式,如果当前没有消费者,消息也不会丢弃;如果有多个消费者,那么一条消息也只会发送给其中一个消费者,并且要求消费者ack信息。
Queue数据默认会在mq服务器上以文件形式保存,比如Active MQ一般保存在$AMQ_HOME\data\kr-store\data下面。也可以配置成DB存储。
传递完整性
如果没有订阅者,消息会被丢弃
消息不会丢弃
由于消息要按照订阅者的数量进行复制,所以处理性能会随着订阅者的增加而明显降低,并且还要结合不同消息协议自身的性能差异
由于一条消息只发送给一个消费者,所以就算消费者再多,性能也不会有明显降低。当然不同消息协议的具体性能也是有差异的
2-5、JMS和协议间转换
上文已经说到,JMS这套面向消息通信的 JAVA API 是一个和厂商无关的规范。通过JMS,我们能实现不同消息中间件厂商、不同协议间的转换和交互。这一小节我们就来讨论一下这个问题。如果用一张图来表示JMS在消息中间件中的作用话,那么就可以这么来画:
首先您使用的MQ消息中间件需要实现了JMS规范;那么通过JMS规范,开发人员可以忽略各种消息协议的细节,只要消息在同一队列中,就能够保证各种消息协议间实现互相转换。下面我们首先来看一个使用JMS API在ActiveMQ中操作openwire协议消息的简单示例,然后再给出一个通过JMS,实现Stomp消息协议和Openwire消息协议间的互转示例。
2-5-1、JMS操作
以下代码使用向某个Queue(命名为test)中发送一条消息:
import javax.jms.C
import javax.jms.D
import javax.jms.MessageP
import javax.jms.S
import javax.jms.TextM
import org.apache.activemq.ActiveMQConnectionF
* 测试使用JMS API连接ActiveMQ
* @author yinwenjie
public class JMSProducer {
* 由于是测试代码,这里忽略了异常处理。
* 正是代码可不能这样做
* @param args
* @throws RuntimeException
public static void main (String[] args) throws Exception {
// 定义JMS-ActiveMQ连接信息(默认为Openwire协议)
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(&tcp://192.168.61.138:61616&);
Session session =
Destination sendQ
Connection connection =
//进行连接
connection = connectionFactory.createQueueConnection();
connection.start();
//建立会话(设置一个带有事务特性的会话)
session = connection.createSession(true, Session.SESSION_TRANSACTED);
//建立queue(当然如果有了就不会重复建立)
sendQueue = session.createQueue(&/test&);
//建立消息发送者对象
MessageProducer sender = session.createProducer(sendQueue);
TextMessage outMessage = session.createTextMessage();
outMessage.setText(&这是发送的消息内容&);
//发送(JMS是支持事务的)
sender.send(outMessage);
sender.close();
connection.close();
当以上代码运行到&start&的位置时,我们可以通过观察ActiveMQ管理界面中connection列表中的连接信息,发现消息生产者已经建立了一个Openwire协议的连接:
从而确定我们通过JMS API建立了一个Z"/kf/ware/vc/" target="_blank" class="keylink">vcGVud2lyZdCt0um1xM2o0bbBrL3ToaO909fFztLDx8q508PS1M/CtPrC66OsvajBotK7uPa7+dPab3BlbndpcmXQrdLptcQmbGRxdW87z/u30dXfJnJkcXVvO6GjPHN0cm9uZz7XotLio7rP+8+iyfqy+tXfus3P+8+iz/u30dXfo6zTs8nktcS208HQsdjQ69K71sKhozwvc3Ryb25nPqOo1NrKvsD9tPrC69bQo6zL/MPHtrzTs8nkw/uzxs6qdGVzdLXESk1TLVF1ZXVlo6k8L3A+DQrS1M/CtPrC68q508NKTVO008SzuPZRdWV1ZdbQvdPK1c/7z6Kjug0KPHByZSBjbGFzcz0="brush:">
import javax.jms.C
import javax.jms.D
import javax.jms.M
import javax.jms.MessageC
import javax.jms.MessageL
import javax.jms.S
import org.apache.activemq.ActiveMQConnectionF
* 测试使用JMS API连接ActiveMQ
* @author yinwenjie
public class JMSConsumer {
* 由于是测试代码,这里忽略了异常处理。
* 正是代码可不能这样做
* @param args
* @throws RuntimeException
public static void main (String[] args) throws Exception {
// 定义JMS-ActiveMQ连接信息
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(&tcp://192.168.61.138:61616&);
Session session =
Destination sendQ
Connection connection =
//进行连接
connection = connectionFactory.createQueueConnection();
connection.start();
//建立会话(设置为自动ack)
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//建立Queue(当然如果有了就不会重复建立)
sendQueue = session.createQueue(&/test&);
//建立消息发送者对象
MessageConsumer consumer = session.createConsumer(sendQueue);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message arg0) {
// 接收到消息后,不需要再发送ack了。
System.out.println(&Message = & + arg0);
synchronized (JMSConsumer.class) {
JMSConsumer.class.wait();
consumer.close();
connection.close();
当以上&消费者&代码运行到start的位置时,我们通过ActiveMQ提供的管理界面可以看到,基于Openwire协议的连接增加到了两条:
注意,您在运行以上测试代码时,不用和我的运行顺序一致。由于Queue模式的队列是要进行消息状态保存的,所以无论您是先运行&消费者&端,还是先运行&生产者&端,最后&消费者&都会收到一条消息。类似如下的效果:
Message = ActiveMQTextMessage {commandId = 6, responseRequired = false, messageId = ID:yinwenjie-240-:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:yinwenjie-240-:1:1:1, destination = queue:///test, transactionId = TX:ID:yinwenjie-240-:1:1, expiration = 0, timestamp = 4, arrival = 0, brokerInTime = 6, brokerOutTime = 7, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@66968df8, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = 这是发送的消息内容}
2-5-2、协议间转换
下面我们将Openwire协议的消息通过JMS送入Queue队列,并且让基于Stomp协议的消费者接收到这条消息。为了节约篇幅,基于Openwire协议的生产者的代码请参考上一小节2-5-1中&生产者&的代码片段。这里只列出Stomp消息的接受者代码(实际上这段代码在上文中也可以找到):
Stomp协议的消息消费者(消息接收者):
package mq.test.
import java.net.S
import java.net.SocketTimeoutE
import java.util.M
import org.apache.activemq.transport.stomp.StompC
import org.apache.activemq.transport.stomp.StompF
public class TestConsumer {
public static void main(String[] args) throws Exception {
// 建立连接(注意,Stomp协议的连接端口是61613)
StompConnection con = new StompConnection();
Socket so = new Socket(&192.168.61.138&, 61613);
con.open(so);
con.setVersion(&1.2&);
con.connect(&admin&, &admin&);
String ack = &client&;
con.subscribe(&/test&, &client&);
// 接受消息(使用循环进行)
StompFrame frame =
// 注意,如果没有接收到消息,
// 这个消费者线程会停在这里,直到本次等待超时
frame = con.receive();
} catch(SocketTimeoutException e) {
// 打印本次接收到的消息
System.out.println(&frame.getAction() = & + frame.getAction());
Map headers = frame.getHeaders();
String meesage_id = headers.get(&message-id&);
System.out.println(&frame.getBody() = & + frame.getBody());
System.out.println(&frame.getCommandId() = & + frame.getCommandId());
// 在ack是client模式的情况下,确认消息
if(&client&.equals(ack)) {
con.ack(meesage_id);
当您同时运行Openwire消息发送者和Stomp消息接收者时,您可以在ActiveMQ的管理界面看到这两种协议的连接信息:
以下是Stomp协议消费者接收到的消息内容(经过转换的openwire协议消息):
frame.getAction() = MESSAGE
frame.getBody() = 这是发送的消息内容
frame.getCommandId() = 0
(window.slotbydup=window.slotbydup || []).push({
id: '2467140',
container: s,
size: '1000,90',
display: 'inlay-fix'
(window.slotbydup=window.slotbydup || []).push({
id: '2467141',
container: s,
size: '1000,90',
display: 'inlay-fix'
(window.slotbydup=window.slotbydup || []).push({
id: '2467143',
container: s,
size: '1000,90',
display: 'inlay-fix'
(window.slotbydup=window.slotbydup || []).push({
id: '2467148',
container: s,
size: '1000,90',
display: 'inlay-fix'

我要回帖

更多关于 网络公司 的文章

 

随机推荐