类似SQL中的join,需要将两个不同文件进行关联输出,例如一张是详情数据、一张是码值数据,需要关联输出的详情中进行了码值转换。
在 Reduce 端进行 join
Map端的主要工作:为来自不同表或文件的 key-value 对,打标签以区别不同来源的记录。然后用连接字段作为 key ,其余部分和新加的标志作为value,最后进行输出。
Reduce端的主要工作:在Reduce端以连接字段作为 key 的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在Map阶段已经打标志)分开,最后进行合并就可以了。
示例,将订单表和产品表使用MapReduce做 join。
在Mapper中,将pid设置为 key,这样相同 key 的订单数据、产品数据 就都能进入同一个 reduce 方法中处理。
然后在这个reduce方法中,判断如果是产品表,就获取产品名称;如果是订单表,就先存入集合。最后再遍历订单集合,将产品名称设置进去。
订单表表数据:
id(订单编号) | pid(产品编号) | amount(数量) |
---|---|---|
1001 | 01 | 1 |
1002 | 02 | 1 |
1003 | 03 | 4 |
1004 | 01 | 2 |
1005 | 01 | 3 |
1006 | 02 | 5 |
产品表:
pid(产品编号) | pname(产品名称) |
---|---|
01 | 苹果 |
02 | 桔子 |
03 | 香蕉 |
最终输出结果:订单编号、产品名称、数量。
示例输入数据:
order.txt:
1001@01@1
1002@02@1
1003@03@4
1004@01@2
1005@01@3
1006@02@5
product.txt:
01@苹果
02@桔子
03@香蕉
编写JavaBean
package com.study.mapreduce.reducejoin;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* JavaBean结果
*/
public class OrderInfo implements Writable {
private String id; // 订单编号
private String pid; // 产品编号
private Integer amount; // 数量
private String pname; // 产品名称
private String _from_table; // 标记该条数据的来源表:order、product
public OrderInfo() {
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(id); // 序列化String时,使用 writeUTF写出、readUTF读取
out.writeUTF(pid);
out.writeInt(amount);
out.writeUTF(pname);
out.writeUTF(_from_table);
}
@Override
public void readFields(DataInput in) throws IOException {
id = in.readUTF();
pid = in.readUTF();
amount = in.readInt();
pname = in.readUTF();
_from_table = in.readUTF();
}
// 重写toString,将JavaBean输出到文件时会调用toString方法输出
@Override
public String toString() {
return "OrderInfo{" +
"id='" + id + '\'' +
", pid='" + pid + '\'' +
", amount=" + amount +
", pname='" + pname + '\'' +
", _from_table='" + _from_table + '\'' +
'}';
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getPid() {
return pid;
}
public void setPid(String pid) {
this.pid = pid;
}
public Integer getAmount() {
return amount;
}
public void setAmount(Integer amount) {
this.amount = amount;
}
public String getPname() {
return pname;
}
public void setPname(String pname) {
this.pname = pname;
}
public String get_from_table() {
return _from_table;
}
public void set_from_table(String _from_table) {
this._from_table = _from_table;
}
}
编写Mapper
package com.study.mapreduce.reducejoin;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.log4j.Logger;
import java.io.IOException;
public class ReduceJoinMapper extends Mapper<LongWritable, Text, Text, OrderInfo> {
private Logger logger = Logger.getLogger(ReduceJoinMapper.class);
private Text outKey = new Text();
private String fileName = ""; // 当前MapTask处理的文件名称
/**
* 重写初始化方法,在初始化方法中通过切片信息【获取到文件名称】,避免每次都在map方法中读取耗费资源
* 因为每个切片对应一个MapTask,所以这个MapTask的初始化方法就是这个切片的初始化方法
* 因为一个文件可以切多个片,但是一个切片只会对应一个文件,所以这个切片对应的文件信息是固定的
*/
@Override
protected void setup(Mapper<LongWritable, Text, Text, OrderInfo>.Context context) throws IOException, InterruptedException {
// JobSubmitter 的 writeNewSplits 中会调用 input.getSplits(job) 进行切片
// 我们使用的默认InputFormat,也就是 TextInputFormat。TextInputFormat 的 getSplits 方法在 FileInputFormat(TextInputFormat的父类)中
// FileInputFormat 的 getSplits 方法会调用makeSplit创建 FileSplit 类型的切片对象
fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
}
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, OrderInfo>.Context context) throws IOException, InterruptedException {
String[] valueArray = value.toString().split("@");
logger.info("---------------------------------------------------------");
logger.info("fileName:" + fileName);
OrderInfo orderInfo = new OrderInfo();
if(fileName.startsWith("order")) { // 处理 order.txt文件的内容
orderInfo.setId(valueArray[0]);
orderInfo.setPid(valueArray[1]);
orderInfo.setAmount(Integer.valueOf(valueArray[2]));
orderInfo.set_from_table("_ORDER_"); // 设置来源表为order表
orderInfo.setPname(""); // 属性值不能有null,否则会报NPE空指针异常。可以设置一个默认值
outKey.set(valueArray[1]);
} else { // 处理 product.txt文件的内容
orderInfo.setPid(valueArray[0]);
orderInfo.setPname(valueArray[1]);
orderInfo.set_from_table("_PRODUCT_"); // 设置来源表为product
// 防止空指针异常,对null属性设置一个默认值
orderInfo.setId("");
orderInfo.setAmount(-1);
outKey.set(valueArray[0]);
}
context.write(outKey, orderInfo);
logger.info(orderInfo);
logger.info("-------------------------------------------");
}
}
编写Reducer
package com.study.mapreduce.reducejoin;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
public class ReduceJoinReducer extends Reducer<Text, OrderInfo, Text, OrderInfo> {
private Logger logger = Logger.getLogger(ReduceJoinReducer.class);
@Override
protected void reduce(Text key, Iterable<OrderInfo> values, Reducer<Text, OrderInfo, Text, OrderInfo>.Context context) throws IOException, InterruptedException {
logger.info("=====================================");
logger.info("key: " + key);
List<OrderInfo> orderInfoList = new ArrayList<>();
String pname = "";
Iterator<OrderInfo> iterator = values.iterator();
while (iterator.hasNext()) {
OrderInfo orderInfo = iterator.next();
logger.info(orderInfo);
if("_PRODUCT_".equals(orderInfo.get_from_table())) {
pname = orderInfo.getPname();
} else {
// 此处不能直接使用:orderInfoList.add(orderInfo);
// Hadoop迭代器为了优化效率,使用了对象重用。
// 迭代时value始终指向同一个内存地址,改变的只是引用地址中的字段属性
// 即这个循环了这么多次的orderInfo,其实是同一个对象,只是对象的属性值在不断变化
// 所以这里需要 new 一个新对象来存放这些属性,然后将这个新对象塞给集合
OrderInfo orderInfoTemp = new OrderInfo();
// orderInfoTemp.setId(orderInfo.getId());
// orderInfoTemp.setPid(orderInfo.getPid());
// orderInfoTemp.setAmount(orderInfo.getAmount());
// orderInfoTemp.set_from_table(orderInfo.get_from_table());
try {
// 使用BeanUtils工具对同名属性进行赋值
// 第一个参数:要设置值的目标对象
// 第二个参数:源对象
BeanUtils.copyProperties(orderInfoTemp, orderInfo);
} catch (Exception e) {
e.printStackTrace();
}
orderInfoList.add(orderInfoTemp);
}
}
logger.info("**********************************");
for (OrderInfo orderInfo : orderInfoList) {
orderInfo.setPname(pname);
context.write(key, orderInfo);
logger.info(orderInfo);
}
logger.info("===========================================");
}
}
编写Driver
package com.study.mapreduce.reducejoin;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class ReduceJoinDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Configuration config = new Configuration();
Job job = Job.getInstance(config);
job.setJarByClass(ReduceJoinDriver.class);
job.setMapperClass(ReduceJoinMapper.class);
job.setReducerClass(ReduceJoinReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(OrderInfo.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(OrderInfo.class);
FileInputFormat.setInputPaths(job, new Path("/app/order/input"));
FileOutputFormat.setOutputPath(job, new Path("/app/order/output/output2"));
boolean success = job.waitForCompletion(true);
System.exit(success ? 0 : 1); // 程序退出
}
}
输出结果:
01 OrderInfo{id='1005', pid='01', amount=3, pname='苹果', _from_table='_ORDER_'}
01 OrderInfo{id='1004', pid='01', amount=2, pname='苹果', _from_table='_ORDER_'}
01 OrderInfo{id='1001', pid='01', amount=1, pname='苹果', _from_table='_ORDER_'}
02 OrderInfo{id='1006', pid='02', amount=5, pname='桔子', _from_table='_ORDER_'}
02 OrderInfo{id='1002', pid='02', amount=1, pname='桔子', _from_table='_ORDER_'}
03 OrderInfo{id='1003', pid='03', amount=4, pname='香蕉', _from_table='_ORDER_'}
在 Map 端进行 Join
join合并的操作如果在Reduce段完成,Reduce端的处理压力就会很大,Map节点的运算负载则很低,资源利用率不高,且在Reduce阶段极易发生数据倾斜。
因为一般情况下,MapTask的节点数量要多于ReduceTask,可以在Map段进行数据join合并。
适用场景:一张表很小、另一张表很大。先将小表读取加载进内存中,然后在Map端进行 join合并。
具体方法:采用DistributedCache
- 在Mapper的setup阶段,将文件读取到缓存集合中
在Driver驱动类中加载缓存:
// 缓存普通文件到Task运行节点
job.addCacheFile(new URI("file:///app/cache/product.txt"));
// 如果是集群运行,需要设置成 hdfs 路径
在Map端进行 join,此时不再需要Reduce阶段,可以设置 ReduceTask数量为0
// 因为在Map端已经完成了join,达到了目的,不再需要ReduceTask,可以设置取消ReduceTask
job.setNumReduceTasks(0);
编写Driver
```java package com.study.mapreduce.mapjoin;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException; import java.net.URI; import java.net.URISyntaxException;
public class MapJoinDriver { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException, URISyntaxException { Configuration config = new Configuration(); Job job = Job.getInstance(config);
job.setJarByClass(MapJoinDriver.class);
job.setMapperClass(MapJoinMapper.class);
job.setMapOutputKeyClass(Text.class);
// 不需要汇总出Value,可以设置为null(NullWritable.get())
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// 设置取消ReduceTask
job.setNumReduceTasks(0);
// 加载缓存数据
job.addCacheFile(new URI("/app/order/input/product.txt"));
FileInputFormat.setInputPaths(job, new Path("/app/order/input/order.txt"));
FileOutputFormat.setOutputPath(job, new Path("/app/order/output/output3"));
boolean success = job.waitForCompletion(true);
System.exit(success ? 0 : 1); // 程序退出
}
}
<a name="vCMdj"></a>
### 编写Mapper
```java
package com.study.mapreduce.mapjoin;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
private Map<String, String> productMap = new HashMap<>(); // product表数据
private Text outKey = new Text();
/**
* 在MapTask初始化时加载缓存文件
*/
@Override
protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
// 获取product.txt缓存文件
URI[] cacheFiles = context.getCacheFiles();
URI productFile = cacheFiles[0];
// 读取product.txt文件
FileSystem fileSystem = FileSystem.get(context.getConfiguration());
FSDataInputStream inputStream = fileSystem.open(new Path(productFile));
BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
String line = "";
while((line = br.readLine()) != null) {
String[] strArray = line.split("@");
productMap.put(strArray[0], strArray[1]);
}
IOUtils.closeStream(br);
}
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] fields = line.split("@");
String id = fields[0];
String pid = fields[1];
int amount = Integer.valueOf(fields[2]);
String pname = productMap.get(pid);
String resultKey = id +"\t" + pname + "\t" + amount;
outKey.set(resultKey);
context.write(outKey, NullWritable.get());
}
}