1.MapReduce思想

MapReduce思想在生活中处处可见, 我们或多或少都曾接触过这种思想.MapReduce的思想核心是分而治之
充分利用了并行处理的优势
MapReduce任务过程是分为两个处理阶段:

  • Map阶段: Map阶段的主要作用是”分” , 即把复杂的任务分解为若干个”简单的任务”来并行处理. Map阶段的这些任务可以并行计算, 彼此没有依赖关系
  • Reduce阶段: Reduce阶段的主要作用是”合”, 即堆Map阶段的结果进行全局汇总

MapReduce的思想
image.png

2.官方WordCount案例源码解析

image.png
image.png
image.png
经过查看分析官方WordCount案例源码我们发现一个统计单词数量的MapReduce程序的代码由三个部分组成.

  • Mapper类
  • Reducer类
  • 运行作业的代码(Driver)

Mapper类继承了org.apache.hadoop.mapreduce.Mapper类重写了其中的map方法, Reducer类继承了org.apache.hadoop.mapreduce.Reducer类重写了其中的reduce方法
重写的Map方法作用: map方法其中的逻辑就是用户希望mr程序map阶段如何处理的逻辑;
重写的Reduce方法作用: reduce方法其中的逻辑是用户希望mr程序reduce阶段如何处理的逻辑

1.Hadoop序列化

为什么进行序列化?
序列化主要是我们网络通信传输数据时,或者把对象持久化到文件 , 需要把对象序列化二进制的结构
观察源码时发现自定义Mapper类与自定义Reducer类都有泛型类型约束 , 比如自定义Mapper有四个形参, 但是形参类型不是常见的Java基本类型.
为什么Hadoop要选择建立自己的序列化格式而使用Java自带的serializable?

  • 序列化分布式程序中非常重要,在hadoop中,集群中多个节点的进程间的通信是通过RPC(远程过程调用 Renote Procedurce Call)实现; RPC将消息序列化成二进制流发送到远程节点, 远程过程节点再将接收到的二进制数据反序列化为原始的消息 , 因此RPC往往追求一下特点:
    • 紧凑: 数据更加紧凑 , 能充分利用网络带宽资源
    • 快速: 序列化和反序列化的性能开销更低
  • Hadoop使用的是自己的序列化格式Writable. 它比Java的序列化Serialzation更加紧凑速度快, 一个对象使用Serialzation序列化之后 , 会携带很多额外信息比如校验信息 , Header, 继承体系等等

Java包装类型与Hadoop常用序列化类型

Java包装类型 Hadoop Writable 类型
Boolean BooleanWritable
Byte ByteWritable
Integer IntWritable
Long LongWritable
Double DoubleWritable
String Text
Map MapWritable
Array ArrayWritable
Float FloatWritable

3.MapReduce编程规范以及示例编写

1.MapReduce 编程规范以及示例编写

  • 用户自定义一个Mapper类继承Hadoop的Mapper类
  • Mapper的输入数据是KV对的形式(类型可以自定义)
  • Map阶段的业务逻辑定义在map()方法中
  • Mapper的输出数据是KV对的形式(类型可以自定义)

注意: map()方法是对输入的一个KV对调用一次!!

2.Reducer类

  • 用户自定义Reducer类要继承Hadoop的Reducer类
  • Reducer的输入数据类型对应Mapper的输出数据类型(KV对)
  • Reducer的业务逻辑写在reduce()方法中
  • Reduce()方法是对相同K的一组KV对调用执行一次

    3.Driver阶段

    创建提交YARN集群运行的Job对象 , 其中封装了MapReduce程序运行所需要的相关参数输入数据路径,输出数据路径等, 也相当于是一个YARN集群的客户端 , 主要作用就是提交我们的MapReduce程序运行

4.代码实现

1.需求

在给定的文本文件中统计输出每一个单词出现的总次数
输入数据: wc.txt
输出:

  1. apache 2
  2. clickhouse 2
  3. mapreduce 1
  4. spark 2
  5. xiaoming 1

2.实现方案

1.整体思路梳理

Map阶段:

  1. map()方法中把传入的数据转成String类型
  2. 根据空格切分出单词
  3. 输出<单词,1>

Reduce阶段:

  1. 汇总各个key(单词)的个数, 遍历value数据进行累加
  2. 输出key的总数

Driver

  1. 获取配置文件对象, 获取job对象实例
  2. 指定程序jar的本地路径
  3. 指定mapper/reducer类
  4. 指定Mapper终输出的kv数据类型
  5. 指定最终输出的kv数据类型
  6. 指定Job处理的原始数据路径
  7. 指定Job输出结果路径
  8. 提交作业

    2.编写Mapper类

    ```java package com.anda.hadoop.mapper;

import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**

  • @author anda
  • @since 1.0 */ public class WordCountMapper extends Mapper {

    Text k = new Text(); IntWritable v = new IntWritable(1);

    @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

  1. // 1. 获取第一行
  2. String line = value.toString();
  3. // 2. 切割
  4. String[] words = line.split(" ");
  5. // 3. 输出
  6. for (String word : words) {
  7. k.set(word);
  8. context.write(k, v);
  9. }
  10. }

}

  1. <a name="lSfyu"></a>
  2. #### 3.编写Reducer
  3. ```java
  4. package com.anda.hadoop.mapper;
  5. import org.apache.hadoop.io.IntWritable;
  6. import org.apache.hadoop.io.Text;
  7. import org.apache.hadoop.mapreduce.Reducer;
  8. import java.io.IOException;
  9. /**
  10. * @author anda
  11. * @since 1.0
  12. */
  13. public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  14. int sum;
  15. IntWritable v = new IntWritable();
  16. @Override
  17. protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
  18. // 1. 累加求和
  19. sum = 0;
  20. for (IntWritable count : values) {
  21. sum += count.get();
  22. }
  23. // 2. 输出
  24. v.set(sum);
  25. context.write(key, v);
  26. }
  27. }

4.Driver 驱动类

  1. package com.anda.hadoop.mapper;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.Path;
  4. import org.apache.hadoop.io.IntWritable;
  5. import org.apache.hadoop.io.Text;
  6. import org.apache.hadoop.mapreduce.Job;
  7. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  8. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  9. import java.io.IOException;
  10. /**
  11. * @author anda
  12. * @since 1.0
  13. */
  14. public class WordCountDriver {
  15. public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
  16. // 1. 获取配置信息以及封装任务
  17. Configuration configuration = new Configuration();
  18. Job job = Job.getInstance(configuration);
  19. // 2. 设置jar加载路径
  20. job.setJarByClass(WordCountDriver.class);
  21. // 3. 设置map和reduce类
  22. job.setMapperClass(WordCountMapper.class);
  23. job.setReducerClass(WordCountReducer.class);
  24. // 4. 设置map输出
  25. job.setMapOutputKeyClass(Text.class);
  26. job.setMapOutputValueClass(IntWritable.class);
  27. // 5. 设置最终输出kv类型
  28. job.setOutputKeyClass(Text.class);
  29. job.setOutputValueClass(IntWritable.class);
  30. FileInputFormat.setInputPaths(job, new Path(args[0]));
  31. FileOutputFormat.setOutputPath(job, new Path(args[1]));
  32. // 7. 提交
  33. boolean b = job.waitForCompletion(true);
  34. System.exit(b ? 0 : 1);
  35. }
  36. }

5.运行任务

  1. 本地模式
    1. 直接IDEA中运行驱动类集合
    2. idea中需要传入参数即可

image.png

运行结束 , 到输出路径查看结果
image.png
注意本地idea运行mr任务与集群没有任何关系, 没有提交任务到yarn集群 , 是在本地使用多线程方式模拟的mr的运行

  1. Yarn集群模式
    1. 把程序打成jar包 , 上传到Hadoop集群 , 选择合适的jar包

image.png
准备原始数据文件上传到HDFS的路径 , 不能是本地路径 , 因为跨节点运行无法获取数据!!

  1. 启动Hadoop集群(Hdfs, Yarn)
  2. 使用haddoop命令提交任务
    1. $ hadoop jar anda-hadoop-1.0.0.jar com.anda.hadoop.mapper.WordCountDriver /user/anda/test/input /user/anda/test/back
    image.png
    执行成功

    4.序列化Writable接口

    基本序列化类型往往不能满足所有需求 , 比如在Hadoop框架内部传递一个自定义bean对象 , 那么该对象需要实现Writable序列化接口.

    1.实现Writable序列化步骤如下

  1. 必须实现Writable接口
  2. 反序列化时, 需要反射调用空参构造函数 , 所以必须有空参构造

    1. public CostomBean(){}
  3. 重写序列化方法

    1. @Override
    2. public void write(DataOutput out)throws Exception{}
  4. 重写反序列化方法

    1. @Override
    2. public void readFields(DataInput in)throws IOException{}
  5. 反序列化的字段顺序和序列化字段必须完全一致

  6. 方便展示结果数据 , 需要重写bean对象 的toString()方法, 可以自定义分隔符
  7. 如果自定义Bean对象需要放在mapper输出KV中的K,则该对象还需要实现Comparable接口, 因为MapReduce框中的Suffle 过程要求对key必须能排序

