关于ActiveMQ中怎么实现一对多activemq 发送消息息讨论

2008人阅读
spring(15)
1. 概述:Spring提供了一个用于简化JMS API使用的抽象框架,并且对用户屏蔽了JMS API中1.0.2和1.1版本的差异。
JMS的功能大致上分为两块,叫做消息制造和消息消耗。JmsTemplate 用于制造消息和同步消息接收。我们今天就用JmsTemplate实现同步的消息接受。
使用JMS发(接)消息的步骤:
& 1)创建连接工厂
& 2)使用连接工厂创建连接
& 3)使用连接创建会话
& 4)获取一个目的地
& 5)使用会话和目的地创建消息生产者(消息消费者)
& 6)使用连接创建一个需要发送的消息类型实例
& 7)使用连接的一个队列发送器或主题公布器,使用发送器或者主题器发送消息(接受消息)
spring中的JmsTemplate实现了对jms的一些封装,内部提供了很多的方法,我们只需要实现定义的回调接口即可。JmsTemplate继承自JmsAccessor,在JmsAccessor中有ConnectionFactory的定义,而JmsTemplate本身的构造方法也有对ConnectionFactory的封装:
public JmsTemplate(ConnectionFactory connectionFactory) {
&&&&&&&&this();
&&&&&&&&setConnectionFactory(connectionFactory);
&&&&&&&&afterPropertiesSet();
所以,我们有两种方式注入ConnectionFactory,本文我们采用构造方法的方式。
spring_jms.xml
MessageCreator 回调接口通过JmsTemplate中调用代码提供的Session来创建一条消息。
看一下MessageCreator接口:
public interface MessageCreator {
&&&&Message createMessage(Session session) throws JMSE
那么,我们来实现发送和接受消息DummyJms类
public class DummyJms {
&&&&public static void main(String[] args) throws Exception{
&&&&&&&&ApplicationContext context = new ClassPathXmlApplicationContext(&spring.xml&);
&&&&&&&&JmsTemplate jmsTemplate = (JmsTemplate)context.getBean(&jmsTemplate&);
&&&&&&&&Destination destination = (Destination)context.getBean(&destination&);
&&&&&&&&jmsTemplate.send(destination, new MessageCreator(){
&&&&&&&&&&&&&&&&
&&&&&&&&&&&&&&&&public Message createMessage(Session session)
&&&&&&&&&&&&&&&&&&&&&&&&throws JMSException {
&&&&&&&&&&&&&&&&&&&&return session.createTextMessage(&send message &);
&&&&&&&&&&&&&&&&}
&&&&&&&&&&&&&&&&
&&&&&&&&&&&&
&&&&&&&&});
&&&&&&&&TextMessage msg = (TextMessage)jmsTemplate.receive(destination);
&&&&&&&&System.out.println(&receive message = & + msg.getText());
输出结果:
receive message = send message&
可是我们并没有看到的像前文描述的那那些创建消息生产者,消息消费者的一些东西。继续分析,我们可以看一下,
jmsTemplate.send(Destination destination,MessageCreator messageCreator)这里到底做了什么,可以让我们不费吹灰之力,就可以实现消息的发送。JmsTemplate源代码:
public void send(final Destination destination, final MessageCreator messageCreator) throws JmsException {
&&&&&&&&execute(new SessionCallback() {
&&&&&&&&&&&&public Object doInJms(Session session) throws JMSException {
&&&&&&&&&&&&&&&&doSend(session, destination, messageCreator);
&&&&&&&&&&&&&&&&
&&&&&&&&&&&&}
&&&&&&&&}, false);
JmsTemplate实现了JmsOperations接口,在JmsOperations里有
T execute(SessionCallback action) throws JmsE
那么这个SessionCallback接口是什么呢?它也为用户提供了JMS session。
public interface SessionCallback {
&&&&T doInJms(Session session) throws JMSE
继续往下看。doSend方法:
protected void doSend(Session session, Destination destination, MessageCreator messageCreator)
&&&&&&&&&&&&throws JMSException {
&&&&&&&&Assert.notNull(messageCreator, &MessageCreator must not be null&);
&&&&&&&&MessageProducer producer = createProducer(session, destination);
&&&&&&&&try {
&&&&&&&&&&&&Message message = messageCreator.createMessage(session);
&&&&&&&&&&&&if (logger.isDebugEnabled()) {
&&&&&&&&&&&&&&&&logger.debug(&Sending created message: & + message);
&&&&&&&&&&&&}
&&&&&&&&&&&&doSend(producer, message);
&&&&&&&&&&&&// Check commit - avoid commit call within a JTA transaction.
&&&&&&&&&&&&if (session.getTransacted() && isSessionLocallyTransacted(session)) {
&&&&&&&&&&&&&&&&// Transacted session created by this template -& commit.
&&&&&&&&&&&&&&&&mitIfNecessary(session);
&&&&&&&&&&&&}
&&&&&&&&finally {
&&&&&&&&&&&&JmsUtils.closeMessageProducer(producer);
createProducer()方法又调用了doCreateProducer(),实际的消息生产者在这里。
protected MessageProducer doCreateProducer(Session session, Destination destination) throws JMSException {
&&&&&&&&return session.createProducer(destination);
在这里,我们看到了,spring创建了消息的发送者,关闭连接的一些操作。到这里,大家就明白了,spring内部处理Jms消息的过程了吧(消息的接受也是一样)。
注:本文使用spring3.0和activemq5.2版本。
参考知识库
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:28072次
排名:千里之外
原创:26篇
(20)(1)(1)(1)(1)(5)(1)(4)<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN"
您的访问请求被拒绝 403 Forbidden - ITeye技术社区
您的访问请求被拒绝
亲爱的会员,您的IP地址所在网段被ITeye拒绝服务,这可能是以下两种情况导致:
一、您所在的网段内有网络爬虫大量抓取ITeye网页,为保证其他人流畅的访问ITeye,该网段被ITeye拒绝
二、您通过某个代理服务器访问ITeye网站,该代理服务器被网络爬虫利用,大量抓取ITeye网页
请您点击按钮解除封锁&12.6 使用ActiveMQ消息调度延迟发送消息 - 推酷
12.6 使用ActiveMQ消息调度延迟发送消息
12.6 Scheduling messages to be delivered by ActiveMQ in the future
12.6 使用ActiveMQ消息调度延迟发送消息
The ability to schedule a message to be delivered after a delay, or at regular intervals,
is an extremely useful feature provided by ActiveMQ. One unique benefit is that messages
that are scheduled to be delivered in the future are stored persistently, so that
they can survive a hard failure of an ActiveMQ broker and be delivered on restart.
You specify that you want a message to be delivered at a later time by setting welldefined
properties on the message. For convenience, the well-known property names
are defined in the org.apache.activemq.ScheduledMessage interface. These properties
are shown in table 12.2.
ActiveMQ消息调度实现的消息延迟发送或者在按照固定时间的间隔实现间隔发送的功能十分有用.
其中一个独一无二的好处是消息调度设置为延迟发送的消息将会被持久化存储,因而在ActiveMQ代理
严重失效是消息不会丢失并且在代理重启后会继续发送消息.你可以通过严格定义消息的属性来设置如何
延迟发送消息.为方便起见,常用的延迟发送消息相关的属性都在org.apache.activemq.ScheduledMessage
接口中有定义,如表12.2所示.
Table 12.2 TransportConnector properties for updating clients of cluster changes
Property & & & & & & & & & & & & & & & & & & & &type & & & & & & & & & & & & & & & & &Description
AMQ_SCHEDULED_DELAY & & & & false & & & & & & & & & The time in milliseconds that a message will wait before
& & & & & & & & & & & & & & & & & & & & & & & & & & & & & & & & & & & & & & & & being scheduled to be delivered by the broker
AMQ_SCHEDULED_DELAY & & & & false & & & & & & & & & 消息延迟发送的延迟时间(单位毫秒) & & & & & & & & & & & & & & & & & & & & & & & & & & & & & & & & & & & & & & & &
& & & & & & & & & & & & & & & & & & & & & & & & & & & & & & & & & & & & & & & &&
AMQ_SCHEDULED_PERIOD & & &false & & & & & & & & & & The time in milliseconds after the start time to wait before
& & & & & & & & & & & & & & & & & & & & & & & & & & & & & & & & & & & & & & & & scheduling the message again
AMQ_SCHEDULED_PERIOD & & &false & & & & & & & & & & 代理启动后,发送消息之前的等待时间(单位毫秒). & & & & & & & & & & & & & & & & & & & & & & & & & & & & & & & & & & & & & & & &
AMQ_SCHEDULED_REPEAT & & &false & & & & & & & & & & The number of times to repeat scheduling a message for delivery
AMQ_SCHEDULED_REPEAT & & &false & & & & & & & & & & 调度消息发送的重复次数
AMQ_SCHEDULED_CRON & & & &String & & & & & & & & & &Use a cron entry to set the schedule
AMQ_SCHEDULED_CRON & & & &String & & & & & & & & & &使用一个cron实体设置消息发送调度
To have a message wait for a period of time before its delivered, you only need to set
the AMQ_SCHEDULED_DELAY property. Suppose you want to publish a message from
your client, but have it actually delivered in 5 minutes time. You’d need to do something
like the following in your client code:
只需设置AMQ_SCHEDULED_DELAY这一个属性即可让消息等待一段时间后再发送.假设你打算从你的
客户端发送消息,但需要设置消息延迟5分钟后发送,你可以使用下面的客户端代码来实现:
& MessageProducer producer = session.createProducer(destination);
& TextMessage message = session.createTextMessage(&test msg&);
& long delayTime = 5 * 60 * 1000;
& message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delayTime);
& producer.send(message);
ActiveMQ will store the message persistently in the broker, and when it’s scheduled, it
will deliver it to its destination. This is important, because although you’ve specified
that you want the message to be delivered in 5 minutes time, if the destination is a
queue, it will be posted to the end of the queue. So the actual delivery time will be
dependent on how many messages already exist on the queue awaiting delivery.
设置延迟发送之后,ActiveMQ将消息存储在代理中,等待设置的延时时间过了之后,消息会被发送到
设定的目的地.尽管你已经指定了消息在5分钟之后发送,但是如果消息目的地是一个消息队列,则消息
会被发送到队列的末端,这一点很重要.因而,消息发送的实际延迟时间将取决于当前消息的目的地队列中
You can also use a the AMQ_SCHEDULED_PERIOD and AMQ_SCHEDULED_REPEAT properties
to have messages delivered at a fixed rate. The following example will send a message
100 times, every 30 seconds:
你也可以使用AMQ_SCHEDULED_PERIOD和AMQ_SCHEDULED_REPEAT属性设置消息按照固定间隔时间
发送固定的次数.下面的示例代码将发送消息100,每次发送间隔时间为30秒:
& MessageProducer producer = session.createProducer(destination);
& TextMessage message = session.createTextMessage(&test msg&);
& long delay = 30 * 1000;
& long period = 30 * 1000;
& message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
& message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
& message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT,COUNT repeat);
& producer.send(message);
Note that we specified the repeat as being 99, as the first message + 99 = 100. If you
schedule a message to be sent once, the message ID will be the same as the one you
published. If you schedule a repeat, or use the AMQ_SCHEDULED_CRON property to
schedule your message, then ActiveMQ will create a unique message ID for the delivered
注意,我们将消息重复发送的次数设置为99,因为第一次发送消息 + 99此重复=100.如果你设置消息只被
发送一次,那么消息的ID即使你设置的消息ID.如果你设置了重复发送,或者使用AMQ_SCHEDULED_CRON
属性来调度消息发送,那么ActiveMQ会生产一个新的唯一的消息ID作为重复发送的消息ID.
Cron is a well-known job scheduler on Unix systems, and it uses an expression
string to denote when a job should be scheduled. ActiveMQ uses the same syntax, as
Cron是Unix系统中任务调度器,它使用一个字符串来表示一个任务何时需要被执行.
ActiveMQ使用同样的预防,如下文本描述:
.---------------- & & minute (0 - 59)
| &.-------------- & & hour (0 - 23)
| &| &.------------ & & day of month (1 - 31)
| &| &| &.---------- & & month (1 - 12) - 1 = January
| &| &| &| &.-------- & & day of week (0 - 7) (Sunday=0 or 7
For example, if you want to schedule a message to be delivered at 2 a.m. on the twelfth
day of every month, you’d need to do the following:
例如,如果你打算在每月的12号早上2点发送消息,你需要按照如下代码所示进行设置:
& MessageProducer producer = session.createProducer(destination);
& TextMessage message = session.createTextMessage(&test msg&);
& message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON,&0 2 12 * *&);
& producer.send(message);
You can combine scheduling with cron and a simple delay and repeat, but the cron
entry will always take precedence. For example, instead of sending one message at 2
a.m. on the twelfth day of every month, you may want to schedule 10 messages to be
delivered every 30 seconds:
你可以同时使用cron和普通的延迟与重复来调度消息发送,但是cron方式的调度具有优先权.例如,
不同于每月的12号早上2点只发送一条消息,你可能打算在每月的12号早上2点开始发送消息,
并且每隔30秒再重复发送9个消息:
& long delay = 30 * 1000;
& long period = 30 * 1000;
& MessageProducer producer = session.createProducer(destination);
& TextMessage message = session.createTextMessage(&test msg&);
& message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON,&0 2 12 * *&);
& message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
& message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
& message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT,COUNT repeat);
& producer.send(message);
In this section we’ve looked at how to schedule messages for sometime in the future
using ActiveMQ. You should now be able to send messages after a delay, send multiple
instances of the same message at regular intervals, and use a cron entry to schedule
本节中我们看到了如何使用ActiveMQ调度消息发送使得消息能在未来的某个时间发送.现在,你应当能够
延迟发送消息,也能够以固定间隔时间重复发送消息,并且能够使用cron实体来调度消息发送.
& & & & & & & & & & & & & & & & & & & & & & & & & & & &&
已发表评论数()
请填写推刊名
描述不能大于100个字符!
权限设置: 公开
仅自己可见
正文不准确
标题不准确
排版有问题
没有分页内容
图片无法显示
视频无法显示
与原文不一致3451人阅读
java(114)
activemq实现了jms规范,使用jms发送消息,从消息的一端发送到另一端,当加入了activemq作为中间作,我们可以使用异步的方式发送消息.大概过程:发送者与activemq建立连接,先发送消息到activemq.然后,接收者与activemq建立连接,从activemq取回消息.发送者发消息的时候不依赖接收者,接收者接收消息的时候不依赖发送者,两个过程可以不同步进行.达到应用解耦的作用.如果接收者不去取消息,消息也没过期,那么消息会一直驻留在activemq这个中间件.
下面只是一个简单例子.接收者和发送者,几乎同时与activemq建立连接,接收者建立起监听,发送者一发送消息,接收者就马上接收到消息.
一.注册Connection Factory Bean.
@Bean(destroyMethod = &stop&)
public PooledConnectionFactory pooledConnectionFactory(){
ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(&admin&,&admin&,&tcp://localhost:61616&);
return new PooledConnectionFactory(activeMQConnectionFactory);
二.注册Destination Bean(使用队列发送消息).
public ActiveMQQueue activeMQQueue(){
return new ActiveMQQueue(&ExamQueueName&);
三.创建Consumer.
& 1.注册MessageListener实现Bean.
public ExamQueueListener examQueueListener(){
return new ExamQueueListener();
其中ExamQueueListener.java如下:
package com.exam.
import javax.jms.M
import javax.jms.MessageL
* Created by nil on .
public class ExamQueueListener implements MessageListener {
public void onMessage(Message message) {
System.out.println(message);
& 2.使用connection,destination和listener创建consumer.
public DefaultMessageListenerContainer defaultMessageListenerContainer(){
DefaultMessageListenerContainer defaultMessageListenerContainer=new DefaultMessageListenerContainer();
defaultMessageListenerContainer.setConnectionFactory(pooledConnectionFactory());
defaultMessageListenerContainer.setDestination(activeMQQueue());
defaultMessageListenerContainer.setMessageListener(examQueueListener());
return defaultMessageListenerC
四.创建Producer.
& 1.使用spring的JmsTemplate.
public JmsTemplate jmsTemplate(){
return new JmsTemplate(pooledConnectionFactory());
& 2.将destination和jmsTemplate注入到examPublisher来注册Producer Bean.
public ExamPublisher examPublisher(){
return new ExamPublisher(jmsTemplate(),activeMQQueue());
&其中,ExamPublisher.java内容如下:
package com.exam.
import org.springframework.jms.core.JmsT
import org.springframework.jms.core.MessageC
import javax.jms.D
import javax.jms.JMSE
import javax.jms.M
import javax.jms.S
import java.util.R
* Created by nil on .
public class ExamPublisher {
private JmsTemplate jmsT
private Des
public ExamPublisher(){}
public ExamPublisher(JmsTemplate jmsTemplate,Destination destination){
this.jmsTemplate = jmsT
this.destination =
public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsT
public void setDestination(Destination destination) {
this.destination =
public void sendMessage(final String msg){
jmsTemplate.send(destination,new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(&test message & + msg);
五.单元测试(测试前不要忘了先启动activemq).ExamPublisherTest.java内容如下:
package com.exam.
import com.exam.config.AppC
import org.junit.T
import org.junit.runner.RunW
import org.springframework.test.context.ContextC
import org.springframework.test.context.junit4.SpringJUnit4ClassR
import org.springframework.test.context.support.AnnotationConfigContextL
import javax.annotation.R
* Created by nil on .
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(loader=AnnotationConfigContextLoader.class,classes={AppConfig.class})
public class ExamPublisherTest {
private ExamPublisher examP
public void testSendMessage(){
examPublisher.sendMessage(&x123x&);
六.应用到Web
1.JmsController.java内容如下:
package com.exam.
import com.exam.jms.ExamP
import org.springframework.stereotype.C
import org.springframework.web.bind.annotation.RequestM
import javax.annotation.R
@Controller
public class JmsController {
& & @Resource
& & private ExamPublisher examP
& & @RequestMapping(&/test&)
& & public String test(String message){
& & & & examPublisher.sendMessage(message);
& & & & return &redirect:/index.html&;
2.index.html内容如下:
&!DOCTYPE html&
&meta charset=&UTF-8&&
&title&Insert title here&/title&
&form action=&test.do&&
& & &input type=&text& name=&message& value=&test&/&
& & &input type=&submit& value=&submit&/&
3.启动jetty,在浏览器打开index.html页面测试(测试前不要忘了先启动activemq).
可能遇到的问题.当停止jetty或tomcat之后,activemq会报下面的警告.警告的原因估计是:客户端(包括Producer和Consumer)没有正确断开连接,上面的例子就是从PooledConnectionFactory生成多少个活跃的连接,就会有多少个警告,(PooledConnectionFactory的默认最大连接数是1,当然注册Bean时可以改).可能是直接断了进程,没有调用stop方法,从输出的警告,我认为问题不大.
WARN | Transport Connection to: tcp://127.0.0.1:65279 failed: java.net.SocketException: Connection reset
参考知识库
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:460667次
积分:6090
积分:6090
排名:第2656名
原创:160篇
转载:52篇
评论:100条
(4)(4)(4)(15)(5)(4)(7)(8)(4)(3)(1)(3)(5)(10)(10)(9)(14)(2)(12)(11)(5)(8)(8)(9)(12)(1)(3)(14)(1)(4)(11)

我要回帖

更多关于 activemq异步发送消息 的文章

 

随机推荐