MapReduce编程规范

MapReduce的开发有八个阶段,其中Map阶段分为2个步骤,Shuffle阶段4个步骤,Reduce阶段分为2个步骤

Map的2个阶段

  1. 设置inputFormat类,将数据切分为k1和v1,输入到第二步
  2. 自定义map逻辑,将第一步的结果转换为另外的k2和v2,对结果进行输出

    Shuffle(洗牌)的4个阶段

  3. 对输出的k2和v2进行分区

  4. 对不同分区的数据按照相同的key排序
  5. (可选)对分组过的数据初步规约,降低数据的网络拷贝
  6. 对数据进行分组,相同的key的value放入到同一个集合

    Reduce的2个阶段

  7. 对多个map任务结果进行排序以及合并,编写reduce函数实现自己的逻辑,对输入的key-value进行处理,转为新的key-value(k3和v3)

  8. 设置outputFaormat处理并保存reduce输出的key-value数据

案例

比如存在一个a.text,里面记录了一些各个单词,通过TextInputFormat组件读每一行数据(无需我们操作,组件自动完成) ,形成一个kv键值对数据,k为每个数据的偏移量,v为我们要记录的每行数据。(完成了第一步骤)
image.png
上述已经拿到k1和v1,现在要自定义有个Mapper类重写map方法,在map方法中将k1和v1转为k2和v2,至于转成什么样的,需要根据我们的业务逻辑。我们假设要统计源文件各个单词出现的次数。将v1读取到的数字按照“逗号”分割,将读取到的每个值变成k2 v2的形式,k2的值即为单词,v2可以理解为固定的值1。(完成了第二步骤)。
image.png
现在到了我们的shuffle阶段,它会陆续的进行分区、排序、规约、分组(java8 stream语法很多类似名词)。最终它会形成新的k2 v2,
image.png
到了reduce阶段,肯定是要形成一个k3和v3,格式肯定是单词作为键值,出现次数肯定是出现的次数了,然后通过OutPutFormat的子类TextOutPutFormat,它会将我们的结果保存成文件
image.png

案例准备工作

  1. 创建一个新的文件

    1. cd /export/servers
    2. vim wordcount.txt
  2. 向其中放入一下内容并保存

    1. helloworld,hadoop
    2. hive,sqoop,flume,hello
    3. kitty,tom,jerry,world
    4. hadoop
  3. 上传到hdfs

    1. hdfs dfs -mkdir /wordcount/ #在根目录创建文件夹
    2. hdfs dfs -put wordcount.txt /wordcount/ #在对应文件夹放入对应文件

    代码阶段

    map阶段方法

    重点注意:Mapper的四个泛型对应着我们的k1 v1和k2 v2的泛型,复写的map方法中的参数key和value是我们k1和v1,然后转成k2和v2传递给context ```java package com.dongnaoedu.network.hadoop.mapreduce;

import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordCountMapper extends Mapper { @Override public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{ String line = value.toString(); String[] split = line.split(“,”); for(String word : split){ context.write(new Text(word), new LongWritable(1)); } } }

  1. <a name="F0nhH"></a>
  2. ##### reduce方法
  3. 重点注意:Reduce的四个泛型对应着我们的k2 v2和k3 v3的泛型,复写的reduce方法中传递第一个和第二个参数分别是我们的k2和一个集合(记录了v2的值,因为它是经过分组排序后的结果)
  4. ```java
  5. package com.dongnaoedu.network.hadoop.mapreduce;
  6. import org.apache.hadoop.io.LongWritable;
  7. import org.apache.hadoop.io.Text;
  8. import org.apache.hadoop.mapreduce.Reducer;
  9. import java.io.IOException;
  10. public class WordCountReduce extends Reducer<Text,LongWritable, Text,LongWritable> {
  11. /**
  12. * 自定义我们的reduce逻辑
  13. * 所有key都是我们的单词,所有的values都是我们单词出现的次数
  14. */
  15. @Override
  16. protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException,InterruptedException{
  17. long count = 0;
  18. for(LongWritable value : values){
  19. count += value.get();
  20. }
  21. context.write(key,new LongWritable(count));
  22. }
  23. }

定义主类,描述job并提交job
package com.dongnaoedu.network.hadoop.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class JobMain extends Configured implements Tool{

    @Override
    public int run(String[] strings) throws Exception{
        Job job = Job.getInstance(super.getConf(), "mapReduce_wordCount");

        //打包放在集群下运行,需要做一个配置
        job.setJarByClass(JobMain.class);

        // 第一步:设置读取文件的类:k1 v1
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("hdfs://node01:8082/wordcount"));

        // 第二步:设置mapper类
        job.setMapperClass(WordCountMapper.class);
        // 设置map阶段输出类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // 第三 四 五 六步采用默认方式(分区 排序 规约 分组)

        // 第七步:设置Reduce类
        job.setReducerClass(WordCountReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // 第八步:设置输出类
        job.setOutputFormatClass(TextOutputFormat.class);
        // 设置输出路径
        TextOutputFormat.setOutputPath(job, new Path("hafs://node01:8082/wordcount_cout"));

        boolean b = job.waitForCompletion(true);

        return b ? 0:1;
    }

    public static void main(String[] args) throws Exception{
        Configuration configuration = new Configuration();
        // 启动一个任务  run=0成功
        int run = ToolRunner.run(configuration, new JobMain(), args);
        System.exit(run);  
    }

}
  1. 打成jar包运行主程序

打成jar包后上传到集群任意一个节点服务器,使用hadoop命令运启动hadoop jar jar包名 cn.itcast.mapreduce.JobMain,运行后控制台会输出一下信息,并在我们根目录创建wordcount_cout文件夹,点进去就会有我们输出统 计结果文件
image.pngimage.png
image.png
image.png