2.Writable接口案例

  1. 需求 统计每台智能音箱设备内容播放时长 , 原始日志格式

    1. 001 001577c3 kar_890809 120.192.100.99 1116 954 200
    2. 日志Id 设备Id appkey 网络IP 自有内容时长 第三方内容时长 网络状态码

    1.编写MapReduce程序

  2. 创建SpeakBean对象 ```java package com.anda.hadoop.writable;

import org.apache.hadoop.io.Writable;

import java.io.DataInput; import java.io.DataOutput; import java.io.IOException;

/**

  • @author anda
  • @since 1.0 */ // 1. 实现writable接口 public class SpeakBean implements Writable {

    private Long selfDuration; private Long thirdPartDuration; private Long sumDuration;

    // 2.反序列化时 , 需要反射调用空参构造函数 , 所以必须有 public SpeakBean() {

    }

    public SpeakBean(Long selfDuration, Long thirdPartDuration) {

    1. this.selfDuration = selfDuration;
    2. this.thirdPartDuration = thirdPartDuration;
    3. this.sumDuration = this.selfDuration + this.thirdPartDuration;

    }

    // 3.写序列化方法 @Override public void write(DataOutput dataOutput) throws IOException {

    1. dataOutput.writeLong(selfDuration);
    2. dataOutput.writeLong(thirdPartDuration);
    3. dataOutput.writeLong(sumDuration);

    }

    // 4. 反序列化方法 // 5. 反序列化方法读顺序必须和写序列化方法的写顺序必须一致 @Override public void readFields(DataInput dataInput) throws IOException {

    1. this.selfDuration = dataInput.readLong();
    2. this.thirdPartDuration = dataInput.readLong();
    3. this.sumDuration = dataInput.readLong();

    }

    public Long getSelfDuration() {

    1. return selfDuration;

    }

    public void setSelfDuration(Long selfDuration) {

    1. this.selfDuration = selfDuration;

    }

    public Long getThirdPartDuration() {

    1. return thirdPartDuration;

    }

    public void setThirdPartDuration(Long thirdPartDuration) {

    1. this.thirdPartDuration = thirdPartDuration;

    }

    public Long getSumDuration() {

    1. return sumDuration;

    }

    public void setSumDuration(Long sumDuration) {

    1. this.sumDuration = sumDuration;

    }

    public void set(Long selfDuration, Long thirdPartDuration) {

    1. this.selfDuration = selfDuration;
    2. this.thirdPartDuration = thirdPartDuration;
    3. this.sumDuration = this.selfDuration + this.thirdPartDuration;

    }

    // 6. 编写toString方法, 方便后续打印 @Override public String toString() {

    1. return "SpeakBean{" +
    2. "selfDuration=" + selfDuration +
    3. ", thirdPartDuration=" + thirdPartDuration +
    4. ", sumDuration=" + sumDuration +
    5. '}';

    } }

  1. 2. 编写Mapper
  2. ```java
  3. package com.anda.hadoop.writable;
  4. import org.apache.hadoop.io.LongWritable;
  5. import org.apache.hadoop.io.Text;
  6. import org.apache.hadoop.mapreduce.Mapper;
  7. import java.io.IOException;
  8. /**
  9. * @author anda
  10. * @since 1.0
  11. */
  12. public class SpeakDurationMapper extends Mapper<LongWritable, Text, Text, SpeakBean> {
  13. SpeakBean v = new SpeakBean();
  14. Text text = new Text();
  15. @Override
  16. protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  17. // 1. 获取一行
  18. String line = value.toString();
  19. // 2. 切割字段
  20. String[] files = line.split(" ");
  21. // 3.封装对象 取出设备Id
  22. String deviceId = files[1];
  23. long selfDuration = Long.parseLong(files[files.length - 3]);
  24. long thirdPartDuration = Long.parseLong(files[files.length - 2]);
  25. text.set(deviceId);
  26. v.set(selfDuration, thirdPartDuration);
  27. // 4.写出
  28. context.write(text, v);
  29. }
  30. }
  1. 编写Reducer ```java package com.anda.hadoop.writable;

import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**

  • @author anda
  • @since 1.0 */ public class SpeakDurationReducer extends Reducer {

    @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {

    1. long selfDuration = 0;
    2. long thirdPartDuration = 0;
    3. // 1. 遍历所有bean, 将其中的自有 , 第三方时长分别累加
    4. for (SpeakBean value : values) {
    5. selfDuration += value.getSelfDuration();
    6. thirdPartDuration += value.getThirdPartDuration();
    7. }
    8. SpeakBean speakBean = new SpeakBean(selfDuration, thirdPartDuration);
    9. context.write(key, speakBean);

    } }

  1. 4. 编写驱动
  2. ```java
  3. package com.anda.hadoop.writable;
  4. import org.apache.hadoop.conf.Configuration;
  5. import org.apache.hadoop.fs.Path;
  6. import org.apache.hadoop.io.Text;
  7. import org.apache.hadoop.mapreduce.Job;
  8. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  9. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  10. import java.io.IOException;
  11. /**
  12. * @author anda
  13. * @since 1.0
  14. */
  15. public class SpeakerDriver {
  16. public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
  17. args = new String[]{"/Users/anda/Desktop/test.txt.bak", "/Users/anda/Desktop/test.txt.dir"};
  18. // 1.获取配置信息 , 或者job对象实例
  19. Configuration configuration = new Configuration();
  20. Job job = Job.getInstance(configuration);
  21. // 2. 指定本程序的jar包所在的本地路径
  22. job.setJarByClass(SpeakerDriver.class);
  23. // 3. 指定本业务job要使用的mapper/reducer业务类
  24. job.setMapperClass(SpeakDurationMapper.class);
  25. job.setReducerClass(SpeakDurationReducer.class);
  26. // 4. 指定mapper输出数据的kv类型
  27. job.setMapOutputKeyClass(Text.class);
  28. job.setMapOutputValueClass(SpeakBean.class);
  29. // 5.指定最终输出的数据的kv类型
  30. job.setOutputKeyClass(Text.class);
  31. job.setOutputValueClass(SpeakBean.class);
  32. // 6.指定job的输出原始文件所在目录
  33. FileInputFormat.setInputPaths(job, new Path(args[0]));
  34. FileOutputFormat.setOutputPath(job, new Path(args[1]));
  35. // 7.
  36. boolean b = job.waitForCompletion(true);
  37. System.exit(b ? 0 : 1);
  38. }
  39. }

mr 编程技巧总结

  • 结合业务设计map输出的key和value, 尽量用key相同则取往同一个reduce的特点
  • map()方法中获取到只是一行文本数据 , 尽量不做聚合运算
  • reduce()方法的参数要清楚含义

    5.MapReduce原理分析

    1.MapTask运行机制详解

    MapTask流程
    image.png
    详细步骤:
  1. 首先 , 读取数据组件InputFormat(默认 TextInputFormat)会通过gitSplit方法对输入目录中文件进行逻辑切分规划得到splits,有多少个split就对应启用多少个MapTask, split与block的对应关系默认是1对1.
  2. 将输入文件切分为splits之后 , 由RecordReader对象(默认LineRecordReader), 进行读取 , 以\n作为分隔符 , 读取一行数据 , 返回,Key表示每行首字符偏移值 , value表示这一行文本内容.
  3. 读取split返回 进入用户自己继承的Mapper类中 , 执行用户重写的map函数 , RecordReader 读取一行这里调用一次.
  4. map逻辑完之后 , 将map的每条结果通过context.write进行collect数据收集 , 在collect中, 会先对其进行分区处理 , 默认使用HashPartitioner

