spark读取kafka数据消费kafka时怎么移动zk

Spark kafka实时消费实现
直接上代码,完整的。scala编写
import org.apache.spark.{Logging, SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import com.typesafe.config.{Config, ConfigFactory}
import xxxxxx.JsonUtil
import scala.collection._
import kafka.serializer.StringDecoder
object Example extends Logging{
def startJob(args: Array[String]){
val appConf = ConfigFactory.load("app.conf")
val sc = new SparkContext(new SparkConf().setAppName(appConf.getString("name")))
val streamConf = appConf.getConfig("streaming")
def functionToCreateContext(): StreamingContext = {
val context = new StreamingContext(sc, Seconds(streamConf.getInt("duration")))
doXxx(appConf,sc,context)
context.checkpoint(streamConf.getString("checkpointDir"))
val ssc = StreamingContext.getOrCreate(streamConf.getString("checkpointDir"),functionToCreateContext)
ssc.start()
ssc.awaitTermination()
def doXxx(appConf: Config , sc: SparkContext, ssc : StreamingContext) {
brokers = "22.2.22.22:.22.23:.22.24:.22.25:9092"
topics = "example"
offset = "largest"
val kafkaConf = appConf.getConfig("kafka")
val topics = kafkaConf.getString("topics")
val brokers = kafkaConf.getString("brokers")
val offset = kafkaConf.getString("offset")
val topicSet = topics.split(",").toSet
val kafkaParams = immutable.Map[String, String]("metadata.broker.list" -& brokers,"auto.offset.reset"-& offset)
val lines = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc, kafkaParams, topicSet).map(_._2)
val minuteLines = lines.filter(line =&{
if(line.indexOf("\"code\":\"aaa\"")&=0){
}).map(message =& {
val jsonData = JsonUtil.read(message)
minuteLines.foreachRDD(rdd =& {
rdd.foreach {jsonData =& {
val code = jsonData("code").asInstanceOf[String]
val name = jsonData("name").asInstanceOf[String]
def main(args: Array[String]) {
startJob(args)
上面用到的config工具,maven依赖,在这里贴出来:
&com.typesafe&
Spark Streaming消费kafka示例
如何管理Spark Streaming消费Kafka的偏移量(三)
Spark streaming+kafka实战教程
flume+kafka+spark streaming(持续更新)
zookeeper+kafka安装以及kafka+spark streaming 的简单整合
SODBASE实时大数据基础(一):实时同步Mysql数据库到Kafka
Spark消费kafkaf的数据,解析数据并将数据存入到Hive中
大数据之实时流处理常用框架
Spark 2.0 + kafka 0.10 fullstack 实战小记(1)
spark消费kafka的两种方式
没有更多推荐了,
(window.slotbydup=window.slotbydup || []).push({
id: "5865575",
container: s,
size: "300,250",
display: "inlay-fix"小飞鱼的大数据世界
Spark Streaming通过直连的方式消费Kafka中的数据
为什么采用直连(createDirectStream)的方式,主要有以下几个原因:
1.createDirectStream的方式从Kafka集群中读取数据,并且在Spark Streaming系统里面维护偏移量相关的信息,实现零数据丢失,保证不重复消费,比createStream更高效;
2.创建的DStream的rdd的partition做到了和Kafka中topic的partition一一对应。
但是采用直连(createDirectStream)的方式有一个缺点,就是不再向zookeeper中更新offset信息。
因此,在采用直连的方式消费kafka中的数据的时候,大体思路是首先获取保存在zookeeper中的偏移量信息,根据偏移量信息去创建stream,消费数据后再把当前的偏移量写入zookeeper中。在创建stream时需要考虑以下几点:
1.zookeeper中没有偏移量信息,此时按照自定义的kafka参数的配置创建stream;
2.zookeeper中保存了偏移量信息,但由于各种原因kafka清理掉了该处偏移量的数据,此时需要对偏移量进行修正,否则在运行时会出现偏移量越界的异常。 解决方法是调用spark-streaming-kafka API 中 KafkaCluster这个类中的方法获取broker中实际的最大最小偏移量,和zookeeper中偏移量进行对比来修正偏移量信息。在2.0以前的版本中KafkaCluster这个类是private权限的,需要把它拷贝到项目里使用。2.0以后的版本中修改KafkaCluster的权限为public,可以尽情调用了。
为了方便调用,本人在使用时写了一个KafkaHelper的类,将创建stream和更新zookeeper中offset的代码封装了起来,代码如下:
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.spark.SparkException
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{KafkaCluster, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.kafka.KafkaCluster.Err
* KafkaHelper类提供两个共有方法,一个用来创建direct方式的DStream,另一个用来更新zookeeper中的消费偏移量
* @param kafkaPrams kafka配置参数
* @param zkQuorum zookeeper列表
* @param group 消费组
* @param topic 消费主题
class KafkaHelper(kafkaPrams:Map[String,String],zkQuorum:String,group:String,topic:String) extends Serializable{
private val kc = new KafkaCluster(kafkaPrams)
private val zkClient = new ZkClient(zkQuorum)
private val topics = Set(topic)
* 获取消费组group下的主题topic在zookeeper中的保存路径
private def getZkPath():String={
val topicDirs = new ZKGroupTopicDirs(group,topic)
val zkPath = topicDirs.consumerOffsetDir
* 获取偏移量信息
* @param children 分区数
* @param zkPath zookeeper中的topic信息的路径
* @param earlistLeaderOffsets broker中的实际最小偏移量
* @param latestLeaderOffsets broker中的实际最大偏移量
private def getOffsets(children:Int,zkPath:String,earlistLeaderOffsets:Map[TopicAndPartition, KafkaCluster.LeaderOffset],latestLeaderOffsets: Map[TopicAndPartition, KafkaCluster.LeaderOffset]): Map[TopicAndPartition, Long] = {
var fromOffsets: Map[TopicAndPartition, Long] = Map()
for(i &- 0 until children){
//获取zookeeper记录的分区偏移量
val zkOffset = zkClient.readData[String](s"${zkPath}/${i}").toLong
val tp = TopicAndPartition(topic,i)
//获取broker中实际的最小和最大偏移量
val earlistOffset: Long = earlistLeaderOffsets(tp).offset
val latestOffset: Long = latestLeaderOffsets(tp).offset
//将实际的偏移量和zookeeper记录的偏移量进行对比,如果zookeeper中记录的偏移量在实际的偏移量范围内则使用zookeeper中的偏移量,
//反之,使用实际的broker中的最小偏移量
if(zkOffset&=earlistOffset && zkOffset&=latestOffset) {
fromOffsets += (tp -& zkOffset)
fromOffsets += (tp -& earlistOffset)
fromOffsets
* 创建DStream
* @param ssc
def createDirectStream(ssc:StreamingContext):InputDStream[(String, String)]={
//----------------------获取broker中实际偏移量---------------------------------------------
val partitionsE: Either[Err, Set[TopicAndPartition]] = kc.getPartitions(topics)
if(partitionsE.isLeft)
throw new SparkException("get kafka partitions failed:")
val partitions = partitionsE.right.get
val earlistLeaderOffsetsE: Either[Err, Map[TopicAndPartition, KafkaCluster.LeaderOffset]] = kc.getEarliestLeaderOffsets(partitions)
if(earlistLeaderOffsetsE.isLeft)
throw new SparkException("get kafka earlistLeaderOffsets failed:")
val earlistLeaderOffsets: Map[TopicAndPartition, KafkaCluster.LeaderOffset] = earlistLeaderOffsetsE.right.get
val latestLeaderOffsetsE: Either[Err, Map[TopicAndPartition, KafkaCluster.LeaderOffset]] = kc.getLatestLeaderOffsets(partitions)
if(latestLeaderOffsetsE.isLeft)
throw new SparkException("get kafka latestLeaderOffsets failed:")
val latestLeaderOffsets: Map[TopicAndPartition, KafkaCluster.LeaderOffset] = latestLeaderOffsetsE.right.get
//----------------------创建kafkaStream----------------------------------------------------
var kafkaStream:InputDStream[(String, String)]=null
val zkPath: String = getZkPath()
val children = zkClient.countChildren(zkPath)
//根据zookeeper中是否有偏移量数据判断有没有消费过kafka中的数据
if(children & 0){
val fromOffsets:Map[TopicAndPartition, Long] = getOffsets(children,zkPath,earlistLeaderOffsets,latestLeaderOffsets)
val messageHandler = (mmd: MessageAndMetadata[String, String]) =& (mmd.topic, mmd.message())
//如果消费过,根据偏移量创建Stream
kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
ssc, kafkaPrams, fromOffsets, messageHandler)
//如果没有消费过,根据kafkaPrams配置信息从最早的数据开始创建Stream
kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaPrams, topics)
kafkaStream
* 更新zookeeper中的偏移量
* @param offsetRanges
def updateZkOffsets(offsetRanges:Array[OffsetRange])={
val zkPath: String = getZkPath()
for( o &- offsetRanges){
val newZkPath = s"${zkPath}/${o.partition}"
//将该 partition 的 offset 保存到 zookeeper
ZkUtils.updatePersistentPath(zkClient, newZkPath, o.fromOffset.toString)
测试代码如下:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object TestKafkaHelper {
def main(args: Array[String]): Unit = {
if(args.length&5){
println("Usage:&timeInterval& &brokerList& &zkQuorum& &topic& &group&")
System.exit(1)
val Array(timeInterval,brokerList,zkQuorum,topic,group) = args
val conf = new SparkConf().setAppName("KafkaDirectStream").setMaster("local[2]")
val ssc = new StreamingContext(conf,Seconds(timeInterval.toInt))
//kafka配置参数
val kafkaParams = Map(
"metadata.broker.list" -& brokerList,
"group.id" -& group,
"auto.offset.reset" -& kafka.api.OffsetRequest.SmallestTimeString
val kafkaHelper = new KafkaHelper(kafkaParams,zkQuorum,topic,group)
val kafkaStream: InputDStream[(String, String)] = kafkaHelper.createDirectStream(ssc)
var offsetRanges = Array[OffsetRange]()
kafkaStream.transform( rdd =&{
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
}).map( msg =& msg._2)
.foreachRDD( rdd =& {
rdd.foreachPartition( partition =&{
partition.foreach( record =&{
//处理数据的方法
println(record)
kafkaHelper.updateZkOffsets(offsetRanges)
ssc.start()
ssc.awaitTermination()
ssc.stop()
SparkStreaming采用直连方式(Direct Approach)获取Kafka数据的研究心得
没有更多推荐了,
(window.slotbydup=window.slotbydup || []).push({
id: "5865575",
container: s,
size: "300,250",
display: "inlay-fix"[Spark][kafka]kafka 生产者,消费者 互动例子
# pwd/usr/local/kafka_2.11-0.10.0.1/bin
创建topic:# ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic device_statusWARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.Created topic "device_status".#
查看topic:# ./kafka-topics.sh --list --zookeeper localhost:2181device_status#
生产者生成消息:# ./kafka-console-producer.sh --broker-list localhost:9092 --topic device_status
消费者获得消息:# pwd/usr/local/kafka_2.11-0.10.0.1/bin
# ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic device_status --from-beginning
在生产者窗口输入:hi, I am producer 然后按回车
在消费者窗口会看到:hi, I am producer
阅读(...) 评论()&nbsp>&nbsp
&nbsp>&nbsp
&nbsp>&nbsp
spark spark streaming + kafka receiver方式消费消息
摘要:kafka+sparkstreaming集群前提:spark安装成功,spark1.6.0zookeeper安装成功kafka安装成功步骤:在worker1中启动kafka生产者::/usr/local/kafka_2.10-0.9.0.1#bin/kafka-console-producer.sh--broker-listlocalhost:9092--topictest在worker2中启动消费者:aliyunzixun@xxx.co
kafka + spark streaming 集群
spark 安装成功,spark 1.6.0
zookeeper 安装成功
kafka 安装成功
在worker1中启动kafka 生产者:
:/usr/local/kafka_2.10-0.9.0.1#bin/kafka-console-producer.sh--broker-listlocalhost:9092--topictest
在worker2中启动消费者:
:/usr/local/kafka_2.10-0.9.0.1#bin/kafka-console-consumer.sh--zookeepermaster:2181--topictest
生产者生产的消息,消费者可以消费到。说明kafka集群没问题。进入下一步。
在master中启动spark-shell
./spark-shell--masterlocal[2]--packagesorg.apache.spark:spark-streaming-kafka_2.10:1.6.0
笔者用的spark 是 1.6.0 ,读者根据自己版本调整。
shell中的逻辑代码(wordcount): importorg.apache.spark.SparkConf
importorg.apache.spark.streaming._
importorg.apache.spark.streaming.kafka._
importorg.apache.spark.streaming.kafka.KafkaUtils
importorg.apache.spark.streaming.{Durations,StreamingContext}
valssc=newStreamingContext(sc,Durations.seconds(5))
//第二个参数是zk的clienthost:port
//第三个参数是groupID
KafkaUtils.createStream(ssc,&master:2181,worker1:2181,worker2:2181&,&StreamingWordCountSelfKafkaScala&,Map(&test&-&1)).map(_._2).flatMap(_.split(&&)).map((_,1)).reduceByKey(_+_).print()
生产者再生产消息:
spark streaming的反应:
返回worker2查看消费者
可见,groupId不一样,相互之间没有互斥。
上述是使用 createStream 方式链接kafka
还有更高效的方式,请使用createDirectStream
http://spark.apache.org/docs/latest/streaming-kafka-integration.html
感谢王家林老师的知识分享
王家林老师名片:
中国Spark第一人
新浪微博:http://weibo.com/ilovepains
微信公众号:DT_Spark
博客:http://blog.sina.com.cn/ilovepains
YY课堂:每天20:00现场授课频道
王家林:DT大数据梦工厂创始人、Spark亚太研究院院长和首席专家、大数据培训专家、大数据架构师。
Spark、Flink、Docker、Android技术中国区布道师。国内最早一批从事Android、Hadoop、Spark、Docker的研究者,在Spark、Hadoop、Android、Docker等方面有丰富的源码、实务和性能优化经验。是该领域的知名咨询顾问、培训专家;Spark最佳畅销书《大数据spark企业级实战》和《Spark大数据实例开发教程》作者;Android移动互联网兴起以来,近10本的IT畅销书作者;为大量企业进行技术培训和服务,包括:
三星、惠普、爱立信、摩托罗拉、索尼、华为、夏普、南方航空公司、中国国际航空公司、金立、海信、长虹、英特尔、阿尔法特、中国联通、华三、AIA、亿迅、中国电信、网龙、福赛、中国人寿、阳光保险、兴业银行等。
找我报名有会员价哦。
以上是的内容,更多
的内容,请您使用右上方搜索功能获取相关信息。
若你要投稿、删除文章请联系邮箱:zixun-group@service.aliyun.com,工作人员会在五个工作日内给你回复。
新用户大礼包!
现在注册,免费体验40+云产品,及域名优惠!
云服务器 ECS
可弹性伸缩、安全稳定、简单易用
&40.8元/月起
预测未发生的攻击
&24元/月起
你可能还喜欢
你可能感兴趣
阿里云教程中心为您免费提供
spark spark streaming + kafka receiver方式消费消息相关信息,包括
的信息,所有spark spark streaming + kafka receiver方式消费消息相关内容均不代表阿里云的意见!投稿删除文章请联系邮箱:zixun-group@service.aliyun.com,工作人员会在五个工作日内答复
售前咨询热线
支持与服务
资源和社区
关注阿里云
International%=% ?#=# : ^-^
sparkstreaming消费kafka中的数据
package com.kafka.my.scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Durations
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils
import java.util.Properties
* @author root
sparkstreaming获取kafka方式一
测试结果:通过
*测试:1\需要先在h15\h16\h17上启动zookeeper,再启动kafka,创建kafka的topic
2\在h15kafka的bin目录下执行
#sh kafka-console-producer.sh --topic a --broker-list h15:2,h17:9092
让其等待输入
3\启动本程序接受数据
4\在h15上刚才的窗口输入数据
5\查看本程序是否正常接收
*错误:java.nio.channels.ClosedChannelException
* fetching topic metadata for topics [Set(aa)] from broker [ArrayBuffer(id:2,host:h17,port:9092, id:1,host:h16,port:9092, id:0,host:h15,port:9092)] failed
原因:server.propertis中的host.name=h15或者注释掉,否则报错
object KafkaReceiverCountWord {
def main(args: Array[String]): Unit = {
val ssc=new StreamingContext(new SparkConf().setAppName("wordCount").setMaster("local[2]"),Durations.seconds(5))
//创建properties
val topicThreadMap=Array("a").map { (_,1) }.toMap
//创建客户端接收
val lines =KafkaUtils.createStream(ssc,
"192.168.142.115:.142.116:.142.117:2181"
, "WordcountConsumerGroup"
, topicThreadMap)
val words =lines.flatMap(_._2.split(" "))
val pairs=words.map { (_,1) }
//reduceByKey
val wordcounts=pairs.reduceByKey(_+_)
//必须触发
wordcounts.print()
ssc.start()
ssc.awaitTermination()
ssc.stop()
package com.kafka.my.scala
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Durations
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.StringDecoder
* @author root
sparkstreaming获取kafka方式二
测试结果:通过
区别于第一种方式:
1、offset不会更新到zookeeper
2、使用的节点端口是9092kafka的broker端口
3、不存在group
object KafkaDirectCountWord {
def main(args: Array[String]): Unit = {
//获取sparkstreaming
val ssc=new StreamingContext(new SparkConf().setAppName("directCount").setMaster("local[2]"),Durations.seconds(5))
//创建kafkaParams
val kafkaParams=Array("metadata.broker.list").map {(_,"192.168.142.115:.142.116:.142.117:9092") }.toMap
//创建topic
val topics=Array("a").toSet
val topics=Set("a")
//获取lines
val lines=KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc,kafkaParams,topics)
val words =lines.flatMap(_._2.split(" "))
val pairs=words.map { (_,1) }
//reduceByKey
val wordcounts=pairs.reduceByKey(_+_)
//必须触发
wordcounts.print()
ssc.start()
ssc.awaitTermination()
ssc.stop()
Spark消费kafkaf的数据,解析数据并将数据存入到Hive中
《转载》Java 连接Kafka报错java.nio.channels.ClosedChannelExcep
Kafka异常 java.nio.channels.ClosedChannelException
Kafka:无丢失将kafka的值读取到hbase
spark-streaming+kafka 出现org.apache.spark.SparkException: java.lang.IllegalArgumentException
没有更多推荐了,
(window.slotbydup=window.slotbydup || []).push({
id: "5865575",
container: s,
size: "300,250",
display: "inlay-fix"

我要回帖

更多关于 spark读取kafka数据 的文章

 

随机推荐