package com.learn.wordcount;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
/**
 * Mapper<input key type, input value type, output key type, output value type>
 */
public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split each input line on spaces and emit (word, 1) for every token
        String[] arr = value.toString().split(" ");
        for (String s : arr) {
            context.write(new Text(s), new IntWritable(1));
        }
    }
}
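The map() method above allocates a new Text and a new IntWritable for every token. Because context.write() serializes the objects immediately, a common refinement is to reuse a single instance of each writable across calls. A minimal sketch, assuming the same package and imports as MyMapper (the class name MyMapperReuse is illustrative):

public class MyMapperReuse extends Mapper<LongWritable, Text, Text, IntWritable> {
    // Reused across map() calls; context.write() copies the current contents right away
    private final Text word = new Text();
    private static final IntWritable ONE = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        for (String s : value.toString().split(" ")) {
            word.set(s);
            context.write(word, ONE);
        }
    }
}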
/**
 * Reducer<input key type, input value type, output key type, output value type>
 */
public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Sum all the 1s emitted for this word and write the total count
        int count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }
        context.write(key, new IntWritable(count));
    }
}
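For example, with the two input lines "hello world" and "hello hadoop", the mappers emit (hello,1), (world,1), (hello,1) and (hadoop,1); after the shuffle the reducer sees hello -> [1, 1], world -> [1] and hadoop -> [1], and writes out hello 2, world 1 and hadoop 1.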
/**
 * Driver class that configures and submits the job
 */
public class WordCount {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String inputFile = "C:\\Users\\pg\\Desktop\\input.txt";
        String inputFile2 = "C:\\Users\\pg\\Desktop\\input2.txt";
        String outputFile = "C:\\Users\\pg\\Desktop\\out.txt";
        // Get the Job object and wire up the classes needed for the eight steps
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "WordCount");
        job.setJarByClass(WordCount.class);
        // Step 1: set the InputFormat that reads the files and parses them into key/value pairs (k1, v1)
        job.setInputFormatClass(TextInputFormat.class);
        // Each input file must be added with its own addInputPath() call
        TextInputFormat.addInputPath(job, new Path(inputFile));
        TextInputFormat.addInputPath(job, new Path(inputFile2));
        // Step 2: custom map logic that receives (k1, v1) and emits new pairs (k2, v2)
        job.setMapperClass(MyMapper.class);
        // Step 3: partitioning; records with the same key (k2) go to the same reducer,
        // where the key is merged and the values form a collection
        job.setPartitionerClass(HashPartitioner.class);
        job.setNumReduceTasks(2); // number of reduce tasks
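        // With HashPartitioner, a record is sent to partition
        // (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks, so with two reduce
        // tasks every occurrence of a given word is routed to the same reducer.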
        // Step 4: sorting; k2 is sorted in dictionary order
        // Step 5: combining (the combiner is an optional tuning step that pre-aggregates on the map side)
        job.setCombinerClass(MyReducer.class);
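        // MyReducer can double as the combiner because summing counts is associative
        // and commutative, so map-side partial sums do not change the final totals.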
        // Step 6: grouping; Text.Comparator is already the default grouping comparator
        // for Text keys, so this call is optional and shown only for completeness
        job.setGroupingComparatorClass(Text.Comparator.class);
        // TODO steps 3 through 6 all have default implementations
        // Step 7: custom reducer logic that receives (k2, v2) and emits new pairs (k3, v3)
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Step 8: write out (k3, v3)
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path(outputFile));
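        // Note: the output path must not already exist when the job starts, and it is
        // created as a directory containing part-r-xxxxx files rather than a single file.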
        // Step 9: submit the job and wait for completion
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
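A more idiomatic driver implements Hadoop's Tool interface and runs through ToolRunner, so that generic options such as -D mapreduce.job.reduces=2 passed on the command line are applied automatically. A minimal sketch in the same package, taking the input and output paths from the command line (the class name WordCountTool and the args[0]/args[1] convention are illustrative, not part of the original code):

package com.learn.wordcount;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCountTool extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        // args[0] = input path, args[1] = output path (illustrative convention)
        Job job = Job.getInstance(getConf(), "WordCount");
        job.setJarByClass(WordCountTool.class);

        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path(args[0]));

        job.setMapperClass(MyMapper.class);
        job.setCombinerClass(MyReducer.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // ToolRunner parses the generic options before handing the rest to run()
        System.exit(ToolRunner.run(new WordCountTool(), args));
    }
}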
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.learn</groupId>
<artifactId>HadoopLearn</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.6</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.6</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.6</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.7.6</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-core -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- This plugin builds a jar with all dependencies shaded in, alongside the original jar without them -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.0</version>
<configuration>
<createDependencyReducedPom>true</createDependencyReducedPom>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<!--<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<!– fully qualified name of the main class –>
<mainClass>com.learn.wordcount.WordCount</mainClass>
</transformer>
</transformers>
</configuration>-->
</execution>
</executions>
</plugin>
<!-- This plugin copies the dependency jars into the dependency directory -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<!--<configuration>-->
<!--<outputDirectory>${project.build.directory}/lib-->
<!--</outputDirectory>-->
<!--</configuration>-->
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>
Log configuration (log4j.properties):
log4j.rootLogger = debug,stdout
### Log output to the console ###
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n