MapReduce提供Partitioner接口 , 他的作用就是根据key或value以及reduce的数量来决定当前的这对输出数据最终应该交给由那个reduce task处理 , 默认对key hash后再以reduce数量取模, 默认的取模方式只是为了平均reduce的处理能力 , 如果用户对自己的Partitioner有需求 , 可以定制并设置到job上.

  1. 接下来, 会将数据写入内存 , 内存中这片区域叫做环形缓冲区, 缓冲区的作用是批量收集map结果 , 减少磁盘IO的影响, 我们的key/value对以及Partition结果都会被写入缓冲区 , 当然吸入之前 , key与value值都会被序列化字节数组
    1. 环形缓冲区其实是一个数组 , 数组中存放着key, value的序列化数据和key,value的元数据信息, 包括partition , key的起始位置 , value的起始位置 , 以及value的长度 , 环形结构是一个抽象概念.
    2. 缓冲区是有大小限制 , 默认是100MB, 当map task的输出结果很多时, 就可能撑爆内存, 所以需要在一定条件下缓冲区中的数据临时写入磁盘 , 然后重新利用这一块缓冲区 , 这个内存往磁盘写数据的过程中称为Spill , 中文可译为溢写 , 这个溢出是由单独线程来完成, 不影响往缓冲区写map结果的线程. 溢写线程启动时不应该阻止map的结果输出 , 所以整个缓冲区有个溢写的比例spill.percent, 这个比例默认是0.8. 溢写线程启动 , 锁定这80MB的内存 , 执行溢写过程. MapTask的输出结果还可以以往剩下的20MB内存中写, 互不影响.
  2. 当溢写线程启动后 , 需要对这80MB空间内的key做排序(sort),排序是MapReduce 模型默认的行为!
    1. 如果job设置过Combiner, 那么现在就是使用Combiner的时候了 , 将有相同key的key/value对的value加起来, 减少溢写到磁盘的数据量.Combiner会优化MapReduce的中间结果 , 所以它在整个模型中会多次使用
    2. 哪些场景才能使用Combiner呢? 从这里分析,Combiner的输出是Reducer的输出 , Combiner决不能改变最终的计算结果场景, 比如,最大值,Combiner的使用一定得慎重, 如果用好, 它对job执行效率有帮助 , 反之会影响reduce的最终结果
  3. 合并溢写文件: 每次溢写在磁盘上生成一个临时文件(写之前判断是否有combiner), 如果map的输出结果很大, 很多次这样的溢写发生, 磁盘上相应的就会有多个临时文件存在, 当整个数据处理结束之后开始对磁盘中的临时文件进行merge合并, 因为最终的文件只有一个, 写入磁盘 , 并且为这个文件提供了索引文件, 以记录每个reduce对应数据的偏移量

MapTask的一些配置

mapreduce.task.io.sort.factor 10
mapreduce.task.io.sort.mb 100
mapreduce.map.sort.spill.percent 0.80

2.MapTask的并行度

  1. MapTask并行度思考
  2. MapTask并行度决定机制

数据块: Block是HDFS物理上把数据分成一块一块.
切分: 数据切分只是逻辑上对输入进行分片 , 并不会在磁盘上将其切分成片存储

1.切片机制源码阅读

image.png

3.ReduceTask工作机制

image.png
Reduce 大致分为copy , sort , reduce 三个阶段,重点在前两个阶段, copy阶段会包含一个eventFetcher来获取已完成的map列表, 由Fetcher线程去copy数据. 在此过程中会启动两个merge线程, 分别是inMemoryMerger和onDiskMerger , 分别将内存中的数据 merge到磁盘和将磁盘中的数据进行merge, 待数据copy完成之后 , copy阶段就开发完成了 , 开始进行sort阶段,sort阶段主要是执行finalMerge , 纯粹的sort阶段 , 完成之后就是reduce阶段 , 调用用户定义的reduce函数进行处理.
详细步骤:

  • Copy阶段, 简单地拉取数据 , Reduce进程启动一些数据copy线程Fetcher , 通过Http方式请求maptask获取自己的文件 ,
  • merge阶段如map端的merge动作 , 只是数组中存放的是不同的map端copy来的数值 , copy过来的数据会先放入内存缓存区中, 这里的缓冲区大小要比map端的更为灵活, merge有三种形式: 内存到内存 ; 内存到磁盘 , 磁盘到磁盘. 默认情况下第一种形式不启用, 当内存中的数据量到达一定阈值 , 就启动内存到磁盘的merge, 与map端类似 , 这也是溢写的过程 , 这个过程中如果你设置有combiner, 也是会启动的, 然后启动第三种磁盘到磁盘merge方式生成最终的文件.
  • 合并排序,把分散的数据合并成一个大的数据后 , 还会对合并后的数据排序.
  • 对排序后的键值对调用reduce方法 , 键相等的键值对调用一次reduce方法, 每次调用会产生零个或者多个键值对 , 最后把这些输出的键值对写入的HDFS文件中.

    4.Reduce Task并行度

    ReduceTask的并行度同样影响整个job的执行并发度和执行效率 , 但与Maptask的并发数由切片数决定不同, ReduceTask数量的决定是可以直接动手设置:
    1. // 默认值是1 , 手动设置值为4
    2. job.setNumReduceTasks(4)
    注意事项
  1. ReduceTask=0 , 表示没有Reduce阶段 , 输出文件数和MapTask数量保持一致;
  2. ReduceTask数量不设置默认就是一个 , 输出文件数量为1个
  3. 如果数据分布不均匀 , 可能在Reduce阶段产生倾斜;

    5.Suffle机制

    map阶段处理的数据如何传递给reduce阶段 , 是MapReduce框架中最关键的一个流程 , 这个流程就叫shuffle.
    shuffle: 洗牌, 发牌—(核心机制 , 数据分区, 排序 , 分组 , combine , 合并等过程)
    MapReduce Shuffle机制
    MapTask的map方法之后 , ReduceTask的redcue方法之前的数据处理过程称之为Shuffle
    会按照key进行分区, 默认分区规则是key的hashcode值%numreducetasks

    1.MapReduce的分区与reduceTask的数量

    在MapReduce中, 通过我们指定分区 , 会将同一个分区的数据发送到同一个reduce当中进行处理(默认是key相同去往同一个分区), 列如我们为了数据的统计 , 我们可以把一批类似的数据发送到同一个reduce当中去,在同一个reduce当中统计相同类型的数据
    如果保证相同的key的数据去往同一个reduce呢?只需要保证相同key的数据分发到同一个分区即可 , 结合以上原理分析我们知道MR程序shuffle机制默认就是这种规则

    1.分区源码

    MR程序默认使用的HashPartitioner, 保证了相同的key去往同一个分区!!!
    image.png

    2.自定义分区

    实际生产中需求变化多端 , 默认分区规则往往不能满足需求 , 需要结合业务逻辑来灵活控制分区规则以及分区数量!!!
    如何定制自己需要的分区规则
    具体步骤

  4. 自定义类继承Partitioner , 重写getPartition方法

  5. 在Driver驱动中 , 指定使用自定义Partitioner
  6. 在Driver驱动中 , 要根据自定义Partitioner的逻辑设置相应数量的ReduceTask数量

需求 , 按照不同的appkey把记录输出到不同的分区中
原始日志格式

  1. 0001 A a_001 192.168.10.101 11 11 200
  2. 0002 B a_001 192.168.10.102 11 11 200
  3. 0003 C a_001 192.168.10.103 11 11 200
  4. 0004 D a_001 192.168.10.104 11 11 200
  5. 0004 E a_001 192.168.10.105 11 11 200
  6. 0005 F b_002 192.168.10.106 11 11 200
  7. 0006 G b_002 192.168.10.107 11 11 200
  8. 0007 H b_002 192.168.10.108 11 11 200
  9. 0008 I b_002 192.168.10.109 11 11 200
  10. 0009 M b_002 192.168.10.100 11 11 200

输出结果

根据appKey把不同厂商的日志数据分别输出到不同的文件中

需求分析
面对业务需求 , 结合mr的特点 , 来设计map输出的kv, 以及reduce的kv数据
一个ReduceTask对应一个输出的文件 , 因为在shuffle机制 , 中每个reduceTask拉取的都是某个分区的数据, 分区对应一个输出文件 ,
结合appkey的前缀相同的特点 , 同时不能使用默认分区规则 , 而是使用自定义分区器, 只要appkey前缀同则数据进入同一个分区

整体思路:
Mapper

  1. 读取一行文本, 按照制表符切分
  2. 解析出appkey字段, 其余数据封装为PartitionBean对象 , (实现序列化Writable接口)
  3. 设计map()输出的kv, key—->appkey(依靠该字段完成分区),PartitionBean对象作为Value输出

Partition
自定义分区器, 实现按照appkey字段前缀来区分所属分区

Reduce

  1. reduce()正常输出即可 , 无需进行聚合操作

Driver

  1. 在原先设置job属性的同时增加设置使用自定义分区器
  2. 注意设置reducetask的数量(与分区数量保持一致)

