WordCount程序实现

目的:在给定的文本文件中,统计输出每一个单词出现的总次数。

整体设计步骤:

Mapper类:

  1. 将MapTask传过来的Text类型文本内容转换成String(因为String的API更丰富,更容易操作)
  2. 根据空格将一整行内容进行split切分成独立的单词
  3. 将单词输出成 key-value 形式:<单词, 1>(key为单词名,value为出现次数)

Reducer类:

  1. 汇总各个key的个数
  2. 输出该key的总次数

Driver类:

  1. 获取配置信息,获取 job 对象实例
  2. 指定本程序的jar包所在的本地路径
  3. 关联 Mapper/Reducer 业务处理类
  4. 指定Mapper输出数据的 key-value 类型
  5. 指定最终输出的数据的 key-value 类型
  6. 指定 job 的输入原始文件所在目录
  7. 指定 job 的输出结果所在目录
  8. 提交作业

编写Demo

搭建环境

环境准备:参考 第08篇 配置Hadoop环境,否则会报错。

环境搭建:

  1. 创建Maven工程,加入依赖:

    1. <dependencies>
    2. <dependency>
    3. <groupId>org.apache.hadoop</groupId>
    4. <artifactId>hadoop-client</artifactId>
    5. <version>3.2.3</version>
    6. </dependency>
    7. <dependency>
    8. <groupId>junit</groupId>
    9. <artifactId>junit</artifactId>
    10. <version>4.12</version>
    11. </dependency>
    12. <dependency>
    13. <groupId>org.slf4j</groupId>
    14. <artifactId>slf4j-log4j12</artifactId>
    15. <version>1.7.30</version>
    16. </dependency>
    17. </dependencies>
  1. 配置日志:log4j.properties
    log4j.rootLogger=INFO, stdout
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
    log4j.appender.logfile=org.apache.log4j.FileAppender
    log4j.appender.logfile.File=target/hadoop-client.log
    log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
    log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
    

编写程序

需要注意的是:MapperReducerFileInputFormatFileOutputFormat等类,在Hadoop的依赖中都有两份,

  • 一个是org.apache.hadoop.mapred包中的,适用于 Hadoop 1.x,其中的MapReduce既做数据处理,又做任务调度
  • 一个是org.apache.hadoop.mapreduce.*包中的,适用于 Hadoop 2.x及以上版本,其中的MapReduce只做数据处理,把任务调度交给了Yarn

编写Mapper:

package com.study.mapreduce.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Mapper泛型:
 * 输入的key类型:本程序需要将偏移量当做key,所以是LongWritable类型
 * 输入的value类型:一般都是文本字符串,所以是Text类型
 * 输出的key类型:本程序的Mapper输出的是单词数量,所以key是单词,Text类型
 * 输出的value类型:单词的个数,所以是IntWritable类型
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private Text outKey = new Text();
    private IntWritable outValue = new IntWritable(1);  // 因为我们map阶段不聚合,每个单词出现一次就记一个1
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        String line = value.toString();  // 获取一行信息
        String[] words = line.split(" ");// 拆分一行内容中的单词

        for (String word : words) {
            outKey.set(word);  // 将word转换成Text类型
            // 将 key-value 输出到 context 中,供后面的Reducer使用
            context.write(outKey, outValue);   // xxx单词出现了1次
        }
    }
}

编写Reducer:

package com.study.mapreduce.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Reducer泛型:
 * 输入的key类型:单词字符串,即Mapper的输出的key类型
 * 输入的value类型:单词出现次数(因为map没有聚合,所以每个value都是1)即mapper的输出的value类型
 * 输出的key类型:单词,所以是Text类型
 * 输出的value类型:汇总的单词个数,所以是IntWritable类型
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
    IntWritable outValue = new IntWritable();
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();  // value是IntWritable类型,需要调用get()进行类型转换
        }
        outValue.set(sum);
        context.write(key, outValue);
    }
}

编写Driver:

package com.study.mapreduce.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCountDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // 1. 获取job
        Configuration config = new Configuration();
        Job job = Job.getInstance(config);

        // 2.设置 jar 路径
        job.setJarByClass(WordCountDriver.class); // 可以直接通过当前类的全类名反射获取到jar包路径

        // 3.关联mapper和reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // 4.设置map输出的key、value类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 5.设置最终输出(最终输出不一定是Reducer输出)的key、value类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 6.设置输入路径和输出路径
        FileInputFormat.setInputPaths(job, new Path("/app/WordCount/input"));  // 本地文件路径,可以输入多个
        FileOutputFormat.setOutputPath(job, new Path("/app/WordCount/output/output2"));  // 本地文件路径(需要是一个不存在的文件夹,否则会报错目录已存在)

        // 可以调用 job.submit() 提交作业
        // 但是为了调试,可以调用waitForCompletion,传入一个true,让程序输出监控信息
        // waitForCompletion内部也是调用了 job.submit()
        boolean success = job.waitForCompletion(true);

        System.exit(success ? 0 : 1);  // 程序退出
    }
}

