MapReduce概述
MapReduce定义
MapReduce是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上
MapReduce优缺点
优点
1)MapReduce易于编程它简单的实现一些接口,就可以完成一个分布式程序,这个分布式程序可以分布到大量廉价的PC机器上运行。也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一样的。就是因为这个特点使得MapReduce编程变得非常流行2)良好的扩展性当你的计算资源不能得到满足的时候,你可以通过简单的增加机器来扩展它的计算能力3)高容错性MapReduce设计的初衷就是使程序能够部署在廉价的PC机器上,这就要求它具有很高的容错性。比如其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由Hadoop内部完成的4)适合PB级以上海量数据的离线处理可以实现上千台服务器集群并发工作,提供数据处理能力
缺点
1)不擅长实时计算MapReduce无法像MySQL一样,在毫秒或者秒级内返回结果2)不擅长流式计算流式计算的输入数据是动态的,而MapReduce的输入数据集是静态的,不能动态变化。这是因为MapReduce自身的设计特点决定了数据源必须是静态的3)不擅长DAG(有向无环图)计算多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce并不是不能做,而是使用后,每个MapReduce作业的输出结果都会写入到磁盘,会造成大量的磁盘IO,导致性能非常的低下
MapReduce核心思想
(1)分布式的运算程序往往需要分成至少2个阶段。(2)第一个阶段的MapTask并发实例,完全并行运行,互不相干。(3)第二个阶段的ReduceTask并发实例互不相干,但是他们的数据依赖于上一个阶段的所有MapTask并发实例的输出。(4)MapReduce编程模型只能包含一个Map阶段和一个Reduce阶段,如果用户的业务逻辑非常复杂,那就只能多个MapReduce程序,串行运行
理解MapReduce思想
MapReduce思想在生活中处处可见,每个人或多或少都曾接触过这种思想。MapReduce的思想核心是“先分再合,分而治之”, 所谓“分而治之”就是把一个复杂的问题,按照一定的“分解”方法分为等价的规模较小的若干部分,然后逐个解决,分别找出各部分的结果,把各部分的结果组成整个问题的结果。这种思想来源于日常生活与工作时的经验,同样也完全适用于大量复杂的任务处理场景(大规模数据处理场景)。即使是发布过论文实现分布式计算的谷歌也只是实现了这种思想,而不是自己原创Map负责“分”,即把复杂的任务分解为若干个“简单的任务”来并行处理。可以进行拆分的前提是这些小任务可以并行计算,彼此间几乎没有依赖关系。Reduce负责“合”,即对map阶段的结果进行全局汇总。这两个阶段合起来正是MapReduce思想的体现一个比较形象的语言解释MapReduce:我们要数停车场中的所有的车数量。你数第一列,我数第二列。这就是“Map”。我们人越多,能够同时数车的人就越多,速度就越快。数完之后,我们聚到一起,把所有人的统计数加在一起。这就是“Reduce”
场景:如何模拟实现分布式计算
什么是分布式计算
分布式计算是一种计算方法,和集中式计算是相对的。随着计算技术的发展,有些应用需要非常巨大的计算能力才能完成,如果采用集中式计算,需要耗费相当长的时间来完成。分布式计算将该应用分解成许多小的部分,分配给多台计算机进行处理。这样可以节约整体计算时间,大大提高计算效率

大数据场景下模拟实现

MapReduce架构体系
一个完整的mapreduce程序在分布式运行时有三类实例进程:1、MRAppMaster:负责整个程序的过程调度及状态协调2、MapTask:负责map阶段的整个数据处理流程3、ReduceTask:负责reduce阶段的整个数据处理流程