PartitionBean

  1. package com.anda.hadoop.shuffie;
  2. import org.apache.hadoop.io.Writable;
  3. import java.io.DataInput;
  4. import java.io.DataOutput;
  5. import java.io.IOException;
  6. /**
  7. * 0001 A a_001 192.168.10.101 11 11 200
  8. *
  9. * @author anda
  10. * @since 1.0
  11. */
  12. public class PartitionBean implements Writable {
  13. private String code;
  14. private String codeName;
  15. private String appKey;
  16. private String addressIp;
  17. private String timeDuration;
  18. private String thirdPartDuration;
  19. private String status;
  20. @Override
  21. public void write(DataOutput dataOutput) throws IOException {
  22. dataOutput.writeUTF(code);
  23. dataOutput.writeUTF(codeName);
  24. dataOutput.writeUTF(appKey);
  25. dataOutput.writeUTF(addressIp);
  26. dataOutput.writeUTF(timeDuration);
  27. dataOutput.writeUTF(thirdPartDuration);
  28. dataOutput.writeUTF(status);
  29. }
  30. @Override
  31. public void readFields(DataInput dataInput) throws IOException {
  32. this.code = dataInput.readUTF();
  33. this.codeName = dataInput.readUTF();
  34. this.appKey = dataInput.readUTF();
  35. this.addressIp = dataInput.readUTF();
  36. this.timeDuration = dataInput.readUTF();
  37. this.thirdPartDuration = dataInput.readUTF();
  38. this.status = dataInput.readUTF();
  39. }
  40. public PartitionBean() {
  41. }
  42. public PartitionBean(String code, String codeName, String appKey, String addressIp, String timeDuration, String thirdPartDuration, String status) {
  43. this.code = code;
  44. this.codeName = codeName;
  45. this.appKey = appKey;
  46. this.addressIp = addressIp;
  47. this.timeDuration = timeDuration;
  48. this.thirdPartDuration = thirdPartDuration;
  49. this.status = status;
  50. }
  51. @Override
  52. public String toString() {
  53. return "PartitionBean{" +
  54. "code='" + code + '\'' +
  55. ", codeName='" + codeName + '\'' +
  56. ", appKey='" + appKey + '\'' +
  57. ", addressIp='" + addressIp + '\'' +
  58. ", timeDuration='" + timeDuration + '\'' +
  59. ", thirdPartDuration='" + thirdPartDuration + '\'' +
  60. ", status='" + status + '\'' +
  61. '}';
  62. }
  63. public String getCode() {
  64. return code;
  65. }
  66. public void setCode(String code) {
  67. this.code = code;
  68. }
  69. public String getCodeName() {
  70. return codeName;
  71. }
  72. public void setCodeName(String codeName) {
  73. this.codeName = codeName;
  74. }
  75. public String getAppKey() {
  76. return appKey;
  77. }
  78. public void setAppKey(String appKey) {
  79. this.appKey = appKey;
  80. }
  81. public String getAddressIp() {
  82. return addressIp;
  83. }
  84. public void setAddressIp(String addressIp) {
  85. this.addressIp = addressIp;
  86. }
  87. public String getTimeDuration() {
  88. return timeDuration;
  89. }
  90. public void setTimeDuration(String timeDuration) {
  91. this.timeDuration = timeDuration;
  92. }
  93. public String getThirdPartDuration() {
  94. return thirdPartDuration;
  95. }
  96. public void setThirdPartDuration(String thirdPartDuration) {
  97. this.thirdPartDuration = thirdPartDuration;
  98. }
  99. public String getStatus() {
  100. return status;
  101. }
  102. public void setStatus(String status) {
  103. this.status = status;
  104. }
  105. }

Mapper

  1. package com.anda.hadoop.shuffie;
  2. import org.apache.hadoop.io.LongWritable;
  3. import org.apache.hadoop.io.Text;
  4. import org.apache.hadoop.mapreduce.Mapper;
  5. import java.io.IOException;
  6. /**
  7. * @author anda
  8. * @since 1.0
  9. */
  10. public class PartitionMapper extends Mapper<LongWritable, Text, Text, PartitionBean> {
  11. final PartitionBean partitionBean = new PartitionBean();
  12. final Text text = new Text();
  13. @Override
  14. protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  15. final String[] fields = value.toString().split(" ");
  16. final String appKey = fields[2];
  17. partitionBean.setCode(fields[0]);
  18. partitionBean.setCodeName(fields[1]);
  19. partitionBean.setAppKey(appKey);
  20. partitionBean.setAddressIp(fields[3]);
  21. partitionBean.setTimeDuration(fields[4]);
  22. partitionBean.setThirdPartDuration(fields[5]);
  23. partitionBean.setStatus(fields[6]);
  24. text.set(appKey);
  25. context.write(text, partitionBean);
  26. }
  27. }

CustomPartitioner

  1. package com.anda.hadoop.shuffie;
  2. import org.apache.hadoop.io.Text;
  3. import org.apache.hadoop.mapreduce.Partitioner;
  4. /**
  5. * @author anda
  6. * @since 1.0
  7. */
  8. public class CustomPartitioner extends Partitioner<Text, PartitionBean> {
  9. @Override
  10. public int getPartition(Text text, PartitionBean partitionBean, int i) {
  11. final String keyApp = text.toString();
  12. if (keyApp.equals("a_001")) {
  13. return 1;
  14. }
  15. if (keyApp.equals("b_002")) {
  16. return 2;
  17. }
  18. return 0;
  19. }
  20. }

PartitionReducer

  1. package com.anda.hadoop.shuffie;
  2. import org.apache.hadoop.io.NullWritable;
  3. import org.apache.hadoop.io.Text;
  4. import org.apache.hadoop.mapreduce.Reducer;
  5. import java.io.IOException;
  6. /**
  7. * @author anda
  8. * @since 1.0
  9. */
  10. public class PartitionReducer extends Reducer<Text, PartitionBean, NullWritable, PartitionBean> {
  11. @Override
  12. protected void reduce(Text key, Iterable<PartitionBean> values, Context context) throws IOException, InterruptedException {
  13. for (PartitionBean value : values) {
  14. context.write(NullWritable.get(), value);
  15. }
  16. }
  17. }

PartitionDriver

  1. package com.anda.hadoop.shuffie;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.Path;
  4. import org.apache.hadoop.io.NullWritable;
  5. import org.apache.hadoop.io.Text;
  6. import org.apache.hadoop.mapreduce.Job;
  7. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  8. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  9. import java.io.IOException;
  10. /**
  11. * @author anda
  12. * @since 1.0
  13. */
  14. public class PartitionDriver {
  15. public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
  16. final Configuration configuration = new Configuration();
  17. final Job job = Job.getInstance(configuration);
  18. job.setJarByClass(PartitionDriver.class);
  19. job.setMapperClass(PartitionMapper.class);
  20. job.setReducerClass(PartitionReducer.class);
  21. job.setMapOutputKeyClass(Text.class);
  22. job.setMapOutputValueClass(PartitionBean.class);
  23. job.setOutputKeyClass(NullWritable.class);
  24. job.setOutputValueClass(PartitionBean.class);
  25. job.setPartitionerClass(CustomPartitioner.class);
  26. job.setNumReduceTasks(3);
  27. FileInputFormat.setInputPaths(job, new Path("/Users/anda/Desktop/text_dir/shuffle.txt"));
  28. //FileOutputFormat
  29. //FileOutputFormat
  30. FileOutputFormat.setOutputPath(job, new Path("/Users/anda/Desktop/text_dir/out_data"));
  31. final boolean flag = job.waitForCompletion(true);
  32. System.exit(flag ? 0 : 1);
  33. }
  34. }

总结:

  1. 自定义分区器最好保证分区数量与ReduceTask数量保持一致;
  2. 如果分区数量不止1 , 但是ReduceTask数量1 , 此时只会输出一个文件.
  3. 如果ReduceTask数量大于分区数量, 但是输出多个空文件
  4. 如果ReduceTask数量小于分区数量 , 有可能会报错

3.MapReduce 中的Combiner

image.png

  1. Combiner是MR程序中的mapper 和Reducer之外的一种组件
  2. Combiner组件的父类就是Reducer
  3. Combiner和Reducer的区别在于运行的位置
  4. Combiner是在每一个maptask所在的节点运行;
  5. Combiner的意义就是对每个maptask的输出进行局部汇总,以减少网络传输量
  6. Combiner能够应用的前提是不能影响最终的业务逻辑 , 此外 , Combiner的输出kv, 应该跟reducer的输入kv类型要对应起来.

举列说明
假设一个计算平均值的MR任务
Map阶段
2个MapTask
MapTask1输出数据: 10 / 5 / 15, 如果使用Combiner:(10+5+15)/3 = 10
MapTask2输出数据: 2 / 6 , 如果使用Combiner( 2 + 6 ) / 2 = 4
Reduce阶段汇总
(10+4)/2=7
而正确结果
(10+5+15+2+6)/5=7.6

  • 自定义Combiner实现步骤
    • 自定义一个Combiner继承Reducer, 重写Reduce方法
    • 在驱动(Driver)设置使用Combiner(默认是不适用Combiner组件)

