reduce相关算子基本用法
在Flink DataSet中提供了一系列围绕reduce操作的算子,有单独的reduce,用于对两两数据的合并,输入数据类型和输出数据类型可以不一样。还有reduceGroup和combineGroup,这两者均的入参都是一个迭代器,即在算子的一次调用中可以处理全量的数据,而非像reduce中只能一次处理两个数据,其次相比于reduce,这两个算子的输出数量也是无限制的,可以为一个,也可以是零个或多个。上述三个算子均可以跟在全量的DataSet后面,也可以跟在经过groupBy的分组DataSet后面。
| reduce | reduceGroup | combineGroup | |
|---|---|---|---|
| 输入 | 数据类型相同的两条数据 | 迭代器 | 迭代器 |
| 输出 | 单条数据,数据类型可以和输入不同 | 任意数量的数据,数据类型可以与输入不同 | 任意数量的数据,数据类型可以与输入不同 |
GroupCombineFunction的用处
查看源码,可以发现GroupReduceFunction和GroupCombineFunction代码完全一致,那么为什么会这样呢?
@Public@FunctionalInterfacepublic interface GroupReduceFunction<T, O> extends Function, Serializable {/*** The reduce method. The function receives one call per group of elements.** @param values All records that belong to the given input key.* @param out The collector to hand results to.** @throws Exception This method may throw exceptions. Throwing an exception will cause the operation* to fail and may trigger recovery.*/void reduce(Iterable<T> values, Collector<O> out) throws Exception;}
@Public@FunctionalInterfacepublic interface GroupCombineFunction<IN, OUT> extends Function, Serializable {/*** The combine method, called (potentially multiple timed) with subgroups of elements.** @param values The elements to be combined.* @param out The collector to use to return values from the function.** @throws Exception The function may throw Exceptions, which will cause the program to cancel,* and may trigger the recovery logic.*/void combine(Iterable<IN> values, Collector<OUT> out) throws Exception;}
此处再想一想,为何reduceGroup有combineGroup相对应,那么reduce是否有combine相对应呢,答案是否定的?为什么呢,DataSet中也没有combine算子,GroupCombineFunction也不建议单独使用,GroupCombineFunction操作使用了贪心策略,用于内存中多次的局部同类型数据聚合,最后将聚合的结果送入GroupReduceFunction来做最后的结果计算,以此方式来提升性能和效率。具体使用时,需要同时实现GroupReduceFunction和GroupCombineFunction两个接口,且GroupCombineFunction的输出类型应和GroupReduceFunction的输入类型保持一致。
实践案例
这里给出一个将GroupReduceFunction和GroupCombineFunction组合使用的demo。本例中使用的数据源是一家餐厅的外卖订单数据,其属性包括订单id、订单日期、商品名称、该商品订购数量、菜品单价、该订单商品总订购量。我们的目标是统计每年销量最多的前n个商品。首先我们将所有的订单按年份分组,然后在各个年份内统计每个商品的累计销量,然后经过排序,得出销量最佳的前几个商品。首先给出订单类的定义。
public class Order {public String orderId;public Timestamp orderDate;public String itemName;public int quantity;public double productPrice;public int totalProducts;public Order() {}public Order(String orderId, Timestamp orderDate, String itemName, int quantity, double productPrice, int totalProducts) {this.orderId = orderId;this.orderDate = orderDate;this.itemName = itemName;this.quantity = quantity;this.productPrice = productPrice;this.totalProducts = totalProducts;}@Overridepublic String toString() {return "Order{" +"orderId='" + orderId + '\'' +", orderDate=" + orderDate +", itemName='" + itemName + '\'' +", quantity=" + quantity +", productPrice=" + productPrice +", totalProducts=" + totalProducts +'}';}}
下面是代码主体部分。
@Slf4jpublic class CombinableGroupReduce {public static void main(String[] args) throws Exception {final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 读取resources文件夹下的本地csvURL fileUrl = CombinableGroupReduce.class.getClassLoader().getResource("orders.csv");DataSet<Order> source = env.readCsvFile(fileUrl.getPath())// 忽略第一行,即标题行.ignoreFirstLine()// 字典排序应该POJO类保持一致.pojoType(Order.class,"orderId","orderDate","itemName","quantity","productPrice","totalProducts");DataSet<Tuple3<String, String, Integer>> result = source// 从Order中抽取年份,按年份分组.groupBy(new KeySelector<Order, String>() {@Overridepublic String getKey(Order value) throws Exception {return new DateTime(value.orderDate).getYear() + "-" + value.itemName;}})// 可组合的GroupReduce需要调用同时实现GroupReduceFunction和GroupCombineFunction的实现类,在实际运行时,先调用GroupCombineFunction,后调用CombinableGroupReduce.reduceGroup(new MyReduce())// 再次按年份分组.groupBy(0)// 按商品累计销量从大到小排序.sortGroup(2, org.apache.flink.api.common.operators.Order.DESCENDING)// 取前5销量的商品.first(5);result.printOnTaskManager("out:");env.execute("CombinableGroupReduce Demo");}/*** 需要同时实现GroupReduceFunction和GroupCombineFunction*/public static class MyReduce implements GroupReduceFunction<Order, Tuple3<String, String, Integer>>, GroupCombineFunction<Order, Order> {@Overridepublic void combine(Iterable<Order> values, Collector<Order> out) throws Exception {Iterator<Order> iterator = values.iterator();Order first = iterator.next();int count = 0;while (iterator.hasNext()) {Order order = iterator.next();count += order.quantity;}// 累加各个订单类该商品的销量first.quantity += count;out.collect(first);}@Overridepublic void reduce(Iterable<Order> values, Collector<Tuple3<String, String, Integer>> out) throws Exception {Iterator<Order> iterator = values.iterator();Order first = iterator.next();int count = 0;while (iterator.hasNext()) {Order order = iterator.next();count += order.quantity;}out.collect(Tuple3.of("" + new DateTime(first.orderDate).getYear(), first.itemName, first.quantity + count));}}}
下面是解析拓扑图。
可以看出Myreduce类同时实现了GroupReduceFunction和GroupCombineFunction两个接口,在实际运行时GroupCombineFunction先执行,GroupReduceFunction后执行,所以在实现上,GroupCombineFunction的输出类型应该与GroupReduceFunction的输入类型一致。同时,可以看到两者在执行图优化时没有chain到一起。后面的first算子内部实际上也是同时实现了GroupReduceFunction和GroupCombineFunction两个接口。注意,本例是在经过groupBy的组内数据上应用组合的此模式,在全量的DataSet做reduce时,也应该采用GroupReduceFunction搭配GroupCombineFunction的计算模式,否则,reduce操作将无法并行化。
