Requirements
Join the two tables and sort the result by product id.
Map task
What the map side does:
- determine which input file the record comes from
- read the input data
- handle the two files with separate logic
- wrap the fields into a bean object and emit it

Add a flag field to mark which table each record belongs to.
Sorting by product id comes for free, because the product id is the map output key and MapReduce sorts by key.

Reduce task
The reduce method caches the order records in a collection and the product record in a bean, then joins them.

Prepare the data
order.txt
1001 01 1
1002 02 2
1003 03 3
1001 01 1
1002 02 2
1003 03 3
pd.txt
01 小米
02 华为
03 格力
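For reference, the joined result for this sample data should look roughly like the lines below, grouped and ordered by product id (assuming the bean's elided toString prints order_id, product name, and amount, tab-separated):
1001	小米	1
1001	小米	1
1002	华为	2
1002	华为	2
1003	格力	3
1003	格力	3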
Code
Bean
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class TableBean implements Writable {
    // order id
    private String order_id;
    // product id
    private String pid;
    // quantity
    private int amount;
    // product name
    private String pName;
    // marks whether the record comes from the order table ("0") or the product table ("1")
    private String flag;

    public TableBean() {
        super();
    }

    public TableBean(String order_id, String pid, int amount, String pName, String flag) {
        super();
        this.order_id = order_id;
        this.pid = pid;
        this.amount = amount;
        this.pName = pName;
        this.flag = flag;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(order_id);
        out.writeUTF(pid);
        out.writeInt(amount);
        out.writeUTF(pName);
        out.writeUTF(flag);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        order_id = in.readUTF();
        pid = in.readUTF();
        amount = in.readInt();
        pName = in.readUTF();
        flag = in.readUTF();
    }

    // getters/setters/toString omitted
}
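The getters and setters follow the standard JavaBean pattern the mapper and reducer rely on (setOrder_id, getFlag, getpName and so on). The toString below is only a sketch of the elided method, with an assumed tab-separated column order; whatever it returns is what each output line will look like, because TableBean is written out as the key:

    // sketch of the elided toString(); the column order here is an assumption
    @Override
    public String toString() {
        return order_id + "\t" + pName + "\t" + amount;
    }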
Map stage
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;

public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean> {
    Text k = new Text();
    TableBean v = new TableBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // tell the two tables apart by the name of the input file
        FileSplit split = (FileSplit) context.getInputSplit();
        String name = split.getPath().getName();
        // read one line
        String line = value.toString();
        if (name.startsWith("order")) {
            // order table
            String[] fields = line.split(" ");
            v.setOrder_id(fields[0]);
            v.setPid(fields[1]);
            v.setAmount(Integer.parseInt(fields[2]));
            v.setpName("");
            v.setFlag("0");
            k.set(fields[1]);
        } else {
            // product table
            String[] fields = line.split(" ");
            v.setOrder_id("");
            v.setPid(fields[0]);
            v.setAmount(0);
            v.setpName(fields[1]);
            v.setFlag("1");
            k.set(fields[0]);
        }
        context.write(k, v);
    }
}
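To make the shuffle step concrete, this is roughly what the mapper emits for one order line and one product line of the sample data (pseudo-notation; the field names match the bean):

"01" -> TableBean{order_id="1001", pid="01", amount=1, pName="",    flag="0"}   // from order.txt
"01" -> TableBean{order_id="",     pid="01", amount=0, pName="小米", flag="1"}   // from pd.txt

Because the product id is the key, every record sharing a pid ends up in the same reduce call.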
Reduce stage
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.ArrayList;
public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException {
        // collection that caches the order records for this product id
        ArrayList<TableBean> orderBeans = new ArrayList<>();
        // bean that holds the single product record
        TableBean pdBean = new TableBean();
        // put order records into the collection and the product record into pdBean
        for (TableBean bean : values) {
            if ("0".equals(bean.getFlag())) {
                // order table
                // copy each incoming order record into the collection
                TableBean orderBean = new TableBean();
                try {
                    BeanUtils.copyProperties(orderBean, bean);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                // the copy is required: Hadoop reuses the same value object while iterating,
                // so adding `bean` directly would leave the list full of references to the last record
                orderBeans.add(orderBean);
            } else {
                // product table
                try {
                    // copy the product record into pdBean
                    BeanUtils.copyProperties(pdBean, bean);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        // join: walk the cached order records and fill in the product name
        for (TableBean tableBean : orderBeans) {
            tableBean.setpName(pdBean.getpName());
            context.write(tableBean, NullWritable.get());
        }
    }
}
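BeanUtils.copyProperties here is the commons-beanutils variant (destination first, source second), so commons-beanutils has to be on the job's classpath. If you would rather avoid the extra dependency, a hand-written field copy does the same job; the helper below is only a sketch and is not part of the original code (the getter names are assumed to mirror the setters used in the mapper):

    // hypothetical helper: copy every field of src into dst without BeanUtils
    private static void copyInto(TableBean dst, TableBean src) {
        dst.setOrder_id(src.getOrder_id());
        dst.setPid(src.getPid());
        dst.setAmount(src.getAmount());
        dst.setpName(src.getpName());
        dst.setFlag(src.getFlag());
    }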
Driver class
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class TableDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(TableDriver.class);
        job.setMapperClass(TableMapper.class);
        job.setReducerClass(TableReducer.class);
        // tell the framework the output types of the map and reduce stages
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TableBean.class);
        job.setOutputKeyClass(TableBean.class);
        job.setOutputValueClass(NullWritable.class);
        // set the input and output paths
        FileInputFormat.setInputPaths(job, new Path("/Users/jdxia/Desktop/website/data/input"));
        FileOutputFormat.setOutputPath(job, new Path("/Users/jdxia/Desktop/website/data/output"));
        job.waitForCompletion(true);
    }
}
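Note that the input/output paths are hardcoded for a local run and, as usual with Hadoop, the output directory must not exist yet. If the job is packaged into a jar (the jar name below is just a placeholder), it can be submitted in the ordinary way, and the joined result lands in part-r-00000 under the output directory:

hadoop jar table-join.jar TableDriver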
Drawbacks
The join itself is done entirely in the reduce stage, so the reducers carry almost all of the processing load while the map nodes do very little work; resource utilization is poor, and because every record with the same product id is pulled to a single reducer, the reduce stage is prone to data skew.
