数据压缩

概述

这是mapreduce的一种优化策略:通过压缩编码对mapper或者reducer的输出进行压缩,以减少磁盘IO,提高MR程序运行速度(但相应增加了cpu运算负担)

  1. Mapreduce支持将map输出的结果或者reduce输出的结果进行压缩,以减少网络IO或最终输出数据的体积
  2. 压缩特性运用得当能提高性能,但运用不当也可能降低性能
  3. 基本原则:
    运算密集型的job,少用压缩
    IO密集型的job,多用压缩

MR支持的压缩编码

压缩格式 hadoop自带? 算法 文件扩展名 是否可切分 换压缩格式后,原来的程序是否需要修改
DEFAULT DEFAULT .deflate 和文本处理一样,不需要修改
Gzip DEFAULT .gz 和文本处理一样,不需要修改
bzip2 bzip2 .bz2 和文本处理一样,不需要修改
LZO LZO .lzo 需要建索引,还需要指定输入格式
Snappy Snappy .snappy 和文本处理一样,不需要修改

为了支持多种压缩/解压缩算法,hadoop引入了编码/解码器,如下

压缩格式 对应的编码/解码器
DEFLATE org.apache.hadoop.io.compress.DefaultCodec
gzip org.apache.hadoop.io.compress.GzipCodec
bzip2 org.apache.hadoop.io.compress.BZip2Codec
LZO com.hadoop.compression.lzo.LzopCodec
Snappy org.apache.hadoop.io.compress.SnappyCodec

压缩性能比较

压缩算法 原始文件大小 压缩文件大小 压缩速度 解压速度
gzip 8.3GB 1.8GB 17.5MB/s 58MB/s
bzip2 8.3GB 1.1GB 2.4MB/s 9.5MB/s
LZO 8.3GB 2.9GB 49.3MB/s 74.9MB/s

压缩方式选择

Gzip压缩

优点: 压缩率比较高,而且压缩/解压速度也比较快,hadoop本身支持,在应用处理gzip格式的文件就和直接处理文本一样,大部分linux系统都自带gzip命令,使用方便

缺点: 不支持split

应用场景: 当每个文件压缩之后在130M以内的(1个块大小内),都可以考虑用gzip压缩格式,比如一天或者一小时的日志压缩成一个gzip文件,运行mapreduce程序的时候通过多个gzip文件达到并发.hive程序和java写的mapreduce程序完全和文本处理一样,压缩之后原来的程序不需要做任何修改

Bzip2压缩

优点: 支持split,具有很高的压缩率,比gzip压缩率都高,hadoop本身支持,但不支持native,在linux系统下自带bzip2命令使用方便

缺点: 压缩/解压速度慢,不支持native

应用场景: 适用对速度要求不高,但需要较高压缩率的时候,可以作为mapreduce作业的输出格式.或者输出之后的数据比较大,处理之后的数据需要压缩存档减少磁盘空间并且以后数据用得比较少的情况,或者对单个很大的文本文件想压缩减少存储空间,同时又需要支持split,而且兼容之前的应用程序(应用程序不需要修改)的情况

Lzo压缩

优点: 压缩/解压速度也比较快,合理的压缩率.支持split,是hadoop中最流行的压缩格式.可以在linux系统下安装lzop命令,使用方便

缺点: 压缩率比gzip要低一些,hadoop本身不支持,需要安装.在应用中对lzo格式的文件需要做一些特殊处理(为了支持split需要建索引,还需要指定inoutformat为lzo格式)

应用场景: 一个很大的文本文件,压缩之后还大于200M以上的可以考虑,而且单个文件越大,lzo优点越明显

Snappy压缩

优点: 高速压缩速度和合理的压缩率

缺点: 不支持split,压缩率比gzip要低,hadoop本身不支持,需要安装

应用场景: 当mapreduce作业的map输出的数据比较大的时候,作为map到reduce的中间数据的压缩格式,或者作为一个mapreduce作业的输出和另外一个mapreduce作业的输入

压缩位置选择

压缩可以在mapreduce作用的任意阶段启用

输入端采用压缩

