reduce相关算子基本用法

在Flink DataSet中提供了一系列围绕reduce操作的算子,有单独的reduce,用于对两两数据的合并,输入数据类型和输出数据类型可以不一样。还有reduceGroup和combineGroup,这两者均的入参都是一个迭代器,即在算子的一次调用中可以处理全量的数据,而非像reduce中只能一次处理两个数据,其次相比于reduce,这两个算子的输出数量也是无限制的,可以为一个,也可以是零个或多个。上述三个算子均可以跟在全量的DataSet后面,也可以跟在经过groupBy的分组DataSet后面。

reduce reduceGroup combineGroup
输入 数据类型相同的两条数据 迭代器 迭代器
输出 单条数据,数据类型可以和输入不同 任意数量的数据,数据类型可以与输入不同 任意数量的数据,数据类型可以与输入不同

GroupCombineFunction的用处

查看源码,可以发现GroupReduceFunction和GroupCombineFunction代码完全一致,那么为什么会这样呢?

  1. @Public
  2. @FunctionalInterface
  3. public interface GroupReduceFunction<T, O> extends Function, Serializable {
  4. /**
  5. * The reduce method. The function receives one call per group of elements.
  6. *
  7. * @param values All records that belong to the given input key.
  8. * @param out The collector to hand results to.
  9. *
  10. * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
  11. * to fail and may trigger recovery.
  12. */
  13. void reduce(Iterable<T> values, Collector<O> out) throws Exception;
  14. }
  1. @Public
  2. @FunctionalInterface
  3. public interface GroupCombineFunction<IN, OUT> extends Function, Serializable {
  4. /**
  5. * The combine method, called (potentially multiple timed) with subgroups of elements.
  6. *
  7. * @param values The elements to be combined.
  8. * @param out The collector to use to return values from the function.
  9. *
  10. * @throws Exception The function may throw Exceptions, which will cause the program to cancel,
  11. * and may trigger the recovery logic.
  12. */
  13. void combine(Iterable<IN> values, Collector<OUT> out) throws Exception;
  14. }

此处再想一想,为何reduceGroup有combineGroup相对应,那么reduce是否有combine相对应呢,答案是否定的?为什么呢,DataSet中也没有combine算子,GroupCombineFunction也不建议单独使用,GroupCombineFunction操作使用了贪心策略,用于内存中多次的局部同类型数据聚合,最后将聚合的结果送入GroupReduceFunction来做最后的结果计算,以此方式来提升性能和效率。具体使用时,需要同时实现GroupReduceFunction和GroupCombineFunction两个接口,且GroupCombineFunction的输出类型应和GroupReduceFunction的输入类型保持一致。

实践案例

这里给出一个将GroupReduceFunction和GroupCombineFunction组合使用的demo。本例中使用的数据源是一家餐厅的外卖订单数据,其属性包括订单id、订单日期、商品名称、该商品订购数量、菜品单价、该订单商品总订购量。我们的目标是统计每年销量最多的前n个商品。首先我们将所有的订单按年份分组,然后在各个年份内统计每个商品的累计销量,然后经过排序,得出销量最佳的前几个商品。首先给出订单类的定义。

  1. public class Order {
  2. public String orderId;
  3. public Timestamp orderDate;
  4. public String itemName;
  5. public int quantity;
  6. public double productPrice;
  7. public int totalProducts;
  8. public Order() {
  9. }
  10. public Order(String orderId, Timestamp orderDate, String itemName, int quantity, double productPrice, int totalProducts) {
  11. this.orderId = orderId;
  12. this.orderDate = orderDate;
  13. this.itemName = itemName;
  14. this.quantity = quantity;
  15. this.productPrice = productPrice;
  16. this.totalProducts = totalProducts;
  17. }
  18. @Override
  19. public String toString() {
  20. return "Order{" +
  21. "orderId='" + orderId + '\'' +
  22. ", orderDate=" + orderDate +
  23. ", itemName='" + itemName + '\'' +
  24. ", quantity=" + quantity +
  25. ", productPrice=" + productPrice +
  26. ", totalProducts=" + totalProducts +
  27. '}';
  28. }
  29. }

