Join转换两个文件码值

类似SQL中的 join,需要将两个不同文件进行关联输出,例如一张是详情数据、一张是码值数据,需要关联输出的详情中进行了码值转换。

在 Reduce 端进行 join

Map端的主要工作:为来自不同表或文件的 key-value 对,打标签以区别不同来源的记录。然后用连接字段作为 key ,其余部分和新加的标志作为value,最后进行输出。

Reduce端的主要工作:在Reduce端以连接字段作为 key 的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在Map阶段已经打标志)分开,最后进行合并就可以了。

示例,将订单表和产品表使用MapReduce做 join。

在Mapper中,将 pid 设置为 key,这样相同 key 的订单数据、产品数据 就都能进入同一个 reduce 方法中处理。

然后在这个reduce方法中,判断如果是产品表,就获取产品名称;如果是订单表,就先存入集合。最后再遍历订单集合,将产品名称设置进去。

订单表表数据:

id(订单编号) pid(产品编号) amount(数量)
1001 01 1
1002 02 1
1003 03 4
1004 01 2
1005 01 3
1006 02 5

产品表:

pid(产品编号) pname(产品名称)
01 苹果
02 桔子
03 香蕉

最终输出结果:订单编号、产品名称、数量。

示例输入数据:

order.txt:

  1. 1001@01@1
  2. 1002@02@1
  3. 1003@03@4
  4. 1004@01@2
  5. 1005@01@3
  6. 1006@02@5

product.txt

01@苹果
02@桔子
03@香蕉

编写JavaBean:

package com.study.mapreduce.reducejoin;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * JavaBean结果
 */
public class OrderInfo implements Writable {
    private String id;  // 订单编号
    private String pid;  // 产品编号
    private Integer amount;  // 数量
    private String pname;  // 产品名称
    private String _from_table;  // 标记该条数据的来源表:order、product

    public OrderInfo() {
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(id);  // 序列化String时,使用 writeUTF写出、readUTF读取
        out.writeUTF(pid);
        out.writeInt(amount);
        out.writeUTF(pname);
        out.writeUTF(_from_table);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        id = in.readUTF(); 
        pid = in.readUTF();
        amount = in.readInt();
        pname = in.readUTF();
        _from_table = in.readUTF();
    }


    // 重写toString,将JavaBean输出到文件时会调用toString方法输出
    @Override
    public String toString() {
        return "OrderInfo{" +
                "id='" + id + '\'' +
                ", pid='" + pid + '\'' +
                ", amount=" + amount +
                ", pname='" + pname + '\'' +
                ", _from_table='" + _from_table + '\'' +
                '}';
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getPid() {
        return pid;
    }

    public void setPid(String pid) {
        this.pid = pid;
    }

    public Integer getAmount() {
        return amount;
    }

    public void setAmount(Integer amount) {
        this.amount = amount;
    }

    public String getPname() {
        return pname;
    }

    public void setPname(String pname) {
        this.pname = pname;
    }

    public String get_from_table() {
        return _from_table;
    }

    public void set_from_table(String _from_table) {
        this._from_table = _from_table;
    }
}

编写Mapper:

package com.study.mapreduce.reducejoin;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.log4j.Logger;

import java.io.IOException;
public class ReduceJoinMapper extends Mapper<LongWritable, Text, Text, OrderInfo> {
    private Logger logger = Logger.getLogger(ReduceJoinMapper.class);

    private Text outKey = new Text();
    private String fileName = ""; // 当前MapTask处理的文件名称

