WordCountMapper
package com.mapreduce;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/*
 * KEYIN:   the byte offset of the line within the file, LongWritable (Hadoop's counterpart of Java's long)
 * VALUEIN: one line of text from the input split, Text (counterpart of Java's String)
 * KEYOUT:  the output key, a single word (Text)
 * VALUEOUT: the output value, always 1, emitted as ("word", 1), IntWritable (counterpart of Java's int)
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private Text outkey = new Text();
    private IntWritable outvalue = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1. Split the input line (value) into words.
        String data = value.toString();
        // Regex: one or more commas, periods, or whitespace characters.
        String[] words = data.split("[,\\.\\s]+");
        // 2. Turn each word into a (key, value) pair and emit it.
        for (String word : words) {
            outkey.set(word);
            context.write(outkey, outvalue);
        }
    }
}
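To sanity-check the split regex outside Hadoop, here is a minimal standalone sketch (the class name SplitDemo and the sample line are hypothetical, not part of the job):

// Hypothetical standalone check of the Mapper's split regex.
public class SplitDemo {
    public static void main(String[] args) {
        String line = "hello, world. hello";
        // Same regex as the Mapper: one or more commas, periods, or whitespace.
        for (String word : line.split("[,\\.\\s]+")) {
            System.out.println("(" + word + ", 1)");
        }
    }
}

Running it prints (hello, 1), (world, 1), (hello, 1), which is exactly what the Mapper emits for that line.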
WordCountReducer
package com.mapreduce;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/*
 * KEYIN:   reduce input key (a word, Text)
 * VALUEIN: reduce input value (a count for that word, IntWritable)
 * KEYOUT:  reduce output key (a word, Text)
 * VALUEOUT: reduce output value (the total count for the word, IntWritable)
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable outvalue = new IntWritable();

    /*
     * The framework groups map output by key before calling reduce:
     * key, {value1, value2, ..., valueN}
     * e.g. THOUGHTS, {1, 1}
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        outvalue.set(sum);
        context.write(key, outvalue);
    }
}
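Between map and reduce, the framework sorts and groups all pairs by key, so reduce sees each word once together with all of its 1s. A minimal local illustration of that summation in plain Java (no Hadoop types; class name ReduceDemo is hypothetical):

import java.util.Arrays;
import java.util.List;

// Hypothetical local illustration of the reduce step for one key.
public class ReduceDemo {
    public static void main(String[] args) {
        // After the shuffle the reducer receives, e.g., ("hello", {1, 1}).
        List<Integer> values = Arrays.asList(1, 1);
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        System.out.println("hello\t" + sum); // prints: hello	2
    }
}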
WordCountDriver
package com.mapreduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
// Usage: hadoop jar xxx.jar com.mapreduce.WordCountDriver /input /output
public class WordCountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Create the job object.
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2. Set the jar by locating this driver class.
        job.setJarByClass(WordCountDriver.class);
        // 3. Wire up the Mapper and Reducer.
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // 4. Set the map output key/value types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 5. Set the final output key/value types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 6. Set the input path (FileInputFormat) and output path (FileOutputFormat).
        FileInputFormat.setInputPaths(job, new Path("D:\\eclipse\\HdfsDemo\\input"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\eclipse\\HdfsDemo\\output"));
        //FileInputFormat.setInputPaths(job, new Path(args[0]));
        //FileOutputFormat.setOutputPath(job, new Path(args[1])); // the output path must not already exist
        // 7. Submit the job and wait for it to finish.
        boolean result = job.waitForCompletion(true); // true: print job progress to the console
        System.exit(result ? 0 : 1); // 0: normal exit, 1: abnormal exit
    }
}
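Two optional refinements, shown as a hedged sketch against the standard Hadoop API rather than part of the original driver: because summing is associative and commutative, the same Reducer class can also run as a combiner to pre-aggregate map output locally, and the driver can delete a stale output directory before submission, since FileOutputFormat refuses to write to an existing path. The fragment below would go inside main() before waitForCompletion and assumes one extra import, org.apache.hadoop.fs.FileSystem:

// Sketch only: optional additions to main(), before job submission.
// Run the reducer as a combiner (valid here because addition is
// associative and commutative).
job.setCombinerClass(WordCountReducer.class);

// Delete a pre-existing output directory so reruns do not fail.
// Requires: import org.apache.hadoop.fs.FileSystem;
FileSystem fs = FileSystem.get(conf);
Path out = new Path(args[1]);
if (fs.exists(out)) {
    fs.delete(out, true); // true = recursive delete
}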
// Local test
// View the results:
hadoop fs -cat /output/part-r-00000
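With TextOutputFormat (the default), each line of part-r-00000 is a tab-separated word and count. For the sample line used earlier, the file would read as follows (the actual contents depend on your input):

hello	2
world	1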
// Test on the Hadoop cluster:
hadoop jar hdp.jar com.mapreduce.WordCountDriver /wordcount /output