在有大量数据并计划重复处理的情况下,应该考虑对输入进行压缩.然而,你无须显示指定使用的编解码方式.
hadoop自动检查文件扩展名.如果扩展名能够匹配,就会用恰当的编解码方式对文件进行压缩和解压,否则hadoop不会使用任何编解码器

输出采用压缩

当map任务输出的中间数据量很大时,应考虑在此阶段采用压缩技术.这能显著改善内部数据shuffle过程.shuffle是消耗资源最多的环节.可用于压缩mapper输出的快速编解码器包括LZO或者snapper

reducer输出采用压缩

在此阶段启用压缩技术能够减少要存储的数据量,因此降低所需的磁盘空间.
当mapreduce作业形成作业链条时,所以启用压缩同样有效

压缩配置参数

要在hadoop中启用压缩,可以配置如下参数(mapred-site.xml中)

image.png image.png
image.png

代码

测试输出压缩

  1. import org.apache.hadoop.conf.Configuration;
  2. import org.apache.hadoop.io.IOUtils;
  3. import org.apache.hadoop.io.compress.CompressionCodec;
  4. import org.apache.hadoop.io.compress.CompressionOutputStream;
  5. import org.apache.hadoop.util.ReflectionUtils;
  6. import java.io.*;
  7. public class TestCompress {
  8. public static void main(String[] args) throws IOException, ClassNotFoundException {
  9. //测试压缩
  10. compress("/Users/jdxia/Desktop/website/data/input/order.txt", "org.apache.hadoop.io.compress.BZip2Codec");
  11. }
  12. //测试压缩
  13. @SuppressWarnings({"resource", "unchecked"})
  14. private static void compress(String filename, String method) throws IOException, ClassNotFoundException {
  15. //获取输入流
  16. FileInputStream fis = new FileInputStream(new File(filename));
  17. Class className = Class.forName(method);
  18. CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(className, new Configuration());
  19. //获取输出流,输出的文件是文件名加后缀
  20. FileOutputStream fos = new FileOutputStream(new File(filename + codec.getDefaultExtension()));
  21. CompressionOutputStream cos = codec.createOutputStream(fos);
  22. //流的对拷,暂时不关闭流,最后在filename所在文件夹中会有个压缩文件
  23. IOUtils.copyBytes(fis, cos, 1024*1024*5, false);
  24. //关闭资源
  25. fis.close();
  26. cos.close();
  27. fos.close();
  28. }
  29. }

测试输入压缩

  1. import org.apache.hadoop.conf.Configuration;
  2. import org.apache.hadoop.fs.Path;
  3. import org.apache.hadoop.io.IOUtils;
  4. import org.apache.hadoop.io.compress.CompressionCodec;
  5. import org.apache.hadoop.io.compress.CompressionCodecFactory;
  6. import org.apache.hadoop.io.compress.CompressionInputStream;
  7. import java.io.File;
  8. import java.io.FileInputStream;
  9. import java.io.FileOutputStream;
  10. import java.io.IOException;
  11. public class TestCompress {
  12. public static void main(String[] args) throws IOException, ClassNotFoundException {
  13. //测试压缩
  14. decompress("/Users/jdxia/Desktop/website/data/input/order.txt.bz2");
  15. }
  16. private static void decompress(String filename) throws IOException {
  17. //校验是否能解压缩
  18. CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
  19. CompressionCodec codec = factory.getCodec(new Path(filename));
  20. //如果不支持直接返回
  21. if (codec == null) {
  22. System.out.println("cannot find codec for file " + filename);
  23. return;
  24. }
  25. //获取输入流
  26. CompressionInputStream cis = codec.createInputStream(new FileInputStream(new File(filename)));
  27. //获取输出流
  28. FileOutputStream fos = new FileOutputStream(new File(filename + ".decoded"));
  29. //流的对拷
  30. IOUtils.copyBytes(cis, fos, 1024*1024*5, false);
  31. //关闭资源
  32. cis.close();
  33. fos.close();
  34. }
  35. }

map输出采用压缩

即使你的MapReduce的输入输出文件都是未压缩的文件,你仍然可以对map任务的中间结果输出做压缩,因为他要写在硬盘并且通过网络传输到reduce节点,对其压缩可以提高很多性能,这些工作只要设置两个属性即可

