MapReduce概述

MapReduce定义

  1. MapReduce是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架
  2. MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上

MapReduce优缺点

优点

  1. 1MapReduce易于编程
  2. 它简单的实现一些接口,就可以完成一个分布式程序,这个分布式程序可以分布到大量廉价的PC机器上运行。也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一样的。就是因为这个特点使得MapReduce编程变得非常流行
  3. 2)良好的扩展性
  4. 当你的计算资源不能得到满足的时候,你可以通过简单的增加机器来扩展它的计算能力
  5. 3)高容错性
  6. MapReduce设计的初衷就是使程序能够部署在廉价的PC机器上,这就要求它具有很高的容错性。比如其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由Hadoop内部完成的
  7. 4)适合PB级以上海量数据的离线处理
  8. 可以实现上千台服务器集群并发工作,提供数据处理能力

缺点

  1. 1)不擅长实时计算
  2. MapReduce无法像MySQL一样,在毫秒或者秒级内返回结果
  3. 2)不擅长流式计算
  4. 流式计算的输入数据是动态的,而MapReduce的输入数据集是静态的,不能动态变化。这是因为MapReduce自身的设计特点决定了数据源必须是静态的
  5. 3)不擅长DAG(有向无环图)计算
  6. 多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce并不是不能做,而是使用后,每个MapReduce作业的输出结果都会写入到磁盘,会造成大量的磁盘IO,导致性能非常的低下

MapReduce核心思想

  1. 1)分布式的运算程序往往需要分成至少2个阶段。
  2. 2)第一个阶段的MapTask并发实例,完全并行运行,互不相干。
  3. 3)第二个阶段的ReduceTask并发实例互不相干,但是他们的数据依赖于上一个阶段的所有MapTask并发实例的输出。
  4. 4MapReduce编程模型只能包含一个Map阶段和一个Reduce阶段,如果用户的业务逻辑非常复杂,那就只能多个MapReduce程序,串行运行

理解MapReduce思想

  1. MapReduce思想在生活中处处可见,每个人或多或少都曾接触过这种思想。MapReduce的思想核心是“先分再合,分而治之”, 所谓“分而治之”就是把一个复杂的问题,按照一定的“分解”方法分为等价的规模较小的若干部分,然后逐个解决,分别找出各部分的结果,把各部分的结果组成整个问题的结果。
  2. 这种思想来源于日常生活与工作时的经验,同样也完全适用于大量复杂的任务处理场景(大规模数据处理场景)。即使是发布过论文实现分布式计算的谷歌也只是实现了这种思想,而不是自己原创
  3. Map负责“分”,即把复杂的任务分解为若干个“简单的任务”来并行处理。可以进行拆分的前提是这些小任务可以并行计算,彼此间几乎没有依赖关系。
  4. Reduce负责“合”,即对map阶段的结果进行全局汇总。
  5. 这两个阶段合起来正是MapReduce思想的体现
  6. 一个比较形象的语言解释MapReduce:  
  7. 我们要数停车场中的所有的车数量。你数第一列,我数第二列。这就是“Map”。我们人越多,能够同时数车的人就越多,速度就越快。
  8. 数完之后,我们聚到一起,把所有人的统计数加在一起。这就是“Reduce

场景:如何模拟实现分布式计算

什么是分布式计算

  1. 分布式计算是一种计算方法,和集中式计算是相对的。
  2. 随着计算技术的发展,有些应用需要非常巨大的计算能力才能完成,如果采用集中式计算,需要耗费相当长的时间来完成。
  3. 分布式计算将该应用分解成许多小的部分,分配给多台计算机进行处理。这样可以节约整体计算时间,大大提高计算效率

MapReduce - 图1

大数据场景下模拟实现

MapReduce - 图2

