类似SQL中的join,需要将两个不同文件进行关联输出,例如一张是详情数据、一张是码值数据,需要关联输出的详情中进行了码值转换。

在 Reduce 端进行 join

Map端的主要工作:为来自不同表或文件的 key-value 对,打标签以区别不同来源的记录。然后用连接字段作为 key ,其余部分和新加的标志作为value,最后进行输出。
Reduce端的主要工作:在Reduce端以连接字段作为 key 的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在Map阶段已经打标志)分开,最后进行合并就可以了。

示例,将订单表和产品表使用MapReduce做 join。
在Mapper中,将pid设置为 key,这样相同 key 的订单数据、产品数据 就都能进入同一个 reduce 方法中处理。
然后在这个reduce方法中,判断如果是产品表,就获取产品名称;如果是订单表,就先存入集合。最后再遍历订单集合,将产品名称设置进去。
订单表表数据:

id(订单编号) pid(产品编号) amount(数量)
1001 01 1
1002 02 1
1003 03 4
1004 01 2
1005 01 3
1006 02 5

产品表:

pid(产品编号) pname(产品名称)
01 苹果
02 桔子
03 香蕉

最终输出结果:订单编号、产品名称、数量。
示例输入数据:
order.txt:

  1. 1001@01@1
  2. 1002@02@1
  3. 1003@03@4
  4. 1004@01@2
  5. 1005@01@3
  6. 1006@02@5

product.txt:

  1. 01@苹果
  2. 02@桔子
  3. 03@香蕉

编写JavaBean

  1. package com.study.mapreduce.reducejoin;
  2. import org.apache.hadoop.io.Writable;
  3. import java.io.DataInput;
  4. import java.io.DataOutput;
  5. import java.io.IOException;
  6. /**
  7. * JavaBean结果
  8. */
  9. public class OrderInfo implements Writable {
  10. private String id; // 订单编号
  11. private String pid; // 产品编号
  12. private Integer amount; // 数量
  13. private String pname; // 产品名称
  14. private String _from_table; // 标记该条数据的来源表:order、product
  15. public OrderInfo() {
  16. }
  17. @Override
  18. public void write(DataOutput out) throws IOException {
  19. out.writeUTF(id); // 序列化String时,使用 writeUTF写出、readUTF读取
  20. out.writeUTF(pid);
  21. out.writeInt(amount);
  22. out.writeUTF(pname);
  23. out.writeUTF(_from_table);
  24. }
  25. @Override
  26. public void readFields(DataInput in) throws IOException {
  27. id = in.readUTF();
  28. pid = in.readUTF();
  29. amount = in.readInt();
  30. pname = in.readUTF();
  31. _from_table = in.readUTF();
  32. }
  33. // 重写toString,将JavaBean输出到文件时会调用toString方法输出
  34. @Override
  35. public String toString() {
  36. return "OrderInfo{" +
  37. "id='" + id + '\'' +
  38. ", pid='" + pid + '\'' +
  39. ", amount=" + amount +
  40. ", pname='" + pname + '\'' +
  41. ", _from_table='" + _from_table + '\'' +
  42. '}';
  43. }
  44. public String getId() {
  45. return id;
  46. }
  47. public void setId(String id) {
  48. this.id = id;
  49. }
  50. public String getPid() {
  51. return pid;
  52. }
  53. public void setPid(String pid) {
  54. this.pid = pid;
  55. }
  56. public Integer getAmount() {
  57. return amount;
  58. }
  59. public void setAmount(Integer amount) {
  60. this.amount = amount;
  61. }
  62. public String getPname() {
  63. return pname;
  64. }
  65. public void setPname(String pname) {
  66. this.pname = pname;
  67. }
  68. public String get_from_table() {
  69. return _from_table;
  70. }
  71. public void set_from_table(String _from_table) {
  72. this._from_table = _from_table;
  73. }
  74. }

