如何让hive支持多hive sql 字符串分割分割列

没有更多推荐了,
加入CSDN,享受更精准的内容推荐,与500万程序员共同成长!  近期在做某个大型银行的大数据项目,当在处理非结构化数据时,却发现他们给的数据并不符合hive和pig的处理要求,数据每行必须需要多个分割符才能完美处理,一下午也没有想到完美的办法解决,今天重新审视了一下整个过程。看来hive的命令行没法搞定了。于是乎,只能通过代码来搞定。
1、重新实现hive的InputFormat了,别急放码过来
package hiveS
import java.io.IOE
import org.apache.hadoop.io.LongW
import org.apache.hadoop.io.T
import org.apache.hadoop.mapred.FileS
import org.apache.hadoop.mapred.InputS
import org.apache.hadoop.mapred.JobC
import org.apache.hadoop.mapred.JobC
import org.apache.hadoop.mapred.RecordR
import org.apache.hadoop.mapred.R
import org.apache.hadoop.mapred.TextInputF
public class MyHiveInputFormat
extends TextInputFormat implements JobConfigurable{
public RecordReader&LongWritable, Text& getRecordReader(
InputSplit genericSplit, JobConf job, Reporter reporter)
throws IOException {
reporter.setStatus(genericSplit.toString());
return new MyRecordReader((FileSplit) genericSplit, job);
2、仔细看看下面的方法,不解释,自己领悟。
package hiveS
import java.io.IOE
import java.io.InputS
import org.apache.hadoop.conf.C
import org.apache.hadoop.fs.FSDataInputS
import org.apache.hadoop.fs.FileS
import org.apache.hadoop.fs.P
import org.apache.hadoop.io.LongW
import org.apache.hadoop.io.T
import org.apache.hadoop.io.compress.CompressionC
import org.apache.hadoop.io.compress.CompressionCodecF
import org.apache.hadoop.mapred.FileS
import org.apache.hadoop.mapred.RecordR
import org.apache.hadoop.util.LineR
public class MyRecordReader implements RecordReader&LongWritable, Text&{
private CompressionCodecFactory compressionCodecs =
private LineReader lineR
int maxLineL
public MyRecordReader(InputStream in, long offset, long endOffset,
int maxLineLength) {
this.maxLineLength = maxLineL
this.start =
this.lineReader = new LineReader(in);
this.pos =
this.end = endO
public MyRecordReader(InputStream in, long offset, long endOffset,
Configuration job) throws IOException {
this.maxLineLength = job.getInt(
"mapred.mutilCharRecordReader.maxlength", Integer.MAX_VALUE);
this.lineReader = new LineReader(in, job);
this.start =
this.end = endO
// 构造方法
public MyRecordReader(FileSplit inputSplit, Configuration job)
throws IOException {
maxLineLength = job.getInt("mapred.mutilCharRecordReader.maxlength",
Integer.MAX_VALUE);
start = inputSplit.getStart();
end = start + inputSplit.getLength();
final Path file = inputSplit.getPath();
// 创建压缩器
compressionCodecs = new CompressionCodecFactory(job);
final CompressionCodec codec = compressionCodecs.getCodec(file);
// 打开文件系统
FileSystem fs = file.getFileSystem(job);
FSDataInputStream fileIn = fs.open(file);
boolean skipFirstLine =
if (codec != null) {
lineReader = new LineReader(codec.createInputStream(fileIn), job);
end = Long.MAX_VALUE;
if (start != 0) {
skipFirstLine =
fileIn.seek(start);
lineReader = new LineReader(fileIn, job);
if (skipFirstLine) {
start += lineReader.readLine(new Text(), 0,
(int) Math.min((long) Integer.MAX_VALUE, end - start));
this.pos =
public void close() throws IOException {
if (lineReader != null)
lineReader.close();
public LongWritable createKey() {
return new LongWritable();
public Text createValue() {
return new Text();
public long getPos() throws IOException {
public float getProgress() throws IOException {
if (start == end) {
return 0.0f;
return Math.min(1.0f, (pos - start) / (float) (end - start));
public boolean next(LongWritable key, Text value) throws IOException {
while (pos & end) {
key.set(pos);
int newSize = lineReader.readLine(value, maxLineLength,
Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),
maxLineLength));
// 把字符串中的"##"转变为"#"
String strReplace = value.toString().replaceAll("\\s+", "\001");
Text txtReplace = new Text();
txtReplace.set(strReplace);
value.set(txtReplace.getBytes(), 0, txtReplace.getLength());
if (newSize == 0)
pos += newS
if (newSize & maxLineLength)
3、处理实例:如下
数据处理要求:
fewf fewfe
注意:字段之间的空格不一致
create table micmiu_blog(author int, category string, url string,town string,oop string) stored as inputformat 'hiveStream.MyHiveInputFormat' outputformat
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';
注意:输出咱可没有重写哦
2、加载数据:
LOAD DATA LOCAL INPATH'/mnt/test' OVERWRITE INTO TABLE micmiu_
3、看看的成果:
select * from micmiu_
自己去试试,不解释
阅读(...) 评论()查看: 23389|回复: 1
hive使用经验总结:hive 的分隔符、orderby sort by distribute by的优化
主题听众收听
1.Hive分号字符是如何处理的?
2.在使用的过程中,hive的分号会遇到什么问题,该如何解决?
3.UNION ALL在hive中的作用是什么?
4.插入数据时, 字段的初始值与表定义中类型如何保持一致?
5.orderby sort by有什么不一样?
一、Hive 分号字符
分号是SQL语句结束标记,在HiveQL中也是,但是在HiveQL中,对分号的识别没有那么智慧,例如:
select concat(cookie_id,concat(';',’zoo’)) fromc02_clickstat_fatdt1 limit 2;
FAILED: Parse Error: line 0:-1 cannot recognize input'&EOF&' in function specification复制代码
可以推断,Hive解析语句的时候,只要遇到分号就认为语句结束,而无论是否用引号包含起来。
解决的办法是,使用分号的八进制的ASCII码进行转义,那么上述语句应写成:
select concat(cookie_id,concat('\073','zoo')) fromc02_clickstat_fatdt1 limit 2;
为什么是八进制ASCII码?
我尝试用十六进制的ASCII码,但Hive会将其视为字符串处理并未转义,好像仅支持八进制,原因不详。这个规则也适用于其他非SELECT语句,如CREATE TABLE中需要定义分隔符,那么对不可见字符做分隔符就需要用八进制的ASCII码来转义。
二、insert 新增数据
根据语法Insert必须加“OVERWRITE”关键字,也就是说每一次插入都是一次重写。那如何实现表中新增数据呢?
假设Hive中有表manbu,
hive& DESCRIBE
id int
value int复制代码
hive& SELECT * FROM
3 4
1 2
2 3复制代码
现增加一条记录:
hive& INSERT OVERWRITE TABLE manbu
SELECT id, value FROM (
SELECT id, value FROM manbu
UNION ALL
SELECT 4 AS id, 5 AS value FROM manbu limit 1
)复制代码
结果是:
hive&SELECT * FROM p1;
3 4
4 5
2 3
1 2复制代码
其中的关键在于, 关键字UNION ALL的应用, 即将原有数据集和新增数据集进行结合, 然后重写表.
三、初始值
INSERT OVERWRITE TABLE在插入数据时, 后面的字段的初始值应注意与表定义中的一致性. 例如, 当为一个STRING类型字段初始为NULL时:
NULL AS field_name // 这可能会被提示定义类型为STRING, 但这里是void
CAST(NULL AS STRING) AS field_name // 这样是正确的
又如, 为一个BIGINT类型的字段初始为0时:
CAST(0 AS BIGINT) AS field_name
四、orderby&&sort by&&distribute by的优化
Hive的排序关键字是SORT BY,它有意区别于传统数据库的ORDER BY也是为了强调两者的区别–SORT BY只能在单机范围内排序。
set mapred.reduce.tasks=2;(设置reduce的数量为2 )
selectcookie_id,page_id,id from c02_clickstat_fatdt1
where cookie_idIN('1.193.131.218.3.0','1.193.148.164.9.2')复制代码
1.193.148.164.9.2&&082403& && &
1.193.148.164.9.2&&660473& && &
1.193.148.164.9.2& &725326& && &
1.193.131.218.3.0&&01c183da6e4bc61053& && &
1.193.131.218.3.0&&01c183da6e4bc74174& && &
1.193.131.218.3.0&&01c183da6e4bc67988& && &
1.193.131.218.3.0&&01c183da6e4bc39999& && &
1.193.131.218.3.0&&01c183da6e4bc61053& && &
selectcookie_id,page_id,id from c02_clickstat_fatdt1 where
cookie_idIN('1.193.131.218.3.0','1.193.148.164.9.2')
SORT BYCOOKIE_ID,PAGE_ID;复制代码
SORT排序后的值
1.193.131.218.3.0& && && &&&& && & 01c183da6e4bc74174& && &
1.193.131.218.3.0& && && &&&& && &01c183da6e4bc61053& && &
1.193.131.218.3.0& && && &&&& && &01c183da6e4bc61053& && &
1.193.148.164.9.2& && && &&&& && &082403& && &
1.193.148.164.9.2& && && &&&& && &725326& && &
1.193.131.218.3.0& && && &&&& && &01c183da6e4bc39999& && &
1.193.131.218.3.0& && && &&&& && &01c183da6e4bc67988& && &
1.193.148.164.9.2& && && &&&& && & 660473& && &
selectcookie_id,page_id,id from c02_clickstat_fatdt1
where cookie_idIN('1.193.131.218.3.0','1.193.148.164.9.2')
ORDER BYPAGE_ID,COOKIE_ID;复制代码
1.193.131.218.3.0& && && &&&& && &01c183da6e4bc74174& && &
1.193.131.218.3.0& && && &&&& && &01c183da6e4bc39999& && &
1.193.131.218.3.0& && && &&&& && & 01c183da6e4bc67988& && &
1.193.131.218.3.0& && && &&&& && &01c183da6e4bc61053& && &
1.193.131.218.3.0& && && &&&& && & 01c183da6e4bc61053& && &
1.193.148.164.9.2& && && &&&& && &082403& && &
1.193.148.164.9.2& && && &&&& && &725326& && &
1.193.148.164.9.2& && && &&&& && &660473& && &
可以看到SORT和ORDER排序出来的值不一样。一开始我指定了2个reduce进行数据分发(各自进行排序)。结果不一样的主要原因是上述查询没有reduce key,hive会生成随机数作为reduce key。这样的话输入记录也随机地被分发到不同reducer机器上去了。为了保证reducer之间没有重复的cookie_id记录,可以使用DISTRIBUTE BY关键字指定分发key为cookie_id。
selectcookie_id,country,id,page_id,id from c02_clickstat_fatdt1 where cookie_idIN('1.193.131.218.3.0','1.193.148.164.9.2')&&distribute by cookie_id SORT BY COOKIE_ID,page_复制代码
1.193.131.218.3.0& && && &&&& && &01c183da6e4bc74174& && &
1.193.131.218.3.0& && && &&&& && &01c183da6e4bc39999& && &
1.193.131.218.3.0& && && &&&& && &01c183da6e4bc67988& && &
1.193.131.218.3.0& && && &&&& && &01c183da6e4bc61053& && &
1.193.131.218.3.0& && && &&&& && &01c183da6e4bc61053& && &
1.193.148.164.9.2& && && &&&& && &082403& && &
1.193.148.164.9.2& && && &&&& && & 725326& && &
1.193.148.164.9.2& && && &&&& && &660473& && &
例二:
CREATETABLE if not exists t_order(
id int,-- 编号
sale_idint, -- SID
customer_idint, -- CID
product_id int, -- PID
amountint -- 数量
)PARTITIONED BY (ds STRING);复制代码
在表中查询所有记录,并按照PID和数量排序:
setmapred.reduce.tasks=2;
Selectsale_id, amount from t_order
Sort bysale_id,复制代码
这一查询可能得到非期望的排序。指定的2个reducer分发到的数据可能是(各自排序):
Reducer1:
Sale_id |amount
0 | 100
1 | 30
1 | 50
2 | 20复制代码
Reducer2:
Sale_id |amount
0 |110
0 | 120
3 | 50
4 | 20复制代码
使用DISTRIBUTE BY关键字指定分发key为sale_id。改造后的HQL如下:
setmapred.reduce.tasks=2;
Selectsale_id, amount from t_order
Distributeby sale_id
Sort bysale_id,复制代码
这样能够保证查询的销售记录集合中,销售ID对应的数量是正确排序的,但是销售ID不能正确排序,原因是hive使用hadoop默认的HashPartitioner分发数据。
这就涉及到一个全排序的问题。解决的办法无外乎两种:
1.) 不分发数据,使用单个reducer:
setmapred.reduce.tasks=1;
这一方法的缺陷在于reduce端成为了性能瓶颈,而且在数据量大的情况下一般都无法得到结果。但是实践中这仍然是最常用的方法,原因是通常排序的查询是为了得到排名靠前的若干结果,因此可以用limit子句大大减少数据量。使用limit n后,传输到reduce端(单机)的数据记录数就减少到n* (map个数)。
2.) 修改Partitioner,这种方法可以做到全排序。
这里可以使用Hadoop自带的TotalOrderPartitioner(来自于Yahoo!的TeraSort项目),这是一个为了支持跨reducer分发有序数据开发的Partitioner,它需要一个SequenceFile格式的文件指定分发的数据区间。如果我们已经生成了这一文件(存储在/tmp/range_key_list,分成100个reducer),可以将上述查询改写为
setmapred.reduce.tasks=100;
sethive.mapred.partitioner=org.apache.hadoop.mapred.lib.TotalOrderP
settotal.order.partitioner.path=/tmp/ range_key_
Selectsale_id, amount from t_order
Clusterby sale_id
S复制代码
有很多种方法生成这一区间文件(例如hadoop自带的o.a.h.mapreduce.lib.partition.InputSampler工具)。这里介绍用Hive生成的方法,例如有一个按id有序的t_sale表:
CREATETABLE if not exists t_sale (
id int,
namestring,
locstring
);复制代码
则生成按sale_id分发的区间文件的方法是:
createexternal table range_keys(sale_id int)
rowformat serde
'org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe'
stored as
inputformat
'org.apache.hadoop.mapred.TextInputFormat'
outputformat
'org.apache.hadoop.hive.ql.io.HiveNullValueSequenceFileOutputFormat'
location'/tmp/range_key_list';
insertoverwrite table range_keys
selectdistinct sale_id
fromsource t_sale sampletable(BUCKET 100 OUT OF 100 ON rand()) s
sort bysale_复制代码
生成的文件(/tmp/range_key_list目录下)可以让TotalOrderPartitioner按sale_id有序地分发reduce处理的数据。
区间文件需要考虑的主要问题是数据分发的均衡性,这有赖于对数据深入的理解。
本帖被以下淘专辑推荐:
& |主题: 102, 订阅: 1
主题听众收听
高级会员, 积分 1335, 距离下一级还需 3665 积分
高级会员, 积分 1335, 距离下一级还需 3665 积分
路过,学习学习
经常参与各类话题的讨论,发帖内容较有主见
经常帮助其他会员答疑
积极宣传本站,为本站带来更多注册会员
积极宣传本站,为本站带来更多的用户访问量
长期对论坛的繁荣而不断努力,或多次提出建设性意见
活跃且尽责职守的版主
为论坛做出突出贡献的会员
站长推荐 /4
会员注册不成功的原因
新手获取积分方法
hadoop3.0学习:零基础安装部署hadoop集群
about云课程:大数据日志实时分析
Powered by
& 2018 Designed by拒绝访问 | www.codesec.net | 百度云加速
请打开cookies.
此网站 (www.codesec.net) 的管理员禁止了您的访问。原因是您的访问包含了非浏览器特征(b43a7-ua98).
重新安装浏览器,或使用别的浏览器博客分类:
Hive建表的时候虽然可以指定字段分隔符,不过用insert overwrite local directory这种方式导出文件时,字段的分割符会被默认置为\001,一般都需要将字段分隔符转换为其它字符,可以使用下面的命令:
sed -e 's/\x01/|/g' file
可以将|替换成自己需要的分隔符,file为hive导出的文件。
https://cwiki.apache.org/Hive/languagemanual-ddl.html
浏览: 3269 次
来自: 空岛
(window.slotbydup=window.slotbydup || []).push({
id: '4773203',
container: s,
size: '200,200',
display: 'inlay-fix'

我要回帖

更多关于 hive 分割字符串 的文章

 

随机推荐