MapReduce架构体系

  1. 一个完整的mapreduce程序在分布式运行时有三类实例进程:
  2. 1MRAppMaster:负责整个程序的过程调度及状态协调
  3. 2MapTask:负责map阶段的整个数据处理流程
  4. 3ReduceTask:负责reduce阶段的整个数据处理流程

MapReduce - 图3

Map Reduce工作执行流程

整个MapReduce工作流程可以分为3个阶段:map、shuffle、reduce
MapReduce - 图4

  1. map阶段:
  2. 负责把从数据源读取来到数据进行处理,默认情况下读取数据返回的是kv键值对类型,经过自定义map方法处理之后,输出的也应该是kv键值对类型。
  3. shuffle阶段:
  4. map输出的数据会经过分区、排序、分组等自带动作进行重组,相当于洗牌的逆过程。这是MapReduce的核心所在,也是难点所在。也是值得我们深入探究的所在。
  5. 默认分区规则:key相同的分在同一个分区,同一个分区被同一个reduce处理。
  6. 默认排序规则:根据key字典序排序
  7. 默认分组规则:key相同的分为一组,一组调用reduce处理一次。
  8. reduce阶段:
  9. 负责针对shuffle好的数据进行聚合处理。输出的结果也应该是kv键值对

MapReduce进程

  1. 一个完整的MapReduce程序在分布式运行时有三类实例进程:
  2. 1MrAppMaster:负责整个程序的过程调度及状态协调。
  3. 2MapTask:负责Map阶段的整个数据处理流程。
  4. 3ReduceTask:负责Reduce阶段的整个数据处理流程

Hadoop中数据类型

Hadoop提供了如下内容的数据类型,这些数据类型都实现了WritableComparable接口,以便用这些类型定义的数据可以被序列化进行网络传输和文件存储,以及进行大小比较

Hadoop 数据类型 Java数据类型 备注
BooleanWritable boolean 标准布尔型数值
ByteWritable byte 单字节数值
IntWritable int 整型数
FloatWritable float 浮点数
LongWritable long 长整型数
DoubleWritable double 双字节数值
Text String 使用UTF8格式存储的文本
MapWritable map 映射
ArrayWritable array 数组
NullWritable null 当中的key或value为空时使用

注意:如果需要将自定义的类放在key中传输,则还需要实现Comparable接口,因为MapReduce框中的Shuffle过程要求对key必须能排序

MapReduce编程规范

用户编写的程序分成三个部分:Mapper、Reducer和Driver

Mapper

  1. 1Mapper 的输入数据是 KV 对的形式(KV 的类型可自定义)
  2. 2Mapper 的输出数据是 KV 对的形式(KV 的类型可自定义)
  3. 3Mapper 中的业务逻辑写在 map()方法中
  4. 4map()方法(maptask 进程)对每一个<K,V>调用一次
  5. 5)用户自定义的Mapper要继承自己的父类

Reducer

  1. 1Reducer 的输入数据类型对应 Mapper 的输出数据类型,也是 KV
  2. 2Reducer 的业务逻辑写在 reduce()方法中
  3. 3Reducetask 进程对每一组相同 k 的<k,v>组调用一次reduce()方法
  4. 4)用户自定义的Reducer要继承自己的父类

Drvier

  1. 相当于Yarn集群的客户端,用于提交整个程序到Yarn集群,提交的是一个描述了各种必要信息的 job 对象

MapReduce经典入门案例

WordCount

WordCount中文叫做单词统计、词频统计,指的是使用程序统计某文本文件中,每个单词出现的总次数。这个是大数据计算领域经典的入门案例,虽然业务及其简单,但是希望能够通过案例感受背后MapReduce的执行流程和默认的行为机制,这才是关键。

本地测试

在给定的文本文件中统计输出每一个单词出现的总次数

  1. <!--文本中的数据-->
  2. hello hadoop hello hello
  3. hadoop allen hadoop
  1. <!--期望的输出-->
  2. hello 3
  3. hadoop 3
  4. allen 1