编写Mapper

  1. package com.study.mapreduce.reducejoin;
  2. import org.apache.hadoop.io.LongWritable;
  3. import org.apache.hadoop.io.Text;
  4. import org.apache.hadoop.mapreduce.Mapper;
  5. import org.apache.hadoop.mapreduce.lib.input.FileSplit;
  6. import org.apache.log4j.Logger;
  7. import java.io.IOException;
  8. public class ReduceJoinMapper extends Mapper<LongWritable, Text, Text, OrderInfo> {
  9. private Logger logger = Logger.getLogger(ReduceJoinMapper.class);
  10. private Text outKey = new Text();
  11. private String fileName = ""; // 当前MapTask处理的文件名称
  12. /**
  13. * 重写初始化方法,在初始化方法中通过切片信息【获取到文件名称】,避免每次都在map方法中读取耗费资源
  14. * 因为每个切片对应一个MapTask,所以这个MapTask的初始化方法就是这个切片的初始化方法
  15. * 因为一个文件可以切多个片,但是一个切片只会对应一个文件,所以这个切片对应的文件信息是固定的
  16. */
  17. @Override
  18. protected void setup(Mapper<LongWritable, Text, Text, OrderInfo>.Context context) throws IOException, InterruptedException {
  19. // JobSubmitter 的 writeNewSplits 中会调用 input.getSplits(job) 进行切片
  20. // 我们使用的默认InputFormat,也就是 TextInputFormat。TextInputFormat 的 getSplits 方法在 FileInputFormat(TextInputFormat的父类)中
  21. // FileInputFormat 的 getSplits 方法会调用makeSplit创建 FileSplit 类型的切片对象
  22. fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
  23. }
  24. @Override
  25. protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, OrderInfo>.Context context) throws IOException, InterruptedException {
  26. String[] valueArray = value.toString().split("@");
  27. logger.info("---------------------------------------------------------");
  28. logger.info("fileName:" + fileName);
  29. OrderInfo orderInfo = new OrderInfo();
  30. if(fileName.startsWith("order")) { // 处理 order.txt文件的内容
  31. orderInfo.setId(valueArray[0]);
  32. orderInfo.setPid(valueArray[1]);
  33. orderInfo.setAmount(Integer.valueOf(valueArray[2]));
  34. orderInfo.set_from_table("_ORDER_"); // 设置来源表为order表
  35. orderInfo.setPname(""); // 属性值不能有null,否则会报NPE空指针异常。可以设置一个默认值
  36. outKey.set(valueArray[1]);
  37. } else { // 处理 product.txt文件的内容
  38. orderInfo.setPid(valueArray[0]);
  39. orderInfo.setPname(valueArray[1]);
  40. orderInfo.set_from_table("_PRODUCT_"); // 设置来源表为product
  41. // 防止空指针异常,对null属性设置一个默认值
  42. orderInfo.setId("");
  43. orderInfo.setAmount(-1);
  44. outKey.set(valueArray[0]);
  45. }
  46. context.write(outKey, orderInfo);
  47. logger.info(orderInfo);
  48. logger.info("-------------------------------------------");
  49. }
  50. }

