各类异常汇总
https://www.cnblogs.com/spicy/p/9645792.html
https://blog.csdn.net/diyangxia/article/details/78982867
https://www.cnblogs.com/LeachBlog/p/9968052.html
步骤
继承Mapper,编写map的业务逻辑
继承Reducer,编写reduce的业务逻辑
编写Driver驱动类
Mapper类
package com.plat.mapreduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Word-count map phase: splits each input line on single spaces and emits
 * {@code (word, 1)} for every token.
 *
 * <p>Mapper&lt;KEYIN, VALUEIN, KEYOUT, VALUEOUT&gt; type parameters:
 * <ul>
 *   <li>KEYIN  — with the default {@code TextInputFormat}, the byte offset of the
 *       line within the file, so {@code LongWritable}.</li>
 *   <li>VALUEIN — the text of one line, so {@code Text}.</li>
 *   <li>KEYOUT — decided by the job logic; here the word itself, so {@code Text}.</li>
 *   <li>VALUEOUT — here the count contribution (always 1), so {@code IntWritable}.</li>
 * </ul>
 *
 * <p>Hadoop ships its own serializable wrappers because JDK serialization of
 * {@code String}/{@code Long} is slow: String → Text, Integer → IntWritable,
 * null → NullWritable.
 *
 * <p>Note: the superclass MUST be parameterized. A raw {@code extends Mapper}
 * with {@code map(Object, Object, Context)} compiles but silently fails to
 * override the framework's typed {@code map}, so records would pass through
 * the identity mapper instead.
 */
public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    /** Constant "1" emitted for every token; shared to avoid per-record allocation. */
    private static final IntWritable ONE = new IntWritable(1);

    /** Reusable output key; Hadoop serializes it before the next set(), so reuse is safe. */
    private final Text word = new Text();

    /**
     * Emits {@code (token, 1)} for each space-separated token of the line.
     *
     * @param key     byte offset of the line in the input split
     * @param value   the line's text
     * @param context sink for the map output key/value pairs
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split on a single space to match the original tokenization exactly.
        for (String token : value.toString().split(" ")) {
            word.set(token);
            context.write(word, ONE);
        }
    }
}
Reducer类
package com.plat.mapreduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Word-count reduce phase: sums the per-word "1" values produced by the mappers.
 *
 * <p>Each reduce task receives the slice of map output whose keys hash to it
 * ({@code key.hashCode() % numReduceTasks == this task's id}); the framework
 * groups those key/value pairs by key before invoking {@link #reduce}.
 */
public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    /**
     * Writes {@code (word, total)} where {@code total} is the sum of all
     * counts emitted for {@code key} during the map phase.
     *
     * @param key     a word produced by the map phase
     * @param values  every count emitted for that word
     * @param context sink for the final (word, total) pair
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int total = 0;
        for (IntWritable partial : values) {
            total += partial.get();
        }
        context.write(key, new IntWritable(total));
    }
}
编写Driver驱动类
package com.plat.mapreduce;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;public class WordcountDriver {public static void main(String[] args) throws Exception{Configuration configuration = new Configuration();//如果不做任何配置,默认采用本地文件系统及本地yarn//采用指定的hdfs文件系统configuration.set("fs.defaultFS", "hdfs://sunjz-01:9000");//configuration.set("mapreduce.framework.name", "yarn");//configuration.set("yarn.resourcemanager.hostname", "sunjz-02");//configuration.set("mapreduce.app-submission.cross-platform", "true");Job job = Job.getInstance(configuration);//告诉程序jar包所在的位置//job.setJar("/root/wordcount.jar"); //手动写死job.setJarByClass(WordcountDriver.class); //自动扫描//告诉框架我们所需的map类和reduce类job.setMapperClass(WordcountMapper.class);job.setReducerClass(WordcountReducer.class);//告诉框架我们程序输出的数据类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);//告诉程序我们使用的数据读取组件,结果输出组件是什么job.setInputFormatClass(TextInputFormat.class);job.setOutputFormatClass(TextOutputFormat.class);//告诉框架我们要处理的数据文件放在哪个路径//FileInputFormat.setInputPaths(job, new Path("D:\\wordcount\\input"));//FileOutputFormat.setOutputPath(job, new Path("D:\\wordcount\\output"));FileInputFormat.setInputPaths(job, new Path("/wordcount/input"));FileOutputFormat.setOutputPath(job, new Path("/wordcount/output"));boolean flag = job.waitForCompletion(true);System.exit(flag ? 0 : 1);}}