此时的 MapReduce 任务是在本地运行的,所以不需要连接我们的 HDFS 集群。

程序运行后,便可以在指定的输出结果路径下生成结果文件。

输出的结果文件

MapReduce程序执行完成之后,输出的结果文件有以下几种:

  • part-r-00000:中间的r表示这个是由ReduceTask生成的结果。如果程序只有MapTask,没有ReduceTask(ReduceTask数量设置为0),则由MapTask生成结果文件为 part-m-00000
  • .part-r-00000.crc:生成结果的crc校验文件
  • part-r-00001:如果ReduceTask数量设置的大于1,产生多个结果文件,则会依次命名为 0号、1号、2号…..。也会相应的产生该文件的crc校验文件
  • _SUCCESS:执行成功的标志文件
  • ._SUCCESS.crc:生成标志文件的crc校验文件

在Hadoop集群上运行

上面编写的程序是在本地运行的,实际中的程序应该在Hadoop集群上运行,输入的文件也应该是 hdfs 中的文件。

所以需要将程序打包,将输入路径、输出路径变为可配置参数,上传到Hadoop集群,使用hadoop jar xxx.jar命令运行。

将Driver中的输入路径输出路径改为参数

因为 hadoop 集群中的目录和本地路径不一致,所以需要将输入文件路径和结果输出路径修改为args参数形式传入:

FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

打成jar包

如果想要把程序放到服务器上运行,还需要打包成jar包。

在 pom 中加入打包的插件:

<build>
    <plugins>
        <!-- 仅打包程序,不打包依赖 -->
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.6.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>

        <!-- 打出来带有以来的jar包 -->
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

执行package后,会在target下打出来两个jar包:

  • MapReduceDemo-1.0-SNAPSHOT.jarmaven-compiler-plugin插件打出来的不带依赖的jar包
  • MapReduceDemo-1.0-SNAPSHOT-jar-with-dependencies.jarmaven-assembly-plugin插件打出来的带有依赖的jar包

打成jar包之后,先在本地测试一下:

hadoop jar MapReduceDemo-1.0-SNAPSHOT.jar com.study.mapreduce.wordcount.WordCountDriver

将jar包上传hadoop集群服务器

我们在Idea开发时:因为本地的Hadoop没有修改默认配置,mapred-default.xml中的mapreduce.framework.name使用的默认值local,所以使用的本地运行模式。而且 fs.defaultFS 默认值是本地Linux路径,所以输入路径、输出路径如果不添加协议前缀(file://hdfs://),默认用的都是本地路径。

而我们的hadoop102在搭建完全分布式机集群时:在mapred-site.xml中将mapreduce.framework.name修改成了yarn,所以如果打成jar包在hadoop102上运行时,使用的是Yarn运行的,而且 fs.defaultFS改成了hdfs地址,所以输入路径、输出路径如果不加协议前缀,默认都应该配置成 hdfs 路径,而不是本地路径。

因为我们要在hadoop集群服务器上运行,而hadoop集群服务器上已经安装了hadoop,里面带有hadoop客户端依赖,所以我们只需要上传不带依赖的jar包就行。

MapReduceDemo-1.0-SNAPSHOT.jar 上传到hadoop服务器(路径无所谓),执行:

# hadoop jar <jar包名称> <主启动类> <hdfs输入文件路径> <hdfs结果输出路径(该路径不能存在,否则会报错路径已存在,程序执行时会自动创建该路径)>
hadoop jar MapReduceDemo-1.0-SNAPSHOT.jar com.study.mapreduce.wordcount.WordCountDriver /input/ /output/

实际开发中,一般就是像上面步骤一样:

  1. 在Windows上搭建一个Hadoop环境,然后在Windows上开发好程序代码
  2. 在Windows上测试没有问题了,就打成jar包,上传到Hadoop集群服务器
  3. 然后在服务器上执行命令运行

但是实际中要在Hadoop上运行的程序可能有很多很多,此时就需要编写一个脚本,然后交给Azkaban(阿兹卡班)等Hadoop批处理调度器进行总调度