spark将spark parquett文件采用snappy方式压缩需要设置什么参数

RC ORC Parquet 格式比较和性能测试 - CSDN博客
RC ORC Parquet 格式比较和性能测试
RC ORC Parquet 格式比较和性能测试
作者:刘旭晖 Raymond 转载请注明出处
Email:colorant
为什么要比较这三者
为什么要比较,起因是为了提高Hadoop集群的存储和计算效率,尤其是离线Hive作业的效率,为什么比较的是这三者,是因为三者是目前Hive离线作业中正在大规模使用或可能大规模使用的三种主流的相对成熟的文件格式
对于ORC性能的评测,Hortonworks发过一篇被广泛传播和引用的博客 :
这篇文章在ORC改进原理等方面说的比较客观,但是实际的benchmark比较数据,即使不说是有故意偏颇的嫌疑,至少也是不科学不客观的,特别是下面这张文件尺寸比较,带有很大的误导性。
这个测试的数据集,看起来用了TPC-DS的数据,貌似很专业的样子
但是首先,这里的测试方法明显的就不科学,压缩算法并不相同有什么好比的(Snappy侧重性能,Zlib侧重压缩率)?不知道作者对RCFile采用了什么压缩算法,但是Parquet+Snappy,ORC+Zlib,这种比较的基调就不公正(当然,这个问题,作者说是因为这是它们默认的压缩格式,但是科学严谨的来说,benchmark应该用统一的标准来衡量)
其次,套多数hive作业任务的的场景,TPC-DS的数据特性和典型的Hive应用场景(至少我们这边的场景)里的数据看起来并不一致。RC File的压缩还不到15%,这压缩率明显不是Hive离线处理数据场景和压缩算法下RCFile的典型表现
三种文件格式简单介绍
Parquet的设计方案,整体来看,基本照搬了Dremel中对嵌套数据结构的打平和重构算法,通过高效的数据打平和重建算法,实现按列存储(列组),进而对列数据引入更具针对性的编码和压缩方案,来降低存储代价,提升计算性能。想要了解这一算法逻辑的,可以看Dremel的论文:
从文件结构上来看,如下图所示:
基本上就是一个文件由多个列组组成,数据先按列组(rowgroup)分段(也就是先做行切割),然后在列组内部对每个列的数据分列连续存储(columnchunk)(也就第二步做列切割),每个列内部的数据,再细分成page(可以近似的认为是再做行切割),最后,在文件的尾部,存储所有列组的元数据信息
这么分层设计,从并发度的角度考虑,行切割的目的,主要做为任务的切分单元,比如一个Map任务处理一个列组里的数据。然后列切割的目的,除了按需读取数据,也是做为IO的并发单元。最后Page的拆分,主要是从编码和压缩的角度,进行拆分,以page为单位进行压缩编码,如果近似的理解,也可以认为一定程度上起到了内存和CPU上用量的控制,从Parquet文件的层面来说,Page是数据最小的读写单元。
最后,对列数据提供多种编码方式,比如:字典(Dictionary),游程(RLE),增量(DELTA)等等
综上,Parquet主要还是对Dremel的存储模型这部分的一个实现,在Dremel存储模型定义范围之外,自己额外做的工作,并不多。(这里指的文件格式底层技术实现方面,工程上和大数据生态系各个组件的打通结合方面,还是做了大量的工作的)
ORC文件格式的一些基础思想和Parquet很像,也是先按行水平切割,在按列垂直切割,针对不同的列,采用特定的编码格式,最后再进一步对编码后的数据进行压缩。支持的编码格式(游程,字典,增量,bit),压缩格式(zlib,snappy,LZO等等)也基本一致
与Parquet不同的地方是,Parquet对嵌套型数据结构的打散和重构的算法,来源于Dremel,通过两种level信息(definition level,repetition level)来标识特定数据在数据结构中层次位置,这两种信息和具体的列数据直接绑定,仅依靠这些信息和对象整体的Schema就能重构出这一列信息原有的层次结构。
而ORC的实现,更加简单直白一些,类似元素是否为Null的信息,就是一组bit位图,而对于元素个数不定的结构,如List,Map等数据结构,则在虚拟的父结构中维护了一个所拥有的子元素数量的信息。这样的带来的问题是,由单纯的某一叶节点列元素的数据出发,是无法独立构建复原出该列数据的结构层次的,需要借助父元素的辅助元数据才能完整复原。
在实现中,ORC对于每个列(基本的或符合结构的)采用了多个Stream分别存储数据和上述各类元数据。
比如String类型的列,如果使用字典编码,那么会生成4个stream,PRESENT Stream用来标识具体String元素是否为Null,DATA Stream,连续存储字符串自身,DICTIONARY_DATA Stream存储字典信息,LENGTH Stream存储每个元素的长度(用来从DATA Stream中定位和拆分数据)
再比如Map类型的列,使用一个PRESENT Stream来标识具体每个Map元素是否为Null,用LENGTH
Stream来标识每个Map元素内部有几个对象
这种处理方式对比Dremel,看起来的确老土很多,理论深度上被甩了不止一条街,不过如果对于嵌套层次不复杂的数据结构,也还是简单有效的。但是,ORC的风评最近感觉明显比Parquet要盛,这又是为什么呢?
个人感觉,主要还是工程实现上的问题,除了核心的数据结构的打散和重建逻辑,ORC的文件格式里,还包含了其它的一些工程优化手段。比如索引(并不是传统意义上的全量排序用索引,更接近统计信息,比如列组的min,max,avg,count等信息,可以用作粗过滤手段,也可以覆盖部分聚合计算的需求),比如Bloomfilter等。而Parquet在这些方面有规划,但是目前似乎基本都没有做。
另外,如果仅从Hive的角度来说,一方面ORC是亲儿子,有些工作开展得比较早,另一方面扁平的数据结构,让Parquet在支持嵌套数据结构方面的优势并不能很好的体现,大概也是原因之一吧。
RC File的格式,就简单很多了,基本除了先水平切Row,再垂直切Column以外,就剩下每个行组的Metadata里维护了行组的纪录数和每个column及每个Column纪录的长度,除此之外就没有太多别的黑科技了。编码方面Metadata使用RLE编码,Column Data使用Gzip等压缩格式(取决于写入方,比如MR程序)
具体看论文
RCFile: A Fast and Space-efficient Data Placement Structure in MapReduce-based Warehouse Systems
主要做了RC和ORC的比较,Parquet做了一部分,主要还是在Hive的场景下,目前看来ORC会更适合一些(基于hive 1.2.1)
!!! 需要注意的是,具体性能数据取决于集群各种参数配置,具体数据格式内容等因素影响,所以绝对值大小并没有实际意义,比例大小的绝对值也不见得完全有代表性,比例的正负趋势才是基本可以参考的,另外时间有限,部分测试还有一些存疑问题尚未验证
首先是压缩率和写性能,从上表可以看到采用不同的压缩格式,不同的压缩级别,对应不同数据类型,其实结论并不是简单一致的
基本上,当前版本情况下,对于String类型比重大的数据,RC文件的尺寸,最佳表现要优于ORC的默认格式(ZLIB, SPEED),但是差距不大(3-5%左右),而对于存int bigint等类型的数据,ORC文件表现优于RC文件是比较一致的
再分析理解一下,可以认为,ORC的编码(Encoding)优势,使得在同等条件下,结果文件尺寸大小要优于RC(30%~100%),而对于复杂String类型比重大的数据,RC文件由于LZ4压缩算法比ZLIB 低压缩率设置下的压缩率的优势,最终结果数据RC+LZ4在CPU耗时略优的情况下,压缩率也略优。 ORC+Zlib可以通过更高压缩率反转尺寸优势,但是CPU耗时就大大增加了。当前hive 1.2.1版本集成的ORC文件格式(0.12+一些改进)还不支持LZ4压缩格式(独立的ORC 1.2.2版本支持),可以想见,一旦集成了,同等条件下,ORC+LZ4的表现应该是最优的。
而Parquet这边,压缩率方面看起来和ORC也没有很明显差距,小幅度的区别的原因应该还是具体Encoding和compress算法的区别。但是CPU耗时方面,明显高出RC和ORC,应该是列打散算法的消耗造成的,也不排除目前Parquet对Dremel算法的应用还有优化的空间。
下面的数据测试读取性能,RC-LZ4 v.s. ORC-ZLIB SPEED
可以看到第一例case中,ORC格式由于column data统计数据的存在,在数据过滤方面可以更好的使用Filter Push down技术,所以性能要明显由于RC格式(数据量100倍)。 无条件count这种,很明显,ORC大概能做到只需要检索原始数据500-2000分之一的数据量,RC大概是十五分之一左右(当然,这取决于表的字段数,RC文件的加速来源于分列存储,ORC格式的加速来源于meta统计信息里Count信息的存在)
而第二例有条件过滤计数case中,ORC还是优于RC,不过我们的数据集case中,检索数据量的大小差异大概只有三倍,大致可以认为是meta统计信息中范围信息起到的过滤作用。 不过,很奇怪的是,理论上ORC文件中添加了Bloom Filter以后,应该可以更好的加速过滤检索,但实际效果并没有见到,还需要再验证,是否是我的测试方法或者测试集又问题,还是当前版本还有Bug存在(1.2.1的版本之前BF这块都有bug,并不能发挥作用,但1.2.1 版本以后,jira上已经找不到这方面bug的报告了)
CPU耗时方面,差异没有那么显著 50%,这也和这个case中,IO是瓶颈,MR任务数量多,平均执行时间短,启动耗时占比不能忽略等因素有关
再看Parquet,还是同样的问题,CPU的耗时明显要偏高(尽管使用了比RC和ORC更快的Snappy压缩方式)
总体可以认为,在我们当前的数据集和hive版本环境下,在文件写入方面,ORC相比RC文件的优势不显著,一些场合RC文件还要更优,在查询检索方面,ORC则基本是更优的,性能差距大小取决于具体数据集和检索模式。如果Hive能集成ORC更新的版本,支持LZ4,并修复一些Bug,那应该就没有任何再使用RC的理由了。
至于Parquet,可以考虑在需要支持深度嵌套的数据结构的应用场合中去使用
需要进一步验证的点
内存消耗情况比较
ORC高版本与Hive集成的进展情况跟踪
各种block/strip/page大小参数对文件尺寸,读写性能的影响
ORC BloomFilter问题的跟踪
更大范围的性能验证比较
Spark和ORC的集成情况
Hive orc格式配置参数
RCFile: A Fast and Space-efficient Data Placement Structure in MapReduce-based Warehouse Systems
Parquet格式:
Parquet 配置参数:
Parquet 官网
hive wiki Parquet部分
理论上用TimeStamp和Date类型的数据结构,应该是要比用String类型的方式表达日期要更高效(毕竟有明确的类型信息),这点从上表中同样的数据使用不同的格式以后压缩率的对比情况上就能看得出来。不过,稍微有点意外的是,在CPU耗时方面,TimeStamp类型远远超过String类型(差4倍。。。),这样使用专门的日期类型的价值就完全被湮没了。照理不应该这么差,不是我哪里姿势没搞对,就是在Hive中,这些类型的读写比较等性能方面还存在很大的改进空间。
顺道,推销一下个人公众号 “望月的蚂蚁”, 和技术完全无关。。。。 以一些有趣的兴趣爱好等为主题,比如乐高,桌游,旅行,摄影。。。工作生活要平衡不是;)
本文已收录于以下专栏:
相关文章推荐
本文主要使用Hive引擎对比测试了两种业界较为认可的列式存储格式——ORC和Parquet,并使用Text存储格式做对比,设置了星状模型、扁平式宽表,嵌套式宽表等多种场景进行测试,以供感兴趣的同学参考...
随着大数据时代的到来,越来越多的数据流向了Hadoop生态圈,同时对于能够快速的从TB甚至PB级别的数据中获取有价值的数据对于一个产品和公司来说更加重要,在Hadoop生态圈的快速发展过程...
背景随着大数据时代的到来,越来越多的数据流向了Hadoop生态圈,同时对于能够快速的从TB甚至PB级别的数据中获取有价值的数据对于一个产品和公司来说更加重要,在Hadoop生态圈的快速发展过程中,涌现...
数据做压缩和解压缩总会增加CPU的开销,但可以最大程度的减少文件所需的磁盘空间和网络I/O的开销
最好对那些I/O密集型的作业使用数据压缩
hive表的存储格式为
    TEXTFILE
   ...
