kafkaspout使用指南 怎么处理速度慢下来

1679人阅读
storm(3)
[时间的手,翻雨覆雨了什么]
讲到了flume-ng+kakfa的安装与单点测试,在此基础上,再加入storm的单机安装,以及kafkaSpout的实例。形成一个完整的实时单机处理实例。
1.安装与启动
(1)官网下载0.10.0版本,解压
wget http://mirror./apache/storm/apache-storm-0.10.0/apache-storm-0.10.0.tar.gz
tar -xvzf apache-storm-0.10.0.tar.gz
(2)修改配置文件,添加环境变量
配置文件apache-storm-0.10.0/conf/storm.yaml,单机测试无需修改
修改bash_profile,增加如下几行
export STORM_HOME=&/home/XX/apache-storm-0.10.0&
PATH=$PATH:${STORM_HOME}/bin
(3)启动集群
storm nimbus &
storm supervisor &
storm ui &
UI地址:localhost:8080/index.html
输入jps,可以看到 nimbus 和 supervisor进程,说明集群启动成功。
topology部分
package cn.
import backtype.storm.C
import backtype.storm.LocalC
import backtype.storm.StormS
import backtype.storm.generated.StormT
import backtype.storm.spout.SchemeAsMultiS
import backtype.storm.topology.TopologyB
import storm.kafka.*;
import java.util.M
* Created by maixiaohai on 16/4/21.
public class KafkaTopology {
public static int NUM_WORKERS = 1;
public static int NUM_ACKERS = 1;
public static int MSG_TIMEOUT = 180;
public static int SPOUT_PARALLELISM_HINT = 1;
public static int PARSE_BOLT_PARALLELISM_HINT = 1;
public StormTopology buildTopology(Map map) {
String zkServer = map.get(&zookeeper&).toString();
System.out.println(&zkServer: & + zkServer);
final BrokerHosts zkHosts = new ZkHosts(zkServer);
SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, &YOUR_KAFKA_TOPIC&, &/test&, &single-point-test&);
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(&kafkaSpout&, new KafkaSpout(kafkaConfig), SPOUT_PARALLELISM_HINT);
builder.setBolt(&parseBolt&, new ParseBolt(), PARSE_BOLT_PARALLELISM_HINT).shuffleGrouping(&kafkaSpout&);
return builder.createTopology();
public static void main(String[] args) throws Exception {
System.out.println(&===========start===========&);
Map map = XmlHelper.Dom2Map(&realtime.xml&);
KafkaTopology kafkaTopology = new KafkaTopology();
StormTopology stormTopology = kafkaTopology.buildTopology(map);
Config config = new Config();
config.setNumWorkers(NUM_WORKERS);
config.setNumAckers(NUM_ACKERS);
config.setMessageTimeoutSecs(MSG_TIMEOUT);
config.setMaxSpoutPending(5000);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(&single-point-test&, config, stormTopology);
StormSubmitter.submitTopology(&single-point-test&, config, stormTopology);
realtime.xml配置
&?xml version=&1.0&?&
&?xml-stylesheet type=&text/xsl& href=&configuration.xsl&?&
&configuration&
&host&localhost&/host&
&thriftPort&6627&/thriftPort&
&zookeeper&localhost:2181&/zookeeper&
&kafka&localhost:9092&/kafka&
&/configuration&parseBolt打印传入的tuple值
package cn.
import backtype.storm.topology.BasicOutputC
import backtype.storm.topology.OutputFieldsD
import backtype.storm.topology.base.BaseBasicB
import backtype.storm.tuple.T
* Created by maixiaohai on 16/4/21.
public class ParseBolt extends BaseBasicBolt {
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
String word = tuple.getString(0);
System.out.println(word);
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
dependencies
&dependencies&
&dependency&
&groupId&org.apache.storm&/groupId&
&artifactId&storm-core&/artifactId&
&version&0.10.0&/version&
&exclusions&
&exclusion&
&groupId&org.slf4j&/groupId&
&artifactId&log4j-over-slf4j&/artifactId&
&/exclusion&
&exclusion&
&groupId&org.slf4j&/groupId&
&artifactId&slf4j-log4j12&/artifactId&
&/exclusion&
&/exclusions&
&/dependency&
&dependency&
&groupId&org.apache.storm&/groupId&
&artifactId&storm-kafka&/artifactId&
&version&0.10.0&/version&
&exclusions&
&exclusion&
&groupId&org.slf4j&/groupId&
&artifactId&slf4j-log4j12&/artifactId&
&/exclusion&
&/exclusions&
&/dependency&
&dependency&
&groupId&org.apache.kafka&/groupId&
&artifactId&kafka_2.10&/artifactId&
&version&0.8.2.1&/version&
&exclusions&
&exclusion&
&groupId&org.slf4j&/groupId&
&artifactId&slf4j-log4j12&/artifactId&
&/exclusion&
&/exclusions&
&/dependency&
&!--&dependency&--&
&!--&groupId&com.googlecode.json-simple&/groupId&--&
&!--&artifactId&json-simple&/artifactId&--&
&!--&version&1.1.1&/version&--&
&!--&/dependency&--&
&/dependencies&打包后,执行storm jar kafkaTopology.jar cn.realtime.KafkaTopology
在UI上可以看到新提交的topology,在apache-storm-0.10.0/logs/目录下,会有相应的日志
结合的flume监测文件路径的方式,即agent.sources.source1.type&=&spooldir
增加对应目录下文件并保存,可以看到storm日志中打印出对应的行。代表整个流程成功。
3.注意事项
(1)过程中的一些报错
java.lang.NoSuchMethodError: org.apache.zookeeper.ZooKeeper.&init&(Ljava/lang/SILorg/apache/zookeeper/WZ)V at org.apache.curator.utils.DefaultZookeeperFactory.newZooKeeper(DefaultZookeep
java.lang.NoClassDefFoundError: org/json/simple/JSONValue at storm.kafka.DynamicBrokersReader&
类似报错基本都是源于引入的maven库版本不合适,对应的jar包版本不对导致的报错。
比如报错1,代表zookeeper版本不对,找到引入zookeeper的maven库,发现是kafka_2.10的的version过低,
改为当前版本后报错1就消失了。
(2)打包时,应该去掉strom-core,否则会冲突
(3)有关kafkaSpout和storm的配置参数后面会再写文章解释
参考知识库
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:27400次
排名:千里之外
原创:27篇
(1)(1)(2)(2)(2)(1)(1)(3)(1)(1)(5)(1)(4)(4)(3)(3)KafkaSpout分析:配置 - 博客频道 - CSDN.NET
cache007的专栏
public KafkaSpout(SpoutConfig spoutConf) {
_spoutConfig = spoutC}
SpoutConfig自KafkaConfig。由于SpoutConfig和KafkaConfig所有的instance
field全是public, 因此在使用构造方法后,可以直接设置各个域的值。
public class SpoutConfig extends KafkaConfig implements Serializable {
public List&String& zkServers = null; //记录Spout读取进度所用的zookeeper的host
public Integer zkPort = null;//记录进度用的zookeeper的端口
public String zkRoot = null;//进度信息记录于zookeeper的哪个路径下
public String id = null;//进度记录的id,想要一个新的Spout读取之前的记录,应把它的id设为跟之前的一样。
stateUpdateIntervalMs = 2000;//用于metrics,多久更新一次状态。
public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id) {
super(hosts, topic);
this.zkRoot = zkR
public class KafkaConfig implements Serializable {
public final BrokerH //用以获取Kafka broker和partition的信息
public final S//从哪个读取消息
public final String clientId; // SimpleConsumer所用的client id
public int fetchSizeBytes = 1024 * 1024; //发给Kafka的每个FetchRequest中,用此指定想要的response中总的消息的大小
public int TimeoutMs = 10000;//与Kafka broker的连接的超时时间
public int fetchMaxWait = 10000;
//当没有新消息时,消费者会等待这些时间
public int bufferSizeBytes = 1024 * 1024;//SimpleConsumer所使用的SocketChannel的读大小
public MultiScheme scheme = new RawMultiScheme();//从Kafka中取出的byte[],该如何反序列化
public boolean forceFromStart = false;//是否强制从Kafka中offset最小的开始读起
public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();//从何时的offset时间开始读,默认为最旧的offset
maxOffsetBehind = 100000;//KafkaSpout读取的进度与目标进度相差多少,相差太多,Spout会丢弃中间的消息
public boolean useStartOffsetTimeIfOffsetOutOfRange = true;//如果所请求的offset对应的消息在Kafka中不存在,是否使用startOffsetTime
public int metricsTimeBucketSizeInSecs = 60;//多长时间统计一次metrics
public KafkaConfig(BrokerHosts hosts, String ) {
this(hosts, topic, kafka.api.OffsetRequest.DefaultClientId());
public KafkaConfig(BrokerHosts hosts, String topic, String clientId) {
this.hosts =
this.clientId = clientId;
&对Zookeeper的使用
KafkaSpout的配置中有两个地方可以用到Zookeeper
用Zookeeper来记录KafkaSpout的处理进度,在topology重新提交或者task重启后继续之前的处理进度。在SpoutConfig中的zkServers, zkPort和zkRoot与此相关。如果zkServer和zkPort没有设置,那么KafkaSpout会使用Storm集群所用的Zookeeper记录这些信息。用Zookeeper来获取Kafka中一个的所有partition,和每个partition的leader。这需要实现BrokerHosts的子类ZkHosts.但是,这个Zookeepr是可选的。如果使用BrokerHosts的另一个子类StaticHosts,把partition和leader的对应关系硬编码,则不需要Zookeeper来提供此功能。KafkaSpout会从Kafka集群使用的Zookeeper中提取partition和leader的对应关系。而且:
如果使用StatisHosts,那么KafkaSpout会使用StaticCoordinator,这个coordinator不能响应partition leader的变化。如果使用ZkHosts,那么KafkaSpout会使用ZkCoordinator, 当其refresh()方法被调用后,这个cooridnator会检查发生leader变更的partition,并为之生成新的PartitionManager.从而能够在leader变更后,继续读取消息。
影响初始读取进度的配置项
在一个topology上线后,它从哪个offset开始读取消息呢?有一些项对此有影响:
SpoutConfig中的id字段。如果想要一个topology从另一个topology之前的处理进度继续处理,它们需要有相同的id。KafkaConfig的forceFromStart字段。如果此字段设为true, 那么它一个topology上线后,它会忽略之前相同id的topology的进度,并且从Kafka中最早的消息开始处理。KafkaConfig的startOffsetTime字段。默认为kafka.api.OffsetRequest.EarliestTime()开始读,也就是从Kafka中最早的消息开始处理。也可以设成kafka.api.OffsetRequest.LatestOffset,也就是最早的消息开始读。也可以自己指定具体的值。KafkaConfig的maxOffsetBehind字段。这个字段对于KafkaSpout的多个处理流程都有影响。当提交一个新topology时,如果没有forceFromStart, 当KafkaSpout对某个partition的处理进度落后startOffsetTime对应的offset多于此值时,KafkaSpout会丢弃中间的消息,从而强制赶上目标进度.比如,如果startOffsetTime设成了lastestTime,那么如果进度落后超过maxOffsetBehind,KafkaSpout会直接从latestTime对应的offset开始处理。如果设成了froceFromStart,则在提交新任务时,始终会从EarliestTime开始读。KafkaSpout的userStartOffsetTimeIfOffsetOutOfRange字段。如果设成true,那么当fetch消息时出错,且FetchResponse显示的出错原因是OFFSET_OUT_OF_RANGE,那么就会尝试从KafkaSpout指定的startOffsetTime对应的消息开始读。例如,如果有一批消息因为超过了保存期限被Kafka删除,并且zk里记录的消息在这批被删除的消息里。如果KafkaSpout试图从zk的记录继续读,那么就会出现OFFSET_OUT_OF_RANGE的错误,从而触发这个。
实际上maxOffsetBehind有时候有点名不符实。当startOffsetTime为A, zk里的进度为B, A - B & maxOffsetBehind时,应该从A - maxOffsetBehind除开始读或许更好一些,而不是直接跳到startOffsetTime。此处的逻辑参见PartitionManager的实现。
排名:第14543名
(17)(7)(1)(1)(1)(31)(1)(26)(1)(4)(2)(5)(1)(3)(2)(3)(1)(1)(1)(2)(1)(1)(10)(6)(1)(1)(1)(1)(5)(2)(0)(1)(2)(0)(11)(1)(2)(1)(1)(1)(1)(1)(2)(1)(0)(0)使用来对接Kafka0.7.2时, 发现kafkaSpout总会进行数据重读, 配置都无问题, 也没报错
进行debug之后, 发现是由于自己写的blot继承于IBolt, 但自己没有在代码中显示的调用collector.ack(); 导致kafkaSpout一直认为emitted的数据有问题, 超时之后进行数据重发
KafkaSpout中关键代码如下:
PartitionManager.java
public void commit() {
("Committing offset for " + _partition);
long committedTo;
if(_pending.isEmpty()) {
committedTo = _emittedToO
committedTo = _pending.first();
if(committedTo!=_committedTo) {
("Writing committed offset to ZK: " + committedTo);
Map&Object, Object& data = (Map&Object,Object&)ImmutableMap.builder()
.put("topology", ImmutableMap.of("id", _topologyInstanceId,
"name", _stormConf.get(Config.TOPOLOGY_NAME)))
.put("offset", committedTo)
.put("partition", _partition.partition)
.put("broker", ImmutableMap.of("host", _partition.host.host,
"port", _partition.host.port))
.put("topic", _spoutConfig.topic).build();
_state.writeJSON(committedPath(), data);
("Wrote committed offset to ZK: " + committedTo);
_committedTo = committedTo;
("Committed offset " + committedTo + " for " + _partition);
如果Bolt不进行ack, 则红色代码处的offsetNumber永远相等, 导致一直不进行offset的回写操作
1. IBolt中显式调用collector.ack();
2. 使用帮你封装好的BaseBasicBlot, 它会帮你自动调用ack的
关于Ack的问题, 可以参考我的翻译和官网文章:&
阅读(...) 评论()> lgscofield的博客详情
摘要: kafka实时日志处理
在《》 一文中给大家介绍来Kafka的简单示例,演示了如何编写Kafka的代码去生产数据和消费数据,今天给大家介绍如何去整合一个完整的项目,本篇博客我打 算为大家介绍Flume+Kafka+Storm的实时日志统计,由于涉及的内容较多,这里先给大家梳理一个项目的运用这些技术的流程。下面是今天的内容 目录:
下面开始今天的内容分享。
2.项目流程
在整合这套方案的时候,项目组也是经过一番讨论,在讨论中,观点很多,有人认为直接使用Storm进行实时处理,去掉Kafka环节;也有认为直接使用Kafka的API去消费,去掉Storm的消费环节等等,但是最终组内还是一致决定使用这套方案,原因有如下几点:
业务模块化
功能组件化
我们认为,Kafka在整个环节中充当的职责应该单一,这项目的整个环节她就是一个中间件,下面用一个图来说明这个原因,如下图所示:
整个项目流程如上图所示,这样划分使得各个业务模块化,功能更加的清晰明了。
Data Collection
负责从各个节点上实时收集用户上报的日志数据,我们选用的是Apache的Flume NG来实现。
Data Access
由于收集的数据的速度和数据处理的速度不一定是一致的,因此,这里添加了一个中间件来做处理,所使用的是Apache的Kafka,关于Kafka集群部署,大家可以参考我写的《》。另外,有一部分数据是流向HDFS分布式文件系统了的,方便于为离线统计业务提供数据源。
Stream Computing
在收集到数据后,我们需要对这些数据做实时处理,所选用的是Apache的Storm。关于Storm的集群搭建部署博客后面补上,较为简单。
Data Output
在使用Storm对数据做处理后,我们需要将处理后的结果做持久化,由于对相应速度要求较高,这里采用Redis+MySQL来做持久化。整个项目的流程架构图,如下图所示:
Flume 是一个分布式的、高可用的海量日志收集、聚合和传输日志收集系统,支持在日志系统中定制各类数据发送方(如:Kafka,HDFS等),便于收集数据。 Flume提供了丰富的日志源收集类型,有:Console、RPC、Text、Tail、Syslog、Exec等数据源的收集,在我们的日志系统中目 前我们所使用的是spooldir方式进行日志文件采集,配置内容信息如下所示:
producer.sources.s.type&=&spooldir
producer.sources.s.spoolDir&=&/home/hadoop/dir/logdfs
当然,Flume的数据发送方类型也是多种类型的,有:Console、Text、HDFS、RPC等,这里我们系统所使用的是Kafka中间件来接收,配置内容如下所示:
producer.sinks.r.type&=&org.apache.flume.plugins.KafkaSink
producer.sinks.r.metadata.broker.list=dn1:9092,dn2:9092,dn3:9092
producer.sinks.r.partition.key=0
producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition
producer.sinks.r.serializer.class=kafka.serializer.StringEncoder
producer.sinks.r.request.required.acks=0
producer.sinks.r.max.message.size=1000000
producer.sinks.r.producer.type=sync
producer.sinks.r.custom.encoding=UTF-8
producer.sinks.r.custom.topic.name=test
关于,Flume的详细搭建部署,大家可以参考我写的《》。这里就不多做赘述了。
Kafka是一种提供高吞吐量的分布式发布订阅消息系统,她的特性如下所示:
通过磁盘数据结构提供消息的持久化,这种结构对于即使数据达到TB+级别的消息,存储也能够保持长时间的稳定。
搞吞吐特性使得Kafka即使使用普通的机器硬件,也可以支持每秒数10W的消息。
能够通过Kafka Cluster和Consumer Cluster来Partition消息。
Kafka 的目的是提供一个发布订阅解决方案,他可以处理Consumer网站中的所有流动数据,在网页浏览,搜索以及用户的一些行为,这些动作是较为关键的因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。对于Hadoop这样的日志数据和离线计算系统,这样的方案是一个解决实时处理较好的一 种方案。
关于Kafka集群的搭建部署和使用,大家可以参考我写的:《》,这里就不多做赘述了。
Twitter将Storm开源了,这是一个分布式的、容错的实时计算系统,已被贡献到Apache基金会,下载地址如下所示:
http://storm.apache.org/downloads.html
Storm的主要特点如下:
简单的编程模型。类似于MapReduce降低了并行批处理复杂性,Storm降低了进行实时处理的复杂性。
可以使用各种编程语言。你可以在Storm之上使用各种编程语言。默认支持Clojure、Java、Ruby和Python。要增加对其他语言的支持,只需实现一个简单的Storm通信协议即可。
容错性。Storm会管理工作进程和节点的故障。
水平扩展。计算是在多个线程、进程和服务器之间并行进行的。
可靠的消息处理。Storm保证每个消息至少能得到一次完整处理。任务失败时,它会负责从消息源重试消息。
快速。系统的设计保证了消息能得到快速的处理,使用&OMQ作为其底层消息队列。
本地模式。Storm有一个本地模式,可以在处理过程中完全模拟Storm集群。这让你可以快速进行开发和单元测试。
Storm 集群由一个主节点和多个工作节点组成。主节点运行了一个名为&Nimbus&的守护进程,用于分配代码、布置任 务及故障检测。每个工作节 点都运行了一个名为&Supervisor&的守护进程,用于监听工作,开始并终止工作进程。Nimbus和 Supervisor都能快速失败,而且是无 状态的,这样一来它们就变得十分健壮,两者的协调工作是由Apache的ZooKeeper来完成的。
Storm 的术语包括Stream、Spout、Bolt、Task、Worker、Stream Grouping和Topology。Stream是被处理的数据。Spout是数据源。Bolt处理数据。Task是运行于Spout或Bolt中的 线程。Worker是运行这些线程的进程。Stream Grouping规定了Bolt接收什么东西作为输入数据。数据可以随机分配(术语为Shuffle),或者根据字段值分配(术语为Fields),或者 广播(术语为All),或者总是发给一个Task(术语为Global),也可以不关心该数据(术语为None),或者由自定义逻辑来决定(术语为 Direct)。Topology是由Stream Grouping连接起来的Spout和Bolt节点网络。在Storm Concepts页面里对这些术语有更详细的描述。
关于Storm集群的搭建部署,博客在下一篇中更新,到时候会将更新地址附在这里,这里就先不对Storm集群的搭建部署做过多的赘述了。
这里就是为大家介绍的Flume+Kafka+Storm的整体流程,后续会给大家用一个项目案例来实践演示这个流程,包括具体的各个模块的编码实践。今天大家可以先熟悉下实时计算项目的流程开发。
这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!
人打赏支持
码字总数 62676
实战项目没有storm,这个已经没什么可说的,刚开始学习的时候借来的文章,真研究一段时间,也就那样。。。
支付宝支付
微信扫码支付
打赏金额: ¥
已支付成功
打赏金额: ¥
& 开源中国(OSChina.NET) |
开源中国社区(OSChina.net)是工信部
指定的官方社区

我要回帖

更多关于 kafka spout 的文章

 

随机推荐