请教“spark如何截spark读取csv文件件的前几列并妗

一、Spark-sql创建外部分区表 1.使用spark-sql spark-sql --queue spark --master yarn --deploy-mode client --num-executors 10--executor-cores 2 --executor-memory 3G 2.spark-sql中创建parquet分区表: create external table pgls.convert_parq(bill_num string,logis_id string,store_id string,store_code string,creater_id string,order_status INT,pay_status INT,order_require_varieties INT,order_require_amount decimal(19,4),order_rec_amount decimal(19,4),order_rec_gpf decimal(19,4),deli_fee FLOAT,order_type INT,last_modify_time timestamp,order_submit_time timestamp) partitioned by(order_submit_date date)row format serde 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'stored as parquetfilelocation '/test/spark/convert/parquet/bill_parq/'; 二、CSV转Parquet 代码:org.apache.spark.ConvertToParquet.scala package org.apache.sparkimport com.ecfront.fs.operation.HDFSOperationimport org.apache.hadoop.conf.Configurationimport org.apache.spark.sql.SQLContextimport org.apache.spark.sql.types._/*** CSV 转换为 parquet* 参数:输入路径, 输出路径, 分区数*/object ConvertToParquet{def main(args: Array[String]) {if(args.length != 3){println("jar args: inputFiles outPath numpartitions")System.exit(0)}val inputPath = args(0)val outPath = args(1)val numPartitions = args(2).toIntprintln("==========================================")println("=========input: "+ inputPath )println("=========output: "+ outPath )println("==numPartitions: "+ numPartitions )println("==========================================")//判断输出目录是否存在,存在则删除val fo = HDFSOperation(new Configuration())val existDir = fo.existDir(outPath)if(existDir) {println("HDFS exists outpath: " + outPath)println("start to delete ...")val isDelete = fo.deleteDir(outPath)if(isDelete){println(outPath +" delete done. ")}}val conf = new SparkConf()val sc = new SparkContext(conf) //参数SparkConf创建SparkContext,val sqlContext = new SQLContext(sc) //参数SparkContext创建SQLContextval schema = StructType(Array(StructField("bill_num",DataTypes.StringType,false),StructField("logis_id",DataTypes.StringType,false),StructField("store_id",DataTypes.StringType,false),StructField("store_code",DataTypes.StringType,false),StructField("creater_id",DataTypes.StringType,false),StructField("order_status",DataTypes.IntegerType,false),StructField("pay_status",DataTypes.IntegerType,false),StructField("order_require_varieties",DataTypes.IntegerType,false),StructField("order_require_amount",DataTypes.createDecimalType(19,4),false),StructField("order_rec_amount",DataTypes.createDecimalType(19,4),false),StructField("order_rec_gpf",DataTypes.createDecimalType(19,4),false),StructField("deli_fee",DataTypes.FloatType,false),StructField("order_type",DataTypes.IntegerType,false),StructField("last_modify_time",DataTypes.TimestampType,false),StructField("order_submit_time",DataTypes.TimestampType,false),StructField("order_submit_date",DataTypes.DateType,false)))convert(sqlContext, inputPath, schema, outPath, numPartitions)}//CSV转换为parquetdef convert(sqlContext: SQLContext, inputpath: String, schema: StructType, outpath: String, numPartitions: Int) {// 将text导入到DataFrameval df = sqlContext.read.format("com.databricks.spark.csv").schema(schema).option("delimiter", ",").load(inputpath)// 转换为parquet// df.write.parquet(outpath) // 转换时以block数为分区数df.coalesce(numPartitions).write.parquet(outpath) //自定义分区数}} 打包后jar上传至本地目录: /soft/sparkTest/convert/spark.mydemo-1.0-SNAPSHOT.jar 事先在HDFS上生成CSV文件,HDFS目录: /test/spark/convert/data/order// 执行命令: spark-submit --queue spark --master yarn --num-executors 10--executor-cores 2 --executor-memory 3G --class org.apache.spark.ConvertToParquet --name ConvertToParquet file:/soft/sparkTest/convert/spark.mydemo-1.0-SNAPSHOT.jar/test/spark/convert/data/order// /test/spark/convert/parquet/bill_parq/order_submit_date= pom.xml相关内容: 1.依赖包:
com.ecfront ez-fs 0.9 org.apache.spark spark-core_2.10 1.6.1 org.apache.spark spark-sql_2.10 1.6.1 com.databricks spark-csv_2.11 1.4.0 org.apache.hadoop hadoop-client 2.6.0 2.plugins(含打入依赖包)
net.alchim31.mavenscala-maven-plugin3.2.1
org.apache.maven.pluginsmaven-compiler-plugin2.0.2
org.apache.maven.pluginsmaven-shade-plugin1.4 *:*META-INF/*.SFMETA-INF/*.DSAMETA-INF/*.RSA
net.alchim31.maven scala-maven-plugin scala-compile-firstprocess-resources add-source compilescala-test-compileprocess-test-resources testCompile
org.apache.maven.plugins maven-compiler-plugin compile compile
org.apache.maven.plugins maven-shade-plugin 1.4 true
package shade <transformerimplementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">org.apache.spark.ConvertToParquet
三、表添加分区 spark-sql下执行 alter table pgls.convert_parq add partition(order_submit_date=''); 可通过sql查询到相应数据: select * from pgls.convert_parq where order_submit_date='' limit 5;
最新教程周点击榜
微信扫一扫正在播放:33.flatMap函数处理csv文件进行扁平化RDD处理
下载学院APP缓存视频离线看
购买本课程后即可享受以下服务:
24小时讲师答疑
所有课时永久观看
购买课程返学分
购买本课程后,就可以记笔记了~~
开始记笔记……
记录时间点
我的笔记同学的笔记
购买本课程后,就可以提问题了~~
向老师提问……
记录时间点
我的问题同学的问题Apache Spark DataFrames入门指南:创建DataFrame &#8211; 过往记忆
欢迎关注Hadoop、Spark、Flink、Hive、Hbase、Flume等大数据资料分享微信公共账号:iteblog_hadoop。
欢迎关注微信公共帐号:iteblog_hadoop
IT技术前沿:geek_toutiao
  本系列文章翻译自:《scala data analysis cookbook》第二章:Getting Started with Apache
DataFrames。原书是基于 1.4.1编写的,我这里使用的是Spark 1.6.0,丢弃了一些已经标记为遗弃的函数。并且修正了其中的错误。
  一、从csv文件创建DataFrame
    如何做?
    如何工作的
    附录
  二、操作DataFrame
    打印DataFrame里面的模式
    对DataFrame里面的数据进行采样
    查询DataFrame里面的列
    根据条件过滤数据
    对DataFrame里面的数据进行排序
    对列进行重命名
    将DataFrame看作是关系型数据表
    对两个DataFrame进行Join操作
    将DataFrame保存成文件
  三、从Scala case class中创建DataFrame
    如何做?
    如何工作的
    附录
一、从csv文件创建DataFrame
  本文将介绍如何从csv文件创建DataFrame。
  从csv文件创建DataFrame主要包括以下几步骤:
  1、在build.sbt文件里面添加spark-csv支持库;
  2、创建SparkConf对象,其中包括Spark运行所有的环境信息;
  3、创建SparkContext对象,它是进入Spark的核心切入点,然后我们可以通过它创建SQLContext对象;
  4、使用SQLContext对象加载CSV文件;
  5、Spark内置是不支持解析CSV文件的,但是Databricks公司开发了一个类库可以支持解析CSV文件。所以我们需要把这个依赖文件加载到依赖文件中(pom.xml或者是build.sbt)
如果你是SBT工程,请加入以下依赖到build.sbt文件中:
libraryDependencies += &com.databricks& % &spark-csv_2.10& % &1.3.0&
如果你是Maven工程,请加入以下依赖到pom.xml文件中:
&dependency&
&groupid&com.databricks&/groupid&
&artifactid&spark-csv_2.10&/artifactid&
&version&1.3.0&/version&
&/dependency&
  6、SparkConf持有所有运行Spark程序的信息,在这个实例中,我们将以本地的方式运行这个程序,而且我们打算使用2个核(local[2]),部分代码片段如下:
import org.apache.spark.SparkConf
val conf = new SparkConf().setAppName(&csvDataFrame&).setMaster(&local[2]&)
  7、使用SparkConf初始化SparkContext对象,SparkContext是进入Spark的核心切入点:
val sc = new SparkContext(conf)
在Spark中查询数据最简单的一种方式就是使用SQL查询,所以我们可以定义一个SQLContext对象:
val sqlContext=new SQLContext(sc)
  8、现在我们就可以加载事先准备好的数据了:
import com.databricks.spark.csv._
val students=sqlContext.csvFile(filePath=&StudentData.csv&, useHeader=true, delimiter='|')
其中,students对象的类型是org.apache. spark.sql.DataFrame。
如何工作的
  csvFile方法接收需要加载的csv文件路径filePath,如果需要加载的csv文件有头部信息,我们可以将useHeader设置为true,这样就可以将第一行的信息当作列名称来读;delimiter指定csv文件列之间的分隔符。
  除了使用csvFile函数,我们还可以使用sqlContext里面的load来加载csv文件:
val options = Map(&header& -& &true&, &path& -& &E:\\StudentData.csv&)
val newStudents = sqlContext.read.options(options).format(&com.databricks.spark.csv&).load()
为了方便大家测试,我提供了StudentData.csv文件的部分数据集:
id|studentName|phone|email
1|Burke|1-300-746-8446|ullamcorper.velit.in@ametnullaDonec.co.uk
2|Kamal|1-668-571-5046|pede.Suspendisse@interdumenim.edu
3|Olga|1-956-311-1686|Aenean.eget.metus@dictumcursusNunc.edu
4|Belle|1-246-894-6340|vitae.aliquet.nec@neque.co.uk
5|Trevor|1-300-527-4967|dapibus.
6|Laurel|1-691-379-9921|adipiscing@consectetueripsum.edu
7|Sara|1-608-140-1995|Donec.nibh@enimEtiamimperdiet.edu
8|Kaseem|1-881-586-2689|cursus.et.magna@euismod.org
9|Lev|1-916-367-5608|Vivamus.
10|Maya|1-271-683-2698|accumsan.convallis@ornarelectusjusto.edu
11|Emi|1-467-270-1337|
12|Caleb|1-683-212-0896|Suspendisse@Quisque.edu
13|Florence|1-603-575-2444|sit.amet.dapibus@lacusAliquamrutrum.ca
14|Anika|1-856-828-7883|euismod@ligulaelit.co.uk
15|Tarik|1-398-171-2268|
16|Amena|1-878-250-3129|lorem.luctus.
17|Blossom|1-154-406-modo.auctor@eratSed.co.uk
18|Guy|1-869-521-3230|senectus.et.
19|Malachi|1-608-637-2772|Proin.mi.
20|Edward|1-711-710-6552|lectus@aliquetlibero.co.uk
本博客文章除特别声明,全部都是原创!
禁止个人和公司转载本文、谢谢理解:
下面文章您可能感兴趣spark(11)
有时候数据量会大到本机可能无法存储,这时就需要探索别的读取和保存方法了。
Spark支持很多种输入源和输出源。一部分原因是Spark本身是基于Hadoop生态圈二构建的,so spark可以通过Hadoop MapReduce 所使用的InputFormat 和 OutPutFormat 接口访问数据,而大部分常见的文件格式与存储系统(S3,HDFS,Cassandra,HBase等)都支持这种接口。
Spark所支持的三种常见数据源:
文本格式与文件系统
数据库与键值存储
2、文件格式:
- 文本文件:不是结构化,普通的文本文件,每一行一条记录
- JSON: 半结构化,常见的基于文本的格式,半结构化;大多数库都要求每行一条记录。
- CSV:结构化,非常常见的基于文本的格式,通常在电子表格应用中使用。
- SequenceFile:结构化,一种用于键值对数据的常见Hadoop文件格式。
- Protocol buffers:结构化,一种快速的,节约空间的跨语言格式。
- 对象文件:用来将Spark作业中的数据存储下来以让共享的代码读取。改变类的时候它会失效,因为他依赖于java序列化。
(1) 文本文件
当我们将一个文本文件读取为RDD时,输入的每一行都会成为RDD的每一个元素。也可以将多个文本文件读取为一个pair RDD,其中键是文件名,值是文件内容。
scala& val input = sc.textFile("/Users/mac/Documents/javascala/VNCluster/500points.txt ")
input: org.apache.spark.rdd.RDD[String] = /Users/mac/Documents/javascala/VNCluster/500points.txt
MapPartitionsRDD[1] at textFile at &console&:24
同样可以指定分区数
scala& val input = sc.textFile("/Users/mac/Documents/javascala/VNCluster/500points.txt ",4)
input: org.apache.spark.rdd.RDD[String] = /Users/mac/Documents/javascala/VNCluster/500points.txt
MapPartitionsRDD[3] at textFile at &console&:24
我是读入一个数据点文本文件,然后对数据做一些操作存储在源目录中:
scala& val data = input.map(x =& (x.split(" ")(0),x.split(" ")(1)))
data: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[6] at map at &console&:26
scala& data.first
res3: (String, String) = (0.2489,0.7091)
scala& data.filter{case(x,y) =& y.toDouble & 0.5}
res5: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[10] at filter at &console&:29
scala& val afterDeal = data.filter{case(x,y) =& y.toDouble & 0.5}
afterDeal: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[11] at filter at &console&:28
scala& afterDeal.first
res7: (String, String) = (0.2489,0.7091)
scala& afterDeal.saveAsTextFile("/Users/mac/Documents/javascala/VNCluster/rdd.txt")
读取JSON文件:将数据作为文本文件,然后对JSON数据进行解析,这样的方法可以在所有支持的编程语言中使用。
保存JSON文件:不需要考虑格式错误的数据,并且也知道要写出的数据类型。
(3)逗号分隔值与制表符分隔值
逗号分隔值(CSV)文件每行都有固定数目的字段,字段间用逗号隔开。记录通常是一行一条,有时也可以跨行。
val conf = new SparkConf().setMaster("local").setAppName("My app")
val sc = new SparkContext(conf)
val input = sc.textFile("inputFile")
val result = input.map{ line =&
val reader = new CSVReader(new StringBuilder(line))
reader.readNext()
保存CSV:和保存保存文本文件基本一样
scala& val input = sc.textFile("/Users/mac/Desktop/500points.csv")
input: org.apache.spark.rdd.RDD[String] = /Users/mac/Desktop/500points.csv MapPartitionsRDD[1] at textFile at &console&:24
scala& input.first
res0: String = 237.09,712.1
scala& val data = input.map(x =& (x.split(",")(0),x.split(",")(1)))
data: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[2] at map at &console&:26
scala& data.first
res1: (String, String) = (237.09,712.1)
scala& val deal = data.mapValues(x =& x.toDouble / x.max.toDouble)
deal: org.apache.spark.rdd.RDD[(String, Double)] = MapPartitionsRDD[3] at mapValues at &console&:28
scala& deal.saveAsTextFile("/Users/mac/Desktop/500pointsresult.csv" )
参考知识库
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:6937次
排名:千里之外
原创:65篇
(5)(8)(13)(9)(16)(18)(8)原文链接:
CSV格式的文件也称为逗号分隔值(Comma-Separated Values,CSV,有时也称为字符分隔值,因为分隔字符也可以不是逗号。在本文中的CSV格式的数据就不是简单的逗号分割的),其文件以纯文本形式存表格数据(数字和文本)。CSV文件由任意数目的记录组成,记录间以某种换行符分隔;每条记录由字段组成,字段间的分隔符是其它字符或字符串,最常见的是逗号或制表符。通常,所有记录都有完全相同的字段序列。
  本篇文章将介绍如何使用&1.3+的外部数据源接口来自定义CSV输入格式的文件解析器。这个外部数据源接口是由databricks公司开发并开源的(地址:/databricks/spark-csv),通过这个类库我们可以在&SQL中解析并查询CSV中的数据。因为用到了Spark的外部数据源接口,所以我们需要在Spark 1.3+上面使用。在使用之前,我们需要引入以下的依赖:
&dependency&
&&&&&groupId&com.databricks&/groupId&
&&&&&artifactId&spark-csv_2.10&/artifactId&
&&&&&version&1.0.3&/version&
&/dependency&
目前spark-csv_2.10的最新版就是1.0.3。如果我们想在Spark shell里面使用,我们可以在--jars选项里面加入这个依赖,如下:
[iteblog@spark $] bin/spark-shell --packages com.databricks:spark-csv_2.10:1.0.3
  和文章中用到的load函数类似,在使用CSV类库的时候,我们需要在options中传入以下几个选项:
  1、path:看名字就知道,这个就是我们需要解析的CSV文件的路径,路径支持通配符;  2、header:默认值是false。我们知道,CSV文件第一行一般是解释各个列的含义的名称,如果我们不需要加载这一行,我们可以将这个选项设置为true;  3、delimiter:默认情况下,CSV是使用英文逗号分隔的,如果不是这个分隔,我们就可以设置这个选项。  4、quote:默认情况下的引号是'"',我们可以通过设置这个选项来支持别的引号。  5、mode:解析的模式。默认值是PERMISSIVE,支持的选项有    (1)、PERMISSIVE:尝试解析所有的行,nulls are inserted for missing tokens and extra tokens are ignored.    (2)、DROPMALFORMED:drops lines which have fewer or more tokens than expected    (3)、FAILFAST: aborts with a RuntimeException if encounters any malformed line
1、在Spark SQL中使用
  我们可以通过注册临时表,然后使用纯SQL方式去查询CSV文件:
CREATE&TABLE&cars
USING com.databricks.spark.csv
OPTIONS (path&"cars.csv", header&"true")
我们还可以在DDL中指定列的名字和类型,如下:
CREATE&TABLEcars (yearMade&double, carMake string, carModel string, comments string, blank string)
USING com.databricks.spark.csv
OPTIONS (path&"cars.csv", header&"true")
2、通过方式
  推荐的方式是通过调用SQLContext的load/save函数来加载CSV数据:
import&org.apache.spark.sql.SQLContext
val&sqlContext&=&new&SQLContext(sc)
val&df&=&sqlContext.load("com.databricks.spark.csv", Map("path"&-&&"cars.csv",&"header"-&&"true"))
df.select("year",&"model").save("newcars.csv",&"com.databricks.spark.csv")
当然,我们还可以使用com.databricks.spark.csv._的隐式转换:
import&org.apache.spark.sql.SQLContext
import&com.databricks.spark.csv._
val&sqlContext&=&new&SQLContext(sc)
val&cars&=&sqlContext.csvFile("cars.csv")
cars.select("year",&"model").saveAsCsvFile("newcars.tsv")
3、在Java中使用
和在中使用类似,我们也推荐调用SQLContext类中&load/save函数
&* User: 过往记忆
&* Time: 下午23:26
&* 本文地址:
&* 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
&* 过往记忆博客微信公共帐号:iteblog_hadoop
import&org.apache.spark.sql.SQLContext
SQLContext sqlContext =&new&SQLContext(sc);
HashMap&String, String& options =&new&HashMap&String, String&();
options.put("header",&"true");
options.put("path",&"cars.csv");
DataFrame df = sqlContext.load("com.databricks.spark.csv", options);
df.select("year",&"model").save("newcars.csv",&"com.databricks.spark.csv");
在Java或者是Scala中,我们可以通过CsvParser里面的函数来读取CSV文件:
import&com.databricks.spark.csv.CsvP
SQLContext sqlContext =&new&org.apache.spark.sql.SQLContext(sc);
DataFrame cars = (new&CsvParser()).withUseHeader(true).csvFile(sqlContext,&"cars.csv");
4、在中使用
在中,我们也可以使用SQLContext类中&load/save函数来读取和保存CSV文件:
from&pyspark.sql&import&SQLContext
sqlContext&=&SQLContext(sc)
df&=&sqlContext.load(source="com.databricks.spark.csv", header="true", path&=&"cars.csv")
df.select("year",&"model").save("newcars.csv",&"com.databricks.spark.csv")
阅读(...) 评论()

我要回帖

更多关于 spark读取csv文件 的文章

 

随机推荐