这些工作只要设置两个属性就可以

map端输入可以根据扩展名来

  1. import org.apache.hadoop.conf.Configuration;
  2. import org.apache.hadoop.fs.Path;
  3. import org.apache.hadoop.io.IntWritable;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.io.compress.BZip2Codec;
  6. import org.apache.hadoop.io.compress.CompressionCodec;
  7. import org.apache.hadoop.mapreduce.Job;
  8. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  9. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  10. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  11. import java.io.IOException;
  12. public class WorldCountDriver {
  13. public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
  14. Configuration conf = new Configuration();
  15. //开启map端输出压缩
  16. conf.setBoolean("mapreduce.map.output.compress", true);
  17. conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);
  18. Job job = Job.getInstance(conf);
  19. //告诉框架,我们程序的位置
  20. job.setJarByClass(WorldCountDriver.class);
  21. //告诉框架,我们程序所用的mapper类和reduce类是什么
  22. job.setMapperClass(WorldCountMapper.class);
  23. job.setReducerClass(WorldCountReducer.class);
  24. //告诉框架我们程序输出的类型
  25. job.setMapOutputKeyClass(Text.class);
  26. job.setMapOutputValueClass(IntWritable.class);
  27. job.setOutputKeyClass(Text.class);
  28. job.setOutputValueClass(IntWritable.class);
  29. //告诉框架,我们程序使用的数据读取组件,结果输出所用的组件是什么
  30. //TextInputFormat是mapreduce程序中内置的一种读取数据组件,准备的叫做读取文本的输入组件
  31. job.setInputFormatClass(TextInputFormat.class);
  32. //job.setOutputFormatClass(TextOutputFormat.class);
  33. //告诉框架,我们要处理的数据文件在那个路径下
  34. FileInputFormat.setInputPaths(job,new Path("/Users/jdxia/Desktop/website/data/input"));
  35. //告诉框架我们的处理结果要输出到什么地方
  36. FileOutputFormat.setOutputPath(job,new Path("/Users/jdxia/Desktop/website/data/output"));
  37. //这边不用submit,因为一提交就和我这个没关系了,我这就断开了就看不见了
  38. // job.submit();
  39. //提交后,然后等待服务器端返回值,看是不是true
  40. boolean res = job.waitForCompletion(true);
  41. //设置成功就退出码为0
  42. System.exit(res?0:1);
  43. }
  44. }

reduce输出采用压缩

  1. import org.apache.hadoop.conf.Configuration;
  2. import org.apache.hadoop.fs.Path;
  3. import org.apache.hadoop.io.IntWritable;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.io.compress.BZip2Codec;
  6. import org.apache.hadoop.mapreduce.Job;
  7. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  8. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  9. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  10. import java.io.IOException;
  11. public class WorldCountDriver {
  12. public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
  13. Configuration conf = new Configuration();
  14. Job job = Job.getInstance(conf);
  15. //告诉框架,我们程序的位置
  16. job.setJarByClass(WorldCountDriver.class);
  17. //告诉框架,我们程序所用的mapper类和reduce类是什么
  18. job.setMapperClass(WorldCountMapper.class);
  19. job.setReducerClass(WorldCountReducer.class);
  20. //告诉框架我们程序输出的类型
  21. job.setMapOutputKeyClass(Text.class);
  22. job.setMapOutputValueClass(IntWritable.class);
  23. job.setOutputKeyClass(Text.class);
  24. job.setOutputValueClass(IntWritable.class);
  25. //告诉框架,我们程序使用的数据读取组件,结果输出所用的组件是什么
  26. //TextInputFormat是mapreduce程序中内置的一种读取数据组件,准备的叫做读取文本的输入组件
  27. job.setInputFormatClass(TextInputFormat.class);
  28. //job.setOutputFormatClass(TextOutputFormat.class);
  29. //告诉框架,我们要处理的数据文件在那个路径下
  30. FileInputFormat.setInputPaths(job,new Path("/Users/jdxia/Desktop/website/data/input"));
  31. //告诉框架我们的处理结果要输出到什么地方
  32. FileOutputFormat.setOutputPath(job,new Path("/Users/jdxia/Desktop/website/data/output"));
  33. //设置reduce端输出压缩开启
  34. FileOutputFormat.setCompressOutput(job, true);
  35. //设置压缩的方式
  36. FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
  37. //提交后,然后等待服务器端返回值,看是不是true
  38. boolean res = job.waitForCompletion(true);
  39. //设置成功就退出码为0
  40. System.exit(res?0:1);
  41. }
  42. }

