Spark Streaming 应用在处理完一段javadstream转javardd后,persist在内存中的rdd还存在吗?

程序员的自我修养
29,765次浏览
Spark Streaming类似于Apache Storm,用于流式数据的处理。根据其官方文档介绍,Spark Streaming有高吞吐量和容错能力强这两个特点。Spark Streaming支持的数据输入源很多,例如:Kafka、Flume、Twitter、ZeroMQ和简单的TCP套接字等等。数据输入后可以用Spark的高度抽象原语如:map、reduce、join、window等进行运算。而结果也能保存在很多地方,如HDFS,数据库等。另外Spark Streaming也能和以及完美融合。
其内部工作方式如下:
Word Count示例
private static final Pattern SPACE = pile(" ");
public static void main(String[] args) {
StreamingExamples.setStreamingLogLevels();
JavaStreamingContext jssc = new JavaStreamingContext("local[2]",
"JavaNetworkWordCount", new Duration(10000));
jssc.checkpoint(".");//使用updateStateByKey()函数需要设置checkpoint
//打开本地的端口9999
JavaReceiverInputDStream&String& lines = jssc.socketTextStream("localhost", 9999);
//按行输入,以空格分隔
JavaDStream&String& words = lines.flatMap(line -& Arrays.asList(SPACE.split(line)));
//每个单词形成pair,如(word,1)
JavaPairDStream&String, Integer& pairs = words.mapToPair(word -& new Tuple2&&(word, 1));
//统计并更新每个单词的历史出现次数
JavaPairDStream&String, Integer& counts = pairs.updateStateByKey((values, state) -& {
Integer newSum = state.or(0);
for(Integer i :values) {
return Optional.of(newSum);
counts.print();
jssc.start();
jssc.awaitTermination();
12345678910111213141516171819202122232425
private static final Pattern SPACE = Pattern.compile(" ");public static void main(String[] args) {&&&&StreamingExamples.setStreamingLogLevels();&&&&&JavaStreamingContext jssc = new JavaStreamingContext("local[2]",&&&&&&&&&&"JavaNetworkWordCount", new Duration(10000));&&&&jssc.checkpoint(".");//使用updateStateByKey()函数需要设置checkpoint&&&&//打开本地的端口9999&&&&JavaReceiverInputDStream&String& lines = jssc.socketTextStream("localhost", 9999);&&&&//按行输入,以空格分隔&&&&JavaDStream&String& words = lines.flatMap(line -& Arrays.asList(SPACE.split(line)));&&&&//每个单词形成pair,如(word,1)&&&&JavaPairDStream&String, Integer& pairs = words.mapToPair(word -& new Tuple2&&(word, 1));&&&&//统计并更新每个单词的历史出现次数&&&&JavaPairDStream&String, Integer& counts = pairs.updateStateByKey((values, state) -& {&&&&&&&&Integer newSum = state.or(0);&&&&&&&&for(Integer i :values) {&&&&&&&&&&&&newSum += i;&&&&&&&&}&&&&&&&&return Optional.of(newSum);&&&&});&&&&counts.print();&&&&jssc.start();&&&&jssc.awaitTermination();}
启动Netcat
$ nc -lk 9999
$ nc -lk 9999
启动Spark Streaming Application
若在本地调试,可在IDE中启动,否则,用如下命令启动:
$ ./bin/run-example org.apache.spark.examples.streaming.JavaNetworkWordCount localhost 9999
$ ./bin/run-example org.apache.spark.examples.streaming.JavaNetworkWordCount localhost 9999
hello world
hello spark
hello yurnom
hello worldhello sparkhello yurnom
-------------------------------------------
Time: 0 ms
-------------------------------------------
(yurnom,1)
-------------------------------------------Time: 0 ms-------------------------------------------(yurnom,1)(hello,3)(world,1)(spark,1)
Discretized Stream是Spark Streaming的基础抽象,代表持续性的数据流和经过各种Spark原语操作后的结果数据流。在内部实现上,DStream由连续的序列化RDD来表示。每个RDD含有一段时间间隔内的数据,如下图:
对数据的操作也是按照RDD为单位来进行的,如下图所示:
上图下方的RDD都是通过Spark高级原语的转换而来,计算过程由Spark engine来完成。
Operations
DStream上的原语与RDD的类似,分为Transformations(转换)和Output Operations(输出)两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及各种Window相关的原语。
UpdateStateByKey Operation
UpdateStateByKey原语用于记录历史记录,上文中Word Count示例中就用到了该特性。若不用UpdateStateByKey来更新状态,那么每次数据进来后分析完成后,结果输出后将不在保存。如,若将上文代码示例中的第15行若替换为:
JavaPairDStream&String, Integer& counts = pairs.reduceByKey((i1, i2) -& (i1 + i2));
JavaPairDStream&String, Integer& counts = pairs.reduceByKey((i1, i2) -& (i1 + i2));
那么输入:hellow world,结果则为:(hello,1)(world,1),然后输入hello spark,结果则为(hello,1)(spark,1)。也就是不会保留上一次数据处理的结果。
使用UpdateStateByKey原语需要用于记录的State,可以为任意类型,如上例中即为Optional&Intege&类型;此外还需要更新State的函数,可参考Word Count示例中的15-20行。
Transform Operations
Transform()原语允许DStream上执行任意的RDD-to-RDD函数。通过该函数可以方便的扩展Spark API。此外,本篇开头所提到的以及也是通过本函数来进行结合的。官方示例:
import org.apache.spark.streaming.api.java.*;
// RDD containing spam information
final JavaPairRDD&String, Double& spamInfoRDD = jssc.sparkContext().newAPIHadoopRDD(...);
JavaPairDStream&String, Integer& cleanedDStream = wordCounts.transform(
new Function&JavaPairRDD&String, Integer&, JavaPairRDD&String, Integer&&() {
@Override public JavaPairRDD&String, Integer& call(JavaPairRDD&String, Integer& rdd) throws Exception {
rdd.join(spamInfoRDD).filter(...); // join data stream with spam information to do data cleaning
1234567891011
import org.apache.spark.streaming.api.java.*;// RDD containing spam informationfinal JavaPairRDD&String, Double& spamInfoRDD = jssc.sparkContext().newAPIHadoopRDD(...);&JavaPairDStream&String, Integer& cleanedDStream = wordCounts.transform(&&new Function&JavaPairRDD&String, Integer&, JavaPairRDD&String, Integer&&() {&&&&@Override public JavaPairRDD&String, Integer& call(JavaPairRDD&String, Integer& rdd) throws Exception {&&&&&&rdd.join(spamInfoRDD).filter(...); // join data stream with spam information to do data cleaning&&&&&&...&&&&}&&});
Window Operations
Window Operations有点类似于Storm中的State,可以设置窗口的大小和滑动窗口的间隔来动态的获取当前Steaming的允许状态。如下图所示:
如以下代码表示,每10秒钟处理最近30秒钟中的数据。
JavaPairDStream&String, Integer& windowedWordCounts =
pairs.reduceByKeyAndWindow((a, b) -& (a + b),
new Duration(30000), new Duration(10000));
JavaPairDStream&String, Integer& windowedWordCounts = &&&&&&pairs.reduceByKeyAndWindow((a, b) -& (a + b), &&&&&&&&&&&&new Duration(30000), new Duration(10000));
Window相关API有:
window(windowLength, slideInterval)
countByWindow(windowLength, slideInterval)
reduceByWindow(func, windowLength, slideInterval)
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])
countByValueAndWindow(windowLength, slideInterval, [numTasks])
Output Operations
当某个Output Operations原语被调用时,stream才会开始真正的计算过程。现阶段支持的Output方式有以下几种:
foreachRDD(func)
saveAsObjectFiles(prefix, [suffix])
saveAsTextFiles(prefix, [suffix])
saveAsHadoopFiles(prefix, [suffix])
除了前文中Word Count示例中用到的TCP套接字连接连接作为输入源以外,Spark Streaming还可以使用很多其它的输入源。例如对于文件,可以这样处理:
jssc.fileStream(dataDirectory);
jssc.fileStream(dataDirectory);
Spark Streaming将会监控该文件夹,要使用该特性,需要注意以下几点:
该文件夹下的所有文件必须有相同的数据格式
在该文件夹下创建文件的方式必须是原子性的移动或重命名的方式,不可以先创建文件后在进行写入
所有文件夹下的文件不可进行改动
其它数据源的使用可以参考Spark安装包中的examples文件夹中的streaming部分。同样对于特殊的数据输入源,可以进行。
一般来说,使用Spark自带的就能满足大部分的监控需求。对于Spark Streaming来说,以下两个度量指标尤为重要(在Batch Processing Statistics标签下):
Processing Time:处理每个batch的时间
Scheduling Delay:每个batch在队列中等待前一个batch完成处理所等待的时间
若Processing Time的值一直大于Scheduling Delay,或者Scheduling Delay的值持续增长,代表系统已经无法处理这样大的数据输入量了,这时就需要考虑各种来增强系统的负载。
与RDD一样,DStream同样也能通过persist()方法将数据流存放在内存中,这样做的好处是遇到需要多次迭代计算的程序时,速度优势十分的明显。而对于上文中提到的各种window原语,其默认的持久化策略就是保存在内存中。
当数据源来自于网络时(例如通过Kafka、Flume、sockets等等),默认的持久化策略是将数据保存在两台机器上,这也是为了容错性而设计的。
(转载本站文章请注明作者和出处
,请勿用于任何商业用途)
文章总数:78篇
评论总数:281条
分类总数:31个
标签总数:43个
运行时间:951天
大家好,欢迎来到。
这不是一个只谈技术的博客,这里记录我成长的点点滴滴,coding、riding and everthing!
最新评论功能现在的位置:
Spark Streaming原理简析
数据的接收
StreamingContext实例化的时候,需要传入一个SparkContext,然后指定要连接的spark matser url,即连接一个spark engine,用于获得executor。
实例化之后,首先,要指定一个接收数据的方式,如
val lines = ssc.socketTextStream("localhost", 9999)
这样从socket接收文本数据。这个步骤返回的是一个ReceiverInputDStream的实现,内含Receiver,可接收数据并转化为RDD放内存里。
ReceiverInputDStream有一个需要子类实现的方法
def getReceiver(): Receiver[T]
子类实现这个方法,worker节点调用后能得到Receiver,使得数据接收的工作能分布到worker上。
如果是local跑,由于Receiver接收数据在本地,所以在启动streaming application的时候,要注意分配的core数目要大于Receiver数目,才能腾出cpu做计算任务的调度。
Receiver需要子类实现
def onStart()def onStop()
来定义一个数据接收器的初始化、接收到数据后如何存、如何在结束的时候释放资源。
Receiver提供了一系列store()接口,如store(ByteBuffer),store(Iterator)等等。这些store接口是实现好了的,会由worker节点上初始化的ReceiverSupervisor来完成这些存储功能。ReceiverSupervisor还会对Receiver做监控,如监控是否启动了、是否停止了、是否要重启、汇报error等等。
ReceiverSupervisor的存储接口的实现,借助的是BlockManager,数据会以RDD的形式被存放,根据StorageLevel选择不同存放策略。默认是序列化后存内存,放不下的话写磁盘(executor)。被计算出来的RDD中间结果,默认存放策略是序列化后只存内存。
ReceiverSupervisor在做putBlock操作的时候,会首先借助BlockManager存好数据,然后往ReceiverTracker发送一个AddBlock的消息。ReceiverTracker内部的ReceivedBlockTracker用于维护一个receiver接收到的所有block信息,即BlockInfo,所以AddBlock会把信息存放在ReceivedBlockTracker里。未来需要计算的时候,ReceiverTracker根据streamId,从ReceivedBlockTracker取出对应的block列表。
RateLimiter帮助控制Receiver速度,spark.streaming.receiver.maxRate参数。
数据源方面,普通的数据源为file, socket, akka, RDDs。高级数据源为Twitter, Kafka, Flume等。开发者也可以自己定制数据源。
JobScheduler在context里初始化。当context start的时候,触发scheduler的start。
scheduler的start触发了ReceiverTracker和JobGenerator的start。这两个类是任务调度的重点。前者在worker上启动Receiver接收数据,并且暴露接口能够根据streamId获得对应的一批Block地址。后者基于数据和时间来生成任务描述。
JobScheduler内含一个线程池,用于调度任务执行。spark.streaming.concurrentJobs可以控制job并发度,默认是1,即它只能一个一个提job。
job来自JobGenerator生成的JobSet。JobGenerator根据时间,生成job并且执行cp。
JobGenerator的生成job逻辑:
- 调用ReceiverTracker的allocateBlocksToBatch方法,为本批数据分配好block,即准备好数据
- 间接调用DStream的generateJob(time)方法,制造可执行的RDD
DStream切分RDD和生成可执行的RDD,即getOrCompute(time):
- 如果这个时间点的RDD已经生成好了,那么从内存hashmap里拿出来,否则下一步
- 如果时间是批次间隔的整数倍,则下一步,否则这个时间点不切
- 调用DStream的子类的compute方法,得到RDD。可能是一个RDD,也可以是个RDD列表
- 对每个RDD,调用persist方法,制定默认的存储策略。如果时间点合适,同时调用RDD的checkpoint方法,制定好cp策略
- 得到这些RDD后,调用SparkContext.runJob(rdd, emptyFunction)。把这整个变成一个function,生成Job类。未来会在executor上触发其runJob
JobGenerator成功生成job后,调用JobScheduler.submitJobSet(JobSet),JobScheduler会使用线程池提交JobSet中的所有job。该方法调用结束后,JobGenerator发送一个DoCheckpoint的消息,注意这里的cp是driver端元数据的cp,而不是RDD本身的cp。如果time合适,会触发cp操作,内部的CheckpointWriter类会完成write(streamingContext, time)。
JobScheduler提交job的线程里,触发了job的run()方法,同时,job跑完后,JobScheduler处理JobCompleted(job)。如果job跑成功了,调用JobSet的handleJobCompletion(Job),做些计时和数数工作,如果整个JobSet完成了,调用JobGenerator的onBatchCompletion(time)方法,JobGenerator接着会做clearMetadata的工作,然后JobScheduler打印输出;如果job跑失败了,JobScheduler汇报error,最后会在context里抛异常。
transform:可以与外部RDD交互,比如做维表的join
updateStateByKey:生成StateDStream,比如做增量计算。WordCount例子
每一批都需要与增量RDD进行一次cogroup之后,然后执行update function。两个RDD做cogroup过程有些开销:RDD[K, V]和RDD[K, U]合成RDD[K, List[V], List[U]],List[U]一般size是1,理解为oldvalue,即RDD[K, batchValueList, Option[oldValue]]。然后update function处理完,变成RDD[K, newValue]。
批与批之间严格有序,即增量合并操作,是有序的,批之间没发并发
增量RDD的分区数可以开大,即这步增量的计算可以调大并发
window:batch size,window length, sliding interval三个参数组成的滑窗操作。把多个批次的RDD合并成一个UnionRDD进行计算。
foreachRDD: 这个操作是一个输出操作,比较特殊。
* Apply a function to each RDD in this DStream. This is an output operator, so
* 'this' DStream will be registered as an output stream and therefore materialized.
def foreachRDD(foreachFunc: (RDD[T], Time) =& Unit) { new ForEachDStream(this, context.sparkContext.clean(foreachFunc, false)).register()
DStream.foreachRDD()操作使开发者可以直接控制RDD的计算逻辑,而不是通过DStream映射过去。所以借助这个方法,可以实现MLlib, Spark SQL与Streaming的集合,如:结合Spark SQL、DataFrame做Wordcount。
如果是window操作,默认接收的数据都persist在内存里。
如果是flume, kafka源头,默认接收的数据replicate成两份存起来。
Checkpoint
与state有关的流计算,计算出来的结果RDD,会被cp到HDFS上,原文如下:
Data checkpointing - Saving of the generated RDDs to reliable storage. This is necessary in some stateful transformations that combine data across multiple batches. In such transformations, the generated RDDs depends on RDDs of previous batches, which causes the length of the dependency chain to keep increasing with time. To avoid such unbounded increase in recovery time (proportional to dependency chain), intermediate RDDs of stateful transformations are periodically checkpointed to reliable storage (e.g. HDFS) to cut off the dependency chains.
cp的时间间隔也可以设定,可以多批做一次cp。
cp的操作是同步的。
简单的不带state操作的流任务,可以不开启cp。
driver端的metadata也有cp策略。driver cp的时候是将整个StreamingContext对象写到了可靠存储里。
EasyQuery的目标是不需要写一行java代码就可以实现非常非常复杂的查询,省时省力,提高效率。
【上篇】【下篇】
您可能还会对这些文章感兴趣!
籍贯山东,落户北京,IT行业。
工作经历:
2014年至今&,自主创业
,传智播客
,超人学院
,亚信科技
教育经历:
,中科院研究生院
,河北大学随笔 - 344
评论 - 183spark流数据处理:Spark Streaming的使用
&&&&您可以捐助,支持我们的公益事业。
每天15篇文章
不仅获得谋生技能
更可以追随信仰
spark流数据处理:Spark
Streaming的使用
来源:博客 发布于:&&
1690 次浏览
&&&&评价:
Spark Streaming是Spark核心API的扩展,用于可伸缩、高吞吐量、可容错地处理在线流数据。Spark
Streaming可以从很多数据源获取数据,比如:Kafka、Flume、Twitter、ZeroMQ、Kinesis或TCP连接等,并可以用很多高层算子(map/reduce/join/window等)来方便地处理这些数据。最后处理过的数据还可以推送到文件系统、数据库和在线监控页面等。实际上,你也可以在数据流上使用Spark的机器学习和图计算算法。
Spark Streaming内部工作机制概图如下所示。Spark
Streaming接收在线数据流并将其划分成批(batch),然后通过Spark引擎处理并最终得到由一批一批数据构成的结果流。
Spark Streaming流程概图
Spark Streaming将流数据抽象为离散化流(discretized stream),即DStream。DStream可以从输入数据流创建也可以从其他的DStream转换而来。DStream在内部被表示为一个连续的RDD序列。
首先以一个简单的示例开始:用Spark Streaming对从TCP连接中接收的文本进行单词计数。
/** * 功能:用spark streaming实现的针对流式数据进行单词计数的程序。 *
该程序只是对数据流中的每一批数据进行单独的计数,而没有进行增量计数。 * 环境:spark 1.6.1, scala 2.10.4 */
// 引入相关类库
import org.apache.spark._
import org.apache.spark.streaming._
object NetworkWordCount {
def main(args: Array[String]) {
// Spark Streaming程序以StreamingContext为起点,其内部维持了一个SparkContext的实例。
// 这里我们创建一个带有两个本地线程的StreamingContext,并设置批处理间隔为1秒。
val conf = new SparkConf().setMaster(&local[2]&).setAppName(&NetworkWordCount&)
val ssc = new StreamingContext(conf, Seconds(1))
// 在一个Spark应用中默认只允许有一个SparkContext,默认地spark-shell已经为我们创建好了
// SparkContext,名为sc。因此在spark-shell中应该以下述方式创建StreamingContext,以
// 避免创建再次创建SparkContext而引起错误:
// val ssc = new StreamingContext(sc, Seconds(1))
// 创建一个从TCP连接获取流数据的DStream,其每条记录是一行文本
val lines = ssc.socketTextStream(&localhost&,
// 对DStream进行转换,最终得到计算结果
val res = lines.flatMap(_.split(& &)).map((_,
1)).reduceByKey(_ + _)
// 打印该DStream中每个RDD中的前十个元素
res.print()
// 执行完上面代码,Spark Streaming并没有真正开始处理数据,而只是记录需在数据上执行的操作。
// 当我们设置好所有需要在数据上执行的操作以后,我们就可以开始真正地处理数据了。如下:
ssc.start() // 开始计算
ssc.awaitTermination() // 等待计算终止
为了测试程序,我们得有TCP数据源作为输入,这可以使用Netcat(一般linux系统中都有,如果是windows系统,则推荐你使用Ncat,Ncat是一个改进版的Netcat)。如下使用Netcat监听指定本地端口:
nc -lk 9999
如果是使用Ncat,则对应命令如下:
ncat -lk 9999
在IntelliJ IDEA或Eclipse中可以本地运行测试上述Spark
Streaming程序,该程序会连接到Netcat(或Ncat)监听的端口,你可以在运行Netcat(或Ncat)的终端中输入东东并回车,然后就可以看到该Spark
Streaming程序会马上输出处理结果,并且这个处理是不停的、流式的。
注意:上述示例只是对数据流中的每一批数据进行单独的计数,而没有进行增量计数。
StreamingContext
StreamingContext是Spark Streaming程序的入口点,正如SparkContext是Spark程序的入口点一样。
StreamingContext中维护了一个SparkContext实例,你可以通过ssc.sparkContext来访问它。该SparkContext实例要么在创建StreamingContext时被传入,要么在StreamingContext内部根据传入的SparkConf进行创建,这取决于你所使用的StreamingContext构造函数,请观看API文档。
关于DStream
Spark Streaming将流数据抽象为离散化流(discretized
stream),即DStream。DStream在内部被表示为一个连续的RDD序列,每一个RDD包含了一个固定时间间隔内数据源所产生的数据,如下图所示。
对DStream所进行的操作将被转换为对底层RDD的操作。例如,在前面的流数据单词计数示例程序中,lines.flatMap(_.split(&
&))语句中的flatMap算子就被应用到lines DStream中的RDD以生成words
DStream中的RDD,如下图所示。
InputDStream和Receiver
InputDStream代表输入数据流,除了file stream和queue RDD stream,其他的输入流都和一个Receiver相关联(具体地是对应ReceiverInputDStream类,其内部会启动一个Receiver),Receiver工作在一个worker节点上,用于接收相应数据源的流数据并将其存储在内存中(取决于创建StreamingContext时指定的存储级别)以供处理。
我们也可以创建多个InputDStream来连接多个数据源,其中的ReceiverInputDStream都将启动Receiver来接收数据。一个Spark
Streaming应用程序应该分配足够多的核心(local模式下是线程)去运行receiver(s)并处理其接收的数据。当我们以本地模式运行Spark
Streaming程序时,master URL不能指定为local或者local[1](Spark Streaming会启动一个线程运行receiver,只有一个线程将导致没有线程来处理数据),而应该是local[n],这个n应该大于receiver的个数。在集群中运行Spark
Streaming程序时,同样道理,也需要分配大于receiver的个数的核心数。
基本数据源
Spark Streaming提供了从很多数据源获取流数据的方法,一些基本的数据源可以通过StreamingContext
API直接使用,主要包括:文件系统、网络连接、Akka actors等。
文件数据流
StreamingContext提供了从兼容于HDFS API的所有文件系统中创建文件数据输入流的方法,如下:
ssc.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
ssc.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
文件流没有receiver。Spark Streaming将监控对应目录(但不支持嵌套目录),并处理在该目录中创建的任何文件(以.开头的将被忽略)。监控目录中的文件必须有相同的数据格式。监控目录中的文件如果被修改(比如以append方式写入),这些修改将不会被读取,因此正确的方式应该是先在其他目录中写好这些文件并将其移动或者重命名到该监控目录。
对于简单的文本文件,可以使用更简单的方法,如下:
ssc.textFileStream(dataDirectory)
ssc.textFileStream(dataDirectory)
网络数据流
网络连接流可以使用ssc.socketStream()或ssc.socketTextStream()创建,详情参见API文档。
Akka Actor流
可以通过ssc.actorStream()创建一个从Akka actor接收数据流的ReceiverInputDStream。更多参见API文档和自定义Receiver指南。
我们也可以用ssc.queueStream()创建一个基于RDD序列的InputDStream。序列中的每一个RDD将被作为DStream中的一个数据批,这通常在测试你的Spark
Streaming程序时非常有用。
高级数据源
对于Kafka、Flume、Kinesis、Twitter等这些高级数据源,则需要添加外部依赖,关于依赖参见这里。
下面给出一些关于高级数据源集成方法的参考链接:
Kafka: Kafka Integration Guide
Flume: Flume Integration Guide
Kinesis: Kinesis Integration Guide
自定义数据源
你也可以自定义数据源,只需要实现一个自己的receiver从自定义数据源接收数据并将其推送到Spark。详情参见:Custom
Receiver Guide。
Receiver可靠性
依据可靠性可将Receiver分为两类。可靠Receiver带有传输确认机制(ACK机制),可以确保数据在传输过程中不会丢失,Kafka和Flume等在ACK机制开启的情况下就是可靠的。不可靠Receiver不带有传输确认机制,包括不支持ACK机制和支持ACK但关闭的情形。
DStream转换算子
DStream支持很多和RDD类似的转换算子(transformation)(这些转换算子和RDD中的一样,都是lazy的),完整的算子列表参见API文档中的DStream和PairDStreamFunctions,下面列出一些常用的:
下面对其中一些操作进行更详细的说明。
updateStateByKey
updateStateByKey操作允许你用持续不断的新信息来更新你所维护的某些状态信息。为了使用这个算子,通常你需要如下两个步骤:
1、定义状态:状态可以是任意数据类型。
2、定义状态更新函数:根据先前的状态数据和流数据中的新数据如何更新状态数据。
在每一批数据中,Spark都将在所有已经存在的key上应用状态更新函数,即使在这批数据中没有某些key对应的数据。如果状态更新函数返回None,那么对应的key-value对将被移除。
在快速示例中,我们只是对每一批数据进行单独的单词计数,在这里我们就可以通过updateStateByKey算子进行增量计数了。
需要注意的是使用updateStateByKey算子要求已经配置了检查点目录,参见检查点部分。
transform操作允许在DStream上应用任意RDD-to-RDD函数,这样你就可以方便地使用在DStream
API中没有的却在RDD API中存在的算子来转换DStream中的每一个RDD了。例如,在DStream
API中不存在将数据流中的每一批数据(一个RDD)与其他数据集进行join的操作,此时就可以通过transform算子+RDD的join算子来实现。
需要注意的是传入transform的函数每次应用在一批数据(一个RDD)上,这意味着你可以根据时间变化在不同的RDD上做不同的处理,也就是说RDD操作、RDD分区数、广播变量等在不同的批之间都可以改变。
Spark Streaming也提供了基于窗口的计算,它允许你在一个滑动窗口上使用转换操作,滑动窗口如下图所示。
窗口是基于时间滑动的,窗口操作新形成的DStream中的每一个RDD包含了某一滑动窗口中的所有数据。任何窗口操作都需要指定如下两个参数:
窗口长度:它必须是源DStream批处理间隔的整数倍。
滑动间隔:它必须是源DStream批处理间隔的整数倍。
一些常用的窗口操作算子如下:
需要强调的是,上述某些操作(如reduceByWindow和reduceByKeyAndWindow等)有一些特殊形式,通过只考虑新进入窗口的数据和离开窗口的数据,让Spark增量计算归约结果。这种特殊形式需要额外提供一个规约函数的逆函数,比如+对应的逆函数为-。对于较大的窗口,提供逆函数可以大大提高执行效率。
Stream-stream joins:
val stream1: DStream[String, String] = ...val stream2: DStream[String, String] = ...val joinedStream = stream1.join(stream2)
上述代码对两个DStream进行join操作,在每一个批处理间隔,stream1产生的一个RDD将和stream2产生的一个RDD进行join操作。另外,还有其他一些join操作:leftOuterJoin、rightOuterJoin和fullOuterJoin。也可以进行基于窗口的join操作,如下:
val windowedStream1 = stream1.window(Seconds(20))val windowedStream2 = stream2.window(Minutes(1))val joinedStream = windowedStream1.join(windowedStream2)
Stream-dataset joins:
这种类型的join可以通过transform()来实现。如下代码将一个分窗的stream和一个数据集进行join:
val dataset: RDD[String, String] = ...val windowedStream = stream.window(Seconds(20))...val joinedStream = windowedStream.transform { rdd =& rdd.join(dataset) }
DStream输出操作
输出操作允许将DStream中的数据推送到外部系统,比如数据库和文件系统。和RDD的action算子类似,DStream的输出操作用来触发所有转换操作的执行。下面列出主要的输出操作:
foreachRDD使用注意事项
通常,将数据写到外部系统需要创建一个网络连接。不经意间,你很可能在driver节点创建一个连接对象,然后在试着在executor节点使用这个连接,如下:
dstream.foreachRDD { rdd =&
val connection = createNewConnection()
// 在driver节点执行
rdd.foreach { record =&
connection.send(record) // 在executor节点执行
上述代码是错误的,因为创建的连接对象将被序列化然后传输到worker节点,而连接通常是不能在机器之间传递的。这个错误可能显示为序列化错误(连接对象不可序列化)、初始化错误(连接对象需要在worker节点初始化)等等。
改进的办法是在worker节点创建连接对象。然而,这可能导致另一个常见错误――为每一条记录创建一个连接,如下:
dstream.foreachRDD { rdd =&
rdd.foreach { record =&
val connection = createNewConnection()
connection.send(record)
connection.close()
为每一条记录创建一个连接将消耗掉大量系统资源,极大地降低了系统效率。一个更好的方法是使用rdd.foreachPartition()为每一个RDD分区创建一个连接,如下:
dstream.foreachRDD { rdd =&
rdd.foreachPartition { partitionOfRecords =&
val connection = createNewConnection()
partitionOfRecords.foreach(record =& connection.send(record))
connection.close()
最后,还可以进一步优化――在多个批数据(RDD)之间重用连接对象。我们可以通过一个静态的lazy的连接池来保存连接,以供在多批数据之间共用连接对象,如下:
dstream.foreachRDD { rdd =&
rdd.foreachPartition { partitionOfRecords =&
// 连接池需是静态的,并且初始化需是lazy的
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record =& connection.send(record))
ConnectionPool.returnConnection(connection)
// 将连接归还,以便将来重用
注意,上述连接池中的连接应该是按需创建的(lazy的),并且最好将长期不用的连接关闭(超时机制)。
DStream的转换操作是lazy的,输出操作触发实质的计算。具体地说是DStream输出操作内部的RDD行动操作强制处理接收到的数据。因此,如果一个程序没有任何输出操作,或者有像foreachRDD()这样的输出操作但其中没有任何RDD行动操作,那么该程序就不会执行任何计算,它将简单地接收数据然后丢弃掉。
缺省情况下,输出操作一次执行一个,并且是按照应用程序定义的前后顺序执行的。
累加器和广播变量
在Spark Streaming中,累加器(Accumulator)和广播变量(Broadcast)不能从检查点(checkpoint)中恢复。如果你采用检查点机制(检查点将切断RDD依赖)并且也用了累加器或广播变量,为了在突发异常并重启driver节点之后累加器和广播变量可以被重新实例化,你应该为它们创建lazy实例化的单例对象。示例如下:
object WordBlacklist {
@volatile private var instance: Broadcast[Seq[String]]
def getInstance(sc: SparkContext): Broadcast[Seq[String]]
if (instance == null) {
synchronized {
if (instance == null) {
val wordBlacklist = Seq(&a&, &b&,
instance = sc.broadcast(wordBlacklist)
object DroppedWordsCounter {
@volatile private var instance: Accumulator[Long]
def getInstance(sc: SparkContext): Accumulator[Long]
if (instance == null) {
synchronized {
if (instance == null) {
instance = sc.accumulator(0L, &WordsInBlacklistCounter&)
wordCounts.foreachRDD((rdd: RDD[(String, Int)],
time: Time) =& {
// 获取或注册blacklist广播变量
val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
// 获取或注册droppedWordsCounter累加器
val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
// 根据blacklist来移除words,并用droppedWordsCounter来计数
val counts = rdd.filter { case (word, count) =&
if (blacklist.value.contains(word)) {
droppedWordsCounter += count
}.collect()
val output = &Counts at time & + time
+ & & + counts
DataFrame和SQL操作
在流数据处理中也可以使用DataFrame和SQL。此时你必须用StreamingContext正在使用的SparkContext实例来创建一个SQLContext。为了使程序可以在driver故障重启之后可以继续运行,我们应该创建一个lazy实例化的SQLContext的单例对象。示例如下:
/** 在Spark Streaming程序中进行DataFrame操作 */
val words: DStream[String] = ...
words.foreachRDD { rdd =&
// 获取SQLContext的单例对象
val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
import sqlContext.implicits._
val wordsDataFrame = rdd.toDF(&word&)
wordsDataFrame.registerTempTable(&words&)
val wordCountsDataFrame =
sqlContext.sql(&select word, count(*) as
total from words group by word&)
wordCountsDataFrame.show()
你也可以在流数据中使用由MLlib提供的机器学习算法。首先你要知道的是,有一些流式机器学习算法(例如Streaming
Linear Regression、Streaming KMeans等),它们可以从流数据中学习得到模型,也可以将学到的模型应用到流数据中。除此之外,对于大量的机器学习算法,你可以通过历史数据离线地学习得到一个模型,并将模型应用到在线的流数据中。
持久化(缓存)
和RDD相似,DStream也允许将流数据持久化,简单地在DStream上调用persist()将自动地将其代表的每一个RDD缓存下来。如果同一个DStream中的数据要被使用多次,将DStream缓存下来将是非常有益的。
对于window-based操作(如reduceByWindow、reduceByKeyAndWindow等)和state-based操作(如updateStateByKey),DStream将被隐式地持久化,因此你就不必自己手动调用persist()了哦。
对于从网络获取数据的情况(如TCP连接、Kafka、Flume等),出于容错的考虑,缺省的持久化级别是将数据复制到两个节点。
注意:和RDD不同的是,DStream的缺省持久化级别是将数据序列化并存储到内存中。
流处理程序通常是7*24小时不间断运行的,因此必须是可以从故障中恢复的。为了可以从故障中恢复,Spark
Streaming需要在可容错的存储系统中checkpoint足够的信息。有两类数据被checkpoint。
1、Metadata checkpointing: 将定义流计算的信息保存到可容错的存储系统中,如HDFS。这被用作Spark
Streaming driver节点从故障中恢复。Metadata包括:Streaming程序的配置信息、作用在DStream上的操作集合、还未处理完的batch。
2、Data checkpointing: 将产生的RDD保存到可靠的存储系统,如HDFS。这对于一些stateful的转换操作(如updateStateByKey、reduceByKeyAndWindow等)是必须的,这些操作需要结合多个batch的数据来进行处理,新产生的RDD依赖于先前几个batch对应的RDD,这将导致依赖链随着时间持续变长。为了避免无边界的增长,stateful转换操作的中间结果(RDD)会被周期性地checkpoint到可靠的存储系统中,这样就可以切断依赖链了。
何时启用Checkpointing:
1、使用了stateful转换操作:这种情况下必须配置checkpoint目录,以便周期性的RDD
checkpointing。
2、driver节点从故障恢复:如果你想使driver节点可从故障恢复,就要配置checkpoint目录。如果无所谓故障恢复,checkpoint目录就不是必须的,此时如果程序中没有stateful转换操作,那么就无需配置checkpoint目录。
怎样配置Checkpointing:
可以使用ssc.checkpoint()来设置checkpoint目录,这样你就可以在程序中使用stateful的转换操作了,如果你想使程序可以从driver节点的故障中恢复,你应该重写你的程序以支持以下行为:
1、当程序首次启动,创建一个StreamingContext,并在设置一些东东之后调用start()。
2、当程序从故障中恢复,从checkpoint数据中重建一个StreamingContext。
上述行为可以通过ssc.getOrCreate()来辅助实现,示例如下:
// 创建并设置一个新的StreamingContext对象def functionToCreateContext(): StreamingContext = {
val ssc = new StreamingContext(...)
val lines = ssc.socketTextStream(...)
ssc.checkpoint(checkpointDirectory)
// 如果检查点存在,则根据检查点创建一个StreamingContext;
// 否则,根据提供的函数创建一个新的StreamingContext。
val context =
StreamingContext.getOrCreate(checkpointDirectory,
functionToCreateContext _)
// 在这里做一些额外的设置。这里的设置和首次启动还是故障恢复无关。
context. ...
// 开始运行
context.start()
context.awaitTermination()
需要注意的是,checkpoint的时间间隔需要仔细考虑,过小或过大的时间间隔都可能导致问题。通常,checkpoint的时间间隔最好是DStream的批处理时间间隔的5-10倍。dstream.checkpoint()可用来设置checkpoint的时间间隔,同时对于那些没有默认地进行checkpointing的DStream(非stateful转换操作生成的DStream),这也将引起周期性地checkpoint该DStream中的RDD。
除了Spark提供的一些监控能力外,Spark Streaming还提供了一些额外的监控能力。当一个Spark
Streaming程序运行时,Spark应用程序监控页面(通常是 http://master:4040
)将多出一个Streaming选项卡,其中展示了receiver和已完成的batch的统计信息。其中有两个信息非常有用:
1、Processing Time: 一批数据的处理时间。
2、Scheduling Delay: 一批数据在队列中等待的时间。
如果处理一批数据的时间持续高出批处理间隔,或者等待时间持续增加,通常意味着你的系统的处理速度跟不上数据产生的速度。此时,你可以考虑削减批数据的处理时间,参见性能调优部分。
你也可以通过StreamingListener接口来监听Spark Streaming程序的执行状况,包括:receiver状态、处理时间等等。
削减批数据处理时间
数据接收的并行度
通过网络接收的数据(Kafka、Flume、socket等)需要经过解序列化然后存储在Spark中。如果数据接收成为瓶颈,你就需要考虑增加数据接收的并行度。注意每一个Input
DStream只会创建一个receiver(运行在worker节点)用于接收一个数据流。你可以创建多个Input
DStream并配置它们来接收数据源的不同部分以增加数据接收并行度。
例如,如果一个Kafka input DStream接收两个主题的数据导致系统瓶颈的话,可以将Kafka输入流划分为两个,然后创建两个Input
DStream,每一个接收一个主题的数据流。这样的话数据接收就可以并行进行了,从而增加了系统的吞吐量。这两个DStream可以被union成为一个单一的DStream,后续的转换操作将作用在统一的数据流之上。示例如下:
val numStreams = 5val kafkaStreams = (1 to numStreams).map { i =& KafkaUtils.createStream(...) }val unifiedStream = streamingContext.union(kafkaStreams)unifiedStream.print()
数据处理的并行度
另一个需要考虑的是receiver的数据块划分间隔,这可以通过spark.streaming.blockInterval进行设置。receiver会将接收到的数据合并为数据块然后存储到Spark内存中。每一批数据中数据块的数量决定了Task的数量(通常大约是:批处理间隔/块间隔)。过少的Task数将导致集群资源利用率降低,如果出现这种情况,你应该试着去减小块划分间隔,我们推荐的块划分间隔的最小值是大约50ms,过小的话也将导致一些问题。
另一个增加并行度的方法是在处理数据之前,使用inputStream.repartition()明确地将Input
DStream重新分区,这样新形成的DStream中的RDD都将有指定数量的分区。
对于一些操作,如reduceByKey()和reduceByKeyAndWindow()等,也可以传递一个分区数参数来控制计算的并行度。
数据序列化优化
在Spark Streaming程序中,输入数据和持久化的数据默认都经过序列化处理并缓存在内存中。
序列化优化可以明显提高程序的运行效率,参见我的Spark使用总结一文的序列化部分。
在一些特殊情况下,需要保存在内存中的流数据可能不是很大,这时可以设置存储级别以非序列化的形式存储在内存中,如果这不会引起过大的GC开销,那么将提高程序的性能。
设置正确的批处理间隔
批处理间隔的设置对流处理程序是非常关键的,这可能影响到输入流能否被迅速地流畅地持续地处理。
恰当的批处理间隔通常和数据产生速度以及集群计算能力相关。通常来说,如果我们想了解一个流处理程序的处理速度能否跟得上数据产生的速度,可以查看Spark应用监控页面(
http://master:4040 )。对于一个稳定的流处理程序来说,批处理时间(Processing
Time)应该小于设置的批处理间隔时间(Batch Interval),并且Batch的调度延迟时间(Scheduling
Delay)是相对平稳的(持续增加就意味着跟不上数据产生速度了,但瞬时的增加并不意味着什么)。
已经证实,对于大多数应用来说,500ms是比较好的最小批处理间隔。
内存优化对于一个Spark Streaming程序来说也很重要,这主要包括:内存使用优化以及GC优化。
更多课程...&&&
每天2个文档/视频
扫描微信二维码订阅
订阅技术月刊
获得每月300个技术资源
|&京ICP备号&京公海网安备号

我要回帖

更多关于 spark rdd persist 的文章

 

随机推荐