场景说明: 统计HDFS中文本文件中每个单词出现的次数,并输出到结果文件中

示例代码:https://gitee.com/fanyc-big-data/hadoop-word-count

新建Maven工程

新建普通的maven工程

  1. <groupId>cn.faury.demo.hadoop.word.count</groupId>
  2. <artifactId>hadoop-word-count</artifactId>
  3. <version>1.0-SNAPSHOT</version>

pom文件基础配置

  1. 配置maven属性

     <properties>
         <java.version>1.8</java.version>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     </properties>
    
  2. 配置阿里云仓库

         <!-- pull私服配置 -->
     <repositories>
         <!--<repository>-->
             <!--<id>nexus</id>-->
             <!--<name>zbiti nexus</name>-->
             <!--<url>http://192.168.103.223:8081/repository/maven-public/</url>-->
             <!--<releases><enabled>true</enabled></releases>-->
             <!--<snapshots><enabled>true</enabled></snapshots>-->
         <!--</repository>-->
         <repository>
             <id>public</id>
             <name>aliyun nexus</name>
             <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
             <releases>
                 <enabled>true</enabled>
             </releases>
         </repository>
     </repositories>
     <pluginRepositories>
         <!--<pluginRepository>-->
             <!--<id>nexus</id>-->
             <!--<name>zbiti nexus</name>-->
             <!--<url>http://192.168.103.223:8081/repository/maven-public/</url>-->
             <!--<releases><enabled>true</enabled></releases>-->
             <!--<snapshots><enabled>true</enabled></snapshots>-->
         <!--</pluginRepository>-->
         <pluginRepository>
             <id>public</id>
             <name>aliyun nexus</name>
             <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
             <releases>
                 <enabled>true</enabled>
             </releases>
             <snapshots>
                 <enabled>false</enabled>
             </snapshots>
         </pluginRepository>
     </pluginRepositories>
    

添加Hadoop依赖

配置hadoop-client依赖

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.2.1</version>
            <!--Hadoop集群中有-->
            <scope>provided</scope>
        </dependency>
    </dependencies>

创建单词计数Mapper类

  1. 新建类:cn.faury.demo.hadoop.word.count.WordCountMapper
  2. 继承Hadoop的特定Mapper类,4个泛型参数依次为:
    1. KEYIN:LongWritable,表示输入的文件行号
    2. VALUEIN:Text,表示输入的文件对应行的内容
    3. KEYOUT:Text,表示统计输出的单词
    4. VALUEOUT:LongWritable,表示统计输出的单词出现的次数
  3. 重载map方法,map方法的参数依次为:
    1. LongWritable key:输入文件的行号
    2. Text value:输入文件对应行的内容
    3. Context context:Hadoop上下文
  4. 实现map方法
    1. 先将输入的每行内容按照空格进行分割,得到字符串数组
    2. 按照Hadoop的Map-Reduce格式输出每个单词出现1次到上下文中
  5. 完整代码如下所示 ```java package cn.faury.demo.hadoop.word.count;

import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**

  • 自定义Mapper类:
  • K1:行号
  • V2:每行的内容
  • K2:单词
  • V2:次数 *
  • @author Fanyc
  • @date 2020/2/3 15:29 / public class WordCountMapper extends Mapper { /*
    • 实现Map函数 *
    • @param key 输入的k1,每行的行号
    • @param value 输入的v1,每行的内容
    • @param context 上下文
    • @throws IOException 异常
    • @throws InterruptedException 异常 */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 把每行内容按照空格进行切分 String[] words = value.toString().split(“ “); // 迭代切割出来的单词数据 for (String word : words) {
       Text k2 = new Text(word);
       LongWritable v2 = new LongWritable(1L);
       // 输出结果
       context.write(k2, v2);
      
      } } } ``` 注意:父类为org.apache.hadoop.mapreduce.Mapper类

