如何实现activemq topic 配置的Topic的持久订阅

ActiveMQ持久发布订阅模式publish-subscribe
上一篇已经介绍ActiveMQ的三种消息模式,现在就持久发布订阅模式publish-subscribe编写代码示例如下,未使用第三方框架,ActiveMQ版本5.10
Publisher.java消息发布类
package example.durableT
import org.apache.qpid.amqp_1_0.jms.impl.*;
import javax.jms.*;
class Publisher {
& & public static void
main(String []args) throws Exception {
& & ConnectionFactoryImpl
factory = new ConnectionFactoryImpl("localhost", 5672, "admin",
& & Connection connection =
factory.createConnection();
connection.start();
& & Session session =
connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
& & TopicImpl sendDest = new
TopicImpl("topic.persistent");
& & MessageProducer producer
=session.createProducer(sendDest);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);//设置持久
& & String txt
="abc"+System.currentTimeMillis();
& & TextMessage sendMsg =
session.createTextMessage(txt);
producer.send(sendMsg);
System.out.println("消息发出:"+txt);
//connection.close();
consumer.java持久消息订阅类
package example.durableT
import org.apache.qpid.amqp_1_0.jms.impl.*;
import javax.jms.*;
class consumer {
& & public static void
main(String []args) throws JMSException {
& & ConnectionFactoryImpl
factory = new ConnectionFactoryImpl("localhost", 5672, "admin",
& & Connection connection =
factory.createConnection();
& & String clientID
connection.setClientID(clientID);//持久订阅需要设置这个,据说,负载情况下每个客户端id要不同,不然会报错
connection.start();
& & Session session =
connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
& & Topic reciveDest
=session.createTopic("topic.persistent");
& & MessageConsumer consumer
= session.createDurableSubscriber(reciveDest, clientID);
& & int i=0;
& & while(true) {
Message msg = consumer.receive();
if(msg instanceof TextMessage){
String txt = ((TextMessage)msg).getText();
System.out.println("收到的消息数:"+(++i)+"收到的消息:"+txt);
//connection.close();
已投稿到:
以上网友发言只代表其个人观点,不代表新浪网的观点或立场。greemranqq
阅读(1595)
demo2 留下了两个问题:
1.我们利用demo2 的配置,在queue 模式下 连续发送10W消息出现出现状况。
2.topic 模式下,消费者重启时间段收不到监听的信息怎么办?
二、问题解析:
1.测试 发送10W消息,中途会出现
socket: tcp://localhost:61616 due to: java.net.BindException: Address already in use: JVM_Bind 异常。
你关掉activemq,利用netstat -aon | findstr "61616"
发现没有这个端口占用情况,查阅资料才知道:
http://activemq.apache.org/jmstemplate-gotchas.html
里面解释道,发送消息的时候创建connection,session 还要关闭,比较费资源,我猜测当创建销毁操作没测试完成的时候,另一个消息发送的时候,发现端口被占着,就会出现这个种情况,也就是说当发送频率比较高的情况,容易出现,文档建议用 pool 的东西。
一共有spring 的CachingConnectionFactory 和 activemq 的PooledConnectionFactory,由于PooledConnectionFactory 这东西要 activemq-pool.jar ,因此我还是选择CachingConnectionFactory~。~。
配置spring-jms.xml更改如下:
&!-- jms 连接工厂 --&
&bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory"&
&property name="brokerURL" value="tcp://localhost:61616?jms.useAsyncSend=false" /&
&bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"&
&!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory --&
&property name="targetConnectionFactory" ref="connectionFactory"/&
&!-- Session缓存数量,这里属性也可以直接在这里配置 --&
&property name="sessionCacheSize" value="100" /&
&!-- 基本的bean模板 --&
&bean id = "jmsTemplate" class = "org.springframework.jms.core.JmsTemplate"&
&!-- 链接工厂,这里应用缓存池 就行了--&
&property name="connectionFactory" ref="cachingConnectionFactory"/&
这样操作了时候,发送20W条信息也没出现过问题,而且速度比发送快了几十倍......
2.topic 模式下如果A 消费者挂了,就收不到消息了,而我又想它收到消息,我们先来尝试持久化吧!
1.对于持久化,并不默认,其实queue 默认就持久化在文件里面的,但是topic 模式下我们得开启持久化配置, 在activemq.xml 里面有这样的配置:
&!-- 这里设置true 就算开启了 --&
&broker xmlns="http://activemq.apache.org/schema/core" persistent="true"
brokerName="localhost" dataDirectory="${activemq.base}/data"&
&!-- 这里是文件存放的位置,其他的说明暂时不讲 --&
&persistenceAdapter&
&amqPersistenceAdapter
syncOnWrite="true"
directory="${activemq.base}/data2" maxFileLength="3mb"/&
&/persistenceAdapter&
2.持久化我们在spring-jms.xml 里面还得开启几个东西:
&?xml version="1.0" encoding="UTF-8"?&
&beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.2.xsd"&
&!-- jms 连接工厂 --&
&bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory"&
&property name="brokerURL" value="tcp://localhost:61616?jms.useAsyncSend=true" /&
&bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"&
&!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory --&
&property name="targetConnectionFactory" ref="connectionFactory"/&
&!-- Session缓存数量 --&
&property name="sessionCacheSize" value="100" /&
&!-- 基本的bean模板 --&
&bean id = "jmsTemplate" class = "org.springframework.jms.core.JmsTemplate"&
&!-- 链接工长 --&
&property name="connectionFactory" ref="cachingConnectionFactory"/&
&!-- 进行持久化 --&
&property name="deliveryMode" value="2" /&
&!--订阅 发布模式 --&
&property name="pubSubDomain" value="true" /&
&!-- 消息订阅模式 --&
&bean id="topicDestination" class="org.mand.ActiveMQTopic"&
&!-- 订阅消息的名字 --&
&constructor-arg index="0" value="orderTopic"/&
3.想想我们订阅者需要做些什么呢?发布者发布消息,订阅者去消费,这是1对多的形式,我们可以这样理解:公司设定很多活动代金卷,去参加活动的人都能领取,当然这分两种情况,第一种 就是我们前面测试的,只要我公司门口等(监听),活动开始(发布)就能领取了,如果你当时没在,就领取不到。第二种:很多情况下,公司搞活动我们不会等在那里,只要活动开始了,那么我过段时间也可以去,礼品公司会保留的,这种情况会导致多次领取,因此总要登记一下嘛,不能你领取了,过一会又来吧?activemq 里面会有clientId 标示来区分,类似于身份证ID嘛。
当然有些情况下, 我们一个ID 可以领取多个不同的奖品,因此还得需要个字段标示:durableSubscriptionName,标示我们领取哪个礼品,下面先看配置
&?xml version="1.0" encoding="UTF-8"?&
&beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.2.xsd"&
&!-- jms 连接工厂 --&
&bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory"&
&!-- Jvm 内部传输,暂时不用TCP,使用暂时异步传输
&property name="brokerURL" value="tcp://localhost:61616?jms.useAsyncSend=true" /&
&bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"&
&!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory --&
&property name="targetConnectionFactory" ref="connectionFactory"/&
&!-- 接收者ID --&
&property name="clientId" value="clientA" /&
&!-- 消息订阅模式 --&
&bean id="topicCustomerA" class="org.mand.ActiveMQTopic"&
&!-- 订阅消息的名字 --&
&constructor-arg index="0" value="orderTopic"/&
&!-- 消息监听,这里可以认为是A服务器的监听 --&
&bean id="messageListener" class="com.raycloud.excalibur.mq.ConsumerMessageListener"/&
&bean id="listenerContainerA" class="org.springframework.jms.listener.DefaultMessageListenerContainer"&
&property name="connectionFactory" ref="connectionFactory" /&
&property name="destination" ref="topicCustomerA" /&
&property name="messageListener" ref="messageListener" /&
&!-- 持久化消息 --&
&property name="subscriptionDurable" value="true"/&
&!-- 接收者ID --&
&property name="clientId" value="clientA" /&
&!-- 这里名字可以任意改变,A 领取了,你可以改成B 还可以领取,可以举例不是很恰当 --&
&property name="durableSubscriptionName" value="clientA"/&
// 这是消费者代码,这里你可以创建 多个XMl文件,模拟多个消费者。
public class JmsTopicReceiver{
public static void main(String[] args) throws Exception {
// 加载消费者监听
ApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-jms-consumerA.xml");
// 写个死循环,模拟服务器一直运行
while (true){}
// 监听代码 直接输出
public class ConsumerMessageListener implements MessageListener {
public void onMessage(Message message) {
System.out.println("topic 收到消息:"+message);
三、测试:
我们要完成一个收到一个order,然后N个服务计算的问题,因此采用topic 模式,而防止中途服务器挂掉,采用持久化方式,模拟测试如下:
1.启动两个ConsumerA,ConsumerB 监听,发布一个order ,同时收到消息,OK
2.启动一个ConsumerA,发布一个Order,再启动ConsumerB,也收到消息,OK
3.启动一个ConsumerA,发布一个Order,A收到,关闭mq服务器,重启mq服务(前后),重启ConsumerB ,同样收到消息,OK。
4.启动两个ConsumerA,ConsumerB,发布order,A,B收到消息,重启A,B
不收重复消息,OK
好像基本能满足需求了,由于发送量 不会很大,频率不会很高,可以试用一下了。
那么新问题是:
1.如果A,B 收到消息后,topic
的消息怎么处理呢? 一直保存着吗? 如果可以清除,怎么清除,什么时候进行清除呢?文件的方式方便管理吗?
2.虽然消息发送过去了,对象信息怎么接受呢,当然会有消息转换器..
3.在queue 模式下,服务器断开了,怎么从新连接呢,如果服务器挂了,怎么切换到备用的呢?
1.这是初步尝试了下 topic 持久化到文件,当然也可以持久化到数据的,关于activemq 持久化以及介绍文章,可以参考:http://blog.csdn.net/xyw_blog/article/details/9128219 比较详细。
阅读排行榜mq topic持久化订阅者(topic、queue的producer.setDeliveryMode(DeliveryMode. PERSISTENT)是指的mq服务),queue的消费者不在也会给他保留,topic只有持久化订阅者会保留
(1)使用queue,即队列时,每个消息只有一个消费者,所以,持久化很简单,只要保存到数据库即可
。然后,随便一个消费者取走处理即可。某个消费者关掉一阵子,也无所谓。
(2)使用topic,即订阅时,每个消息可以有多个消费者,就麻烦一些。
首先,假设消费者都是普通的消费者,
------------------------
&1&activemq启动后,发布消息1,可惜,现在没有消费者启动着,也就是没有消费者进行了订阅。那么
,这个消息就被抛弃了。
&2&消费者1启动了,连接了activemq,进行了订阅,在等待消息~~
activemq发布消息2,OK,消费者1收到,并进行处理。消息抛弃。
&3&消费者2也启动了,连接了activemq,进行了订阅,在等待消息~~
activemq发布消息3,OK,消费者1,消费者2都收到,并进行处理。消息抛弃。
&4&消费者1关掉了。
activemq发布消息4,OK,消费者2收到,并进行处理。消息抛弃。
&5&消费者1又启动了。
activemq发布消息5,OK,消费者1,消费者2都收到,并进行处理。消息抛弃。
-----------------------------
总结一下:
activemq只是向当前启动的消费者发送消息。
关掉的消费者,会错过很多消息,并无法再次接收这些消息。
如果发送的消息是重要的用户同步数据,错过了,用户数据就不同步了。
那么,如何让消费者重新启动时,接收到错过的消息呢?
答案是持久订阅。
(3)普通的订阅,不区分消费者,场地里有几个人头,就扔几个馒头。
持久订阅,就要记录消费者的名字了。
张三说,我是张三,有馒头给我留着,我回来拿。
李四说,我是李四,有馒头给我留着,我回来拿。
activemq就记下张三,李四两个名字。
那么,分馒头时,还是一个人头给一个馒头。
分完了,一看张三没说话,说明他不在,给他留一个。
李四说话了,那就不用留了。
张三回来了,找activemq,一看,这不张三吧,快把他的馒头拿来。
可能是一个馒头,也可能是100个馒头,就看张三离开这阵子,分了多少次馒头了。
activemq区分消费者,是通过clientID和订户名称来区分的。
//&创建connection
connection&=&connectionFactory.createConnection();
connection.setClientID(&bbb&);&//持久订阅需要设置这个。
connection.start();
//&创建session
Session&session&=&connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//&创建destination
Topic&topic&=&session.createTopic(&userSyncTopic&);&//Topic名称
//MessageConsumer&consumer&=&session.createConsumer(topic);&//普通订阅
MessageConsumer&consumer&=&session.createDurableSubscriber(topic,&bbb&);&//持久订阅
(4)还有一点,消息的生产者,发送消息时用使用持久模式
MessageProducer&producer&=&...;
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
不设置,默认就是持久的
(5)使用相同的“clientID”,则认为是同一个消费者。两个程序使用相同的“clientID”,则同时只能有一个连接到activemq,第二个连接的会报错。
(6)activemq的设置在conf/activemq.xml中,默认消息是保存在data/kahadb中,重启activemq消息不会丢。
查看当前的队列、Topic和持久订户的信息、发送消息等等,很方便。
可以复制activemq-jdbc.xml中的内容过来,修改一下,就可以把消息保存在其它数据库中了。
参考知识库
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:40153次
积分:1050
积分:1050
排名:千里之外
原创:59篇
转载:68篇
(4)(1)(4)(17)(6)(4)(16)(19)(9)(3)(23)(22)&>&&>&&>&&>&spring+activemq topic持久化订阅
spring+activemq topic持久化订阅
上传大小:42KB
spring +activemq topic消息持久化订阅实例,整个项目中有activemq和spring的整合的所有实例,topic的持久化配置是在ApplicationContext3C、ApplicationContext3C2以及ApplicationContext3P三个中,消息生产者:QueueProducer 、消息消费者1:SimpleJMSReceiver
消息消费者2:SimpleJMSReceiver2...展开收缩
综合评分:0(0位用户评分)
所需积分:1
下载次数:2
审核通过送C币
创建者:id_rin
创建者:qq_
创建者:qq_
课程推荐相关知识库
上传者其他资源上传者专辑
开发技术热门标签
VIP会员动态
您因违反CSDN下载频道规则而被锁定帐户,如有疑问,请联络:!
android服务器底层网络模块的设计方法
所需积分:0
剩余积分:720
您当前C币:0
可兑换下载积分:0
兑换下载分:
兑换失败,您当前C币不够,请先充值C币
消耗C币:0
你当前的下载分为234。
spring+activemq topic持久化订阅
会员到期时间:
剩余下载次数:
你还不是VIP会员
开通VIP会员权限,免积分下载
你下载资源过于频繁,请输入验证码
您因违反CSDN下载频道规则而被锁定帐户,如有疑问,请联络:!
若举报审核通过,可奖励20下载分
被举报人:
举报的资源分:
请选择类型
资源无法下载
资源无法使用
标题与实际内容不符
含有危害国家安全内容
含有反动色情等内容
含广告内容
版权问题,侵犯个人或公司的版权
*详细原因:

我要回帖

更多关于 activemq topic订阅 的文章

 

随机推荐