MapReduce 开发基本流程

  1. 编写 mapper 的代码
  2. 编写 reduce 的代码
  3. 把代码放置在Hadoop中运行

    1.编写Mapper类

  4. 新建一个mapper 继承 hadoop.mapreduce 中的 Mapper类

  5. 实现Mapper父类中的 setup() 方法和 map() 方法 ```java MapReduce mapper 怎么实现 /**

    • @version : 1.0
    • Mapper
    • 它的这个Mapper让你去定义四个泛型,为什么mapper里面需要四个泛型
    • 其实读文本文件的操作不用你来实现,框架已经帮你实现了,框架可以读这个文件
    • 然后每读一行,就会发给你这个map,让你去运行一次,所以它读一行是不是把数据传给你,
    • 那他传给map的时候,这个数据就意味着类型的一个协议,我以什么类型的数据给你,我是不是得事先定好啊
    • map接收的数据类型得和框架给他的数据类型一致,不然的话就会出现类型转换异常
    • 所以map里面得定数据类型,前面两个是map拿数据的类型,拿数据是以什么类型拿的,那么框架就是以这个类型传给你
    • 另外两个泛型是map的输出数据类型,即reduce也得有4个泛型,前面两个是reduce拿数据的泛型得和map输出的泛型类型一致
    • 剩下两个是reduce再输出的结果时的两个数据类型 / /
    • 4个泛型,前两个是指定mapper端输入数据的类型,为什么呢,mapper和reducer都一样
    • 拿数据,输出数据都是以的形式进行的—那么key,value都分别有一个数据类型
    • KEYIN:输入的key的类型
    • VALUEIN:输入的value的类型
    • KEYOUT:输出的key的数据类型
    • VALUEOUT:输出的value的数据累心
    • map reduce的数据输入输出都是以key,value对封装的
    • 至于输入的key,value形式我们是不能控制的,是框架传给我们的,
    • 框架传给我们是什么类型,我们这里就写什么数据类型
    • 默认情况下框架传给我们的mapper的输入数据中,key是要处理的文本中一行的起始偏移量,
    • 因为我们的框架是读一行就调用一次我们的偏移量
    • 那么就把一行的起始偏移量作为key,这一行的内容作为value
    • 那么输出端的数据类型是什么,由于我们输出的数
    • 那么它们的数据类型就显而易见了
    • 初步定义为:
    • Mapper
    • 但是不管是Long还是String,在MapReduce里面运行的时候,这个数据读到网络里面进行传递
    • 即各个节点之间会进行传递,那么要在网络里面传输,那么就意味着这个数据得序列化
    • Long、String对象,内存对象走网络都得序列化,Long、String,int序列化
    • 如果自己实现Serializable接口,那么附加的信息太多了
    • hadoop实现了自己的一套序列化机制
    • 所以就不要用Java里面的数据类型了,而是用它自己的封装一套数据类型
    • 这样就有助于提高效率,实现了自己的序列化接口
    • 在序列化传输的 时候走的就是自己的序列化方法来传递,少了很多负载信息,传递数据精简,
    • Long—-LongWritable
    • String也有自己的封装-Text
    • int—IntWritable */ public class WCMapper extends Mapper { // MapReduce框架每读一次数据,就会调用一次该方法 @Override protected void map(LongWritable key, Text value, Context context)

      1. throws IOException, InterruptedException {

      //具体业务逻辑就写在这个方法体中,而且我们业务要处理的数据已经被框架传递进来,在方法参数中 //key—这一行数据的其实偏移量 value—这一行数据的文本内容 //1.先把单词拿出来,拿到一行 String line = value.toString(); //2.切分单词,这个是按照特定的分隔符 进行切分 String [] words = line.split(“ “); //3.把里面的单词发送出去 /*

      * 怎么发出去呢?我都不知道reduce在哪里运行
      * 其实呢,这个不用我们关心
      * 你只要把你的东西给那个工具就可以了
      * 剩下的就给那个框架去做
      * 那个工具在哪-----context
      * 它把那个工具放到那个context里面去了,即输出的工具
      * 所以你只要输出到context里面就行了
      * 剩下的具体往哪里走,是context的事情
      */
      

      //遍历单词数组,输出为形式 key是单词,value是1 for (String word : words) {

         //记得把key和value继续封装起来,即下面
         context.write(new Text(word), new IntWritable(1));
      

      } /*

      * map方法的执行频率:每读一行就调一次
      * 最后到reduce 的时候,应该是把某个单词里面所有的1都到,才能处理
      * 而且中间有一个缓存的过程,因为每个map的处理速度都不会完全一致
      * 等那个单词所有的1都到齐了才传给reduce
      */
      

      //每一组key,value都全了,才会去调用一次reduce,reduce直接去处理valuelist //接着就是写Reduce逻辑了

      } }

<a name="Va4Sn"></a>
## 2.编写 Reduce 类
WCReducer.java
```java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Description: Reducer<br/>
 * Copyright (c) , 2018, xlj <br/>
 * This program is protected by copyright laws. <br/>
 * Program Name:WCReducer.java <br/>
 * 
 * @version : 1.0
 */