Map Reduce工作执行流程
整个MapReduce工作流程可以分为3个阶段:map、shuffle、reduce
map阶段:负责把从数据源读取来到数据进行处理,默认情况下读取数据返回的是kv键值对类型,经过自定义map方法处理之后,输出的也应该是kv键值对类型。shuffle阶段:map输出的数据会经过分区、排序、分组等自带动作进行重组,相当于洗牌的逆过程。这是MapReduce的核心所在,也是难点所在。也是值得我们深入探究的所在。默认分区规则:key相同的分在同一个分区,同一个分区被同一个reduce处理。默认排序规则:根据key字典序排序默认分组规则:key相同的分为一组,一组调用reduce处理一次。reduce阶段:负责针对shuffle好的数据进行聚合处理。输出的结果也应该是kv键值对
MapReduce进程
一个完整的MapReduce程序在分布式运行时有三类实例进程:(1)MrAppMaster:负责整个程序的过程调度及状态协调。(2)MapTask:负责Map阶段的整个数据处理流程。(3)ReduceTask:负责Reduce阶段的整个数据处理流程
Hadoop中数据类型
Hadoop提供了如下内容的数据类型,这些数据类型都实现了WritableComparable接口,以便用这些类型定义的数据可以被序列化进行网络传输和文件存储,以及进行大小比较
| Hadoop 数据类型 | Java数据类型 | 备注 |
|---|---|---|
| BooleanWritable | boolean | 标准布尔型数值 |
| ByteWritable | byte | 单字节数值 |
| IntWritable | int | 整型数 |
| FloatWritable | float | 浮点数 |
| LongWritable | long | 长整型数 |
| DoubleWritable | double | 双字节数值 |
| Text | String | 使用UTF8格式存储的文本 |
| MapWritable | map | 映射 |
| ArrayWritable | array | 数组 |
| NullWritable | null | 当中的key或value为空时使用 |
注意:如果需要将自定义的类放在key中传输,则还需要实现Comparable接口,因为MapReduce框中的Shuffle过程要求对key必须能排序
MapReduce编程规范
用户编写的程序分成三个部分:Mapper、Reducer和Driver
Mapper
(1)Mapper 的输入数据是 KV 对的形式(KV 的类型可自定义)(2)Mapper 的输出数据是 KV 对的形式(KV 的类型可自定义)(3)Mapper 中的业务逻辑写在 map()方法中(4)map()方法(maptask 进程)对每一个<K,V>调用一次(5)用户自定义的Mapper要继承自己的父类
Reducer
(1)Reducer 的输入数据类型对应 Mapper 的输出数据类型,也是 KV(2)Reducer 的业务逻辑写在 reduce()方法中(3)Reducetask 进程对每一组相同 k 的<k,v>组调用一次reduce()方法(4)用户自定义的Reducer要继承自己的父类
Drvier
相当于Yarn集群的客户端,用于提交整个程序到Yarn集群,提交的是一个描述了各种必要信息的 job 对象
MapReduce经典入门案例
WordCount
WordCount中文叫做单词统计、词频统计,指的是使用程序统计某文本文件中,每个单词出现的总次数。这个是大数据计算领域经典的入门案例,虽然业务及其简单,但是希望能够通过案例感受背后MapReduce的执行流程和默认的行为机制,这才是关键。
本地测试
在给定的文本文件中统计输出每一个单词出现的总次数
<!--文本中的数据-->hello hadoop hello hellohadoop allen hadoop
<!--期望的输出-->hello 3hadoop 3allen 1
pom.xml中添加maven依赖
<dependencies><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.30</version></dependency></dependencies>
添加日志配置
项目src/main/resources目录下,新建 log4j.properties 日志配置文件
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
编写程序
Mapper
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* LongWritable: KEYIN,map阶段输入的kay的类型
* Text: VALUEIN,map阶段输入的value类型
* Text: KEYOUT,map阶段输出的key类型
* IntWritable: VALUEOUT,map阶段输出的value类型
* @Author wukai
* @Date 2021/12/10
**/
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
Text k = new Text();
IntWritable v = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1 获取一行
String line = value.toString();
// 2 切割
String[] words = line.split("\\s+");
// 3 输出
for (String word : words) {
k.set(word);
context.write(k, v);
}
}
}
Reducer
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* @Author wukai
* @Date 2021/12/10
**/
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
int sum;
IntWritable v = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
// 1 累加求和
sum = 0;
for (IntWritable count : values) {
sum += count.get();
}
// 2 输出
v.set(sum);
context.write(key,v);
}
}
Driver
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
/**
* @Author wukai
* @Date 2021/12/10
**/
public class WordCountDriver {
/**
* hadoop地址
*/
private static final String HDFSPATH = "hdfs://hadoop03:8020";
/**
* 输入路径
*/
private static final String INPUTPATH = "hdfs://hadoop03:8020/input";
/**
* 输出路径
*/
private static final String OUTPATH = "hdfs://hadoop03:8020/output";
/**
* hadoop用户名 key
*/
private static final String HADOOPUSERNAMEKEY = "HADOOP_USER_NAME";
/**
* hadoop用户名 value
*/
private static final String HADOOPUSERNAMEVALUE = "root";
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
//设置hadoop用户名
System.setProperty(HADOOPUSERNAMEKEY,HADOOPUSERNAMEVALUE);
// 1 获取配置信息以及获取job对象
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2 关联本Driver程序的jar
job.setJarByClass(WordCountDriver.class);
// 3 关联Mapper和Reducer的jar
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// 4 设置Mapper输出的kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 5 设置最终输出kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileSystem fs = FileSystem.get(new URI(HDFSPATH), conf, HADOOPUSERNAMEVALUE);
// 6 设置输入和输出路径
FileInputFormat.setInputPaths(job, new Path(INPUTPATH));
FileOutputFormat.setOutputPath(job, new Path(OUTPATH));
if(fs.exists(new Path(OUTPATH))){
fs.delete(new Path(OUTPATH),true);
}
// 7 提交job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
执行程序后,在Hadoop web ui中查看结果

统计买家具有的商品数量
创建输入数据文件
mkdir shopData
cd cd shopData/
vim shopinfo.txt
买家ID 商品ID
10000 100282
10001 192102
12820 182919
17281 182819
10001 828191
28719 182911
10000 287719
10000 289299
17281 198191
10000 100212
10001 192432
12820 182439
17281 182239
10001 828541
28719 182941
10000 287739
10000 289239
17281 198141
在HDFS中创建数据
hadoop fs -mkdir /shopInputData
cd shopData/
hadoop fs -put shop.txt /shopInputData
编写程序
Mapper
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* @Author wukai
* @Date 2021/12/10
**/
public class ShopCountMapper extends Mapper<LongWritable, Text, Text, Text> {
Text k = new Text();
Text v = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1 获取一行
String line = value.toString();
// 2 切割
String[] words = line.split("\\s+");
// 3 输出
k.set(words[0]);
v.set(words[1]);
context.write(k,v);
}
}
Reducer
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* @Author wukai
* @Date 2021/12/10
**/
public class ShopCountReducer extends Reducer<Text, Text, Text, IntWritable> {
IntWritable v = new IntWritable();
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException
{
int sum = 0;
for (Text value : values) {
sum++;
}
v.set(sum);
context.write(key, v);
}
}
Driver
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
/**
* @Author wukai
* @Date 2021/12/10
**/
public class ShopCountDriver {
/**
* hadoop地址
*/
private static final String HDFSPATH = "hdfs://hadoop03:8020";
/**
* 输入路径
*/
private static final String INPUTPATH = "hdfs://hadoop03:8020/shopInputData";
/**
* 输出路径
*/
private static final String OUTPATH = "hdfs://hadoop03:8020/shopOutData";
/**
* hadoop用户名 key
*/
private static final String HADOOPUSERNAMEKEY = "HADOOP_USER_NAME";
/**
* hadoop用户名 value
*/
private static final String HADOOPUSERNAMEVALUE = "root";
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
//设置hadoop用户名
System.setProperty(HADOOPUSERNAMEKEY,HADOOPUSERNAMEVALUE);
// 1 获取配置信息以及获取job对象
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2 关联本Driver程序的jar
job.setJarByClass(ShopCountDriver.class);
// 3 关联Mapper和Reducer的jar
job.setMapperClass(ShopCountMapper.class);
job.setReducerClass(ShopCountReducer.class);
// 4 设置Mapper输出的kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
// 5 设置最终输出kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileSystem fs = FileSystem.get(new URI(HDFSPATH), conf, HADOOPUSERNAMEVALUE);
// 6 设置输入和输出路径
FileInputFormat.setInputPaths(job, new Path(INPUTPATH));
FileOutputFormat.setOutputPath(job, new Path(OUTPATH));
if(fs.exists(new Path(OUTPATH))){
fs.delete(new Path(OUTPATH),true);
}
// 7 提交job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}

Hadoop序列化
序列化概述
什么事序列化
序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁盘(持久化)和网络传输
反序列化就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象
为什么要序列化
一般来说,“活的”对象只生存在内存里,关机断电就没有了。而且“活的”对象只能由本地的进程使用,不能被发送到网络上的另外一台计算机。 然而序列化可以存储“活的”对象,可以将“活的”对象发送到远程计算机
什么不用Java提供的序列化
Java的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,Header,继承体系等),不便于在网络中高效传输。所以,Hadoop自己开发了一套序列化机制(Writable)
hadoop序列化的特点
(1)紧凑 :高效使用存储空间
(2)快速:读写数据的额外开销小
(3)互操作:支持多语言的交互
序列化案例实操
数据统计
创建数据输入文件
vim flowData
cd flowData/
vim flow.txt
1 13736230513 192.196.100.1 www.atguigu.com 2481 24681 200
2 13846544121 192.196.100.2 264 0 200
3 13956435636 192.196.100.3 132 1512 200
4 13966251146 192.168.100.1 240 0 404
5 18271575951 192.168.100.2 www.atguigu.com 1527 2106 200
6 84188413 192.168.100.3 www.atguigu.com 4116 1432 200
7 13590439668 192.168.100.4 1116 954 200
8 15910133277 192.168.100.5 www.hao123.com 3156 2936 200
9 13729199489 192.168.100.6 240 0 200
10 13630577991 192.168.100.7 www.shouhu.com 6960 690 200
11 15043685818 192.168.100.8 www.baidu.com 3659 3538 200
12 15959002129 192.168.100.9 www.atguigu.com 1938 180 500
13 13560439638 192.168.100.10 918 4938 200
14 13470253144 192.168.100.11 180 180 200
15 13682846555 192.168.100.12 www.qq.com 1938 2910 200
16 13992314666 192.168.100.13 www.gaga.com 3008 3720 200
17 13509468723 192.168.100.14 www.qinghua.com 7335 110349 404
18 18390173782 192.168.100.15 www.sogou.com 9531 2412 200
19 13975057813 192.168.100.16 www.baidu.com 11058 48243 200
20 13768778790 192.168.100.17 120 120 200
21 13568436656 192.168.100.18 www.alibaba.com 2481 24681 200
22 13568436656 192.168.100.19 1116 954 200
注意不要有空行
hadoop fs -mkdir /flowInputData
hadoop fs -put flow.txt /flowInputData
编写程序
FlowBean
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* @Author wukai
* @Date 2021/12/10
**/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class FlowBean implements Writable {
/**
* 上行流量
*/
private Long upFlow;
/**
* 下行流量
*/
private Long downFlow;
/**
* 总流量
*/
private Long sumFlow;
public void setSumFlow() {
this.sumFlow = this.upFlow + this.downFlow;
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeLong(upFlow);
dataOutput.writeLong(downFlow);
dataOutput.writeLong(sumFlow);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
this.upFlow = dataInput.readLong();
this.downFlow = dataInput.readLong();
this.sumFlow = dataInput.readLong();
}
}
Mapper
import com.hadoop01.entity.FlowBean;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* @Author wukai
* @Date 2021/12/10
**/
public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
private Text outK = new Text();
private FlowBean outV = new FlowBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1 获取一行数据,转成字符串
String line = value.toString();
//2 切割数据
String[] split = line.split("\\s+");
//3 抓取我们需要的数据:手机号,上行流量,下行流量
String phone = split[1];
String up = split[split.length - 3];
String down = split[split.length - 2];
//4 封装outK outV
outK.set(phone);
outV.setUpFlow(Long.parseLong(up));
outV.setDownFlow(Long.parseLong(down));
outV.setSumFlow();
//5 写出outK outV
context.write(outK, outV);
}
}
Reducer
import com.hadoop01.entity.FlowBean;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* @Author wukai
* @Date 2021/12/10
**/
public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
private FlowBean outV = new FlowBean();
@Override
protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
long totalUp = 0;
long totalDown = 0;
//1 遍历values,将其中的上行流量,下行流量分别累加
for (FlowBean flowBean : values) {
totalUp += flowBean.getUpFlow();
totalDown += flowBean.getDownFlow();
}
//2 封装outKV
outV.setUpFlow(totalUp);
outV.setDownFlow(totalDown);
outV.setSumFlow();
//3 写出outK outV
context.write(key,outV);
}
}
Driver
import com.hadoop01.entity.FlowBean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
/**
* @Author wukai
* @Date 2021/12/10
**/
public class FlowDriver {
/**
* hadoop地址
*/
private static final String HDFSPATH = "hdfs://hadoop03:8020";
/**
* 输入路径
*/
private static final String INPUTPATH = "hdfs://hadoop03:8020/flowInputData";
/**
* 输出路径
*/
private static final String OUTPATH = "hdfs://hadoop03:8020/flowOutData";
/**
* hadoop用户名 key
*/
private static final String HADOOPUSERNAMEKEY = "HADOOP_USER_NAME";
/**
* hadoop用户名 value
*/
private static final String HADOOPUSERNAMEVALUE = "root";
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
//设置hadoop用户名
System.setProperty(HADOOPUSERNAMEKEY,HADOOPUSERNAMEVALUE);
// 1 获取配置信息以及获取job对象
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2 关联本Driver程序的jar
job.setJarByClass(FlowDriver.class);
// 3 关联Mapper和Reducer的jar
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
// 4 设置Mapper输出的kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
// 5 设置最终输出kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
FileSystem fs = FileSystem.get(new URI(HDFSPATH), conf, HADOOPUSERNAMEVALUE);
// 6 设置输入和输出路径
FileInputFormat.setInputPaths(job, new Path(INPUTPATH));
FileOutputFormat.setOutputPath(job, new Path(OUTPATH));
if(fs.exists(new Path(OUTPATH))){
fs.delete(new Path(OUTPATH),true);
}
// 7 提交job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}

MapReduce框架原理
InputFormat数据输入
切片与MapTask并行度决定机制
MapTask的并行度决定Map阶段的任务处理并发度,进而影响到整个Job的处理速度
MapTask并行度决定机制
数据块:Block是HDFS物理上把数据分成一块一块。数据块是HDFS存储数据单位
数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。数据切片是MapReduce程序计算输入数据的单位,一个切片会对应启动一个MapTask
MapReduce执行流程
Map阶段执行过程
* 1. 第一阶段是把输入目录下文件按照一定的标准逐个进行逻辑切片,形成切片规划。默认情况下,Split size = Block size。每一个切片由一个MapTask处理。(getSplits)
* 2. 第二阶段是对切片中的数据按照一定的规则解析成对。默认规则是把每一行文本内容解析成键值对。key是每一行的起始位置(单位是字节),value是本行的文本内容。(TextInputFormat)
* 3. 第三阶段是调用Mapper类中的map方法。上阶段中每解析出来的一个,调用一次map方法。每次调用map方法会输出零个或多个键值对。
* 4. 第四阶段是按照一定的规则对第三阶段输出的键值对进行分区。默认是只有一个区。分区的数量就是Reducer任务运行的数量。默认只有一个Reducer任务。
* 5. 第五阶段是对每个分区中的键值对进行排序。首先,按照键进行排序,对于键相同的键值对,按照值进行排序。比如三个键值对<2,2>、<1,3>、<2,1>,键和值分别是整数。那么排序后的结果是<1,3>、<2,1>、<2,2>。如果有第六阶段,那么进入第六阶段;如果没有,直接输出到文件中。
* 6. 第六阶段是对数据进行局部聚合处理,也就是combiner处理。键相等的键值对会调用一次reduce方法。经过这一阶段,数据量会减少。本阶段默认是没有的。
Redue阶段执行过程
* 第一阶段是Reducer任务会主动从Mapper任务复制其输出的键值对。Mapper任务可能会有很多,因此Reducer会复制多个Mapper的输出。
* 第二阶段是把复制到Reducer本地数据,全部进行合并,即把分散的数据合并成一个大的数据。再对合并后的数据排序。
* 第三阶段是对排序后的键值对调用reduce方法。键相等的键值对调用一次reduce方法,每次调用会产生零个或者多个键值对。最后把这些输出的键值对写入到HDFS文件中。
数据清洗
“ETL,是英文Extract-Transform-Load的缩写,用来描述将数据从来源端经过抽取(Extract)、转换(Transform)、加载(Load)至目的端的过程。ETL一词较常用在数据仓库,但其对象并不限于数据仓库
在运行核心业务MapReduce程序之前,往往要先对数据进行清洗,清理掉不符合用户要求的数据。清理的过程往往只需要运行Mapper程序,不需要运行Reduce程序