编写Reducer

  1. package com.study.mapreduce.reducejoin;
  2. import org.apache.commons.beanutils.BeanUtils;
  3. import org.apache.hadoop.io.Text;
  4. import org.apache.hadoop.mapreduce.Reducer;
  5. import org.apache.log4j.Logger;
  6. import java.io.IOException;
  7. import java.util.ArrayList;
  8. import java.util.Iterator;
  9. import java.util.List;
  10. public class ReduceJoinReducer extends Reducer<Text, OrderInfo, Text, OrderInfo> {
  11. private Logger logger = Logger.getLogger(ReduceJoinReducer.class);
  12. @Override
  13. protected void reduce(Text key, Iterable<OrderInfo> values, Reducer<Text, OrderInfo, Text, OrderInfo>.Context context) throws IOException, InterruptedException {
  14. logger.info("=====================================");
  15. logger.info("key: " + key);
  16. List<OrderInfo> orderInfoList = new ArrayList<>();
  17. String pname = "";
  18. Iterator<OrderInfo> iterator = values.iterator();
  19. while (iterator.hasNext()) {
  20. OrderInfo orderInfo = iterator.next();
  21. logger.info(orderInfo);
  22. if("_PRODUCT_".equals(orderInfo.get_from_table())) {
  23. pname = orderInfo.getPname();
  24. } else {
  25. // 此处不能直接使用:orderInfoList.add(orderInfo);
  26. // Hadoop迭代器为了优化效率,使用了对象重用。
  27. // 迭代时value始终指向同一个内存地址,改变的只是引用地址中的字段属性
  28. // 即这个循环了这么多次的orderInfo,其实是同一个对象,只是对象的属性值在不断变化
  29. // 所以这里需要 new 一个新对象来存放这些属性,然后将这个新对象塞给集合
  30. OrderInfo orderInfoTemp = new OrderInfo();
  31. // orderInfoTemp.setId(orderInfo.getId());
  32. // orderInfoTemp.setPid(orderInfo.getPid());
  33. // orderInfoTemp.setAmount(orderInfo.getAmount());
  34. // orderInfoTemp.set_from_table(orderInfo.get_from_table());
  35. try {
  36. // 使用BeanUtils工具对同名属性进行赋值
  37. // 第一个参数:要设置值的目标对象
  38. // 第二个参数:源对象
  39. BeanUtils.copyProperties(orderInfoTemp, orderInfo);
  40. } catch (Exception e) {
  41. e.printStackTrace();
  42. }
  43. orderInfoList.add(orderInfoTemp);
  44. }
  45. }
  46. logger.info("**********************************");
  47. for (OrderInfo orderInfo : orderInfoList) {
  48. orderInfo.setPname(pname);
  49. context.write(key, orderInfo);
  50. logger.info(orderInfo);
  51. }
  52. logger.info("===========================================");
  53. }
  54. }

编写Driver

  1. package com.study.mapreduce.reducejoin;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.Path;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.mapreduce.Job;
  6. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  7. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  8. import java.io.IOException;
  9. public class ReduceJoinDriver {
  10. public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
  11. Configuration config = new Configuration();
  12. Job job = Job.getInstance(config);
  13. job.setJarByClass(ReduceJoinDriver.class);
  14. job.setMapperClass(ReduceJoinMapper.class);
  15. job.setReducerClass(ReduceJoinReducer.class);
  16. job.setMapOutputKeyClass(Text.class);
  17. job.setMapOutputValueClass(OrderInfo.class);
  18. job.setOutputKeyClass(Text.class);
  19. job.setOutputValueClass(OrderInfo.class);
  20. FileInputFormat.setInputPaths(job, new Path("/app/order/input"));
  21. FileOutputFormat.setOutputPath(job, new Path("/app/order/output/output2"));
  22. boolean success = job.waitForCompletion(true);
  23. System.exit(success ? 0 : 1); // 程序退出
  24. }
  25. }

输出结果:

  1. 01 OrderInfo{id='1005', pid='01', amount=3, pname='苹果', _from_table='_ORDER_'}
  2. 01 OrderInfo{id='1004', pid='01', amount=2, pname='苹果', _from_table='_ORDER_'}
  3. 01 OrderInfo{id='1001', pid='01', amount=1, pname='苹果', _from_table='_ORDER_'}
  4. 02 OrderInfo{id='1006', pid='02', amount=5, pname='桔子', _from_table='_ORDER_'}
  5. 02 OrderInfo{id='1002', pid='02', amount=1, pname='桔子', _from_table='_ORDER_'}
  6. 03 OrderInfo{id='1003', pid='03', amount=4, pname='香蕉', _from_table='_ORDER_'}

在 Map 端进行 Join

join合并的操作如果在Reduce段完成,Reduce端的处理压力就会很大,Map节点的运算负载则很低,资源利用率不高,且在Reduce阶段极易发生数据倾斜。
因为一般情况下,MapTask的节点数量要多于ReduceTask,可以在Map段进行数据join合并。
适用场景:一张表很小、另一张表很大。先将小表读取加载进内存中,然后在Map端进行 join合并。

