FileInputFormat

FileInputFormat是基本的数据读取类型,包括TextInputFormat、KeyValueInputFormat、NLineInputFormat、CombineTextInputFormat以及自定义的InputFormat。

  • TextInputFormat:默认的类型,key是偏移量Long类型,value是一行的数据;
  • KeyValueInputFormat:默认以tab分割,一行数据中tab前是key,tab后面是value;
  • NLineInputFormat:按行数定义切片大小;
  • CombineTextInputFormat:小文件数量较多时会使用,将多个小文件从逻辑上规划到一个切片中。

shuffle机制

Map方法之后,Reduce方法之前的数据处理过程称为Shuffle或洗牌。

partition分区

默认分区是根据key的hashCode对ReduceTasks个数取模得到,用户没法控制哪个key存储到哪个分区。

通过自定义Partitioner,可以实现自定义分区。

例如,按手机号码前三位进行分区。

添加类继承Partitioner,

  1. package partition;
  2. import org.apache.hadoop.io.Text;
  3. import org.apache.hadoop.mapreduce.Partitioner;
  4. /**
  5. * @author Administrator
  6. */
  7. public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
  8. @Override
  9. public int getPartition(Text text, FlowBean flowBean, int i) {
  10. String prePhoneNum = text.toString().substring(0, 3);
  11. int partition=4;
  12. if ("130".equals(prePhoneNum)) {
  13. partition = 0;
  14. } else if ("131".equals(prePhoneNum)) {
  15. partition = 1;
  16. } else if ("132".equals(prePhoneNum)) {
  17. partition = 2;
  18. } else if ("133".equals(prePhoneNum)) {
  19. partition = 3;
  20. }
  21. return partition;
  22. }
  23. }

然后在驱动类中添加

  1. job.setPartitionerClass(ProvincePartitioner.class);
  2. job.setNumReduceTasks(5);//默认值是1

这里一共分了5个区,分区号必须从零开始编号、逐一累加。

WritableComparable排序

MapTask和ReduceTask都会对数据按照key进行排序,属于Hadoop的默认行为。

默认排序是按照字典排序,且实现该排序的方法是快速排序。

排序的分类:部分排序、全排序、辅助排序(分组排序)、二次排序。

排序的实现:bean对象作为key传输,实现WritableComparable接口重写compareTo方法。

全排序

全排序要在实例中实现WritableComparable接口重写compareTo方法。

  1. @Override
  2. public int compareTo(FlowBean bean) {
  3. int result;
  4. if (sumFlow>bean.getSumFlow()){
  5. result=-1;
  6. }else if(sumFlow<bean.getSumFlow()){
  7. result=1;
  8. }else {
  9. result=0;
  10. }
  11. return result;
  12. }

分区排序

即分区和排序的结合。

Combiner合并

Combiner是Reducer的子类,在每一个MapTask所在节点运行。意义在于对每一个MapTask的输出进行局部汇总,以减少网络传输量。不适应于求均值的场景,会影响加权。

辅助排序

再自定义类继承WritableComparator类,重写compare方法,并在启动类关联。

以下为例

  1. package group;
  2. import org.apache.hadoop.io.WritableComparable;
  3. import org.apache.hadoop.io.WritableComparator;
  4. /**
  5. * @author Administrator
  6. */
  7. public class OrderGroupComparator extends WritableComparator {
  8. protected OrderGroupComparator(){
  9. super(Order.class,true);
  10. }
  11. @Override
  12. public int compare(WritableComparable a, WritableComparable b) {
  13. Order aBean= (Order) a;
  14. Order bBean= (Order) b;
  15. int result;
  16. if (aBean.getId()>bBean.getId()){
  17. result=1;
  18. }else if (aBean.getId()<bBean.getId()){
  19. result=-1;
  20. }else {
  21. result=0;
  22. }
  23. return result;
  24. }
  25. }

OutputFormat

OutputFormat是MapReduce输出的基类,默认值是TextOutputFormat,即把结果记录为文本。

  • TextOutputFormat:文本输出;
  • SequenceFileOutputFormat:输出结果作为后续MapReduce任务的输入;
  • 自定义OutputFormat:包括输出到MySQL、Redis、HDFS等。

压缩与解压

这里以BZip2和Gzip压缩格式为例,直接上代码。

  1. package compress;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.Path;
  4. import org.apache.hadoop.io.IOUtils;
  5. import org.apache.hadoop.io.compress.CompressionCodec;
  6. import org.apache.hadoop.io.compress.CompressionCodecFactory;
  7. import org.apache.hadoop.io.compress.CompressionInputStream;
  8. import org.apache.hadoop.io.compress.CompressionOutputStream;
  9. import org.apache.hadoop.util.ReflectionUtils;
  10. import java.io.*;
  11. /**
  12. * @author Administrator
  13. */
  14. public class TestCompress {
  15. public static void main(String[] args) throws IOException, ClassNotFoundException {
  16. //压缩
  17. //compress("C:/Users/Administrator/Desktop/input/hello.txt","org.apache.hadoop.io.compress.BZip2Codec");
  18. //compress("C:/Users/Administrator/Desktop/input/hello.txt","org.apache.hadoop.io.compress.GzipCodec");
  19. //解压
  20. decompress("C:/Users/Administrator/Desktop/input/hello.txt.gz");
  21. }
  22. private static void compress(String fileName, String method) throws IOException, ClassNotFoundException {
  23. //获取输入输出流
  24. FileInputStream fileInputStream = new FileInputStream(new File(fileName));
  25. Class aClass = Class.forName(method);
  26. CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(aClass, new Configuration());
  27. FileOutputStream fileOutputStream = new FileOutputStream(new File(fileName + codec.getDefaultExtension()));
  28. CompressionOutputStream codecOutputStream = codec.createOutputStream(fileOutputStream);
  29. //流的拷贝
  30. IOUtils.copyBytes(fileInputStream,codecOutputStream,1024*1024,false);
  31. //关闭流
  32. IOUtils.closeStream(codecOutputStream);
  33. IOUtils.closeStream(fileOutputStream);
  34. IOUtils.closeStream(fileInputStream);
  35. }
  36. private static void decompress(String fileName) throws IOException {
  37. //压缩方式检查
  38. CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
  39. CompressionCodec codec = factory.getCodec(new Path(fileName));
  40. if (codec==null){
  41. System.out.println("can't process");
  42. return;
  43. }
  44. //获取输入输出流
  45. FileInputStream fileInputStream = new FileInputStream(new File(fileName));
  46. CompressionInputStream codecInputStream = codec.createInputStream(fileInputStream);
  47. FileOutputStream fileOutputStream = new FileOutputStream(new File(fileName + ".decode"));
  48. //流的对拷
  49. IOUtils.copyBytes(codecInputStream,fileOutputStream,1024*1024,false);
  50. //关闭流
  51. IOUtils.closeStream(fileOutputStream);
  52. IOUtils.closeStream(codecInputStream);
  53. IOUtils.closeStream(fileInputStream);
  54. }
  55. }