目的:将上网日志导入到hive中,要求速度快,压缩高,查询快,表易维护。推荐使用ORC格式的表存储数据
思路:因为在hive指定RCFile格式的表,不能直接load数据,只能通过textfile表...
随着大数据时代的到来,越来越多的数据流向了Hadoop生态圈,同时对于能够快速的从TB甚至PB级别的数据中获取有价值的数据对于一个产品和公司来说更加重要,在Hadoop生态圈的快速发展过程中,涌现了一...
先介绍下Orc的文件格式,截一张官方的图:
    可以看到每个Orc文件由1个或多个stripe组成,每个stripe250MB大小,这个Stripe实际相当于之前的rcfile里的R...
一、首先来看下ORCfile。
Orcfile(Optimized Row Columnar)是hive 0.11版里引入的新的存储格式,是对之前的RCFile存储格式的优化,是HortonWork...
Hive文件存储格式
1.textfile
textfile为默认格式
存储方式:行存储
磁盘开销大 数据解析开销大
压缩的text文件 hive无法进行合并和拆分
2.sequencefile
spark、hive、impala、hbase、gbase在结构化数据方面查询原理对比(含parquet/orc)
他的最新文章
讲师:王禹华
讲师:宋宝华
您举报文章:
举报原因:
原文地址:
原因补充:
(最多只允许输入30个字)查看: 31420|回复: 2
spark SQL编程
主题帖子积分
1.sparkSQL中核心的组件是什么?
2.SchemaRDD组成都包含什么?
3.sparkSQL是否支持多种数据源?
4.spark SQL支持hive哪些功能,不支持哪些功能?
spark SQL支持像SQL,hiveQL或scala那样的关系型查询语句,sparkSQL中核心的组件是SchemaRDD,SchemaRDD由row对象和描述每行中每列的数据类型的schema组成。SchemaRDD和关系型数据库中的表类似,可以从已存在的RDD,parquet文件,json数据集或运行HiveQL创建而来。
sparkSQL中最重要的一个类是SQLContext或它的继承类,创建SQLContext如下:val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD.
import sqlContext.createSchemaRDD复制代码
除了基本的SQLContext外,还可以创建HiveContext,它提供了SQLContext所没有的超函数,其他的特征包括使用更复杂的HiveQL解析器进行解析查询语句、访问HiveUDF、从hive表中读取数据。当使用HiveContext,不必安装hive,并且任何SQLContext可用的数据源都可用于HiveContext。
指定的用于解析查询语句的SQL变体可通过spark.sql.dialect选项进行设置,该参数可通过SQLContext的setConf方法或在SQL中使用SET key=value进行设置。对于SQLContext来说,唯一可用的dialect是sql,它使用了SparkSQL提供的一个简单的SQL解析器,对于HiveContext来说,默认的dialect是hiveql,sql也是可用的,但HiveQL解析器会复杂的多。
数据源sparkSQL支持多种数据源,SchemaRDD可被当作普通的RDD操作并能被注册为一个临时表,注册SchemaRDD成为一个表允许你在它的数据上运行SQL查询。本节描述了多种将数据导入SchemaRDD的方法。
RDDsparkSQL支持两种不同的方式来将已存在的RDD转换为SchemaRDD,第一种方式使用反射来推断RDD的schema,这种方式会有很多简洁的代码并且能在你书写spark应用程序时已经知道schema的情况下很好的工作。
使用反射推断schemasparkSQL中的scala接口支持自动将包含case类的RDD转换为SchemaRDD,其中case类定义了表的schema。case类中的属性会被用于表中的列名,case类还可以被嵌套或包含复杂类型如Sequence或Array。// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD.
import sqlContext.createSchemaRDD
// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)
// Create an RDD of Person objects and register it as a table.
val people = sc.textFile(&examples/src/main/resources/people.txt&).map(_.split(&,&)).map(p =& Person(p(0), p(1).trim.toInt))
people.registerTempTable(&people&)
// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql(&SELECT name FROM people WHERE age &= 13 AND age &= 19&)
// The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
teenagers.map(t =& &Name: & + t(0)).collect().foreach(println)复制代码
编程方式指定的schema当case类不能被提前定义(如数据被编码成字符串或文本数据集将被解析),这时可以以编程方式来创建SchemaRDD:
1.从原始的RDD创建行的RDD
2.创建匹配第一步创建的RDD中的行结构的StructType
3.使用SQLContext的applySchema方法将schema应用到行RDD中// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// Create an RDD
val people = sc.textFile(&examples/src/main/resources/people.txt&)
// The schema is encoded in a string
val schemaString = &name age&
// Import Spark SQL data types and Row.
import org.apache.spark.sql._
// Generate the schema based on the string of schema
val schema =
&&StructType(
& & schemaString.split(& &).map(fieldName =& StructField(fieldName, StringType, true)))
// Convert records of the RDD (people) to Rows.
val rowRDD = people.map(_.split(&,&)).map(p =& Row(p(0), p(1).trim))
// Apply the schema to the RDD.
val peopleSchemaRDD = sqlContext.applySchema(rowRDD, schema)
// Register the SchemaRDD as a table.
peopleSchemaRDD.registerTempTable(&people&)
// SQL statements can be run by using the sql methods provided by sqlContext.
val results = sqlContext.sql(&SELECT name FROM people&)
// The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
results.map(t =& &Name: & + t(0)).collect().foreach(println)复制代码
parquet文件parquet是柱状格式数据,它可以被其他很多的数据处理系统支持,sparkSQL支持读写parquet文件以自动保护原始数据的schema。编程方式导入数据例子如下:// sqlContext from the previous example is used in this example.
// createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD.
import sqlContext.createSchemaRDD
val people: RDD[Person] = ... // An RDD of case class objects, from the previous example.
// The RDD is implicitly converted to a SchemaRDD by createSchemaRDD, allowing it to be stored using Parquet.
people.saveAsParquetFile(&people.parquet&)
// Read in the parquet file created above.&&Parquet files are self-describing so the schema is preserved.
// The result of loading a Parquet file is also a SchemaRDD.
val parquetFile = sqlContext.parquetFile(&people.parquet&)
//Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable(&parquetFile&)
val teenagers = sqlContext.sql(&SELECT name FROM parquetFile WHERE age &= 13 AND age &= 19&)
teenagers.map(t =& &Name: & + t(0)).collect().foreach(println)复制代码
配置parquet的配置可通过SQLContext的setConf方法或通过运行SET key=value方式完成。属性名默认值注释spark.sql.parquet.binaryAsStringfalse一些其他的支持parquet的系统如impala或旧版本的spark,在写出parquet schema的时候不会区分二进制数据和字符串。该配置选项告诉sparkSQL将字符串解析为二进制数据以使兼容这些系统spark.sql.parquet.cacheMetadatafalse打开缓存parquet schema元数据会提升静态数据的查询速度spark.pression.codecsnappy设置写parquet文件的压缩方式,包括:uncompressed,snappy,gzip,lzojson数据集
sparkSQL可以通过json数据集推断出schema并将该schema转换为SchemaRDD,该转换可以通过SQLContext的以下两个方法实现:jsonFile-从json文件所在目录中导入数据,文件的每行必须是一个json对象jsonRdd-从已存在的RDD导入数据,RDD中的每个元素是包含json对象的字符串// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files.
val path = &examples/src/main/resources/people.json&
// Create a SchemaRDD from the file(s) pointed to by path
val people = sqlContext.jsonFile(path)
// The inferred schema can be visualized using the printSchema() method.
people.printSchema()
// root
//&&|-- age: IntegerType
//&&|-- name: StringType
// Register this SchemaRDD as a table.
people.registerTempTable(&people&)
// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql(&SELECT name FROM people WHERE age &= 13 AND age &= 19&)
// Alternatively, a SchemaRDD can be created for a JSON dataset represented by
// an RDD[String] storing one JSON object per string.
val anotherPeopleRDD = sc.parallelize(
&&&&&{&name&:&Yin&,&address&:{&city&:&Columbus&,&state&:&Ohio&}}&&& :: Nil)
val anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD)复制代码
hive表sparkSQL同时也支持读写存储在hive中数据,然而,由于hive有很多的依赖,所以在使用hive前需要运行sbt/sbt -Phive assembly/assembly来编译一个包含hive的jar包。这个jar包也需要放到所有的worker节点上。
配置hive的一些参数需要放在conf目录的hive-site.xml文件中。
当需要使用hive则必须构造一个HiveContext,它继承于SQLContext,当用户没有部署过hive也可以创建HiveContext,当没有配置hive-site.xml文件,那么context会自动在当前目录中创建metastore_db和warehouse。// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
sqlContext.sql(&CREATE TABLE IF NOT EXISTS src (key INT, value STRING)&)
sqlContext.sql(&LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src&)
// Queries are expressed in HiveQL
sqlContext.sql(&FROM src SELECT key, value&).collect().foreach(println)复制代码
性能优化对于很多情况来说需要使用将数据缓存在内存中或使用一些试验性选项来优化的方式来提高性能。将数据缓存在内存sparkSQL可以通过cacheTable(&tableName&)方法将表缓存在内存中,这样sparkSQL将只会扫描需要的列并将自动优化压缩以最小化内存使用和GC压力。使用uncacheTable(&tableName&)将表从内存移除。如果使用cache,那么表将不会使用柱状格式缓存在内存中,所以推荐使用cacheTable方法。
相关的一些参数如下:属性名默认值注释spark.pressedfalse当为true时,sparkSQL会基于数据的统计情况自动选择一种压缩方式来压缩每列数据spark.sql.inMemoryColumnarStorage.batchSize1000控制缓存时的大小,较大的值可以提高内存使用率和压缩效率,但会出现OOM
其他的配置选项下面的一些参数也可以用来优化查询性能:属性名默认值注释spark.sql.autoBroadcastJoinThreshold10000当处理join查询时广播到每个worker的表的最大字节数,当设置为-1广播功能将失效spark.sql.codegenfalse当为true时,那么会在运行时动态生成指定查询的表达式,对于那些拥有复杂表达式的查询来说,该选项会导致明显的速度提升,然而对于简单的查询该选项会减慢查询速度spark.sql.shuffle.partitions200配置在处理join或aggregation时shuffle数据的partition数量
其他sql接口运行thrift jdbc server运行如下命令即可启动jdbc server:./sbin/start-thriftserver.sh复制代码
该命令接受所有bin/spark-submit的命令行参数,另外还可通过--hiveconf参数指定hive的属性文件,默认该服务会监听localhost:10000,可通过以下两种方式进行修改:export HIVE_SERVER2_THRIFT_PORT=&listening-port&
export HIVE_SERVER2_THRIFT_BIND_HOST=&listening-host&
./sbin/start-thriftserver.sh --master &master-uri& ...复制代码./sbin/start-thriftserver.sh --hiveconf hive.server2.thrift.port=&listening-port& --hiveconf hive.server2.thrift.bind.host=&listening-host& --master &master-uri& ...
复制代码
现在就可以使用下面命令进行测试了:./bin/beeline复制代码
然后在beeline命令行模式下运行!connect jdbc:hive2://localhost:10000,beeline将会提示你输入用户名和密码,具体可查看。
运行sparkSQL CLIsparkSQL CLI可以在本地模式运行hive metastore并执行通过命令行输入的查询语句,sparkSQL CLI不能与thrift jdbc服务进行通信。下面代码可以启动sparkSQL CLI:./bin/spark-sql复制代码
与其他系统兼容shark用户迁移指南调度通过SET spark.sql.thriftserver.scheduler.pool=来设置jdbc客户端会话的公平调度池。
reducer数量在shark中,默认的reducer数量为1并且通过mapred.reduce.tasks属性进行设置,sparkSQL已经废弃了该属性,以spark.sql.shuffle.partitions(默认值为200)代替,通过以下代码进行设置:SET spark.sql.shuffle.partitions=10;
SELECT page, count(*) c
FROM logs_last_month_cached
GROUP BY page ORDER BY c DESC LIMIT 10;复制代码
还可以将该属性放在hive-site.xml文件中覆盖默认值。到目前,mapred.reduce.tasks属性还可以使用,会被自动转换为spark.sql.shuffle.partitions。
缓存spark.cache表属性也不再可用了,任何以_cached结尾的表名也不会被缓存了,现在通过CACHE TABLE和UNCACHE TABLE两个语句来控制表缓存:CACHE TABLE logs_last_
UNCACHE TABLE logs_last_复制代码
注意:CACHE TABLE tbl是懒加载的,类似RDD中的.cache方法,该命令只是将tbl标记为当被计算的时候确保partitions被缓存,只有等tbl真正被执行的时候才会缓存。如果希望立即被缓存,使用如下方式:CACHE TABLE logs_last_
SELECT COUNT(1) FROM logs_last_复制代码
与hive兼容sparkSQL可以与hive metastore,serdes和UDF兼容。sparkSQL thrift jdbc服务能够与当前已经安装的hive进行兼容,不需要修改已存在的hive metastore或者改变数据存放目录。
支持的hive特性
sparkSQL支持很多hive特性,如:hive 查询语句:select,group by,order by,cluster by,sort by所有hive操作:关系型操作(=, ⇔, ==, &&, &, &, &=, &=等),算术操作(+, -, *, /, %等),裸机操作(AND, &&, OR, ||等),复杂类型构造,数学函数(sign, ln, cos等),字符串函数(instr, length, printf等)用户定义函数(UDF)用户定义聚合函数(UDAF)用户定义序列化格式(serdes)join:join,{left|right|full} outer join,left semi join,cross join联合查询子查询:select col from(select a+b as col from t1)t2取样操作解释操作表分割所有的hive DDL函数:create table,create table as select,alter table大部分的hive数据类型:TINYINT,SMALLINT,INT,BIGINT,BOOLEAN,FLOAT,DOUBLE,STRING,BINARY,TIMESTAMP,ARRAY&&,MAP&&,STRUCT&&
不支持的hive功能
主要的hive特性
sparkSQL目前不支持使用动态分片向表插入数据sparkSQL不支持带有桶的表,桶是hive表分片的hash分片方式
深奥的hive特性
hive中带有分片的表使用不同的输入格式,在sparkSQL中,所有的表分片使用相同的输入格式hive支持在join操作使用non-equi的条件(如key&10),在sparkSQL中如果使用这种方式会输出错误的结果UNION和DATE类型唯一join单查询多插入sparkSQL不支持piggyback浏览来收集列统计,只支持操作hive metastore中的sizeInBytes字段
hive输入输出格式
sparkSQL只支持TextOutputFormathadoop归档文件
hive优化很多的hive优化目前都不能用户sparkSQL,其中一些(如索引)对于sparkSQL不是很重要的,因为sparkSQL是内存计算模型,其他的一些会在未来的sparkSQL版本中得到支持:块级别的bitmap索引和虚拟字段(用来建立索引)自动将join转换为map join:在大表与很多小表进行join时,hive会自动将join转换为map join,下个版本的sparkSQL会得到支持自动决定join和groupby时reducer的数量,目前,sparkSQL需要使用SET spark.sql.shuffle.partitions=[num_tasks];来控制并行度只查询元数据:hive在查询时可以只查询元数据,sparkSQL需要部署任务来计算结果sparkSQL不支持hive中的skew data flagsparkSQL不支持hive的STREAMTABLE合并多个小文件作为查询结果:如果查询结果包括很多的小文件,hive可以合并这些小文件为大文件避免HDFS元数据容量溢出。sparkSQL暂时不支持该特性
书写语言集成的关系型查询语言集成查询是高级特性并只在scala中支持。如:// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// Importing the SQL context gives access to all the public SQL functions and implicit conversions.
import sqlContext._
val people: RDD[Person] = ... // An RDD of case class objects, from the first example.
// The following is the same as 'SELECT name FROM people WHERE age &= 10 AND age &= 19'
val teenagers = people.where('age &= 10).where('age &= 19).select('name)
teenagers.map(t =& &Name: & + t(0)).collect().foreach(println)复制代码
使用'符号会隐式将相关语句转换为sql表达式,相关详细信息可查看。
sparkSQL数据类型
数字类型:Byte,Short,Integer,Long,Float,Double,Decimal字符串类型:String二进制类型:Binary布尔类型:Boolean日期类型:Timestamp复杂类型:Array(elementType,containsNull),表示一系列的elementType类型的数组,containsNull表示数组中的值是否允许为null;Map(keyType,valueType,valueContainsNull):表示一系列的key-value值对;Struct(fields):表示由一系列的StructFields组成的结构体,StructFields的定义为StructField(name,dataType,nullable) 所有的sparkSQL数据类型都在org.apache.spark.sql包中,通过import org.apache.spark.sql._来导入所有的数据类型。
数据类型scala中值类型访问api或创建数据类型ByteTypeByteByteTypeShortTypeShortShortTypeIntegerTypeIntIntegerTypeLongTypeLongLongTypeFloatTypeFloatFloatTypeDoubleDoubleDoubleTypeDecimalTypescala.math.sql.BigDecimalDecimalTypeStringTypeStringStringTypeBinaryTypeArray[Byte]BinaryTypeBooleanTypeBooleanBooleanTypeTimestampTypejava.sql.TimestampTimestampTypeArrayTypescala.collection.SeqArrayType(elementType,[containsNull]),containsNull默认值为falseMapTypescala.collection.MapMapType(keyType,valueType,[valueContainsNull]),valueContainsNull默认为trueStructTypeorg.apache.spark.sql.RowStructType(fields),fields是StructFields的Seq,两个名字相同的fields是不允许的StructField字段数据类型在scala的值类型StructField(name,dataType,nullable)
欢迎加入about云群 、 ,云计算爱好者群,关注
主题帖子积分
中级会员, 积分 963, 距离下一级还需 37 积分
中级会员, 积分 963, 距离下一级还需 37 积分
主题帖子积分
中级会员, 积分 393, 距离下一级还需 607 积分
中级会员, 积分 393, 距离下一级还需 607 积分
好材料,一定要顶。
积极上进,爱好学习
站长推荐 /4
云计算hadoop视频大全(新增 yarn、flume|storm、hadoop一套视频
等待验证会员请验证邮箱
新手获取积分方法
技术类问答,解决学习openstack,hadoop生态系统中遇到的问题
Powered by

我要回帖

更多关于 spark 文件 snappy 的文章

 

随机推荐