下面是代码主体部分。

  1. @Slf4j
  2. public class CombinableGroupReduce {
  3. public static void main(String[] args) throws Exception {
  4. final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  5. // 读取resources文件夹下的本地csv
  6. URL fileUrl = CombinableGroupReduce.class.getClassLoader().getResource("orders.csv");
  7. DataSet<Order> source = env
  8. .readCsvFile(fileUrl.getPath())
  9. // 忽略第一行,即标题行
  10. .ignoreFirstLine()
  11. // 字典排序应该POJO类保持一致
  12. .pojoType(Order.class,
  13. "orderId",
  14. "orderDate",
  15. "itemName",
  16. "quantity",
  17. "productPrice",
  18. "totalProducts");
  19. DataSet<Tuple3<String, String, Integer>> result = source
  20. // 从Order中抽取年份,按年份分组
  21. .groupBy(new KeySelector<Order, String>() {
  22. @Override
  23. public String getKey(Order value) throws Exception {
  24. return new DateTime(value.orderDate).getYear() + "-" + value.itemName;
  25. }
  26. })
  27. // 可组合的GroupReduce需要调用同时实现GroupReduceFunction和GroupCombineFunction的实现类,在实际运行时,先调用GroupCombineFunction,后调用CombinableGroupReduce
  28. .reduceGroup(new MyReduce())
  29. // 再次按年份分组
  30. .groupBy(0)
  31. // 按商品累计销量从大到小排序
  32. .sortGroup(2, org.apache.flink.api.common.operators.Order.DESCENDING)
  33. // 取前5销量的商品
  34. .first(5);
  35. result.printOnTaskManager("out:");
  36. env.execute("CombinableGroupReduce Demo");
  37. }
  38. /**
  39. * 需要同时实现GroupReduceFunction和GroupCombineFunction
  40. */
  41. public static class MyReduce implements GroupReduceFunction<Order, Tuple3<String, String, Integer>>, GroupCombineFunction<Order, Order> {
  42. @Override
  43. public void combine(Iterable<Order> values, Collector<Order> out) throws Exception {
  44. Iterator<Order> iterator = values.iterator();
  45. Order first = iterator.next();
  46. int count = 0;
  47. while (iterator.hasNext()) {
  48. Order order = iterator.next();
  49. count += order.quantity;
  50. }
  51. // 累加各个订单类该商品的销量
  52. first.quantity += count;
  53. out.collect(first);
  54. }
  55. @Override
  56. public void reduce(Iterable<Order> values, Collector<Tuple3<String, String, Integer>> out) throws Exception {
  57. Iterator<Order> iterator = values.iterator();
  58. Order first = iterator.next();
  59. int count = 0;
  60. while (iterator.hasNext()) {
  61. Order order = iterator.next();
  62. count += order.quantity;
  63. }
  64. out.collect(Tuple3.of("" + new DateTime(first.orderDate).getYear(), first.itemName, first.quantity + count));
  65. }
  66. }
  67. }

下面是解析拓扑图。
image.png
可以看出Myreduce类同时实现了GroupReduceFunction和GroupCombineFunction两个接口,在实际运行时GroupCombineFunction先执行,GroupReduceFunction后执行,所以在实现上,GroupCombineFunction的输出类型应该与GroupReduceFunction的输入类型一致。同时,可以看到两者在执行图优化时没有chain到一起。后面的first算子内部实际上也是同时实现了GroupReduceFunction和GroupCombineFunction两个接口。注意,本例是在经过groupBy的组内数据上应用组合的此模式,在全量的DataSet做reduce时,也应该采用GroupReduceFunction搭配GroupCombineFunction的计算模式,否则,reduce操作将无法并行化。