1.改造WordCount程序

  1. package com.anda.hadoop.mapper;
  2. import org.apache.hadoop.io.IntWritable;
  3. import org.apache.hadoop.io.Text;
  4. import org.apache.hadoop.mapreduce.Reducer;
  5. import java.io.IOException;
  6. /**
  7. * @author anda
  8. * @since 1.0
  9. */
  10. public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
  11. IntWritable total = new IntWritable();
  12. @Override
  13. protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
  14. int sum = 0;
  15. for (IntWritable value : values) {
  16. int i = value.get();
  17. sum += i;
  18. }
  19. total.set(sum);
  20. context.write(key, total);
  21. }
  22. }
  1. <br />在驱动(Driver)设置使用Combiner
  1. job.setCombinerClass(WordCountCombiner.class);

image.pngimage.png

如果直接私用WordCountReducer作为Combiner使用是否可以 直接使用Reducer作为Combiner组件使用时可以的

6.MapReduce中的排序

排序是MapReduce框架中最重要的操作之一.
MapTask和ReduceTask均会堆数据按照key进行排序, 该操作属于Hadoop的默认行为, 任何应用程序中的数据均会被排序 , 不管逻辑上, 是否需要, 默认排序是按照字典顺序排序 , 且实现该排序的方法是快速排序

  • MapTask
    • 它会处理的结果暂时放到环形缓冲区中, 当环形缓冲区使用率达到一定阈值后 , 再对缓冲区中的数据进行一次快速排序 , 并将这些有序数据溢写到磁盘上
    • ReduceTask当所有的数据拷贝完毕后 , ReduceTask统一堆内存上磁盘上的所有数据进行一次归并排序.
      • 部分排序. MapReduce根据输入记录的键对数据集排序, 保证输出的每个文件内部有序
      • 全排序. 最终输出结果只有一个文件, 且文件内部有序 , 实现方式是只设置一个ReduceTask . 但该方法在处理大型文件时效率极低, 因为一台机器处理所有文件 , 完全丧失MapReduce所提供的并行架构
      • 辅助排序:(GroupingComparator分组): 在Reduce端堆key进行分组. 应用于: 接收的key为bean对象时 , 想让一个或几个字段相同(全部字段比较不相同)的key进入到同一个reduce方法时, 采用分组排序.

        1.WritableComparable

        Bean对象如果作为Map输出的key时, 需要实现WritableComparable接口并重写compareTo方法指定排序规则
        全排序
        基于统计的播放时长案例的输出结果对总时长进行排序
        实现全局排序只能设置一个ReduceTask!!!
        播放时长案例输出结果
        1. 00fdaf3 33180 33420 00fdaf3 66600
        2. 00wersa4 30689 35191 00wersa4 65880
        3. 0a0fe2 43085 44254 0a0fe2 87339
        4. 0ad0s7 31702 29183 0ad0s7 60885
        5. 0sfs01 31883 29101 0sfs01 60984
        6. a00df6s 33239 36882 a00df6s 70121
        7. adfd00fd5 30727 31491 adfd00fd5 62218
        需求分析
        如何设计map()方法输出的key,value
        MR框架中shuffle阶段的排序是默认行为, 不管你是否都会进行排序.
        key: 把所有字段封装成一个bean对象 , 并且指定bean对象作为key输出 , 如果作为key输出, 需要实现排序接口, 指定自己的排序规则;
        具体步骤
        Mapper
  1. 读取结果文件, 按照制表符进行切分
  2. 解析出相应字段封装为SpeakBean
  3. SpeakBean实现WritableComparable接口重写compareTo方法
  4. map()方法输出k,v; key—>SpeakBean, value—>NullWritable.get()

Reducer

  1. 循环遍历输出

Mapper代码

  1. package com.anda.hadoop.sort;
  2. import org.apache.hadoop.io.LongWritable;
  3. import org.apache.hadoop.io.NullWritable;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.mapreduce.Mapper;
  6. import java.io.IOException;
  7. /**
  8. * @author anda
  9. * @since 1.0
  10. */
  11. public class SortMapper extends Mapper<LongWritable, Text, SpeakBeanSort, NullWritable> {
  12. Text k = new Text();
  13. @Override
  14. protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  15. String[] fields = value.toString().split(" ");
  16. SpeakBeanSort speakBeanSort = new SpeakBeanSort(
  17. Long.parseLong(fields[1]),
  18. Long.parseLong(fields[2]),
  19. fields[0]
  20. );
  21. context.write(speakBeanSort, NullWritable.get());
  22. }
  23. }

