Preface: Implementing joins in MR in several different ways is meant to build familiarity with how a join actually works and with the pros and cons of each approach. Keep in mind that hand-written MR should be avoided in production; this post is for learning and reference only.
1. Requirements
There are two tables: product information data and user page-click log data, as follows:
#Product information data: product_info.txt
#c1=product ID (id), c2=product name (name), c3=price (price), c4=country of origin (country)
p0001,华为,8000,中国
p0002,小米,3000,中国
p0003,苹果,1500,美国
p0004,三星,10000,韩国
#User page-click log data: page_click_log.txt
#c1=user ID (id), c2=product id (prod_id), c3=click time (click_time), c4=region where the click occurred (area)
u0001,p0001,20190301040123,华中
u0002,p0002,20190302040124,华北
u0003,p0003,20190303040124,华南
u0004,p0004,20190304040124,华南
Because the click-log data is extremely large and lives on HDFS, MR is used to implement the following logic:
select b.id,b.name,b.price,b.country,a.id,a.click_time,a.area
from page_click_log a join product_info b on a.prod_id=b.id
2. Map-side Join
2.1 Approach
The small table can be distributed to every map node, where each map task joins it against the slice of the large table it reads locally and writes out the final result.
Pros and cons: the join is done entirely in parallel on the map side, so it is fast; the trade-off is that the small table must fit in the memory of every map task.
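In the implementation below, the Mapper gets the small table's local path from a JVM system property set in the Driver, which is simple but only works when the job runs in local mode. On a real cluster, the usual way to get the small file to every map task is the distributed cache. A minimal sketch, assuming the product file has been uploaded to a placeholder HDFS path; it reuses the `infos` map and `Info` class from 2.2, and needs `java.net.URI`, `java.nio.charset.StandardCharsets` and `org.apache.hadoop.fs.Path` in addition to the imports already shown there:

```java
// Driver side: register the small file so it is localized for every task.
// "hdfs:///data/product_info.txt" is a placeholder path; new URI(...) throws
// URISyntaxException, which main() would have to declare or handle.
job.addCacheFile(new URI("hdfs:///data/product_info.txt"));

// Mapper side: read the localized copy in setup(). The cached file shows up in the
// task's working directory under its own file name.
@Override
protected void setup(Context context) throws IOException, InterruptedException {
    for (URI uri : context.getCacheFiles()) {
        String fileName = new Path(uri.getPath()).getName();
        if (!"product_info.txt".equals(fileName)) {
            continue;
        }
        try (BufferedReader br = new BufferedReader(new InputStreamReader(
                new FileInputStream(fileName), StandardCharsets.UTF_8))) {
            String line;
            while ((line = br.readLine()) != null) {
                String[] fields = line.split(",");
                if (fields.length == 4) {
                    infos.put(fields[0], new Info(fields[0], fields[1],
                            Float.parseFloat(fields[2]), fields[3], "", "", "", "1"));
                }
            }
        }
    }
}
```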
2.2 Implementation
Data wrapper class Info.java
package com.wsk.bigdata.pojo;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class Info implements Writable {
/**
* Product ID (unique identifier)
*/
private String pId;
/**
* Product name
*/
private String pName;
/**
* Product price
*/
private float price;
/**
* Region where the product is produced
*/
private String produceArea;
/**
* User ID
*/
private String uId;
/**
* User click timestamp: yyyyMMddHHmmss
*/
private String dateStr;
/**
* Region where the click occurred
*/
private String clickArea;
/**
* flag=0 means this object holds a page-click log record,
* flag=1 means it holds a product information record
*/
private String flag;
public String getpId() {
return pId;
}
public void setpId(String pId) {
this.pId = pId;
}
public String getpName() {
return pName;
}
public void setpName(String pName) {
this.pName = pName;
}
public float getPrice() {
return price;
}
public void setPrice(float price) {
this.price = price;
}
public String getProduceArea() {
return produceArea;
}
public void setProduceArea(String produceArea) {
this.produceArea = produceArea;
}
public String getuId() {
return uId;
}
public void setuId(String uId) {
this.uId = uId;
}
public String getDateStr() {
return dateStr;
}
public void setDateStr(String dateStr) {
this.dateStr = dateStr;
}
public String getClickArea() {
return clickArea;
}
public void setClickArea(String clickArea) {
this.clickArea = clickArea;
}
public String getFlag() {
return flag;
}
public void setFlag(String flag) {
this.flag = flag;
}
public Info(String pId, String pName, float price, String produceArea, String uId, String dateStr, String clickArea, String flag) {
this.pId = pId;
this.pName = pName;
this.price = price;
this.produceArea = produceArea;
this.uId = uId;
this.dateStr = dateStr;
this.clickArea = clickArea;
this.flag = flag;
}
public Info() {
}
@Override
public String toString() {
return "pid=" + this.pId + ",pName=" + this.pName + ",price=" + this.price
+ ",produceArea=" + this.produceArea
+ ",uId=" + this.uId + ",clickDate=" + this.dateStr + ",clickArea=" + this.clickArea;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(this.pId);
out.writeUTF(this.pName);
out.writeFloat(this.price);
out.writeUTF(this.produceArea);
out.writeUTF(this.uId);
out.writeUTF(this.dateStr);
out.writeUTF(this.clickArea);
out.writeUTF(this.flag);
}
@Override
public void readFields(DataInput in) throws IOException {
this.pId = in.readUTF();
this.pName = in.readUTF();
this.price = in.readFloat();
this.produceArea = in.readUTF();
this.uId = in.readUTF();
this.dateStr = in.readUTF();
this.clickArea = in.readUTF();
this.flag= in.readUTF();
}
}
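As a quick sanity check of the Writable contract (and of the pitfall noted at the end of this post: dropping a field from write()/readFields() shifts every later value), the class can be exercised locally with a write/read round trip. A minimal sketch; InfoSerializationCheck is just an illustrative name, not part of the project:

```java
import com.wsk.bigdata.pojo.Info;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class InfoSerializationCheck {
    public static void main(String[] args) throws Exception {
        Info original = new Info("p0001", "华为", 8000f, "中国",
                "u0001", "20190301040123", "华中", "0");

        // Serialize exactly the way Hadoop would: through write(DataOutput).
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bos));

        // Deserialize back through readFields(DataInput).
        Info copy = new Info();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));

        // The two lines should be identical; if write()/readFields() skip a field,
        // later values end up in the wrong fields.
        System.out.println(original);
        System.out.println(copy);
    }
}
```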
Mapper class FileMapJoinMapper.java
package com.wsk.bigdata.mapreduce.mapper;
import com.wsk.bigdata.pojo.Info;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;
/**
* Map-side join across files
*/
public class FileMapJoinMapper extends Mapper<LongWritable, Text, Info, NullWritable> {
/**
* Product info lookup: key = product ID, value = product record
*/
private Map<String, Info> infos = new HashMap<>();
/**
* setup() runs once before any call to map(), so it is used here to
* load the product information into memory.
*
*/
@Override
protected void setup(Context context) throws IOException, InterruptedException {
System.out.println("--------MAP初始化:加载产品信息数据到内存------");
BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(System.getProperty("product.info.dir"))));
String line;
while ((line = br.readLine()) != null) {
// skip blank lines
if (StringUtils.isBlank(line)) {
continue;
}
String[] fields = line.split(",");
if (fields != null && fields.length == 4) {
Info info = new Info(fields[0], fields[1], Float.parseFloat(fields[2]), fields[3], "", "", "", "1");
infos.put(fields[0], info);
}
}
br.close();
System.out.println("--------MAP初始化:共加载了" + infos.size() + "条产品信息数据------");
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] fields = line.split(",");
if (fields != null && fields.length == 4) {
String pid = fields[1];
Info produceInfo = infos.get(pid);
if (produceInfo == null) {
return;
}
Info info = new Info(produceInfo.getpId(), produceInfo.getpName(), produceInfo.getPrice(), produceInfo.getProduceArea()
, fields[0], fields[2], fields[3], null);
context.write(info, NullWritable.get());
}
}
}
Driver class MapJoinDriver.java
package com.wsk.bigdata.mapreduce.driver;
import com.wsk.bigdata.mapreduce.mapper.FileMapJoinMapper;
import com.wsk.bigdata.pojo.Info;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class MapJoinDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
if(args.length != 3 ) {
System.err.println("please input 3 params: product_File page_click_file output_mapjoin directory");
System.exit(0);
}
String productInfo = args[0];
String input = args[1];
String output = args[2];
System.setProperty("hadoop.home.dir", "D:\\appanzhuang\\cdh\\hadoop-2.6.0-cdh5.7.0");
System.setProperty("product.info.dir",productInfo);
Configuration conf = new Configuration();
// coding approach: trial and error until it works
FileSystem fs = FileSystem.get(conf);
Path outputPath = new Path(output);
if(!fs.exists(new Path(productInfo))){
System.err.println("not found File "+productInfo);
System.exit(0);
}
if(fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
Job job = Job.getInstance(conf);
job.setJarByClass(MapJoinDriver.class);
job.setMapperClass(FileMapJoinMapper.class);
// specify the key/value types of the mapper output
job.setMapOutputKeyClass(Info.class);
job.setMapOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job, new Path(input));
FileOutputFormat.setOutputPath(job, new Path(output));
// a map-side join needs no reduce phase, so set the number of reduce tasks to 0
job.setNumReduceTasks(0);
boolean res = job.waitForCompletion(true);
if (!res) {
System.err.println("error: job failed");
}
}
}
Program run arguments, in order: the product info file path, the page-click log path, and the output path.
![MapJoinDriver run arguments](https://s2.ax1x.com/2019/04/28/EMZCv9.md.png)
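With the sample data above, the map-only job should produce one joined line per click record in part-m-00000, formatted by Info.toString() (the flag field is not printed; the order shown assumes the log fits in a single split):

```
pid=p0001,pName=华为,price=8000.0,produceArea=中国,uId=u0001,clickDate=20190301040123,clickArea=华中
pid=p0002,pName=小米,price=3000.0,produceArea=中国,uId=u0002,clickDate=20190302040124,clickArea=华北
pid=p0003,pName=苹果,price=1500.0,produceArea=美国,uId=u0003,clickDate=20190303040124,clickArea=华南
pid=p0004,pName=三星,price=10000.0,produceArea=韩国,uId=u0004,clickDate=20190304040124,clickArea=华南
```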
3. Reduce-side Join
3.1 Approach
The join condition is used as the map output key, so rows from both tables that satisfy the join condition, each tagged with the file it came from, are sent to the same reduce task, where the records are stitched together.
Pros and cons: the join is performed in the reduce phase, so the reduce side carries a heavy processing load while the map side does very little work, which makes for poor resource utilization; the reduce phase is also very prone to data skew.
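Concretely, for the first product in the sample data the map phase emits the product id as the key and a source-tagged Info as the value, and the shuffle delivers both sides to the same reduce() call (a conceptual sketch of the grouped values, not literal program output):

```
key p0001 -> [ Info(flag=1, pId=p0001, pName=华为, price=8000.0, produceArea=中国),
               Info(flag=0, pId=p0001, uId=u0001, dateStr=20190301040123, clickArea=华中) ]
```

The reducer then copies the product fields onto the click record and writes out the joined line.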
3.2 Implementation
Mapper class FileReduceJoinMapper.java
package com.wsk.bigdata.mapreduce.mapper;
import com.wsk.bigdata.pojo.Info;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
public class FileReduceJoinMapper extends Mapper<LongWritable, Text, Text, Info> {
Text k = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] fields = line.split(",");
if (fields.length != 4) {
return;
}
String pid = "";
Info info = null;
// use the file name to tell which dataset this record comes from
FileSplit inputSplit = (FileSplit) context.getInputSplit();
String name = inputSplit.getPath().getName();
if (name.startsWith("product")) {
pid=fields[0];
info = new Info(pid,fields[1],Float.parseFloat(fields[2]),fields[3],"","","","1");
} else {
pid=fields[1];
info = new Info(pid,"",0,"",fields[0],fields[2],fields[3],"0");
}
if(info==null){
return;
}
k.set(pid);
System.out.println("map 输出"+info.toString());
context.write(k, info);
}
}
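The (FileSplit) cast above works because the job reads ordinary files with the default input format. If the two datasets ever needed different parsing logic, an alternative is to bind a dedicated Mapper to each input path with MultipleInputs; a driver-side sketch, where ProductMapper and ClickLogMapper are hypothetical class names and the path variables are placeholders:

```java
// Requires org.apache.hadoop.mapreduce.lib.input.MultipleInputs and
// org.apache.hadoop.mapreduce.lib.input.TextInputFormat.
MultipleInputs.addInputPath(job, new Path(productInfoPath),
        TextInputFormat.class, ProductMapper.class);
MultipleInputs.addInputPath(job, new Path(clickLogPath),
        TextInputFormat.class, ClickLogMapper.class);
// Note: with MultipleInputs the split handed to each Mapper is wrapped, so the
// (FileSplit) cast used in FileReduceJoinMapper would no longer be valid there.
```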
Reducer class FileReduceJoinReducer.java
package com.wsk.bigdata.mapreduce.reduce;
import com.wsk.bigdata.pojo.Info;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
public class FileReduceJoinReducer extends Reducer<Text, Info, Info, NullWritable> {
@Override
protected void reduce(Text key, Iterable<Info> values, Context context) throws IOException, InterruptedException {
Info pInfo = new Info();
List<Info> clickBeans = new ArrayList<Info>();
Iterator<Info> iterator = values.iterator();
while (iterator.hasNext()) {
Info bean = iterator.next();
System.out.println("reduce接收 "+bean);
if ("1".equals(bean.getFlag())) { //产品
try {
BeanUtils.copyProperties(pInfo, bean);
} catch (IllegalAccessException | InvocationTargetException e) {
e.printStackTrace();
}
} else {
// Hadoop reuses the same Info instance for every value in the iterator,
// so copy the click record into a fresh object before buffering it
Info clickBean = new Info();
try {
BeanUtils.copyProperties(clickBean, bean);
clickBeans.add(clickBean);
} catch (IllegalAccessException | InvocationTargetException e) {
e.printStackTrace();
}
}
}
// copy the product fields onto each click record to build the final joined output
for (Info bean : clickBeans) {
bean.setpName(pInfo.getpName());
bean.setPrice(pInfo.getPrice());
bean.setProduceArea(pInfo.getProduceArea());
System.out.println("reduce结果输出:"+bean.toString());
context.write(bean, NullWritable.get());
}
}
}
Driver class ReduceJoinDriver.java
package com.wsk.bigdata.mapreduce.driver;
import com.wsk.bigdata.mapreduce.mapper.FileReduceJoinMapper;
import com.wsk.bigdata.mapreduce.reduce.FileReduceJoinReducer;
import com.wsk.bigdata.pojo.Info;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class ReduceJoinDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
if (args.length != 2) {
System.err.println("please input 2 params: inpt_data output_mapjoin directory");
System.exit(0);
}
String input = args[0];
String output = args[1];
System.setProperty("hadoop.home.dir", "D:\\appanzhuang\\cdh\\hadoop-2.6.0-cdh5.7.0");
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
Path outputPath = new Path(output);
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
Job job = Job.getInstance(conf);
job.setJarByClass(ReduceJoinDriver.class);
job.setMapperClass(FileReduceJoinMapper.class);
job.setReducerClass(FileReduceJoinReducer.class);
// specify the key/value types of the mapper output
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Info.class);
// specify the key/value types of the reducer output
job.setOutputKeyClass(Info.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job, new Path(input));
FileOutputFormat.setOutputPath(job, new Path(output));
boolean res = job.waitForCompletion(true);
if (!res) {
System.err.println("error:作业执行失败");
}
}
}
The program takes two arguments: the first is the input data directory, the second is the output directory.
![ReduceJoinDriver run arguments](https://s2.ax1x.com/2019/04/28/EMmjEt.md.png)
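For reference, a cluster submission might look like the following; the jar name and HDFS paths are placeholders, and both product_info.txt and page_click_log.txt are assumed to sit under the same input directory, since the Mapper tells them apart by file name:

```
hadoop jar mr-join-demo.jar com.wsk.bigdata.mapreduce.driver.ReduceJoinDriver \
    /data/join_input \
    /data/output/reduce_join
```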
Notes on pitfalls encountered:
- When importing packages, double-check that the classes being imported are the correct ones.
- When overriding the serialization methods in the domain class, be sure to include every field; otherwise some fields will be missing values and values will shift into the wrong fields.