Integrating Storm with Kafka: reading from Kafka and writing back to Kafka
                                                      by 小闪电
  Storm's main job is streaming real-time computation, and it handles a continuously produced data stream very quickly. Most data, however, does not arrive as a steady stream; it fluctuates between heavy and light. Batch processing is a poor fit for that situation, so Kafka is introduced as a message queue. It pairs well with Storm and makes stable stream computation possible. Below is a simple example that reads data from Kafka and writes it back to Kafka, as a way to learn how Storm and Kafka interact.
  In essence, Storm's KafkaSpout acts as a Kafka consumer and the KafkaBolt acts as a Kafka producer.
  The block diagram is as follows (image omitted from the original post).
  Create a Maven project and add the external dependencies for Storm, Kafka, and ZooKeeper.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.tony</groupId>
    <artifactId>storm-example</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>0.9.3</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>0.9.3</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>2.5.0</version>
        </dependency>
        <!-- dependency required by the storm-kafka module -->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.5.0</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.1.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <repositories>
        <repository>
            <id>central</id>
            <url>http://repo1.maven.org/maven2/</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
            <releases>
                <enabled>true</enabled>
            </releases>
        </repository>
        <repository>
            <id>clojars</id>
            <url>https://clojars.org/repo/</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
            <releases>
                <enabled>true</enabled>
            </releases>
        </repository>
        <repository>
            <id>scala-tools</id>
            <url>http://scala-tools.org/repo-releases</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
            <releases>
                <enabled>true</enabled>
            </releases>
        </repository>
        <repository>
            <id>conjars</id>
            <url>http://conjars.org/repo/</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
            <releases>
                <enabled>true</enabled>
            </releases>
        </repository>
    </repositories>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                    <encoding>UTF-8</encoding>
                    <showDeprecation>true</showDeprecation>
                    <showWarnings>true</showWarnings>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass></mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
3. The consumption logic of the KafkaSpout: a MessageScheme class that declares two fields, key and message, so tuples can be dispatched directly to the KafkaBolt. The code is as follows:
package com.tony.storm_kafka.util;

import java.io.UnsupportedEncodingException;
import java.util.List;

import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

/*
 * author: hi
 */
public class MessageScheme implements Scheme {

    public List<Object> deserialize(byte[] arg0) {
        try {
            String msg = new String(arg0, "UTF-8");
            // A fixed key is used here; the raw Kafka message becomes the "message" field
            String msg_0 = "hello";
            return new Values(msg_0, msg);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return null;
    }

    public Fields getOutputFields() {
        return new Fields("key", "message");
    }
}
4. Write the topology main class, configure Kafka, and submit the topology to Storm. The KafkaSpout's ZkHosts can be configured either statically or dynamically; prefer the dynamic, self-discovering approach.
package org.tony.storm_kafka;

import java.util.Arrays;
import java.util.Properties;

import org.tony.storm_kafka.bolt.ToKafkaBolt;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
import storm.kafka.trident.TridentKafkaState;

import com.tony.storm_kafka.util.MessageScheme;

public class KafkaBoltTestTopology {

    // KafkaSpout configuration parameters
    public static String kafka_zk_port = null;
    public static String topic = null;
    public static String kafka_zk_rootpath = null;
    public static BrokerHosts brokerHosts;
    public static String spout_name = "spout";
    public static String kafka_consume_from_start = null;

    // Debugging bolt that simply prints the "message" field of each tuple
    public static class PrinterBolt extends BaseBasicBolt {
        private static final long serialVersionUID = 2566580L;

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }

        public void execute(Tuple tuple, BasicOutputCollector collector) {
            System.out.println("-----" + tuple.getValue(1).toString());
        }
    }

    public StormTopology buildTopology() {
        // KafkaSpout configuration
        kafka_consume_from_start = "true";
        kafka_zk_rootpath = "/kafka08";
        String spout_id = spout_name;
        brokerHosts = new ZkHosts("192.168.201.190:2191,192.168.201.191:2191,192.168.201.192:2191",
                kafka_zk_rootpath + "/brokers");
        kafka_zk_port = "2191";

        SpoutConfig spoutConf = new SpoutConfig(brokerHosts, "testfromkafka", kafka_zk_rootpath, spout_id);
        spoutConf.scheme = new SchemeAsMultiScheme(new MessageScheme());
        spoutConf.zkPort = Integer.parseInt(kafka_zk_port);
        spoutConf.zkRoot = kafka_zk_rootpath;
        spoutConf.zkServers = Arrays.asList(new String[] {"10.9.201.190", "10.9.201.191", "10.9.201.192"});

        // Whether to start reading from the first message in Kafka
        if (kafka_consume_from_start == null) {
            kafka_consume_from_start = "false";
        }
        if (!"true".equals(kafka_consume_from_start) && !"false".equals(kafka_consume_from_start)) {
            System.out.println("kafka_consume_from_start must be true or false!");
        }
        boolean kafka_consume_from_start_b = Boolean.valueOf(kafka_consume_from_start);
        System.out.println("kafka_consume_from_start: " + kafka_consume_from_start_b);
        spoutConf.forceFromStart = kafka_consume_from_start_b;

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new KafkaSpout(spoutConf));
        builder.setBolt("forwardToKafka", new ToKafkaBolt<String, String>()).shuffleGrouping("spout");
        return builder.createTopology();
    }

    public static void main(String[] args) {
        KafkaBoltTestTopology kafkaBoltTestTopology = new KafkaBoltTestTopology();
        StormTopology stormTopology = kafkaBoltTestTopology.buildTopology();

        Config conf = new Config();
        // Kafka producer settings used by the KafkaBolt
        Properties props = new Properties();
        props.put("metadata.broker.list", "192.10.43.150:9092");
        props.put("producer.type", "async");
        props.put("request.required.acks", "0"); // 0, -1, 1
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        conf.put(TridentKafkaState.KAFKA_BROKER_PROPERTIES, props);
        conf.put("topic", "testTokafka");

        if (args.length > 0) {
            // cluster submit
            try {
                StormSubmitter.submitTopology("kafkaboltTest", conf, stormTopology);
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            }
        } else {
            new LocalCluster().submitTopology("kafkaboltTest", conf, stormTopology);
        }
    }
}
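The ToKafkaBolt used above is not shown in the original post. Since storm-kafka 0.9.3 already ships a storm.kafka.bolt.KafkaBolt that reads the producer Properties from the kafka.broker.properties config entry (the same key the main method sets via TridentKafkaState.KAFKA_BROKER_PROPERTIES), reads the target topic from the "topic" entry, and publishes the "key"/"message" fields emitted by MessageScheme, a minimal sketch can simply subclass it:
package org.tony.storm_kafka.bolt;

import storm.kafka.bolt.KafkaBolt;

// Minimal sketch of the forwarding bolt: all publishing behaviour comes from the bundled KafkaBolt,
// which consumes the "key" and "message" tuple fields and writes them to the configured topic.
public class ToKafkaBolt<K, V> extends KafkaBolt<K, V> {
}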
5. Example results. The data in the testfromkafka topic can be produced continuously by a separate class.
  Data in topic testfromkafka (screenshot omitted from the original post)
  Data in topic testTokafka (screenshot omitted from the original post)
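For reference, a minimal producer that keeps feeding test messages into testfromkafka could look like the sketch below. It is an assumption-based example: the broker address is the one from the topology configuration above, and the message content is arbitrary.
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TestDataProducer {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("metadata.broker.list", "192.10.43.150:9092"); // broker list from the topology config above
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");

        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
        int i = 0;
        while (true) {
            // Publish a numbered test message every second
            producer.send(new KeyedMessage<String, String>("testfromkafka", "test message " + i++));
            Thread.sleep(1000);
        }
    }
}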
Storm notes -- integration with Kafka
The combination of Storm and Kafka, where a front-end collector continuously feeds real-time data into the queue and Storm pulls it off as a consumer for computation, is a typical application scenario. Accordingly, the Storm distribution includes an integration jar that reads data out of Kafka for Storm applications to use. Here is a short summary based on my own application.
Since Storm already provides storm-kafka, it can be used directly; it reads data with Kafka's low-level API. If needed, implementing it yourself is not hard either. Usage is as follows:
// Point the spout at the ZooKeeper ensemble used by Kafka
BrokerHosts hosts = new ZkHosts("10.1.80.249:2181,10.1.80.250:2181,10.1.80.251:2181/kafka");
// Initialize the spout configuration
SpoutConfig conf = new SpoutConfig(hosts, "topic", "/zkroot", "topo");
// Register the spout in the topology
builder.setSpout("kafka-spout", new KafkaSpout(conf));
Note that, based on the last two SpoutConfig parameters, the spout creates one ZooKeeper node per Kafka partition to store the read offset, for example /zkroot/topo/partition_0. By default the spout emits binary data in a field named bytes; if needed, this can be changed by setting a scheme.
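For example, continuing the snippet above, the bundled StringScheme can be plugged in to receive plain strings instead of raw bytes (a minimal sketch):
import backtype.storm.spout.SchemeAsMultiScheme;
import storm.kafka.StringScheme;

// Each Kafka message is now emitted as a single String field named "str"
conf.scheme = new SchemeAsMultiScheme(new StringScheme());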
As shown above, it is quite simple to use. Below is a brief analysis of the implementation details.
(1) Initialization:
/** KafkaSpout.open **/
// Initialize _state, the client object used to read and write offsets in ZooKeeper
Map stateConf = new HashMap(conf);
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers);
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort);
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot);
_state = new ZkState(stateConf);
// Initialize the coordinator that reads Kafka data; the actual reading is done by its internal PartitionManagers
_coordinator = new ZkCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);

(2) Reading data:
/** KafkaSpout.nextTuple **/
// Read data through the PartitionManager of each assigned partition
List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
for (int i = 0; i < managers.size(); i++) {
    // in case the number of managers decreased
    _currPartitionIndex = _currPartitionIndex % managers.size();
    // Call the manager's next() to read data and emit it
    EmitState state = managers.get(_currPartitionIndex).next(_collector);
}
// Periodically commit the positions read so far to ZooKeeper
long now = System.currentTimeMillis();
if ((now - _lastUpdateMs) > _spoutConfig.stateUpdateIntervalMs) {
    commit();
}
(3) ack and fail:
/** KafkaSpout.ack **/
KafkaMessageId id = (KafkaMessageId) msgId;
PartitionManager m = _coordinator.getManager(id.partition);
if (m != null) {
    // Delegate to the PartitionManager's ack
    m.ack(id.offset);
}

/** KafkaSpout.fail **/
KafkaMessageId id = (KafkaMessageId) msgId;
PartitionManager m = _coordinator.getManager(id.partition);
if (m != null) {
    // Delegate to the PartitionManager's fail
    m.fail(id.offset);
}
As you can see, the main logic lives in the PartitionManager class. Here is a brief look at it:
(1) Construction:
// Read the offset committed last time from ZooKeeper
Map<Object, Object> json = _state.readJSON(path);
// Get an offset based on the current time
Long currentOffset = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig);
// maxOffsetBehind is the maximum allowed gap between the two offsets. If it is exceeded, the committed
// offset is overwritten with the latest one and the data in between is dropped. If that is not what you
// want, set it to a large value or Long.MAX_VALUE.
if (currentOffset - _committedTo > spoutConfig.maxOffsetBehind || _committedTo <= 0) {
    _committedTo = currentOffset;
}
// Initialize the current offset
_emittedToOffset = _committedTo;
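So if skipping a backlog after a restart is not acceptable, the bound can be relaxed on the SpoutConfig; a one-line sketch, continuing the conf object from the usage snippet above:
// Never discard pending messages just because the spout has fallen behind
conf.maxOffsetBehind = Long.MAX_VALUE;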
(2) next and fill:
/** PartitionManager.next **/
// Call fill() to populate the queue of messages waiting to be emitted
if (_waitingToEmit.isEmpty()) {
    fill();
}
// Emit the data
while (true) {
    MessageAndRealOffset toEmit = _waitingToEmit.pollFirst();
    if (toEmit == null) {
        return EmitState.NO_EMITTED;
    }
    Iterable<List<Object>> tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg);
    if (tups != null) {
        for (List<Object> tup : tups) {
            collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));
        }
        break;
    } else {
        ack(toEmit.offset);
    }
}

/** PartitionManager.fill **/
// Decide which offset to read from, then fetch the messages
if (had_failed) {
    // Handle failed offsets first
    offset = failed.first();
} else {
    offset = _emittedToOffset;
}
ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset);
for (MessageAndOffset msg : msgs) {
    final Long cur_offset = msg.offset();
    if (cur_offset < offset) {
        // Skip any old offsets.
        continue;
    }
    if (!had_failed || failed.contains(cur_offset)) {
        numMessages += 1;
        // Add the offset to the pending set
        _pending.add(cur_offset);
        // Add the message to the waiting-to-emit queue
        _waitingToEmit.add(new MessageAndRealOffset(msg.message(), cur_offset));
        _emittedToOffset = Math.max(msg.nextOffset(), _emittedToOffset);
        if (had_failed) {
            failed.remove(cur_offset);
        }
    }
}
(3) ack and fail:
/** PartitionManager.ack **/
// Remove the offset from _pending; if the earliest pending offset is more than maxOffsetBehind
// below this one, clear everything below this offset instead
if (!_pending.isEmpty() && _pending.first() < offset - _spoutConfig.maxOffsetBehind) {
    // Too many things pending!
    _pending.headSet(offset).clear();
} else {
    _pending.remove(offset);
}
numberAcked++;

/** PartitionManager.fail **/
// Add the offset to the failed set so fill() will re-fetch it
failed.add(offset);
numberFailed++;
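Note that this fail/replay path is only exercised when downstream bolts anchor their emits to the input tuple and explicitly ack or fail it. A hedged sketch of such a bolt (class and field names here are illustrative, not from the original post):
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class AnchoredBolt extends BaseRichBolt {
    private OutputCollector collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple input) {
        try {
            // Anchor the new tuple to the input so failures propagate back to the KafkaSpout
            collector.emit(input, new Values(input.getString(0)));
            collector.ack(input);   // triggers PartitionManager.ack, so the offset can be committed
        } catch (Exception e) {
            collector.fail(input);  // triggers PartitionManager.fail, so the offset is replayed
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("str"));
    }
}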
Finally, a diagram to sum things up (image omitted from the original post).
Storm on YARN download address:
The downloaded storm-yarn-master.zip has to be unpacked on Linux; if you unpack it on Windows, the symlinks under the lib directory are lost.
1: Storm on YARN needs to be built
unzip storm-yarn-master.zip
Then enter the storm-yarn-master directory and build it with the following command:
mvn package -DskipTests
Note: I used maven-3.1.1 and jdk1.7.0_45.
After the build, unzip storm-yarn-master/lib/storm-0.9.0-wip21.zip to get the storm-0.9.0-wip21 directory.
Move the resulting storm-0.9.0-wip21 directory to the same level as storm-yarn-master.
The final layout is:
/usr/local/storm/storm-yarn-master
/usr/local/storm/storm-0.9.0-wip21
Then configure the environment variables for the user who starts and stops Storm:
vim ~/.bashrc
Add the following environment variables:
export JAVA_HOME=/usr/java/jdk1.7.0_45
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export STORM_WORK=/opt/storm
export STORM_HOME=$STORM_WORK
export PATH=$PATH:$STORM_WORK/storm-yarn-master/bin:$STORM_WORK/storm-0.9.0-wip21/bin
export HADOOP_INSTALL=/opt/hadoop
export HADOOP_HOME=$HADOOP_INSTALL
export PATH=$PATH:$HADOOP_INSTALL/bin
export PATH=$PATH:$HADOOP_INSTALL/sbin
export HADOOP_MAPRED_HOME=$HADOOP_INSTALL
export HADOOP_COMMON_HOME=$HADOOP_INSTALL
export HADOOP_HDFS_HOME=$HADOOP_INSTALL
export YARN_HOME=$HADOOP_INSTALL
2: Put the built storm-yarn-master/lib/storm.zip into HDFS, which can be done with the following commands:
hadoop fs -mkdir -p /lib/storm/0.9.0-wip21
hadoop fs -moveFromLocal storm.zip /lib/storm/0.9.0-wip21
3: Create a Storm working directory in HDFS and set its owner to the user who submits Storm jobs:
hadoop fs -mkdir -p /user/storm
hadoop fs -chown storm /user/storm
4: Make sure the environment variables of the user who starts and stops Storm are configured as follows:
export JAVA_HOME=/usr/java/jdk1.7.0_45
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export STORM_WORK=/usr/local/storm
export STORM_HOME=$STORM_WORK
export PATH=$PATH:$STORM_WORK/storm-yarn-master/bin:$STORM_WORK/storm-0.9.0-wip21/bin
export HADOOP_INSTALL=/usr/local/hadoop
export HADOOP_HOME=$HADOOP_INSTALL
export PATH=$PATH:$HADOOP_INSTALL/bin
export PATH=$PATH:$HADOOP_INSTALL/sbin
export HADOOP_MAPRED_HOME=$HADOOP_INSTALL
export HADOOP_COMMON_HOME=$HADOOP_INSTALL
export HADOOP_HDFS_HOME=$HADOOP_INSTALL
export YARN_HOME=$HADOOP_INSTALL
5: Start Storm
storm-yarn launch /usr/local/storm/storm-0.9.0-wip21/conf/master.yaml
6: Stop Storm
storm-yarn shutdown -appId application_2_0143 -output ~/.storm/storm.yaml
7: Run WordCount
storm jar /opt/storm/storm-yarn-master/lib/myStorm-0.0.1-SNAPSHOT-jar-with-dependencies.jar com.myStorm.App WordCountTopology -c nimbus.host=192.168.109.241
Here nimbus.host is the address YARN assigns to Nimbus after you submit Storm to YARN; you have to look it up yourself.
To sum up: when a supervisor fails to start, it often turns out to be insufficient memory. Keep this in mind if you are running in a virtual machine environment.
[storm-kafka] Processing streaming data with Storm and Kafka
First, a brief description of Storm.
Storm is a free, open-source, distributed, highly fault-tolerant real-time computation system. It makes continuous stream computation easy and fills the real-time gap that Hadoop batch processing cannot satisfy. Storm is widely used for real-time analytics, online machine learning, continuous computation, distributed RPC, ETL, and similar workloads. It is simple to deploy and manage and, among comparable streaming tools, its performance is outstanding.
Kafka is a high-throughput distributed publish-subscribe messaging system.
A recent project requirement was to persist streaming data subscribed from Kafka into Elasticsearch and Accumulo in real time. This post mainly records the Kafka-Storm integration part; installation of ZooKeeper, Kafka, Storm, Elasticsearch, and so on can be found online.
First comes the Maven configuration. The dependencies are org.apache.storm:storm-core (scope provided), org.apache.storm:storm-kafka, commons-lang3, org.apache.kafka:kafka_2.9.2 (excluding org.apache.zookeeper:zookeeper and org.slf4j:slf4j-log4j12), and org.elasticsearch:elasticsearch (excluding org.slf4j:slf4j-log4j12), built with the maven-compiler-plugin and the maven-assembly-plugin using the jar-with-dependencies descriptor (version numbers are not given in the post).
Since storm-kafka already implements the spout, we can use it directly. The first bolt below filters out useless lines.
public class FilterBolt extends BaseRichBolt {
    private OutputCollector collector;

    // Initialization
    public void prepare(Map map, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    // Execution logic: filter out useless strings
    public void execute(Tuple input) {
        try {
            String str = input.getString(0);
            if (StringUtils.isNotBlank(str)) {
                String[] lines = str.split("\n");
                for (String line : lines) {
                    // Skip blank lines and comment lines starting with '#'
                    if (StringUtils.isBlank(line) || line.charAt(0) == '#') {
                        continue;
                    }
                    collector.emit(new Values(line));
                }
            }
            collector.ack(input);
        } catch (Exception e) {
            collector.fail(input);
        }
    }

    // Declare the field name passed on to the next bolt
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
}
Next, the string is parsed into JSON and the JSON is saved into Elasticsearch.
public class TransferBolt extends BaseRichBolt {
    private Logger LOG = LoggerFactory.getLogger(TransferBolt.class);
    private OutputCollector collector;
    // The Elasticsearch client is not initialized in the post; it must be created in prepare()
    private Client client;

    public void prepare(Map map, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple input) {
        String line = input.getString(0);
        // Parse the line into a JSON object (a fastjson-style parseObject is assumed here)
        JSONObject json = JSONObject.parseObject(line);
        BulkRequest bulkRequest = new BulkRequest();
        IndexRequest indexRequest = new IndexRequest("test", "element", json.getString("id"))
                .source(json.getJSONObject("source").toString());
        bulkRequest.add(indexRequest);
        BulkResponse response = client.bulk(bulkRequest).actionGet();
        client.admin().indices().prepareRefresh("test").execute().actionGet();
        collector.ack(input);
    }

    // This bolt is terminal and emits nothing
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}
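The client field above is never initialized in the post. Assuming an Elasticsearch 1.x-era TransportClient (the cluster name and address below are placeholders, not values from the original), prepare() could be extended along these lines:
// A hedged sketch of the missing client setup; imports shown for context
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

public void prepare(Map map, TopologyContext context, OutputCollector collector) {
    this.collector = collector;
    Settings settings = ImmutableSettings.settingsBuilder()
            .put("cluster.name", "elasticsearch")  // placeholder cluster name
            .build();
    // Connect to one Elasticsearch transport endpoint (placeholder address)
    this.client = new TransportClient(settings)
            .addTransportAddress(new InetSocketTransportAddress("127.0.0.1", 9300));
}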
public class KafkaTopology {

    public static void main(String[] args) throws Exception {
        // Read the Kafka/ZooKeeper settings from a properties file
        String zks = PropertiesUtil.getString(KafkaProperties.ZK_HOSTS);
        String topic = PropertiesUtil.getString(KafkaProperties.TOPIC);
        String zkRoot = PropertiesUtil.getString(KafkaProperties.ZK_ROOT);
        String id = PropertiesUtil.getString(KafkaProperties.STORM_ID);

        BrokerHosts brokerHosts = new ZkHosts(zks);
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topic, zkRoot, id);
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        spoutConfig.zkServers = Arrays.asList(PropertiesUtil.getString(KafkaProperties.ZK_SERVERS).split(","));
        spoutConfig.zkPort = PropertiesUtil.getInt(KafkaProperties.ZK_PORT);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-reader", new KafkaSpout(spoutConfig), 1);
        builder.setBolt("filter-bolt", new FilterBolt(), 1).shuffleGrouping("kafka-reader");
        // The TransferBolt consumes the filtered lines from the FilterBolt
        builder.setBolt("input-line", new TransferBolt(), 1).shuffleGrouping("filter-bolt");

        Config config = new Config();
        String name = KafkaTopology.class.getSimpleName();
        config.setNumWorkers(PropertiesUtil.getInt(KafkaProperties.NUM_WORKERS));
        StormSubmitter.submitTopologyWithProgressBar(name, config, builder.createTopology());
    }
}
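For local debugging before a cluster submit, the same topology can be run in-process with a LocalCluster; a minimal sketch reusing the builder, config, and name from the class above:
import backtype.storm.LocalCluster;

// Run the topology inside the current JVM instead of submitting it to a cluster
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(name, config, builder.createTopology());
Thread.sleep(60 * 1000);   // keep it alive for a minute, then shut down
cluster.shutdown();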