前言:通过MR不同方式的Join编程是为了更加熟悉join的实现过程以及不同方式的优缺点,切记,生产中要杜绝写MR,本文只供学习参考

1.需求

有两张表,分别是产品信息数据以及用户页面点击日志数据,如下:

    # 产品信息数据:product_info.txt
    # c1=产品ID(id),c2=产品名称(name),c3=价格(price),c4=生产国家(country)
    p0001,华为,8000,中国
    p0002,小米,3000,中国
    p0003,苹果,1500,美国
    p0004,三星,10000,韩国

    # 用户页面点击日志数据:page_click_log.txt
    # c1=用户ID(id),c2=产品id(prod_id),c3=点击时间(click_time),c4=动作发生地区(area)
    u0001,p0001,20190301040123,华中
    u0002,p0002,20190302040124,华北
    u0003,p0003,20190303040124,华南
    u0004,p0004,20190304040124,华南

由于点击日志的数据量过于庞大,数据存放在HDFS上,故需要使用MR来实现如下的逻辑:

    select b.id, b.name, b.price, b.country, a.id, a.click_time, a.area
    from page_click_log a join product_info b on a.prod_id = b.id
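
按照上面的样例数据,这条 SQL(也就是本文要用 MR 实现的逻辑)期望得到的 join 结果大致如下:

    p0001,华为,8000,中国,u0001,20190301040123,华中
    p0002,小米,3000,中国,u0002,20190302040124,华北
    p0003,苹果,1500,美国,u0003,20190303040124,华南
    p0004,三星,10000,韩国,u0004,20190304040124,华南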

2.Map端实现Join

2.1思路分析

可以将小表数据分发到所有的map节点,在每个map节点把小表加载到内存,然后与本节点所读到的大表数据进行join并输出最终结果。
优缺点:join在map端完成,没有shuffle和reduce阶段,大大提高了join的并发,速度快;缺点是小表必须能完整放入每个map任务的内存,只适用于大表join小表的场景。
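
下文2.2的实现是在Driver里用 System.setProperty 把小表路径传给Mapper,这种传参方式通常只在本地模式下有效(Mapper与Driver在同一个JVM中)。在集群上更常见的做法是用分布式缓存(job.addCacheFile)把小表分发到各个map节点。下面是一个最小化的示意(类名 CacheFileJoinMapper 为假设,解析与join逻辑与2.2的 FileMapJoinMapper 相同):

    // 假设的示意:用分布式缓存分发小表(Hadoop 2.x API),仅供参考
    package com.wsk.bigdata.mapreduce.mapper;

    import com.wsk.bigdata.pojo.Info;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    import java.net.URI;
    import java.util.HashMap;
    import java.util.Map;

    public class CacheFileJoinMapper extends Mapper<LongWritable, Text, Info, NullWritable> {

        private Map<String, Info> infos = new HashMap<>();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Driver 端需要先执行:job.addCacheFile(new URI(productInfo)); 把小表加入分布式缓存
            URI[] cacheFiles = context.getCacheFiles();
            // 缓存文件会被本地化到任务的工作目录,按文件名直接打开即可
            String fileName = new Path(cacheFiles[0].getPath()).getName();
            try (BufferedReader br = new BufferedReader(new FileReader(fileName))) {
                String line;
                while ((line = br.readLine()) != null) {
                    String[] fields = line.split(",");
                    if (fields.length == 4) {
                        infos.put(fields[0],
                                new Info(fields[0], fields[1], Float.parseFloat(fields[2]), fields[3], "", "", "", "1"));
                    }
                }
            }
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // 与 2.2 的 FileMapJoinMapper 相同:用点击日志中的产品ID去内存小表里查,查不到直接丢弃
            String[] fields = value.toString().split(",");
            if (fields.length != 4) {
                return;
            }
            Info p = infos.get(fields[1]);
            if (p == null) {
                return;
            }
            context.write(new Info(p.getpId(), p.getpName(), p.getPrice(), p.getProduceArea(),
                    fields[0], fields[2], fields[3], ""), NullWritable.get());
        }
    }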

2.2编程实现

数据封装类Info.java

    package com.wsk.bigdata.pojo;

    import org.apache.hadoop.io.Writable;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    public class Info implements Writable {

        /**
         * 产品唯一标识id
         */
        private String pId;
        /**
         * 产品名称
         */
        private String pName;
        /**
         * 产品价格
         */
        private float price;
        /**
         * 产品生产地区
         */
        private String produceArea;
        /**
         * 用户Id
         */
        private String uId;
        /**
         * 用户点击时间戳:yyyyMMddHHmmss
         */
        private String dateStr;
        /**
         * 用户点击发生地区
         */
        private String clickArea;
        /**
         * flag=0,表示封装用户点击日志数据
         * flag=1,表示封装产品信息
         */
        private String flag;

        public String getpId() {
            return pId;
        }

        public void setpId(String pId) {
            this.pId = pId;
        }

        public String getpName() {
            return pName;
        }

        public void setpName(String pName) {
            this.pName = pName;
        }

        public float getPrice() {
            return price;
        }

        public void setPrice(float price) {
            this.price = price;
        }

        public String getProduceArea() {
            return produceArea;
        }

        public void setProduceArea(String produceArea) {
            this.produceArea = produceArea;
        }

        public String getuId() {
            return uId;
        }

        public void setuId(String uId) {
            this.uId = uId;
        }

        public String getDateStr() {
            return dateStr;
        }

        public void setDateStr(String dateStr) {
            this.dateStr = dateStr;
        }

        public String getClickArea() {
            return clickArea;
        }

        public void setClickArea(String clickArea) {
            this.clickArea = clickArea;
        }

        public String getFlag() {
            return flag;
        }

        public void setFlag(String flag) {
            this.flag = flag;
        }

        public Info(String pId, String pName, float price, String produceArea, String uId, String dateStr, String clickArea, String flag) {
            this.pId = pId;
            this.pName = pName;
            this.price = price;
            this.produceArea = produceArea;
            this.uId = uId;
            this.dateStr = dateStr;
            this.clickArea = clickArea;
            this.flag = flag;
        }

        /**
         * Writable 反序列化时需要无参构造方法
         */
        public Info() {
        }

        @Override
        public String toString() {
            return "pid=" + this.pId + ",pName=" + this.pName + ",price=" + this.price
                    + ",produceArea=" + this.produceArea
                    + ",uId=" + this.uId + ",clickDate=" + this.dateStr + ",clickArea=" + this.clickArea;
        }

        /**
         * 序列化:写出字段的顺序必须与 readFields 中读取的顺序一一对应
         */
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(this.pId);
            out.writeUTF(this.pName);
            out.writeFloat(this.price);
            out.writeUTF(this.produceArea);
            out.writeUTF(this.uId);
            out.writeUTF(this.dateStr);
            out.writeUTF(this.clickArea);
            out.writeUTF(this.flag);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.pId = in.readUTF();
            this.pName = in.readUTF();
            this.price = in.readFloat();
            this.produceArea = in.readUTF();
            this.uId = in.readUTF();
            this.dateStr = in.readUTF();
            this.clickArea = in.readUTF();
            this.flag = in.readUTF();
        }
    }

map实现类FileMapJoinMapper.java

    package com.wsk.bigdata.mapreduce.mapper;

    import com.wsk.bigdata.pojo.Info;
    import org.apache.commons.lang3.StringUtils;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    import java.io.BufferedReader;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.util.HashMap;
    import java.util.Map;

    /**
     * 文件间的Map join
     */
    public class FileMapJoinMapper extends Mapper<LongWritable, Text, Info, NullWritable> {

        /**
         * 产品信息集合,key=产品ID,value=产品信息
         */
        private Map<String, Info> infos = new HashMap<>();

        /**
         * 执行map方法前会调用一次setup方法,
         * 这里用于把产品信息(小表)读取并加载到内存中
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            System.out.println("--------MAP初始化:加载产品信息数据到内存------");
            // product.info.dir 由 Driver 通过 System.setProperty 传入,仅适用于本地运行
            BufferedReader br = new BufferedReader(new InputStreamReader(
                    new FileInputStream(System.getProperty("product.info.dir"))));
            String line;
            while (StringUtils.isNotEmpty(line = br.readLine())) {
                String[] fields = line.split(",");
                if (fields != null && fields.length == 4) {
                    Info info = new Info(fields[0], fields[1], Float.parseFloat(fields[2]), fields[3], "", "", "", "1");
                    infos.put(fields[0], info);
                }
            }
            br.close();
            System.out.println("--------MAP初始化:共加载了" + infos.size() + "条产品信息数据------");
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = line.split(",");
            if (fields != null && fields.length == 4) {
                String pid = fields[1];
                Info produceInfo = infos.get(pid);
                // 内存小表中没有对应产品信息,说明join不上,直接丢弃该条点击日志
                if (produceInfo == null) {
                    return;
                }
                // flag 置为空串而不是 null,避免后续序列化时 writeUTF(null) 抛空指针
                Info info = new Info(produceInfo.getpId(), produceInfo.getpName(), produceInfo.getPrice(), produceInfo.getProduceArea(),
                        fields[0], fields[2], fields[3], "");
                context.write(info, NullWritable.get());
            }
        }
    }

程序入口类MapJoinDriver.java

    package com.wsk.bigdata.mapreduce.driver;

    import com.wsk.bigdata.mapreduce.mapper.FileMapJoinMapper;
    import com.wsk.bigdata.pojo.Info;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import java.io.IOException;

    public class MapJoinDriver {

        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            if (args.length != 3) {
                System.err.println("please input 3 params: product_file page_click_file output_directory");
                System.exit(0);
            }
            String productInfo = args[0];
            String input = args[1];
            String output = args[2];
            // Windows 本地运行时指定 hadoop.home.dir
            System.setProperty("hadoop.home.dir", "D:\\appanzhuang\\cdh\\hadoop-2.6.0-cdh5.7.0");
            // 把小表路径传给 Mapper 的 setup 方法(仅本地运行有效)
            System.setProperty("product.info.dir", productInfo);

            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf);
            Path outputPath = new Path(output);
            // 校验产品信息文件是否存在,并清理已存在的输出目录
            if (!fs.exists(new Path(productInfo))) {
                System.err.println("not found File " + productInfo);
                System.exit(0);
            }
            if (fs.exists(outputPath)) {
                fs.delete(outputPath, true);
            }

            Job job = Job.getInstance(conf);
            job.setJarByClass(MapJoinDriver.class);
            job.setMapperClass(FileMapJoinMapper.class);
            // 指定mapper输出数据的kv类型
            job.setMapOutputKeyClass(Info.class);
            job.setMapOutputValueClass(NullWritable.class);
            FileInputFormat.setInputPaths(job, new Path(input));
            FileOutputFormat.setOutputPath(job, new Path(output));
            // map端join的逻辑不需要reduce阶段,设置reduce task数量为0
            job.setNumReduceTasks(0);
            boolean res = job.waitForCompletion(true);
            System.exit(res ? 0 : 1);
        }
    }

程序运行参数,分别是产品信息文件路径、页面点击日志数据路径、输出结果路径
(运行参数截图,原图地址:https://s2.ax1x.com/2019/04/28/EMZCv9.md.png)
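
例如本地运行 MapJoinDriver 时,三个参数大致可以按如下顺序传入(路径仅为示例):

    D:\data\product_info.txt D:\data\page_click_log.txt D:\data\output\mapjoin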

3.Reduce端实现Join

3.1思路

将关联条件(产品ID)作为map输出的key,两张表中满足join条件的数据各自带上标识来源文件的flag作为value,发往同一个reduce task,再在reduce中按flag把两类数据串联起来。例如key=p0001的分组,reduce端会同时收到该产品的信息记录和点击了该产品的日志记录。
优缺点:这种方式下join操作在reduce阶段完成,reduce端的处理压力大,而map端的运算负载很低,资源利用率不高,且在reduce阶段极易产生数据倾斜。

3.2编程实现

map实现类FileReduceJoinMapper.java

    package com.wsk.bigdata.mapreduce.mapper;

    import com.wsk.bigdata.pojo.Info;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;

    import java.io.IOException;

    public class FileReduceJoinMapper extends Mapper<LongWritable, Text, Text, Info> {

        private Text k = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = line.split(",");
            String pid;
            Info info;
            // 通过输入分片的文件名判断当前行属于哪张表
            FileSplit inputSplit = (FileSplit) context.getInputSplit();
            String name = inputSplit.getPath().getName();
            if (name.startsWith("product")) {
                // 产品信息数据,flag=1
                pid = fields[0];
                info = new Info(pid, fields[1], Float.parseFloat(fields[2]), fields[3], "", "", "", "1");
            } else {
                // 用户点击日志数据,flag=0
                pid = fields[1];
                info = new Info(pid, "", 0, "", fields[0], fields[2], fields[3], "0");
            }
            // 以产品ID作为key,保证同一产品的两类数据进入同一个reduce
            k.set(pid);
            System.out.println("map 输出" + info.toString());
            context.write(k, info);
        }
    }

reducer实现类FileReduceJoinReducer.java

    package com.wsk.bigdata.mapreduce.reduce;

    import com.wsk.bigdata.pojo.Info;
    import org.apache.commons.beanutils.BeanUtils;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    import java.io.IOException;
    import java.lang.reflect.InvocationTargetException;
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    public class FileReduceJoinReducer extends Reducer<Text, Info, Info, NullWritable> {

        @Override
        protected void reduce(Text key, Iterable<Info> values, Context context) throws IOException, InterruptedException {
            Info pInfo = new Info();
            List<Info> clickBeans = new ArrayList<Info>();
            Iterator<Info> iterator = values.iterator();
            while (iterator.hasNext()) {
                // 注意:Hadoop会复用values迭代器中的对象,所以必须把值拷贝到新对象后再保存
                Info bean = iterator.next();
                System.out.println("reduce接收 " + bean);
                if ("1".equals(bean.getFlag())) {
                    // 产品信息数据
                    try {
                        BeanUtils.copyProperties(pInfo, bean);
                    } catch (IllegalAccessException | InvocationTargetException e) {
                        e.printStackTrace();
                    }
                } else {
                    // 用户点击日志数据
                    Info clickBean = new Info();
                    try {
                        BeanUtils.copyProperties(clickBean, bean);
                        clickBeans.add(clickBean);
                    } catch (IllegalAccessException | InvocationTargetException e) {
                        e.printStackTrace();
                    }
                }
            }
            // 拼接数据:把产品信息补到每条点击记录上,得到最终join结果
            for (Info bean : clickBeans) {
                bean.setpName(pInfo.getpName());
                bean.setPrice(pInfo.getPrice());
                bean.setProduceArea(pInfo.getProduceArea());
                System.out.println("reduce结果输出:" + bean.toString());
                context.write(bean, NullWritable.get());
            }
        }
    }

程序入口ReduceJoinDriver.java

    package com.wsk.bigdata.mapreduce.driver;

    import com.wsk.bigdata.mapreduce.mapper.FileReduceJoinMapper;
    import com.wsk.bigdata.mapreduce.reduce.FileReduceJoinReducer;
    import com.wsk.bigdata.pojo.Info;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import java.io.IOException;

    public class ReduceJoinDriver {

        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            if (args.length != 2) {
                System.err.println("please input 2 params: input_directory output_directory");
                System.exit(0);
            }
            String input = args[0];
            String output = args[1];
            // Windows 本地运行时指定 hadoop.home.dir
            System.setProperty("hadoop.home.dir", "D:\\appanzhuang\\cdh\\hadoop-2.6.0-cdh5.7.0");

            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf);
            Path outputPath = new Path(output);
            // 清理已存在的输出目录
            if (fs.exists(outputPath)) {
                fs.delete(outputPath, true);
            }

            Job job = Job.getInstance(conf);
            job.setJarByClass(ReduceJoinDriver.class);
            job.setMapperClass(FileReduceJoinMapper.class);
            job.setReducerClass(FileReduceJoinReducer.class);
            // 指定mapper输出数据的kv类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Info.class);
            // 指定reducer输出数据的kv类型
            job.setOutputKeyClass(Info.class);
            job.setOutputValueClass(NullWritable.class);
            FileInputFormat.setInputPaths(job, new Path(input));
            FileOutputFormat.setOutputPath(job, new Path(output));
            boolean res = job.waitForCompletion(true);
            if (!res) {
                System.err.println("error:作业执行失败");
            }
        }
    }

程序运行的两个参数,第一个是输入数据目录(该目录下同时包含产品信息文件和点击日志文件),第二个是输出数据目录
(运行参数截图,原图地址:https://s2.ax1x.com/2019/04/28/EMmjEt.md.png)
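
按照上文的样例数据,作业最终输出文件(part-r-00000)的内容大致如下,即 Info.toString() 的格式:

    pid=p0001,pName=华为,price=8000.0,produceArea=中国,uId=u0001,clickDate=20190301040123,clickArea=华中
    pid=p0002,pName=小米,price=3000.0,produceArea=中国,uId=u0002,clickDate=20190302040124,clickArea=华北
    pid=p0003,pName=苹果,price=1500.0,produceArea=美国,uId=u0003,clickDate=20190303040124,clickArea=华南
    pid=p0004,pName=三星,price=10000.0,produceArea=韩国,uId=u0004,clickDate=20190304040124,clickArea=华南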

备注踩坑:

  • 代码引包时要注意引入的类是否正确
  • domain类重写序列化方法时,write 和 readFields 一定要包含所有字段,并且读写顺序一一对应,否则会导致字段缺值或值错位,见下面的小示例
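
针对第二点,用一个极简的假设示例(PairInfo 并非本文代码)说明:write 与 readFields 中的字段顺序必须一一对应,否则反序列化出来的值就会串到别的字段上。

    import org.apache.hadoop.io.Writable;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    // 假设的示例类,仅用于说明字段读写顺序问题
    public class PairInfo implements Writable {
        private String name;
        private String area;

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(name);   // 先写 name
            out.writeUTF(area);   // 再写 area
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            // 必须按写入顺序读取:先 name 后 area;
            // 如果这里先读 area 再读 name,两个字段的值就会互换(即值错位)
            this.name = in.readUTF();
            this.area = in.readUTF();
        }
    }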