创建单词计数器Reducer类

  1. 新建类:cn.faury.demo.hadoop.word.count.WordCountReducer
  2. 继承Hadoop的Reducer类,4个泛型参数依次为:
    1. KEYIN:Text,表示Map阶段输出的单词
    2. VALUEIN:LongWritable,表示Map阶段输出的单词出现的次数
    3. KEYOUT:Text,表示Reduce之后输出的单词
    4. VALUEOUT:LongWritable,表示Reduce之后输出的单词所有出现的次数
  3. 重载reduce方法,reduce方法的参数依次为:
    1. Text key:Map阶段输出的单词
    2. Iterable values:Map阶段输出的单词出现的次数,按照同单词聚合成数组
    3. Context context:Hadoop上下文
  4. 实现reduce方法
    1. 将不同的map输出的单词次数进行累加
    2. 输出单词对应的累加后出现总次数到Hadoop上下文中
  5. 完整代码如下所示 ```java package cn.faury.demo.hadoop.word.count;

import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**

  • 自定义Reduce类 *
  • @author Fanyc
  • @date 2020/2/3 15:30 / public class WordCountReducer extends Reducer { /*
    • 分组后的数据 *
    • @param key 输入的K2,单词
    • @param values 输入的V2,每个单词在map阶段出现的次数列表
    • @param context 上下文
    • @throws IOException 异常
    • @throws InterruptedException 异常 */ @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { // 累加同一个key的次数 Long sum = 0L; for (LongWritable value : values) {
       sum = sum + value.get();
      
      } // 组装结果 Text k3 = key; LongWritable v3 = new LongWritable(sum); // 输出结果 context.write(k3, v3); } } ``` 注意:父类为org.apache.hadoop.mapreduce.Reducer

创建单词计数Job类

  1. 新建类:cn.faury.demo.hadoop.word.count.WordCountJob
  2. 创建main方法用于配置和执行job
  3. main函数需要2个参数:

    1. 参数1:用于指定统计的文件文件夹路径
    2. 参数2:用于指定输出统计结果文件夹路径
      // 至少两个参数,参数1:输入文件路径,参数2:输出文件路径
      if (args == null || args.length < 2) {
      System.exit(0);
      }
      
  4. 获取默认Job配置对象:

    // job的配置参数
    Configuration configuration = new Configuration();
    

    注意:Configuration类为org.apache.hadoop.conf.Configuration

  5. 创建Job实例:

    // 创建一个Job
    Job job = Job.getInstance(configuration);
    

    注意:Job类为org.apache.hadoop.mapreduce.Job

  6. 配置Job运行的类

    // 指定Job类
    job.setJarByClass(WordCountJob.class);
    
  7. 配置输入输出路径

    // 指定输入路径
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    // 指定输出路径
    FileOutputFormat.setOutputPath(job,new Path(args[1]));
    

    注意:

  • FileInputFormat类为org.apache.hadoop.mapreduce.lib.input.FileInputFormat
  • FileOutputFormat类为org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
  1. 配置Map信息

    // 指定Map类
    job.setMapperClass(WordCountMapper.class);
    // 指定K2的类型
    job.setMapOutputKeyClass(Text.class);
    // 指定V2的类型
    job.setMapOutputValueClass(LongWritable.class);
    

    注意:此处的WordCountMapper为前面自定义的Mapper类,key、value的类型要与类中一致

  2. 配置Reducer信息

    // 指定Reducer类
    job.setReducerClass(WordCountReducer.class);
    // 指定输出K3的类型
    job.setOutputKeyClass(Text.class);
    // 指定输出V3的类型
    job.setOutputValueClass(LongWritable.class);
    

    注意:此处的WordCountReducer为前面自定义的Reducer类,key、value类型要与类中一致

  3. 提交Job到Hadoop执行

    // 提交job
    job.waitForCompletion(true);
    
  4. 完整代码如下所示 ```java package cn.faury.demo.hadoop.word.count;

import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**

  • 单词计数
  • 需求:读取hdfs上的test.txt文件,计算文件中每个单词出现的次数
  • *
  • @author Fanyc
  • @date 2020/2/3 11:25 */ public class WordCountJob {

    /**

    • 组装Job = Mapper + Reduce */ public static void main(String[] args) { // 至少两个参数,参数1:输入文件路径,参数2:输出文件路径 if (args == null || args.length < 2) {

       System.exit(0);
      

      }

      // job的配置参数 Configuration configuration = new Configuration(); try {

       // 创建一个Job
       Job job = Job.getInstance(configuration);
       // 指定Job类
       job.setJarByClass(WordCountJob.class);
      
       // 指定输入路径
       FileInputFormat.setInputPaths(job, new Path(args[0]));
       // 指定输出路径
       FileOutputFormat.setOutputPath(job,new Path(args[1]));
      
       // 指定Map类
       job.setMapperClass(WordCountMapper.class);
       // 指定K2的类型
       job.setMapOutputKeyClass(Text.class);
       // 指定V2的类型
       job.setMapOutputValueClass(LongWritable.class);
      
       // 指定Reducer类
       job.setReducerClass(WordCountReducer.class);
       // 指定输出K3的类型
       job.setOutputKeyClass(Text.class);
       // 指定输出V3的类型
       job.setOutputValueClass(LongWritable.class);
      
       // 提交job
       job.waitForCompletion(true);
      

      } catch (IOException | InterruptedException | ClassNotFoundException e) {

       e.printStackTrace();
      

      } } } ```

