1. MapReduce Principles
Even with the earlier notes on Hadoop serialization, this part is hard to understand at this stage.
2. InputFormat: Data Input
2.1. FileInputFormat Split Source Code
2.2. CombineTextInputFormat Split Mechanism
The framework's default TextInputFormat plans splits per file: no matter how small a file is, it becomes its own split and is handed to its own MapTask. With a large number of small files this produces a large number of MapTasks and extremely poor efficiency; CombineTextInputFormat instead packs small files together into shared splits.
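A rough sketch of how this is usually wired up in the driver (the 4 MB value and the `job` variable are illustrative assumptions, matching the style of the drivers later in these notes):
```java
// hypothetical driver fragment: switch to CombineTextInputFormat and cap the split size
job.setInputFormatClass(CombineTextInputFormat.class);
// small files are packed together until a split reaches roughly this many bytes (4 MB here)
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
```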
2.3. Custom InputFormat
Driver
package com.inputformat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import java.io.IOException;
public class MyInputDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Job job = Job.getInstance(new Configuration());
job.setJarByClass(MyInputDriver.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(BytesWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(BytesWritable.class);
job.setInputFormatClass(MyInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
FileInputFormat.setInputPaths(job, new Path("D:/input"));
FileOutputFormat.setOutputPath(job, new Path("D:/output"));
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
InputFormat
package com.inputformat;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import java.io.IOException;
public class MyInputFormat extends FileInputFormat<Text, BytesWritable> {
/**
* 返回一个自定义RecordReader
* @param inputSplit
* @param taskAttemptContext
* @return
* @throws IOException
* @throws InterruptedException
*/
@Override
public RecordReader<Text, BytesWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
return new MyRecordReader();
}
}
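One companion detail, added here as an assumption rather than something from the original class: a whole-file InputFormat usually also overrides isSplitable so that large files are never split across MapTasks. A minimal sketch that could sit inside MyInputFormat (it needs the org.apache.hadoop.fs.Path and org.apache.hadoop.mapreduce.JobContext imports):
```java
// Sketch: keep every file in one unsplit piece
@Override
protected boolean isSplitable(JobContext context, Path filename) {
    return false;
}
```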
RecordReader
package com.inputformat;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
//负责将整个文件转化成一组Key Value对
public class MyRecordReader extends RecordReader<Text, BytesWritable> {
//文件是否读完 默认为false
private boolean isRead;
//键值对
private Text key=new Text();
private BytesWritable value= new BytesWritable();
FSDataInputStream inputStream;
FileSplit fs;
/**
* 初始化方法 一般执行一些初始化操作
*
* @param inputSplit
* @param taskAttemptContext
* @throws IOException
* @throws InterruptedException
*/
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
//开流
fs = (FileSplit) inputSplit;//强转为实现子类
FileSystem fileSystem = FileSystem.get(taskAttemptContext.getConfiguration()); //获取config对象
inputStream = fileSystem.open(fs.getPath());//获取路径
}
/**
* 读取下一个键值对 是否存在
*
* @return
* @throws IOException
* @throws InterruptedException
*/
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (!isRead){
//read the whole file as a single record
//fill the key with the file path
key.set(fs.getPath().toString());
//fill the value with the file's bytes
byte[] buffer = new byte[(int) fs.getLength()];
IOUtils.readFully(inputStream, buffer, 0, buffer.length); //actually read the file contents into the buffer
value.set(buffer,0,buffer.length);
//mark the file as read
isRead = true;
return true;
}
return false;
}
/*
获取当前key
*/
@Override
public Text getCurrentKey() throws IOException, InterruptedException {
return key;
}
//获取当前value
@Override
public BytesWritable getCurrentValue() throws IOException, InterruptedException {
return value;
}
/**
* 显示进度
*
* @return
* @throws IOException
* @throws InterruptedException
*/
@Override
public float getProgress() throws IOException, InterruptedException {
return isRead ? 1 : 0; //progress is 1 once the single record has been read
}
/*
关闭方法
*/
@Override
public void close() throws IOException {
IOUtils.closeStream(inputStream); //关流
}
}
3. Shuffle: Sorting and Organizing the Data
The MapReduce framework guarantees that the input to every Reducer is sorted by key. The process of sorting the Map output and transferring it to the Reducers is generally called the shuffle. Each Map task has a circular (ring) buffer, 100 MB by default, and the Map first writes its output into this buffer. When the buffer content reaches a threshold (80% of the buffer by default), a background thread writes the contents to disk; this is called a "spill". While a spill is in progress the Map can keep writing into the buffer, but if the buffer fills up completely, the Map has to wait.
The data-processing steps after the map method and before the reduce method are collectively called the Shuffle.
The Shuffle takes the unordered key/value pairs produced by the map side, then partitions, sorts, and merges them before handing them to the Reduce side.
During the Shuffle the data sits in the in-memory ring buffer; once the buffer fills past the threshold it is partitioned and sorted, merge-sorted, and written to disk, freeing the buffer so new data can flow in.
Sorting once is more efficient than sorting many times, because efficiency drops as the number of merge passes grows; but when the data set is huge we have no choice but to trade extra time for the limited memory.
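The buffer size and spill threshold mentioned above can be tuned; a minimal sketch, assuming the standard Hadoop 2.x/3.x property names and that this Configuration is then used to build the Job:
```java
Configuration conf = new Configuration();
conf.set("mapreduce.task.io.sort.mb", "200");          // ring buffer size in MB, default 100
conf.set("mapreduce.map.sort.spill.percent", "0.90");  // spill threshold, default 0.80
Job job = Job.getInstance(conf);
```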
3.1. Partitioning
Entity class
package com.flow;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class FlowBean implements Writable {
private long upFlow;
private long downFlow;
private long sumFlow;
@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + sumFlow;
}
public void set(long upFlow, long downFlow) {
this.downFlow = downFlow;
this.upFlow = upFlow;
this.sumFlow = upFlow + downFlow;
}
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
public long getSumFlow() {
return sumFlow;
}
public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}
/**
* 将对象数据写出到框架指定地方 序列化
*
* @param dataOutput 数据的容器
* @throws IOException
*/
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeLong(upFlow);
dataOutput.writeLong(downFlow);
dataOutput.writeLong(sumFlow);
}
/**
* 从框架指定地方读取数据填充对象 反序列化
*
* @param dataInput
* @throws IOException
*/
@Override
public void readFields(DataInput dataInput) throws IOException {
//读写顺序要一致
this.upFlow = dataInput.readLong();
this.downFlow = dataInput.readLong();
this.sumFlow = dataInput.readLong();
}
}
Partitioner class
package com.partitioner;
import com.flow.FlowBean;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class MyPartitioner extends Partitioner<Text, FlowBean> {
/**
* 对每一个键值对 返回对应的分区号
*
* @param text 手机号
* @param flowBean 流量
* @param numPartitions
* @return
*/
@Override
public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
switch (text.toString().substring(0, 3)) { //根据手机号前3位
case "136":
return 0;
case "137":
return 1;
case "138":
return 2;
case "139":
return 3;
default:
return 4;
}
}
}
Driver class
package com.partitioner;
import com.flow.FlowBean;
import com.flow.FlowMapper;
import com.flow.FlowReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class NewFlowDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Job job = Job.getInstance(new Configuration());
job.setJarByClass(NewFlowDriver.class);
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
job.setNumReduceTasks(5); //设置分区数/并行数
job.setPartitionerClass(MyPartitioner.class); //设置分区类
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
FileInputFormat.setInputPaths(job, new Path("file:///d:/input"));
FileOutputFormat.setOutputPath(job, new Path("file:///d:/output"));
boolean completion = job.waitForCompletion(true);
System.exit(completion ? 0 : 1);
}
}
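One thing the driver above glosses over is how the ReduceTask count has to line up with the partition numbers the Partitioner can return. The comments below describe the behaviour I am assuming from the default MapTask logic, reusing the job from the driver above:
```java
job.setPartitionerClass(MyPartitioner.class);
job.setNumReduceTasks(5); // matches the partition numbers 0-4 that MyPartitioner can return
// job.setNumReduceTasks(1); // a single ReduceTask bypasses the custom partitioner and writes one output file
// job.setNumReduceTasks(3); // fewer tasks than partition numbers: records sent to partition 3 or 4 make the job fail
// job.setNumReduceTasks(8); // more tasks than partitions: the extra reducers just write empty output files
```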
3.2. WritableComparable Sorting
WritableComparable is the default sorting interface in MapReduce; its comparator implementation class is WritableComparator.
Both MapTask and ReduceTask sort data by key. This is Hadoop's default behaviour: the default order is lexicographic, and the underlying algorithm is quicksort.
To override the sort order, have the entity class implement the WritableComparable interface and implement its compareTo method.
Entity class
package com.compare;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
//实现WritableComparable接口
public class FlowBean implements WritableComparable<FlowBean> {
private long upFlow;
private long downFlow;
private long sumFlow;
@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + sumFlow;
}
public void set(long upFlow, long downFlow) {
this.downFlow = downFlow;
this.upFlow = upFlow;
this.sumFlow = upFlow + downFlow;
}
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
public long getSumFlow() {
return sumFlow;
}
public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}
/**
* 将对象数据写出到框架指定地方 序列化
*
* @param dataOutput 数据的容器
* @throws IOException
*/
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeLong(upFlow);
dataOutput.writeLong(downFlow);
dataOutput.writeLong(sumFlow);
}
/**
* 从框架指定地方读取数据填充对象 反序列化
*
* @param dataInput
* @throws IOException
*/
@Override
public void readFields(DataInput dataInput) throws IOException {
//读写顺序要一致
this.upFlow = dataInput.readLong();
this.downFlow = dataInput.readLong();
this.sumFlow = dataInput.readLong();
}
//比较器
@Override
public int compareTo(FlowBean o) {
// if (this.sumFlow < o.sumFlow) {
// return 1; //降序
// } else if (
// this.sumFlow == o.sumFlow
// ) {
// return 0;
// }
// return -1; //升序
return Long.compare(o.sumFlow,this.sumFlow);
}
}
Mapper class
package com.compare;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class CompareMapper extends Mapper<LongWritable, Text,FlowBean,Text> {
private Text phone =new Text();
private FlowBean flow = new FlowBean();
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, FlowBean, Text>.Context context) throws IOException, InterruptedException {
//一行数据
String line = value.toString();
//切分
String[] fields = line.split("\t");
//封装
phone.set(fields[0]);
flow.setUpFlow(Long.parseLong(fields[1]));
flow.setDownFlow(Long.parseLong(fields[2]));
flow.setSumFlow(Long.parseLong(fields[3]));
//写到上下文
context.write(flow,phone);
}
}
Reducer class
package com.compare;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
//收的数据为 流量key 手机号value 输出为 手机key 流量value
public class CompareReducer extends Reducer<FlowBean, Text,Text,FlowBean> {
/**
* Reduce收到的数据已经排完序了 我们只需要将键和值 反着输出到文件中就可以
* @param key
* @param values
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void reduce(FlowBean key, Iterable<Text> values, Reducer<FlowBean, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException {
for (Text value : values) {
context.write(value,key);
}
}
}
Driver class
package com.compare;
import com.partitioner.MyPartitioner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class CompareDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Job job = Job.getInstance(new Configuration());
job.setJarByClass(CompareDriver.class);
job.setMapperClass(CompareMapper.class);
job.setReducerClass(CompareReducer.class);
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(Text.class);
// job.setSortComparatorClass(WritableComparator.class); //默认排序
// job.setGroupingComparatorClass(WritableComparator.class); //分区排序也是使用这个Comparato类
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
FileInputFormat.setInputPaths(job, new Path("file:///d:/output"));
FileOutputFormat.setOutputPath(job, new Path("file:///d:/output2"));
boolean completion = job.waitForCompletion(true);
System.exit(completion ? 0 : 1);
}
}
3.3. RawComparator Sorting
WritableComparator already implements the RawComparator sorting methods for us, so we can simply extend WritableComparator instead of implementing the RawComparator interface ourselves.
package com.compare;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class FlowComparator extends WritableComparator {
@Override
public int compare(WritableComparable a, WritableComparable b) {
FlowBean fa = (FlowBean) a;
FlowBean fb = (FlowBean) b;
return Long.compare(fb.getSumFlow(), fa.getSumFlow());
}
protected FlowComparator() {
super(FlowBean.class, true);
}
}
The driver must then be set to the custom comparator class:
package com.compare;
import com.partitioner.MyPartitioner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class CompareDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Job job = Job.getInstance(new Configuration());
job.setJarByClass(CompareDriver.class);
job.setMapperClass(CompareMapper.class);
job.setReducerClass(CompareReducer.class);
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(Text.class);
// job.setSortComparatorClass(WritableComparator.class); //默认排序
// job.setGroupingComparatorClass(WritableComparator.class); //分区排序也是使用这个Comparator类
job.setSortComparatorClass(FlowComparator.class); //设置为重写的Comparator类
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
FileInputFormat.setInputPaths(job, new Path("file:///d:/output"));
FileOutputFormat.setOutputPath(job, new Path("file:///d:/output2"));
boolean completion = job.waitForCompletion(true);
System.exit(completion ? 0 : 1);
}
}
The Mapper class and the entity class are the same as before, but since the job has now been given a custom comparator class, the compareTo method in the entity class will not be executed.
3.4. Combiner Merging
A Combiner is a component of an MR program besides the Mapper and the Reducer.
The Combiner's parent class is Reducer.
The difference between a Combiner and a Reducer is where they run:
the Combiner runs on the node of each individual MapTask;
the Reducer receives the output of all Mappers globally. The point of the Combiner is to perform a local aggregation of each MapTask's output so as to reduce the amount of data sent over the network.
The precondition for applying a Combiner is that it must not affect the final business logic, and the Combiner's output KV types must line up with the Reducer's input KV types.
Summary: the Combiner groups and merges data early, at MapTask time, reducing the partitioning, sorting, and merging of identical records; the precondition is that the merged data does not change the final result, otherwise this trade of space for time is not worth making.
Usage: pass a Reducer class in the driver to enable it (a sketch of a dedicated Combiner follows below); this does not affect how the Reducer itself is used.
job.setCombinerClass(CompareReducer.class); //merge and group early to cut down processing time
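When the Reducer cannot double as the Combiner (for example because its input and output KV types differ), a dedicated class is written instead. Below is a hypothetical WordCount-style Combiner, shown only to illustrate the rule that a Combiner extends Reducer and that its output KV types must match the Reducer's input KV types; none of these names are part of the project above:
```java
package com.combiner; // hypothetical package
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class MyCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable sum = new IntWritable();
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int total = 0;
        for (IntWritable v : values) {
            total += v.get(); // local, per-MapTask aggregation only
        }
        sum.set(total);
        context.write(key, sum);
    }
}
```
It would then be registered with job.setCombinerClass(MyCombiner.class).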
3.5. GroupingComparator Grouping
GroupingComparator is used for grouping in the reduce phase. In the reduce phase, for a group of records whose keys compare as equal, only the first key is taken as the key while all of the values are iterated. If the reduce key is a custom bean and we want keys to count as equal whenever certain fields of the bean match (even though comparing every field would say they differ), we need to define our own GroupingComparator to "trick" reduce.
1. The entity class implements the WritableComparable interface and implements compareTo.
```java
package com.grouping;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class OrderBean implements WritableComparable<OrderBean> {
private String orderId;
private String productId;
private double price;
@Override
public String toString() {
return orderId + "\t" + productId + "\t" + price;
}
public String getOrderId() {
return orderId;
}
public String getProductId() {
return productId;
}
public void setProductId(String productId) {
this.productId = productId;
}
public double getPrice() {
return price;
}
public void setPrice(double price) {
this.price = price;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
//先按订单排序再根据订单相同价格降序
@Override
public int compareTo(OrderBean o) {
int compare = this.orderId.compareTo(o.orderId); //比较订单号是否相同
if (compare != 0) {
return compare; //不相同则返回差值
} else {
return Double.compare(o.price, this.price); //same order id: sort by price in descending order
}
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(orderId);
dataOutput.writeUTF(productId);
dataOutput.writeDouble(price);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
//read in the same order the fields were written
this.orderId = dataInput.readUTF();
this.productId = dataInput.readUTF();
this.price = dataInput.readDouble();
}
}
```
2. The Mapper packs each line of data into the entity class.
```java
package com.grouping;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
//封装OrderBean
public class OrderMapper extends Mapper<LongWritable, Text,OrderBean, NullWritable> {
private OrderBean order =new OrderBean();
//mapper封装方法
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, OrderBean, NullWritable>.Context context) throws IOException, InterruptedException {
String[] split = value.toString().split("\t");
order.setOrderId(split[0]);
order.setProductId(split[1]);
order.setPrice(Double.parseDouble(split[2]));
//key为一个OrderBean
context.write(order,NullWritable.get());
}
}
```
3. The comparator: extend the WritableComparator class and override compare plus the no-arg constructor.
```java
package com.grouping;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
//group the data by order id
public class OrderComparator extends WritableComparator {
//records with the same order id fall into the same group
@Override
public int compare(WritableComparable a, WritableComparable b) {
OrderBean oa = (OrderBean) a;
OrderBean ob = (OrderBean) b;
return oa.getOrderId().compareTo(ob.getOrderId());
}
protected OrderComparator() {
super(OrderBean.class,true);
}
}
```
4. The Reducer class: here the key is the entity class and the value is NullWritable.
```java
package com.grouping;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.Iterator;
//取每个订单的最高价格
public class OrderReducer extends Reducer<OrderBean, NullWritable,OrderBean,NullWritable> {
@Override
protected void reduce(OrderBean key, Iterable<NullWritable> values, Reducer<OrderBean, NullWritable, OrderBean, NullWritable>.Context context) throws IOException, InterruptedException {
Iterator<NullWritable> iterator = values.iterator();
for (int i = 0; i < 2; i++) { //输出当前订单组中前两个最高价格
if (iterator.hasNext()){
iterator.next();
context.write(key,NullWritable.get());
}
}
}
}
```
5. The Driver class: setGroupingComparatorClass enables the grouping.
```java
package com.grouping;
import com.flow.FlowBean;
import com.flow.FlowDriver;
import com.flow.FlowMapper;
import com.flow.FlowReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class OrderDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Job job = Job.getInstance(new Configuration());
job.setJarByClass(OrderDriver.class);
job.setMapperClass(OrderMapper.class);
job.setReducerClass(OrderReducer.class);
job.setMapOutputKeyClass(OrderBean.class);
job.setMapOutputValueClass(NullWritable.class);
job.setGroupingComparatorClass(OrderComparator.class); //分组比较器
job.setOutputKeyClass(OrderBean.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job, new Path("file:///d:/input"));
FileOutputFormat.setOutputPath(job, new Path("file:///d:/output"));
boolean completion = job.waitForCompletion(true);
System.exit(completion ? 0 : 1);
}
}
```
The Reducer above obtains the two highest prices in each order group by relying on how the shuffle (de)serializes data. If a new bean object were created for every value written to disk, efficiency would be very poor; once data enters the framework it is kept in serialized form, so only one bean object needs to be created, and iterating to the next key/value pair simply deserializes into that same object and overwrites its field values. This avoids wasting resources on creating bean objects over and over.
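A small probe of my own (not part of the original job) makes that reuse visible: inside the reduce method above, the key object's identity never changes between iterations; only its fields are refilled by deserialization.
```java
// hypothetical lines for the body of OrderReducer.reduce(): the printed identity hash stays constant
for (NullWritable value : values) {
    System.out.println(System.identityHashCode(key) + " -> " + key);
}
```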
4. OutputFormat: Data Output
![](https://cdn.jsdelivr.net/gh/Iekrwh/images/md-images/image-20211017083334913.png#alt=image-20211017083334913)
1. The RecordWriter class.
```java
package com.outputformat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
//将数据按照不包含 atguigu 的数据 分别输出到两个文件中
public class MyRecordWriter extends RecordWriter<LongWritable, Text> {
FSDataOutputStream atguigu = null;
FSDataOutputStream other = null;
public MyRecordWriter(TaskAttemptContext job) throws IOException {
Configuration configuration = job.getConfiguration();//通过job获取配置文件
FileSystem fileSystem = FileSystem.get(configuration); //通过配置文件获取文件对象
String outdir = configuration.get(FileOutputFormat.OUTDIR); //获取配置文件中的输出路径地址
atguigu = fileSystem.create(new Path(outdir + "/atguigu.log"));//拼接
other = fileSystem.create(new Path(outdir + "/other.log"));
}
/**
* 接受键值对 并按照值的不同输出到不同文件中
*
* @param key 读取的一行的偏移量
* @param value 这一行的内容
* @throws IOException
* @throws InterruptedException
*/
@Override
public void write(LongWritable key, Text value) throws IOException, InterruptedException {
String line = value.toString() + "\n";
if (line.contains("atguigu")) {//判断此行是否包含atguigu
//往atguigu文件写出数据
atguigu.write(line.getBytes(StandardCharsets.UTF_8));
} else {
//往other文件写出数据
other.write(line.getBytes(StandardCharsets.UTF_8));
}
}
//关闭资源
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
IOUtils.closeStream(atguigu);
IOUtils.closeStream(other);
}
}
```
2. The OutputFormat class.
```java
package com.outputformat;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
//the default Mapper output here is LongWritable, Text
public class MyOutputFormat extends FileOutputFormat<LongWritable, Text> {
//hand the framework the custom RecordWriter defined above
@Override
public RecordWriter<LongWritable, Text> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
return new MyRecordWriter(taskAttemptContext);
}
}
```
3. The Driver class.
```java
package com.outputformat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class OutputDrive {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
job.setJarByClass(OutputDrive.class);
job.setOutputFormatClass(MyOutputFormat.class);
FileInputFormat.setInputPaths(job, new Path("d:/input"));
//必须保证配置文件配置正常才能正常运行
FileOutputFormat.setOutputPath(job, new Path("d:/output"));
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
```
5. Reduce Join
Reduce-side join is the simplest join approach. Its main idea is as follows:
In the map phase, the map function reads both files, File1 and File2. To distinguish key/value pairs from the two sources, every record is given a tag, for example tag=0 for records from File1 and tag=2 for records from File2. In other words, the main task of the map phase is to tag the data coming from the different files. In the reduce phase, the reduce function receives, for each key, the value list coming from File1 and File2, and then joins the File1 and File2 data for that key (a Cartesian product). The actual join therefore happens in the reduce phase.
Main work on the Map side: tag the key/value pairs coming from different tables or files so their origin can be told apart, then output the join field as the key and the rest of the record plus the new tag as the value (see the tagging sketch below).
Main work on the Reduce side: the grouping by the join-field key has already been done, so within each group we only need to separate the records that came from different files (tagged in the Map phase) and then merge them.
Drawback of this approach: it obviously causes a large amount of data to be transferred between the Map and Reduce sides, i.e. in the shuffle phase, so it is inefficient.
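A minimal sketch of the tagging idea described above; the field positions, tag values, and output types are my assumptions, and the implementation that follows actually takes a different route (an OrderBean plus a grouping comparator):
```java
// hypothetical map body for a tag-based reduce-side join
String[] fields = value.toString().split("\t");
String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
if (fileName.startsWith("order")) {
    // order record: join key is pid, tag the value with "0#"
    context.write(new Text(fields[1]), new Text("0#" + value.toString()));
} else {
    // product record: join key is pid, tag the value with "2#"
    context.write(new Text(fields[0]), new Text("2#" + value.toString()));
}
```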
1. Create the entity class, with the sort order built in.
```java
package com.reducejoin;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class OrderBean implements WritableComparable<OrderBean> {
private String id;
private String pid;
private int amount;
private String pname;
@Override
public String toString() {
return id + "\t" + pname + "\t" + amount;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getPid() {
return pid;
}
public void setPid(String pid) {
this.pid = pid;
}
public int getAmount() {
return amount;
}
public void setAmount(int amount) {
this.amount = amount;
}
public String getPname() {
return pname;
}
public void setPname(String pname) {
this.pname = pname;
}
@Override
public int compareTo(OrderBean o) {
//按pid分组 组内按照pname降序排序
int i = this.pid.compareTo(o.pid);
if (i !=0){
return i;
}else {
return o.pname.compareTo(this.pname);
}
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(id);
dataOutput.writeUTF(pid);
dataOutput.writeInt(amount);
dataOutput.writeUTF(pname);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
this.id = dataInput.readUTF();
this.pid = dataInput.readUTF();
this.amount = dataInput.readInt();
this.pname = dataInput.readUTF();
}
}
```
2. The Mapper class: fill the entity class differently depending on the source file name.
```java
package com.reducejoin;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {
private OrderBean order = new OrderBean();
private String filename; //获取当前文件名
@Override
protected void setup(Mapper<LongWritable, Text, OrderBean, NullWritable>.Context context) throws IOException, InterruptedException {
//获取数据文件名
FileSplit fs = (FileSplit) context.getInputSplit();
filename = fs.getPath().getName();
}
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, OrderBean, NullWritable>.Context context) throws IOException, InterruptedException {
String[] split = value.toString().split("\t");
//封装,按数据来源不同分别封装
if ("order.txt".equals(filename)){
//封装order
order.setId(split[0]);
order.setPid(split[1]);
order.setAmount(Integer.parseInt(split[2]));
order.setPname(""); //不能为null
}else{
//封装pd
order.setPid(split[0]);
order.setPname(split[1]);
order.setAmount(0); //不能为null
order.setId("");
}
context.write(order,NullWritable.get());
}
}
```
3. The comparator: group by pid.
```java
package com.reducejoin;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
//grouping comparator: group by the pid of the OrderBean
public class OrderComparator extends WritableComparator {
protected OrderComparator() {
super(OrderBean.class,true);
}
//按照pid比较a和b
@Override
public int compare(WritableComparable a, WritableComparable b) {
OrderBean oa= (OrderBean) a;
OrderBean ob= (OrderBean) b;
return oa.getPid().compareTo(ob.getPid());
}
}
```
4. The Reducer: replace the pid with the corresponding pname and merge the prepared, tagged data.
```java
package com.reducejoin;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.Iterator;
//数据替换工作 将pid换成对应的pname
public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {
@Override
protected void reduce(OrderBean key, Iterable<NullWritable> values, Reducer<OrderBean, NullWritable, OrderBean, NullWritable>.Context context) throws IOException, InterruptedException {
/*for (NullWritable value : values) {
if (!"".equals(key.getPname())){
pName=key.getPname(); //遍历panme查找当前分组中有值的pname即品牌
break; //但迭代器无法进行第二次迭代遍历
}*/
//已经根据pname再次排序 并进行分组 第一个为需要的品牌名pname
Iterator<NullWritable> iterator = values.iterator();
iterator.next();
String pName = key.getPname(); //获取品牌名
while (iterator.hasNext()) {
iterator.next();
key.setPname(pName); //替换为对应的品牌名
context.write(key, NullWritable.get()); //写出
}
}
}
```
5. The Driver class.
```java
package com.reducejoin;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class OrderDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Job job = Job.getInstance(new Configuration());
job.setJarByClass(OrderDriver.class);
job.setMapperClass(OrderMapper.class);
job.setReducerClass(OrderReducer.class);
job.setMapOutputKeyClass(OrderBean.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(OrderBean.class);
job.setOutputValueClass(NullWritable.class);
job.setGroupingComparatorClass(OrderComparator.class); //分组比较器
FileInputFormat.setInputPaths(job, new Path("d:/input"));
FileOutputFormat.setOutputPath(job, new Path("d:/output"));
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
```
6. MapJoin
Map Join suits the case where one table is very small and the other is very large. The small table is cached on the Map side and the business logic is handled there ahead of time; this adds work on the Map side, relieves the data pressure on the Reduce side, and reduces data skew as much as possible.
With MapJoin only a driver and a map class need to be written, no reduce class, because the reduce phase is not involved at all: everything is finished in the map phase.
1. Driver: enable the distributed cache and pass in the path of the small file.
```java
package com.mapjoin;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.net.URI;
public class MJDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Job job = Job.getInstance(new Configuration());
job.setJarByClass(MJDriver.class);
job.setMapperClass(MJMapper.class);
job.setNumReduceTasks(0); //Map端的join不需要Reduce阶段 所以设置ReduceTask数0
//添加分布式缓存可以添加多值 传递为数组
job.addCacheFile(URI.create("file:///d:/input/pd.txt")); //设置加载缓存数据
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job, new Path("D:/input/order.txt"));
FileOutputFormat.setOutputPath(job, new Path("d:/output"));
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
```
2. The Mapper: in setup, read the distributed-cache file and put its contents into a Map; in map, use that Map to look up and rewrite the data being processed.
```java
package com.mapjoin;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
public class MJMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
private Map<String, String> pMap = new HashMap<>();
private Text k = new Text();
@Override
protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
//读取pd.txt到pMap
//开流
URI[] cacheFiles = context.getCacheFiles(); //读取分布式缓存文件路径数组
FileSystem fileSystem = FileSystem.get(context.getConfiguration());
FSDataInputStream pd = fileSystem.open(new Path(cacheFiles[0])); //pd文件
//将文件按行处理 读取到pMap中
BufferedReader br = new BufferedReader(new InputStreamReader(pd)); //将字节流转为字符流
String line;
while (StringUtils.isNotEmpty(line = br.readLine())) {
String[] split = line.split("\t");
pMap.put(split[0], split[1]); //转为map集合
}
IOUtils.closeStream(br);
}
//处理order.txt的数据
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
String[] split = value.toString().split("\t");
k.set(split[0] + "\t" + pMap.get(split[1]) + "\t" + split[2]); //从map中根据pid获取value替换
context.write(k,NullWritable.get());
}
}
```
![](https://cdn.jsdelivr.net/gh/Iekrwh/images/md-images/image-20211017103235651.png#alt=image-20211017103235651)
1. MapJoin is more efficient than ReduceJoin.
2. Because MapJoin caches data in memory ahead of time, it cannot be used if the cached data set is too large.
7. Data Cleaning (ETL) and Counters
Before running the core business MapReduce program, the data usually has to be cleaned first to remove records that do not meet the requirements. The cleaning process usually only needs a Mapper program; no Reducer program is required.
1. Create an enum class to make building the counters convenient.
```java
package com.etl;
public enum ETL {
PASS,FAIL
}
```
2. The Mapper class: build the Counter objects in setup, and in map count the records that pass the cleaning rule.
```java
package com.etl;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class ETLMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
private Counter pass;
private Counter fail;
@Override
protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
// pass = context.getCounter("ETL", "PASS"); //build a counter from the context using group/name strings
// fail = context.getCounter("ETL", "Fail");
pass = context.getCounter(ETL.PASS); //build the counters from the enum class instead
fail = context.getCounter(ETL.FAIL);
}
//判断日志是否需要清洗
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
String[] splits = value.toString().split(" ");
if (splits.length > 11) {
context.write(value, NullWritable.get());
pass.increment(1); //计数器+1
} else {
fail.increment(1); //不符合条件的计数器+1
}
//nothing is written for records that fail the check, so they are dropped from the output here
}
}
```
3. The Driver class.
```java
package com.etl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class ETLDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Job job = Job.getInstance(new Configuration());
job.setJarByClass(ETLDriver.class);
job.setMapperClass(ETLMapper.class);
job.setNumReduceTasks(0);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job, new Path("d:/input"));
FileOutputFormat.setOutputPath(job, new Path("d:/output"));
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
```
PASS is the number of records that meet the condition; FAIL is the number of records that do not.
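The counter values can also be read back in the driver after the job finishes; a minimal sketch, assuming it is placed in ETLDriver.main right after waitForCompletion (getCounters() and findCounter() are the standard Job/Counters calls):
```java
boolean b = job.waitForCompletion(true);
long passed = job.getCounters().findCounter(ETL.PASS).getValue();
long failed = job.getCounters().findCounter(ETL.FAIL).getValue();
System.out.println("PASS=" + passed + ", FAIL=" + failed);
System.exit(b ? 0 : 1);
```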
8. Summary
Input interface: InputFormat
- The default implementation is TextInputFormat.
- TextInputFormat reads one line of text at a time, returning the line's starting offset as the key and the line's content as the value.
- KeyValueTextInputFormat treats every line as one record, split by a separator into key and value; the default separator is \t.
- NLineInputFormat builds splits from a specified number of lines N (see the configuration sketch after this list).
- CombineTextInputFormat can pack multiple small files into one split, improving processing efficiency.
- Users can also implement a custom InputFormat.
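A hedged configuration sketch for two of the formats above (the line count and separator are example values; job is a configured Job as in the drivers earlier):
```java
// NLineInputFormat: each split carries N input lines (3 here)
job.setInputFormatClass(NLineInputFormat.class);
NLineInputFormat.setNumLinesPerSplit(job, 3);

// KeyValueTextInputFormat: each line is split into key and value at the first separator (default \t)
job.setInputFormatClass(KeyValueTextInputFormat.class);
job.getConfiguration().set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "\t");
```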
Processing interface: Mapper
- Implement the three methods map(), setup(), and cleanup() according to the business requirements.
Partitioner
The default implementation is HashPartitioner; its logic returns a partition number based on the key's hash code and numReduces:
(key.hashCode() & Integer.MAX_VALUE) % numReduces
Partitioning can also be customized.
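For reference, the default logic amounts to the following illustrative re-implementation; it mirrors the formula above rather than being Hadoop's own source:
```java
import org.apache.hadoop.mapreduce.Partitioner;
// equivalent of the default hash partitioning: mask off the sign bit, then modulo the reducer count
public class DefaultStylePartitioner<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}
```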
Comparable sorting
- When a custom object is used as the output key, it must implement the WritableComparable interface and override its compareTo() method.
- Partial sort: each final output file is sorted internally.
- Total sort: all the data is sorted globally, usually with a single Reducer.
- Secondary sort: the sort uses two criteria.
Combiner merging
- A Combiner can improve the program's efficiency by reducing IO transfer, but it must not change the original business result.
Reduce-side grouping
- GroupingComparator groups keys on the Reduce side. It is used when the received key is a bean object and we want keys whose chosen field(s) match (even though a comparison of all fields would say they differ) to enter the same reduce call.
Processing interface: Reducer
- Implement the three methods reduce(), setup(), and cleanup() according to the business requirements.
Output interface: OutputFormat
- The default implementation is TextOutputFormat; it writes each key/value pair as one line to the target text file.
- SequenceFileOutputFormat output is a good input format for a subsequent MapReduce job because it is compact and compresses easily.
- OutputFormat can also be customized.