在上一篇我们的一个案例中,我们对出现单词的各个统计进行了mapreduce的入门学习,其中重要的两个步骤就是对map阶段和reduce阶段进行了代码的编写。但是在shuffle阶段使用了默认的分区、排序、规约、分组。在我们工作当中应该更多的是将不同的分区交给不同的reduce,从而产生不同的结果。
在MapReduce中,通过我们指定的分区,会将同一个分区的数据发送到同一个reduce当中去处理。例如:为了数据的统计,可以把一批类似的数据发送到同一个reduce当中,在同一个reduce当中统计相同类型的数据,就可以实现类似的数据分区和统计等。其实就死相同的类型数据,有共性的数据送到一起出来,reduce当中默认的分区只有一个。
单词统计任务拓展
还是跟上篇案例类似,假设有个a.txt文件300M,在单词统计的基础上要求对单词长度>=5放到一个结果集,<5放到另外一个结果集。此时的reduceTask就不是下图这个了,而是存在两个reduceTask,也就是说同一个分区的数据会放到同一个reduce,同一个reduce会产生一个文件
代码实现
新增PartitionerOwn类
要实现分区必须要重写Partitioner,然后自定义我们的分区规则
package com.dongnaoedu.network.hadoop.mapreduce.partition;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class PartitionerOwn extends Partitioner<Text, LongWritable>{
/**
* text:表示k2
* longWritable:表示v2
* i:reduce个数
*/
@Override
public int getPartition(Text text,LongWritable longWritable,int i) {
// 如果单词的长度>=5进入第一个分区==>第一个reduceTask==>reduce编号是0
if(text.toString().length() >= 5){
return 0;
}else{
// 如果长度<5,进入第二个分区==>第二个reduceTask==>reduce编号是1
return 1;
}
}
}
主方法设置分区代码
package com.dongnaoedu.network.hadoop.mapreduce.partition;
import com.dongnaoedu.network.hadoop.mapreduce.WordCountMapper;
import com.dongnaoedu.network.hadoop.mapreduce.WordCountReduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class JobMain extends Configured implements Tool{
@Override
public int run(String[] strings) throws Exception{
Job job = Job.getInstance(super.getConf(), "mapReduce_wordCount");
//打包放在集群下运行,需要做一个配置
job.setJarByClass(JobMain.class);
// 第一步:设置读取文件的类:k1 v1
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job, new Path("hdfs://node01:8082/wordcount"));
// 第二步:设置mapper类
job.setMapperClass(WordCountMapper.class);
// 设置map阶段输出类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
// 第三 四 五 六步采用默认方式(分区 排序 规约 分组)
// 设置我们的分区类
job.setPartitionerClass(PartitionerOwn.class);
// 第七步:设置Reduce类
job.setReducerClass(WordCountReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
// 设置reduce的个数j
job.setNumReduceTasks(2);
// 第八步:设置输出类
job.setOutputFormatClass(TextOutputFormat.class);
// 设置输出路径
TextOutputFormat.setOutputPath(job, new Path("hafs://node01:8082/wordcount_cout"));
boolean b = job.waitForCompletion(true);
return b ? 0:1;
}
public static void main(String[] args) throws Exception{
Configuration configuration = new Configuration();
// 启动一个任务 run=0成功
int run = ToolRunner.run(configuration, new JobMain(), args);
System.exit(run);
}
}
运行结果
查看我们的hdfs结果,会发现有两个文件,编号为0的就是我们长度>=5(第一个分区),编号为1的就是我们长度<5的(第二个分区)