WordCount程序实现
目的:在给定的文本文件中,统计输出每一个单词出现的总次数。
整体设计步骤:
Mapper类:
- 将MapTask传过来的Text类型文本内容转换成String(因为String的API更丰富,更容易操作)
- 根据空格将一整行内容进行split切分成独立的单词
- 将单词输出成 key-value 形式:<单词, 1>(key为单词名,value为出现次数)
Reducer类:
- 汇总各个key的个数
- 输出该key的总次数
Driver类:
- 获取配置信息,获取 job 对象实例
- 指定本程序的jar包所在的本地路径
- 关联 Mapper/Reducer 业务处理类
- 指定Mapper输出数据的 key-value 类型
- 指定最终输出的数据的 key-value 类型
- 指定 job 的输入原始文件所在目录
- 指定 job 的输出结果所在目录
- 提交作业
编写Demo
搭建环境
环境准备:参考 第08篇 配置Hadoop环境,否则会报错。
环境搭建:
创建Maven工程,加入依赖:
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.2.3</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.30</version>
</dependency>
</dependencies>
配置日志:log4j.properties
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/hadoop-client.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
编写程序
需要注意的是:Mapper、Reducer、FileInputFormat、FileOutputFormat等类,在Hadoop的依赖中都有两份,
- 一个是org.apache.hadoop.mapred包中的,适用于 Hadoop 1.x,其中的MapReduce既做数据处理,又做任务调度
- 一个是org.apache.hadoop.mapreduce.*包中的,适用于 Hadoop 2.x及以上版本,其中的MapReduce只做数据处理,把任务调度交给了Yarn
编写Mapper:
package com.study.mapreduce.wordcount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* Mapper泛型:
* 输入的key类型:本程序需要将偏移量当做key,所以是LongWritable类型
* 输入的value类型:一般都是文本字符串,所以是Text类型
* 输出的key类型:本程序的Mapper输出的是单词数量,所以key是单词,Text类型
* 输出的value类型:单词的个数,所以是IntWritable类型
*/
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private Text outKey = new Text();
private IntWritable outValue = new IntWritable(1); // 因为我们map阶段不聚合,每个单词出现一次就记一个1
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
String line = value.toString(); // 获取一行信息
String[] words = line.split(" ");// 拆分一行内容中的单词
for (String word : words) {
outKey.set(word); // 将word转换成Text类型
// 将 key-value 输出到 context 中,供后面的Reducer使用
context.write(outKey, outValue); // xxx单词出现了1次
}
}
}
编写Reducer:
package com.study.mapreduce.wordcount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* Reducer泛型:
* 输入的key类型:单词字符串,即Mapper的输出的key类型
* 输入的value类型:单词出现次数(因为map没有聚合,所以每个value都是1)即mapper的输出的value类型
* 输出的key类型:单词,所以是Text类型
* 输出的value类型:汇总的单词个数,所以是IntWritable类型
*/
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
IntWritable outValue = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values) {
sum += value.get(); // value是IntWritable类型,需要调用get()进行类型转换
}
outValue.set(sum);
context.write(key, outValue);
}
}
编写Driver:
package com.study.mapreduce.wordcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WordCountDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
// 1. 获取job
Configuration config = new Configuration();
Job job = Job.getInstance(config);
// 2.设置 jar 路径
job.setJarByClass(WordCountDriver.class); // 可以直接通过当前类的全类名反射获取到jar包路径
// 3.关联mapper和reducer
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// 4.设置map输出的key、value类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 5.设置最终输出(最终输出不一定是Reducer输出)的key、value类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 6.设置输入路径和输出路径
FileInputFormat.setInputPaths(job, new Path("/app/WordCount/input")); // 本地文件路径,可以输入多个
FileOutputFormat.setOutputPath(job, new Path("/app/WordCount/output/output2")); // 本地文件路径(需要是一个不存在的文件夹,否则会报错目录已存在)
// 可以调用 job.submit() 提交作业
// 但是为了调试,可以调用waitForCompletion,传入一个true,让程序输出监控信息
// waitForCompletion内部也是调用了 job.submit()
boolean success = job.waitForCompletion(true);
System.exit(success ? 0 : 1); // 程序退出
}
}
此时的 MapReduce 任务是在本地运行的,所以不需要连接我们的 HDFS 集群。
程序运行后,便可以在指定的输出结果路径下生成结果文件。
输出的结果文件
MapReduce程序执行完成之后,输出的结果文件有以下几种:
- part-r-00000:中间的r表示这个是由ReduceTask生成的结果。如果程序只有MapTask,没有ReduceTask(ReduceTask数量设置为0),则由MapTask生成结果文件为part-m-00000
- .part-r-00000.crc:生成结果的crc校验文件
- part-r-00001:如果ReduceTask数量设置的大于1,产生多个结果文件,则会依次命名为 0号、1号、2号…..。也会相应的产生该文件的crc校验文件
- _SUCCESS:执行成功的标志文件
-
在Hadoop集群上运行
上面编写的程序是在本地运行的,实际中的程序应该在Hadoop集群上运行,输入的文件也应该是 hdfs 中的文件。
所以需要将程序打包,将输入路径、输出路径变为可配置参数,上传到Hadoop集群,使用hadoop jar xxx.jar命令运行。将Driver中的输入路径输出路径改为参数
因为 hadoop 集群中的目录和本地路径不一致,所以需要将输入文件路径和结果输出路径修改为args参数形式传入:
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
打成jar包
如果想要把程序放到服务器上运行,还需要打包成jar包。
在 pom 中加入打包的插件:<build>
<plugins>
<!-- 仅打包程序,不打包依赖 -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<!-- 打出来带有以来的jar包 -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
执行package后,会在target下打出来两个jar包:
MapReduceDemo-1.0-SNAPSHOT.jar:maven-compiler-plugin插件打出来的不带依赖的jar包
- MapReduceDemo-1.0-SNAPSHOT-jar-with-dependencies.jar:maven-assembly-plugin插件打出来的带有依赖的jar包
打成jar包之后,先在本地测试一下:
hadoop jar MapReduceDemo-1.0-SNAPSHOT.jar com.study.mapreduce.wordcount.WordCountDriver
将jar包上传hadoop集群服务器
我们在Idea开发时:因为本地的Hadoop没有修改默认配置,mapred-default.xml中的mapreduce.framework.name使用的默认值local,所以使用的本地运行模式。而且 fs.defaultFS 默认值是本地Linux路径,所以输入路径、输出路径如果不添加协议前缀(file://、hdfs://),默认用的都是本地路径。
而我们的hadoop102在搭建完全分布式机集群时:在mapred-site.xml中将mapreduce.framework.name修改成了yarn,所以如果打成jar包在hadoop102上运行时,使用的是Yarn运行的,而且 fs.defaultFS改成了hdfs地址,所以输入路径、输出路径如果不加协议前缀,默认都应该配置成 hdfs 路径,而不是本地路径。
因为我们要在hadoop集群服务器上运行,而hadoop集群服务器上已经安装了hadoop,里面带有hadoop客户端依赖,所以我们只需要上传不带依赖的jar包就行。
将MapReduceDemo-1.0-SNAPSHOT.jar上传到hadoop服务器(路径无所谓),执行:
# hadoop jar <jar包名称> <主启动类> <hdfs输入文件路径> <hdfs结果输出路径(该路径不能存在,否则会报错路径已存在,程序执行时会自动创建该路径)>
hadoop jar MapReduceDemo-1.0-SNAPSHOT.jar com.study.mapreduce.wordcount.WordCountDriver /input/ /output/
input为Hadoop上的文件夹路径
实际开发中,一般就是像上面步骤一样:
- 在Windows上搭建一个Hadoop环境,然后在Windows上开发好程序代码
- 在Windows上测试没有问题了,就打成jar包,上传到Hadoop集群服务器
- 然后在服务器上执行命令运行
但是实际中要在Hadoop上运行的程序可能有很多很多,此时就需要编写一个脚本,然后交给Azkaban(阿兹卡班)等Hadoop批处理调度器进行总调度