具体方法:采用DistributedCache

  1. 在Mapper的setup阶段,将文件读取到缓存集合中
  2. 在Driver驱动类中加载缓存:

    1. // 缓存普通文件到Task运行节点
    2. job.addCacheFile(new URI("file:///app/cache/product.txt"));
    3. // 如果是集群运行,需要设置成 hdfs 路径
  3. 在Map端进行 join,此时不再需要Reduce阶段,可以设置 ReduceTask数量为0

    1. // 因为在Map端已经完成了join,达到了目的,不再需要ReduceTask,可以设置取消ReduceTask
    2. job.setNumReduceTasks(0);

    编写Driver

    ```java package com.study.mapreduce.mapjoin;

import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException; import java.net.URI; import java.net.URISyntaxException;

public class MapJoinDriver { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException, URISyntaxException { Configuration config = new Configuration(); Job job = Job.getInstance(config);

  1. job.setJarByClass(MapJoinDriver.class);
  2. job.setMapperClass(MapJoinMapper.class);
  3. job.setMapOutputKeyClass(Text.class);
  4. // 不需要汇总出Value,可以设置为null(NullWritable.get())
  5. job.setMapOutputValueClass(NullWritable.class);
  6. job.setOutputKeyClass(Text.class);
  7. job.setOutputValueClass(NullWritable.class);
  8. // 设置取消ReduceTask
  9. job.setNumReduceTasks(0);
  10. // 加载缓存数据
  11. job.addCacheFile(new URI("/app/order/input/product.txt"));
  12. FileInputFormat.setInputPaths(job, new Path("/app/order/input/order.txt"));
  13. FileOutputFormat.setOutputPath(job, new Path("/app/order/output/output3"));
  14. boolean success = job.waitForCompletion(true);
  15. System.exit(success ? 0 : 1); // 程序退出
  16. }

}

  1. <a name="vCMdj"></a>
  2. ### 编写Mapper
  3. ```java
  4. package com.study.mapreduce.mapjoin;
  5. import org.apache.hadoop.fs.FSDataInputStream;
  6. import org.apache.hadoop.fs.FileSystem;
  7. import org.apache.hadoop.fs.Path;
  8. import org.apache.hadoop.io.IOUtils;
  9. import org.apache.hadoop.io.LongWritable;
  10. import org.apache.hadoop.io.NullWritable;
  11. import org.apache.hadoop.io.Text;
  12. import org.apache.hadoop.mapreduce.Mapper;
  13. import java.io.BufferedReader;
  14. import java.io.IOException;
  15. import java.io.InputStreamReader;
  16. import java.net.URI;
  17. import java.util.HashMap;
  18. import java.util.Map;
  19. public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
  20. private Map<String, String> productMap = new HashMap<>(); // product表数据
  21. private Text outKey = new Text();
  22. /**
  23. * 在MapTask初始化时加载缓存文件
  24. */
  25. @Override
  26. protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
  27. // 获取product.txt缓存文件
  28. URI[] cacheFiles = context.getCacheFiles();
  29. URI productFile = cacheFiles[0];
  30. // 读取product.txt文件
  31. FileSystem fileSystem = FileSystem.get(context.getConfiguration());
  32. FSDataInputStream inputStream = fileSystem.open(new Path(productFile));
  33. BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
  34. String line = "";
  35. while((line = br.readLine()) != null) {
  36. String[] strArray = line.split("@");
  37. productMap.put(strArray[0], strArray[1]);
  38. }
  39. IOUtils.closeStream(br);
  40. }
  41. @Override
  42. protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
  43. String line = value.toString();
  44. String[] fields = line.split("@");
  45. String id = fields[0];
  46. String pid = fields[1];
  47. int amount = Integer.valueOf(fields[2]);
  48. String pname = productMap.get(pid);
  49. String resultKey = id +"\t" + pname + "\t" + amount;
  50. outKey.set(resultKey);
  51. context.write(outKey, NullWritable.get());
  52. }
  53. }