Reducer 代码

  1. package com.anda.hadoop.sort;
  2. import org.apache.hadoop.io.NullWritable;
  3. import org.apache.hadoop.mapreduce.Reducer;
  4. import java.io.IOException;
  5. /**
  6. * @author anda
  7. * @since 1.0
  8. */
  9. public class SortReducer extends Reducer<SpeakBeanSort, NullWritable, NullWritable, SpeakBeanSort> {
  10. @Override
  11. protected void reduce(SpeakBeanSort key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
  12. for (NullWritable value : values) {
  13. context.write(value, key);
  14. }
  15. }
  16. }
  1. ```java
  2. package com.anda.hadoop.sort;
  3. import org.apache.hadoop.io.WritableComparable;
  4. import java.io.DataInput;
  5. import java.io.DataOutput;
  6. import java.io.IOException;
  7. /**
  8. * @author anda
  9. * @since 1.0
  10. */
  11. public class SpeakBeanSort implements WritableComparable<SpeakBeanSort> {
  12. // 定义属性
  13. private Long selfDuration; // 自有内容时长
  14. private Long thirdPartDuration; // 第三方内容时长
  15. private String deviceId; // 设备Id
  16. private Long sumDuration; // 总时长
  17. // 准备一个空参构造
  18. public SpeakBeanSort() {
  19. }
  20. public SpeakBeanSort(Long selfDuration, Long thirdPartDuration, String deviceId) {
  21. this.selfDuration = selfDuration;
  22. this.thirdPartDuration = thirdPartDuration;
  23. this.deviceId = deviceId;
  24. this.sumDuration = this.selfDuration + this.thirdPartDuration;
  25. }
  26. @Override
  27. public void write(DataOutput dataOutput) throws IOException {
  28. dataOutput.writeLong(selfDuration);
  29. dataOutput.writeLong(thirdPartDuration);
  30. dataOutput.writeUTF(deviceId);
  31. dataOutput.writeLong(sumDuration);
  32. }
  33. @Override
  34. public void readFields(DataInput dataInput) throws IOException {
  35. this.selfDuration = dataInput.readLong();
  36. this.thirdPartDuration = dataInput.readLong();
  37. this.deviceId = dataInput.readUTF();
  38. this.sumDuration = dataInput.readLong();
  39. }
  40. @Override
  41. public String toString() {
  42. return "SpeakBeanSort{" +
  43. "selfDuration=" + selfDuration +
  44. ", thirdPartDuration=" + thirdPartDuration +
  45. ", deviceId='" + deviceId + '\'' +
  46. ", sumDuration=" + sumDuration +
  47. '}';
  48. }
  49. public Long getSelfDuration() {
  50. return selfDuration;
  51. }
  52. public void setSelfDuration(Long selfDuration) {
  53. this.selfDuration = selfDuration;
  54. }
  55. public Long getThirdPartDuration() {
  56. return thirdPartDuration;
  57. }
  58. public void setThirdPartDuration(Long thirdPartDuration) {
  59. this.thirdPartDuration = thirdPartDuration;
  60. }
  61. public String getDeviceId() {
  62. return deviceId;
  63. }
  64. public void setDeviceId(String deviceId) {
  65. this.deviceId = deviceId;
  66. }
  67. public Long getSumDuration() {
  68. return sumDuration;
  69. }
  70. public void setSumDuration(Long sumDuration) {
  71. this.sumDuration = sumDuration;
  72. }
  73. @Override
  74. public int compareTo(SpeakBeanSort o) {
  75. if (this.sumDuration > o.getSumDuration()) {
  76. return 1;
  77. } else if (this.sumDuration < o.getSumDuration()) {
  78. return -1;
  79. }
  80. return 0;
  81. }
  82. }

Driver代码

  1. package com.anda.hadoop.sort;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.Path;
  4. import org.apache.hadoop.io.NullWritable;
  5. import org.apache.hadoop.mapreduce.Job;
  6. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  7. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  8. /**
  9. * @author anda
  10. * @since 1.0
  11. */
  12. public class SortDriver {
  13. public static void main(String[] args) throws Exception {
  14. Configuration configuration = new Configuration();
  15. Job job = Job.getInstance(configuration);
  16. // 指定本程序的jar包所在的本地路径
  17. job.setJarByClass(SortDriver.class);
  18. // 指定本业务job要使用的mapper/reducer业务类
  19. job.setMapperClass(SortMapper.class);
  20. job.setReducerClass(SortReducer.class);
  21. // 指定mapper输出的数据kv类型
  22. job.setMapOutputKeyClass(SpeakBeanSort.class);
  23. job.setMapOutputValueClass(NullWritable.class);
  24. // 指定最终输出类型
  25. job.setOutputKeyClass(NullWritable.class);
  26. job.setOutputValueClass(SpeakBeanSort.class);
  27. job.setNumReduceTasks(1);
  28. // 指定job的输出原始文件位置所在目录
  29. FileInputFormat.setInputPaths(job, new Path("/Users/anda/Desktop/text_dir/sort.txt"));
  30. FileOutputFormat.setOutputPath(job, new Path("/Users/anda/Desktop/text_dir/sort_data"));
  31. boolean result = job.waitForCompletion(true);
  32. System.exit(result ? 0 : 1);
  33. }
  34. }

2.GroupingComparator

GroupingComparator是mapreduce当中reduce端的一个功能组件 , 主要的作用是决定哪些数据作为一组, 调用一次reduce的逻辑 , 默认是每个不同的key, 作为多个不同的组 , 每个组调用一次reduce逻辑 , 我们可以自定义GroupingComparator实现不同的key作为同一个组 , 调用一次reduce逻辑.

  1. 需求

原始需求

  1. Order_0000001 Pdt_01 222.8
  2. Order_0000001 Pdt_05 25.8
  3. Order_0000002 Pdt_03 522.8
  4. Order_0000002 Pdt_04 122.4
  5. Order_0000002 Pdt_05 722.4
  6. Order_0000003 Pdt_01 232.8

需要求出每一个订单中成交金额最大的一笔交易

  1. 实现思路

Mapper

  • 读取一行文本数据 , 切分出每个字段;
  • 订单Id和金额封装成一个Bean对象, Bean对象的排序规则指定为先按照订单Id排序 , 订单Id相等再按照金额降序排;
  • map()方法输出kv,key—>bean对象 , value->NullWritable.get();

Shuffle

  • 指定分区器 , 保证相同订单Id的数据去往同个分区(自定义分区器)
    • 指定GroupingComparator, 分组规则指定只要订单Id相同则认为属于同一组;

Reduce

  • 每个reduce()方法写出一组key的第一个

参考代码

  • OrderBean

OrderBean定义两个字段 , 一个字段是orderId,第二个字段是金额(注意金额一定要使用Double或者DoubleWritable类型 , 否则没法按照金额顺序排序)
排序规则指定先按照订单Id排序 , 订单Id相等再按照金额降序排序!!

  1. package com.anda.hadoop.group;
  2. import org.apache.hadoop.io.WritableComparable;
  3. import java.io.DataInput;
  4. import java.io.DataOutput;
  5. import java.io.IOException;
  6. /**
  7. * @author anda
  8. * @since 1.0
  9. */
  10. public class OrderBean implements WritableComparable<OrderBean> {
  11. private String orderId;
  12. private Double price;
  13. public OrderBean(){}
  14. @Override
  15. public int compareTo(OrderBean o) {
  16. int i = this.orderId.compareTo(o.orderId);
  17. if (i == 0) {
  18. return -this.price.compareTo(o.price);
  19. }
  20. return i;
  21. }
  22. @Override
  23. public void write(DataOutput dataOutput) throws IOException {
  24. dataOutput.writeUTF(this.orderId);
  25. dataOutput.writeDouble(this.price);
  26. }
  27. @Override
  28. public void readFields(DataInput dataInput) throws IOException {
  29. this.orderId = dataInput.readUTF();
  30. this.price = dataInput.readDouble();
  31. }
  32. public String getOrderId() {
  33. return orderId;
  34. }
  35. public void setOrderId(String orderId) {
  36. this.orderId = orderId;
  37. }
  38. public Double getPrice() {
  39. return price;
  40. }
  41. public void setPrice(Double price) {
  42. this.price = price;
  43. }
  44. @Override
  45. public String toString() {
  46. return "OrderBean{" +
  47. "orderId='" + orderId + '\'' +
  48. ", price=" + price +
  49. '}';
  50. }
  51. }
  • 自定义分区器

保证ID相同的订单去往同个分区最终去往同一个Reduce中

  1. package com.anda.hadoop.group;
  2. import org.apache.hadoop.io.NullWritable;
  3. import org.apache.hadoop.mapreduce.Partitioner;
  4. /**
  5. * @author anda
  6. * @since 1.0
  7. */
  8. public class CustomPartitioner extends Partitioner<OrderBean, NullWritable> {
  9. @Override
  10. public int getPartition(OrderBean orderBean, NullWritable nullWritable, int i) {
  11. // 自定义分区 , 将相同订单Id的数据发送到同一个reduce里面去
  12. return (orderBean.getOrderId().hashCode() & Integer.MAX_VALUE) % i;
  13. }
  14. }
  • 自定义GroupingComparator

保证id相同的订单进入一个分组中, 进入分组的数据已经按照金额降序排序 , reduce()方法取出第一个即是金额最高的交易

  1. package com.anda.hadoop.group;
  2. import org.apache.hadoop.io.WritableComparable;
  3. import org.apache.hadoop.io.WritableComparator;
  4. /**
  5. * @author anda
  6. * @since 1.0
  7. */
  8. public class CustomGroupingComparator extends WritableComparator {
  9. public CustomGroupingComparator() {
  10. super(OrderBean.class, true);
  11. }
  12. @Override
  13. public int compare(WritableComparable a, WritableComparable b) {
  14. OrderBean first = (OrderBean) a;
  15. OrderBean second = (OrderBean) b;
  16. final int i = first.getOrderId().compareTo(second.getOrderId());
  17. if (i == 0) {
  18. System.out.println(first.getOrderId() + "-----" + second.getOrderId());
  19. }
  20. return i;
  21. }
  22. }
  • Mapper ```java package com.anda.hadoop.group;

import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**

  • @author anda
  • @since 1.0 */ public class GroupMapper extends Mapper {

    @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

    1. // 读取一行文本 , 然后切分
    2. final String[] fields = value.toString().split(" ");
    3. OrderBean orderBean = new OrderBean();
    4. orderBean.setOrderId(fields[0]);
    5. orderBean.setPrice(Double.parseDouble(fields[2]));
    6. context.write(orderBean, NullWritable.get());

    } }

  1. - Reducer
  2. ```java
  3. package com.anda.hadoop.group;
  4. import org.apache.hadoop.io.NullWritable;
  5. import org.apache.hadoop.mapreduce.Reducer;
  6. import java.io.IOException;
  7. /**
  8. * @author anda
  9. * @since 1.0
  10. */
  11. public class GroupReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {
  12. @Override
  13. protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
  14. context.write(key, NullWritable.get());
  15. }
  16. }
  • Driver ```java package com.anda.hadoop.group;

