In the previous article we covered partitioning and sorting in the shuffle phase; now let's look at the combiner (规约). Every map task can produce a large amount of output, and the combiner's job is to pre-merge that output on the map side so that less data has to be transferred between the map and reduce nodes, improving IO performance; it is one of MapReduce's optimization techniques. Concretely, values with the same key are merged before the shuffle: for example, a map task that emits (hello,1), (hello,1), (world,1) only ships (hello,2), (world,1). That merging is what we call combining.
- The combiner is a component of an MR program that sits alongside the Mapper and Reducer
- The combiner's parent class is Reducer

The difference between the combiner and the reducer is where they run:

- The combiner runs on the node of each map task
- The Reducer receives the output of all Mappers across the cluster

The point of the combiner is to perform a local aggregation of each map task's output, reducing the amount of data transferred over the network.
Implementation steps
- Define a custom combiner that extends Reducer and overrides the reduce method
- Register it on the job with job.setCombinerClass(CustomCombiner.class)
A combiner can only be applied when it does not change the final business logic, and the combiner's output kv types must match the reducer's input kv types. The word-count sum below satisfies both conditions; the sketch after this note shows a case that does not.
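For instance, a reducer that computes an average must not be reused as a combiner. The following is a minimal, hypothetical sketch that is not part of the example project; the AvgReducer class and its types are made up for illustration.

```java
import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical reducer that averages the values of each key.
// It must NOT be registered as a combiner, for two reasons:
// 1. Averaging is not associative: if one map task holds the values (1, 2) and another holds (6),
//    combining first gives avg(avg(1,2), 6) = avg(1.5, 6) = 3.75, while the correct avg(1,2,6) = 3,
//    so the result would depend on how records were split across map tasks.
//    The word-count sum is safe because addition is associative and commutative.
// 2. Its output value type (DoubleWritable) differs from its input value type (LongWritable),
//    so its output could not be fed back into the reducer as input.
public class AvgReducer extends Reducer<Text, LongWritable, Text, DoubleWritable> {

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long sum = 0;
        long count = 0;
        for (LongWritable value : values) {
            sum += value.get();
            count++;
        }
        context.write(key, new DoubleWritable((double) sum / count));
    }
}
```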
Example
We'll stick with the word-count example: example project address
All we need to do is add a combine step between the map and reduce phases, reducing the amount of data sent over the network and improving execution efficiency.
Code
WordCountMapper is unchanged:
```java
package com.dongnaoedu.network.hadoop.mapreduce.combiner;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] split = line.split(",");
        for (String word : split) {
            context.write(new Text(word), new LongWritable(1));
        }
    }
}
```
The new MyCombiner:
```java
package com.dongnaoedu.network.hadoop.mapreduce.combiner;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @author heian
 * @date 2022/3/24 12:25 PM
 * @description combiner
 */
public class MyCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {

    /**
     * Our custom reduce logic:
     * every key is a word, and the values are that word's occurrence counts.
     */
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long count = 0;
        for (LongWritable value : values) {
            count += value.get();
        }
        context.write(key, new LongWritable(count));
    }
}
```
WordCountReduce is unchanged:
```java
package com.dongnaoedu.network.hadoop.mapreduce.combiner;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReduce extends Reducer<Text, LongWritable, Text, LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long count = 0;
        // The values have already been combined, so there is only one partial sum per map task
        // instead of one entry per word occurrence
        for (LongWritable value : values) {
            count += value.get();
        }
        context.write(key, new LongWritable(count));
    }
}
```
Set the combiner class in JobMain:
```java
package com.dongnaoedu.network.hadoop.mapreduce.combiner;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JobMain extends Configured implements Tool {

    @Override
    public int run(String[] strings) throws Exception {
        Job job = Job.getInstance(super.getConf(), "mapReduce_combiner");
        // Required when the jar is packaged and submitted to the cluster
        job.setJarByClass(JobMain.class);

        // Step 1: set the input format that reads k1 v1
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("hdfs://node01:8082/wordcount"));

        // Step 2: set the mapper class
        job.setMapperClass(WordCountMapper.class);
        // Set the map-phase output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // Steps 3-6 (partition, sort, combine, group) use the defaults,
        // except for the combiner, which we plug in here
        job.setCombinerClass(MyCombiner.class);

        // Step 7: set the reducer class
        job.setReducerClass(WordCountReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // Step 8: set the output format
        job.setOutputFormatClass(TextOutputFormat.class);
        // Set the output path
        TextOutputFormat.setOutputPath(job, new Path("hdfs://node01:8082/wordcount_cout"));

        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // Launch the job; run == 0 means success
        int run = ToolRunner.run(configuration, new JobMain(), args);
        System.exit(run);
    }
}
```
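To verify that the combiner actually reduced the shuffled data, you can read the job's built-in task counters after it finishes. The snippet below is a minimal sketch, assuming it is placed in run() right after job.waitForCompletion(true) (the two imports go at the top of JobMain); MAP_OUTPUT_RECORDS, COMBINE_INPUT_RECORDS, COMBINE_OUTPUT_RECORDS and REDUCE_INPUT_RECORDS are standard MapReduce task counters.

```java
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TaskCounter;

// Minimal sketch: print the built-in task counters after the job completes.
// With the combiner enabled, "combine output records" (and hence "reduce input records")
// should be much smaller than "map output records".
Counters counters = job.getCounters();
long mapOutput = counters.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getValue();
long combineInput = counters.findCounter(TaskCounter.COMBINE_INPUT_RECORDS).getValue();
long combineOutput = counters.findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS).getValue();
long reduceInput = counters.findCounter(TaskCounter.REDUCE_INPUT_RECORDS).getValue();
System.out.println("Map output records:     " + mapOutput);
System.out.println("Combine input records:  " + combineInput);
System.out.println("Combine output records: " + combineOutput);
System.out.println("Reduce input records:   " + reduceInput);
```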
