什么时候使用Apache camel的dozer组件使用

Apache Camel快速入门(中)-架构
您可以捐助,支持我们的公益事业。
每天15篇文章
不仅获得谋生技能
更可以追随信仰
Camel快速入门(中)
486 次浏览
&&&&评价:
本文来自于csdn,文章一是补上个文章漏掉的内容,二有讲解了Routing路由条件,分文好几个方面,详细请看下文:
3-1-2、特殊的Endpoint Direct
Endpoint Direct用于在两个编排好的路由间实现Exchange消息的连接,上一个路由中由最后一个元素处理完的Exchange对象,将被发送至由Direct连接的下一个路由起始位置(http://camel.apache.org/direct.html)。注意,两个被连接的路由一定要是可用的,并且存在于同一个Camel服务中。以下的例子说明了Endpoint
Direct的简单使用方式。
package com.yinwenjie.test.cameltest.
import org.apache.camel.builder.RouteB
import org.apache.camel.impl.DefaultCamelC
import org.apache.camel.model.ModelCamelC
* 测试两个路由的连接
* @author yinwenjie
public class DirectCamel {
public static void main(String[] args) throws
Exception {
// 这是camel上下文对象,整个路由的驱动全靠它了。
ModelCamelContext camelContext = new DefaultCamelContext();
// 启动route
camelContext.start();
// 首先将两个完整有效的路由注册到Camel服务中
camelContext.addRoutes((new DirectCamel()).new
DirectRouteA());
camelContext.addRoutes((new DirectCamel()).new
DirectRouteB());
// 通用没有具体业务意义的代码,只是为了保证主线程不退出
synchronized (DirectCamel.class) {
DirectCamel.class.wait();
* DirectRouteA 其中使用direct 连接到 DirectRouteB
* @author yinwenjie
public class DirectRouteA extends RouteBuilder
/* (non-Javadoc)
* @see org.apache.camel.builder.RouteBuilder#configure()
public void configure() throws Exception {
from(&jetty:http://0.0.0.0:8282/directCamel&)
// 连接路由:DirectRouteB
.to(&direct:directRouteB&)
.to(&log:DirectRouteA?showExchangeId=true&);
* @author yinwenjie
public class DirectRouteB extends RouteBuilder
/* (non-Javadoc)
* @see org.apache.camel.builder.RouteBuilder#configure()
public void configure() throws Exception {
from(&direct:directRouteB&)
.to(&log:DirectRouteB?showExchangeId=true&);
以上代码片段中,我们编排了两个可用的路由(尽管两个路由都很简单,但确实是两个独立的路由)命名为DirectRouteA和DirectRouteB。其中DirectRouteA实例在最后一个Endpoint控制端点(direct:directRouteB)中使用Endpoint
Direct将Exchange消息发送到DirectRouteB实例的开始位置。以下是控制台输出的内容:
[ 09:54:38]
INFO qtp Exchange[Id: ID-yinwenjie-240--1,
ExchangePattern: InOut, BodyType: org.apache.camel.converter.stream.InputStreamCache,
Body: [Body is instance of org.apache.camel.StreamCache]]
(MarkerIgnoringBase.java:96)
[ 09:54:38] INFO qtp
Exchange[Id: ID-yinwenjie-240--1,
ExchangePattern: InOut, BodyType: org.apache.camel.converter.stream.InputStreamCache,
Body: [Body is instance of org.apache.camel.StreamCache]]
(MarkerIgnoringBase.java:96)
从以上执行效果我们可以看到,被连接的两个路由使用的Exchange对象是同一个,也就是说在DirectRouteB路由中如果Exchange对象中的内容发生了变化就会在随后继续执行的DirectRouteA路由中产生影响。Endpoint
Direct元素在我们实际使用Camel进行路由编排时,应用频度非常高。因为它可以把多个已编排好的路由按照业务要求连接起来,形成一个新的路由,保持原有路由的良好重用。
========================================(增补完)
3-3、Processor 处理器
Camel中另一个重要的元素是Processor处理器,它用于接收从控制端点、路由选择条件又或者另一个处理器的Exchange中传来的消息信息,并进行处理。Camel核心包和各个Plugin组件都提供了很多Processor的实现,开发人员也可以通过实现org.apache.camel.Processor接口自定义处理器(后者是通常做法)。
既然是做编码,那么我们自然可以在自定义的Processor处理器中做很多事情。这些事情可能包括处理业务逻辑、建立数据库连接去做业务数据存储、建立和某个第三方业务系统的RPC连接,但是我们一般不会那样做――那是Endpoint的工作。Processor处理器中最主要的工作是进行业务数据格式的转换和中间数据的临时存储。这样做是因为Processor处理器是Camel编排的路由中,主要进行Exchange输入输出消息交换的地方。
不过开发人员当然可以在Processor处理器中连接数据库。例如开发人员需要根据上一个Endpoint中携带的“订单编号前缀”信息,在Processor处理器中连接到一个独立的数据库中(或者缓存服务中)查找其对应的路由信息,以便动态决定下一个路由路径。由于Camel支持和JAVA语言的Spring框架无缝集成,所以要在Processor处理器中操作数据库只需要进行非常简单的配置。
以下代码片段是自定义的Processor处理器实现,其中的process(Exchange exchange)方法是必须进行实现的:
// 一个自定义处理器的实现
// 就是我们上文看到过的处理器实现了
public class OtherProcessor implements Processor
public void process(Exchange exchange) throws
Exception {
Message message = exchange.getIn();
String body = message.getBody().toString();
//===============
// 您可以在这里进行数据格式转换
// 并且将结果存储到out message中
//===============
// 存入到exchange的out区域
if(exchange.getPattern() == ExchangePattern.InOut)
Message outMessage = exchange.getOut();
outMessage.setBody(body + & || other out&);
注意,处理器Processor是和控制端点平级的概念。要看一个URI对应的实现是否是一个控制端点,最根本的就是看这个实现类是否实现了org.apache.camel.Endpoint接口;而要看一个路由中的元素是否是Processor处理器,最根本的就是看这个类是否实现了org.apache.camel.Processor接口。
3-5、Routing路由条件
在控制端点和处理器之间、处理器和处理器之间,Camel允许开发人员进行路由条件设置。例如开发人员可以拥有当Exchange
In Message的内容为A的情况下将消息送入下一个处理器A,当Exchange In Message的内容为B时将消息送入下一个处理器B的处理能力。又例如,无论编排的路由中上一个元素的处理消息如何,都将携带消息的Exchange对象复制
多份,分别送入下一处理器X、Y、Z。开发人员甚至还可以通过路由规则完成Exchange到多个Endpoint的负载传输。
Camel中支持的路由规则非常丰富,包括:Message Filter、Based
Router、Dynamic Router、Splitter、Aggregator、Resequencer等等。在Camel的中使用了非常形象化的图形来表示这些路由功能.
实际上EIP规则中所描述的大部分业务集成模式,在以上页面都能找到对应的图形化表达。但由于篇幅和本专题的中心思想限制,恕笔者不能对Camel中的路由规则逐一讲解。这里我们选取两个比较有代表性的路由规则进行讲解:Content
Based Router和Recipient List。希望对各位读者理解Camel中的路由规则有所帮助:
3-5-1、Content Based Router 基于内容的路由
把Content Based Router译为基于内容的路由,笔者觉得更为贴切(并不能译作基本路由,实际上你无法定义什么是基本路由)。它并不是一种单一的路由方式,而是多种基于条件和判断表达式的路由方式。其中可能包括choice语句/方法、when语句/方法、otherwise语句/方法。请看以下示例:
package com.yinwenjie.test.cameltest.
* 使用条件选择进行路由编排
* @author yinwenjie
public class ChoiceCamel extends RouteBuilder
public static void main(String[] args) throws
Exception {
// 这是camel上下文对象,整个路由的驱动全靠它了。
ModelCamelContext camelContext = new DefaultCamelContext();
// 启动route
camelContext.start();
// 将我们编排的一个完整消息路由过程,加入到上下文中
camelContext.addRoutes(new ChoiceCamel());
// 通用没有具体业务意义的代码,只是为了保证主线程不退出
synchronized (ChoiceCamel.class) {
ChoiceCamel.class.wait();
public void configure() throws Exception {
// 这是一个JsonPath表达式,用于从http携带的json信息中,提取orgId属性的值
JsonPathExpression jsonPathExpression = new
JsonPathExpression(&$.data.orgId&);
jsonPathExpression.setResultType(String.class);
// 通用使用http协议接受消息
from(&jetty:http://0.0.0.0:8282/choiceCamel&)
// 首先送入HttpProcessor,
// 负责将exchange in Message Body之中的stream转成字符串
// 当然,不转的话,下面主要的choice操作也可以运行
// HttpProcessor中的实现和上文代码片段中的一致,这里就不再重复贴出
.process(new HttpProcessor())
// 将orgId属性的值存储 exchange in Message的header中,以便后续进行判断
.setHeader(&orgId&, jsonPathExpression)
// 当orgId == yuanbao,执行OtherProcessor
// 当orgId == yinwenjie,执行OtherProcessor2
// 其它情况执行OtherProcessor3
.when(header(&orgId&).isEqualTo(&yuanbao&))
.process(new OtherProcessor())
.when(header(&orgId&).isEqualTo(&yinwenjie&))
.process(new OtherProcessor2())
.otherwise()
.process(new OtherProcessor3())
.endChoice();
* 这个处理器用来完成输入的json格式的转换
* 和上一篇文章出现的HttpProcessor 内容基本一致。就不再贴出了
* @author yinwenjie
public class HttpProcessor implements Processor
* 另一个处理器OtherProcessor
* @author yinwenjie
public class OtherProcessor implements Processor
public void process(Exchange exchange) throws
Exception {
Message message = exchange.getIn();
String body = message.getBody().toString();
// 存入到exchange的out区域
if(exchange.getPattern() == ExchangePattern.InOut)
Message outMessage = exchange.getOut();
outMessage.setBody(body + & || 被OtherProcessor处理&);
* 很简单的处理器OtherProcessor2
* 和OtherProcessor基本相同,就不再重复贴出
* @author yinwenjie
public class OtherProcessor2 implements Processor
outMessage.setBody(body + & || 被OtherProcessor2处理&);
* 很简单的处理器OtherProcessor3
* 和OtherProcessor基本相同,就不再重复贴出
* @author yinwenjie
public class OtherProcessor3 implements Processor
outMessage.setBody(body + & || 被OtherProcessor3处理&);
以上代码片段中,开发人员首先使用JsonPath表达式,从Http中携带的json信息中寻找到orgId这个属性的值,然后将这个值存储在Exchange的header区域(这样做只是为了后续方便判断,您也可以将值存储在Exchange的properties区域,还可以直接使用JsonPath表达式进行判断)
。接下来,通过判断存储在header区域的值,让消息路由进入不同的Processor处理器。由于我们设置的from-jetty-endpoint中默认的Exchange
Pattern值为InOut,所以在各个Processor处理器中完成处理后 Out Message的Body内容会以Http响应结果的形式返回到from-jetty-endPoint中。最后我们将在测试页面上看到Processor处理器中的消息值。
Camel中支持绝大多数被开发人员承认和使用的表达式:正则式、XPath、JsonPath等。如果各位读者对JsonPath的语法还不熟悉的话,可以参考Google提供的说明文档(https://code.google.com/p/json-path/)。为了测试以上代码片段的工作效果,我们使用Postman工具向指定的地址发送一段json信息,并观察整个路由的执行效果。如下图所示:
当orgId的值为yuanbao时的执行效果
当orgId的值为yinwenjie时的执行效果
关于路由判断,Camel中提供了丰富的条件判断手段。除了我们在本小节中使用的isEqualTo方式还包括:isGreaterThan、isGreaterThanOrEqualTo、isLessThan、isLessThanOrEqualTo、isNotEqualTo、in(多个值)、contains、regex等等,它们的共同点是这些方法都返回某个实现了org.apache.camel.Predicate接口的类。
3-5-2、Recipient List 接收者列表
在本小节上部分的介绍中,我们说明了怎么使用条件判断向若干可能的路由路径中的某一条路径传送消息。那么如何做到根据判断条件,向若干可能的路径中的其中多条路径传送同一条消息呢?又或者向若干条可能的路径全部传输同一条消息呢?
在Camel中可能被选择的消息路由路径称为接收者,Camel提供了多种方式向路由中可能成为下一处理元素的多个接收者发送消息:静态接收者列表(Static
Recipient List)、动态接收者列表(Dynamic Recipient List)和 循环动态路由(Dynamic
Router)。下面我们对这几种接收者列表形式进行逐一讲解。
3-5-2-1、使用multicast处理Static Recipient List
使用multicast方式时,Camel将会把上一处理元素输出的Exchange复制多份发送给这个列表中的所有接收者,并且按顺序逐一执行(可设置为并行处理)这些接收者。这些接收者可能是通过Direct连接的另一个路由,也可能是Processor或者某个单一的Endpoint。需要注意的是,Excahnge是在Endpoint控制端点和Processor处理器间或者两个Processor处理器间唯一能够有效携带Message的元素,所以将一条消息复制多份并且让其执行不相互受到影响,那么必然就会对Exchange对象进行复制(是复制,是复制,虽然主要属性内容相同,但是这些Exchange使用的内存区域都是不一样的,ExchangeId也不一样)。以下是multicast使用的简单示例代码:
package com.yinwenjie.test.cameltest.
* 测试组播路由
* @author yinwenjie
public class MulticastCamel
extends RouteBuilder
public static void main (String[] args) throws
Exception {
// 这是camel上下文对象,整个路由的驱动全靠它了。
ModelCamelContext camelContext
= new DefaultCamelContext ();
// 启动route
camelContext.start();
// 将我们编排的一个完整消息路由过程,加入到上下文中
camelContext.addRoutes (new MulticastCamel());
// 通用没有具体业务意义的代码,只是为了保证主线程不退出
synchronized
(MulticastCamel.class) {
MulticastCamel.class.wait();
public void configure()
throws Exception {
// 这个线程池用来进行multicast中各个路由线路的并发执行
ExecutorService executorService
Executors.newFixedThreadPool(10);
MulticastDefinition multicastDefinition =
from(&jetty: http://0.0.0.0:8282/multicastCamel&) .multicast();
// multicast 中的消息路由可以顺序执行也可以并发执行
// 这里我们演示并发执行
multicastDefinition .setParallelProcessing(true);
// 为并发执行设置一个独立的线程池
multicastDefinition.setExecutorService (executorService);
// 注意,multicast中各路由路径的Excahnge都是基于上一路由元素的excahnge复制而来
// 无论前者Excahnge中的Pattern如何设置,其处理结果都不会反映在最初的Excahnge对象中
multicastDefinition.to(
&log:helloworld1? showExchangeId=true&
,&log:helloworld2? showExchangeId=true&)
// 一定要使用end,否则OtherProcessor会被做为multicast中的一个分支路由
// 所以您在OtherProcessor中看到的Excahnge中的Body、Header等属性内容
// 不会有“复制的Exchange”设置的任何值的痕迹
.process(new OtherProcessor());
* 另一个处理器
* @author yinwenjie
public class OtherProcessor
implements Processor
/* (non-Javadoc)
* @ see org.apache.camel.Processo
#process (org.apache.camel.Exchange)
public void process (Exchange exchange) throws
Exception {
Message message =
exchange.getIn();
LOGGER.info(& OtherProcessor中的exchange&
+ exchange);
String body =
message.getBody().toString();
// 存入到exchange的out区域
if(exchange.getPattern()
== ExchangePattern.InOut)
Message outMessage =
exchange.getOut();
outMessage.setBody (body + & || 被OtherProcessor处理&);
以上代码片段中,我们使用multicast将原始的Exchange复制了多份,分别传送给multicast中的两个接收者,并且为了保证两个接收者的处理过程是并行的,我们还专门为multicast设置了一个线程池(不设置的话Camel将自行设置)。在以上的代码片段中,在multicast路由定义之后我们还设置了一个OtherProcessor处理器,它主要作用就是查看OtherProcessor中的Exchange对象的状态。下面的截图展示了以上代码片段的执行效果:
[ 16:07:43]
INFO pool-1-thread-7 Exchange[Id: ID-yinwenjie-240--20,
ExchangePattern: InOut, BodyType: String, Body:
{&data&:{&orgId&:&yinwenjie&},&token&:&d9c33c8f-ae59-4edf-b37f-290ff208de2e&,&desc&:&&}]
(MarkerIgnoringBase.java:96)
[ 16:07:43] INFO pool-1-thread-8
Exchange[Id: ID-yinwenjie-240--19,
ExchangePattern: InOut, BodyType: String, Body:
{&data&:{&orgId&:&yinwenjie&},&token&:&d9c33c8f-ae59-4edf-b37f-290ff208de2e&,&desc&:&&}]
(MarkerIgnoringBase.java:96)
[ 16:07:43] INFO qtp-18
OtherProcessor中的exchange [id:ID-yinwenjie-240--16]Exchange[Message:
{&data&:{&orgId&:&yinwenjie&},&token&:&d9c33c8f-ae59-4edf-b37f-290ff208de2e&,&desc&:&&}]
(MulticastCamel.java:74)
通过执行结果可以看到,在multicast中的两个接收者(两个路由分支的设定)分别在我们设置的线程池中运行,线程ID分别是【pool-1-thread-7】和【pool-1-thread-8】。在multicast中的所有路由分支都运行完成后,OtherProcessor处理器的实例在【qtp-18】线程中继续运行(jetty:http-endpint对于本次请求的处理原本就在这个线程上运行)。
请各位读者特别注意以上三句日志所输出的ExchangeId,它们是完全不同的三个Exchange实例!其中在multicast的两个路由分支中承载Message的Excahnge对象,它们的Exchange-ID号分别为【ID-yinwenjie-240--20】和【ID-yinwenjie-240--19】,来源则是multicast对原始Exchange对象的复制,原始Exchagne对象的Exchange-ID为【ID-yinwenjie-240--16】。
3-5-2-2、处理Dynamic Recipient List
在编排路由,很多情况下开发人员不能确定有哪些接收者会成为下一个处理元素:因为它们需要由Exchange中所携带的消息内容来动态决定下一个处理元素。这种情况下,开发人员就需要用到recipient方法对下一路由目标进行动态判断。以下代码示例中,我们将三个已经编排好的路由注册到Camel服务中,并通过打印在控制台上的结果观察其执行:
第一个路由 DirectRouteA
public class DirectRouteA
extends RouteBuilder {
/* (non-Javadoc)
org.apache.camel. builder.RouteBuilder #configure()
public void configure()
throws Exception {
from(&jetty: http://0.0.0.0:8282/ dynamicCamel&)
.setExchangePattern (ExchangePattern.InOnly)
.recipientList().jsonpath (&$.data.routeName&) .delimiter (&,&)
.process(new OtherProcessor());
第二个和第三个路由
* @author yinwenjie
public class DirectRouteB extends RouteBuilder
/* (non-Javadoc)
* @see org.apache.camel.builder.RouteBuilder#configure()
public void configure() throws Exception {
// 第二个路由和第三个路由的代码都相似
// 唯一不同的是类型
from(&direct:directRouteB&)
.to(&log:DirectRouteB?showExchangeId=true&);
注册到Camel服务中,并开始执行
public static void main(String[] args) throws
Exception {
// 这是camel上下文对象,整个路由的驱动全靠它了。
ModelCamelContext camelContext = new DefaultCamelContext();
// 启动route
camelContext.start();
// 将我们编排的一个完整消息路由过程,加入到上下文中
camelContext.addRoutes((new DynamicCamel()).new
DirectRouteA());
camelContext.addRoutes((new DynamicCamel()).new
DirectRouteB());
camelContext.addRoutes((new DynamicCamel()).new
DirectRouteC());
// 通用没有具体业务意义的代码,只是为了保证主线程不退出
synchronized (DynamicCamel.class) {
DynamicCamel.class.wait();
DirectRouteB路由和DirectRouteC路由中的代码非常简单,就是从通过direct连接到本路由的上一个路由实例中获取并打印Exchange对象的信息。所以各位读者可以看到以上代码片段只列举了DirectRouteB的代码信息。DirectRouteA路由中“ExchangePattern.InOnly”的作用在上文中已经讲过,这里就不再进行赘述了。需要重点说明的是recipientList方法,这个方法可以像multicast方法那样进行并发执行或者运行线程池的设置,但是在DirectRouteA的代码中我们并没有那样做,这是为了让读者看清除recipientList或者multicast方法的顺序执行执行效果。以下是我们启动Camel服务后,从Postman(或者其他测试工具)传入的JSON格式的信息:
{&data&:{&routeName&:&
direct:directRouteB, direct:directRouteC&},&t
oken&:&d9c33c8f-ae59-4edf -b37f -290ff208de2e&,&desc&:&&}
recipientList方法将以 .data.routeName 中指定的路由信息动态决定一下个或者多个消息接收者,以上JSON片段中我们指定了两个“direct:directRouteB,direct:directRouteC”。那么recipientList会使用delimiter方法中设置的“,”作为分隔符来分别确定这两个接收者。
观察到的执行效果
10:31:53] INFO qtp-16 Exchange[Id: ID-yinwenjie-240--3,
ExchangePattern: InOnly, BodyType: org.apache.camel.converter.stream.InputStreamCache,
Body: [Body is instance of org.apache.camel.StreamCache]]
(MarkerIgnoringBase.java:96)
[ 10:31:53] INFO qtp-16
Exchange[Id: ID-yinwenjie-240--4,
ExchangePattern: InOnly, BodyType: org.apache.camel.converter.stream.InputStreamCache,
Body: [Body is instance of org.apache.camel.StreamCache]]
(MarkerIgnoringBase.java:96)
[ 10:31:53] INFO qtp-16
OtherProcessor中的exchange [id:ID-yinwenjie-240--1]Exchange[Message:
[Body is instance of org.apache.camel.StreamCache]]
(DynamicCamel.java:100)
静态路由和动态路由在执行效果上有很多相似之处。例如在两种路径选择方式中,路由分支上的接收者中使用的Exchange对象的来源都是对上一执行元素所输出的Exchange对象的复制,这些Exchange对象除了其中携带的业务内容相同外,ExchangeID是不一样,也就是说每个路由分支的Exchange对象都不相同。所以各路由分支的消息都不受彼此影响。另外动态路由和静态路由都支持对路由分支的顺序执行和并发执行,都可以为并发执行设置独立的线程池。
从以上执行效果中我们可以看到,由于我们没有设置动态路由是并发执行,所以各个需要执行的路由分支都是由名为【qtp-16】的Camel服务线程依次执行,并且每个路由分支的Exchange对象都不受彼此影响。另外,请注意以上执行结果的最后一条日志信息,它是在路由分支以外对OtherProcessor处理器的执行。由此可见无论路由分支如何执行,都不会影响路由分支以外的元素执行时所使用的Exchange对象。
487 次浏览
更多课程...&&&
更多咨询...&&&
每天2个文档/视频
扫描微信二维码订阅
订阅技术月刊
获得每月300个技术资源
|&京ICP备号&京公海网安备号【针对Java开发者的Apache Camel入门指南】 - 装机大神 - 博客园
posts - 63, comments - 0, trackbacks - 0, articles - 0
Apache Camel是一个非常实用的规则引擎库,能够用来处理来自于不同源的事件和信息。你可以在使用不同的协议比如VM,HTTP,FTP,JMS甚至是文件系统中来传递消息,并且让你的操作逻辑和传递逻辑保持分离,这能够让你更专注于消息的内容。
在这篇文章中,我将提供一个Java语言(非Groovy)的Apache Camel入门演示。
首先创建一个Maven项目的pom.xml。
&?xml&version="1.0"&encoding="UTF-8"?&
&project&xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://maven.apache.org/POM/4.0.0
http://maven.apache.org/maven-v4_0_0.xsd"&
&modelVersion&4.0.0&/modelVersion&
&groupId&camel-spring-demo&/groupId&
&artifactId&camel-spring-demo&/artifactId&
&version&1.0-SNAPSHOT&/version&
&packaging&jar&/packaging&
&properties&
&project.build.sourceEncoding&UTF-8&/project.build.sourceEncoding&
&camel.version&2.11.1&/camel.version&
&/properties&
&dependencies&
&dependency&
&groupId&org.apache.camel&/groupId&
&artifactId&camel-core&/artifactId&
&version&${camel.version}&/version&
&/dependency&
&dependency&
&groupId&org.slf4j&/groupId&
&artifactId&slf4j-simple&/artifactId&
&version&1.7.5&/version&
&/dependency&
&/dependencies&
&/project&
在这里我们只用到了camel-core.jar包,实际上它提供了许多你可能用到的实用组件。出于日志记录的目的,我使用了slf4j-simple来作为日志记录的实现,从而我们可以从控制台上看到输出。
接下来我们只需要构造一个路由类。路由就好比是Camel中怎样将消息从一端传递到另一端的一个指令定义。我们将会创建src/main/java/camelcoredemo/TimerRouteBuilder.java文件,每隔一秒向处理器发送一个消息,简单打印出来。
import&org.slf4j.*;
import&org.apache.camel.*;
import&org.apache.camel.builder.*;
public&class&TimerRouteBuilder&extends&RouteBuilder {
static&Logger LOG = LoggerFactory.getLogger(TimerRouteBuilder.class);
public&void&configure() {
from("timer://timer1?period=1000")
.process(new&Processor() {
public&void&process(Exchange msg) {
LOG.info("Processing {}", msg);
以上就是这个示例的全部所需,现在编译运行。
bash& mvn compile
bash& mvn exec:java -Dexec.mainClass=org.apache.camel.main.Main -Dexec.args='-r camelcoredemo.TimerRouteBuilder'
注意,这里我们并没有编写Java类的main入口,我们只是将RouteBuilder的类名当作参数简单传递给&org.apache.camel.main.Main,然后它将自动加载路由。
控制CamelContext
当启动Camel后,它会创建一个CamelContext对象,该对象拥有了很多关于如何运行Camel的信息,还包含我们所创建的Route的定义。现在如果你想通过CamelContext获得更多的控制,那么你需要编写自己的主类代码。我在这举个简单的例子。
import&org.slf4j.*;
import&org.apache.camel.*;
import&org.apache.camel.impl.*;
import&org.apache.camel.builder.*;
public&class&TimerMain {
static&Logger LOG = LoggerFactory.getLogger(TimerMain.class);
public&static&void&main(String[] args)&throws&Exception {
new&TimerMain().run();
void&run()&throws&Exception {
final&CamelContext camelContext =&new&DefaultCamelContext();
camelContext.addRoutes(createRouteBuilder());
camelContext.setTracing(true);
camelContext.start();
Runtime.getRuntime().addShutdownHook(new&Thread() {
public&void&run() {
camelContext.stop();
}&catch&(Exception e) {
throw&new&RuntimeException(e);
waitForStop();
RouteBuilder createRouteBuilder() {
return&new&TimerRouteBuilder();
void&waitForStop() {
while&(true) {
Thread.sleep(Long.MAX_VALUE);
}&catch&(InterruptedException e) {
可以看到,我们在createRouteBuilder()方法中重用了已有的TimerRouteBuilder类。现在我们的主类对在什么时候创建、启动、停止CamelContext有了完全的控制。context(camelContext)对象允许你全局性地控制如何配置Camel,而不是在Route级。它的JavaDoc链接给出了所有setter方法,你可以研究下它都可以做些什么。
注意到一点,我们也需要在我们的主类中提供少量设置代码。首先我们需要处理优雅关闭的问题,所以我们增加了一个Java关闭回调函数去调用context的stop()方法。其次在context已经启动后,我们需要增加一个线程阻塞。如果在启动后你不阻塞你的主线程,那么它会在启动后就简单的退出了,那就没啥用了。你会把Camel一直作为一个服务(就像一个服务器)运行,直至你按下CTRL+C键去终止该进程。
改善启动CamelContext的主类
如果你不想像上面例子一样过多的处理主类设置代码,那么你可以简单地继承由camel-core提供的org.apache.camel.main.Main类作为代替。通过利用这个类,你不仅可以让你的context自动设置,还可以获得所有附加的命令行特性,比如控制进程运行多久,启用追踪,加载自定义route类等等。
重构了下上一个例子,代码如下:
import&org.slf4j.*;
import&org.apache.camel.builder.*;
import&org.apache.camel.main.M
public&class&TimerMain2&extends&Main {
static&Logger LOG = LoggerFactory.getLogger(TimerMain2.class);
public&static&void&main(String[] args)&throws&Exception {
TimerMain2 main =&new&TimerMain2();
main.enableHangupSupport();
main.addRouteBuilder(createRouteBuilder());
main.run(args);
static&RouteBuilder createRouteBuilder() {
return&new&TimerRouteBuilder();
现在TimerMain2类的代码比之前的更少了,你可以试试看,它应该和之前的功能一样。
bash& mvn compile
bash& mvn&exec:java -Dexec.mainClass=camelcoredemo.TimerMain2 -Dexec.args='-t'
注意到我们给出-t选项后,会转储Route追踪。使用-h会看到所有可用的选项。
用Camel的注册机制添加bean
在之前的TimerRouteBuilder例子中,我们已经在代码中创建了一个匿名Processor。现在如果你想将几个不同的Processor放在一起,那么使用Camel的注册机制添加bean的方式将能更好的减少代码混乱。Camel允许你通过将processing当作bean注入到它的registry space,然后你只要把它们当作bean组件来进行调用。如下是我的重构代码:
import&org.slf4j.*;
import&org.apache.camel.*;
import&org.apache.camel.builder.*;
import&org.apache.camel.main.M
public&class&TimerBeansMain&extends&Main {
static&Logger LOG = LoggerFactory.getLogger(TimerBeansMain.class);
public&static&void&main(String[] args)&throws&Exception {
TimerBeansMain main =&new&TimerBeansMain();
main.enableHangupSupport();
main.bind("processByBean1",&new&Bean1());
main.bind("processAgainByBean2",&new&Bean2());
main.addRouteBuilder(createRouteBuilder());
main.run(args);
static&RouteBuilder createRouteBuilder() {
return&new&RouteBuilder() {
public&void&configure() {
from("timer://timer1?period=1000")
.to("bean:processByBean1")
.to("bean:processAgainByBean2");
static&class&Bean1&implements&Processor {
public&void&process(Exchange msg) {
LOG.info("First process {}", msg);
static&class&Bean2&implements&Processor {
public&void&process(Exchange msg) {
LOG.info("Second process {}", msg);
现在Route类更简洁明了,同时处理代码也被重构到了独立的类中。当你需要编写很复杂的Route来实现业务逻辑时,这种方式能够帮助你更好的组织和测试你的代码。它能够让你构建像&乐高&积木那样可复用的POJO bean。Camel的registry space同样可用于其他很多用途,比如你可以自定义许多具有附加功能的endpoint组件或者注册一些信息,更或者替换线程池实现策略之内的事情。
上述Route示例是用所谓的Java DSL来构成的,它的可读性较高,你可以用IDE提供的支持查看所有可用于Route的方法。
我希望这篇文章能够帮助你跳过Camel的摸索阶段。除了已经提到的事件组件之外,camel还提供了如下组件:

我要回帖

更多关于 camel 的文章

 

随机推荐