Enabling MapReduce compression in Hadoop


           

We recently started sharing data with the Reyun company: we hand over the raw data and they do the computation themselves. The data is synced once a day, comes to 10-odd GB after compression, and is produced by a Hive MapReduce query. The original flow wrote the query result to a local NFS mount via an insert overwrite local directory ... select statement, compressed it there, and served the file for download from the NFS server. But the uncompressed result is around 90 GB, so the Hive job failed outright at the final step of writing the select result to the local file system; and even if the copy had succeeded, compressing it afterwards would have taken many hours. The solution was to enable Hadoop's built-in data compression, so that the MapReduce job writes already-compressed data straight to the local file system. The steps are as follows:
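For reference, the original uncompressed export looked roughly like the sketch below; the database, table, and NFS mount path are hypothetical placeholders, not the real production names:

-- A minimal sketch of the original export (placeholder names throughout).
-- 'local directory' writes to the file system of the machine running the
-- Hive client; that path is assumed here to be an NFS mount.
insert overwrite local directory '/mnt/nfs/export/rawdata'
select * from rawdata.events
where dt = '2015-10-13';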

 

 

1. Before running the Hive statement, set the following parameters in the hive CLI:

 

set mapreduce.output.fileoutputformat.compress=true;
set mapreduce.output.fileoutputformat.compress.type=BLOCK;
set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;
set mapreduce.map.output.compress=true;
set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.GzipCodec;
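To confirm that a parameter took effect in the current session, issue set with just the property name and the CLI prints its value. (The .type setting, incidentally, only applies to SequenceFile output and is ignored for plain text files.)

set mapreduce.output.fileoutputformat.compress;
-- prints: mapreduce.output.fileoutputformat.compress=true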

 

Now run a test statement:

 

insert overwrite directory '/user/reyun/testrawdataexport'
select keywords,count(1) from qiku.illegalpackage group by keywords;

 

It fails with:

 

Diagnostic Messages for this Task:
Error: org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: error in shuffle in fetcher#1
        at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:134)
        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:376)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1642)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)
Caused by: java.io.IOException: Exceeded MAX_FAILED_UNIQUE_FETCHES; bailing-out.
        at org.apache.hadoop.mapreduce.task.reduce.ShuffleSchedulerImpl.checkReducerHealth(ShuffleSchedulerImpl.java:333)
        at org.apache.hadoop.mapreduce.task.reduce.ShuffleSchedulerImpl.copyFailed(ShuffleSchedulerImpl.java:255)
        at org.apache.hadoop.mapreduce.task.reduce.Fetcher.copyFromHost(Fetcher.java:351)
        at org.apache.hadoop.mapreduce.task.reduce.Fetcher.run(Fetcher.java:193)

 

Clearly the shuffle is where things went wrong. The error log on the node that actually ran the task shows:

 

2015-10-14 10:00:03,555 INFO [main] org.apache.hadoop.io.compress.CodecPool: Got brand-new compressor [.gz]
2015-10-14 10:00:03,556 WARN [main] org.apache.hadoop.mapred.IFile: Could not obtain compressor from CodecPool

 

Compressors are a built-in Hadoop facility: when writing compressed output, the framework simply asks the CodecPool for a compressor. Here the CodecPool evidently had no compressor available for the requested codec, which means the codecs have to be configured ourselves. That is exactly what step 2 does.

 

2. Configure Hadoop to load the compression codecs. Add the following snippet to Hadoop's core-site.xml:

 

 

<property>
	<name>io.compression.codecs</name>
	<value>org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.BZip2Codec</value>
</property>
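After editing core-site.xml, the change has to reach the task JVMs, which generally means redeploying the config and restarting the NodeManagers. In principle the same property can also be injected per session from the hive CLI, since client-side settings travel with the job configuration; this variant is an untested assumption here, not what was actually done:

set io.compression.codecs=org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.BZip2Codec;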

 

Re-running the test SQL, it now completes normally, but the result is not compressed. Look at Hive's execution plan (the explain output) for the query:

 

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: illegalpackage
            Statistics: Num rows: 2 Data size: 250 Basic stats: COMPLETE Column stats: NONE
            Select Operator
              expressions: keywords (type: string)
              outputColumnNames: keywords
              Statistics: Num rows: 2 Data size: 250 Basic stats: COMPLETE Column stats: NONE
              Group By Operator
                aggregations: count(1)
                keys: keywords (type: string)
                mode: hash
                outputColumnNames: _col0, _col1
                Statistics: Num rows: 2 Data size: 250 Basic stats: COMPLETE Column stats: NONE
                Reduce Output Operator
                  key expressions: _col0 (type: string)
                  sort order: +
                  Map-reduce partition columns: _col0 (type: string)
                  Statistics: Num rows: 2 Data size: 250 Basic stats: COMPLETE Column stats: NONE
                  value expressions: _col1 (type: bigint)
      Reduce Operator Tree:
        Group By Operator
          aggregations: count(VALUE._col0)
          keys: KEY._col0 (type: string)
          mode: mergepartial
          outputColumnNames: _col0, _col1
          Statistics: Num rows: 1 Data size: 125 Basic stats: COMPLETE Column stats: NONE
          Select Operator
            expressions: _col0 (type: string), _col1 (type: bigint)
            outputColumnNames: _col0, _col1
            Statistics: Num rows: 1 Data size: 125 Basic stats: COMPLETE Column stats: NONE
            File Output Operator
              compressed: false
              Statistics: Num rows: 1 Data size: 125 Basic stats: COMPLETE Column stats: NONE
              table:
                  input format: org.apache.hadoop.mapred.TextInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

 

Note the line compressed: false. That suggests Hive has its own switch controlling whether results are compressed. Searching hive-default.xml for the keyword "compress" indeed turns up a setting that controls whether query output is compressed, and it defaults to false. So add Hive's compression switch before running the SQL:

 

set hive.exec.compress.output=true;         
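Putting it all together, with the core-site.xml change from step 2 in place, the full per-session recipe looks like the sketch below; the output path and query are placeholders standing in for the real export statement:

set hive.exec.compress.output=true;
set mapreduce.output.fileoutputformat.compress=true;
set mapreduce.output.fileoutputformat.compress.type=BLOCK;
set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;
set mapreduce.map.output.compress=true;
set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.GzipCodec;

-- placeholder path and query; substitute the real export here
insert overwrite local directory '/mnt/nfs/export/rawdata'
select * from rawdata.events where dt = '2015-10-13';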

 

Run the SQL again and it now writes correctly compressed output. SUCCEED.

 

-rw-r--r--. 1 hadoop hadoop 48542102 Oct 14 2015 000000_0.gz
-rw-r--r--. 1 hadoop hadoop 47870794 Oct 14 2015 000001_0.gz
-rw-r--r--. 1 hadoop hadoop 47912034 Oct 14 2015 000002_0.gz
-rw-r--r--. 1 hadoop hadoop 46831088 Oct 14 2015 000003_0.gz
-rw-r--r--. 1 hadoop hadoop 47351273 Oct 14 2015 000004_0.gz
-rw-r--r--. 1 hadoop hadoop 47238145 Oct 14 2015 000005_0.gz
-rw-r--r--. 1 hadoop hadoop 47283714 Oct 14 2015 000006_0.gz
-rw-r--r--. 1 hadoop hadoop 46932744 Oct 14 2015 000007_0.gz
-rw-r--r--. 1 hadoop hadoop 46880888 Oct 14 2015 000008_0.gz
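As a quick sanity check that each part is valid gzip and holds the expected rows, one can peek at a file on the machine holding the output:

zcat 000000_0.gz | head    # decompress to stdout and show the first 10 lines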

 

 
