spark中rdd的flatmap操作是一个spark transform函数,为什么也会有shuffle

【Spark二八Spark Shuffle读过程源代码代码剖析 - bit1129的博客 - ITeye博客
博客分类:
Spark的shuffle读操作是性能杀手,原因是Shuffle读操作需要从多个Map节点拉取数据到Reduce节点,所有的Reduce结果是否还要经过一次总计算?
package spark.examples
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
object SparkWordCount {
def main(args: Array[String]) {
System.setProperty("hadoop.home.dir", "E:\\devsoftware\\hadoop-2.5.2\\hadoop-2.5.2");
val conf = new SparkConf()
conf.setAppName("SparkWordCount")
conf.setMaster("local")
val sc = new SparkContext(conf)
val rdd = sc.textFile("file:///D:/word.in")
println(rdd.toDebugString)
val rdd1 = rdd.flatMap(_.split(" "))
println("rdd1:" + rdd1.toDebugString)
val rdd2 = rdd1.map((_, 1))
println("rdd2:" + rdd2.toDebugString)
val rdd3 = rdd2.reduceByKey(_ + _);
println("rdd3:" + rdd3.toDebugString)
rdd3.saveAsTextFile("file:///D:/wordout" + System.currentTimeMillis());
1. ResultTask的runTask方法
override def runTask(context: TaskContext): U = {
// Deserialize the RDD and the func using the broadcast variables.
val ser = SparkEnv.get.closureSerializer.newInstance()
val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) =& U)](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
metrics = Some(context.taskMetrics)
func(context, rdd.iterator(partition, context))
2. 首先反序列化得到rdd和func,其中rdd是该stage的最后一个RDD(final RDD),而func是就是PairRDDFunctions.scala的saveAsHadoopDataset方法内部定义的writeToFile函数。writeToFile函数实现了wordcount最终的写磁盘操作(rdd3.saveAsTextFile)。在调用writeToFile写磁盘之前,需要首先从Mapper节点读取到数据,然后对它进行整合,这个在ResultTask的runTask方法的最后一句 func(context, rdd.iterator(partition, context))中的rdd.iterator方法实现的。
这在Spark Shuffle写操作过程中分析,就是读取级联的读取它的父RDD的compute方法完成读取的读取操作
3.数据读取操作(rdd.iterator)
3.1 RDD类定义的iterator的模板方法,
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
if (storageLevel != StorageLevel.NONE) { ///如果这个RDD的StorageLevel不为NONE,那么逻辑是什么?
SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
} else { //否则计算或者读取Checkpoint检查点
computeOrReadCheckpoint(split, context)
3.2 WordCount的第二个Stage总共包含两个RDD,最后一个是MappedRDD,第一个是ShuffledMapRDD,MappedRDD从ShuffledMapRDD获取具体的数据,然后执行MappedRDD自身携带的函数,这个函数的定义是在RDD的saveAsTextFile的函数x=&(NullWritable.get(), new Text(x.toString))
def saveAsTextFile(path: String) {
this.map(x =& (NullWritable.get(), new Text(x.toString)))
.saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
3.3 MappedRDD的compute方法
override def compute(split: Partition, context: TaskContext) =
firstParent[T].iterator(split, context).map(f) ///调用它的parent RDD的iterator方法,这里是ShuffledRDD
3.4 ShuffledRDD的compute方法
///split是当前要计算的分片
override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
//ShuffleManager是SortShuffleManager、
///获取的ReaderHashShuffleReader,注意此处虽然是Sort based Shuffle,但是读仍然是HashShuffleReader
///获取reader的参照是shuffleHandle,分区的头尾,Spark只支持一次读取一个分片
SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
.asInstanceOf[Iterator[(K, C)]]
3.5 SortShuffleManager的getReader方法
* Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive).
* Called on executors by reduce tasks.
override def getReader[K, C](
handle: ShuffleHandle,
startPartition: Int,
endPartition: Int,
context: TaskContext): ShuffleReader[K, C] = {
// We currently use the same block store shuffle fetcher as the hash-based shuffle.
new HashShuffleReader(////构造时,只需要提供ShuffleHandle,parttition开始结束位
handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context)
3.6 HashShuffleReader的read方法
/** Read the combined key-values for this reduce task */
override def read(): Iterator[Product2[K, C]] = {
val ser = Serializer.getSerializer(dep.serializer)///获取序列化器
///关键方法,根据shuffleId获取迭代数据的Iterator
val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context, ser)
///对数据进行aggregate,得到一个整合后的Iterator
val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
////Reduce端进行结果合并,map端是否已经combine过,采用两种不同的方式
////如果map端做了combine,那么调用combineCombinersByKey,
if (dep.mapSideCombine) {
new InterruptibleIterator(context, dep.bineCombinersByKey(iter, context))
////如果map端没combine,那么调用combineValuesByKey,这个应该是跟map端做combine使用相同的方法
new InterruptibleIterator(context, dep.bineValuesByKey(iter, context))
} else if (dep.aggregator.isEmpty && dep.mapSideCombine) {
throw new IllegalStateException("Aggregator is empty for map-side combine")
// Convert the Product2s to pairs since this is what downstream RDDs currently expect
iter.asInstanceOf[Iterator[Product2[K, C]]].map(pair =& (pair._1, pair._2))
// Sort the output if there is a sort ordering defined.
///继续对数据进行排序操作,如果定义了keyOrdering,如果没有定义keyOrdering,那么如何定义之?
///对Key进行排序?
dep.keyOrdering match {
case Some(keyOrd: Ordering[K]) =&
// Create an ExternalSorter to sort the data. Note that if spark.shuffle.spill is disabled,
// the ExternalSorter won't spill to disk.
val sorter = new ExternalSorter[K, C, C](ordering = Some(keyOrd), serializer = Some(ser))
sorter.insertAll(aggregatedIter)
context.taskMetrics.memoryBytesSpilled += sorter.memoryBytesSpilled
context.taskMetrics.diskBytesSpilled += sorter.diskBytesSpilled
sorter.iterator
case None =&
aggregatedIter
3.7 BlockStoreShuffleFetcher伴生对象只定义了一个fetch方法
private[hash] object BlockStoreShuffleFetcher extends Logging {
def fetch[T](
shuffleId: Int, ////shuffleId,由dep.shuffleHandle.shuffleId提供
reduceId: Int,
///startPartition被理解为reduceId?
context: TaskContext,
serializer: Serializer)
: Iterator[T] =
logDebug("Fetching outputs for shuffle %d, reduce %d".format(shuffleId, reduceId))
val blockManager = SparkEnv.get.blockManager //获取blockManager
val startTime = System.currentTimeMillis
///根据shuffleId和reduceId获取statuses对象,因为MapOutputTracker存放在SparkEnv中,SparkEnv类似于集群级别的共享变量
///简单的说就是获取MapOutputputLocation
/// Called from executors to get the server URIs and output sizes of the map outputs of a given shuffle.
///statuses是一个二元元组的集合,每个元组的第一个元素是BlockManagerId对象(包含三方面的信息,executorId_,host_,port_),第二个是数据的长度
val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId)
logDebug("Fetching map output location for shuffle %d, reduce %d took %d ms".format(
shuffleId, reduceId, System.currentTimeMillis - startTime))
////BlockManagerId封装了什么信息?
///ArrayBuffer的元素是Tuple二元组类型,分别是Int和Long类型
val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[(Int, Long)]]
//statuses.zipWithIndex是个二元组,第一元素又是一个二元元组
///address表示BlockManagerId对象
///size表示数据大小
///index表示索引?有什么用?
for (((address, size), index) &- statuses.zipWithIndex) {
///这是什么操作?根据address获取一个二元组,然后把(index,size)赋值给它
///splitsByAddress此时有一个元素(address,(index,size))
splitsByAddress.getOrElseUpdate(address, ArrayBuffer()) += ((index, size))
///构造blockByAddress,得到一个Seq,元素类型是元组:(BlockManagerId, Seq[(BlockId, Long)])
///元组的第一个元素是address,元组的第二个元素是一个新元组的集合,每个元组的第一个是一个元素是BlockId(由shuffleId,index, reduceId),第二个元素是数据的size
val blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])] = splitsByAddress.toSeq.map {
case (address, splits) =&
(address, splits.map(s =& (ShuffleBlockId(shuffleId, s._1, reduceId), s._2)))
///定义一个内部函数
def unpackBlock(blockPair: (BlockId, Try[Iterator[Any]])) : Iterator[T] = {
val blockId = blockPair._1
val blockOption = blockPair._2
blockOption match {
case Success(block) =& {
block.asInstanceOf[Iterator[T]]
case Failure(e) =& {
blockId match {
case ShuffleBlockId(shufId, mapId, _) =&
val address = statuses(mapId.toInt)._1
throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId, e)
throw new SparkException(
"Failed to get block " + blockId + ", which is not a shuffle block", e)
///这是什么类?
val blockFetcherItr = new ShuffleBlockFetcherIterator(
SparkEnv.get.blockManager.shuffleClient,
blockManager,
blocksByAddress,
serializer,
SparkEnv.get.conf.getLong("spark.reducer.maxMbInFlight", 48) * 1024 * 1024)
///对blockFetcherItr集合ununpackBlock方法然后压平结果
val itr = blockFetcherItr.flatMap(unpackBlock)
///构造 CompletionIterator对象?
val completionIter = CompletionIterator[T, Iterator[T]](itr, {
context.taskMetrics.updateShuffleReadMetrics()
///包装了completionIter迭代器,组合模式
new InterruptibleIterator[T](context, completionIter)
3.8 ShuffleBlockFetcherIterator类
a. 从下面的类文档中可以看出,ShuffleBlockFetcherIterator类,用于fetch数据块,对于位于本地的block,从本地BlockManager获取;对于远端的数据块,使用BlockTransferService获取数据(BlockTransferService使用Netty作为底层的数据传输模块)
b. 最后的结果是获取了一个迭代器,迭代的每条记录是一个(BlockId,values)元组
c. throttle的含义是节流阀
* An iterator that fetches multiple blocks. For local blocks, it fetches from the local block
* manager. For remote blocks, it fetches them using the provided BlockTransferService.
* This creates an iterator of (BlockID, values) tuples so the caller can handle blocks in a
* pipelined fashion as they are received.
* The implementation throttles the remote fetches to they don't exceed maxBytesInFlight to avoid
* using too much memory.
* @param context [[TaskContext]], used for metrics update
* @param shuffleClient [[ShuffleClient]] for fetching remote blocks
* @param blockManager [[BlockManager]] for reading local blocks
* @param blocksByAddress list of blocks to fetch grouped by the [[BlockManagerId]].
For each block we also require the size (in bytes as a long field) in
order to throttle the memory usage.
* @param serializer serializer used to deserialize the data.
* @param maxBytesInFlight max size (in bytes) of remote blocks to fetch at any given point.
d.在BlockStoreShuffleFetcher中,ShuffleBlockFetcherIterator的构造如下,对比下构造函数
注意ShuffleClient是从SparkEnv的BlockManager获取的,
val blockFetcherItr = new ShuffleBlockFetcherIterator(
SparkEnv.get.blockManager.shuffleClient,
blockManager,
blocksByAddress,
serializer,
SparkEnv.get.conf.getLong("spark.reducer.maxMbInFlight", 48) * 1024 * 1024)
e. ShuffleClient在BlockManager中的定义
// Client to read other executors' shuffle files. This is either an external service, or just the
// standard BlockTranserService to directly connect to other Executors.
private[spark] val shuffleClient = if (externalShuffleServiceEnabled) { //启用了外部ExternalShuffleService
val transConf = SparkTransportConf.fromSparkConf(conf, numUsableCores)
new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled())
blockTransferService //否则使用blockTransferService: BlockTransferService,在Spark1.2中使用了基于Netty的数据传输NettyBlockTransferService
3.9 BlockStoreShuffleFetcher的fetch方法调用blockFetcherItr.flatMap(unpackBlock)解析
在Reduce阶段,还会执行combine操作,进行深度的合并的操作,这是必然的,假如不同的Mapper上有多个单词A,那么计数的结果应该是把所有Mapper上的A进行合并得到最后的结果。
浏览: 365035 次
来自: 北京
关于第一个reduceByKey对应的cache,shuffl ...
看了你的文章,updateStateByKey 这个方式的使用 ...
棒极啦,解决了我的问题。
你好,这个代码生成主要在,那个地方使用。
看楼主这么厉害的样子,请问楼主如何知道类库的版本呢?比如g++ ...苹果/安卓/wp
积分 325, 距离下一级还需 125 积分
权限: 自定义头衔, 签名中使用图片
道具: 彩虹炫, 涂鸦板, 雷达卡, 热点灯, 金钱卡, 显身卡, 匿名卡下一级可获得
道具: 抢沙发
购买后可立即获得
权限: 隐身
道具: 金钱卡, 彩虹炫, 雷达卡, 热点灯, 涂鸦板
上火签到天数: 137 天连续签到: 1 天[LV.7]常住居民III
本帖最后由 无量天尊Spark 于
17:48 编辑
一、spark运行原理
Spark是一个分布式(很多机器,每个机器负责一部部分数据),基于内存(内存不够可以放在磁盘中),特别适合于迭代计算的计算框架。基于内存(在一些情况下也会基于磁盘),优先考虑放入内存中,有更好的数据本地性。如果内存中放不完的话,会考虑将数据或者部分数据放入磁盘中。擅长迭代式计算是spark的真正精髓。基于磁盘的迭代计算比hadoop快 10x倍,基于内存的迭代计算比hadoop 快100x倍。
Driver端,就是写好的本地程序提交到特定的机器上。
Spark开发语言说明:国内开发程序spark程序有些使用java开发,1、人才问题java开发人员很多(scala开发人员较少)2、 整合更加容易3、 维护更加容易4、但是要更好的掌握spark还是需要用scala写spark,因为java写起来太繁琐了而且有些功能实现起来很困难。
Spark组件,如图1所示:
17:31:05 上传
图 1处理数据来源:hdfs 、hbase、hive 、db。Hivee 包括数据仓库和计算引擎,sparksql只能取代hive的计算引擎。处理数据输出:hdfs 、hbase、hive 、db、s3(云)。还可以直接输出给客户端(dirver端)。
二、RDD 解密
1)Spark中一切基于RDD,RDD 是弹性分布式的数据集。例如1P的数据的处理,处理时是分布式的,分成了很多分片分布在几百或者上千台的机器上。存储时默认存储在内存中,如果内存中放不下会放到磁盘上。RDD 本身有一系列的数据分片,一个RDD逻辑上抽象的代表了底层的一个输入文件(或者文件夹),但是实际是按照分区(patition)分为多个分区,分区会放到spark 集群不同的机器的节点上。假设有1亿数据,每台机器放100万条,需要100万台机器。而且每台机器的100万条数据按照patition(特定规模的一个数据集合)来管理。2)RDD 的弹性特点A) 自动的进行内存和磁盘的存储的切换;B) 基于Lineage 的高效容错。例如作业链条油1000个步骤,在901个步骤出错,由于具有血统关系会在第900 个步骤重新计算,而不是从第一个步骤计算。提高了错误恢复速度;C) Task如果失败会自动进行特定次数的重试。默认4次;D) Stage如果失败会自动进行特定次数的重试,task 底层尝试了好几次都失败,这时候整个阶段就会失败,整个阶段肯定有很多并行的数据分片,数据分片的计算逻辑是一样的,只是处理的数据不同。再次提交这个stage 的时候,除了再次自动提交体现弹性表现以外,更重要的是再次提交这个stage 的时候,如果说这个stage 中(假设有100万数据分片),有5个失败,再次提交stage 时会查看其它任务有没有输出,如果有就不在提交这些任务,只会提交那失败的5个任务,这是会非常的高效(只计算运算失败的分片)。默认3次。
问题:spark的中间数据都在内存中,不在硬盘中,如何得到失败前一步的结果?回答:数据优先考虑存在内存中,如果内存不够用,会存在磁盘中。不是每个步骤都做缓存。缓存的条件:任务特别耗时、任务计算链条很长、Shuffle之后,checkpoint之前。
三、RDD 实战
1、从hdfs 上读取数据val data = sc.textfile(“/library/wordcount/input/Data”)
17:35:34 上传
图 2返回的内容为RDD 类型,RDD泛型为String 因为sc.textFile读取的文本。读取的一行一行的数据,一行一行的数据是数据分片,RDD 是一系列的数据分片的。数据分片中每个分片中每一行是String类型的。查看依赖关系:data.toDebugString
17:37:10 上传
图 3textFile 从Hadoop 读取数据所以产生了 HadoopRdd,然后进行map,为什么进行map呢?因为获取了每个的数据分片,我们只对每个数据分片的每一行内容感兴趣,不会对数据分片的没一行key(行的索引)感兴趣。所以进行map 操作,产生mappartitionRDD 。这次产生了2个RDD。有时候会产生1个或者更多RDD。sc.textfile 从hdfs 分布式文件系统中,读取我们需要的具体的数据,而这个数据是一系列分片的方式分布在不同的机器上。怎么证明分布在不同的机器上呢?执行 data.count(action 级别的操作)运行的结果会搜集给driver 端。
17:40:10 上传
图 44个exector,(node_local 代表数据在本地磁盘上,process_local 数据在本地的内存中)。真正的计算分布在各个机器上,散落在集群上不同的机器上,数据要符合数据本地性,所以数据也分布在集群中不同的机器上。这个证明了RDD 在逻辑上代表了hdfs 上的文件,实际上是很多数据分片,这些数据分片散落在spark集群中不同的节点上。计算的时候要符合数据本地性,所以就是数据不动,代码动。所以也是在做并行的计算的。
问题 :hdfs 的分片和spark rDD的分区的关系?Spark 在读取数据的时候 一个partition 默认对应hdfs中第一个block,默认大小是128M。注:由于数据可能会跨block 存储,所以一个partition可能是128M+10个字节,下一个partition 是128M-10个字节。如果 hdfs 中路径不存在,在textfile 时时不会报错的因为是transformation级别的操作(是lazy操作)。如图5所示:
17:41:15 上传
图 5但是如果进行 data.count 这个时候要发生计算,是action级别的。找不到相应的hdfs目录就会报错。如图6所示:
19:30:52 上传
2、进行单词切分 val flatted= data.flatMap(_.split(“”)) 对每行数据 以空格进行切分,又会产生一个MapPartitionsRdd 执行结果如图7所示:
19:36:45 上传
图 7查看依赖关系&&faltted.toDebugString .自己有个MappatitionRDD把之前依赖的RDD 也都列出来了。说明RDD 之间是由依赖关系的。如图8所示:
19:39:40 上传
3、对每个单词计数为1&&val mapped=flatted.map(word =&(word,1)) 写匿名函数,执行后又产生一个RDD 里面的key 是字符串,value 是整数,所以产生的是RDD[(String,Int)]。如图9所示:
19:42:26 上传
图 9查看RDD 的依赖 mapped.toDebugString ,产生如下结果。map 产生MapPartitionsRDD。MapPartitionsRDD 和HadoopRDD 是RDD 的子类。RDD 是抽象的,需要具体的子类来实现(实现数据存储在哪里,具体怎么计算)。map 产生MapPartitionsRDD 依赖于上一步flatM安排产生的MapPartitionsRDD,而flatMap产生的MapPartitionsRDD依赖于上一步textFlie 产生的MapPartitionsRDD,而textFile 产的MapPartitionsRDD依赖于textFile产生的HadoopRDD,通过HadoopRDD 来读取具体的数据。
19:51:42 上传
4、进行reduce 操作,就是key 相同,value进行相加。相当于图书馆有1000个书架,每个书架一个负责人对书的数量进行统计,然后这1000个人将自己统计的数据交给你来汇总。这个过程就叫做shuffle。Shuffle 会产生ShuffledRDD 。就是每个书架的书交个一个节点去处理(这个处理过程是并行的),另外一个节点会从这1000个节点上抓取所有的书的数量的信息进行汇总。val reduced=mapped.reduceByKey(_+_)&&产生的结果如图11所示:
19:53:21 上传
图 11查看依赖关系 reduced.toDebugString&&。reduceByKey 产生的ShuffledRDD依赖上一步 map 产生的MapPartitionsRDD。执行结果如图12所示:
19:55:28 上传
5、保存到hdfs 中。(前面所有的操作都没有触发操作,现在才开始真正的计算并将数据写入磁盘)。Spark在计算时每个步骤都是RDD ,RDD 的本质是提供的容错性,自动从失败的节点恢复。如果某个节点的RDD 丢失了,会根据RDD 的血统关系,重新生成。Reduced.saveAsTextFile(“/library/wordcount/output/dt_spark_clicked4”)。执行结果如图13所示:
19:57:10 上传
6、在浏览器上查看
20:04:09 上传
20:05:27 上传
注:本学习笔记来自DT大数据梦工厂
支持楼主:、
购买后,论坛将把您花费的资金全部奖励给楼主,以表示您对TA发好贴的支持
载入中......
总评分:&论坛币 + 5&
讲的很好,作为一个初学者学到很多。
a2100 发表于
讲的很好,作为一个初学者学到很多。
&nbsp&nbsp|
&nbsp&nbsp|
&nbsp&nbsp|
&nbsp&nbsp|
&nbsp&nbsp|
&nbsp&nbsp|
如有投资本站或合作意向,请联系(010-);
邮箱:service@pinggu.org
投诉或不良信息处理:(010-)
论坛法律顾问:王进律师spark transform系列__Coalesce - 隔壁老杨的专栏 - CSDN博客
spark transform系列__Coalesce
spark1.6源码
Coalesce/repartition
这个操作是把当前的RDD中的partition根据一个新的传入的parition的个数,对partition中的结果集进行重新组合成一个新的结果集的函数.
这个函数需要传入两个参数:
参数1:需要重新进行分区的分区个数.
参数2:是否执行shuffle操作,默认为false.
def&coalesce(numPartitions:&Int,&shuffle:&Boolean&=&false)
(implicit&ord:&Ordering[T]&=&null)
&&&&:&RDD[T]&=&withScope&{
&&if&(shuffle)&{
这种情况是表示需要执行shuffle的情况,
首先定义针对当前的RDD的partition的执行的function,distributePartition函数.
这个函数中,根据需要重新生成的RDD的partition的个数,把当前的RDD中的PARTITION的结果随机存储到新RDD的某个PARTITION中,这个distributePartition函数的key就是要hash完成后的新的partition的下标,value是原来(也就是当前的RDD)的key-value.
这样做的作用是,在对key执行hash操作时,key就是对应的新的分区的下标,直接就能得到这个对应的分区.
这个地方,每条记录都向从开始的分区位置,一直向下增加,也就是每条记录都会轮询的向下一个partition中分发数据.通过执行shuffle操作,可以最大可能的保证新的RDD中每个PARTITION的数据都差不多.
&&&&/**&Distributes&elements&evenly&across&output&partitions,&starting&from&a&random&
&&&&&&&&partition.&*/
&&&&val&distributePartition&=&(index:&Int,&items:&Iterator[T])&=&&{
&&&&&&var&position&=&(new&Random(index)).nextInt(numPartitions)
&&&&&&items.map&{&t&=&
&&&&&&&&//&Note&that&the&hash&code&of&the&key&will&just&be&the&key&itself.&
&&&&&&&&&&&The&HashPartitioner
&&&&&&&&//&will&mod&it&with&the&number&of&total&partitions.
&&&&&&&&position&=&position&+&1
&&&&&&&&(position,&t)
&&&&}&:&Iterator[(Int,&T)]
生成CoalesedRDD的实例,这个情况需要执行shuffle操作,因此,在这个实例传入的上层RDD的依赖根据当前的rdd先生成一个ShuffledRDD的实例.下面的mapPartitionsWithIndex是对当前的RDD执行了一个MAP操作.最后根据生成的CoalesedRDD执行values操作就得到原来RDD的key-value.
&&&&//&include&a&shuffle&step&so&that&our&upstream&tasks&are&still&distributed
&&&&new&CoalescedRDD(
&&&&&&new&ShuffledRDD[Int,&T,&T](mapPartitionsWithIndex(distributePartition),
&&&&&&new&HashPartitioner(numPartitions)),
&&&&&&numPartitions).values
&&}&else&{
不需要执行shuffle操作,直接根据当前的RDD与新的PARTITION的个数,生成CoalescedRDD的实例.这里会根据每个新的RDD中partition的个数与老的partition个数进行组合,原则上保持host相同的partition放到一个分区中,但是如果某一个分区对应上层的partition太多(多个0.1个百分比时),会进行随机分区(这个0.1其实并不能完全保证).
&&&&new&CoalescedRDD(this,&numPartitions)
CoalescedRDD的处理流程:
首先看实例生成的部分,在实例生成时,上层RDD的依赖部分默认为Nil,这个依赖通过getDependencie得到.
private[spark]&class&CoalescedRDD[T:&ClassTag](
&&&&@transient&var&prev:&RDD[T],
&&&&maxPartitions:&Int,
&&&&balanceSlack:&Double&=&0.10)
&&extends&RDD[T](prev.context,&Nil)&{
接下来看看新的RDD的partition的生成部分.
override&def&getPartitions:&Array[Partition]&=&{
这里通过当前新生成的RDD的partition的个数与上层RDD的依赖,每个partition可接受的数据误差的范围,默认是0.10.生成一个PartitionCoalescer实例,通过这个实例的run函数来得到这个新的RDD的partitions的信息.
这里的balanceSlack的值用于控制针对上一个rdd中的partitions与当前的partitions可接受的误差的partition的个数.
&&val&pc&=&new&PartitionCoalescer(maxPartitions,&prev,&balanceSlack)
这里的run函数,返回的是对应的一个一个的PartitionGroup的实例,
在run的函数中,需要执行的流程:
这上地方分成两个处理:
1,如果当前的RDD的上层依赖的RDD是一个shuffle的RDD时,那么当前的RDD的partition的个数与上层的依赖RDD的partition相同,这个时候很好处理,当前RDD与上层依赖是一个一对一的关系.
2,如果当前的RDD的上层的依赖的RDD是一个非SHUFFLE的RDD时,这个时候,如果两个RDD的PARTITION的个数相同,也就好处理,与1的处理相同.
3,如果当前的RDD的partition小于上层的依赖RDD的partition的个数,这个时候的处理相对较麻烦:
3,1,首先根据当前的RDD的partition的个数取对应上层的rdd的partition的个数,并通过partition的host进行分组存储,也就是每个host中最少存储了一个partition,
3,2,然后,把上层的rdd中多出部分的partition(还没有在分组中存储的)进行处理,如果这个partition对应现在已经存在的host分组不存在时,从现有的分组中取出最小的一个,用于存储这个partition,
3,3,如果对应此partition的host已经存在,取出这个host中分区组中对应上层rdd的partition个数最小的分组,同时在现有的所有分组中随机取出两个分组,找到最小的一个分组,如果这个分组的partition的个数加上可接受的误差的个数,小于现在host对应的最小分组的个数时,把这个partition添加到这个随机的分组中,否则添加到host对应的分组中.
根据上面的处理,最后得到CoalescedRDDPartition的信息,
&&pc.run().zipWithIndex.map&{
&&&&case&(pg,&i)&=&
&&&&&&val&ids&=&pg.arr.map(_.index).toArray
&&&&&&new&CoalescedRDDPartition(i,&prev,&ids,&pg.prefLoc)
下面来看看getDependencie函数的逻辑:
override&def&getDependencies:&Seq[Dependency[_]]&=&{
这里的Dependency是一个Narrow的依赖,也就是说,当前RDD中的partition对应上层的rdd的partition的个数为1到多个,1对1通常是做过shuffle操作的情况,
&&Seq(new&NarrowDependency(prev)&{
&&&&def&getParents(id:&Int):&Seq[Int]&=
&&&&&&partitions(id).asInstanceOf[CoalescedRDDPartition].parentsIndices
最后看看这个CoalescedRDD的compute的函数逻辑:
这个compute根据当前的RDD中的partition对应的上层依赖RDD的partitions(下面的parents)的iterator进行flatMap操作,把对应上层的每个partition的iterator组合到一个iterator中,
这个组合是每个partition的数据集的iterator进行首层相连.
override&def&compute(partition:&Partition,&context:&TaskContext):&Iterator[T]&=&{
&&partition.asInstanceOf[CoalescedRDDPartition].parents.iterator
&&&&.flatMap&{&parentPartition&=&
&&&&&&&&firstParent[T].iterator(parentPartition,&context)
Repartition
这个操作是直接使用的的操作不作太细的说明只在默认情况下的操作的参数默认为的操作时会显示指定的值为在指定定个参数为的情况下重新生成的的实例与上层的的依赖的的结果为对的关系不需要做太多的的区划同时比不做操作的功能在数据分划上能够更加的平均
def&repartition(numPartitions:&Int)(implicit&ord:&Ordering[T]&=&null)
:&RDD[T]&=&withScope&{
&&coalesce(numPartitions,&shuffle&=&true)
我的热门文章
即使是一小步也想与你分享

我要回帖

更多关于 spark transform算子 的文章

 

随机推荐