数据流的压缩和解压缩

CompressionCodec有两个方法可以用于轻松的压缩或解压缩数据.要想对正在被写入一个输出流的数据进行压缩,我们可以使用createOutputStream(OutputStreamout)方法创建一个CompressionOutputStream,将其以压缩格式写入底层的流.

相反,想要对从输入流读取而来的数据进行解压缩,则调用createInputStream(InputStream)函数,从而获得一个CompressionInputStream,从而从底层的流读取未压缩的数据

Reducer输出压缩

在配置参数或在代码中都可以设置reduce的输出压缩

  1. 在配置参数中设置
  1. mapreduce.output.fileoutputformat.compress=false
  2. mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.DefaultCodec
  3. mapreduce.output.fileoutputformat.compress.type=RECORD
  1. 在代码中设置
  1. Job job = Job.getInstance(conf);
  2. FileOutputFormat.setCompressOutput(job, true);
  3. FileOutputFormat.setOutputCompressorClass(job, (Class<? extends CompressionCodec>) Class.forName(""));

Mapper输出压缩

在配置参数或在代码中都可以设置reduce的输出压缩

  1. 在配置参数中设置
  1. mapreduce.map.output.compress=false
  2. mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.DefaultCodec
  1. 在代码中设置:
  1. conf.setBoolean(Job.MAP_OUTPUT_COMPRESS, true);
  2. conf.setClass(Job.MAP_OUTPUT_COMPRESS_CODEC, GzipCodec.class, CompressionCodec.class);

压缩文件的读取(源码)

Hadoop自带的InputFormat类内置支持压缩文件的读取,比如TextInputformat类,在其initialize方法中:

  1. public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
  2. FileSplit split = (FileSplit) genericSplit;
  3. Configuration job = context.getConfiguration();
  4. this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
  5. start = split.getStart();
  6. end = start + split.getLength();
  7. final Path file = split.getPath();
  8. // open the file and seek to the start of the split
  9. final FileSystem fs = file.getFileSystem(job);
  10. fileIn = fs.open(file);
  11. //根据文件后缀名创建相应压缩编码的codec
  12. CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
  13. if (null!=codec) {
  14. isCompressedInput = true;
  15. decompressor = CodecPool.getDecompressor(codec);
  16. //判断是否属于可切片压缩编码类型
  17. if (codec instanceof SplittableCompressionCodec) {
  18. final SplitCompressionInputStream cIn =
  19. ((SplittableCompressionCodec)codec).createInputStream(
  20. fileIn, decompressor, start, end,
  21. SplittableCompressionCodec.READ_MODE.BYBLOCK);
  22. //如果是可切片压缩编码,则创建一个CompressedSplitLineReader读取压缩数据
  23. in = new CompressedSplitLineReader(cIn, job,
  24. this.recordDelimiterBytes);
  25. start = cIn.getAdjustedStart();
  26. end = cIn.getAdjustedEnd();
  27. filePosition = cIn;
  28. } else {
  29. //如果是不可切片压缩编码,则创建一个SplitLineReader读取压缩数据,并将文件输入流转换成解压数据流传递给普通SplitLineReader读取
  30. in = new SplitLineReader(codec.createInputStream(fileIn,
  31. decompressor), job, this.recordDelimiterBytes);
  32. filePosition = fileIn;
  33. }
  34. } else {
  35. fileIn.seek(start);
  36. //如果不是压缩文件,则创建普通SplitLineReader读取数据
  37. in = new SplitLineReader(fileIn, job, this.recordDelimiterBytes);
  38. filePosition = fileIn;
  39. }