Requirement
Join the two tables and sort the result by product id.
Map task
What the map phase does:
- determine which input file the current record comes from
- read the input line
- handle records from the two files differently
- wrap the fields in a bean object and emit it
A flag field marks which table each record belongs to.
Because the product id is the map output key, the shuffle sorts by product id by default.
Reduce task
The reduce method caches the order records in a collection and the product record in a bean, then joins them.
Sample data
order.txt (columns: order id, product id, amount):
1001 01 1
1002 02 2
1003 03 3
1001 01 1
1002 02 2
1003 03 3
pd.txt (columns: product id, product name):
01 小米
02 华为
03 格力
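For reference, joining the two files on the product id should produce six rows, two per product. The exact column layout depends on TableBean.toString(), which the code below omits; assuming it prints order id, product name, and amount (as sketched after the bean code), the result is:
1001	小米	1
1001	小米	1
1002	华为	2
1002	华为	2
1003	格力	3
1003	格力	3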
Code
Bean
import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class TableBean implements Writable {
    // order id
    private String order_id;
    // product id
    private String pid;
    // order amount
    private int amount;
    // product name
    private String pName;
    // marks whether the record comes from the order table ("0") or the product table ("1")
    private String flag;

    // no-arg constructor required by the Writable framework (instantiated via reflection)
    public TableBean() {
        super();
    }

    public TableBean(String order_id, String pid, int amount, String pName, String flag) {
        super();
        this.order_id = order_id;
        this.pid = pid;
        this.amount = amount;
        this.pName = pName;
        this.flag = flag;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(order_id);
        out.writeUTF(pid);
        out.writeInt(amount);
        out.writeUTF(pName);
        out.writeUTF(flag);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // fields must be read in exactly the order they were written
        order_id = in.readUTF();
        pid = in.readUTF();
        amount = in.readInt();
        pName = in.readUTF();
        flag = in.readUTF();
    }

    // getters, setters, and toString() omitted for brevity;
    // BeanUtils.copyProperties in the reducer needs the getters/setters,
    // and the job's output lines come from toString()
}
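The reducer writes the TableBean itself as the output key, so each line of the result file comes from toString(). A minimal sketch of the omitted method, assuming the order id / product name / amount column order used in the expected output above:

@Override
public String toString() {
    // assumed layout: order id, product name, amount, tab-separated
    return order_id + "\t" + pName + "\t" + amount;
}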
Map phase
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean> {

    Text k = new Text();
    TableBean v = new TableBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // tell the two tables apart by the name of the file this split comes from
        FileSplit split = (FileSplit) context.getInputSplit();
        String name = split.getPath().getName();

        // read one line
        String line = value.toString();

        if (name.startsWith("order")) {
            // order table: order_id  pid  amount
            String[] fields = line.split(" ");
            v.setOrder_id(fields[0]);
            v.setPid(fields[1]);
            v.setAmount(Integer.parseInt(fields[2]));
            v.setpName("");
            v.setFlag("0");
            k.set(fields[1]);
        } else {
            // product table: pid  pname
            String[] fields = line.split(" ");
            v.setOrder_id("");
            v.setPid(fields[0]);
            v.setAmount(0);
            v.setpName(fields[1]);
            v.setFlag("1");
            k.set(fields[0]);
        }

        // the product id is the output key, so the shuffle sorts and groups by it
        context.write(k, v);
    }
}
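With the sample data, the shuffle groups the map output by product id, so one reduce call receives everything for a single product. For key 01, for example, the values iterable holds (in no guaranteed order):

(order_id=1001, pid=01, amount=1, flag=0)
(order_id=1001, pid=01, amount=1, flag=0)
(pid=01, pName=小米, flag=1)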
Reduce phase
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;

public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException {
        // collection for the order records of this product id
        ArrayList<TableBean> orderBeans = new ArrayList<>();
        // bean for the matching product record
        TableBean pdBean = new TableBean();

        // put order records into the collection and the product record into pdBean
        for (TableBean bean : values) {
            if ("0".equals(bean.getFlag())) {
                // order table: copy each order record into the collection
                TableBean orderBean = new TableBean();
                try {
                    BeanUtils.copyProperties(orderBean, bean);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                // add the copy, not `bean` itself: Hadoop reuses one object for the
                // whole iterator, so adding `bean` directly would fill the list with
                // references to the last value
                orderBeans.add(orderBean);
            } else {
                // product table: copy the product record into pdBean
                try {
                    BeanUtils.copyProperties(pdBean, bean);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        // join: loop over the cached orders and fill in the product name
        for (TableBean tableBean : orderBeans) {
            tableBean.setpName(pdBean.getpName());
            context.write(tableBean, NullWritable.get());
        }
    }
}
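BeanUtils.copyProperties reads and writes the bean through its public getters and setters, so they must exist on the real class. If you prefer to avoid the commons-beanutils dependency, a field-by-field copy through the all-args constructor does the same job; a minimal sketch (copyOf is a hypothetical helper, not part of the original code):

private static TableBean copyOf(TableBean src) {
    // copy every field so the result is independent of Hadoop's reused instance
    return new TableBean(src.getOrder_id(), src.getPid(), src.getAmount(), src.getpName(), src.getFlag());
}

With it, the try/catch around copyProperties collapses to orderBeans.add(copyOf(bean));.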
Driver
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class TableDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(TableDriver.class);
        job.setMapperClass(TableMapper.class);
        job.setReducerClass(TableReducer.class);

        // tell the framework the output types of the map and reduce phases
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TableBean.class);
        job.setOutputKeyClass(TableBean.class);
        job.setOutputValueClass(NullWritable.class);

        // input and output paths (hardcoded local paths; the output directory must not exist yet)
        FileInputFormat.setInputPaths(job, new Path("/Users/jdxia/Desktop/website/data/input"));
        FileOutputFormat.setOutputPath(job, new Path("/Users/jdxia/Desktop/website/data/output"));

        // submit the job and exit with a status reflecting success or failure
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Drawback
The join happens entirely in the reduce phase, so the reduce side bears almost all of the processing load while the map nodes do very little work; resource utilization is poor, and the reduce phase is prone to data skew when some product ids carry far more orders than others. The usual remedy is a map-side join, which distributes the small table to every map task.