import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**

  • @author anda
  • @since 1.0 */ public class GroupDriver {

    public static void main(String[] args) throws Exception {

    1. Configuration configuration = new Configuration();
    2. Job job = Job.getInstance(configuration);
    3. job.setJarByClass(GroupDriver.class);
    4. job.setMapperClass(GroupMapper.class);
    5. job.setReducerClass(GroupReducer.class);
    6. job.setOutputKeyClass(OrderBean.class);
    7. job.setOutputValueClass(NullWritable.class);
    8. job.setMapOutputKeyClass(OrderBean.class);
    9. job.setMapOutputValueClass(NullWritable.class);
    10. FileInputFormat.setInputPaths(job, new Path("/Users/anda/Desktop/text_dir/group.txt"));
    11. FileOutputFormat.setOutputPath(job, new Path("/Users/anda/Desktop/text_dir/group_data"));
    12. job.setPartitionerClass(CustomPartitioner.class);
    13. //job.setGroupingComparatorClass(CustomGroupingComparator.class);
    14. job.setNumReduceTasks(3);
    15. boolean result = job.waitForCompletion(true);
    16. System.out.println(result ? 0 : 1);
    17. boolean b = job.waitForCompletion(true);
    18. System.out.println(b ? 0 : 1);

    } }

  1. <a name="dyr6l"></a>
  2. ## 7. MapReduce Join实战
  3. <a name="Rxb1Q"></a>
  4. ## 1.MR Reduce 端join
  5. <a name="rWeov"></a>
  6. ### 1.需求分析
  7. - 需求
  8. 投递行为数据表deliver_info:
  9. ```latex
  10. 1001 177725422 2020-01-03
  11. 1002 177725422 2020-01-04
  12. 1003 177725433 2020-01-03

职位表position:

177725422 产品经理 177725433 大数据开发工程师

假如数据量巨大 , 两表的数据是以文件的形式存储在HDFS中,需要用mapreduce程序来实现一下SQL查询运算

2.代码实现

通过将关联的条件作为map输出的key, 将两表满足join条件的数据并携带数据来源的文件信息 , 发往同一个reduce task , 在reduce中进行数据的串联.
Driver

  1. package com.anda.hadoop.join_reduce;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.Path;
  4. import org.apache.hadoop.io.NullWritable;
  5. import org.apache.hadoop.io.Text;
  6. import org.apache.hadoop.mapreduce.Job;
  7. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  8. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  9. /**
  10. * @author anda
  11. * @since 1.0
  12. */
  13. public class ReduceJoinDriver {
  14. public static void main(String[] args) throws Exception {
  15. // 1.获取配置文件对象, 获取job对象实例
  16. final Configuration configuration = new Configuration();
  17. final Job job = Job.getInstance(configuration);
  18. // 2.指定程序jar的本地路径
  19. job.setJarByClass(ReduceJoinDriver.class);
  20. // 3.指定mapper输出的kv数据类型
  21. job.setMapperClass(ReduceJoinMapper.class);
  22. job.setReducerClass(ReduceJoinReducer.class);
  23. // 4.指定Mapper输出的kv数据类型
  24. job.setMapOutputKeyClass(Text.class);
  25. job.setMapOutputValueClass(DeliverBean.class);
  26. // 5.指定最终输出的kv数据类型
  27. job.setOutputKeyClass(DeliverBean.class);
  28. job.setOutputValueClass(NullWritable.class);
  29. // 6.指定job输入结果路径
  30. FileInputFormat.setInputPaths(job, new Path("/Users/anda/Desktop/text_dir/join_mapper_reduce"));
  31. // 7.指定job输出结果路径
  32. FileOutputFormat.setOutputPath(job, new Path("/Users/anda/Desktop/text_dir/join_mapper_reduce/out_join"));
  33. // 8.提交作业
  34. boolean flag = job.waitForCompletion(true);
  35. System.exit(flag ? 0 : 1);
  36. }
  37. }

Mapper

  1. package com.anda.hadoop.join_reduce;
  2. import org.apache.hadoop.io.LongWritable;
  3. import org.apache.hadoop.io.Text;
  4. import org.apache.hadoop.mapreduce.Mapper;
  5. import org.apache.hadoop.mapreduce.lib.input.FileSplit;
  6. import java.io.IOException;
  7. /**
  8. * @author anda
  9. * @since 1.0
  10. */
  11. public class ReduceJoinMapper extends Mapper<LongWritable, Text, Text, DeliverBean> {
  12. String fileName;
  13. DeliverBean deliverBean = new DeliverBean();
  14. Text text = new Text();
  15. @Override
  16. protected void setup(Context context) throws IOException, InterruptedException {
  17. // 1.获取输入文件切片
  18. FileSplit fileSplit = (FileSplit) context.getInputSplit();
  19. // 2.获取输入文件名称
  20. fileName = fileSplit.getPath().getName();
  21. }
  22. @Override
  23. protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  24. // 1.获取输入数据
  25. String line = value.toString();
  26. // 2.不同文件分别处理
  27. if (fileName.startsWith("deliver_info")) {
  28. // 2.1切割
  29. String[] fields = line.split(" ");
  30. // 2.2封装了bean对象
  31. deliverBean.setUserId(fields[0]);
  32. deliverBean.setPositionId(fields[1]);
  33. deliverBean.setDate(fields[2]);
  34. deliverBean.setPositionName("");
  35. deliverBean.setFlag("deliver");
  36. text.set(fields[1]);
  37. } else {
  38. // 2.3.切割
  39. String[] fields = line.split(" ");
  40. // 2.4封装bean对象
  41. deliverBean.setPositionId(fields[0]);
  42. deliverBean.setPositionName(fields[1]);
  43. deliverBean.setUserId("");
  44. deliverBean.setDate("");
  45. deliverBean.setFlag("position");
  46. text.set(fields[0]);
  47. }
  48. context.write(text, deliverBean);
  49. }
  50. }

Reducer

  1. package com.anda.hadoop.join_reduce;
  2. import org.apache.commons.beanutils.BeanUtils;
  3. import org.apache.hadoop.io.NullWritable;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.mapreduce.Reducer;
  6. import java.io.IOException;
  7. import java.util.ArrayList;
  8. /**
  9. * @author anda
  10. * @since 1.0
  11. */
  12. public class ReduceJoinReducer extends Reducer<Text, DeliverBean, DeliverBean, NullWritable> {
  13. @Override
  14. protected void reduce(Text key, Iterable<DeliverBean> values, Context context) throws IOException, InterruptedException {
  15. // 1.准备投递行为数据的集合
  16. ArrayList<DeliverBean> doBeans = new ArrayList<>();
  17. values.forEach(System.out::println);
  18. // 2.准备bean对象
  19. DeliverBean pBean = new DeliverBean();
  20. for (DeliverBean bean : values) {
  21. if ("deliver".equals(bean.getFlag())) {
  22. DeliverBean dBean = new DeliverBean();
  23. try {
  24. BeanUtils.copyProperties(dBean, bean);
  25. } catch (Exception e) {
  26. e.printStackTrace();
  27. }
  28. doBeans.add(dBean);
  29. } else {
  30. try {
  31. BeanUtils.copyProperties(pBean, bean);
  32. } catch (Exception e) {
  33. e.printStackTrace();
  34. }
  35. }
  36. }
  37. for (DeliverBean bean : doBeans) {
  38. bean.setPositionName(pBean.getPositionName());
  39. context.write(bean, NullWritable.get());
  40. }
  41. }
  42. }

Bean

  1. package com.anda.hadoop.join_reduce;
  2. import com.google.common.base.Objects;
  3. import org.apache.hadoop.io.Writable;
  4. import java.io.DataInput;
  5. import java.io.DataOutput;
  6. import java.io.IOException;
  7. /**
  8. * @author anda
  9. * @since 1.0
  10. */
  11. public class DeliverBean implements Writable {
  12. private String userId;
  13. private String positionId;
  14. private String date;
  15. private String positionName;
  16. private String flag;
  17. public DeliverBean() {
  18. }
  19. public DeliverBean(String userId, String positionId, String date, String positionName, String flag) {
  20. this.userId = userId;
  21. this.positionId = positionId;
  22. this.date = date;
  23. this.positionName = positionName;
  24. this.flag = flag;
  25. }
  26. @Override
  27. public void write(DataOutput dataOutput) throws IOException {
  28. dataOutput.writeUTF(this.userId);
  29. dataOutput.writeUTF(this.positionId);
  30. dataOutput.writeUTF(this.date);
  31. dataOutput.writeUTF(this.positionName);
  32. dataOutput.writeUTF(this.flag);
  33. }
  34. @Override
  35. public void readFields(DataInput dataInput) throws IOException {
  36. this.userId = dataInput.readUTF();
  37. this.positionId = dataInput.readUTF();
  38. this.date = dataInput.readUTF();
  39. this.positionName = dataInput.readUTF();
  40. this.flag = dataInput.readUTF();
  41. }
  42. @Override
  43. public String toString() {
  44. return Objects.toStringHelper(this)
  45. .add("userId", userId)
  46. .add("positionId", positionId)
  47. .add("date", date)
  48. .add("positionName", positionName)
  49. .add("flag", flag)
  50. .toString();
  51. }
  52. public String getUserId() {
  53. return userId;
  54. }
  55. public void setUserId(String userId) {
  56. this.userId = userId;
  57. }
  58. public String getPositionId() {
  59. return positionId;
  60. }
  61. public void setPositionId(String positionId) {
  62. this.positionId = positionId;
  63. }
  64. public String getDate() {
  65. return date;
  66. }
  67. public void setDate(String date) {
  68. this.date = date;
  69. }
  70. public String getPositionName() {
  71. return positionName;
  72. }
  73. public void setPositionName(String positionName) {
  74. this.positionName = positionName;
  75. }
  76. public String getFlag() {
  77. return flag;
  78. }
  79. public void setFlag(String flag) {
  80. this.flag = flag;
  81. }
  82. }

2.MR Map端join

1.需求分析

使用与关联表有小表的情形;
可以将小表分发到所有的map节点, 这样map节点可以在本地对自己所读到的达表数据进行join并输出最终结果, 可以大大提高join操作的并发度 , 加快处理速度

2.代码实现

  • 在Mapper的setup阶段, 将文件读取到缓存集合中
  • 在驱动函数中加载缓存

// 缓存普通文件到Task运行节点
job.addCacheFile(new URI(“file:///Users/…”))

Dirver

  1. package com.anda.hadoop.join_map;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.Path;
  4. import org.apache.hadoop.io.NullWritable;
  5. import org.apache.hadoop.io.Text;
  6. import org.apache.hadoop.mapreduce.Job;
  7. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  8. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  9. import java.net.URI;
  10. /**
  11. * @author anda
  12. * @since 1.0
  13. */
  14. public class MapDeliverBeanJoinDriver {
  15. public static void main(String[] args) throws Exception {
  16. Configuration configuration = new Configuration();
  17. Job job = Job.getInstance(configuration);
  18. job.setJarByClass(MapDeliverBeanJoinDriver.class);
  19. job.setMapperClass(MapDeliverBeanJoinMap.class);
  20. job.setOutputKeyClass(Text.class);
  21. job.setOutputValueClass(NullWritable.class);
  22. FileInputFormat.setInputPaths(job, new Path("/Users/anda/Desktop/text_dir/join_mapper_reduce/deliver_info.txt"));
  23. FileOutputFormat.setOutputPath(job, new Path("/Users/anda/Desktop/text_dir/join_mapper_reduce/map_join_out"));
  24. job.addCacheFile(new URI("file:///Users/anda/Desktop/text_dir/join_mapper_reduce/position_info.txt"));
  25. job.setNumReduceTasks(0);
  26. boolean flag = job.waitForCompletion(true);
  27. System.exit(flag ? 0 : 1);
  28. }
  29. }

Bean

  1. package com.anda.hadoop.join_map;
  2. import com.google.common.base.Objects;
  3. import org.apache.hadoop.io.Writable;
  4. import java.io.DataInput;
  5. import java.io.DataOutput;
  6. import java.io.IOException;
  7. /**
  8. * @author anda
  9. * @since 1.0
  10. */
  11. public class MapDeliverBean implements Writable {
  12. private String userId;
  13. private String positionId;
  14. private String date;
  15. private String positionName;
  16. private String flag;
  17. public MapDeliverBean() {
  18. }
  19. public MapDeliverBean(String userId, String positionId, String date, String positionName, String flag) {
  20. this.userId = userId;
  21. this.positionId = positionId;
  22. this.date = date;
  23. this.positionName = positionName;
  24. this.flag = flag;
  25. }
  26. @Override
  27. public void write(DataOutput dataOutput) throws IOException {
  28. dataOutput.writeUTF(this.userId);
  29. dataOutput.writeUTF(this.positionId);
  30. dataOutput.writeUTF(this.date);
  31. dataOutput.writeUTF(this.positionName);
  32. dataOutput.writeUTF(this.flag);
  33. }
  34. @Override
  35. public void readFields(DataInput dataInput) throws IOException {
  36. this.userId = dataInput.readUTF();
  37. this.positionId = dataInput.readUTF();
  38. this.date = dataInput.readUTF();
  39. this.positionName = dataInput.readUTF();
  40. this.flag = dataInput.readUTF();
  41. }
  42. @Override
  43. public String toString() {
  44. return Objects.toStringHelper(this)
  45. .add("userId", userId)
  46. .add("positionId", positionId)
  47. .add("date", date)
  48. .add("positionName", positionName)
  49. .add("flag", flag)
  50. .toString();
  51. }
  52. }

Mapper

  1. package com.anda.hadoop.join_map;
  2. import org.apache.commons.lang.StringUtils;
  3. import org.apache.hadoop.io.LongWritable;
  4. import org.apache.hadoop.io.NullWritable;
  5. import org.apache.hadoop.io.Text;
  6. import org.apache.hadoop.mapreduce.Mapper;
  7. import java.io.BufferedReader;
  8. import java.io.FileInputStream;
  9. import java.io.IOException;
  10. import java.io.InputStreamReader;
  11. import java.nio.charset.StandardCharsets;
  12. import java.util.HashMap;
  13. import java.util.Map;
  14. /**
  15. * @author anda
  16. * @since 1.0
  17. */
  18. public class MapDeliverBeanJoinMap extends Mapper<LongWritable, Text, Text, NullWritable> {
  19. Map<String, String> pMap = new HashMap();
  20. Text text = new Text();
  21. @Override
  22. protected void setup(Context context) throws IOException, InterruptedException {
  23. // 1. 获取缓存文件
  24. BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream("position_info.txt"), StandardCharsets.UTF_8));
  25. String line;
  26. while (StringUtils.isNotBlank(line = reader.readLine())) {
  27. // 2.切割
  28. String[] fields = line.split(" ");
  29. // 3.缓存数据
  30. pMap.put(fields[0], fields[1]);
  31. }
  32. // 4 关流
  33. reader.close();
  34. }
  35. @Override
  36. protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  37. // 1.获取一行
  38. String line = value.toString();
  39. // 2.截取
  40. String[] fields = line.split(" ");
  41. // 3.获取职位id
  42. String pId = fields[1];
  43. // 4.获取职位名称
  44. String pName = pMap.get(pId);
  45. // 4.拼接
  46. text.set(line + " " + pName);
  47. // 5.写出
  48. context.write(text, NullWritable.get());
  49. }
  50. }

3.数据倾斜解决方案

什么是数据倾斜

  • 数据倾斜无非就是大量的key被partition分配到同一个分区里,
  • 绝大多数task执行都非常快 , 但个别task执行慢, 甚至慢

通用解决方案:
对key增加随机数

1.InputFormat

运行MapReduce程序时 , 输入的文件格式包括: 基于行的日志文件, 二进制格式文件 , 数据库表等, 那么正对于不同的数据类型 , MapReduce是如何读取这些数据的呢?
InputFormat是MapReduce框架用来读取数据的类.
InputFormat常见子类包括:

  • TextInputFormat(普通文本文件, MR框架默认的读取实现类型)
  • KeyValueTextInputFormat(读取一行文本数据按照指定分隔符 , 把数据封装为kv类型)
  • NLineInputFormat(读取数据按照行数进行划分分片)
  • CombineTextInputFormat(合并小文件,避免启动过多MapTask任务)
  • 自定义InputFormat
  1. CombineTextInputFormat案例

MR框架默认的TextInputFormat切片机制按文件划分片 , 文件无论多小 , 都是一个单独的切片, 然后由一个MapTask处理 , 如果有大量小文件 , 就对应的会生成并启动大量的MapTask, 而每个MapTask处理的数据量很小大量时间浪费在初始化资源启动回收等阶段, 这种方式导致资源利用率不高.
CombineTextInputFormat用于小文件过多的场景, 他可以将多个小文件从逻辑上划分成一个切面, 这样多个小文件就可以交给MapTask处理, 提高资源利用率.
需求
将输入数据中的多个小文件合并为一个切片处理

  • 运行WordCount案例 , 准备多个小文件

具体使用方式

  1. // 设置mapreduce读取数据的类型 , 如果不设置InputFormat 它默认用的是TextInputFormat.class
  2. job.setInputFormatClass(CombineTextInputFormat.class);
  3. // 设置虚拟存储切片的最大值设置4M
  4. CombineTextInputFormat.setMaxInputSplitSize(job, 1024 * 6 * 1024);

验证切片数量的变化

  • CombineTextInputFormat切片原理

切片生成过程分为两部分, 虚拟存储过程和切片过程
假设设置setMaxInputSplitSize的值为6M
17个小文件
image.png

  • 虚拟存储过程: 把输入目录下所有的文件大小 , 依次和设置的setMaxInputSplitSize值进行比较 , 如果不大于设置的最大值 , 逻辑上划分一个. 如果输入文件大于设置的最大值且大于两倍,此时文件将均分为2个虚拟存储块(防止出现太小切片)

比如setMaxInputSplitSize值为4M , 输入文件大小为8.02M,则逻辑上分出一个4M的块, 剩余的大小为4.02M,如果按照4M逻辑划分, 就会出现0.02M的非常小的虚拟存储文件,所以将剩余的4.02M切割成(2.01M和2.01M)两个文件.

  • 切片的过程中
    • 判断虚拟存储的文件大小是否大于setMaxInputSplitSize值 , 大于等于则单独形成一个切片
    • 如果不大于则跟下一个虚拟存储文件进行合并 , 共同形成一个切片
    • 按照输入的文件, 3.1M 995KB 这十七个小文件

最终会形成9个切片
前面十六个都会是(3.1M+3.1M) 共有8分 , 最后一个995KB单独是一份
image.png

注意: 虚拟存储切片最大值设置最好根据实际的小文件大小情况来设置具体的值

1.自定义InputFormat

2.OutputFormat

1.自定义OutputFormat