- MapReduce概述
- Hadoop序列化
- MapReduce框架原理
MapReduce概述
MapReduce定义
- MapReduce是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架。
MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。
MapReduce优缺点
优点
MapReduce易于编程
它简单的实现一些接口,就可以完成一个分布式程序,这个分布式程序可以分布到大量廉价的PC机器上运行。也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一样的。就是因为这个特点使得MapReduce编程变得非常流行。
良好的扩展性
当你的计算资源不能得到满足的时候,你可以通过简单的增加机器来扩展它的计算能力。
高容错性
MapReduce设计的初衷就是使程序能够部署在廉价的PC机器上,这就要求它具有很高的容错性。比如其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由Hadoop内部完成的。
适合PB级以上海量数据的离线处理
-
缺点
不擅长实时计算
MapReduce无法像MySQL一样,在毫秒或者秒级内返回结果。
不擅长流式计算(Sparkstream、Flink)
流式计算的输入数据是动态的,而MapReduce的输入数据集是静态的,不能动态变化。这是因为MapReduce自身的设计特点决定了数据源必须是静态的。
不擅长DAG(有向无环图)计算(Spark)
多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce并不是不能做,而是使用后,每个MapReduce作业的输出结果都会写入到磁盘,会造成大量的磁盘IO,导致性能非常的低下。
MapReduce核心思想

