Requirements

Join the two tables and sort the output by product id.

maptask

What the map method does:

  1. Determine which input file the record comes from
  2. Read the input data (one line at a time)
  3. Process the two files differently
  4. Wrap the fields in a bean object and emit it

Add a flag field to mark which table each record comes from.

Since the product id is used as the map output key, the records are sorted by product id by default.

reducetask

The reduce method caches the order records in a collection and the product record in a bean, then joins them.

Prepare the data

order.txt (columns: order id, product id, amount)

    1001 01 1
    1002 02 2
    1003 03 3
    1001 01 1
    1002 02 2
    1003 03 3

pd.txt (columns: product id, product name)

    01 小米
    02 华为
    03 格力
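
Joining the two files on product id gives one output line per order record. Assuming TableBean.toString() prints order id, product name, and amount separated by tabs (as in the sketch shown after the bean code below), the final output would be:

    1001	小米	1
    1001	小米	1
    1002	华为	2
    1002	华为	2
    1003	格力	3
    1003	格力	3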

Code

bean

import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class TableBean implements Writable {
    // order id
    private String order_id;
    // product id
    private String pid;
    // product quantity
    private int amount;
    // product name
    private String pName;
    // flag marking whether the record comes from the order table ("0") or the product table ("1")
    private String flag;

    public TableBean() {
        super();
    }

    public TableBean(String order_id, String pid, int amount, String pName, String flag) {
        super();
        this.order_id = order_id;
        this.pid = pid;
        this.amount = amount;
        this.pName = pName;
        this.flag = flag;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(order_id);
        out.writeUTF(pid);
        out.writeInt(amount);
        out.writeUTF(pName);
        out.writeUTF(flag);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        order_id = in.readUTF();
        pid = in.readUTF();
        amount = in.readInt();
        pName = in.readUTF();
        flag = in.readUTF();
    }

    // getters, setters, and toString omitted; see the sketch below
}
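
The getters, setters, and toString are omitted above. Note that TableBean is also the reducer's output key, so toString decides what each output line looks like. A minimal sketch of the omitted methods (the tab-separated field order in toString is an assumption, not taken from the original code):

// accessors matching the calls made in the mapper and reducer
public String getOrder_id() { return order_id; }
public void setOrder_id(String order_id) { this.order_id = order_id; }
public String getPid() { return pid; }
public void setPid(String pid) { this.pid = pid; }
public int getAmount() { return amount; }
public void setAmount(int amount) { this.amount = amount; }
public String getpName() { return pName; }
public void setpName(String pName) { this.pName = pName; }
public String getFlag() { return flag; }
public void setFlag(String flag) { this.flag = flag; }

// assumed output format: order id, product name, amount, tab-separated
@Override
public String toString() {
    return order_id + "\t" + pName + "\t" + amount;
}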

Map phase

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;

public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean> {

    Text k = new Text();
    TableBean v = new TableBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // distinguish the two tables by the name of the file this split comes from
        FileSplit split = (FileSplit) context.getInputSplit();
        String name = split.getPath().getName();
        // read one line
        String line = value.toString();
        if (name.startsWith("order")) {
            // order table: order_id pid amount
            String[] fields = line.split(" ");
            v.setOrder_id(fields[0]);
            v.setPid(fields[1]);
            v.setAmount(Integer.parseInt(fields[2]));
            v.setpName("");
            v.setFlag("0");
            k.set(fields[1]);
        } else {
            // product table: pid pname
            String[] fields = line.split(" ");
            v.setOrder_id("");
            v.setPid(fields[0]);
            v.setAmount(0);
            v.setpName(fields[1]);
            v.setFlag("1");
            k.set(fields[0]);
        }
        // the key is the product id, so records from both tables with the same pid meet in one reduce call
        context.write(k, v);
    }
}
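
For the sample files above, the mapper emits one record per input line, keyed by product id, so the reduce side receives the following groups (the order of values inside a group is not guaranteed):

    key "01": (1001, 01, 1, flag=0), (1001, 01, 1, flag=0), (01, 小米, flag=1)
    key "02": (1002, 02, 2, flag=0), (1002, 02, 2, flag=0), (02, 华为, flag=1)
    key "03": (1003, 03, 3, flag=0), (1003, 03, 3, flag=0), (03, 格力, flag=1)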

Reduce phase

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;

public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException {
        // collection that caches the order records
        ArrayList<TableBean> orderBeans = new ArrayList<>();
        // bean that caches the product record
        TableBean pdBean = new TableBean();

        // put the order records into the collection and the product record into pdBean
        for (TableBean bean : values) {
            if ("0".equals(bean.getFlag())) {
                // order table
                // copy each incoming order record into the collection
                TableBean orderBean = new TableBean();
                try {
                    BeanUtils.copyProperties(orderBean, bean);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                // a copy is required here: Hadoop reuses the value object while iterating,
                // so adding bean itself would leave the list filled with the last value
                orderBeans.add(orderBean);
            } else {
                // product table
                try {
                    // copy the product record into pdBean
                    BeanUtils.copyProperties(pdBean, bean);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        // join: loop over the order records and fill in the product name
        for (TableBean tableBean : orderBeans) {
            tableBean.setpName(pdBean.getpName());
            context.write(tableBean, NullWritable.get());
        }

    }
}
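
Hadoop reuses the same TableBean instance while iterating over values, which is why each order record is copied before being added to the list. If you would rather not depend on commons-beanutils, a plain field-by-field copy does the same job; a minimal sketch (the copyFrom helper below is hypothetical, not part of the original TableBean):

// hypothetical helper added to TableBean: copies all fields from another instance
public void copyFrom(TableBean other) {
    this.order_id = other.getOrder_id();
    this.pid = other.getPid();
    this.amount = other.getAmount();
    this.pName = other.getpName();
    this.flag = other.getFlag();
}

In the reducer, BeanUtils.copyProperties(orderBean, bean) would then become orderBean.copyFrom(bean), with no reflection and no extra dependency.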

Driver class

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class TableDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(TableDriver.class);
        job.setMapperClass(TableMapper.class);
        job.setReducerClass(TableReducer.class);

        // tell the framework the data types our program outputs
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TableBean.class);

        job.setOutputKeyClass(TableBean.class);
        job.setOutputValueClass(NullWritable.class);

        // set the input files and the output path
        FileInputFormat.setInputPaths(job, new Path("/Users/jdxia/Desktop/website/data/input"));
        FileOutputFormat.setOutputPath(job, new Path("/Users/jdxia/Desktop/website/data/output"));

        job.waitForCompletion(true);
    }
}
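
The input and output paths are hard-coded above. A common variant reads them from the command-line arguments and propagates the job result as the process exit code; a minimal sketch of the lines in main that would change (the argument positions are an assumption):

// take the paths from the command line instead of hard-coding them
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

// exit with 0 on success, 1 on failure
System.exit(job.waitForCompletion(true) ? 0 : 1);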

Drawbacks

The join is done entirely in the reduce phase, so the reduce side carries a heavy processing load while the map nodes do very little work. Resource utilization is therefore poor, and the reduce phase is also prone to data skew.