pom.xml中添加maven依赖
  1. <dependencies>
  2. <dependency>
  3. <groupId>org.apache.hadoop</groupId>
  4. <artifactId>hadoop-client</artifactId>
  5. <version>3.1.3</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>junit</groupId>
  9. <artifactId>junit</artifactId>
  10. <version>4.12</version>
  11. </dependency>
  12. <dependency>
  13. <groupId>org.slf4j</groupId>
  14. <artifactId>slf4j-log4j12</artifactId>
  15. <version>1.7.30</version>
  16. </dependency>
  17. </dependencies>

添加日志配置

项目src/main/resources目录下,新建 log4j.properties 日志配置文件

log4j.rootLogger=INFO, stdout  
log4j.appender.stdout=org.apache.log4j.ConsoleAppender  
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout  
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n  
log4j.appender.logfile=org.apache.log4j.FileAppender  
log4j.appender.logfile.File=target/spring.log  
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout  
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

编写程序

Mapper
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * LongWritable:  KEYIN,map阶段输入的kay的类型
 * Text: VALUEIN,map阶段输入的value类型
 * Text: KEYOUT,map阶段输出的key类型
 * IntWritable: VALUEOUT,map阶段输出的value类型
 * @Author wukai
 * @Date 2021/12/10
 **/
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    Text k = new Text();
    IntWritable v = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)    throws IOException, InterruptedException {

        // 1 获取一行
        String line = value.toString();
        // 2 切割
        String[] words = line.split("\\s+");
        // 3 输出
        for (String word : words) {
            k.set(word);
            context.write(k, v);
        }
    }

}

Reducer
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @Author wukai
 * @Date 2021/12/10
 **/
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    int sum;
    IntWritable v = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {

        // 1 累加求和
        sum = 0;
        for (IntWritable count : values) {
            sum += count.get();
        }

        // 2 输出
        v.set(sum);
        context.write(key,v);
    }

}

Driver
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

/**
 * @Author wukai
 * @Date 2021/12/10
 **/
public class WordCountDriver {
    /**
     * hadoop地址
     */
    private static final String HDFSPATH = "hdfs://hadoop03:8020";
    /**
     * 输入路径
     */
    private static final String INPUTPATH = "hdfs://hadoop03:8020/input";
    /**
     * 输出路径
     */
    private static final String OUTPATH = "hdfs://hadoop03:8020/output";
    /**
     * hadoop用户名 key
     */
    private static final String HADOOPUSERNAMEKEY = "HADOOP_USER_NAME";
    /**
     * hadoop用户名 value
     */
    private static final String HADOOPUSERNAMEVALUE = "root";


    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        //设置hadoop用户名
        System.setProperty(HADOOPUSERNAMEKEY,HADOOPUSERNAMEVALUE);

        // 1 获取配置信息以及获取job对象
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2 关联本Driver程序的jar
        job.setJarByClass(WordCountDriver.class);

        // 3 关联Mapper和Reducer的jar
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // 4 设置Mapper输出的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 5 设置最终输出kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileSystem fs = FileSystem.get(new URI(HDFSPATH), conf, HADOOPUSERNAMEVALUE);
        // 6 设置输入和输出路径
        FileInputFormat.setInputPaths(job, new Path(INPUTPATH));
        FileOutputFormat.setOutputPath(job, new Path(OUTPATH));

        if(fs.exists(new Path(OUTPATH))){
            fs.delete(new Path(OUTPATH),true);
        }
        // 7 提交job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }

}

执行程序后,在Hadoop web ui中查看结果
MapReduce - 图5

统计买家具有的商品数量

创建输入数据文件

mkdir shopData
cd cd shopData/
vim shopinfo.txt

买家ID    商品ID
10000   100282
10001   192102
12820   182919
17281   182819
10001   828191
28719   182911
10000   287719
10000   289299
17281   198191
10000   100212
10001   192432
12820   182439
17281   182239
10001   828541
28719   182941
10000   287739
10000   289239
17281   198141

