join操作是为了实现数据库得联表操作

order.txt
customer.txt

Reduce端得join

map代码趋于简单 重点是后面的map这里不过多解释

:::info 思路: 就是获取用户表的key 然后value就是用户表的value 订单表 获取用户key value就是订单表的value
因为key相同 所以到了reduce 同一个key下面 有用户信息也有订单信息 对这个信息做一下顺序处理即可 :::

  1. public static class MapJoin extends Mapper<LongWritable, Text, Text, Text>{
  2. // 不同的文件不同的处理方式
  3. @Override
  4. protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {
  5. // 获取输入文件的路径
  6. FileSplit inputSplit =(FileSplit)context.getInputSplit();
  7. Text text = new Text();
  8. Text text2 = new Text();
  9. //获取文件名称
  10. String name = inputSplit.getPath().getName();
  11. String[] split = value.toString().split(" ");
  12. // 通过判断文件 来执行相应的文件相应的操作
  13. if (name.endsWith("customer.txt")){
  14. // 获取用户key 和用户名称
  15. String cKey = split[0];
  16. text.set(cKey);
  17. text2.set(value);
  18. // 传递用户key和数据到上下文对象
  19. context.write(text, text2);
  20. }else{
  21. // 获取用户key和order key
  22. String cKey = split[2];
  23. String oKey = split[1];
  24. // 获取用户key
  25. text.set(cKey);
  26. text2.set(oKey);
  27. // 把用户key和orderKey传入上下文对象
  28. context.write(text, text2);
  29. }
  30. }
  31. }
  1. public static class ReduceJoin extends Reducer<Text, Text, Text, Text>{
  2. @Override
  3. protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
  4. // 因为 传递的都是用户key 所以同一个用户key既有用户的信息 也有订单的信息
  5. // 为了保证格式 所以需要分先后
  6. String data = "";
  7. String customer = "";
  8. Text text = new Text();
  9. for (Text value : values) {
  10. // 通过数据判断是哪一个的数据做不同的操作
  11. if (value.toString().startsWith("order")) {
  12. data = data +"\t"+ value;
  13. }else {
  14. customer = customer + value;
  15. }
  16. }
  17. // 拼接用户的信息 和订单的信息
  18. text.set(customer + "\t" + data);
  19. context.write(key, text);
  20. }
  21. }

结果展示

1 1 customer1 18 男 order5 order3
2 2 customer2 19 女 order4 order2
3 3 customer3 15 女 order1

Map端得Join操作

:::info 思路 先把小表 比如用户表 唯一的 进行一个map整顿
1 先上传到hadoop集群上 然后做成一个(key, value)map key是用户key value是用户信息
2 对大表进行操作 获取用户key和其他订单信息 用户key用来获取map中的用户信息 则就把用户信息和订单信息联系起来了 :::

上代码

需要重写setUp方法

  1. public static class MapJoin extends Mapper<LongWritable, Text, Text, Text>{
  2. HashMap<String , String> map = new HashMap<>();
  3. @Override
  4. protected void setup(Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException {
  5. // 这个方法一般用来进行相关变量或则资源的集中处理
  6. // 获取分布式缓存文件列表
  7. URI[] cacheFiles = context.getCacheFiles();
  8. // 获取指定的分布式缓存文件的文件系统
  9. FileSystem fileSystem = FileSystem.get(cacheFiles[0], context.getConfiguration());
  10. // 获取文件的输入流 这是字节流
  11. FSDataInputStream open = fileSystem.open(new Path(cacheFiles[0]));
  12. // 将字节输入流转换为字符转换流 然后变成 bufferReader缓冲
  13. BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(open));
  14. // 读取文件内容 并将数据存入map集合
  15. String line = null;
  16. while ((line = bufferedReader.readLine()) != null){
  17. String[] split = line.split(" ");
  18. map.put(split[0], line);
  19. }
  20. }
  21. @Override
  22. protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {
  23. // 从行文本数据中获取id
  24. String[] split = value.toString().split(" ");
  25. String cKey = split[2];
  26. String s = map.get(cKey);
  27. String s1 = s + "\t" + value;
  28. context.write(new Text(cKey), new Text(s1));
  29. }
  30. }
  1. public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
  2. Configuration configuration = new Configuration();
  3. Job job = Job.getInstance(configuration, "mapJoin");
  4. job.setJarByClass(JavaMapJoin.class);
  5. job.addCacheFile(URI.create("hdfs://192.168.23.24:9000/user/root/customer.txt"));
  6. FileInputFormat.addInputPath(job, new Path("/user/root/order.txt"));
  7. job.setMapperClass(JavaMapJoin.MapJoin.class);
  8. job.setMapOutputValueClass(Text.class);
  9. job.setMapOutputKeyClass(Text.class);
  10. FileOutputFormat.setOutputPath(job, new Path("/user/root/order/result5"));
  11. System.exit(job.waitForCompletion(true)?0:1);
  12. }

结果展示

1 1 customer1 18 男 5 order5 1
1 1 customer1 18 男 3 order3 1
2 2 customer2 19 女 4 order4 2
2 2 customer2 19 女 2 order2 2
3 3 customer3 15 女 1 order1 3

注意点和知识点

  1. MapJoin 在map端就把数据处理完毕 所以不需要Reduce
  2. 推荐使用Map Join 在Reduce Join没有降低数据从map传递到reduce的压力 最后的结果没有Mao join操作出来的结果更好
  3. setUp() 进行相关变量或者资源的集中初始化工作
  4. cleanup() 进行相关变量或资源的释放工作