hadoop jar files自定义分区必须打成JAR包放到服务器端运行吗?

gcpopo的充实人生_百度空间
如果要自定义数据类型,作value的类型的话,可以实现Writable接口,如果做key的类型的话,必须实现更加严格的接口——WritableComparable。
Hive 的启动方式
& & & && &&&hive&&命令行模式,直接输入/hive/bin/hive的执行程序,或者输入 hive –service cli
& & & && &&&hive&&web界面的启动方式,hive –service hwi&&
& & & && &&&hive&&远程服务 (端口号10000) 启动方式,nohup hive –service hiveserver&&&
Hive 与 JDBC&
& & & & 导入hive\lib下的所有jar包到IDE的classpath里面,还有hadoop中的 hadoop-0.20.2-core.jar包,即可运行下列代码:
& & & & package com.javabloger.
& & & & import java.sql.C
& & & & import java.sql.DriverM
& & & & import java.sql.ResultS
& & & & import java.sql.S
& & & & public class HiveTestCase {
& & & && &&&public static void main(String[] args) throws&&Exception {
& & & && && && &Class.forName(&org.apache.hadoop.hive.jdbc.HiveDriver&);
& & & && && && &String dropSQL=&drop table javabloger&;
& & & && && &nbs
通常情况下hive将数据存储到hadoop上/user/hive/warehouse目录下,关系型数据库使用索引index去加快查询速度,而hive使用的是以恶搞所谓的partition columns的概念,例如比如说存在某一行叫做state,可以根据state中存储的数据值,将state分为50个partitions。如果存在 date列的话,那么通常按照时间进行partition,hive在对分区的列上进行查询的速度会比较快,原因是hadoop在数据存储上将不同的分区存储在了不同的目录文件下。例如对于上面的列state和date,可能的存储模型如下:
当然每个分区内的数据文件可能还是比较大,幸好在hive中存在一个所谓的buckets的概念,buckets根据hash值将数据分割成更小的数据文件,还是上面的例子,如果使用buckets的话,可能的存储模型如下:
我们将通过实际hql语句来分析hql的语法。
该条语句创建表page_view,表中有5列,同时在见表语句中指出了各个列的数据类型,在hive中内建支持的数据类型如下:
String path = ((org.apache.hadoop.mapreduce.lib.input.FileSplit)context.getInputSplit()).getPath().toString();
另外,通过查询API,发现原有的Reporter类仍旧存在,下涉子接口Progressable,Progressable的实现类就有Mapper.Context 已经 Reducer.Context,所以Reporter原有的方法应该都能够移植过来。
查看所有表:
使用通配符:
show tables ‘带通配符’;
查看表分区:
Show partitions table_
查看表结构:
Describe table_
两表关联(只支持等值关联):
select a.*,b.* from cn_se_section_sdt0 a
join cn_se_section_dimt0 b
on( a.section_id=b.section_id)
分区表创建:
Create table cn_se_section_sdt4&
(stat_date date
,search_type_id int
,section_id int
,seo_clk_cnt int
,site_clk_cnt int)
partitioned by (dt string)
row format delimited fields terminated by ',';
分区表加载数据:
load data local inpath '/usr/mydata/aaa.txt' into table cn_se_section_sdt4 partition(dt='');
分区表查询:
select a.* from cn_se_section_sdt4 a where a.dt='' limit 3;
分区表添加分区:
alter table cn_se_section_sdt4 add partition (dt='');
Create table cn_se_section_sdt0&
(stat_date date
,search_type_id int
,section_id int
,seo_clk_cnt int
,site_clk_cnt int)&&
row format delimited fields terminated by ',‘;
复制一个空表:
create table table_name1 like table_name2;
加载数据:
Load data from local:
Hive&load data local inpath '/usr/mydata/sdt.txt' overwrite into table cn_se_section_sdt0;
Load data from hdfs:
hive& load data inpath'/home/chengjuan/input3/sdt.txt' overwrite into table cn_s
Hive是為簡化編寫MapReduce程序而生的,使用MapReduce做過數據分析的人都知道,很多分析程序除業務邏輯不同外,程序流程基本一樣。在這種情況下,就需要Hive這樣的用戶編程接口。Hive本身不存儲和計算數據,它完全依賴於HDFS和MapReduce,Hive中的表純邏輯表,就是些表的定義等,也就是表的元數據。使用SQL實現Hive是因為SQL大家都熟悉,轉換成本低,類似作用的Pig就不是SQL。
HBase為查詢而生的,它通過組織起節點內所有機器的內存,提供一個超大的內存Hash表,它需要組織自己的數據結構,包括磁盤和內存中的,而Hive是不做這個的,表在HBase中是物理表,而不是邏輯表,搜索引擎使用它來存儲索引,以滿足查詢的實時性需求。
转自:/flying5/archive//2078399.html
主要参考这篇文章1.Copy a file from the local file system toHDFSThe srcFile variable needs to contain the full name (path +file name) of the file in the local filesystem.&The dstFile variable needs to contain the desired full name ofthe file in the Hadoop file system.Configuration&config&=&new&Configuration();
&&FileSystemhdfs&=&FileSystem.get(config);
&&PathsrcPath&=&new&Path(srcFile);
&&PathdstPath&=&new&Path(dstFile);
&&hdfs.copyFromLocalFile(srcPath,&dstPath);
2.Create HDFS fileThe fileName variable contains the file name and path in theHadoop file system.&The content of the file is the buff variable which is an arrayof bytes.//byte[] buff - The content of thefile&&Configuration&config&=&new&Configuration();
&&FileSystemhdfs&=&FileSystem.get(config);
&&Pathpath&=&new&Path(fileName);
&&FSDataOutputStream&outputStream&=&hdfs.create(path);
&&outputStream.write(buff,&0,&buff.length);3.Rename HDFS fileIn order to rename a file in Hadoop file system, we need thefull name (path + name) of&the file we want to rename. The rename method returns true ift
mr自带的例子中的源码SecondarySort,我重新写了一下,基本没变。
这个例子中定义的map和reduce如下,关键是它对输入输出类型的定义:(java泛型编程)
public static class Map extends Mapper&LongWritable, Text, IntPair, IntWritable&
public static class Reduce extends Reducer&IntPair, NullWritable, IntWritable, IntWritable&
1 首先说一下工作原理:
在map阶段,使用job.setInputFormatClass定义的InputFormat将输入的数据集分割成小数据块splites,同时InputFormat提供一个RecordReder的实现。本例子中使用的是TextInputFormat,他提供的RecordReder会将文本的一行的行号作为key,这一行的文本作为value。这就是自定义Map的输入是&LongWritable, Text&的原因。然后调用自定义Map的map方法,将一个个&LongWritable, Text&对输入给Map的map方法。注意输出应该符合自定义Map中定义的输出&IntPair, IntWritable&。最终是生成一个List&IntPair, IntWritable&。在map阶段的最后,会先调用job.setPartitionerClass对这个List进行分区,每个分区映射到一个reducer。每个分区内又调用job.setSortComparatorClass设置的key比较函数类排序。可以看到,这本身就是一个二次排序。如果没有通过job.setSortComparatorClass设置key比较函数类,则使用key的实现的compareTo方法。在第一个例子中,使用了IntPair实现的compareTo方法,10/2011
转自:/flying5/archive//2078407.htmlSpark1.0.0部署指南 - 推酷
Spark1.0.0部署指南
1 节点说明
192.168.1.111
ActiveNameNode
192.168.1 .112
StandbyNameNode,Master,Worker
192.168.1 .113
DataNode,Master,Worker
192.168.1 .114
DataNode,Worker
HDFS集群和Spark集群之间节点共用。
2 安装HDFS
见HDFS2.X和Hive的安装部署文档:
3 Spark部署
Spark常用的安装部署模式有
Spark On Yarn
Standalone
,可以同时使用。
3.1 Spark on Yarn
这种模式,借助Yarn资源分配的功能,使用Spark客户端来向Yarn提交任务运行。只需将Spark的部署包放置到Yarn集群的某个节点上即可(或者是Yarn的客户端, 能读取到Yarn集群的配置文件即可 )。Spark本身的Worker节点、Master节点不需要启动。
但是,Spark的部署包须是基于对应的Yarn版本正确编译后的,否则会出现Spark和Yarn的兼容性问题。
on Yarn的两种运行方式,其运行结束后的日志不能在Yarn的Application管理界面看到,目前只能在客户端通过:
yarn logs -applicationId &applicationId&
命令查看每个Application的日志。
3.1.1 配置
部署这种模式,需要修改conf目录下的
spark-env.sh
文件。在其中新增如下配置选项:
export HADOOP_HOME= /home/hadoop/hadoop-2.0.0-cdh4.5.0
export HADOOP_CONF_DIR= $HADOOP_HOME/etc/hadoop
SPARK_EXECUTOR_INSTANCES=2
SPARK_EXECUTOR_CORES=1
SPARK_EXECUTOR_MEMORY=400M
SPARK_DRIVER_MEMORY=400M
SPARK_YARN_APP_NAME=&Spark 1.0.0&
(1) HADOOP_HOME :当前节点中HDFS的部署路径,因为Spark需要和HDFS中的节点在一起;
(2) HADOOP_CONF_DIR :HDFS节点中的conf配置文件路径,正常情况下此目录为$HADOOP_HOME/etc/hadoop;
(3) SPARK_EXECUTOR_INSTANCES :在Yarn集群中启动的Worker的数目,默认为2个;
(4) SPARK_EXECUTOR_CORES :每个Worker所占用的CPU核的数目;
(5) SPARK_EXECUTOR_MEMORY :每个Worker所占用的内存大小;
(6) SPARK_DRIVER_MEMORY :Spark应用程序Application所占的内存大小,这里的Driver对应Yarn中的ApplicationMaster;
(7) SPARK_YARN_APP_NAME :Spark Application在Yarn中的名字;
配置完成后,将Spark部署文件放置到Yarn的节点中即可。这里,将
spark-1.0.0整个目录
放到Yarn集群的一个节点
192.168.1.112的 /home/hadoop
(设为spark的安装路径的父目录)路径下。
3.1.2 测试
在Spark的部署路径的bin路径下,执行spark-submit脚本来运行
spark-examples
包中的例子。执行如下:
./bin/spark-submit --master yarn \
--class org.apache.spark.examples.JavaWordCount \
--executor-memory 400M \
--driver-memory 400M \
/home/hadoop/spark-1.0.0/examples/target/scala-2.10/spark-examples-1.0.0-hadoop2.0.0-cdh4.5.0.jar ./hdfs-site.xml
这个例子是计算WordCount的,例子被打包在 /home/hadoop/spark-1.0.0/examples/target/scala-2.10/spark-examples-1.0.0-hadoop2.0.0-cdh4.5.0.jar 包中,对应的Class为 org.apache.spark.examples.JavaWordCount , ./hdfs-site.xml 是HDFS中指定路径下的一个文件,WordCount就是针对它来做的。而
--master yarn
就是指定运行在Yarn集群中,以yarn模式运行。
Spark On Yarn有两种运行模式,一种是Yarn Cluster方式,一种是Yarn Client方式。
(1) Yarn Cluster : Spark Driver程序将作为一个ApplicationMaster在YARN集群中先启动,然后再由ApplicationMaster向RM申请资源启动executor以运行Task。因为Driver程序在Yarn中运行,所以程序的运行结果不能在客户端显示,所以最好将结果保存在HDFS上,客户端的终端显示的是作为Yarn的job的运行情况。
(2) Yarn Client : Spark Driver程序在客户端上运行,然后向Yarn申请运行exeutor以运行Task,本地程序负责最后的结果汇总等。客户端的Driver将应用提交给Yarn后,Yarn会先后启动ApplicationMaster和executor,另外ApplicationMaster和executor都是装载在container里运行,container默认的内存是1G,ApplicationMaster分配的内存是driver-memory,executor分配的内存是executor-memory。同时,因为Driver在客户端,所以程序的运行结果可以在客户端显示,Driver以进程名为 SparkSubmit 的形式存在。
上面命令中的提交方式“yarn”就是 默认按照“Yarn Client”方式 运行。用户可自定义运行方式,通过“--master”指定程序以 yarn、yarn-cluster或者yarn-client 中的一种方式运行。
需要重点说明的是最后文件的路径,是相当于HDFS中的/user/hadoop而言,hadoop是当前命令的用户。“./hdfs-site.xml”在HDFS中的全路径为“ hdfs://namespace/user/hadoop/hdfs-site.xml ”,其中hadoop是当前的用户,namespace是HDFS的命名空间;如果写成“/hdfs-site.xml”则在HDFS中指的是“
hdfs://namespace/hdfs-site.xml
”;当然也可以直接传入“
hdfs://namespace/user/hadoop/hdfs-site.xml
”用于指定在HDFS中的要进行WordCount计算的文件。
另外,Spark应用程序需要的CPU Core数目和内存,需要根据当前Yarn的NodeManager的硬件条件相应设置,不能超过NodeManager的硬件条件。
./bin/spark-submit --master yarn \
--class org.apache.spark.examples.JavaWordCount \
--executor-memory 400M \
--driver-memory 400M \
/home/hadoop/spark-1.0.0/examples/target/scala-2.10/spark-examples-1.0.0-hadoop2.0.0-cdh4.5.0.jar hdfs://namespace/user/hadoop/hdfs-site.xml
在Yarn的ResourceManager对应的Web界面中查看启动的Application。
同时可以在启动脚本的客户端看到WordCount的运行结果:
3.2 Spark Standalone
这种模式,就是把Spark单独作为一个集群来进行部署。集群中有两种节点,
一种是Master,另一种是Worker节点
。Master负责分配任务给Worker节点来执行,并负责最后的结果合并,Worker节点负责具体的任务执行。
3.2.1 配置
所需修改的配置文件除了spark-env.sh文件以外,还有slave文件,都位于conf目录中。
slave文件中保存的是worker节点host或者IP,此处的配置为:
192.168.1.112
192.168.1.113
192.168.1.114
至于spark-env.sh文件,可以配置如下属性:
(1) SPARK_MASTER_PORT :Master服务端口,默认为7077;
(2) SPARK_WORKER_CORES :每个Worker进程所需要的CPU核的数目;
(3) SPARK_WORKER_MEMORY :每个Worker进程所需要的内存大小;
(4) SPARK_WORKER_INSTANCES :每个Worker节点上运行Worker进程的数目;
(5) SPARK_MASTER_WEBUI_PORT :Master节点对应Web服务的端口;
(6) export SPARK_DAEMON_JAVA_OPTS=&-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=192.168.1.117:.1.118:.1.119:2181 -Dspark.deploy.zookeeper.dir=/spark &:用于指定Master的HA,依赖于zookeeper集群;
(7) export SPARK_JAVA_OPTS=&-Dspark.cores.max=4& :用于限定每个提交的Spark Application的使用的CPU核的数目,因为 缺省情况下提交的Application会使用所有集群中剩余的CPU Core 。
注意在Worker进程的CPU个数和内存大小的时候,要结合机器的实际硬件条件,如果一个Worker节点上的所有Worker进程需要的CPU总数目或者内存大小超过当前Worker节点的硬件条件,则Worker进程会 启动失败 。
将配置好的Spark文件拷贝至每个Spark集群的节点上的相同路径中。为方便使用 spark-shell ,可以在环境变量中配置上 SPARK_HOME 。
3.2.2 启动
配置结束后,就该启动集群了。这里使用Master的
方式,选取 192.168.1.112、192.168.1.113 节点作为 Master , 192.168.1.112、192.168.1.113、192.168.1.114 节点上运行两个 Worker 进程。
首先在 192.168.1.113 节点上做此操作:
启动之后,可以查看当前节点的进程:
另外,为了保证Master的HA,在 192.168.1.112 节点上只启动 Master :
192.168.1.112 节点的进程为:
启动过后,通过Web页面查看集群的情况,这里访问的是:
再看standby节点 192.168.1.112 的web界面
3.2.3 测试
Spark的bin子目录中的spark-submit脚本是用于提交程序到集群中运行的工具,我们使用此工具做一个关于pi的计算。命令如下:
./bin/spark-submit --master spark://spark113:7077 \
--class org.apache.spark.examples.SparkPi \
--name Spark-Pi --executor-memory 400M \
--driver-memory 512M \
/home/hadoop/spark-1.0.0/examples/target/scala-2.10/spark-examples-1.0.0-hadoop2.0.0-cdh4.5.0.jar
其中--master参数用于指定Master节点的URI,但是这里
填的是Host, 不是IP
任务启动之后,在Spark的Master的Web界面可以看到运行中的Application。
任务运行结束之后,在Web界面中 Completed Applications表格 中会看到对应的结果。
同时,命令行中会打印出来运行的结果,如下所示:
4 spark-submit工具
上面测试程序的提交都是使用的 spark-submit 脚本,其位于 $SPARK_HOME/bin 目录中,执行时需要传入的参数说明如下:
Usage: spark-submit [options] &app jar | python file& [app options]
--master MASTER_URL
可以是spark://host:port, mesos://host:port, yarn, yarn-cluster,yarn-client, local
--deploy-mode DEPLOY_MODE
Driver程序运行的地方,client或者cluster
--class CLASS_NAME
主类名称,含包名
--name NAME
Application名称
--jars JARS
Driver依赖的第三方jar包
--py-files PY_FILES
用逗号隔开的放置在Python应用程序PYTHONPATH上的.zip, .egg, .py文件列表
--files FILES
用逗号隔开的要放置在每个executor工作目录的文件列表
--properties-file FILE
设置应用程序属性的文件路径,默认是conf/spark-defaults.conf
--driver-memory MEM
Driver程序使用内存大小
--driver-java-options
--driver-library-path
Driver程序的库路径
--driver-class-path
Driver程序的类路径
--executor-memory MEM
executor内存大小,默认1G
--driver-cores NUM
Driver程序的使用CPU个数,仅限于Spark Alone模式
--supervise
失败后是否重启Driver,仅限于Spark Alone模式
--total-executor-cores NUM
executor使用的总核数,仅限于Spark Alone、Spark on Mesos模式
--executor-cores NUM
每个executor使用的内核数,默认为1,仅限于Spark on Yarn模式
--queue QUEUE_NAME
提交应用程序给哪个YARN的队列,默认是default队列,仅限于Spark on Yarn模式
--num-executors NUM
启动的executor数量,默认是2个,仅限于Spark on Yarn模式
--archives ARCHIVES
仅限于Spark on Yarn模式
另外,在执行spark-submit.sh工具进行提交应用之前,可以使用如下方式提前定义好当前Spark Application所使用的 CPU Core 数目和内存大小:
SPARK_JAVA_OPTS=&-Dspark.cores.max=2 -Dspark.executor.memory=600m& \
./bin/spark-submit --master spark://update113:7077 \
--class org.apache.spark.examples.SparkPi \
5 Spark HistoryServer
类似于Mapreduce的JobHistoryServer,Spark也有一个服务可以保存历史Application的运行记录。
修改 $SPARK_HOME/conf 下的 spark-defaults.conf文件 (注意, 修改后的配置文件在每个节点都要有 ),其中可修改的配置属性为:
spark.history.updateInterval
以秒为单位,更新日志相关信息的时间间隔
spark.history.retainedApplications
保存Application历史记录的个数,如果超过这个值,旧的应用程序信息将被删除
spark.history.ui.port
HistoryServer的web端口
&spark.history.kerberos.enabled
是否使用kerberos方式登录访问HistoryServer,对于持久层位于安全集群的HDFS上是有用的,如果设置为true,就要配置下面的两个属性
&spark.history.kerberos.principal
用于HistoryServer的kerberos主体名称
spark.history.kerberos.keytab
用于HistoryServer的kerberos keytab文件位置
spark.history.ui.acls.enable
授权用户查看应用程序信息的时候是否检查acl。如果启用,只有应用程序所有者和spark.ui.view.acls指定的用户可以查看应用程序信息;否则,不做任何检查
spark.eventLog.enabled
是否记录Spark事件
spark.eventLog.dir
保存日志相关信息的路径,可以是hdfs://开头的HDFS路径,也可以是file://开头的本地路径,都需要提前创建
spark.yarn.historyServer.address
Server端的URL:Ip:port 或者host:port
此处的设置如下:
spark.eventLog.enabled
spark.eventLog.dir
hdfs://yh/user/hadoop/sparklogs
spark.yarn.historyServer.address
update113:18080
设置完文件之后,进入 sbin 目录启动服务:
运行完成的Application历史记录可以通过访问上面指定的HistoryServer地址查看,这里是
无论运行时是本地模式,还是 yarn-client、yarn-cluster ,运行记录均可在此页面查看。
并且程序运行时的环境变量、系统参数、各个阶段的耗时均可在此查看, 很强大 !
6 Spark可配置参数
Spark参数的配置可通过三种方式:
SparkConf方式& & &命令行参数方式& & 文件配置方式
6.1 应用属性
&spark.app.name
应用程序名称
spark.master
要连接的Spark集群Master的URL
spark.executor.memory
每个executor使用的内存大小
spark.serializer
org.apache.spark
.serializer.JavaSerializer
序列化方式,官方建议使用org.apache.spark.serializer.KryoSerializer,当然也可以任意是定义为org.apache.spark.Serializer子类的序化器
spark.kryo.registrator
如果要使用 Kryo序化器,需要创建一个继承KryoRegistrator的类并设置系统属性spark.kryo.registrator指向该类
&spark.local.dir
用于保存map输出文件或者转储RDD。可以多个目录,之间以逗号分隔。在Spark 1.0 及更高版本此属性会被环境变量&SPARK_LOCAL_DIRS (Standalone、Mesos) 或 LOCAL_DIRS (YARN) 代替
spark.logConf
SparkContext 启动时是否记录有效 SparkConf信息
6.2 运行环境变量
spark.executor.extraJavaOptions
传递给executor的额外JVM 选项,但是不能使用它来设置Spark属性或堆空间大小
spark.executor.extraClassPath
追加到executor类路径中的附加类路径
spark.executor.extraLibraryPath
启动executor JVM 时要用到的特殊库路径
spark.files.userClassPathFirst
executor在加载类的时候是否优先使用用户自定义的JAR包,而不是Spark带有的JAR包,目前,该属性只是一项试验功能
6.3 Shuffle操作相关属性
spark.shuffle.consolidateFiles
如果为true,在shuffle时就合并中间文件,对于有大量Reduce任务的shuffle来说,合并文件可以提高文件系统性能,如果使用的是ext4 或 xfs 文件系统,建议设置为true;对于ext3,由于文件系统的限制,设置为true反而会使内核&8的机器降低性能
&spark.shuffle.spill
如果为true,在shuffle期间通过溢出数据到磁盘来降低了内存使用总量,溢出阈值是由spark.shuffle.memoryFraction指定的
spark.press
是否压缩在shuffle期间溢出的数据,如果压缩将使用pression.codec。
是否压缩map输出文件,压缩将使用pression.codec。
spark.shuffle.file.buffer.kb
每个shuffle的文件输出流内存缓冲区的大小,以KB为单位。这些缓冲区可以减少磁盘寻道的次数,也减少创建shuffle中间文件时的系统调用
spark.reducer.maxMbInFlight
每个reduce任务同时获取map输出的最大大小 (以兆字节为单位)。由于每个map输出都需要一个缓冲区来接收它,这代表着每个 reduce 任务有固定的内存开销,所以要设置小点,除非有很大内存
6.4 SparkUI相关属性
spark.ui.port
应用程序webUI的端口
spark.ui.retainedStages
在GC之前保留的stage数量
&spark.ui.killEnabled
允许在webUI将stage和相应的job杀死
&spark.eventLog.enabled
是否记录Spark事件,用于应用程序在完成后重构webUI
是否压缩记录Spark事件,前提spark.eventLog.enabled为true
spark.eventLog.dir
file:///tmp/spark-events
如果spark.eventLog.enabled为 true,该属性为记录spark事件的根目录。在此根目录中,Spark为每个应用程序创建分目录,并将应用程序的事件记录到在此目录中。可以将此属性设置为HDFS目录,以便history server读取历史记录文件
6.5 压缩和序列化相关属性
是否在发送之前压缩广播变量
是否压缩RDD分区
pression.codec
org.apache.spark.io.
LZFCompressionCodec
用于压缩内部数据如 RDD分区和shuffle输出的编码解码器, org.apache.spark.io.LZFCompressionCodec和org.apache.spark.io.SnappyCompressionCodec。其中,Snappy提供更快速的压缩和解压缩,而LZF提供了更好的压缩比
&pression.snappy
.block.size
使用Snappy编码解码器时,编码解码器使用的块大小 (以字节为单位)
&spark.closure.serializer
org.apache.spark.serializer.
JavaSerializer
用于闭包的序化器,目前只有支持Java序化器
spark.serializer. objectStreamReset
org.apache.spark.serializer.JavaSerializer序列化时,会缓存对象以防止写入冗余数据,此时会停止这些对象的垃圾收集。通过调用重置序化器,刷新该信息就可以收集旧对象。若要关闭这重定期重置功能将其设置为& = 0&。默认情况下每10000个对象将重置序化器
spark.kryo.referenceTracking
当使用Kryo序化数据时,是否跟踪对同一对象的引用。如果你的对象图有回路或者同一对象有多个副本,有必要设置为true;其他情况下可以禁用以提高性能
&spark.kryoserializer.buffer.mb
在Kryo 里允许的最大对象大小(Kryo会创建一个缓冲区,至少和序化的最大单个对象一样大)。每个worker的每个core只有一个缓冲区
6.6 执行时相关属性
spark.default.parallelism
本地模式:机器核数
其他:max(executor的core,2)
如果用户不设置,系统使用集群中运行shuffle操作的默认任务数(groupByKey、 reduceByKey等)
spark.broadcast.factory
org.apache.spark.broadcast.
HttpBroadcastFactory
广播的实现类
spark.broadcast.blockSize
TorrentBroadcastFactory块大小(以kb为单位)。过大会降低广播速度;过小会使印象BlockManager性能
spark.files.overwrite
通过 SparkContext.addFile() 添加的文件在目标中已经存在并且内容不匹配时,是否覆盖目标文件
spark.files.fetchTimeout
在获取由driver通过SparkContext.addFile()&添加的文件时,是否使用通信时间超时
spark.storage.memoryFraction
Java堆用于cache的比例
&spark.tachyonStore.baseDir
System.getProperty(&java.io.tmpdir&)
用于存储RDD的techyon目录,tachyon文件系统的URL由spark.tachyonStore.url设置,也可以是逗号分隔的多个techyon目录
spark.storage.
memoryMapThreshold
以字节为单位的块大小,用于磁盘读取一个块大小时进行内存映射。这可以防止Spark在内存映射时使用很小块,一般情况下,对块进行内存映射的开销接近或低于操作系统的页大小
spark.tachyonStore.url
tachyon://localhost:19998
基于techyon文件的URL
spark.cleaner.ttl
spark记录任何元数据(stages生成、task生成等)的持续时间。定期清理可以确保将超期的元数据丢弃,这在运行长时间任务是很有用的,如运行7*24的sparkstreaming任务。RDD持久化在内存中的超期数据也会被清理
6.7 网络相关属性
spark.driver.host
运行driver的主机名或 IP 地址
spark.driver.port
driver侦听的端口
spark.akka.frameSize
以MB为单位的driver和executor之间通信信息的大小,设置值越大,driver可以接受更大的计算结果
spark.akka.threads
用于通信的actor线程数,在大型集群中拥有更多CPU内核的driver可以增加actor线程数
spark.akka.timeout
以秒为单位的Spark节点之间超时时间
spark.akka.heartbeat.pauses
下面3个参数是用于设置Akka自带的故障探测器。启用的话,以秒为单位设置如下这三个参数,有助于对恶意的executor的定位,而对于由于GC暂停或网络滞后引起的情况下,不需要开启故障探测器;另外故障探测器的开启会导致由于心跳信息的频繁交换而引起的网络泛滥。 本参数是设置可接受的心跳停顿时间
spark.akka.failure-detector.threshold
对应Akka的akka.remote.transport-failure-detector.threshold
spark.akka.heartbeat.interval&
心跳间隔时间
6.8 调度相关属性
spark.task.cpus
为每个任务分配的内核数
spark.task.maxFailures
Task的最大重试次数
spark.scheduler.mode
Spark的任务调度模式,还有一种Fair模式
spark.cores.max
当应用程序运行在Standalone集群或者粗粒度共享模式Mesos集群时,应用程序向集群请求的最大CPU内核总数(不是指每台机器,而是整个集群)。如果不设置,对于Standalone集群将使用spark.deploy.defaultCores中数值,而Mesos将使用集群中可用的内核
spark.mesos.coarse
如果设置为true,在Mesos集群中运行时使用粗粒度共享模式
&spark.speculation
以下几个参数是关于Spark推测执行机制的相关参数。此参数设定是否使用推测执行机制,如果设置为true则spark使用推测执行机制,对于Stage中拖后腿的Task在其他节点中重新启动,并将最先完成的Task的计算结果最为最终结果
spark.speculation.interval&
Spark多长时间进行检查task运行状态用以推测,以毫秒为单位
&spark.speculation.quantile
推测启动前,Stage必须要完成总Task的百分比
spark.speculation.multiplier
比已完成Task的运行速度中位数慢多少倍才启用推测
&spark.locality.wait
以下几个参数是关于Spark数据本地性的。本参数是以毫秒为单位启动本地数据task的等待时间,如果超出就启动下一本地优先级别的task。该设置同样可以应用到各优先级别的本地性之间(本地进程 -& 本地节点 -& 本地机架 -& 任意节点 ),当然,也可以通过spark.locality.wait.node等参数设置不同优先级别的本地性
&spark.locality.wait.process
spark.locality.wait
本地进程级别的本地等待时间
spark.locality.wait.node
spark.locality.wait
本地节点级别的本地等待时间
spark.locality.wait.rack
spark.locality.wait
本地机架级别的本地等待时间
spark.scheduler.revive.interval
复活重新获取资源的Task的最长时间间隔(毫秒),发生在Task因为本地资源不足而将资源分配给其他Task运行后进入等待时间,如果这个等待时间内重新获取足够的资源就继续计算
6.9 安全相关属性
spark.authenticate
是否启用内部身份验证
spark.authenticate.secret
设置组件之间进行身份验证的密钥。如果不是YARN上运行并且spark.authenticate为true时,需要设置密钥
spark.core.connection. auth.wait.timeout
进行身份认证的超时时间
spark.ui.filters
Spark web UI 要使用的以逗号分隔的筛选器名称列表。筛选器要符合javax servlet Filter标准,每个筛选器的参数可以通过设置java系统属性来指定: spark.&class name of filter&.params='param1=value1,param2=value2' 例如: -Dspark.ui.filters=com.test.filter1 -.test.filter1.params='param1=foo,param2=testing'
&spark.ui.acls.enable
Spark webUI存取权限是否启用。如果启用,在用户浏览web界面的时候会检查用户是否有访问权限
spark.ui.view.acls
以逗号分隔Spark webUI访问用户的列表。默认情况下只有启动Spark job的用户才有访问权限
6.10 SparkStreaming相关属性
spark.streaming.blockInterval
Spark Streaming接收器将接收数据合并成数据块并存储在Spark里的时间间隔,毫秒
spark.streaming.unpersist
如果设置为true,强迫将SparkStreaming持久化的RDD数据从Spark内存中清理,同样的,SparkStreaming接收的原始输入数据也会自动被清理;如果设置为false,则允许原始输入数据和持久化的RDD数据可被外部的Streaming应用程序访问,因为这些数据不会自动清理
6.11 Standalone模式特有属性
可以在文件 conf/spark-env.sh 中来设置此模式的特有相关属性:
(1) SPARK_MASTER_OPTS :配置master使用的属性
(2) SPARK_WORKER_OPTS :配置worker使用的属性
(3) SPARK_DAEMON_JAVA_OPTS :配置master和work都使用的属性
配置的时候,使用类似的语句:
export SPARK_MASTER_OPTS=&-Dx1=y1 -Dx2=y2&
其中x代表属性,y代表属性值。
SPARK_MASTER_OPTS
所支持的属性有:
spark.deploy.spreadOut
Standalone集群管理器是否自由选择节点还是固定到尽可能少的节点,前者会有更好的数据本地性,后者对于计算密集型工作负载更有效
spark.worker.timeout
master因为没有收到心跳信息而认为worker丢失的时间(秒)
spark.deploy.defaultCores
如果没有设置spark.cores.max,该参数设置Standalone集群分配给应用程序的最大内核数,如果不设置,应用程序获取所有的有效内核。注意在一个共享的集群中,设置一个低值防止攫取了所有的内核,影响他人的使用
SPARK_WORKER_OPTS
所支持的属性有
spark.worker.cleanup.enabled
是否定期清理worker的应用程序工作目录,只适用于Standalone模式,清理的时候将无视应用程序是否在运行
&spark.worker.cleanup.interval
清理worker本地过期的应用程序工作目录的时间间隔(秒)
spark.worker.cleanup.appDataTtl&
worker保留应用程序工作目录的有效时间。该时间由磁盘空间、应用程序日志、应用程序的jar包以及应用程序的提交频率来设定
SPARK_DAEMON_JAVA_OPTS
所支持的属性有:
spark.deploy.recoveryMode
下面3个参数是用于配置zookeeper模式的master HA。设置为ZOOKEEPER表示启用master备用恢复模式,默认为NONE
spark.deploy.zookeeper.url
zookeeper集群URL
&spark.deploy.zookeeper.dir
zooKeeper保存恢复状态的目录,缺省为/spark
spark.deploy.recoveryMode
设成FILESYSTEM启用master单节点恢复模式,缺省值为NONE
spark.deploy.recoveryDirectory
Spark保存恢复状态的目录
6.12 Spark on Yarn特有属性
spark.yarn.applicationMaster.waitTries
RM等待Spark AppMaster启动重试次数,也就是SparkContext初始化次数。超过这个数值,启动失败
spark.yarn.submit.file.replication
应用程序上传到HDFS的文件的副本数
spark.yarn.preserve.staging.files
若为true,在job结束后,将stage相关的文件保留而不是删除
spark.yarn.scheduler.heartbeat.interval-ms
Spark AppMaster发送心跳信息给YARN RM的时间间隔
spark.yarn.max.executor.failures
2倍于executor数
导致应用程序宣告失败的最大executor失败次数
spark.yarn.historyServer.address
Spark history server的地址(不要加http://)。这个地址会在Spark应用程序完成后提交给YARN RM,然后RM将信息从RM UI写到history server UI上。
7 示例配置
主要的配置文件均位于 $SPARK_HOME/conf 中,包括 slave、spark-env.sh、spark-defaults.conf 文件等。
7.1 slave文件
192.168.1.112
192.168.1.113
192.168.1.114
7.2 spark-env.sh文件
export JAVA_HOME=&/export/servers/jdk1.6.0_25&
export HADOOP_HOME=/home/hadoop/hadoop-2.0.0-cdh4.5.0
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
SPARK_EXECUTOR_INSTANCES=2
SPARK_EXECUTOR_CORES=1
SPARK_EXECUTOR_MEMORY=400M
SPARK_DRIVER_MEMORY=400M
SPARK_YARN_APP_NAME=&Spark 1.0.0&
SPARK_MASTER_WEBUI_PORT=8090
SPARK_WORKER_MEMORY=400M
SPARK_WORKER_CORES=1
SPARK_WORKER_INSTANCES=2
#Master HA
export SPARK_DAEMON_JAVA_OPTS=&-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=192.168.1.117:.1.118:.1.119:2181 -Dspark.deploy.zookeeper.dir=/spark&
7.3 spark-defaults.conf文件
#history server
spark.eventLog.enabled
spark.eventLog.dir
hdfs://namespace/user/hadoop/sparklogs
spark.yarn.historyServer.address
spark113:18080
spark.shuffle.consolidateFiles true
spark.task.cpus 1
spark.task.maxFailures 3
#scheduler type
spark.scheduler.mode FAIR
park.authenticate true
spark.authenticate.secret hadoop
spark.core.connection.auth.wait.timeout 1500
spark.ui.acls.enable true
spark.ui.view.acls root,hadoop
#each executor used max memory
spark.executor.memory 400m
#spark on yarn
spark.yarn.applicationMaster.waitTries 5
spark.yarn.submit.file.replication 3
spark.yarn.preserve.staging.files false
spark.yarn.scheduler.heartbeat.interval-ms 5000
#park standalone and on mesos
spark.cores.max 4
8 Spark SQL
Spark支持Scala、Python等语言写的脚本直接在Spark环境执行,更重要的是支持对Hive语句进行包装后在Spark上运行。这就是Spark SQL。
8.1 相关配置
配置的步骤比较简单,把Hive的配置文件 hive-site.xml 直接放置到 $SPARK_HOME的conf路径 下即可。如果是想在Spark集群本地执行SQL的话,每个对应的节点都要做同样的配置。
8.2 运行SQL
启动bin目录下的 spark-shell 脚本,依次执行如下语句:
val sc: SparkContext
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
import hiveContext._
hql(&CREATE TABLE IF NOT EXISTS src (key INT, value STRING)&)
hql(&LOAD DATA LOCAL INPATH '/examples /data.txt' INTO TABLE src&)
hql(&FROM src SELECT key, value&).collect().foreach(println)
上面的命令,分别是声明SparkContext对象,利用hql方法执行Hive的SQL语句,在执行SQL语句的过程中,可以通过Hive的Cli客户端进行查看相应操作的结果。
8.3 on yarn模式
由于spark-shell脚本是在本地执行的,如果想放到Yarn上去执行的话,可以使用上面第4节中的spark-submit工具,这时候需要对需要输入的sql语句进行包装,将包装类打包成jar文件,再提交。
包装类的代码如下:
3 import java.util.L
5 import org.apache.spark.SparkC
6 import org.apache.spark.api.java.JavaSparkC
7 import org.apache.spark.sql.api.java.R
8 import org.apache.spark.sql.hive.api.java.JavaHiveC
* Description:
15 public class SparkSQL {
public static void main(String[] args) {
if(args.length != 2){
System.out.println(&usage: &applicationName& &sql statments&&);
System.exit(1);
String applicationName = args[0];
String sql = args[1];
SparkConf conf = new SparkConf().setAppName(applicationName);
JavaSparkContext sc = new JavaSparkContext(conf);
JavaHiveContext hiveContext = new JavaHiveContext(sc);
List&Row& results = hiveContext.hql(sql).collect();
System.out.println(&Sql is:& + sql + &, has been executed over.&);
System.out.println(&The result size is & + results.size() + &, they are:&);
for(int i=0; i&results.size(); i++){
System.out.println(results.get(i).toString());
System.out.println(&Execute over ...&);
sc.stop();
System.out.println(&Stop over ...&);
将其打包成jar文件 spark-0.0.1-SNAPSHOT.jar ,再使用 spark-submit 工具进行任务的提交,命令如下:
./spark-submit \
--class spark.SparkSQL \
--master yarn-cluster \
--num-executors 3 \
--driver-memory 400m --executor-memory 400m --executor-cores 1 \
--jars /home/hadoop/spark-1.0.0/examples/libs/spark-core_2.10-1.0.0.jar,/home/hadoop/spark-1.0.0/examples/libs/spark-hive_2.10-1.0.0.jar,/home/hadoop/spark-1.0.0/lib_managed/jars/datanucleus-api-jdo-3.2.1.jar,/home/hadoop/spark-1.0.0/lib_managed/jars/datanucleus-core-3.2.2.jar,/home/hadoop/spark-1.0.0/lib_managed/jars/datanucleus-rdbms-3.2.1.jar,/home/hadoop/hive-0.12.0/lib/mysql-connector-java-5.1.27-bin.jar
--files /home/hadoop/spark-1.0.0/conf/hive-site.xml \
/home/hadoop/spark-1.0.0/examples/libs/spark-0.0.1-SNAPSHOT.jar &hiveTest& &CREATE TABLE IF NOT EXISTS test4 (key INT, value STRING)&
其中, --master 参数指定的是yarn-cluster模式,当然也可以使用yarn-client模式,至于区别,已经在上文说了;--class指定的是我们包装类的主类,见上文源码; --jars 是依赖的四个jar包; --files 是指定的hive-site.xml配置文件,提交到Yarn中的Application在执行的时候,需要把此配置文件分发到每个Executor上;最后的两个参数, 一个是Application的名称,一个是运行的SQL语句 。
运行结束后,可以到 Spark HistoryServer 中查看运行结果。
&-------------------------------------------------------------------------------
如果您看了本篇博客,觉得对您有所收获,请点击右下角的& [推荐]
如果您想转载本博客, 请注明出处
如果您对本文有意见或者建议,欢迎留言
感谢您的阅读,请关注我的后续博客
已发表评论数()
&&登&&&陆&&
已收藏到推刊!
请填写推刊名
描述不能大于100个字符!
权限设置: 公开
仅自己可见

我要回帖

更多关于 hadoop example.jar 的文章

 

随机推荐