在HDFS中创建数据

hadoop fs -mkdir /shopInputData
cd shopData/
hadoop fs -put shop.txt /shopInputData

编写程序

Mapper
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @Author wukai
 * @Date 2021/12/10
 **/
public class ShopCountMapper extends Mapper<LongWritable, Text, Text, Text> {
    Text k = new Text();
    Text v = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)    throws IOException, InterruptedException {

        // 1 获取一行
        String line = value.toString();
        // 2 切割
        String[] words = line.split("\\s+");

        // 3 输出
        k.set(words[0]);
        v.set(words[1]);
        context.write(k,v);
    }

}

Reducer
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @Author wukai
 * @Date 2021/12/10
 **/
public class ShopCountReducer extends Reducer<Text, Text, Text, IntWritable> {

    IntWritable v = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException
    {
        int sum = 0;
        for (Text value : values) {
            sum++;
        }
        v.set(sum);
        context.write(key, v);
    }
}

Driver
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

/**
 * @Author wukai
 * @Date 2021/12/10
 **/
public class ShopCountDriver {
    /**
     * hadoop地址
     */
    private static final String HDFSPATH = "hdfs://hadoop03:8020";
    /**
     * 输入路径
     */
    private static final String INPUTPATH = "hdfs://hadoop03:8020/shopInputData";
    /**
     * 输出路径
     */
    private static final String OUTPATH = "hdfs://hadoop03:8020/shopOutData";
    /**
     * hadoop用户名 key
     */
    private static final String HADOOPUSERNAMEKEY = "HADOOP_USER_NAME";
    /**
     * hadoop用户名 value
     */
    private static final String HADOOPUSERNAMEVALUE = "root";


    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        //设置hadoop用户名
        System.setProperty(HADOOPUSERNAMEKEY,HADOOPUSERNAMEVALUE);

        // 1 获取配置信息以及获取job对象
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2 关联本Driver程序的jar
        job.setJarByClass(ShopCountDriver.class);

        // 3 关联Mapper和Reducer的jar
        job.setMapperClass(ShopCountMapper.class);
        job.setReducerClass(ShopCountReducer.class);

        // 4 设置Mapper输出的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // 5 设置最终输出kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileSystem fs = FileSystem.get(new URI(HDFSPATH), conf, HADOOPUSERNAMEVALUE);
        // 6 设置输入和输出路径
        FileInputFormat.setInputPaths(job, new Path(INPUTPATH));
        FileOutputFormat.setOutputPath(job, new Path(OUTPATH));

        if(fs.exists(new Path(OUTPATH))){
            fs.delete(new Path(OUTPATH),true);
        }
        // 7 提交job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

MapReduce - 图6

Hadoop序列化

序列化概述

什么事序列化

序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁盘(持久化)和网络传输 
反序列化就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象

为什么要序列化

一般来说,“活的”对象只生存在内存里,关机断电就没有了。而且“活的”对象只能由本地的进程使用,不能被发送到网络上的另外一台计算机。 然而序列化可以存储“活的”对象,可以将“活的”对象发送到远程计算机

什么不用Java提供的序列化

Java的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,Header,继承体系等),不便于在网络中高效传输。所以,Hadoop自己开发了一套序列化机制(Writable)

hadoop序列化的特点

(1)紧凑 :高效使用存储空间
(2)快速:读写数据的额外开销小
(3)互操作:支持多语言的交互

序列化案例实操

数据统计