/*
 * 类型记得要对应
 */
public class WCReducer extends Reducer<Text, IntWritable, Text, Text> {
    //map处理之后,value传过来的是一个value的集合 
    //框架在map处理完成之后,将所有的KV对保存起来,进行分组,然后传递一个组,调用一次reduce
    //相同的key在一个组
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        //遍历valuelist,进行了累加
        int count = 0;
        for (IntWritable value : values) {
            //get()方法就能拿到里面的值
            count += value.get();
        }
        //输出一组(一个单词)的统计结果
        //默认输出到HDFS的一个文件上面去,放在HDFS的某个目录下
        context.write(key, new Text(count+""));
        //但是还差一个描述类:用来描述整个逻辑

        /*
         * Map,Reducce都是个分散的,那集群运行的时候不知道运行哪些MapReduce
         * 
         * 处理业务逻辑的一个整体,叫做job
         * 我们就可以把那个job告诉那个集群,我们此次运行的是哪个job,
         * job里面用的哪个作为Mapper,哪个业务作为Reducer,我们得指定
         * 
         * 所以还得写一个类用来描述处理业务逻辑
         * 把一个特定的业务处理逻辑叫做一个job(作业),我们就可以把这个job告诉那个集群,
         * 
         */
    }
}
  1. 编写主执行方法 WCRunner ```java import java.io.IOException;

import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**

  • Description: 用来描述一个特定的作业
  • Copyright (c) , 2018, xlj
  • This program is protected by copyright laws.
  • Program Name:WCRunner.java
  • 该作业使用哪个类作为逻辑处理的map
  • 哪个作为reduce
  • 还可以指定该作业要处理的数据所在的路径
  • 还可以指定该作业输出的结果放到哪个路径
  • @version : 1.0 */

public class WCRunner { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //首先要描述一个作业,这些信息是挺多的,哪个是map,哪个是reduce,输入输出路径在哪 //一般来说这么多信息,就可以把它封装在一个对象里面,那么这个对象呢就是 ——Job对象 Job job = Job.getInstance(new Configuration());

    //job用哪个类作为Mapper 指定输入输出数据类型是什么
    job.setMapperClass(WCMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);

    //job用哪个类作为Reducer 指定数据输入输出类型是什么
    job.setReducerClass(WCReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    //指定原始数据存放在哪里
    //参数1:里面是对哪个参数进行指定
    //参数2:文件在哪个路径下,这个路径下的所有文件都会去读的
    FileInputFormat.setInputPaths(job, new Path("input/data1"));

    //指定处理结果的数据存放路径
    FileOutputFormat.setOutputPath(job, new Path("output1"));

    //提交
    int isok =  job.waitForCompletion(true)?0:-1;
    System.exit(isok);
}

}

```