如何避免hadoop streaming.jar 自动给单行数据加tab

superpopb2b 的BLOG
用户名:superpopb2b
文章数:72
访问量:65472
注册日期:
阅读量:5863
阅读量:12276
阅读量:418943
阅读量:1107078
51CTO推荐博文
& & 单位有一组业务一直都是使用Streaming压缩文本日志,大体上就是设置作业输出为BZ2格式,怎么输入就怎么输出,没有任何处理功能在里面。但是每行结尾都多出来一个TAB。终于,有一个业务需要使用TAB前的最后一个字段,不去掉不行了。& & 虽然是个小问题,但是网上搜了一圈,也没有很好的解决。很多人都遇到了,但是单位的业务比较特殊,只有map没有reduce。/questions//hadoop-streaming-api-how-to-remove-unwanted-delimiters这个上面直接说“As I discussed with friends, there's no easy way to achieve the goal,...”。& & Streaming有个特点,默认是按照TAB去区分Key和Value。如果没有设置Key字段的数目,默认一行里面第一个TAB之前的做Key,后面的是Value。如果没有找到Tab,就全都是Key字段,Value是空。之所以后面会多出个Tab,正是Key和Value之间的那个Tab。& & 首先是考察Streaming的Map,在PipeMapper.java。InputWriter处理输出,所以尝试实现自定义输出。在MapReduce作业配置里面,stream.map.input.writer.class负责指定InputWriter是哪一个,默认是TextInputWriter。Streaming在这里比较坑,增加-Dstream.map.input.writer.class=XXX的选项并不能令Streaming使用自定义的实现类,必须实现自己的IdentifierResolver,然后在其中对不同类型的输入设定不同类型的InputWriter,而其中的输入类型,必须由stream.map.input选项传入。是否设置成功以作业运行时候JobTracker的配置参数表为准。& & 不巧的是,使用自定义的InputWriter代替TextInputWriter,行尾的Tab是没了,行首又多了个数字。估计是Hadoop给Mapper传入的Key被打印出来了。oooorz....不能瞎猜了,还是看看代码吧。& & 好在代码蛮短的还是。& & Streaming会把本身、以及用户-file -cacheFile -cacheArchive&等选项指定的文件,打成一个Jar包提交到集群进行MR作业。把集群的输出,作为用户实现Mapper的输入;读取用户实现Mapper的输出,作为整个Map作业的输出。Input/Output相对于用户自定义作业,Writer/Reader体现为Streaming的行为,因此是InputWriter和OutputReader。简单来讲,Hadoop给出的(K,V)---streaming---&&用户自定义Mapper&---streaming---&Hadoop的Mapper输出& &Streaming由PipeMapRunner启动作业,异步收集用户作业输出,进而向Hadoop汇报作业进度。整个作业的基础设置、作业提交都是由StreamJob类完成。& & 作业的执行是PipeMapRed/PipeMapper/PipReducer/PipCombiner这几个类。解决方案也就在这里。在MROutputThread的run方法里面,outCollector.collect(key, value);这句之前,加上下面的代码片段即可。&&&&&&&&&&if&(value&instanceof&Text)&{
&&&&&&&&&&&&if&(value.toString().isEmpty())
&&&&&&&&&&&&&&value&=&NullWritable.get();
&&&&&&&&&&}& & 是不是很简单。& & 为什么这样做是可行的?还是源于org.apache.hadoop.mapred.TextOutputFormat。直接上代码。package&org.apache.hadoop.
import&java.io.DataOutputS
import&java.io.IOE
import&java.io.UnsupportedEncodingE
import&org.apache.hadoop.classification.InterfaceA
import&org.apache.hadoop.classification.InterfaceS
import&org.apache.hadoop.fs.FileS
import&org.apache.hadoop.fs.P
import&org.apache.hadoop.fs.FSDataOutputS
import&org.apache.hadoop.io.NullW
import&org.apache.hadoop.io.T
import&org.apache.pressionC
import&org.apache.press.GzipC
import&org.apache.hadoop.util.*;
/**&An&{@link&OutputFormat}&that&writes&plain&text&files.&
@InterfaceAudience.Public
@InterfaceStability.Stable
public&class&TextOutputFormat&K,&V&&extends&FileOutputFormat&K,&V&&{
&&protected&static&class&LineRecordWriter&K,&V&
&&&&implements&RecordWriter&K,&V&&{
&&&&private&static&final&String&utf8&=&"UTF-8";
&&&&private&static&final&byte[]&
&&&&static&{
&&&&&&try&{
&&&&&&&&newline&=&"\n".getBytes(utf8);
&&&&&&}&catch&(UnsupportedEncodingException&uee)&{
&&&&&&&&throw&new&IllegalArgumentException("can't&find&"&+&utf8&+&"&encoding");
&&&&protected&DataOutputStream&
&&&&private&final&byte[]&keyValueS
&&&&public&LineRecordWriter(DataOutputStream&out,&String&keyValueSeparator)&{
&&&&&&this.out&=&
&&&&&&try&{
&&&&&&&&this.keyValueSeparator&=&keyValueSeparator.getBytes(utf8);
&&&&&&}&catch&(UnsupportedEncodingException&uee)&{
&&&&&&&&throw&new&IllegalArgumentException("can't&find&"&+&utf8&+&"&encoding");
&&&&public&LineRecordWriter(DataOutputStream&out)&{
&&&&&&this(out,&"\t");
&&&&&*&Write&the&object&to&the&byte&stream,&handling&Text&as&a&special
&&&&&*&case.
&&&&&*&@param&o&the&object&to&print
&&&&&*&@throws&IOException&if&the&write&throws,&we&pass&it&on
&&&&private&void&writeObject(Object&o)&throws&IOException&{
&&&&&&if&(o&instanceof&Text)&{
&&&&&&&&Text&to&=&(Text)&o;
&&&&&&&&out.write(to.getBytes(),&0,&to.getLength());
&&&&&&}&else&{
&&&&&&&&out.write(o.toString().getBytes(utf8));
&&&&public&synchronized&void&write(K&key,&V&value)
&&&&&&throws&IOException&{
&&&&&&&&&&&&boolean&nullKey&=&key&==&null&||&key&instanceof&NullW
&&&&&&boolean&nullValue&=&value&==&null&||&value&instanceof&NullW
&&&&&&if&(nullKey&&&&nullValue)&{
&&&&&&if&(!nullKey)&{
&&&&&&&&writeObject(key);
&&&&&&if&(!(nullKey&||&nullValue))&{
&&&&&&&&out.write(keyValueSeparator);
&&&&&&if&(!nullValue)&{
&&&&&&&&writeObject(value);
&&&&&&out.write(newline);
&&&&public&synchronized&void&close(Reporter&reporter)&throws&IOException&{
&&&&&&out.close();
&&public&RecordWriter&K,&V&&getRecordWriter(FileSystem&ignored,
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&JobConf&job,
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&String&name,
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&Progressable&progress)
&&&&throws&IOException&{
&&&&boolean&isCompressed&=&getCompressOutput(job);
&&&&String&keyValueSeparator&=&job.get("mapreduce.output.textoutputformat.separator",
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&"\t");
&&&&if&(!isCompressed)&{
&&&&&&Path&file&=&FileOutputFormat.getTaskOutputPath(job,&name);
&&&&&&FileSystem&fs&=&file.getFileSystem(job);
&&&&&&FSDataOutputStream&fileOut&=&fs.create(file,&progress);
&&&&&&return&new&LineRecordWriter&K,&V&(fileOut,&keyValueSeparator);
&&&&}&else&{&
&&&&&&Class&?&extends&CompressionCodec&&codecClass&=
&&&&&&&&getOutputCompressorClass(job,&GzipCodec.class);
&&&&&&//&create&the&named&codec
&&&&&&CompressionCodec&codec&=&ReflectionUtils.newInstance(codecClass,&job);
&&&&&&//&build&the&filename&including&the&extension
&&&&&&Path&file&=&
&&&&&&&&FileOutputFormat.getTaskOutputPath(job,
&&&&&&&&Path&file&=
&&&&&&&&FileOutputFormat.getTaskOutputPath(job,
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&name&+&codec.getDefaultExtension());
&&&&&&FileSystem&fs&=&file.getFileSystem(job);
&&&&&&FSDataOutputStream&fileOut&=&fs.create(file,&progress);
&&&&&&return&new&LineRecordWriter&K,&V&(new&DataOutputStream
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&(codec.createOutputStream(fileOut)),
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&keyValueSeparator);
}& & 注意到LineRecordWriter.write了么?后记:& & A. 网上很多是想办法修改分隔符,把TAB换成空字符。这是一个非常粗暴的做法,基本上就是埋坑!为什么呢?& & 日志文本内容可以是很丰富的,这次出问题是因为每行没有TAB。如果换做含有TAB的文本,把分隔符变为空串,就把日志中原有的TAB去掉了。& & B. 之所以这么搞,也是受到了stackoverflow的这个Q&A的启发。/questions//hadoop-streaming-remove-trailing-tab-from-reducer-output。类似的,Q&A也是采用修改分隔符的办法,是不可取的。但是仔细发现,是可以在自己重写的TextOutputFormat&K,V&里,修改LineRecordWriter.write方法的。& & 重写TextOutputFormat是十分优雅的解决,看似修改了Hadoop本身的东西,但是在Streaming最新版没有加入这个fix之前,防止对每个版本的Streaming都要变更、重新编译打包。另外,Streaming不是独立的项目,编译它需要同时编译Hadoop!& & 加上下面这段&&&&if&(!nullValue)&{
&&&&&&if&(value&instanceof&Text)&{
&&&&&&&&if&(value.toString().isEmpty())
&&&&&&&&&&nullValue&=&
&&&&&&}&&&
&&&&}& & C. 虽然是修改了Streaming代码,但是不需要考虑会影响同一机器所有用户的问题,也不用修改$HADOOP_HOME下的Streaming包。streaming提供了这个参数stream.shipped.hadoopstreaming。& & D. 有些设置似乎是指对Reducer生效,对于这种只有Mapper的作业不起作用。比如mapred.textoutputformat.ignoreseparator
mapred.textoutputformat.separator& & 设置了,没看到什么效果。& & 再有就是,命令行选项里面如果写-DXXX= \ 这样的语句,似乎也没有把这个参数设置为空串的效果,写-DXXX=&""也是一样。本文出自 “” 博客,请务必保留此出处
了这篇文章
类别:┆阅读(0)┆评论(0)2014年12月 高性能开发大版内专家分月排行榜第二
2014年12月 高性能开发大版内专家分月排行榜第二
2014年12月 高性能开发大版内专家分月排行榜第二
匿名用户不能发表回复!|
每天回帖即可获得10分可用分!小技巧:
你还可以输入10000个字符
(Ctrl+Enter)
请遵守CSDN,不得违反国家法律法规。
转载文章请注明出自“CSDN(www.csdn.net)”。如是商业用途请联系原作者。hadoop组件streaming使用注意
Categories:
Published on: 2012 年 05 月 22 日
关于管道:
不要在streaming里mapper和reducer中使用管道,会出现”java.io.IOException: Broken pipe”错误。
关于程序文件:
如果使用的是非shell命令或者slave机器中没有的命令,都需要手动添加-file参数来分发程序文件。如果mapper和reducer需要分发两个或者更多个程序文件,则直接使用多个-file参数即可。
关于grep:
hadoop streaming -input /user/hadoop/hadoopfile -output /user/hadoop/result -mapper "grep hello" -jobconf mapre.job.name="grep-test" -jobconf stream.non.zero.exit.is.failure=false -jobconf mapred.reduce.tasks=1
/user/hadoop/hadoopfile : 待处理文件目录
-output /user/hadoop/result
:处理结果存放目录
-mapper "grep hello" :map程序
-jobconf mapre.job.name="grep-test" :任务名
-jobconf stream.non.zero.exit.is.failure=false
: map-reduce程序返回值不做判断;streaming默认的情况下,mapper和reducer的返回值不是0,被认为异常任务,将被再次执行,默认尝试4次都不是0,整个job都将失败。而grep在没有匹配结果时返回1。
-jobconf mapred.reduce.tasks=1 : reduce任务数。 此处也可配置为0,为0配置表示告诉Map/reduce框架不要创建reducer任务
hadoop jar $HADOOP_STREAMING_JAR \
-input /test/ylb/mock_data/cv-pt-demo.txt \
-output /test/ylb/mock_data/output/cv-pt-demo-10 \
-mapper "awk '\$1~/1/ {printf(\"%s\t%s\n\",\$1,\$2)}'" \
-reducer "awk 'BEGIN{pt[\"test_key\"]=0;} {key=\$2/10;pt[key]+=1;} END{ for(k in pt){printf(\"%d\t%d\n\",k,pt[k]);} }'"
-mapper " awk '
" , 这种写法的话, ' ' 里面的脚本中有三种字符(shell特殊字符)需要转义,分别是
" ' $ ,这种写法的好处是可以方便的引用外部shell变量
这种写法则不需要转义
类似的其他命令中需要嵌套单引号双引号也是这样。
关于“脏数据”:
我猜你可能也喜欢:
Share this
三江小渡 All rights reserved
Fastfood theme by
- Powered by
[put here your code]
quickbar tool -->
Recent Posts
by 三江小渡我猜你可能也喜欢:PHP实现双向链表给你的博客添加一个简易的计数器
by 三江小渡最多留言日志2011年全国数学建模B题题解和论文About偶滴名字来源,再次警示自己2011年全国数学建模B题[...]
by 三江小渡最多留言日志2011年全国数学建模B题题解和论文About偶滴名字来源,再次警示自己2011年全国数学建模B题[...]
by 三江小渡安装了rsync程序后,运行以下shell程序即可完成rsync服务的启动,自行修改相关的module和认证用[...]
by 三江小渡mac下没有找到好用的类似secureCRT,就自己写了个自动登录的脚本,分享一下,如果是新浪的,就基本不用修[...]
by 三江小渡协同过滤算法是推荐系统中最古老,也是最简单高效的推荐算法。简单说协同过滤就是根据以往的用户产生的数据分析,对用[...]
by 三江小渡无意中看到哲学家不解释的博客,被她的文章吸引甚深。又看了她的两篇文章:人人主站 和 哲学十二钗问答 。然后特别[...]
by 三江小渡The big difference between MySQL Table Type MyISAM and [...]
by 三江小渡最多留言日志2011年全国数学建模B题题解和论文About偶滴名字来源,再次警示自己2011年全国数学建模B题[...]
by 三江小渡我猜你可能也喜欢:用于大数据的并查集(基于HBase)的java类最长回文子串算法(Manacher)密码学原[...]
Categories
( 102)Recent Posts by 三江小渡 by 三江小渡 by 三江小渡 by 三江小渡 by 三江小渡 ( 63)Recent Posts by 三江小渡 by 三江小渡 by 三江小渡 by 三江小渡 by 三江小渡 ( 44)Recent Posts by 三江小渡 by 三江小渡 by 三江小渡 by 三江小渡 by 三江小渡 ( 22)Recent Posts by 三江小渡 by 三江小渡 by 三江小渡 by 三江小渡 by 三江小渡 ( 22)Recent Posts by 三江小渡 by 三江小渡 by 三江小渡 by 三江小渡 by 三江小渡 ( 21)Recent Posts by 三江小渡 by 三江小渡 by 三江小渡 by 三江小渡 by 三江小渡 ( 20)Recent Posts by 三江小渡 by 三江小渡 by 三江小渡 by 三江小渡 by 三江小渡 ( 19)Recent Posts by 三江小渡 by 三江小渡 by 三江小渡 by 三江小渡 by 三江小渡 ( 9)Recent Posts by 三江小渡 by 三江小渡 by 三江小渡 by 三江小渡 by 三江小渡 ( 7)Recent Posts by 三江小渡 by 三江小渡 by 三江小渡 by 三江小渡 by 三江小渡
Recent Comments
三江小渡 about 已经成年的没管过这个blog了 [笑cry] 小萤晒星星 about 哈哈哈,哇~侯哥小时候的博客呀 将就,不将就 about 楼主,请问一下怎么处理第一问中表格的数据呀,比如1和15节点形成一条路,那么怎么才能求这条路的路程呢?一条路的路程好求,只是不知道有那么多条路,而且在两个表格中的排列顺序也不一样,所以不知道怎么求,要是知道了如何处理数据,后面的都好说,就要交作业了,还行楼主教我,非常谢谢!!! 郑晓鹏 about 看过代码和分析不太明白为什么要每次需要重置以前的状态 三江小渡 about 对你也有用我也感到很开心~~:) 不会摇头的傻瓜 about 之前看老师推荐的相关论文,都会写稀疏性,潜意识觉得好像真的是硬伤,结果。。。 不会摇头的傻瓜 about 超级赞,最近刚好有用,!谢谢啦~ 唯美句子 about 这分析精神值得学习!! alan about 感觉很这篇科普型的文章很适合我这种读本科的刚开始学信息安全的人哈哈,以后会多看看你的博文的 三江小渡 about 万能的google 呀~~然后就是豆瓣搜书,看看评价好的 翻译过来的书,甚至会有英文版的书,
就这俩途径~
Not logged in
Welcome , today is 星期一, 2017 年 05 月 22 日
Leave a comment
feed for comments on this post
Trackback URL
Next Post: 利用HDFS的JavaAPI编程[二]
Previous Post: hadoop配置、运行错误总结[二]
Top of page
Bottom of page当前位置: →
→ 奇怪,使用tabControl控件时,Tab之间总是会自动掉换顺序.解决办法
奇怪,使用tabControl控件时,Tab之间总是会自动掉换顺序.解决办法
& 作者:佚名 & 来源: 互联网 & 热度:
&收藏到→_→:
摘要: 奇怪,使用tabControl控件时,Tab之间总是会自动掉换顺序.已经有几次了,有的时候突然发现Tab之间就掉换了顺序,而且没有规律.,没...
"奇怪,使用tabControl控件时,Tab之间总是会自动掉换顺序.解决办法"::
奇怪,使用tabcontrol控件时,tab之间总是会自动掉换顺序.已经有几次了,有的时候突然发现tab之间就掉换了顺序,而且没有规律.------解决方案--------------------没遇到过...
------解决方案--------------------我也遇到过,晕。不知道怎么回事
------解决方案--------------------同样郁闷,都是手动又调整过来,关注一下
------解决方案--------------------我也遇到过,关注一下
------解决方案--------------------经常的,系统老是自动调整生成顺序
------解决方案--------------------经常遇到 vs都有 然后要手工调整回来 爆麻烦 搜索此文相关文章:此文来自: 马开东博客
网址: 站长QQ
上一篇:没有了
奇怪,使用tabControl控件时,Tab之间总是会自动掉换顺序.解决办法_C#技术相关文章
C#技术_总排行榜
C#技术_最新
C#技术_月排行榜
C#技术_周排行榜
C#技术_日排行榜
马开东博客专栏
企业软件/开发
硬件/嵌入开发
马开东博客专栏
应用服务器
软件工程/管理/测试
马开东博客专栏
Linux/Unix
马开东博客专栏
开发语言/框架
专题开发/技术/项目
马开东博客专栏
高性能开发
马开东博客专栏

我要回帖

更多关于 hadoop streaming例子 的文章

 

随机推荐