类似SQL中的join,需要将两个不同文件进行关联输出,例如一张是详情数据、一张是码值数据,需要关联输出的详情中进行了码值转换。
在 Reduce 端进行 join
Map端的主要工作:为来自不同表或文件的 key-value 对,打标签以区别不同来源的记录。然后用连接字段作为 key ,其余部分和新加的标志作为value,最后进行输出。
Reduce端的主要工作:在Reduce端以连接字段作为 key 的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在Map阶段已经打标志)分开,最后进行合并就可以了。
示例,将订单表和产品表使用MapReduce做 join。
在Mapper中,将pid设置为 key,这样相同 key 的订单数据、产品数据 就都能进入同一个 reduce 方法中处理。
然后在这个reduce方法中,判断如果是产品表,就获取产品名称;如果是订单表,就先存入集合。最后再遍历订单集合,将产品名称设置进去。
订单表表数据:
| id(订单编号) | pid(产品编号) | amount(数量) |
|---|---|---|
| 1001 | 01 | 1 |
| 1002 | 02 | 1 |
| 1003 | 03 | 4 |
| 1004 | 01 | 2 |
| 1005 | 01 | 3 |
| 1006 | 02 | 5 |
产品表:
| pid(产品编号) | pname(产品名称) |
|---|---|
| 01 | 苹果 |
| 02 | 桔子 |
| 03 | 香蕉 |
最终输出结果:订单编号、产品名称、数量。
示例输入数据:
order.txt:
1001@01@11002@02@11003@03@41004@01@21005@01@31006@02@5
product.txt:
01@苹果02@桔子03@香蕉
编写JavaBean
package com.study.mapreduce.reducejoin;import org.apache.hadoop.io.Writable;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;/*** JavaBean结果*/public class OrderInfo implements Writable {private String id; // 订单编号private String pid; // 产品编号private Integer amount; // 数量private String pname; // 产品名称private String _from_table; // 标记该条数据的来源表:order、productpublic OrderInfo() {}@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(id); // 序列化String时,使用 writeUTF写出、readUTF读取out.writeUTF(pid);out.writeInt(amount);out.writeUTF(pname);out.writeUTF(_from_table);}@Overridepublic void readFields(DataInput in) throws IOException {id = in.readUTF();pid = in.readUTF();amount = in.readInt();pname = in.readUTF();_from_table = in.readUTF();}// 重写toString,将JavaBean输出到文件时会调用toString方法输出@Overridepublic String toString() {return "OrderInfo{" +"id='" + id + '\'' +", pid='" + pid + '\'' +", amount=" + amount +", pname='" + pname + '\'' +", _from_table='" + _from_table + '\'' +'}';}public String getId() {return id;}public void setId(String id) {this.id = id;}public String getPid() {return pid;}public void setPid(String pid) {this.pid = pid;}public Integer getAmount() {return amount;}public void setAmount(Integer amount) {this.amount = amount;}public String getPname() {return pname;}public void setPname(String pname) {this.pname = pname;}public String get_from_table() {return _from_table;}public void set_from_table(String _from_table) {this._from_table = _from_table;}}
编写Mapper
package com.study.mapreduce.reducejoin;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.lib.input.FileSplit;import org.apache.log4j.Logger;import java.io.IOException;public class ReduceJoinMapper extends Mapper<LongWritable, Text, Text, OrderInfo> {private Logger logger = Logger.getLogger(ReduceJoinMapper.class);private Text outKey = new Text();private String fileName = ""; // 当前MapTask处理的文件名称/*** 重写初始化方法,在初始化方法中通过切片信息【获取到文件名称】,避免每次都在map方法中读取耗费资源* 因为每个切片对应一个MapTask,所以这个MapTask的初始化方法就是这个切片的初始化方法* 因为一个文件可以切多个片,但是一个切片只会对应一个文件,所以这个切片对应的文件信息是固定的*/@Overrideprotected void setup(Mapper<LongWritable, Text, Text, OrderInfo>.Context context) throws IOException, InterruptedException {// JobSubmitter 的 writeNewSplits 中会调用 input.getSplits(job) 进行切片// 我们使用的默认InputFormat,也就是 TextInputFormat。TextInputFormat 的 getSplits 方法在 FileInputFormat(TextInputFormat的父类)中// FileInputFormat 的 getSplits 方法会调用makeSplit创建 FileSplit 类型的切片对象fileName = ((FileSplit) context.getInputSplit()).getPath().getName();}@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, OrderInfo>.Context context) throws IOException, InterruptedException {String[] valueArray = value.toString().split("@");logger.info("---------------------------------------------------------");logger.info("fileName:" + fileName);OrderInfo orderInfo = new OrderInfo();if(fileName.startsWith("order")) { // 处理 order.txt文件的内容orderInfo.setId(valueArray[0]);orderInfo.setPid(valueArray[1]);orderInfo.setAmount(Integer.valueOf(valueArray[2]));orderInfo.set_from_table("_ORDER_"); // 设置来源表为order表orderInfo.setPname(""); // 属性值不能有null,否则会报NPE空指针异常。可以设置一个默认值outKey.set(valueArray[1]);} else { // 处理 product.txt文件的内容orderInfo.setPid(valueArray[0]);orderInfo.setPname(valueArray[1]);orderInfo.set_from_table("_PRODUCT_"); // 设置来源表为product// 防止空指针异常,对null属性设置一个默认值orderInfo.setId("");orderInfo.setAmount(-1);outKey.set(valueArray[0]);}context.write(outKey, orderInfo);logger.info(orderInfo);logger.info("-------------------------------------------");}}
编写Reducer
package com.study.mapreduce.reducejoin;import org.apache.commons.beanutils.BeanUtils;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import org.apache.log4j.Logger;import java.io.IOException;import java.util.ArrayList;import java.util.Iterator;import java.util.List;public class ReduceJoinReducer extends Reducer<Text, OrderInfo, Text, OrderInfo> {private Logger logger = Logger.getLogger(ReduceJoinReducer.class);@Overrideprotected void reduce(Text key, Iterable<OrderInfo> values, Reducer<Text, OrderInfo, Text, OrderInfo>.Context context) throws IOException, InterruptedException {logger.info("=====================================");logger.info("key: " + key);List<OrderInfo> orderInfoList = new ArrayList<>();String pname = "";Iterator<OrderInfo> iterator = values.iterator();while (iterator.hasNext()) {OrderInfo orderInfo = iterator.next();logger.info(orderInfo);if("_PRODUCT_".equals(orderInfo.get_from_table())) {pname = orderInfo.getPname();} else {// 此处不能直接使用:orderInfoList.add(orderInfo);// Hadoop迭代器为了优化效率,使用了对象重用。// 迭代时value始终指向同一个内存地址,改变的只是引用地址中的字段属性// 即这个循环了这么多次的orderInfo,其实是同一个对象,只是对象的属性值在不断变化// 所以这里需要 new 一个新对象来存放这些属性,然后将这个新对象塞给集合OrderInfo orderInfoTemp = new OrderInfo();// orderInfoTemp.setId(orderInfo.getId());// orderInfoTemp.setPid(orderInfo.getPid());// orderInfoTemp.setAmount(orderInfo.getAmount());// orderInfoTemp.set_from_table(orderInfo.get_from_table());try {// 使用BeanUtils工具对同名属性进行赋值// 第一个参数:要设置值的目标对象// 第二个参数:源对象BeanUtils.copyProperties(orderInfoTemp, orderInfo);} catch (Exception e) {e.printStackTrace();}orderInfoList.add(orderInfoTemp);}}logger.info("**********************************");for (OrderInfo orderInfo : orderInfoList) {orderInfo.setPname(pname);context.write(key, orderInfo);logger.info(orderInfo);}logger.info("===========================================");}}
编写Driver
package com.study.mapreduce.reducejoin;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class ReduceJoinDriver {public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {Configuration config = new Configuration();Job job = Job.getInstance(config);job.setJarByClass(ReduceJoinDriver.class);job.setMapperClass(ReduceJoinMapper.class);job.setReducerClass(ReduceJoinReducer.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(OrderInfo.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(OrderInfo.class);FileInputFormat.setInputPaths(job, new Path("/app/order/input"));FileOutputFormat.setOutputPath(job, new Path("/app/order/output/output2"));boolean success = job.waitForCompletion(true);System.exit(success ? 0 : 1); // 程序退出}}
输出结果:
01 OrderInfo{id='1005', pid='01', amount=3, pname='苹果', _from_table='_ORDER_'}01 OrderInfo{id='1004', pid='01', amount=2, pname='苹果', _from_table='_ORDER_'}01 OrderInfo{id='1001', pid='01', amount=1, pname='苹果', _from_table='_ORDER_'}02 OrderInfo{id='1006', pid='02', amount=5, pname='桔子', _from_table='_ORDER_'}02 OrderInfo{id='1002', pid='02', amount=1, pname='桔子', _from_table='_ORDER_'}03 OrderInfo{id='1003', pid='03', amount=4, pname='香蕉', _from_table='_ORDER_'}
在 Map 端进行 Join
join合并的操作如果在Reduce段完成,Reduce端的处理压力就会很大,Map节点的运算负载则很低,资源利用率不高,且在Reduce阶段极易发生数据倾斜。
因为一般情况下,MapTask的节点数量要多于ReduceTask,可以在Map段进行数据join合并。
适用场景:一张表很小、另一张表很大。先将小表读取加载进内存中,然后在Map端进行 join合并。
具体方法:采用DistributedCache
- 在Mapper的setup阶段,将文件读取到缓存集合中
在Driver驱动类中加载缓存:
// 缓存普通文件到Task运行节点job.addCacheFile(new URI("file:///app/cache/product.txt"));// 如果是集群运行,需要设置成 hdfs 路径
在Map端进行 join,此时不再需要Reduce阶段,可以设置 ReduceTask数量为0
// 因为在Map端已经完成了join,达到了目的,不再需要ReduceTask,可以设置取消ReduceTaskjob.setNumReduceTasks(0);
编写Driver
```java package com.study.mapreduce.mapjoin;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException; import java.net.URI; import java.net.URISyntaxException;
public class MapJoinDriver { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException, URISyntaxException { Configuration config = new Configuration(); Job job = Job.getInstance(config);
job.setJarByClass(MapJoinDriver.class);job.setMapperClass(MapJoinMapper.class);job.setMapOutputKeyClass(Text.class);// 不需要汇总出Value,可以设置为null(NullWritable.get())job.setMapOutputValueClass(NullWritable.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);// 设置取消ReduceTaskjob.setNumReduceTasks(0);// 加载缓存数据job.addCacheFile(new URI("/app/order/input/product.txt"));FileInputFormat.setInputPaths(job, new Path("/app/order/input/order.txt"));FileOutputFormat.setOutputPath(job, new Path("/app/order/output/output3"));boolean success = job.waitForCompletion(true);System.exit(success ? 0 : 1); // 程序退出}
}
<a name="vCMdj"></a>### 编写Mapper```javapackage com.study.mapreduce.mapjoin;import org.apache.hadoop.fs.FSDataInputStream;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IOUtils;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.BufferedReader;import java.io.IOException;import java.io.InputStreamReader;import java.net.URI;import java.util.HashMap;import java.util.Map;public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {private Map<String, String> productMap = new HashMap<>(); // product表数据private Text outKey = new Text();/*** 在MapTask初始化时加载缓存文件*/@Overrideprotected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {// 获取product.txt缓存文件URI[] cacheFiles = context.getCacheFiles();URI productFile = cacheFiles[0];// 读取product.txt文件FileSystem fileSystem = FileSystem.get(context.getConfiguration());FSDataInputStream inputStream = fileSystem.open(new Path(productFile));BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));String line = "";while((line = br.readLine()) != null) {String[] strArray = line.split("@");productMap.put(strArray[0], strArray[1]);}IOUtils.closeStream(br);}@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {String line = value.toString();String[] fields = line.split("@");String id = fields[0];String pid = fields[1];int amount = Integer.valueOf(fields[2]);String pname = productMap.get(pid);String resultKey = id +"\t" + pname + "\t" + amount;outKey.set(resultKey);context.write(outKey, NullWritable.get());}}