创建数据输入文件
vim flowData
cd flowData/
vim flow.txt
1       13736230513     192.196.100.1   www.atguigu.com 2481    24681   200
2       13846544121     192.196.100.2                   264     0       200
3       13956435636     192.196.100.3                   132     1512    200
4       13966251146     192.168.100.1                   240     0       404
5       18271575951     192.168.100.2   www.atguigu.com 1527    2106    200
6       84188413        192.168.100.3   www.atguigu.com 4116    1432    200
7       13590439668     192.168.100.4                   1116    954     200
8       15910133277     192.168.100.5   www.hao123.com  3156    2936    200
9       13729199489     192.168.100.6                   240     0       200
10      13630577991     192.168.100.7   www.shouhu.com  6960    690     200
11      15043685818     192.168.100.8   www.baidu.com   3659    3538    200
12      15959002129     192.168.100.9   www.atguigu.com 1938    180     500
13      13560439638     192.168.100.10                  918     4938    200
14      13470253144     192.168.100.11                  180     180     200
15      13682846555     192.168.100.12  www.qq.com      1938    2910    200
16      13992314666     192.168.100.13  www.gaga.com    3008    3720    200
17      13509468723     192.168.100.14  www.qinghua.com 7335    110349  404
18      18390173782     192.168.100.15  www.sogou.com   9531    2412    200
19      13975057813     192.168.100.16  www.baidu.com   11058   48243   200
20      13768778790     192.168.100.17                  120     120     200
21      13568436656     192.168.100.18  www.alibaba.com 2481    24681   200
22      13568436656     192.168.100.19                  1116    954     200
注意不要有空行
hadoop fs -mkdir /flowInputData
hadoop fs -put flow.txt /flowInputData

编写程序

FlowBean
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @Author wukai
 * @Date 2021/12/10
 **/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class FlowBean implements Writable {
    /**
     * 上行流量
     */
    private Long upFlow;
    /**
     * 下行流量
     */
    private Long downFlow;
    /**
     * 总流量
     */
    private Long sumFlow;



    public void setSumFlow() {
        this.sumFlow = this.upFlow + this.downFlow;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);

    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.upFlow = dataInput.readLong();
        this.downFlow = dataInput.readLong();
        this.sumFlow = dataInput.readLong();


    }
}

Mapper
import com.hadoop01.entity.FlowBean;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @Author wukai
 * @Date 2021/12/10
 **/
public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    private Text outK = new Text();
    private FlowBean outV = new FlowBean();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        //1 获取一行数据,转成字符串
        String line = value.toString();

        //2 切割数据
        String[] split = line.split("\\s+");
        //3 抓取我们需要的数据:手机号,上行流量,下行流量
        String phone = split[1];
        String up = split[split.length - 3];
        String down = split[split.length - 2];

        //4 封装outK outV
        outK.set(phone);
        outV.setUpFlow(Long.parseLong(up));
        outV.setDownFlow(Long.parseLong(down));
        outV.setSumFlow();

        //5 写出outK outV
        context.write(outK, outV);
    }
}

Reducer
import com.hadoop01.entity.FlowBean;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @Author wukai
 * @Date 2021/12/10
 **/
public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
    private FlowBean outV = new FlowBean();
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {

        long totalUp = 0;
        long totalDown = 0;

        //1 遍历values,将其中的上行流量,下行流量分别累加
        for (FlowBean flowBean : values) {
            totalUp += flowBean.getUpFlow();
            totalDown += flowBean.getDownFlow();
        }

        //2 封装outKV
        outV.setUpFlow(totalUp);
        outV.setDownFlow(totalDown);
        outV.setSumFlow();

        //3 写出outK outV
        context.write(key,outV);

    }

}

Driver
import com.hadoop01.entity.FlowBean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

/**
 * @Author wukai
 * @Date 2021/12/10
 **/
public class FlowDriver {
    /**
     * hadoop地址
     */
    private static final String HDFSPATH = "hdfs://hadoop03:8020";
    /**
     * 输入路径
     */
    private static final String INPUTPATH = "hdfs://hadoop03:8020/flowInputData";
    /**
     * 输出路径
     */
    private static final String OUTPATH = "hdfs://hadoop03:8020/flowOutData";
    /**
     * hadoop用户名 key
     */
    private static final String HADOOPUSERNAMEKEY = "HADOOP_USER_NAME";
    /**
     * hadoop用户名 value
     */
    private static final String HADOOPUSERNAMEVALUE = "root";


    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        //设置hadoop用户名
        System.setProperty(HADOOPUSERNAMEKEY,HADOOPUSERNAMEVALUE);

