join操作是为了实现数据库得联表操作
Reduce端得join
map代码趋于简单 重点是后面的map这里不过多解释
:::info
思路: 就是获取用户表的key 然后value就是用户表的value 订单表 获取用户key value就是订单表的value
因为key相同 所以到了reduce 同一个key下面 有用户信息也有订单信息 对这个信息做一下顺序处理即可
:::
public static class MapJoin extends Mapper<LongWritable, Text, Text, Text>{// 不同的文件不同的处理方式@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {// 获取输入文件的路径FileSplit inputSplit =(FileSplit)context.getInputSplit();Text text = new Text();Text text2 = new Text();//获取文件名称String name = inputSplit.getPath().getName();String[] split = value.toString().split(" ");// 通过判断文件 来执行相应的文件相应的操作if (name.endsWith("customer.txt")){// 获取用户key 和用户名称String cKey = split[0];text.set(cKey);text2.set(value);// 传递用户key和数据到上下文对象context.write(text, text2);}else{// 获取用户key和order keyString cKey = split[2];String oKey = split[1];// 获取用户keytext.set(cKey);text2.set(oKey);// 把用户key和orderKey传入上下文对象context.write(text, text2);}}}
public static class ReduceJoin extends Reducer<Text, Text, Text, Text>{@Overrideprotected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {// 因为 传递的都是用户key 所以同一个用户key既有用户的信息 也有订单的信息// 为了保证格式 所以需要分先后String data = "";String customer = "";Text text = new Text();for (Text value : values) {// 通过数据判断是哪一个的数据做不同的操作if (value.toString().startsWith("order")) {data = data +"\t"+ value;}else {customer = customer + value;}}// 拼接用户的信息 和订单的信息text.set(customer + "\t" + data);context.write(key, text);}}
结果展示
1 1 customer1 18 男 order5 order3
2 2 customer2 19 女 order4 order2
3 3 customer3 15 女 order1
Map端得Join操作
:::info
思路 先把小表 比如用户表 唯一的 进行一个map整顿
1 先上传到hadoop集群上 然后做成一个(key, value)map key是用户key value是用户信息
2 对大表进行操作 获取用户key和其他订单信息 用户key用来获取map中的用户信息 则就把用户信息和订单信息联系起来了
:::
上代码
需要重写setUp方法
public static class MapJoin extends Mapper<LongWritable, Text, Text, Text>{HashMap<String , String> map = new HashMap<>();@Overrideprotected void setup(Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException {// 这个方法一般用来进行相关变量或则资源的集中处理// 获取分布式缓存文件列表URI[] cacheFiles = context.getCacheFiles();// 获取指定的分布式缓存文件的文件系统FileSystem fileSystem = FileSystem.get(cacheFiles[0], context.getConfiguration());// 获取文件的输入流 这是字节流FSDataInputStream open = fileSystem.open(new Path(cacheFiles[0]));// 将字节输入流转换为字符转换流 然后变成 bufferReader缓冲BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(open));// 读取文件内容 并将数据存入map集合String line = null;while ((line = bufferedReader.readLine()) != null){String[] split = line.split(" ");map.put(split[0], line);}}@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {// 从行文本数据中获取idString[] split = value.toString().split(" ");String cKey = split[2];String s = map.get(cKey);String s1 = s + "\t" + value;context.write(new Text(cKey), new Text(s1));}}
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {Configuration configuration = new Configuration();Job job = Job.getInstance(configuration, "mapJoin");job.setJarByClass(JavaMapJoin.class);job.addCacheFile(URI.create("hdfs://192.168.23.24:9000/user/root/customer.txt"));FileInputFormat.addInputPath(job, new Path("/user/root/order.txt"));job.setMapperClass(JavaMapJoin.MapJoin.class);job.setMapOutputValueClass(Text.class);job.setMapOutputKeyClass(Text.class);FileOutputFormat.setOutputPath(job, new Path("/user/root/order/result5"));System.exit(job.waitForCompletion(true)?0:1);}
结果展示
1 1 customer1 18 男 5 order5 1
1 1 customer1 18 男 3 order3 1
2 2 customer2 19 女 4 order4 2
2 2 customer2 19 女 2 order2 2
3 3 customer3 15 女 1 order1 3
注意点和知识点
- MapJoin 在map端就把数据处理完毕 所以不需要Reduce
- 推荐使用Map Join 在Reduce Join没有降低数据从map传递到reduce的压力 最后的结果没有Mao join操作出来的结果更好
- setUp() 进行相关变量或者资源的集中初始化工作
- cleanup() 进行相关变量或资源的释放工作
