如何java 去掉 r n一个rdd的前n行

rddndndnsnnsn急
为您推荐:
扫描下载二维码hadoop&spark(5)
Spark RDD Scala语言编程
RDD(Resilient Distributed Dataset)是一个不可变的分布式对象集合, 每个rdd被分为多个分区, 这些分区运行在集群的不同节点上。rdd支持两种类型的操作:转化(trainsformation)和行动(action), Spark只会惰性计算rdd, 也就是说, 转化操作的rdd不会立即计算, 而是在其第一次遇到行动操作时才去计算, 如果想在多个行动操作中复用一个rdd, 可以使用RDD.persist()让Spark把这个rdd缓存下来。
0. 初始化SparkContext
import org.apache.spark.{SparkConf, SparkContext}
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("spark-rdd-demo"))
1. 创建RDD
Spark提供了2种创建RDD的方式:
1.1 读取外部数据集
val lines = sc.parallelize(List("Java", "Scala", "C++"))
1.2 在驱动器程序中对一个集合进行并行化
val lines = sc.textFile("hdfs://dash-dev:9000/input/test.txt")
2. RDD操作
2.1 转化操作
RDD的转化操作是返回新RDD的操作, 常用转化操作总结如下:
表1: 对一个数据为{1,2,3,3}的RDD进行基本的转化操作
将函数应用于RDD中每个元素, 将返回值构成新的RDD
rdd.map(x=&x+1)
将函数应用于RDD中的每个元素, 将返回的迭代器的所有内容构成新的RDD, 常用来切分单词
rdd.flatMap(x=&x.to(2))
返回一个通过传入给filter()的函数的元素组成的RDD
rdd.filter(x=& x&2)
distinct()
rdd.distinct()
sample(withReplacement, fraction, [seed])
对RDD采样, 以及是否替换
rdd.sample(false, 0.5)
表2: 对数据分别为{1,2,3}和{2,3,4}RDD进行针对2个RDD的转化操作
求2个RDD的并集
rdd.union(other)
intersection()
求2个RDD的交集
rdd.intersection(other)
subtract()
求2个RDD的差集
rdd.subtract(other)
cartesian()
求2个RDD的笛卡尔积
rdd.cartesian(other)
{1,2}, {1,3}, {1,4}…{3,4}
sample(withReplacement, fraction, [seed])
对RDD采样, 以及是否替换
rdd.sample(false, 0.5)
2.2 行动操作
RDD的行动操作会把最终求得的结果返回驱动器程序, 或者写入外部存储系统中。
表3: 对一个数据为{1,2,3,3}的RDD进行基本RDD的行动操作
并行整合RDD中的所有元素
rdd.reduce((x, y) =& x+y)
返回RDD中的所有元素
rdd.collect()
求RDD中的元素个数
rdd.count()
countByValue()
各元素在RDD中出现的次数
rdd.countByValue()
{1,1}, {2, 1}, {3,2}
从RDD中返回n个元素
rdd.take(2)
从RDD中返回前n个元素
rdd.top(3)
foreach(func)
对RDD中的每个元素使用给定的函数
rdd.foreach(print)
2.3 向Spark传递函数
Spark的大部分转化和行动操作都要依赖于用户传递的函数来计算, 当传递的对象是某个对象的成员, 或者包含了对某个对象中一个字段的引用时(如self.field), Spark就会把整个对象发送到工作节点上--这比你本意想传递的东西大太多了!替代的方案是,把你需要的字段从对象中拿出来放到一个局部变量中, 然后传递这个局部变量:
class SearchFunctions(val query: String) {
def isMatch(s: String): Boolean = {
s.contains(query)
def getMatchesFunctionReference(rdd: RDD[String]): RDD[String] = {
rdd.map(isMatch)
def getMatchesFieldReference(rdd: RDD[String]): RDD[String] = {
rdd.map(x =& x.split(query))
def getMatchesNoReference(rdd: RDD[String]): RDD[String] = {
val localQuery = this.query
rdd.map(x =& x.split(localQuery))
另外, 要注意的是, Spark要求我们传入的函数及其应用的数据是可序列化的(实现了Java的Serializable接口), 否则会出现NotSerializableException。
参考知识库
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:1336440次
积分:12688
积分:12688
排名:第805名
原创:323篇
转载:126篇
评论:364条
(7)(1)(1)(7)(3)(4)(6)(1)(2)(3)(1)(5)(4)(8)(6)(2)(4)(1)(6)(11)(11)(15)(15)(16)(18)(29)(13)(29)(23)(19)(27)(16)(21)(23)(11)(9)(22)(20)(5)(1)(4)(1)(10)(9)spark 常用函数介绍(python)
spark 常用函数介绍(python)
发布时间: 6:51:44
编辑:www.fx114.net
本篇文章主要介绍了"spark 常用函数介绍(python) ",主要涉及到spark 常用函数介绍(python) 方面的内容,对于spark 常用函数介绍(python) 感兴趣的同学可以参考一下。
以下是个人理解,一切以官网文档为准。
在开始之前,我先介绍一下,RDD是什么?
&&&&& RDD是Spark中的抽象数据结构类型,任何数据在Spark中都被表示为RDD。从编程的角度来看,RDD可以简单看成是一个数组。和普通数组的区别是,RDD中的数据是分区存储的,这样不同分区的数据就可以分布在不同的机器上,同时可以被并行处理。因此,Spark应用程序所做的无非是把需要处理的数据转换为RDD,然后对RDD进行一系列的变换和操作从而得到结果。
  创建RDD:
&&& sc.parallelize([1,2,3,4,5], 3)
#意思是将数组中的元素转换为RDD,并且存储在3个分区上[1]、[2,3]、[4,5]。如果是4个分区:[1]、[2]、[3]、[4,5]
&  上面这种是数组创建,也可以从文件系统或者HDFS中的文件创建出来,后面会讲到。
只要搞懂了spark的函数们,你就成功了一大半。
spark的函数主要分两类,Transformations和Actions。Transformations为一些数据转换类函数,actions为一些行动类函数:
转换:转换的返回值是一个新的RDD集合,而不是单个值。调用一个变换方法,不会有任何求值计算,它只获取一个RDD作为参数,然后返回一个新的RDD。
行动:行动操作计算并返回一个新的值。当在一个RDD对象上调用行动函数时,会在这一时刻计算全部的数据处理查询并返回结果值。
下面介绍spark常用的Transformations, Actions函数:
Transformations
map(func [, preservesPartitioning=False])& --- 返回一个新的分布式数据集,这个数据集中的每个元素都是经过func函数处理过的。
&&& data = [1,2,3,4,5]
&&& distData = sc.parallelize(data).map(lambda x: x+1).collect()
#结果:[2,3,4,5,6]
filter(func)& --- 返回一个新的数据集,这个数据集中的元素是通过func函数筛选后返回为true的元素(简单的说就是,对数据集中的每个元素进行筛选,如果符合条件则返回true,不符合返回false,最后将返回为true的元素组成新的数据集返回)。
&&& rdd = sc.parallelize(data).filter(lambda x:x%2==0).collect()
#结果:[2, 4]
flatMap(func [, preservesPartitioning=False])& --- 类似于map(func), 但是不同的是map对每个元素处理完后返回与原数据集相同元素数量的数据集,而flatMap返回的元素数不一定和原数据集相同。each input item can be mapped to 0 or more output items (so&funcshould return a Seq rather than a single item)
#### for flatMap()
&&& rdd = sc.parallelize([2,3,4])
&&& sorted(rdd.flatMap(lambda x: range(1,x)).collect())
#结果:[1, 1, 1, 2, 2, 3]
&&& sorted(rdd.flatMap(lambda x:[(x,x), (x,x)]).collect())
#结果:[(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]
#### for map()
&&& rdd = sc.parallelize([2,3,4])
&&& sorted(rdd.flatMap(lambda x: range(1,x)).collect())
#结果:[[1], [1, 2], [1, 2, 3]]
&&& sorted(rdd.flatMap(lambda x:[(x,x), (x,x)]).collect())
#结果:[[(2, 2), (2, 2)], [(3, 3), (3, 3)], [(4, 4), (4, 4)]]
mapPartitions(func [, preservesPartitioning=False])& ---mapPartitions是map的一个变种。map的输入函数是应用于RDD中每个元素,而mapPartitions的输入函数是应用于每个分区,也就是把每个分区中的内容作为整体来处理的。
&&& rdd = sc.parallelize([1,2,3,4,5], 3)
&&& def f(iterator): yield sum(iterator)
&&& rdd.mapPartitions(f).collect()
#结果:[1,5,9]
mapPartitionsWithIndex(func [, preservesPartitioning=False])& ---Similar to&mapPartitions, but takes two parameters. The first parameter is the index of the partition and the second is an iterator through all the items within this partition. The output is an iterator containing the list of items after applying whatever transformation the function encodes.
&&& rdd = sc.parallelize([1,2,3,4,5], 3)
&&& def f(splitIndex, iterator): yield splitIndex
&&& rdd.mapPartitionsWithIndex(f).collect()
#结果:[0,1,2]
#三个分区的索引
reduceByKey(func [,&numPartitions=None,&partitionFunc=&function portable_hash at 0x7fa664f3cb90&])& --- reduceByKey就是对元素为kv对的RDD中Key相同的元素的value进行reduce,因此,key相同的多个元素的值被reduce为一个值,然后与原RDD中的key组成一个新的kv对。
&&& from operator import add
&&& rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
&&& sorted(rdd.reduceByKey(add).collect())
&&& #或者 sorted(rdd.reduceByKey(lambda a,b:a+b).collect())
#结果:[('a', 2), ('b', 1)]
aggregateByKey(zeroValue)(seqOp, combOp [, numPartitions=None])& ---
sortByKey([ascending=True, numPartitions=None, keyfunc=&function &lambda& at 0x7fa&])& --- 返回排序后的数据集。该函数就是队kv对的RDD数据进行排序,keyfunc是对key进行处理的函数,如非需要,不用管。
&&& tmp = [('a', 1), ('b', 2), ('1', 3), ('D', 4)]
&&& sc.parallelize(tmp).sortByKey(True, 1).collect()
#结果: [('1', 3), ('D', 4), ('a', 1), ('b', 2)]
&&& sc.parallelize(tmp).sortByKey(True, 2, keyfunc=lambda k:k.lower()).collect()
#结果:[('1', 3), ('a', 1), ('b', 2), ('D', 4)]
#注意,比较两个结果可看出,keyfunc对键的处理只是在数据处理的过程中起作用,不能真正的去改变键名
join(otherDataset [, numPartitions=None])& --- join就是对元素为kv对的RDD中key相同的value收集到一起组成(v1,v2),然后与原RDD中的key组合成一个新的kv对,返回。
&&& x = sc.parallelize([("a", 1), ("b", 4)])
&&& y = sc.parallelize([("a", 2), ("a", 3)])
&&& sorted(x.join(y).collect())
#结果:[('a', (1, 2)), ('a', (1, 3))]
cartesian(otherDataset)& --- 返回一个笛卡尔积的数据集,这个数据集是通过计算两个RDDs得到的。
&&& x = sc.parallelize([1,2,3])
&&& y = sc.parallelize([4,5])
&&& x.cartesian(y).collect()
#结果:[(1, 4), (1, 5), (2, 4), (2, 5), (3, 4), (3, 5)]
Action (这里只讲支持python的,java和scala的后面用到了在做详解,当然支持python就一定支持java和scala)
reduce(func)& --- reduce将RDD中元素两两传递给输入函数,同时产生一个新的值,新产生的值与RDD中下一个元素再被传递给输入函数直到最后只有一个值为止。
&&& from operator import add
&&& sc.parallelize([1,2,3,4,5]).reduce(add)
# 结果:15
collect()& --- 返回RDD中的数据,以list形式。
&&& sc.parallelize([1,2,3,4,5]).collect()
#结果:[1,2,3,4,5]
count()& --- 返回RDD中的元素个数。
&&& sc.parallelize([1,2,3,4,5]).count
#结果:5
first()& --- 返回RDD中的第一个元素。
&&& sc.parallelize([1,2,3,4,5]).first()
#结果:1
take(n)& --- 返回RDD中前n个元素。
&&& sc.parallelize([1,2,3,4,5]).take(2)
#结果:[1,2]
takeOrdered(n [, key=None])& --- 返回RDD中前n个元素,但是是升序(默认)排列后的前n个元素,或者是通过key函数指定后的RDD(这个key我也没理解透,后面在做详解)
&&& sc.parallelize([9,7,3,2,6,4]).takeOrdered(3)
#结果:[2,3,4]
&&& sc.parallelize([9,7,3,2,6,4]).takeOrdered(3, key=lambda x:-x)
#结果:[9,7,6]
saveAsTextFile(path [, compressionCodecClass=None])& --- 该函数将RDD保存到文件系统里面,并且将其转换为文本行的文件中的每个元素调用 tostring 方法。
parameters:& path - 保存于文件系统的路径
       compressionCodecClass - (None by default) string i.e. &org.apache.press.GzipCodec&
&&& tempFile = NamedTemporaryFile(delete=True)
&&& tempFile.close()
&&& sc.parallelize(range(10)).saveAsTextFile(tempFile.name)
&&& from fileinput import input
&&& from glob import glob
&&& ''.join(sorted(input(glob(tempFile.name + "/part-0000*"))))
'0\n1\n2\n3\n4\n5\n6\n7\n8\n9\n'
&Empty lines are tolerated when saving to text files:
&&& tempFile2 = NamedTemporaryFile(delete=True)
&&& tempFile2.close()
&&& sc.parallelize(['', 'foo', '', 'bar', '']).saveAsTextFile(tempFile2.name)
&&& ''.join(sorted(input(glob(tempFile2.name + "/part-0000*"))))
'\n\n\nbar\nfoo\n'
&Using compressionCodecClass:
&&& tempFile3 = NamedTemporaryFile(delete=True)
&&& tempFile3.close()
&&& codec = "org.apache.press.GzipCodec"
&&& sc.parallelize(['foo', 'bar']).saveAsTextFile(tempFile3.name, codec)
&&& from fileinput import input, hook_compressed
&&& result = sorted(input(glob(tempFile3.name + "/part*.gz"), openhook=hook_compressed))
&&& b''.join(result).decode('utf-8')
u'bar\nfoo\n'
countByKey()& --- 返回一个字典(key,count),该函数操作数据集为kv形式的数据,用于统计RDD中拥有相同key的元素个数。
&&& defdict = sc.parallelize([("a",1), ("b",1), ("a", 1)]).countByKey()
&&& defdict
#结果:defaultdict(&type 'int'&, {'a': 2, 'b': 1})
&&& defdict.items()
#结果:[('a', 2), ('b', 1)]
countByValue()& --- 返回一个字典(value,count),该函数操作一个list数据集,用于统计RDD中拥有相同value的元素个数。
&&& sc.parallelize([1,2,3,1,2,5,3,2,3,2]).countByValue().items()
#结果:[(1, 2), (2, 4), (3, 3), (5, 1)]
foreach(func)& --- 运行函数func来处理RDD中的每个元素,这个函数常被用来updating an Accumulator或者与外部存储系统的交互。
&&& def f(x): print(x)
&&& sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
#note: 打印是随机的,并不是一定按1,2,3,4,5的顺序打印
本文标题:
本页链接:

我要回帖

更多关于 python 去掉 n 的文章

 

随机推荐