In the previous article we looked at partitioning and sorting in the shuffle phase; now let's look at the combiner. Every map task can produce a large amount of output, and the combiner's job is to merge that map-side output locally before it is sent on, reducing the amount of data transferred between the map and reduce nodes and improving I/O performance. It is one of MapReduce's optimization techniques: records with the same key are merged on the map side, which is what we mean by combining.
- The combiner is a component in an MR program besides the Mapper and the Reducer.
- The combiner's parent class is Reducer.
The difference between the combiner and the reducer is where they run:
- The combiner runs on the node of each map task.
- The reducer receives the output of all mappers globally.

The point of the combiner is to do a local aggregation of each map task's output, reducing network I/O during the shuffle.
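For example, if one map task emits (hello,1), (hello,1), (hello,1), (world,1), the combiner folds this locally into (hello,3), (world,1), so only two records instead of four are shuffled across the network.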
Implementation steps
- Write a custom combiner that extends Reducer and override its reduce method.
- Register it on the job with job.setCombinerClass(CustomCombiner.class).

The precondition for using a combiner is that it must not change the final business logic, and the combiner's output key/value types must match the reducer's input key/value types.
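A classic counter-example is averaging: avg(3, 5, 7) = 5, but if a combiner pre-averages the first two values on the map side, the reducer ends up computing avg(avg(3, 5), 7) = avg(4, 7) = 5.5, which is wrong. Summing counts, as in word count, is safe because addition is associative and commutative.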
Case study
As before, we use counting word occurrences as the example (sample code link).
All we need to do is add a combine step between the map and reduce phases to cut the amount of data sent over the network and improve execution efficiency.
Code
WordCountMapper (unchanged)
package com.dongnaoedu.network.hadoop.mapreduce.combiner;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split each input line on commas and emit (word, 1) for every word
        String line = value.toString();
        String[] split = line.split(",");
        for (String word : split) {
            context.write(new Text(word), new LongWritable(1));
        }
    }
}
The new MyCombiner
package com.dongnaoedu.network.hadoop.mapreduce.combiner;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @author heian
 * @date 2022/3/24 12:25 PM
 * @description Combiner
 */
public class MyCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {

    /**
     * Custom reduce logic: each key is a word and the values are that word's occurrence counts.
     */
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long count = 0;
        for (LongWritable value : values) {
            count += value.get();
        }
        context.write(key, new LongWritable(count));
    }
}
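Since the combiner logic here is identical to the reducer's, an alternative (just a sketch, not part of the original example) is to skip MyCombiner entirely and register the reducer class itself as the combiner:

// Alternative: reuse the reducer as the combiner, because the aggregation logic is the same
job.setCombinerClass(WordCountReduce.class);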
WordCountReduce (unchanged)
package com.dongnaoedu.network.hadoop.mapreduce.combiner;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReduce extends Reducer<Text, LongWritable, Text, LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long count = 0;
        // The values have already been locally combined, so each map task contributes at most one value per key
        for (LongWritable value : values) {
            count += value.get();
        }
        context.write(key, new LongWritable(count));
    }
}
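Note that the combiner is only an optimization hint: Hadoop may apply it zero, one, or several times per map task, so the reducer must produce the correct result whether or not the combiner actually ran.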
Set the combiner class in JobMain
package com.dongnaoedu.network.hadoop.mapreduce.combiner;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JobMain extends Configured implements Tool {

    @Override
    public int run(String[] strings) throws Exception {
        Job job = Job.getInstance(super.getConf(), "mapReduce_combiner");
        // Required when the job is packaged as a jar and run on the cluster
        job.setJarByClass(JobMain.class);
        // Step 1: set the input format class (k1, v1)
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("hdfs://node01:8082/wordcount"));
        // Step 2: set the mapper class
        job.setMapperClass(WordCountMapper.class);
        // Set the map-phase output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // Steps 3-6 (partition, sort, combine, group) keep the defaults, except step 5: register our combiner
        job.setCombinerClass(MyCombiner.class);
        // Step 7: set the reducer class
        job.setReducerClass(WordCountReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // Step 8: set the output format class
        job.setOutputFormatClass(TextOutputFormat.class);
        // Set the output path
        TextOutputFormat.setOutputPath(job, new Path("hdfs://node01:8082/wordcount_cout"));
        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // Run the job; run == 0 means success
        int run = ToolRunner.run(configuration, new JobMain(), args);
        System.exit(run);
    }
}
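To confirm that the combiner is actually reducing the shuffled data, one option (a minimal sketch, not part of the original example) is to print the built-in combine counters inside run() right after waitForCompletion(true); this assumes an extra import of org.apache.hadoop.mapreduce.TaskCounter:

// Sketch: inside run(), after job.waitForCompletion(true)
long combineIn = job.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS).getValue();
long combineOut = job.getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS).getValue();
System.out.println("Combine input records:  " + combineIn);
System.out.println("Combine output records: " + combineOut);

If the combiner is doing its job, the output record count should be much smaller than the input record count.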