分布式的运算程序往往需要分成至少2个阶段。
- 第一个阶段的MapTask并发实例,完全并行运行,互不相干。
- 第二个阶段的ReduceTask并发实例互不相干,但是他们的数据依赖于上一个阶段的所有MapTask并发实例的输出。
- MapReduce编程模型只能包含一个Map阶段和一个Reduce阶段,如果用户的业务逻辑非常复杂,那就只能多个MapReduce程序,串行运行。
总结:分析WordCount数据流走向深入理解MapReduce核心思想。
MapReduce进程
一个完整的MapReduce程序在分布式运行时有三类实例进程:
采用反编译工具反编译源码,发现WordCount案例有Map类、Reduce类和驱动类。且数据的类型是Hadoop自身封装的序列化类型。
- 源码位于
/opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar - 反编译后查看到
WordCount文件 ```java package org.apache.hadoop.examples;
import java.io.IOException; import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
public static class TokenizerMapper extends Mapper
private final static IntWritable one = new IntWritable(1);private Text word = new Text();//mappublic void map(Object key, Text value, Context context) throws IOException, InterruptedException {StringTokenizer itr = new StringTokenizer(value.toString());while (itr.hasMoreTokens()) {word.set(itr.nextToken());context.write(word, one);}}
}
//reduce
public static class IntSumReducer
extends Reducer
public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum += val.get();}result.set(sum);context.write(key, result);}
}
//demo
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println(“Usage: wordcount
<a name="rEZ6x"></a>## 常用数据序列化类型| **Java类型** | **Hadoop Writable类型** || --- | --- || Boolean | BooleanWritable || Byte | ByteWritable || Int | IntWritable || Float | FloatWritable || Long | LongWritable || Double | DoubleWritable || String | Text || Map | MapWritable || Array | ArrayWritable || Null | NullWritable |<a name="pJgIn"></a>## MapReduce编程规范> 用户编写的程序分成三个部分:Mapper、Reducer和Driver<a name="ctd9q"></a>### Mapper- 用户自定义的Mapper要继承自己的父类- Mapper的输入数据是KV对的形式(KV的类型可自定义)- Mapper中的业务逻辑写在map()方法中- Mapper的输出数据是KV对的形式(KV的类型可自定义)- map()方法(MapTask进程)对每一个<K,V>调用一次- 相同的key,会整合在一起 `<key, <value1, value2>>`的格式<a name="fPbME"></a>### Reducer阶段- 用户自定义的Mapper要继承自己的父类- Reducer的输入数据类型对应Mapper的输出数据类型,也是KV- Reducer的业务逻辑写在reduce()方法中- ReduceTask进程对每一组相同K的<K,V>组调用一次reduce()方法<a name="z809K"></a>### Driver阶段- 相当于YARN集群的客户端,用于提交我们整个程序到YARN集群,提交的是封装了MapReduce程序相关运行参数的job对象。<a name="VJQjd"></a>## WordCount案例实操<a name="DKaC1"></a>### 本地测试<a name="vA3uy"></a>#### 需求:在给定的文本文件中统计输出每一个单词出现的总次数- 输入数据- [hello.txt](https://www.yuque.com/attachments/yuque/0/2022/txt/25955514/1649400306402-ca570c2d-c48b-42da-874c-58ac609f346b.txt?_lake_card=%7B%22src%22%3A%22https%3A%2F%2Fwww.yuque.com%2Fattachments%2Fyuque%2F0%2F2022%2Ftxt%2F25955514%2F1649400306402-ca570c2d-c48b-42da-874c-58ac609f346b.txt%22%2C%22name%22%3A%22hello.txt%22%2C%22size%22%3A60%2C%22type%22%3A%22text%2Fplain%22%2C%22ext%22%3A%22txt%22%2C%22status%22%3A%22done%22%2C%22taskId%22%3A%22u0f57ebdd-bf5a-44d0-b677-976e713866f%22%2C%22taskType%22%3A%22upload%22%2C%22id%22%3A%22ub245edf2%22%2C%22card%22%3A%22file%22%7D)- - 期望输出数据- atguigu 2- banzhang 1- cls 2- hadoop 1- jiao 1- ss 2- xue 1<a name="ZncAL"></a>#### 需求分析- 按照MapReduce编程规范,分别编写mapper、Reducer、Driver<a name="AyIZi"></a>#### 环境准备```xml<dependencies><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.30</version></dependency></dependencies>
编写程序
- 编写Mapper类 ```java package com.foreign.wordcount;
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
- @author fangke
- @Description:
- @Package
- @date: 2022/4/8 2:53 下午
*/
/**
- LongWritable:Map阶段输入的key的类型
- Text:Map阶段输入value的类型
- Text:Map阶段输出的key类型
IntWritable:Map阶段输出的value类型 */ public class WordCountMapper extends Mapper
{ Text k = new Text(); IntWritable v = new IntWritable(1);
@Override protected void map(LongWritable key, Text value, Mapper
.Context context) throws IOException, InterruptedException { //获取一行String line = value.toString();//切割String[] words = line.split(" ");//输出for (String word : words) {k.set(word);context.write(k, v);}
} } ```
- 编写Reducer类型 ```java package com.foreign.wordcount;
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
- @author fangke
- @Description:
- @Package
- @date: 2022/4/8 3:12 下午
*/ public class WordCountReducer extends Reducer
{ int sum; IntWritable v = new IntWritable();
@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {//累加求和sum = 0;for (IntWritable value : values) {sum += value.get();}//输出v.set(sum);context.write(key, v);}
}
- 编写Driver驱动类```javapackage com.foreign.wordcount;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;/*** @author fangke* @Description:* @Package* @date: 2022/4/8 3:18 下午* <p>*/public class WordCountDriver {public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {//获取配置信息以及获取job对象Configuration configuration = new Configuration();Job job = Job.getInstance(configuration);//关联本driver程序的jarjob.setJarByClass(WordCountDriver.class);//关联Mapper和Reducer的jarjob.setMapperClass(WordCountMapper.class);job.setReducerClass(WordCountReducer.class);//设置Mapper输出的key和valuejob.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);//设置最终输出的key和value类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);//设置输入和输出路径(本地路径)// FileInputFormat.setInputPaths(job, new Path("/Users/foreign/Desktop/hello.txt"));// FileOutputFormat.setOutputPath(job, new Path("/Users/foreign/Desktop/output"));//linux路径FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));//提交jobboolean result = job.waitForCompletion(true);System.exit(result ? 0 : 1);}}
提交到集群测试
maven打包
maven打jar包,需要添加的打包插件依赖
<build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><version>3.6.1</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build>
打出来的jar包
- 带依赖
mapreduce-1.0-SNAPSHOT-jar-with-dependencies.jar - 不带依赖
mapreduce-1.0-SNAPSHOT.jar
- 带依赖
- 修改不带依赖的jar包名称为
wc.jar方便调用,并拷贝到Hadoop集群的/opt/module/hadoop-3.1.3目录下 执行WordCount程序
序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁盘(持久化)和网络传输。
反序列化就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象。
为什么要序列化
一般来说,“活的”对象只生存在内存里,关机断电就没有了。而且“活的”对象只能由本地的进程使用,不能被发送到网络上的另外一台计算机。然而序列化可以存储“活的”对象,可以将“活的”对象发送到远程计算机。
为什么不用Java的序列化
Java的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,Header,继承体系等),不便于在网络中高效传输。所以,Hadoop自己开发了一套序列化机制(Writable)。
Hadoop序列化特点
紧凑:高效使用存储空间
- 快速:读写数据的额外开销小
-
自定义bean对象实现序列化接口(Writable)
在企业开发中往往常用的基本序列化类型不能满足所有需求,比如在Hadoop框架内部传递一个bean对象,那么该对象就需要实现序列化接口。
实现序列化步骤
必须实现Writable接口
- 反序列化时,需要反射调用空参构造函数,所以必须有空参构造
- 重写序列化方法
- 重写反序列化方法
- 注意反序列化的顺序和序列化的顺序完全一致
- 要想把结果显示在文件中,需要重写
toString(),可用"\t"分开,方便后续用 - 如果需要将自定义的bean放在key中传输,则还需要实现Comparable接口,因为MapReduce框中的Shuffle过程要求对key必须能排序
序列化案例实操
需求:统计每个手机号耗费的总上行流量、总下行流量、总流量
输入数据
phone_data.txt输入数据格式
| 7 13560436666 120.196.100.99 1116 954 200
id 手机号码 网络ip 上行流量 下行流量 网络状态码 | | —- |
期望输出数据格式
| 13560436666 1116 954 2070 手机号码 上行流量 下行流量 总流量 |
|---|
需求分析
编写程序
编写流量统计的bean对象
package com.foreign.flow;import org.apache.hadoop.io.Writable;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;/*** @author fangke* @Description:* @Package* @date: 2022/4/8 5:12 下午* <p>*///实现Writable接口public class FlowBean implements Writable {//上行流量private long upFlow;//下行流量private long downFlow;//总流量private long sumFlow;//提供无参构造public FlowBean() {}//提供get set 方法public long getUpFlow() {return upFlow;}public void setUpFlow(long upFlow) {this.upFlow = upFlow;}public long getDownFlow() {return downFlow;}public void setDownFlow(long downFlow) {this.downFlow = downFlow;}public long getSumFlow() {return sumFlow;}public void setSumFlow(long sumFlow) {this.sumFlow = sumFlow;}public void setSumFlow() {this.sumFlow = this.upFlow + this.downFlow;}//实现序列化和反序列化方法 注意顺序@Overridepublic void write(DataOutput dataOutput) throws IOException {dataOutput.writeLong(upFlow);dataOutput.writeLong(downFlow);dataOutput.writeLong(sumFlow);}@Overridepublic void readFields(DataInput dataInput) throws IOException {this.upFlow = dataInput.readLong();this.downFlow = dataInput.readLong();this.sumFlow = dataInput.readLong();}//重写toString方法@Overridepublic String toString() {return upFlow + "\t" + downFlow + "\t" + sumFlow;}}
编写Mapper类
package com.foreign.flow;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/*** @author fangke* @Description:* @Package* @date: 2022/4/8 5:21 下午* <p>*/public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {private Text k = new Text();private FlowBean v = new FlowBean();@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException {//获取一行数据String line = value.toString();//切割数据String[] split = line.split("\t");//抓取我们需要的数据String phone = split[1];String up = split[split.length - 3];String down = split[split.length - 2];//封装key valuek.set(phone);v.setUpFlow(Long.parseLong(up));v.setDownFlow(Long.parseLong(down));v.setSumFlow();//输出数据context.write(k, v);}}
编写reducer类
package com.foreign.flow;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;/*** @author fangke* @Description:* @Package* @date: 2022/4/8 5:27 下午* <p>*/public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {private FlowBean v = new FlowBean();@Overrideprotected void reduce(Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context) throws IOException, InterruptedException {long totalUp = 0;long totalDown = 0;//遍历values,将上行和下行累加for (FlowBean value : values) {totalUp += value.getUpFlow();totalDown += value.getDownFlow();}//封装vv.setUpFlow(totalUp);v.setDownFlow(totalDown);v.setSumFlow();//输出context.write(key, v);}}
编写Driver驱动类
package com.foreign.flow;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;/*** @author fangke* @Description:* @Package* @date: 2022/4/8 5:30 下午* <p>*/public class FlowDriver {public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {// 获取job对象Configuration conf = new Configuration();Job job = Job.getInstance(conf);// 关联本Driver类job.setJarByClass(FlowDriver.class);//3 关联Mapper和Reducerjob.setMapperClass(FlowMapper.class);job.setReducerClass(FlowReducer.class);// 设置Map端输出KV类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(FlowBean.class);// 设置程序最终输出的KV类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);// 设置程序的输入输出路径FileInputFormat.setInputPaths(job, new Path("/Users/foreign/Desktop/phone_data.txt"));FileOutputFormat.setOutputPath(job, new Path("/Users/foreign/Desktop/flowoutput"));// 提交Jobboolean b = job.waitForCompletion(true);System.exit(b ? 0 : 1);}}
MapReduce框架原理
InputFormat数据载入
切片与MapTask并行度决定机制
问题引出
MapTask的并行度决定Map阶段的任务处理并发度,进而影响到整个Job的处理速度。
数据块:
- Block是HDFS物理上把数据分成一块一块。数据块是HDFS存储数据单位。
数据切片:
思考:在运行MapReduce程序时,输入的文件格式包括:基于行的日志文件、二进制格式文件、数据库表等。那么,针对不同的数据类型,MapReduce是如何读取这些数据的呢?
FileInputFormat常见的接口实现类包括:TextInputFormat、KeyValueTextInputFormat、NLineInputFormat、CombineTextInputFormat和自定义InputFormat等。
TextInputFormat
TextInputFormat是默认的FileInputFormat实现类。按行读取每条记录。键是存储该行在整个文件中的起始字节偏移量, LongWritable类型。值是这行的内容,不包括任何行终止符(换行符和回车符),Text类型。
例子
如下一个分片包含了4条文本记录
Rich learning formIntelligent learning engineLearning more convenientFrom the real demand for more close to the enterprise
每条记录表示为以下键/值对
(0,Rich learning form)(20,Intelligent learning engine)(49,Learning more convenient)(74,From the real demand for more close to the enterprise)
CombineTextInputFormat切片机制
框架默认的TextInputFormat切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个MapTask,这样如果有大量小文件,就会产生大量的MapTask,处理效率极其低下。
应用场景
CombineTextInputFormat用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个MapTask处理。
虚拟存储切片最大值设置
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
注意:虚拟存储切片最大值设置最好根据实际的小文件大小情况来设置具体的值。切片机制
虚拟存储过程
将输入目录下所有文件大小,依次和设置的setMaxInputSplitSize值比较,如果不大于设置的最大值,逻辑上划分一个块。如果输入文件大于设置的最大值且大于两倍,那么以最大值切割一块;当剩余数据大小超过设置的最大值且不大于最大值2倍,此时将文件均分成2个虚拟存储块(防止出现太小切片)。
例如setMaxInputSplitSize值为4M,输入文件大小为8.02M,则先逻辑上分成一个4M。剩余的大小为4.02M,如果按照4M逻辑划分,就会出现0.02M的小的虚拟存储文件,所以将剩余的4.02M文件切分成(2.01M和2.01M)两个文件。
切片过程
判断虚拟存储的文件大小是否大于setMaxInputSplitSize值,大于等于则单独形成一个切片。
- 如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片。
测试举例:有4个小文件大小分别为1.7M、5.1M、3.4M以及6.8M这四个小文件,则虚拟存储之后形成6个文件块,大小分别为:
输入数据
期望
不做任何处理,运行之前的WordCount案例程序,观察切片个数为4。
- 在WordcountDriver中增加如下代码,运行程序,并观察运行的切片个数为3
- 驱动类中添加代码如下:
//如果不设置InputFormat,它默认用的是TextInputFormat.class<br />job.setInputFormatClass(CombineTextInputFormat.class);虚拟存储切片最大值设置4m<br />CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
将
setMaxInputSplitSize设置为20M 则切片个数为1个MapReduce工程流程


上面的流程是整个MapReduce最全工作流程,但是Shuffle过程只是从第7步开始到第16步结束,具体Shuffle过程详解,如下:
- MapTask收集我们的map()方法输出的kv对,放到内存缓冲区中
- 从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件
- 多个溢出文件会被合并成大的溢出文件
- 在溢出过程及合并的过程中,都要调用Partitioner进行分区和针对key进行排序
- ReduceTask根据自己的分区号,去各个MapTask机器上取相应的结果分区数据
- ReduceTask会抓取到同一个分区的来自不同MapTask的结果文件,ReduceTask会将这些文件再进行合并(归并排序)
- 合并成大文件后,Shuffle的过程也就结束了,后面进入ReduceTask的逻辑运算过程(从文件中取出一个一个的键值对Group,调用用户自定义的reduce()方法)
- 注意:
Partition分区
问题引出
要求将统计结果按照条件输出到不同文件中,比如将统计结果按照手机号开头3位数输出到不同分区中
默认Partitioner分区

默认分区是根据key的hashCode对ReduceTasks个数取模得到的。用户没法控制哪个key存储到哪个分区。
自定义Partitioner步骤
自定义继承Partitioner,重写getPartition()方法
- 在Job驱动中,设置自定义Partitioner
job.setPartitionerClass(CustomPartitioner.class);
自定义Partition后,要根据自定义Partitioner的逻辑设置相应数量的ReduceTask
如果ReduceTask的数量 > getParittion的结果数,则会多产生几个空的输出文件
part-r-000xx- 如果 1 < ReduceTask的数量 < getPartition的结果数,则有一部分分区数据无处安放,会Exception
- 如果ReduceTask的数量=1,则不管MapTask端输出多少个分区文件,最终结果都交给这一个ReduceTask,最终也就只会产生一个结果文件
part-r-00000 -
案例分析
例如:假设自定义分区数为5,则
输入数据
期望输出数据
分区类 ```java package com.foreign.partition;
import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner;
/**
- @author fangke
- @Description:
- @Package
- @date: 2022/4/9 3:01 下午
*/ public class ProvincePartitioner extends Partitioner
{ @Override public int getPartition(Text text, FlowBean flowBean, int i) { //获取手机号前三位prePhoneString phone = text.toString();String prePhone = phone.substring(0, 3);//定义一个分区号变量partition,根据prePhone设置分区号int partition;if("136".equals(prePhone)){partition = 0;}else if("137".equals(prePhone)){partition = 1;}else if("138".equals(prePhone)){partition = 2;}else if("139".equals(prePhone)){partition = 3;}else {partition = 4;}//最后返回分区号partitionreturn partition;
} } ```
- 在驱动类中增加自定义数据分区设置和ReduceTask设置 ```java package com.foreign.partition;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
- @author fangke
- @Description:
- @Package
- @date: 2022/4/8 5:30 下午
*/ public class FlowDriver { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
// 获取job对象Configuration conf = new Configuration();Job job = Job.getInstance(conf);// 关联本Driver类job.setJarByClass(FlowDriver.class);//3 关联Mapper和Reducerjob.setMapperClass(FlowMapper.class);job.setReducerClass(FlowReducer.class);// 设置Map端输出KV类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(FlowBean.class);// 设置程序最终输出的KV类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);//指定自定义分区器job.setPartitionerClass(ProvincePartitioner.class);//设置相应数量的ReduceTaskjob.setNumReduceTasks(5);// 设置程序的输入输出路径FileInputFormat.setInputPaths(job, new Path("/Users/foreign/Desktop/phone_data.txt"));FileOutputFormat.setOutputPath(job, new Path("/Users/foreign/Desktop/flowoutput"));// 提交Jobboolean b = job.waitForCompletion(true);System.exit(b ? 0 : 1);
WritableComparatable 排序
概述
- 排序是MapReduce框架中最重要的操作之一。
- MapTask和ReduceTask均会对数据按照key进行排序,该操作数据Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。
-
Map阶段排序
对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,在对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序。
Reduce阶段排序
对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定的阈值,则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。
排序分类
部分排序
- MapReduce根据输入记录的键对数据集排序,保证输出的每个文件内部有序。
- 全排序
- 最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask,但该方法在处理大型文件时效率极地,因为一台机器处理所有文件,完全丧失了MapReduce所提供的并行架构。
二次排序
bean对象作为key传输,需要实现
WritableComparable接口重写compareTo方法,就可以实现排序。@Overridepublic int compareTo(FlowBean bean) {int result;// 按照总流量大小,倒序排列if (this.sumFlow > bean.getSumFlow()) {result = -1;}else if (this.sumFlow < bean.getSumFlow()) {result = 1;}else {result = 0;}return result;}
WritableComparable排序案例实操(全排序)
需求:
-
需求分析
代码实现
```java package com.foreign.writablecomparable;
import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner;
/**
- @author fangke
- @Description:
- @Package
- @date: 2022/4/9 3:01 下午
*/ public class ProvincePartitioner extends Partitioner
{ @Override public int getPartition(Text text, FlowBean flowBean, int i) { //获取手机号前三位prePhoneString phone = text.toString();String prePhone = phone.substring(0, 3);//定义一个分区号变量partition,根据prePhone设置分区号int partition;if("136".equals(prePhone)){partition = 0;}else if("137".equals(prePhone)){partition = 1;}else if("138".equals(prePhone)){partition = 2;}else if("139".equals(prePhone)){partition = 3;}else {partition = 4;}//最后返回分区号partitionreturn partition;
} }
java package com.foreign.writablecomparable;
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
- @author fangke
- @Description:
- @Package
- @date: 2022/4/8 5:21 下午
*/ public class FlowMapper extends Mapper
{ private Text k = new Text(); private FlowBean v = new FlowBean(); @Override protected void map(LongWritable key, Text value, Mapper
.Context context) throws IOException, InterruptedException { //获取一行数据String line = value.toString();//切割数据String[] split = line.split("\t");//抓取我们需要的数据String phone = split[1];String up = split[split.length - 3];String down = split[split.length - 2];//封装key valuek.set(phone);v.setUpFlow(Long.parseLong(up));v.setDownFlow(Long.parseLong(down));v.setSumFlow();//输出数据context.write(k, v);
} }
java package com.foreign.writablecomparable;
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput; import java.io.DataOutput; import java.io.IOException;
/**
- @author fangke
- @Description:
- @Package
- @date: 2022/4/8 5:12 下午
*/ //实现Writable接口 public class FlowBean implements WritableComparable
{ //上行流量 private long upFlow; //下行流量 private long downFlow; //总流量 private long sumFlow;
//提供无参构造 public FlowBean() { }
//提供get set 方法 public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
public long getSumFlow() {
return sumFlow;
}
public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}
public void setSumFlow() {
this.sumFlow = this.upFlow + this.downFlow;
}
//实现序列化和反序列化方法 注意顺序 @Override public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeLong(upFlow);dataOutput.writeLong(downFlow);dataOutput.writeLong(sumFlow);
}
@Override public void readFields(DataInput dataInput) throws IOException {
this.upFlow = dataInput.readLong();this.downFlow = dataInput.readLong();this.sumFlow = dataInput.readLong();
}
//重写toString方法 @Override public String toString() {
return upFlow + "\t" + downFlow + "\t" + sumFlow;
}
@Override public int compareTo(FlowBean o) {
//按照总流量比较,倒序排列if(this.sumFlow > o.sumFlow){return -1;}else if(this.sumFlow < o.sumFlow){return 1;}else {return 0;}
} }
java package com.foreign.writablecomparable;
import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
- @author fangke
- @Description:
- @Package
- @date: 2022/4/8 5:27 下午
*/ public class FlowReducer extends Reducer
{ private FlowBean v = new FlowBean();
@Override protected void reduce(FlowBean key, Iterable
values, Reducer .Context context) throws IOException, InterruptedException { //遍历values集合,循环写出,避免总流量相同的情况for (Text value : values) {//调换KV位置,反向写出context.write(value,key);}
} }
java package com.foreign.writablecomparable;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
- @author fangke
- @Description:
- @Package
- @date: 2022/4/8 5:30 下午
*/ public class FlowDriver { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
// 获取job对象Configuration conf = new Configuration();Job job = Job.getInstance(conf);// 关联本Driver类job.setJarByClass(FlowDriver.class);//3 关联Mapper和Reducerjob.setMapperClass(FlowMapper.class);job.setReducerClass(FlowReducer.class);// 设置Map端输出KV类型job.setMapOutputKeyClass(FlowBean.class);job.setMapOutputValueClass(Text.class);// 设置程序最终输出的KV类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);//指定自定义分区器job.setPartitionerClass(ProvincePartitioner.class);//设置相应数量的ReduceTaskjob.setNumReduceTasks(5);// 设置程序的输入输出路径FileInputFormat.setInputPaths(job, new Path("/Users/foreign/Desktop/flowoutput")); //上个结果输出的路径FileOutputFormat.setOutputPath(job, new Path("/Users/foreign/Desktop/flowoutput2"));// 提交Jobboolean b = job.waitForCompletion(true);System.exit(b ? 0 : 1);
WritableComparable排序案例实操(区内排序)
需求:
import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner;
/**
- @author fangke
- @Description:
- @Package
- @date: 2022/4/9 3:01 下午
*/ public class ProvincePartitioner extends Partitioner
{ @Override public int getPartition(FlowBean flowBean, Text text, int numPartitions) { //获取手机号前三位String phone = text.toString();String prePhone = phone.substring(0, 3);//定义一个分区号变量partition,根据prePhone设置分区号int partition;if("136".equals(prePhone)){partition = 0;}else if("137".equals(prePhone)){partition = 1;}else if("138".equals(prePhone)){partition = 2;}else if("139".equals(prePhone)){partition = 3;}else {partition = 4;}//最后返回分区号partitionreturn partition;
}
}
```javapackage com.foreign.partitioncomparable;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;/*** @author fangke* @Description:* @Package* @date: 2022/4/8 5:30 下午* <p>*/public class FlowDriver {public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {// 获取job对象Configuration conf = new Configuration();Job job = Job.getInstance(conf);// 关联本Driver类job.setJarByClass(FlowDriver.class);//3 关联Mapper和Reducerjob.setMapperClass(FlowMapper.class);job.setReducerClass(FlowReducer.class);// 设置Map端输出KV类型job.setMapOutputKeyClass(FlowBean.class);job.setMapOutputValueClass(Text.class);// 设置程序最终输出的KV类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);//指定自定义分区器job.setPartitionerClass(ProvincePartitioner.class);//设置相应数量的ReduceTaskjob.setNumReduceTasks(5);// 设置程序的输入输出路径FileInputFormat.setInputPaths(job, new Path("/Users/foreign/Desktop/flowoutput")); //上个结果输出的路径FileOutputFormat.setOutputPath(job, new Path("/Users/foreign/Desktop/flowoutput2"));// 提交Jobboolean b = job.waitForCompletion(true);System.exit(b ? 0 : 1);}}
Combiner合并
- Combiner是MR程序中Mapper和Reducer之外的一种组件。
- Combiner组件的父类就是Reducer。
- Combiner和Reducer的区别在于运行的位置。
- Combiner是在每一个MapTask所在的节点运行
- Reducer是接收全局所有Mapper的输出结果
- Combiner的意义就是对每一个MapTask的输出进行局部汇总,以减小网络传输量
- Combiner能够应用的前提是不能影响最终的业务逻辑,而且,Combiner的输出KV应该跟Reducer的输入KV类型要对应起来。
自定义Combiner实现步骤
自定义一个Combiner继承Reducer,重写Reduce方法
public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {private IntWritable outV = new IntWritable();@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable value : values) {sum += value.get();}outV.set(sum);context.write(key,outV);}}
驱动类加
job.setCombinerClass(WordCountCombiner.class);Combiner合并案例实操
需求
统计过程中对每一个MapTask的输出进行局部汇总,以减少网络传输量即采用Combiner功能
- 数据输入
期望输出数据
增加一个WordCountCombiner类继承Reducer ```java public class WordCountCombiner extends Reducer
{
private IntWritable outV = new IntWritable();
@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable value : values) {sum += value.get();}//封装outKVoutV.set(sum);//写出outKVcontext.write(key,outV);}
}
- 在WordCountDriver驱动类中指定Combiner- `job.setCombinerClass(WordCountCombiner.class);// 指定需要使用combiner,以及用哪个类作为combiner的逻辑`<a name="DyXAK"></a>#### 案例实操——方案二- 将WordCountReducer作为Combiner在WordcountDriver驱动类中指定- `job.setCombinerClass(WordCountReducer.class);// 指定需要使用Combiner,以及用哪个类作为Combiner的逻辑`- <a name="VeAUJ"></a>## OutputFormat数据输出- 是MapReduce输出的基类,所有实现MapReduce输出都实现了OutputFormat接口- 默认输出格式`TextOutputFormat`<a name="r6zjs"></a>### 自定义OutputFormat- 可以输出数据到MySQL等存储框架中<a name="eliSk"></a>### 自定义OutputFormat案例实操<a name="KfS9Q"></a>#### 需求- 过滤输入的log日志,包含atguigu的网站输出到 桌面,不包含的输出到 `/Users/foriegn`目录下- [log.txt](https://www.yuque.com/attachments/yuque/0/2022/txt/25955514/1649834360461-1f144917-f225-4d89-b707-428b5eaf89bb.txt?_lake_card=%7B%22src%22%3A%22https%3A%2F%2Fwww.yuque.com%2Fattachments%2Fyuque%2F0%2F2022%2Ftxt%2F25955514%2F1649834360461-1f144917-f225-4d89-b707-428b5eaf89bb.txt%22%2C%22name%22%3A%22log.txt%22%2C%22size%22%3A201%2C%22type%22%3A%22text%2Fplain%22%2C%22ext%22%3A%22txt%22%2C%22status%22%3A%22done%22%2C%22taskId%22%3A%22u378ff72d-c188-4442-8949-8bfb1981b11%22%2C%22taskType%22%3A%22upload%22%2C%22id%22%3A%22ufc33271f%22%2C%22card%22%3A%22file%22%7D)<a name="RZca7"></a>#### 需求分析<a name="xiWbR"></a>#### 代码实操```javapackage com.foreign.outputformat;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/*** @author fangke* @Description:* @Package* @date: 2022/4/13 3:23 下午* <p>*/public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable> {@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {//直接输出一行数据 以 http://www.baidu.com 为keycontext.write(value, NullWritable.get());}}
package com.foreign.outputformat;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* @author fangke
* @Description:
* @Package
* @date: 2022/4/13 3:25 下午
* <p>
*/
public class LogReduce extends Reducer<Text, NullWritable, Text, NullWritable> {
@Override
protected void reduce(Text key, Iterable<NullWritable> values, Reducer<Text, NullWritable, Text, NullWritable>.Context context) throws IOException, InterruptedException {
// 防止有相同的数据,迭代写出
for (NullWritable value : values) {
context.write(key,NullWritable.get());
}
}
}
package com.foreign.outputformat;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* @author fangke
* @Description:
* @Package
* @date: 2022/4/13 3:28 下午
* <p>
*/
public class LogOutputFormat extends FileOutputFormat<Text, NullWritable> {
@Override
public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
//创建一个自定义的RecordWriter返回
LogRecordWriter lrw = new LogRecordWriter(job);
return (RecordWriter)lrw;
}
}
package com.foreign.outputformat;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import javax.xml.soap.Text;
import java.io.IOException;
/**
* @author fangke
* @Description:
* @Package
* @date: 2022/4/13 3:29 下午
* <p>
*/
public class LogRecordWriter extends RecordWriter<Text, NullWritable> {
private FSDataOutputStream atguiguOut;
private FSDataOutputStream otherOut;
public LogRecordWriter(TaskAttemptContext job) {
try {
//获取文件系统对象
FileSystem fs = FileSystem.get(job.getConfiguration());
//用文件系统对象创建两个输出流对应不同的目录
atguiguOut = fs.create(new Path("d:/hadoop/atguigu.log"));
otherOut = fs.create(new Path("d:/hadoop/other.log"));
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void write(Text key, NullWritable value) throws IOException, InterruptedException {
String log = key.toString();
//根据一行的log数据是否包含atguigu,判断两条输出流输出的内容
if (log.contains("atguigu")) {
atguiguOut.writeBytes(log + "\n");
} else {
otherOut.writeBytes(log + "\n");
}
}
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
//关流
IOUtils.closeStream(atguiguOut);
IOUtils.closeStream(otherOut);
}
}
package com.foreign.outputformat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* @author fangke
* @Description:
* @Package
* @date: 2022/4/13 3:35 下午
* <p>
*/
public class LogDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(LogDriver.class);
job.setMapperClass(LogMapper.class);
job.setReducerClass(LogReduce.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
//设置自定义的outputformat
job.setOutputFormatClass(LogOutputFormat.class);
FileInputFormat.setInputPaths(job, new Path("D:\\input"));
//虽然我们自定义了outputformat,但是因为我们的outputformat继承自fileoutputformat
//而fileoutputformat要输出一个_SUCCESS文件,所以在这还得指定一个输出目录
FileOutputFormat.setOutputPath(job, new Path("D:\\logoutput"));
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
MapReduce内核源码解析
MapTask工作机制

- Read阶段
- MapTask通过InputFormat获得的RecordReader,从输入InputSplit中解析出一个个key/value
- Map阶段
- 该节点主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value
- Collect收集阶段
- 在用户编写map函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的key/value分区(调用Partitioner),并写入一个环形内存缓冲区中。
- Spill阶段
- 溢写,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。
- 步骤1:利用快速排序算法对缓存区内的数据进行排序,排序的方式是,先按照分区编号Partition进行排序,然后按照key进行排序。这样经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照key有序。
- 步骤2:按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件output/spillN.out (N表示当前溢写次数)中。如果用户设置了Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作。
- 步骤3:将分区数据的元信息写到内存索引数据结构SpillRecord中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过1MB,则将内存索引写到文件
output/spillN.out.index中。
Merge阶段
- 当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。
- 当所有数据处理完后,MapTask会将所有临时文件合并成一个大文件,并保存到文件output/file.out中,同时生成相应的索引文件output/file.out.index
- 在进行文件合并过程中,MapTask以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并mapreduce.task.io.sort.factor(默认10)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。
- 让每个MapTask最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。
ReduceTask工作机制

copy阶段
- ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。
- sort阶段
- 在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。按照MapReduce语义,用户编写reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。
reduce阶段
设置ReduceTask并行度(个数)
- ReduceTask的并行度同样影响整个Job的执行并发度和执行效率,但与MapTask的并发数由切片数决定不同,ReduceTask数量的决定是可以直接手动设置:
job.setNumReduceTasks(4);默认值为1
测试ReduceTask多少合适
- 实验环境:1个Master节点,16个Slave节点:CPU:8GHZ,内存: 2G | MapTask =16 | | | | | | | | | | | | —- | —- | —- | —- | —- | —- | —- | —- | —- | —- | —- | | ReduceTask | 1 | 5 | 10 | 15 | 16 | 20 | 25 | 30 | 45 | 60 | | 总时间 | 892 | 146 | 110 | 92 | 88 | 100 | 128 | 101 | 145 | 104 |
注意事项
- ReduceTask=0,表示没有Reduce阶段,输出文件个数和Map个数一致。
- ReduceTask默认值就是1,所以输出文件个数为1个
- 如果数据分布不均匀,就有可能在Reduce阶段产生数据倾斜
- ReduceTask数据并不是任意设置,还要考虑业务逻辑需求,有些情况下,需要计算全局汇总结果,就只能有1个ReduceTask。
- 具体多少个ReduceTask,需要根据集群性能而定
- 如果分区数不是1,但是ReduceTask为1,是否指向分区过程。答案是:不执行分区过程。因为在MapTask的源码中,执行分区的前提是先判断ReduceNum个数是否大于1,不大于1肯定不执行。
MapTask&ReduceTask源码解析
MapTask源码解析流程
ReduceTask源码解析流程
Map端的主要工作:为来自不同表或文件的key/value对,打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出
- Reduce端的主要工作:在Reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在Map阶段已经打标志)分开,最后进行合并就ok了。
Reduce Join案例实操
需求
order.txt
表4-4 订单数据表t_order
| id | pid | amount |
|---|---|---|
| 1001 | 01 | 1 |
| 1002 | 02 | 2 |
| 1003 | 03 | 3 |
| 1004 | 01 | 4 |
| 1005 | 02 | 5 |
| 1006 | 03 | 6 |
pd.txt
表4-5 商品信息表t_product
| pid | pname |
|---|---|
| 01 | 小米 |
| 02 | 华为 |
| 03 | 格力 |
表4-6 最终数据形式
| id | pname | amount |
|---|---|---|
| 1001 | 小米 | 1 |
| 1004 | 小米 | 4 |
| 1002 | 华为 | 2 |
| 1005 | 华为 | 5 |
| 1003 | 格力 | 3 |
| 1006 | 格力 | 6 |
需求分析
- 通过将关联条件作为Map输出的key,将两表满足join条件的数据并携带数据所来源的文件信息,发往同一个ReduceTask,在Reduce中进行数据的串联。
-
代码实现
创建商品和订单合并后的TableBean类 ```java package com.foreign.reducejoin;
import org.apache.hadoop.io.Writable;
import java.io.DataInput; import java.io.DataOutput; import java.io.IOException;
/**
- @author fangke
- @Description:
- @Package
- @date: 2022/4/13 4:46 下午
*/ public class TableBean implements Writable {
private String id; //订单id private String pid; //产品id private int amount; //产品数量 private String pname; //产品名称 private String flag; //判断是order表还是pd表的标志字段
public TableBean() { }
public String getId() {
return id;}
public void setId(String id) {
this.id = id;}
public String getPid() {
return pid;}
public void setPid(String pid) {
this.pid = pid;}
public int getAmount() {
return amount;}
public void setAmount(int amount) {
this.amount = amount;}
public String getPname() {
return pname;}
public void setPname(String pname) {
this.pname = pname;}
public String getFlag() {
return flag;}
public void setFlag(String flag) {
this.flag = flag;}
@Override public String toString() {
return id + "\t" + pname + "\t" + amount;}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(id);
out.writeUTF(pid);
out.writeInt(amount);
out.writeUTF(pname);
out.writeUTF(flag);
}
@Override
public void readFields(DataInput in) throws IOException {
this.id = in.readUTF();
this.pid = in.readUTF();
this.amount = in.readInt();
this.pname = in.readUTF();
this.flag = in.readUTF();
}
}
- 创建TableMapper类
```java
package com.foreign.reducejoin;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
/**
* @author fangke
* @Description:
* @Package
* @date: 2022/4/13 4:47 下午
* <p>
*/
public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean> {
private String filename;
private Text outK = new Text();
private TableBean outV = new TableBean();
@Override
protected void setup(Context context) throws IOException, InterruptedException {
//获取对应文件名称
InputSplit split = context.getInputSplit();
FileSplit fileSplit = (FileSplit) split;
filename = fileSplit.getPath().getName();
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//获取一行
String line = value.toString();
//判断是哪个文件,然后针对文件进行不同的操作
if (filename.contains("order")) { //订单表的处理
String[] split = line.split("\t");
//封装outK
outK.set(split[1]);
//封装outV
outV.setId(split[0]);
outV.setPid(split[1]);
outV.setAmount(Integer.parseInt(split[2]));
outV.setPname("");
outV.setFlag("order");
} else { //商品表的处理
String[] split = line.split("\t");
//封装outK
outK.set(split[0]);
//封装outV
outV.setId("");
outV.setPid(split[0]);
outV.setAmount(0);
outV.setPname(split[1]);
outV.setFlag("pd");
}
//写出KV
context.write(outK, outV);
}
}
- 创建TableReducer类 ```java package com.foreign.reducejoin;
import org.apache.commons.beanutils.BeanUtils; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.util.ArrayList;
/**
- @author fangke
- @Description:
- @Package
- @date: 2022/4/13 4:53 下午
*/ public class TableReducer extends Reducer
{ @Override protected void reduce(Text key, Iterable values, Reducer .Context context) throws IOException, InterruptedException { ArrayList<TableBean> orderBeans = new ArrayList<>(); TableBean pdBean = new TableBean(); for (TableBean value : values) { //判断数据来自哪个表 if ("order".equals(value.getFlag())) { //订单表 //创建一个临时TableBean对象接收value TableBean tmpOrderBean = new TableBean(); try { BeanUtils.copyProperties(tmpOrderBean, value); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } //将临时TableBean对象添加到集合orderBeans orderBeans.add(tmpOrderBean); } else { //商品表 try { BeanUtils.copyProperties(pdBean, value); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } } } //遍历集合orderBeans,替换掉每个orderBean的pid为pname,然后写出 for (TableBean orderBean : orderBeans) { orderBean.setPname(pdBean.getPname()); //写出修改后的orderBean对象 context.write(orderBean, NullWritable.get()); }} } ```
- 创建TableDriver类 ```java package com.foreign.reducejoin;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
- @author fangke
- @Description:
- @Package
- @date: 2022/4/13 4:54 下午
*/ public class TableDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Job job = Job.getInstance(new Configuration()); job.setJarByClass(TableDriver.class); job.setMapperClass(TableMapper.class); job.setReducerClass(TableReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(TableBean.class); job.setOutputKeyClass(TableBean.class); job.setOutputValueClass(NullWritable.class); FileInputFormat.setInputPaths(job, new Path("D:\\input")); FileOutputFormat.setOutputPath(job, new Path("D:\\output")); boolean b = job.waitForCompletion(true); System.exit(b ? 0 : 1);}
}
<a name="hqmRj"></a>
#### 总结
- 这种方式,合并的操作是在Reduce阶段完成,Reduce端的处理压力太大,Map节点的运算负载则很低,资源利用率不高,且在Reduce阶段极易产生数据倾斜。
- 使用Map端实现数据合并
<a name="yBw9U"></a>
### Map Join
<a name="PM2Se"></a>
#### 使用场景
- Map Join适用于一张表十分小、一张表很大的场景。
<a name="uz5WK"></a>
#### 具体方法
- 在Mapper的setup阶段,将文件读取到缓存集合中
- 在Driver驱动类中加载缓存
```java
//缓存普通文件到Task运行节点。
job.addCacheFile(new URI("file:///e:/cache/pd.txt"));
//如果是集群运行,需要设置HDFS路径
job.addCacheFile(new URI("hdfs://hadoop102:8020/cache/pd.txt"));
需求分析
代码实现
现在MapJoinDriver驱动类中添加缓存文件
public class MapJoinDriver { public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException { // 1 获取job信息 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); // 2 设置加载jar包路径 job.setJarByClass(MapJoinDriver.class); // 3 关联mapper job.setMapperClass(MapJoinMapper.class); // 4 设置Map输出KV类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); // 5 设置最终输出KV类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); // 加载缓存数据 job.addCacheFile(new URI("file:///D:/input/tablecache/pd.txt")); // Map端Join的逻辑不需要Reduce阶段,设置reduceTask数量为0 job.setNumReduceTasks(0); // 6 设置输入输出路径 FileInputFormat.setInputPaths(job, new Path("D:\\input")); FileOutputFormat.setOutputPath(job, new Path("D:\\output")); // 7 提交 boolean b = job.waitForCompletion(true); System.exit(b ? 0 : 1); } }在MapJoinMapper类中的setup方法中读取缓存文件
public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> { private Map<String, String> pdMap = new HashMap<>(); private Text text = new Text(); //任务开始前将pd数据缓存进pdMap @Override protected void setup(Context context) throws IOException, InterruptedException { //通过缓存文件得到小表数据pd.txt URI[] cacheFiles = context.getCacheFiles(); Path path = new Path(cacheFiles[0]); //获取文件系统对象,并开流 FileSystem fs = FileSystem.get(context.getConfiguration()); FSDataInputStream fis = fs.open(path); //通过包装流转换为reader,方便按行读取 BufferedReader reader = new BufferedReader(new InputStreamReader(fis, "UTF-8")); //逐行读取,按行处理 String line; while (StringUtils.isNotEmpty(line = reader.readLine())) { //切割一行 //01 小米 String[] split = line.split("\t"); pdMap.put(split[0], split[1]); } //关流 IOUtils.closeStream(reader); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //读取大表数据 //1001 01 1 String[] fields = value.toString().split("\t"); //通过大表每行数据的pid,去pdMap里面取出pname String pname = pdMap.get(fields[1]); //将大表每行数据的pid替换为pname text.set(fields[0] + "\t" + pname + "\t" + fields[2]); //写出 context.write(text,NullWritable.get()); } }数据清洗(ETL)
“ETL,是英文Extract-Transform-Load的缩写,用来描述将数据从来源端经过抽取(Extract)、转换(Transform)、加载(Load)至目的端的过程。ETL一词较常用在数据仓库,但其对象并不限于数据仓库
在运行核心业务MapReduce程序之前,往往要先对数据进行清洗,清理掉不符合用户要求的数据。清理的过程往往只需要运行Mapper程序,不需要运行Reduce程序。
需求
-
输入数据
期望输出数据
-
需求分析
-
实现代码
编写WebLogMapper类
public class WebLogMapper extends Mapper<LongWritable, Text, Text, NullWritable>{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1 获取1行数据 String line = value.toString(); // 2 解析日志 boolean result = parseLog(line,context); // 3 日志不合法退出 if (!result) { return; } // 4 日志合法就直接写出 context.write(value, NullWritable.get()); } // 2 封装解析日志的方法 private boolean parseLog(String line, Context context) { // 1 截取 String[] fields = line.split(" "); // 2 日志长度大于11的为合法 if (fields.length > 11) { return true; }else { return false; } } }编写WebLogDriver类
```java public class WebLogDriver { public static void main(String[] args) throws Exception {
// 输入输出路径需要根据自己电脑上实际的输入输出路径设置 args = new String[] { “D:/input/inputlog”, “D:/output1” };
// 1 获取job信息
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2 加载jar包
job.setJarByClass(LogDriver.class);
// 3 关联map
job.setMapperClass(WebLogMapper.class);
// 4 设置最终输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// 设置reducetask个数为0
job.setNumReduceTasks(0);
// 5 设置输入和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 6 提交
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
<a name="VoKTz"></a>
## Hadoop数据压缩
<a name="qYomk"></a>
### 概述
<a name="TjV4w"></a>
#### 压缩的好处和坏处
- 压缩的优点:以减少磁盘IO、减少磁盘存储空间。
- 压缩的缺点:增加CPU开销。
<a name="PL7VG"></a>
#### 压缩原则
- 运算密集型的Job,少用压缩
- IO密集型的Job,多用压缩
<a name="XCGdR"></a>
### MR支持的压缩编码
<a name="ruk9k"></a>
#### 压缩算法对比介绍
| 压缩格式 | Hadoop自带? | 算法 | 文件扩展名 | 是否可切片 | 换成压缩格式后,原来的程序是否需要修改 |
| --- | --- | --- | --- | --- | --- |
| DEFLATE | 是,直接使用 | DEFLATE | .deflate | 否 | 和文本处理一样,不需要修改 |
| Gzip | 是,直接使用 | DEFLATE | .gz | 否 | 和文本处理一样,不需要修改 |
| bzip2 | 是,直接使用 | bzip2 | .bz2 | 是 | 和文本处理一样,不需要修改 |
| LZO | 否,需要安装 | LZO | .lzo | 是 | 需要建索引,还需要指定输入格式 |
| Snappy | 是,直接使用 | Snappy | .snappy | 否 | 和文本处理一样,不需要修改 |
<a name="YZAia"></a>
#### 压缩性能比较
| 压缩算法 | 原始文件大小 | 压缩文件大小 | 压缩速度 | 解压速度 |
| --- | --- | --- | --- | --- |
| gzip | 8.3GB | 1.8GB | 17.5MB/s | 58MB/s |
| bzip2 | 8.3GB | 1.1GB | 2.4MB/s | 9.5MB/s |
| LZO | 8.3GB | 2.9GB | 49.3MB/s | 74.6MB/s |
<a name="cuGPz"></a>
### 压缩方式选择
> 压缩方式选择时重点考虑:压缩/解压缩速度、压缩率(压缩后存储大小)、压缩后是否可以支持切片。
<a name="neH8M"></a>
#### Gzip压缩
优点:压缩率比较高;<br />缺点:不支持Split;压缩/解压速度一般;
<a name="PiNII"></a>
#### Bzip2压缩
优点:压缩率高;支持Split;<br />缺点:压缩/解压速度慢。
<a name="JaI10"></a>
#### Lzo压缩
优点:压缩/解压速度比较快;支持Split;<br />缺点:压缩率一般;想支持切片需要额外创建索引。
<a name="b7JBe"></a>
#### Snappy压缩
优点:压缩和解压缩速度快;<br />缺点:不支持Split;压缩率一般;
<a name="QKVXY"></a>
#### 压缩位置选择
> 压缩可以在MapReduce作用的任意阶段启用。

- 为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器
| 压缩格式 | 对应的编码/解码器 |
| --- | --- |
| DEFLATE | org.apache.hadoop.io.compress.DefaultCodec |
| gzip | org.apache.hadoop.io.compress.GzipCodec |
| bzip2 | org.apache.hadoop.io.compress.BZip2Codec |
| LZO | com.hadoop.compression.lzo.LzopCodec |
| Snappy | org.apache.hadoop.io.compress.SnappyCodec |
- 要在Hadoop中启用压缩,可以配置如下参数
| 参数 | 默认值 | 阶段 | 建议 |
| --- | --- | --- | --- |
| io.compression.codecs <br />(在core-site.xml中配置) | 无,这个需要在命令行输入hadoop checknative查看 | 输入压缩 | Hadoop使用文件扩展名判断是否支持某种编解码器 |
| mapreduce.map.output.compress(在mapred-site.xml中配置) | false | mapper输出 | 这个参数设为true启用压缩 |
| mapreduce.map.output.compress.codec(在mapred-site.xml中配置) | org.apache.hadoop.io.compress.DefaultCodec | mapper输出 | 企业多使用LZO或Snappy编解码器在此阶段压缩数据 |
| mapreduce.output.fileoutputformat.compress(在mapred-site.xml中配置) | false | reducer输出 | 这个参数设为true启用压缩 |
| mapreduce.output.fileoutputformat.compress.codec(在mapred-site.xml中配置) | org.apache.hadoop.io.compress.DefaultCodec | reducer输出 | 使用标准工具或者编解码器,如gzip和bzip2 |
<a name="IyLuS"></a>
### 压缩实操案例
<a name="U4dcO"></a>
#### Map输出端采用压缩
- 即使你的MapReduce的输入输出文件都是未压缩的文件,你仍然可以对Map任务的中间结果输出做压缩,因为它要写在硬盘并且通过网络传输到Reduce节点,对其压缩可以提高很多性能,这些工作只要设置两个属性即可,我们来看下代码怎么设置。
- 给大家提供的Hadoop源码支持的压缩格式有:BZip2Codec、DefaultCodec
```java
public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
// 开启map端输出压缩
conf.setBoolean("mapreduce.map.output.compress", true);
// 设置map端输出压缩方式
conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class,CompressionCodec.class);
Job job = Job.getInstance(conf);
job.setJarByClass(WordCountDriver.class);
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
Map保持不变
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ Text k = new Text(); IntWritable v = new IntWritable(1); @Override protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException { // 1 获取一行 String line = value.toString(); // 2 切割 String[] words = line.split(" "); // 3 循环写出 for(String word:words){ k.set(word); context.write(k, v); } } }Reducer保持不变
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{ IntWritable v = new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; // 1 汇总 for(IntWritable value:values){ sum += value.get(); } v.set(sum); // 2 输出 context.write(key, v); } }Reduce输出端采用压缩
修改驱动
public class WordCountDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(WordCountDriver.class); job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 设置reduce端输出压缩开启 FileOutputFormat.setCompressOutput(job, true); // 设置压缩的方式 FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class); // FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); // FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class); boolean result = job.waitForCompletion(true); System.exit(result?0:1); } }Mapper和Reducer保持不变