配置打包插件

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.7.0</version>
            <configuration>
                <source>${java.version}</source>
                <target>${java.version}</target>
                <encoding>${project.build.sourceEncoding}</encoding>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.2.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <mainClass></mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <!--配置生成源码包-->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-source-plugin</artifactId>
            <version>3.1.0</version>
            <executions>
                <execution>
                    <id>attach-sources</id>
                    <goals>
                        <goal>jar</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

打包jar

  1. 在Maven Projects面板中,选择clean+compile+package,再执行打包

    image.png

  2. 在工程目录下target中找到打包好的jar包

image.png
注:如果有第三方依赖的使用下面的dependencies.jar

测试程序

  1. 上传jar包到Hadoop集群某个节点或者客户端上
  2. 编辑测试文件并上传到HDFS中

    [root@hadoop101 soft]# cat test.txt 
    hello fanyc
    hello china
    hello world
    

    成功上传测试文件

    [root@hadoop101 soft]# hdfs dfs -ls /
    Found 2 items
    drwxr-xr-x   - root supergroup          0 2020-02-03 10:37 /data
    -rw-r--r--   2 root supergroup         36 2020-02-03 15:07 /test.txt
    
  3. 执行Job程序

    hadoop jar ${jar文件} ${Job类全路径} ${输入文件} ${输出目录}

[root@hadoop103 jars]# hadoop jar hadoop-word-count-1.0-SNAPSHOT.jar cn.faury.demo.hadoop.word.count.WordCountJob /test.txt /out
2020-02-03 15:07:56,013 INFO client.RMProxy: Connecting to ResourceManager at hadoop101/10.150.8.101:8032
......
2020-02-03 15:07:58,932 INFO mapreduce.Job: Running job: job_1580693126708_0001
2020-02-03 15:08:10,395 INFO mapreduce.Job: Job job_1580693126708_0001 running in uber mode : false
2020-02-03 15:08:10,396 INFO mapreduce.Job:  map 0% reduce 0%
2020-02-03 15:08:18,709 INFO mapreduce.Job:  map 100% reduce 0%
2020-02-03 15:08:26,929 INFO mapreduce.Job:  map 100% reduce 100%
2020-02-03 15:08:26,944 INFO mapreduce.Job: Job job_1580693126708_0001 completed successfully
2020-02-03 15:08:27,046 INFO mapreduce.Job: Counters: 54
......
  1. 再次查看HDFS目录

    [root@hadoop101 soft]# hdfs dfs -ls /
    Found 4 items
    drwxr-xr-x   - root supergroup          0 2020-02-03 10:37 /data
    drwxr-xr-x   - root supergroup          0 2020-02-03 15:08 /out
    -rw-r--r--   2 root supergroup         36 2020-02-03 15:07 /test.txt
    drwx------   - root supergroup          0 2020-02-03 15:07 /tmp
    

    其中:/out为计算结果输出目录

  2. 查看/out目录结果

    [root@hadoop101 soft]# hdfs dfs -ls /out
    Found 2 items
    -rw-r--r--   2 root supergroup          0 2020-02-03 15:08 /out/_SUCCESS
    -rw-r--r--   2 root supergroup         32 2020-02-03 15:08 /out/part-r-00000
    
  3. 查看统计结果

    [root@hadoop101 soft]# hdfs dfs -cat /out/part-r-00000
    2020-02-03 15:09:04,764 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
    china    1
    fanyc    1
    hello    3
    world    1
    

    其中:第2行为安全提示信息,3-6为统计结果


至此,完成了Hadoop的单词统计程序示例。