storm-查询kafka集群分区偏移量,提交topology错误

扫一扫体验手机阅读
storm-kafka(storm spout作为kafka的消费端)
<span type="1" blog_id="1701258" userid='
344篇文章,94W+人气,0粉丝
前百度高级工程师的架构高可用实战
¥51.00237人订阅
<span type="1" blog_id="1701258" userid='topology运行一段时间后报错,求探讨
[问题点数:100分,无满意结帖,结帖人qq_]
本版专家分:0
结帖率 87.5%
CSDN今日推荐
匿名用户不能发表回复!|
其他相关推荐个人技术的成长经历
记一次storm异常(提交topology时发生,启动均无异常)--java.net.UnknownHostException
在搭建storm的生产环境集群的时候发生了一个怪事儿,主nimbus节点启动正常,无报错信息,2台supervisor启动正常,无报错信息,为了检测方便,还在主节点nimbus上启动了相应的ui。【问题来了】每当我提交topology的时候,提交拓扑时的信息没有报错,但是过了2s,查看supervisor.log时,发现两台机器都会报错,同时错误信息过后就会自己宕掉,杀掉进程。为了解决这个问题,基本上是翻阅了各大社区,都木有找到解决问题。这个问题头疼了一星期,最终在今天得以解决,由此记录下。如果后续也有人遇到了相同的问题,可以试着本文的方法试下,说不定会有效。
storm环境介绍
192.168.101.25
zookeeper1 + kafka + storm (supervisor)
192.168.101.26
zookeeper2 + kafka + storm (supervisor)
192.168.101.36
zookeeper3 + kafka
192.168.56.147
storm(nimbus) + storm(ui)
- 192.168.101.25
zookeeper1 + kafka + storm (supervisor)
- 192.168.101.26
zookeeper2 + kafka + storm (supervisor)
- 192.168.101.36
zookeeper3 + kafka
- 192.168.56.147
storm(nimbus) + storm(ui)
如上所示,一共四台服务器,25、26、36上搭建这zookeeper集群,而25、26、147又形成了storm集群。
2018-02-11 11:19:27.481 o.a.s.u.NimbusClient Async Localizer [WARN] Ignoring exception while trying to get leader nimbus info from 192.168.56.147. will retry with a different seed host.
java.lang.RuntimeException: java.lang.RuntimeException: org.apache.storm.thrift.transport.TTransportException: java.net.UnknownHostException: Phq147
at org.apache.storm.security.auth.ThriftClient.reconnect(ThriftClient.java:108) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.security.auth.ThriftClient.&init&(ThriftClient.java:69) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.utils.NimbusClient.&init&(NimbusClient.java:127) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.utils.NimbusClient.getConfiguredClientAs(NimbusClient.java:94) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.utils.NimbusClient.getConfiguredClient(NimbusClient.java:57) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.blobstore.NimbusBlobStore.prepare(NimbusBlobStore.java:268) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.utils.Utils.getClientBlobStoreForSupervisor(Utils.java:538) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.localizer.AsyncLocalizer$DownloadBaseBlobsDistributed.downloadBaseBlobs(AsyncLocalizer.java:121) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.localizer.AsyncLocalizer$DownloadBaseBlobsDistributed.call(AsyncLocalizer.java:148) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.localizer.AsyncLocalizer$DownloadBaseBlobsDistributed.call(AsyncLocalizer.java:101) ~[storm-core-1.1.1.jar:1.1.1]
at java.util.concurrent.FutureTask.run(FutureTask.java:262) ~[?:1.7.0_80]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [?:1.7.0_80]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [?:1.7.0_80]
at java.lang.Thread.run(Thread.java:745) [?:1.7.0_80]
Caused by: java.lang.RuntimeException: org.apache.storm.thrift.transport.TTransportException: java.net.UnknownHostException: Phq147
at org.apache.storm.security.auth.TBackoffConnect.retryNext(TBackoffConnect.java:64) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.security.auth.TBackoffConnect.doConnectWithRetry(TBackoffConnect.java:56) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.security.auth.ThriftClient.reconnect(ThriftClient.java:100) ~[storm-core-1.1.1.jar:1.1.1]
... 13 more
Caused by: org.apache.storm.thrift.transport.TTransportException: java.net.UnknownHostException: PhqApPrdGquery147
at org.apache.storm.thrift.transport.TSocket.open(TSocket.java:226) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.thrift.transport.TFramedTransport.open(TFramedTransport.java:81) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.security.auth.SimpleTransportPlugin.connect(SimpleTransportPlugin.java:105) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.security.auth.TBackoffConnect.doConnectWithRetry(TBackoffConnect.java:53) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.security.auth.ThriftClient.reconnect(ThriftClient.java:100) ~[storm-core-1.1.1.jar:1.1.1]
... 13 more
Caused by: java.net.UnknownHostException: Phq147
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:178) ~[?:1.7.0_80]
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) ~[?:1.7.0_80]
at java.net.Socket.connect(Socket.java:579) ~[?:1.7.0_80]
at org.apache.storm.thrift.transport.TSocket.open(TSocket.java:221) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.thrift.transport.TFramedTransport.open(TFramedTransport.java:81) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.security.auth.SimpleTransportPlugin.connect(SimpleTransportPlugin.java:105) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.security.auth.TBackoffConnect.doConnectWithRetry(TBackoffConnect.java:53) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.security.auth.ThriftClient.reconnect(ThriftClient.java:100) ~[storm-core-1.1.1.jar:1.1.1]
... 13 more
2018-02-11 11:19:27.481 o.a.s.l.AsyncLocalizer Async Localizer [WARN] Failed to download basic resources for topology-id servicebus-1-
2018-02-11 11:19:27.481 o.a.s.d.s.AdvancedFSOps Async Localizer [INFO] Deleting path /karfka/elk/storm/apache-storm-1.1.1/../stormdata/supervisor/tmp/a19cf9f4-b0de-41bc-a06e-299c1046514c
2018-02-11 11:19:27.491 o.a.s.d.s.AdvancedFSOps Async Localizer [INFO] Deleting path /karfka/elk/storm/apache-storm-1.1.1/../stormdata/supervisor/stormdist/servicebus-1-
2018-02-11 11:19:27.492 o.a.s.l.AsyncLocalizer Async Localizer [WARN] Caught Exception While Downloading (rethrowing)...
org.apache.storm.utils.NimbusLeaderNotFoundException: Could not find leader nimbus from seed hosts [192.168.56.147]. Did you specify a valid list of nimbus hosts for config nimbus.seeds?
at org.apache.storm.utils.NimbusClient.getConfiguredClientAs(NimbusClient.java:111) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.utils.NimbusClient.getConfiguredClient(NimbusClient.java:57) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.blobstore.NimbusBlobStore.prepare(NimbusBlobStore.java:268) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.utils.Utils.getClientBlobStoreForSupervisor(Utils.java:538) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.localizer.AsyncLocalizer$DownloadBaseBlobsDistributed.downloadBaseBlobs(AsyncLocalizer.java:121) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.localizer.AsyncLocalizer$DownloadBaseBlobsDistributed.call(AsyncLocalizer.java:148) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.localizer.AsyncLocalizer$DownloadBaseBlobsDistributed.call(AsyncLocalizer.java:101) ~[storm-core-1.1.1.jar:1.1.1]
at java.util.concurrent.FutureTask.run(FutureTask.java:262) ~[?:1.7.0_80]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [?:1.7.0_80]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [?:1.7.0_80]
at java.lang.Thread.run(Thread.java:745) [?:1.7.0_80]
2018-02-11 11:19:27.495 o.a.s.d.s.Slot SLOT_6700 [ERROR] Error when processing event
java.util.concurrent.ExecutionException: org.apache.storm.utils.NimbusLeaderNotFoundException: Could not find leader nimbus from seed hosts [192.168.56.147]. Did you specify a valid list of nimbus hosts for config nimbus.seeds?
at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:1.7.0_80]
at java.util.concurrent.FutureTask.get(FutureTask.java:202) ~[?:1.7.0_80]
at org.apache.storm.localizer.LocalDownloadedResource$NoCancelFuture.get(LocalDownloadedResource.java:63) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.daemon.supervisor.Slot.handleWaitingForBasicLocalization(Slot.java:413) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.daemon.supervisor.Slot.stateMachineStep(Slot.java:273) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.daemon.supervisor.Slot.run(Slot.java:741) ~[storm-core-1.1.1.jar:1.1.1]
Caused by: org.apache.storm.utils.NimbusLeaderNotFoundException: Could not find leader nimbus from seed hosts [192.168.56.147]. Did you specify a valid list of nimbus hosts for config nimbus.seeds?
at org.apache.storm.utils.NimbusClient.getConfiguredClientAs(NimbusClient.java:111) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.utils.NimbusClient.getConfiguredClient(NimbusClient.java:57) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.blobstore.NimbusBlobStore.prepare(NimbusBlobStore.java:268) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.utils.Utils.getClientBlobStoreForSupervisor(Utils.java:538) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.localizer.AsyncLocalizer$DownloadBaseBlobsDistributed.downloadBaseBlobs(AsyncLocalizer.java:121) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.localizer.AsyncLocalizer$DownloadBaseBlobsDistributed.call(AsyncLocalizer.java:148) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.localizer.AsyncLocalizer$DownloadBaseBlobsDistributed.call(AsyncLocalizer.java:101) ~[storm-core-1.1.1.jar:1.1.1]
at java.util.concurrent.FutureTask.run(FutureTask.java:262) ~[?:1.7.0_80]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [?:1.7.0_80]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [?:1.7.0_80]
at java.lang.Thread.run(Thread.java:745) [?:1.7.0_80]
2018-02-11 11:19:27.495 o.a.s.u.Utils SLOT_6700 [ERROR] Halting process: Error when processing an event
java.lang.RuntimeException: Halting process: Error when processing an event
at org.apache.storm.utils.Utils.exitProcess(Utils.java:1773) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.daemon.supervisor.Slot.run(Slot.java:774) ~[storm-core-1.1.1.jar:1.1.1]
2018-02-11 11:19:27.498 o.a.s.d.s.Supervisor Thread-5 [INFO] Shutting down supervisor a6f91d50-eceb-4f1e-bb99-28799a97d58e
2018-02-11 11:19:27.502 o.a.s.e.EventManagerImp Thread-4 [INFO] Event manager interrupted
解决问题的思路过程
初步思路:
像这种问题,一般引起异常的原因是因为网络问题,所以我第一个想到的就是端口问题,通过supervisor的后台日志进行查看,发现有这么一句
2018-02-11 11:13:26.150 o.a.s.d.s.Supervisor main [INFO] Starting Supervisor with conf {drpc.worker.threads=64, topology.state.synchroni
zation.timeout.secs=60, topology.executor...省略都是一堆配置...}
2018-02-11 11:13:26.227 o.a.s.d.s.Slot main [WARN] SLOT service101.25:6700 Starting in state EMPTY - assignment null
2018-02-11 11:13:26.228 o.a.s.d.s.Slot main [WARN] SLOT service101.25:6701 Starting in state EMPTY - assignment null
2018-02-11 11:13:26.229 o.a.s.d.s.Slot main [WARN] SLOT service101.25:6702 Starting in state EMPTY - assignment null
2018-02-11 11:13:26.229 o.a.s.d.s.Slot main [WARN] SLOT service101.25:6703 Starting in state EMPTY - assignment null
查看到上面的日志时,初步结论是以为storm分发给worker的端口号没有打开,或者说是被supervisor这个台机器的防火墙给拦截了,但是与相应的同事沟通后,发现此机器的防火墙是关闭着的,而通过
netstat -an|grep 6700
抓取端口的时候确实也没有抓取成功,后续查资料发现,worker节点的端口是分发工作时才会被打开,所以抓取不到。
跟进思路:
发现SLOT service101.25:6703,为什么这块儿不是我的ip地址,而是被解析成主机名去显示了?随后查看了我25服务器上的hosts文件,如下:
localhost localhost.localdomain localhost4 localhost4.localdomain4
localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.101.25
service101.25
原来是被storm自动解析成了主机名,而在我最上面的异常信息中也报的是找不到java.net.UnknownHostException: Phq147,我就在想会不会是我的storm的supervisor配置文件有问题,最初配置文件如下:
nimbus.seeds: ["192.168.56.147"]
那我改成它报错的主机名如何呢?
nimbus.seeds: ["Phq147"]
修改后发现依然不行!继续查看我147服务器的hosts文件发现如下内容:
localhost.localdomain localhost
localhost6.localdomain6 localhost6
10.10.56.147
原来在147上也做了相应的主机名配置,但是为什么修改后依然访问不通呢!!!???
其实要说这个坑为什么这么久才解决,还是因为自己对网络知识有所欠佳,如果发生了上续错误,是因为25supervisor服务器,去向147nimbus服务器请求通信的时候,dns解析失败造成的,网络中,一台服务器像另一台服务器发出请求时,首先会寻找hosts文件下有没有对另一台服务器进行映射,如果有,则优先hosts文件内容为主进行解析,若没有则再通过dns去解析。于是最后轻松解决问题的操作如下:
在25、26两台supervisor的hosts文件中添加上对147的映射即可。
拿25举例:
localhost localhost.localdomain localhost4 localhost4.localdomain4
localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.101.25
service101.25
192.168.56.147 Phq147
至此完成,看来还是需要多了解了解网络才行。。。。。
扫码向博主提问
非学,无以致疑;非问,无以广识
擅长领域:
[解决] storm启动异常:org.apache.thrift7.transport.TTransportException
org.apache.storm.thrift.transport.TTransportException
解决storm1.1.1集群找不到nimbus异常
storm启动报错及解决办法
Storm1.0安装过程及遇到的错误处理方法
java.net.UnknownHostException
Storm集群安装详解
java.net.UnknownHostException: unknown host:xxxx异常解决办法
没有更多推荐了,通过 IDE 向 Storm 集群远程提交 topology
时间: 13:56:26
&&&& 阅读:631
&&&& 评论:
&&&& 收藏:0
标签:&&&&&&&&&&&&&&&&&&&&&&&&&&&转载:
http://weyo.me/pages/techs/storm-topology-remote-submission/
作为一个懒癌晚期患者,虽然 Storm 只需要一条命令的任务提交方式已经够简单了,但还是一直想要有种更简(tou)单(lan)的方式,比如要是在 Windows 下写完代码之后可以直接提交任务而不需要手动把 jar 包拷到服务器上再提交那定是极好的了。谷歌了一下终于在墙外找到了解决方法:&
Storm 集群配置
nimbus: "hd124"
nimbus.port: 6627
提交 Topology
Config conf = new Config();
conf.setNumWorkers(2);
conf.setDebug(true);
// topology 其他配置信息等
// 读取本地 Storm 配置文件
Map stormConf = Utils.readStormConfig();
stormConf.put("nimbus.host", "hd124");
stormConf.putAll(conf);
Nimbus.Client client = NimbusClient.getConfiguredClient(stormConf).getClient();
String inputJar = "E:\\workspace\\storm-demo\\target\\storm-demo-0.0.5-SNAPSHOT-shade.jar";
NimbusClient nimbus = new NimbusClient(stormConf, "hd124", 6627);
// 使用 StormSubmitter 提交 jar 包
String uploadedJarLocation = StormSubmitter.submitJar(stormConf, inputJar);
String jsonConf = JSONValue.toJSONString(stormConf);
nimbus.getClient().submitTopology("remotetopology", uploadedJarLocation, jsonConf, builder.createTopology());
第8行会读取 Storm 的本地配置文件,如果不指定的话,Utils.readStormConfig()&会读取 Storm 依赖 jar 包的默认配置文件,如&"\maven\repository\org\apache\storm\storm-core\0.9.3\storm-core-0.9.3.jar\defaults.yaml",如果集群配置与默认配置有较大不同,还需要修改对应配置信息。
这段代码需要在 Topology 已经完成打包之后运行,因为需要在程序中指定待提交的 jar 包。可以在 IDE 中安装 Maven 插件,Topology 开发完成之后直接打包,然后再切换到这段提交代码中执行提交任务。
任务提交完成之后可以在 Storm UI 中查看提交结果。
再来一个:
http://xumingming.sinaapp.com/117/twitter-storm%E7%9A%84%E4%B8%80%E4%BA%9B%E5%85%B3%E9%94%AE%E6%A6%82%E5%BF%B5/
http://xumingming.sinaapp.com/189/twitter-storm-storm%E7%9A%84%E4%B8%80%E4%BA%9B%E5%B8%B8%E8%A7%81%E6%A8%A1%E5%BC%8F/标签:&&&&&&&&&&&&&&&&&&&&&&&&&&&原文:http://www.cnblogs.com/diyunpeng/p/4620577.html
教程昨日排行
&&国之画&&&& &&&&&&
&& &&&&&&&&&&&&&&
鲁ICP备号-4
打开技术之扣,分享程序人生!关于一句话的修改:storm集成kafka
参考文档中说到
打包上传到服务器,运行
Storm jar jarname CountTopology
回车,会看到他在等待数据传入。
这个时候运行kafka消费者程序,将数据输出,则会看到storm 会迅速输出数据和统计数目。
这里测试不写了。
正确的说法是:
是运行kafka生产者程序,将数据输入到storm,这时会看到storm 会迅速输出数据和统计数目。
因为这篇文字的开头说“这里的目标是kafka 负责生产数据,storm 消费数据并将结果输出”
http://blog.csdn.net/looklook5/article/details/
为对比,将原文放置如下
这里的目标是kafka 负责生产数据,storm 消费数据并将结果输出
这里用的是引进别人家写的整合代码,因为使用的人也比较多,下面是项目地址
下载、解压以及将这个目录下的代码添加进项目
将kafka 和 storm 的JAR 添加进项目,作为依赖jar 包
然后添加com.netflix.curator 的相关包括client、framework和recipes
下载地址:
最新的所有com.google.common类,下载地址
这样storm-kafka-0.8-plus项目应该就不会报错了。
二、kafka 生产者的创建
在我的这篇文章里3.6、Producer API,有生产者的例子,可以拿来直接用。
三、创建消费 kafka 数据的Topology
storm-kafka-0.8-plus 给我们写了个代码
代码如下:
这里清晰的写出了创建一个与kafka整合的storm Topology,观察main 函数,从上往下看:
下面是关于zookeeper的设定以及spout和bolt 的设定
下面的语句中,storm-sentence是话题,下面的语句是要求在zookeeper 服务器中在根目录创建文件夹storm,用于kafka存放zookeeper相关数据
下面是设置Topology的相关设定
下面是我修改后的脚本
storm整合kafka,spout作为kafka的消费者
Storm-kafka集成——1.1.0版本storm中tuple取KafkaSpout数据详解
(三)storm-kafka源码走读之如何构建一个KafkaSpout
storm的kafkaSpout实例
(五)storm-kafka源码走读之KafkaSpout
没有更多推荐了,

我要回帖

更多关于 kafka集群 分区消费 的文章

 

随机推荐