译者:flink.sojb.cn

迭代算法出现在许多数据分析领域,例如机器学习图形分析。这些算法对于实现大数据从数据中提取有意义信息的承诺至关重要。随着越来越有兴趣在非常大的数据集上运行这些算法,需要以大规模并行方式执行迭代。

Flink程序通过定义步进函数并将其嵌入到特殊的迭代 算子中来实现迭代算法。此 算子有两种变体:IterateDelta Iterate。两个 算子在当前迭代状态下重复调用步进函数,直到达到某个终止条件。

在这里,我们提供两种算子变体的背景并概述它们的用法。该节目指南介绍了如何实现算子在这两个Scala和Java。我们还通过Flink的图形处理API Gelly支持以顶点为中心和聚集求和的迭代

下表提供了两个 算子的概述:

迭代 Delta迭代
迭代输入 部分解决方案 工作集解决方案集
步函数 任意数据流
状态更新 一部分解决方案 下一个工作集
对解决方案集的更改
迭代结果 最后部分解决方案 上次迭代后的解决方案设置状态
终止 最大迭代次数(默认) 最大迭代次数或空工作集(默认)
自定义聚合器收敛 自定义聚合器收敛

迭代 算子

迭代 算子覆盖所述迭代简单形式:在每次迭代中,阶梯函数消耗整个输入(在先前的迭代的结果,或在初始数据集),并且计算该部分解决方案的下一个版本(例如mapreducejoin,等等。)。

迭代 - 图1

  1. 迭代输入:来自数据源先前 算子第一次迭代的初始输入。
  2. 步骤函数:步进函数将在每次迭代中执行。它是由像算子的任意数据流mapreducejoin等,取决于手头的特定任务。
  3. Next Partial Solution:在每次迭代中,step函数的输出将反馈到下一次迭代
  4. 迭代结果最后一次迭代的输出被写入数据接收器或用作以下 算子的输入。

有多个选项可指定迭代的终止条件

  • 最大迭代次数:没有任何其他条件,迭代将执行多次。
  • 自定义聚合器收敛:迭代允许指定自定义聚合器收敛标准,如sum聚合发出的记录数(聚合器),如果此数字为零则终止(收敛标准)。

您还可以考虑伪代码中的迭代 算子:

  1. IterationState state = getInitialState();
  2. while (!terminationCriterion()) {
  3. state = step(state);
  4. }
  5. setFinalState(state);

有关详细信息和代码示例,请参阅编程指南

示例:递增数字

在以下示例中,我们迭代地递增一组数字

迭代 - 图2

  1. 迭代输入:初始输入从数据源读取和由五个单字段记录(整数15)。
  2. 步进函数:步进函数是单个map 算子,它将整数字段从i增加到i+1。它将应用于输入的每个记录。
  3. Next Partial Solution:step函数的输出将是map 算子的输出,即具有递增整数的记录。
  4. 迭代结果:经过十次迭代,最初的数字将被增加十倍,造成整数1115
  1. // 1st 2nd 10th
  2. map(1) -> 2 map(2) -> 3 ... map(10) -> 11
  3. map(2) -> 3 map(3) -> 4 ... map(11) -> 12
  4. map(3) -> 4 map(4) -> 5 ... map(12) -> 13
  5. map(4) -> 5 map(5) -> 6 ... map(13) -> 14
  6. map(5) -> 6 map(6) -> 7 ... map(14) -> 15

需要注意的是12,和4可以是任意的数据流。

Delta迭代算子

增量迭代算子覆盖的情况下,增量迭代。增量迭代有选择地修改解决方案的数据元并演化解决方案,而不是完全重新计算它。

在适用的情况下,这会导致更高效的算法,因为解决方案集中的每个数据元都不会在每次迭代中发生变化。这样可以专注于解决方案的热部件,并保持冷部件不受影响。通常,大多数解决方案相对较快地冷却,后来的迭代仅在一小部分数据上运行。

迭代 - 图3

  1. 迭代输入:从数据源先前的 算子读取初始工作集和解决方案集作为第一次迭代的输入。
  2. 步骤函数:步进函数将在每次迭代中执行。它是由像算子的任意数据流mapreducejoin等,取决于手头的特定任务。
  3. 一个工作集/更新解决方案集下一个工作集驱动迭代计算,并将反馈到下一个迭代。此外,解决方案集将被更新并隐式转发(不需要重建)。两个数据集都可以由步进函数的不同 算子更新。
  4. 迭代结果:在最后一次迭代之后解决方案集被写入数据接收器或用作以下 算子的输入。

delta迭代的默认终止条件空工作集收敛标准最大迭代次数指定。当生成的下一个工作集为空或达到最大迭代次数时,迭代将终止。还可以指定自定义聚合器收敛标准

您还可以考虑伪代码中的迭代 算子:

  1. IterationState workset = getInitialState();
  2. IterationState solution = getInitialSolution();
  3. while (!terminationCriterion()) {
  4. (delta, workset) = step(workset, solution);
  5. solution.update(delta)
  6. }
  7. setFinalState(solution);

有关详细信息和代码示例,请参阅编程指南

示例:在图表中传播最小值

在以下示例中,每个顶点都有一个ID和一个着色。每个顶点将其顶点ID传播到相邻顶点。该目标最小ID分配给子图的每个顶点。如果接收的ID小于当前的ID,则它将变为具有接收到的ID的顶点的颜色。其中一个应用可以在社区分析连通组件计算中找到。

迭代 - 图4

初始输入被设置为两个工作集和溶液组。在上图中,颜色可视化解决方案集演变。每次迭代时,最小ID的颜色在相应的子图中展开。同时,每次迭代,工作量(交换和比较顶点ID)都会Reduce。这对应于工作集的大小减小,其在三次迭代之后从所有七个顶点变为零,此时迭代终止。在重要的观察是,较低的子收敛上半之前不和增量迭代能够与工作集抽象捕捉到这一点。

在上部子图ID 1橙色)中是最小ID。在第一次迭代中,它将传播到顶点2,随后将其颜色更改为橙色。顶点3和4将接收ID 2黄色)作为其当前最小ID并更改为黄色。因为顶点1的颜色在第一次迭代中没有改变,所以可以在下一个工作集中跳过它。

在较低的子图中,ID 5青色)是最小ID。下子图的所有顶点将在第一次迭代中接收它。同样,我们可以跳过下一个工作集的未更改顶点(顶点5)。

第二次迭代中,工作集大小已经从七个数据元Reduce到五个数据元(顶点2,3,4,6和7)。这些是迭代的一部分,并进一步传播其当前的最小ID。在此迭代之后,下部子图已经收敛(图的冷部分),因为它在工作集中没有数据元,而上半部分需要对剩余的两个工作集数据元(顶点)进行进一步迭代(图的热部分) 3和4)。

第三次迭代后工作集为空时,迭代终止

超级同步

我们将迭代 算子的阶梯函数的每次执行称为单次迭代。在并行设置中,在迭代状态的不同分区上并行评估步骤函数的多个实例。在许多设置中,对所有并行实例的步骤函数的一个评估形成所谓的超级步骤,其也是同步的粒度。因此,迭代的所有并行任务都需要在初始化下一个超级步骤之前完成超级步骤。终止标准也将在超级障碍评估。

迭代 - 图5