        // 1 获取配置信息以及获取job对象
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2 关联本Driver程序的jar
        job.setJarByClass(FlowDriver.class);

        // 3 关联Mapper和Reducer的jar
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        // 4 设置Mapper输出的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // 5 设置最终输出kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        FileSystem fs = FileSystem.get(new URI(HDFSPATH), conf, HADOOPUSERNAMEVALUE);
        // 6 设置输入和输出路径
        FileInputFormat.setInputPaths(job, new Path(INPUTPATH));
        FileOutputFormat.setOutputPath(job, new Path(OUTPATH));

        if(fs.exists(new Path(OUTPATH))){
            fs.delete(new Path(OUTPATH),true);
        }
        // 7 提交job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

MapReduce - 图7

MapReduce框架原理

InputFormat数据输入

切片与MapTask并行度决定机制

MapTask的并行度决定Map阶段的任务处理并发度,进而影响到整个Job的处理速度

MapTask并行度决定机制

数据块:Block是HDFS物理上把数据分成一块一块。数据块是HDFS存储数据单位

数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。数据切片是MapReduce程序计算输入数据的单位,一个切片会对应启动一个MapTask

MapReduce执行流程

Map阶段执行过程

* 1. 第一阶段是把输入目录下文件按照一定的标准逐个进行逻辑切片,形成切片规划。默认情况下,Split size = Block size。每一个切片由一个MapTask处理。(getSplits)
* 2. 第二阶段是对切片中的数据按照一定的规则解析成对。默认规则是把每一行文本内容解析成键值对。key是每一行的起始位置(单位是字节),value是本行的文本内容。(TextInputFormat)
* 3. 第三阶段是调用Mapper类中的map方法。上阶段中每解析出来的一个,调用一次map方法。每次调用map方法会输出零个或多个键值对。
* 4. 第四阶段是按照一定的规则对第三阶段输出的键值对进行分区。默认是只有一个区。分区的数量就是Reducer任务运行的数量。默认只有一个Reducer任务。
* 5. 第五阶段是对每个分区中的键值对进行排序。首先,按照键进行排序,对于键相同的键值对,按照值进行排序。比如三个键值对<2,2>、<1,3>、<2,1>,键和值分别是整数。那么排序后的结果是<1,3>、<2,1>、<2,2>。如果有第六阶段,那么进入第六阶段;如果没有,直接输出到文件中。
* 6. 第六阶段是对数据进行局部聚合处理,也就是combiner处理。键相等的键值对会调用一次reduce方法。经过这一阶段,数据量会减少。本阶段默认是没有的。

Redue阶段执行过程

* 第一阶段是Reducer任务会主动从Mapper任务复制其输出的键值对。Mapper任务可能会有很多,因此Reducer会复制多个Mapper的输出。
* 第二阶段是把复制到Reducer本地数据,全部进行合并,即把分散的数据合并成一个大的数据。再对合并后的数据排序。
* 第三阶段是对排序后的键值对调用reduce方法。键相等的键值对调用一次reduce方法,每次调用会产生零个或者多个键值对。最后把这些输出的键值对写入到HDFS文件中。

数据清洗

“ETL,是英文Extract-Transform-Load的缩写,用来描述将数据从来源端经过抽取(Extract)、转换(Transform)、加载(Load)至目的端的过程。ETL一词较常用在数据仓库,但其对象并不限于数据仓库
在运行核心业务MapReduce程序之前,往往要先对数据进行清洗,清理掉不符合用户要求的数据。清理的过程往往只需要运行Mapper程序,不需要运行Reduce程序