    /**
     * 重写初始化方法,在初始化方法中通过切片信息获取到文件名称,避免每次都在map方法中读取耗费资源
     * 因为每个切片对应一个MapTask,所以这个MapTask的初始化方法就是这个切片的初始化方法
     * 因为一个文件可以切多个片,但是一个切片只会对应一个文件,所以这个切片对应的文件信息是固定的
     */
    @Override
    protected void setup(Mapper<LongWritable, Text, Text, OrderInfo>.Context context) throws IOException, InterruptedException {
        // JobSubmitter 的 writeNewSplits 中会调用 input.getSplits(job) 进行切片
        // 我们使用的默认InputFormat,也就是 TextInputFormat。TextInputFormat 的 getSplits 方法在 FileInputFormat(TextInputFormat的父类)中
        // FileInputFormat 的 getSplits 方法会调用makeSplit创建 FileSplit 类型的切片对象
        fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, OrderInfo>.Context context) throws IOException, InterruptedException {
        String[] valueArray = value.toString().split("@");

        logger.info("---------------------------------------------------------");
        logger.info("fileName:" + fileName);

        OrderInfo orderInfo = new OrderInfo();
        if(fileName.startsWith("order")) {  // 处理 order.txt文件的内容
            orderInfo.setId(valueArray[0]);
            orderInfo.setPid(valueArray[1]);
            orderInfo.setAmount(Integer.valueOf(valueArray[2]));
            orderInfo.set_from_table("_ORDER_");  // 设置来源表为order表
            orderInfo.setPname("");  // 属性值不能有null,否则会报NPE空指针异常。可以设置一个默认值

            outKey.set(valueArray[1]);
        } else {   // 处理 product.txt文件的内容
            orderInfo.setPid(valueArray[0]);
            orderInfo.setPname(valueArray[1]);
            orderInfo.set_from_table("_PRODUCT_");  // 设置来源表为product
            // 防止空指针异常,对null属性设置一个默认值
            orderInfo.setId("");
            orderInfo.setAmount(-1);

            outKey.set(valueArray[0]);
        }
        context.write(outKey, orderInfo);

        logger.info(orderInfo);
        logger.info("-------------------------------------------");
    }
}

编写Reducer:

package com.study.mapreduce.reducejoin;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ReduceJoinReducer extends Reducer<Text, OrderInfo, Text, OrderInfo> {

    private Logger logger = Logger.getLogger(ReduceJoinReducer.class);

    @Override
    protected void reduce(Text key, Iterable<OrderInfo> values, Reducer<Text, OrderInfo, Text, OrderInfo>.Context context) throws IOException, InterruptedException {

        logger.info("=====================================");
        logger.info("key: " + key);

        List<OrderInfo> orderInfoList = new ArrayList<>();
        String pname = "";

        Iterator<OrderInfo> iterator = values.iterator();
        while (iterator.hasNext()) {
            OrderInfo orderInfo = iterator.next();
            logger.info(orderInfo);
            if("_PRODUCT_".equals(orderInfo.get_from_table())) {
                pname = orderInfo.getPname();
            } else {

                // 此处不能直接使用:orderInfoList.add(orderInfo);
                // Hadoop迭代器为了优化效率,使用了对象重用。
                // 迭代时value始终指向同一个内存地址,改变的只是引用地址中的字段属性
                // 即这个循环了这么多次的orderInfo,其实是同一个对象,只是对象的属性值在不断变化
                // 所以这里需要 new 一个新对象来存放这些属性,然后将这个新对象塞给集合

                OrderInfo orderInfoTemp = new OrderInfo();
//                orderInfoTemp.setId(orderInfo.getId());
//                orderInfoTemp.setPid(orderInfo.getPid());
//                orderInfoTemp.setAmount(orderInfo.getAmount());
//                orderInfoTemp.set_from_table(orderInfo.get_from_table());
                try {
                    // 使用BeanUtils工具对同名属性进行赋值
                    // 第一个参数:要设置值的目标对象
                    // 第二个参数:源对象
                    BeanUtils.copyProperties(orderInfoTemp, orderInfo);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                orderInfoList.add(orderInfoTemp);
            }
        }

        logger.info("**********************************");
        for (OrderInfo orderInfo : orderInfoList) {
            orderInfo.setPname(pname);
            context.write(key, orderInfo);
            logger.info(orderInfo);
        }
        logger.info("===========================================");
    }
}

编写Driver:

package com.study.mapreduce.reducejoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class ReduceJoinDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration config = new Configuration();
        Job job = Job.getInstance(config);

        job.setJarByClass(ReduceJoinDriver.class);

        job.setMapperClass(ReduceJoinMapper.class);
        job.setReducerClass(ReduceJoinReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(OrderInfo.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(OrderInfo.class);

        FileInputFormat.setInputPaths(job, new Path("/app/order/input"));
        FileOutputFormat.setOutputPath(job, new Path("/app/order/output/output2"));

        boolean success = job.waitForCompletion(true);

        System.exit(success ? 0 : 1);  // 程序退出
    }
}

输出结果:

01    OrderInfo{id='1005', pid='01', amount=3, pname='苹果', _from_table='_ORDER_'}
01    OrderInfo{id='1004', pid='01', amount=2, pname='苹果', _from_table='_ORDER_'}
01    OrderInfo{id='1001', pid='01', amount=1, pname='苹果', _from_table='_ORDER_'}
02    OrderInfo{id='1006', pid='02', amount=5, pname='桔子', _from_table='_ORDER_'}
02    OrderInfo{id='1002', pid='02', amount=1, pname='桔子', _from_table='_ORDER_'}
03    OrderInfo{id='1003', pid='03', amount=4, pname='香蕉', _from_table='_ORDER_'}

在 Map 端进行 Join

join合并的操作如果在Reduce段完成,Reduce端的处理压力就会很大,Map节点的运算负载则很低,资源利用率不高,且在Reduce阶段极易发生数据倾斜。

因为一般情况下,MapTask的节点数量要多于ReduceTask,可以在Map段进行数据join合并。

适用场景:一张表很小、另一张表很大。先将小表读取加载进内存中,然后在Map端进行 join合并。

具体方法:采用DistributedCache

  1. Mappersetup阶段,将文件读取到缓存集合中

  2. Driver驱动类中加载缓存:

    // 缓存普通文件到Task运行节点
    job.addCacheFile(new URI("file:///app/cache/product.txt"));
    // 如果是集群运行,需要设置成 hdfs 路径
    
  1. 在Map端进行 join,此时不再需要Reduce阶段,可以设置 ReduceTask数量为0
    // 因为在Map端已经完成了join,达到了目的,不再需要ReduceTask,可以设置取消ReduceTask
    job.setNumReduceTasks(0);
    

示例:

编写Driver:

package com.study.mapreduce.mapjoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class MapJoinDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException, URISyntaxException {
        Configuration config = new Configuration();
        Job job = Job.getInstance(config);

        job.setJarByClass(MapJoinDriver.class);

        job.setMapperClass(MapJoinMapper.class);

        job.setMapOutputKeyClass(Text.class);
        // 不需要汇总出Value,可以设置为null(NullWritable.get())
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // 设置取消ReduceTask
        job.setNumReduceTasks(0);

        // 加载缓存数据
        job.addCacheFile(new URI("/app/order/input/product.txt"));

        FileInputFormat.setInputPaths(job, new Path("/app/order/input/order.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/app/order/output/output3"));

        boolean success = job.waitForCompletion(true);

        System.exit(success ? 0 : 1);  // 程序退出
    }
}

编写Mapper:

package com.study.mapreduce.mapjoin;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    private Map<String, String> productMap = new HashMap<>();  // product表数据

    private Text outKey = new Text();

    /**
     * 在MapTask初始化时加载缓存文件
     */
    @Override
    protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
        // 获取product.txt缓存文件
        URI[] cacheFiles = context.getCacheFiles();
        URI productFile = cacheFiles[0];

        // 读取product.txt文件
        FileSystem fileSystem = FileSystem.get(context.getConfiguration());
        FSDataInputStream inputStream = fileSystem.open(new Path(productFile));
        BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
        String line = "";
        while((line = br.readLine()) != null) {
            String[] strArray = line.split("@");
            productMap.put(strArray[0], strArray[1]);
        }

        IOUtils.closeStream(br);
    }

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] fields = line.split("@");
        String id = fields[0];
        String pid = fields[1];
        int amount = Integer.valueOf(fields[2]);
        String pname = productMap.get(pid);

        String resultKey = id  +"\t" + pname + "\t" + amount;
        outKey.set(resultKey);
        context.write(outKey, NullWritable.get());
    }
}