一、MapReduce概述
1.1 MapReduce定义
MapReduce是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架。<br />MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。
1.2 MapReduce优缺点
1.2.1 优点
1)MapReduce易于编程
它简单的实现一些接口,就可以完成一个分布式程序,这个分布式程序可以分布到大量廉价的PC机器上运行。也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一样的。就是因为这个特点使得MapReduce编程变得非常流行。
2)良好的扩展性
当你的计算资源不能得到满足的时候,你可以通过简单的增加机器来扩展它的计算能力。
3)高容错性
MapReduce设计的初衷就是使程序能够部署在廉价的PC机器上,这就要求它具有很高的容错性。比如其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由Hadoop内部完成的。
4)适合PB级以上海量数据的离线处理
可以实现上千台服务器集群并发工作,提供数据处理能力。
1.2.2 缺点
1)不擅长实时计算
MapReduce无法像MySQL一样,在毫秒或者秒级内返回结果。
2)不擅长流式计算
流式计算的输入数据是动态的,而MapReduce的输入数据集是静态的,不能动态变化。这是因为MapReduce自身的设计特点决定了数据源必须是静态的。
3)不擅长DAG(有向无环图)计算
多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce并不是不能做,而是使用后,每个MapReduce作业的输出结果都会写入到磁盘,会造成大量的磁盘IO,导致性能非常的低下。
1.3 MapReduce核心思想
(1)分布式的运算程序往往需要分成至少2个阶段。
(2)第一个阶段的MapTask并发实例,完全并行运行,互不相干。
(3)第二个阶段的ReduceTask并发实例互不相干,但是他们的数据依赖于上一个阶段的所有MapTask并发实例的输出。
(4)MapReduce编程模型只能包含一个Map阶段和一个Reduce阶段,如果用户的业务逻辑非常复杂,那就只能多个MapReduce程序,串行运行。
1.4 MapReduce进程
一个完整的MapReduce程序在分布式运行时有三类实例进程:
(1)MrAppMaster:负责整个程序的过程调度及状态协调。
(2)MapTask:负责Map阶段的整个数据处理流程。
(3)ReduceTask:负责Reduce阶段的整个数据处理流程。
1.5 常用数据序列化类型
| Java类型 | Hadoop Writable类型 |
|---|---|
| Boolean | BooleanWritable |
| Byte | ByteWritable |
| Int | IntWritable |
| Float | FloatWritable |
| Long | LongWritable |
| Double | DoubleWritable |
| String | Text |
| Map | MapWritable |
| Array | ArrayWritable |
| Null | NullWritable |
1.6 MapReduce编程规范
【实例】统计文件中单词出现的次数。
如:
1、文件hello1.txt内容:
Hello hadoop hadoop
Hello world hadoop
Hello mapper hadoop
2、得到输出结果:
Hello 3
hadoop 4
world 1
mapper 1
1.6.1 MapReduce核心思想
1.6.2 编写程序
用户编写的程序分成三个部分:Mapper、Reducer和Driver。
【准备】
创建一个Maven程序,在pom文件中加入hadoop依赖
1、编写Mapper
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
* LongWritable: map输入键值对的键类型,在默认的读取数据组件下,叫InputFormat,
* 它的行为是一行一行读取待处理的数据,读取一行,返回一行给我们的mr程序,
* 在这种情况下,键值是一行的起始偏移量 ,数据的类型是long
* Text: map输入键值对的值类型,在默认的读取数据组件下,值就是读取的这一行内容,数据类型是String
* Text: map输出键值对的键类型,在本案例当中,输出的key是单词,类型是String
* IntWritable: map输出键值对的值类型,在本案例当中,输出的key是单词的次数,类型是Integer
*
*/
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
// 定义输出key
Text k = new Text();
// 定义输出value
IntWritable v = new IntWritable(1);
/**
* mapper阶段业务逻辑实现方法,该方法的调用取决于读取数据的组件有没有给Mr传入数据,
* 如果有,每传入一个《k,v》对 该方法就调用一次
* @param key 表示传入的行偏移,类型是LongWritable
* @param value 表示传入的行的值,类型是Text
* @param context上下文,传入reduce程序的中介
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1 获取一行
String line = value.toString();
// 2 切割
String[] words = line.split(" ");
/*
* 3 输出
*/
for (String word : words) {
k.set(word);
context.write(k, v);
}
}
}
2、编写Reduce
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
* Text: reducer输入键值对的键类型,对应mapper的输出key类型,在本例中就是单词 Text
* IntWritable: reducer输入键值对的值类型,对应mapper的输出value类型,在本例中就是单词 次数IntWritable
* Text: reducer输出键值对的键类型,在本例中就是单词 Text
* IntWritable: reducer输出键值对的值类型,在本例中就是单词 次数IntWritable
*/
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
int sum; // 单词计数
IntWritable v = new IntWritable(); // 输出的值
/**
* reduce接收所有来自map阶段处理的数据之后,
* @param key 按照key的字典序进行排序
* 按照key是否相同(这里是单词相同)作为一组去调用reduce方法
* 本方法的key就是这一组相同kv对的共同key
* 如<hello:1>,<hello:12>,<hello:4>,组成迭代器<hello:[1,12,4]>
* @param values 把这一组的所有v作为一个迭代器传入我们的reduce方法
* @param context 上下文,传入文件的中介
*/
@Override
protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
/*
* 1 累加求和
* 遍历一组迭代器, 把每一个数量1累加起来就构成一个单词的总次数
*/
sum = 0;
for (IntWritable count : values) {
sum += count.get();
}
/*
* 2 输出
*/
v.set(sum);
context.write(key, v);
}
}
3、编写Driver
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
public class WordCountDriver implements Tool{
// 定义hadoop文件服务器路径
public static final String HDFS_PATH = "hdfs://hadoop01:8020";
// 获取配置信息以及获取job对象
private Configuration configuration;
@Override
public Configuration getConf() {
return this.configuration;
}
@Override
public void setConf(Configuration configuration) {
this.configuration = configuration;
}
public WordCountDriver(){
// 获取job对象
this.configuration = new Configuration();
}
@Override
public int run(String[] arg0) throws Exception {
//1.获取Job对象
Job job = Job.getInstance( configuration );
//2.设置Jar包路径
job.setJarByClass( WordCountDriver.class );
//3 关联Mapper和Reducer的jar
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
//4.设置Mapper类&输出KV类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//5.设置Reducer输出的KV类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//6.设置输入输出的路径
Path inPath = new Path(HDFS_PATH+"/user/input/wordcount");
FileInputFormat.setInputPaths( job,inPath );
Path outPath = new Path(HDFS_PATH+"/user/output/wordcount");
//如果输出路径存在,删除路径
FileSystem fs = FileSystem.get(new URI(HDFS_PATH), configuration);
if(fs.exists(outPath)){
fs.delete(outPath, true);
}
FileOutputFormat.setOutputPath( job,outPath);
//6.提交任务
boolean result = job.waitForCompletion( true );
return result ? 0 : 1;
}
}
4、测试
(1)上传hello.txt到hdfs
(2)运行Driver
import org.apache.hadoop.util.ToolRunner;
import org.jbit.hadoop.mapreduce.wordcount.WordCountDriver;
import org.junit.Test;
/**
* 测试程序
* @author Administrator
*
*/
public class WordCountTest {
@Test
public void driver() throws Exception{
ToolRunner.run(new WordCountDriver(), null);
}
}
1.6.3 编程练习
将file1.txt,file2.txt合并成一个文件
file.txt内容:
1001,20,男
1002,21,女
1003,19,女
1004,21,男
file2.txt内容:
1001,江西南昌
1002,湖南长沙
1003,北京
1004,湖北武汉
合并之后:
1001,江西南昌,男,20
1002,女,21,湖南长沙
1003,北京,女,19
1004,男,21,湖北武汉
1、编写Mapper
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class FileJoinMapper extends Mapper<LongWritable, Text, Text, Text>{
//输出key
Text k = new Text();
//输出value
Text v = new Text();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
//读取行,处理汉字乱码
String line = new String(value.getBytes(),0,value.getLength(),"GBK");
//分离行
String[] fields = line.split(",");
/*
* 将每个字段以编号为key存储
*/
for(int i=1; i<fields.length; i++){
k.set(fields[0]);
v.set(fields[i]);
context.write(k, v);
}
}
}
2、编写Reducer
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class FileJoinReducer extends Reducer<Text, Text, NullWritable, Text>{
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
/*
* 从values取出值,拼接成行
*/
String line = key.toString();
for(Text field : values){
line += "," + field;
}
Text v = new Text(line);
context.write(NullWritable.get(), v);
}
}
3、编写Driver
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
public class FileJoinDriver implements Tool {
// 定义hadoop文件服务器路径
public static final String HDFS_PATH = "hdfs://hadoop01:8020";
// 获取配置信息以及获取job对象
private Configuration configuration;
public Configuration getConf() {
return this.configuration;
}
public void setConf(Configuration configuration) {
this.configuration = configuration;
}
public FileJoinDriver() {
// 获取job对象
this.configuration = new Configuration();
}
public int run(String[] arg0) throws Exception {
// 1.获取Job对象
Job job = Job.getInstance(configuration);
// 2.设置Jar包路径
job.setJarByClass(FileJoinDriver.class);
// 3 关联Mapper和Reducer的jar
job.setMapperClass(FileJoinMapper.class);
job.setReducerClass(FileJoinReducer.class);
// 4.设置Mapper类&输出KV类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
// 4.设置Reducer输出的KV类型
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
// 5.设置输入输出的路径
Path inPath = new Path(HDFS_PATH + "/user/input/files");
FileInputFormat.setInputPaths(job, inPath);
Path outPath = new Path(HDFS_PATH + "/user/output/files");
// 如果输出路径存在,删除路径
FileSystem fs = FileSystem.get(new URI(HDFS_PATH), configuration);
if (fs.exists(outPath)) {
fs.delete(outPath, true);
}
FileOutputFormat.setOutputPath(job, outPath);
// 6.提交任务
boolean result = job.waitForCompletion(true);
return result ? 0 : 1;
}
}
4、测试
@Test
public void driver() throws Exception{
ToolRunner.run(new FileJoinDriver(), null);
}
上传文件file1.txt,file2.txt至hdfs的/user/input/files文件夹下
从hdfs的/user/output/files文件夹下载part-r-00000文件查看结果
二、MapReduce数据清洗
2.1 ETL数据清洗
数据清洗, 是整个数据分析过程中不可缺少的一个环节,其结果质量直接关系到模型效果和最终结论。在实际操作中,数据清洗通常会占据分析过程的50%—80%的时间。我们可以先将日志数据采集到HDFS中,之后再进一步使用MapReduce、Hive等来对数据进行分析。
2.2 ETL数据清洗过程
刚刚采集到HDFS中的原生数据,我们也称为不规整数据,即目前来说,该数据的格式还无法满足我们对数据处理的基本要求,需要对其进行预处理,转化为我们后面工作所需要的较为规整的数据。步骤如下:
2.2.1 预处理阶段
预处理阶段主要做两件事情:
1、将数据导入处理工具。通常来说,建议使用数据库,单机跑数搭建MySQL环境即可。如果数据量大(千万级以上),可以使用文本文件存储+Python操作的方式。
2、看数据。这里包含两个部分:一是看元数据,包括字段解释、数据来源、代码表等等一切描述数据的信息;二是抽取一部分数据,使用人工查看方式,对数据本身有一个直观的了解,并且初步发现一些问题,为之后的处理做准备。
如:
得到上示数据进行数据清洗
2.2.2 缺失值清洗
缺失值是最常见的数据问题,处理缺失值也有很多方法,建议按照以下四个步骤进行:
1、确定缺失值范围:对每个字段都计算其缺失值比例,然后按照缺失比例和字段重要性,分别制定策略
2、去除不需要的字段:这一步很简单,直接删掉即可
3、填充缺失内容:某些缺失值可以进行填充,方法有以下三种:
- 以业务知识或经验推测填充缺失值
- 以同一指标的计算结果(均值、中位数、众数等)填充缺失值
- 以不同指标的计算结果填充缺失值
前两种方法比较好理解。关于第三种方法,举个最简单的例子:年龄字段缺失,可以根据身份证号计算年龄
4、重新取数:如果某些指标非常重要又缺失率高,那就需要和取数人员或业务人员了解,是否有其他渠道可以取到相关数据。
如果数据是由系统日志而来,那么通常在格式和内容方面,会与元数据的描述一致。而如果数据是由人工收集或用户填写而来,则有很大可能性在格式和内容上存在一些问题,简单来说,格式内容问题有以下几类:
2.2.3 格式内容清洗
1、时间、日期、数值、全半角等显示格式不一致
这种问题通常与输入端有关,在整合多来源数据时也有可能遇到,将其处理成一致的某种格式即可。
2、内容中有不该存在的字符
某些内容可能只包括一部分字符,比如身份证号是数字+字母,中国人姓名是汉字。最典型的就是头、尾、中间的空格,也可能出现姓名中存在数字符号、身份证号中出现汉字等问题。这种情况下,需要以半自动校验半人工方式来找出可能存在的问题,并去除不需要的字符。
3、内容与该字段应有内容不符
姓名写了性别,身份证号写了手机号等等,均属这种问题。 但该问题特殊性在于:并不能简单的以删除来处理,因为成因有可能是人工填写错误,也有可能是前端没有校验,还有可能是导入数据时部分或全部存在列没有对齐的问题,因此要详细识别问题类型。
格式内容问题是比较细节的问题,但很多分析失误都是栽在这个坑上,比如跨表关联或VLOOKUP失败(多个空格导致工具认为“张三”和“张 三”不是一个人)、统计值不全(数字里掺个字母当然求和时结果有问题)、模型输出失败或效果不好(数据对错列了,把日期和年龄混了)。
2.2.4 逻辑错误清洗
这部分的工作是去掉一些使用简单逻辑推理就可以直接发现问题的数据,防止分析结果走偏。主要包含以下几个步骤:
1、去重
有的分析师喜欢把去重放在第一步,但我强烈建议把去重放在格式内容清洗之后,原因已经说过了(多个空格导致工具认为“张三”和“张 三”不是一个人,去重失败)。去重也不能以一个字段来判断,如南昌的北京路和武汉的北京路就不是一条信息。
2、去除不合理值
一句话就能说清楚:有人填表时候瞎填,年龄200岁,年收入100000万(估计是没看见”万“字),这种的就要么删掉,要么按缺失值处理。
3、修正矛盾内容
有些字段是可以互相验证的,举例:身份证号是1101031980XXXXXXXX,然后年龄填18岁,我们虽然理解人家永远18岁的想法,但得知真实年龄可以给用户提供更好的服务啊。在这种时候,需要根据字段的数据来源,来判定哪个字段提供的信息更为可靠,去除或重构不可靠的字段。
逻辑错误除了以上列举的情况,还有很多未列举的情况,在实际操作中要酌情处理。另外,这一步骤在之后的数据分析建模过程中有可能重复,因为即使问题很简单,也并非所有问题都能够一次找出,我们能做的是使用工具和方法,尽量减少问题出现的可能性,使分析过程更为高效。
2.2.5 非需求数据清洗
这一步说起来非常简单:把不要的字段删了。
如:静安-不夜城-芷江西路453弄,只需要地址“芷江西路453弄”,静安和不夜城就可以删除了。
2.2.6 关联性验证
如果你的数据有多个来源,那么有必要进行关联性验证。例如,你有汽车的线下购买信息,也有电话客服问卷信息,两者通过姓名和手机号关联,那么要看一下,同一个人线下登记的车辆信息和线上问卷问出来的车辆信息是不是同一辆,如果不是,那么需要调整或去除数据。
2.3 MapReduce数据清洗
MapReduce数据清洗一个Map程序就能完成几种数据的清洗,但也有需要Reduce程序配合清洗。
2.3.1 数据清洗代码
1、ETL数据清洗工具类
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang.StringUtils;
/**
* ETL数据清洗工具类
* @author Administrator
*
*/
public class ETLUtils {
/**
* 分离字符串成字符数组
* @param line
* @param separator 分隔符
* @return
*/
public static String[] split(String line,String separator){
return line.split(separator);
}
/**
* 字符数组连接成字符串
* @param fields
* @return
*/
public static String join(String[] fields){
return StringUtils.join(fields,",");
}
/**
* 判断值是否为空
* @param vlaue
* @return
*/
public static boolean isNull(String value){
/*
* 如果数据=NAN,"",null," ",undefined视为缺失值
*/
if(value.toUpperCase().trim().equals("NAN")
&& value.toUpperCase().trim().equals("")
&& value.toUpperCase().trim().equals("NULL")
&& value.toUpperCase().trim().equals("UNDEFINED")){
return true;
}
return false;
}
/**
* 统计总列数
* @param line
* @param separator 字段分隔符
* @return
*/
public static int countColumns(String line,String separator){
//分割字段
String[] fields = line.split(separator);
return fields.length;
}
/**
* 检查缺失字段的数
* @param line
* @param separator 字段分隔符
* @return
*/
public static int sumNull(String line,String separator){
int count=0;
//分割字段
String[] fields = line.split(separator);
//统计空值字段
for(String value:fields){
if(isNull(value)){
count += 1;
}
}
return count;
}
/**
* 删除空值行,按照指定列为空删除行
* @param line
* @param separator 字段分隔符
* @param columns
* @return
*/
public static String dropNullRow(String line,String separator,int column){
// 分割字段
String[] fields = line.split(separator);
// 如果指定列为空,删除该行
if(isNull(fields[column])){
return null;
}
return line;
}
/**
* 删除空值行,按照空值列占比删除行
* @param line
* @param separator 字段分隔符
* @param percentage 百分比
* @return
*/
public static String dropNullRow(String line,String separator,double percentage){
//字段总数
int count = countColumns(line, separator);
//缺失值字段数
int sum = sumNull(line, separator);
/*
* 如果缺失值字段占比>percentage,删除该行数据
*/
if(sum / count - percentage >= 1e-6){
return null;
}
return line;
}
/**
* 取hash值
* @param line
* @param separator
* @param columns
* @return
*/
public static Long getHashCode(String line,String separator,int[] columns){
String[] fields = line.split(separator);
Long hashCode = 0L;
for(int i : columns){
hashCode += fields[i].hashCode();
}
return hashCode;
}
/**
* 处理异常值,用正则表达式
* @param line
* @param separator
* @param regEx 正则表达式
* @param column 列
* @return
*/
public static String handleOutlier(String line,String separator,String regEx,int column){
String[] fields = line.split(separator);
//正则表达式
Pattern pattern = Pattern.compile(regEx);
Matcher matcher = pattern.matcher(fields[column]);
fields[column] = matcher.replaceAll("").trim();
return StringUtils.join(fields,",");
}
/**
* 删除不需要的列
* @param line
* @param separator
* @param columns 多列
* @return
*/
public static String removeColum(String line,String separator,int[] columns){
String[] fields = line.split(separator);
List<String> list = new ArrayList<>(Arrays.asList(fields));
for(int column : columns){
list.remove(column);
}
return StringUtils.join(list.toArray(),",");
}
/**
* 提取所需列
* @param line
* @param separator
* @param column
* @return
*/
public static String getColumn(String line,String separator,int column){
String[] fields = line.split(separator);
return fields[column];
}
}
2、ETL数据清洗Mapper
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.jbit.dataweb.util.ETLUtils;
public class ETLMapper extends Mapper<LongWritable, Text, LongWritable, Text>{
// 定义全局的key,value
private LongWritable k = new LongWritable();
private Text v = new Text();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// 获取数据: 中文乱码处理:new String(value.getBytes(),0,value.getLength(),"GBK");
String line = value.toString();
// 去空值行
double percentage =2.0/7;
line = ETLUtils.dropNullRow(line, ",", percentage);
//异常值处理
String regEx = "[^(\\d|\\.)]";
line = ETLUtils.handleOutlier(line, ",", regEx, 2);
//提取地址列
line = getAddress(line);
//提取方式和名称列
line = getMethodAndName(line);
// 去重
int[] columns ={0,3,6};
k = new LongWritable(ETLUtils.getHashCode(line, ",", columns));
v.set(line);
context.write(k, v);
}
/**
* 提取方式和名称列
* @param line
* @return
*/
private String getMethodAndName(String line){
String[] fields = ETLUtils.split(line, ",");
//提取所需列
String strColumn = ETLUtils.getColumn(fields[0], " ", 0);
//处理方式和名称列
String[] strColumns = ETLUtils.split(strColumn, "·");
//将方式和名称列连接字符串
String str = ETLUtils.join(strColumns);
//删除行的第一列
int[] columns ={0};
line = ETLUtils.removeColum(line, ",", columns);
//连接字符串
line = str + "," + line;
return line;
}
/**
* 提取地址
* @param line
* @return
*/
private String getAddress(String line){
//提取所需列
String strColumn = ETLUtils.getColumn(line, ",", 3);
//提取地址列
String addr = ETLUtils.getColumn(strColumn, "-", 2);
//替换地址
String[] fields = ETLUtils.split(line, ",");
fields[3] = addr;
//数组转换字符串
line = ETLUtils.join(fields);
return line;
}
}
3、ETL数据清洗Reducer
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class ETLReducer extends Reducer<LongWritable, Text, Text, NullWritable>{
Text k = new Text();
@Override
protected void reduce(LongWritable key, Iterable<Text> values,Context context) throws IOException, InterruptedException {
for(Text line : values){
k=line;
break; //去重
}
context.write(k,NullWritable.get());
}
}
4、ETL数据清洗Driver
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
public class ETLDriver implements Tool{
public static final String HDFS_PATH = "hdfs://hadoop01:8020";
private Configuration configuration;
public ETLDriver(){
//构建配置信息
this.configuration = new Configuration();
}
@Override
public Configuration getConf() {
return this.configuration;
}
@Override
public void setConf(Configuration configuration) {
this.configuration = configuration;
}
@Override
public int run(String[] arg0) throws Exception {
//1.获取Job对象
Job job = Job.getInstance( configuration );
//2.设置Jar包路径
job.setJarByClass( ETLDriver.class );
//3 关联Mapper和Reducer的jar
job.setMapperClass(ETLMapper.class);
job.setReducerClass(ETLReducer.class);
//4.设置Mapper类&输出KV类型
job.setMapperClass( ETLMapper.class );
job.setMapOutputKeyClass( LongWritable.class );
job.setMapOutputValueClass( Text.class );
//4.设置最终输出的KV类型
job.setOutputKeyClass( Text.class );
job.setOutputValueClass( NullWritable.class );
//5.设置输入输出的路径
Path inPath = new Path(HDFS_PATH+"/user/input/test");
Path outPath = new Path(HDFS_PATH+"/user/output/test");
FileInputFormat.setInputPaths( job, inPath);
//如果输出路径存在,删除路径
FileSystem fs = FileSystem.get(new URI(HDFS_PATH), configuration);
if(fs.exists(outPath)){
fs.delete(outPath, true);
}
FileOutputFormat.setOutputPath( job,outPath);
//6.提交任务
boolean result = job.waitForCompletion( true );
return result ? 0 : 1;
}
}
5、测试数据
整租·芷江西路453弄 1室0厅 南,4300,36.36㎡,静安-不夜城-芷江西路453弄,南,1室0厅1卫,静安
整租·慧芝湖花园 2室2厅 南,12500,90.42㎡,静安-大宁-慧芝湖花园,南,2室2厅1卫,静安
整租·中山北路197弄 1室1厅 南,5000,32.00㎡,静安-闸北公园-中山北路197弄,南,1室1厅1卫,静安
整租·东方莱茵 2室2厅 南,21000,138.00㎡,静安-不夜城-东方莱茵,南,2室2厅2卫,静安
整租·长安大厦 2室1厅 南/西南,8800,78.02㎡,静安-不夜城-长安大厦,南,2室1厅1卫,静安
整租·万航渡路433弄 1室1厅 南,5900,34.02㎡,静安-曹家渡-万航渡路433弄,南,1室1厅1卫,静安
整租·君御豪庭 2室2厅 南,21000,115.61㎡,静安-江宁路-君御豪庭,南,2室2厅2卫,静安
整租·陕西北路342弄 1室0厅 南,7500,24.00㎡,静安-南京西路-陕西北路342弄,南,1室0厅1卫,静安
整租·上海壹街区 3室2厅 南,21500,134.00㎡,静安-南京西路-上海壹街区,南,3室2厅2卫,静安
整租·静安府西区 3室2厅 南,23000,121.08㎡,静安-大宁-静安府西区,南,3室2厅2卫,静安
整租·中凯城市之光(静安) 5室2厅 南,35000,188.00㎡,静安-南京西路-中凯城市之光(静安),南,5室2厅2卫,静安
整租·远中风华园(公寓) 1室1厅 南,15500,68.00㎡,静安-南京西路-远中风华园(公寓),南,1室1厅1卫,静安
整租·丽都新贵 1室1厅 南,12500,87.98㎡,静安-南京西路-丽都新贵,南,1室1厅1卫,静安
整租·河滨豪园 4室2厅 南,27000,173.41㎡,静安-西藏北路-河滨豪园,南,4室2厅2卫,静安
整租·洛川中路1100弄 2室1厅 南/北,6700,51.14㎡,静安-大宁-洛川中路1100弄,南,2室1厅1卫,静安
整租·森凯苑 2室2厅 南,16500,90.00㎡,静安-曹家渡-森凯苑,南,2室2厅1卫,静安
整租·陕西北路342弄 2室1厅 南/北,12000,56.00㎡,静安-南京西路-陕西北路342弄,南,2室1厅1卫,静安
整租·明园森林都市 3室1厅 南,15000,147.64㎡,静安-大宁-明园森林都市,南,3室1厅2卫,静安
整租·宝华现代城 4室2厅 南,32000,190.90㎡,静安-大宁-宝华现代城,南,4室2厅2卫,静安
整租·延平大厦 2室1厅 东/南,14000,113.00㎡,静安-曹家渡-延平大厦,东,2室1厅1卫,静安
6、测试结果
整租,陕西北路342弄,7500,24.00,陕西北路342弄,南,1室0厅1卫,静安
整租,中山北路197弄,5000,32.00,中山北路197弄,南,1室1厅1卫,静安
整租,万航渡路433弄,5900,34.02,万航渡路433弄,南,1室1厅1卫,静安
整租,芷江西路453弄,4300,36.36,芷江西路453弄,南,1室0厅1卫,静安
整租,远中风华园(公寓),15500,68.00,远中风华园(公寓),南,1室1厅1卫,静安
整租,丽都新贵,12500,87.98,丽都新贵,南,1室1厅1卫,静安
……
