迭代图像处理

Gelly利用了Flink的高效迭代操作以支持大型迭代图计算。目前我们提供了Vertex Centric,Scatter Gather和Gather Sum Apply模型的实现。接下来的章节中,我们描述了这些抽象并展示了如何在Gelly中使用它们。

Vertex Centric迭代计算

Vertex Centric模型,也称为”像顶点一样思考”或”Pregel”,通过图顶点的角度表达计算。 该计算在迭代的每一步(称为超步)中同步地处理,在每个超步时,每个顶点执行一个UDF(User Defined Function)。 顶点之间通过消息进行通讯,任何一个顶点可以给图中任何其他的顶点发送消息,只要知道它的ID即可。

下面的图中展示该计算模型,虚线框和并行单元对应。在每个超步中,所有的活跃顶点并行执行相同的用户定义的计算。因为超步时同步执行的,因此每次超步中发送的消息都被保证发送了到下次超步的开始。

Vertex-Centric Computational Model

在Gelly中使用Vertex Centric迭代,用户只需要定义顶点的计算函数ComputeFunction即可。

该函数和最大迭代次数通过Gelly的runVertexCentricIteration函数参数指定。该方法在输入图上执行Vertex Centric迭代计算,并输出更新后顶点值的新图。可选的MessageCombiner函数可以被用于减少通信消耗。

让我们考虑基于Vertex Centric的单源点最短路径算法(SSSP)。算法开始时,除了源顶点初始值为0,每个顶点初始值为正无穷。第一次超步计算时,源顶点将距离传播给它的邻居。在接下来的超步中,每个顶点检查接收的消息并选择其中最小的距离值。如果该距离值比顶点当前值小,则更新顶点的值,并产生消息发送给其邻居。如果顶点在超步中没有更新它的值,则在下次超步时不会发送任何消息给它的邻居。当没有顶点的值发生更新或者达到了最大的超步迭代次数,算法将会收敛。在这个算法中,Message Combiner可以被用来减少发送给目标顶点的消息个数。

  1. // read the input graph
  2. Graph<Long, Double, Double> graph = ...
  3. // define the maximum number of iterations
  4. int maxIterations = 10;
  5. // Execute the vertex-centric iteration
  6. Graph<Long, Double, Double> result = graph.runVertexCentricIteration(
  7. new SSSPComputeFunction(), new SSSPCombiner(), maxIterations);
  8. // Extract the vertices as the result
  9. DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices();
  10. // - - - UDFs - - - //
  11. public static final class SSSPComputeFunction extends ComputeFunction<Long, Double, Double, Double> {
  12. public void compute(Vertex<Long, Double> vertex, MessageIterator<Double> messages) {
  13. double minDistance = (vertex.getId().equals(srcId)) ? 0d : Double.POSITIVE_INFINITY;
  14. for (Double msg : messages) {
  15. minDistance = Math.min(minDistance, msg);
  16. }
  17. if (minDistance < vertex.getValue()) {
  18. setNewVertexValue(minDistance);
  19. for (Edge<Long, Double> e: getEdges()) {
  20. sendMessageTo(e.getTarget(), minDistance + e.getValue());
  21. }
  22. }
  23. }
  24. // message combiner
  25. public static final class SSSPCombiner extends MessageCombiner<Long, Double> {
  26. public void combineMessages(MessageIterator<Double> messages) {
  27. double minMessage = Double.POSITIVE_INFINITY;
  28. for (Double msg: messages) {
  29. minMessage = Math.min(minMessage, msg);
  30. }
  31. sendCombinedMessage(minMessage);
  32. }
  33. }
  1. // read the input graph
  2. val graph: Graph[Long, Double, Double] = ...
  3. // define the maximum number of iterations
  4. val maxIterations = 10
  5. // Execute the vertex-centric iteration
  6. val result = graph.runVertexCentricIteration(new SSSPComputeFunction, new SSSPCombiner, maxIterations)
  7. // Extract the vertices as the result
  8. val singleSourceShortestPaths = result.getVertices
  9. // - - - UDFs - - - //
  10. final class SSSPComputeFunction extends ComputeFunction[Long, Double, Double, Double] {
  11. override def compute(vertex: Vertex[Long, Double], messages: MessageIterator[Double]) = {
  12. var minDistance = if (vertex.getId.equals(srcId)) 0 else Double.MaxValue
  13. while (messages.hasNext) {
  14. val msg = messages.next
  15. if (msg < minDistance) {
  16. minDistance = msg
  17. }
  18. }
  19. if (vertex.getValue > minDistance) {
  20. setNewVertexValue(minDistance)
  21. for (edge: Edge[Long, Double] <- getEdges) {
  22. sendMessageTo(edge.getTarget, vertex.getValue + edge.getValue)
  23. }
  24. }
  25. }
  26. // message combiner
  27. final class SSSPCombiner extends MessageCombiner[Long, Double] {
  28. override def combineMessages(messages: MessageIterator[Double]) {
  29. var minDistance = Double.MaxValue
  30. while (messages.hasNext) {
  31. val msg = inMessages.next
  32. if (msg < minDistance) {
  33. minDistance = msg
  34. }
  35. }
  36. sendCombinedMessage(minMessage)
  37. }
  38. }

Vertex Centric迭代计算配置

Vertex Centric迭代计算可以使用VertexCentricConfiguration对象进行配置。

目前有如下参数可以指定:

  • 名称: Vertex Centric迭代计算的名称,该名称在日志和消息中显示,并可以使用setName()进行指定。

  • 并行度: 迭代计算的并行度,可以使用setParallelism()进行指定。

  • 堆内Solution Set: 定义了Solution Set是否保存在堆内内存(Flink内部对象的序列化方式)中,还是保存在简单的对象表中。默认情况下,Solution Set保存在堆内内存中,该属性可以通过setSolutionSetUnmanagedMemory()方法进行设置。

  • 聚合器: 迭代聚合器可以使用registerAggregator()方法进行注册,迭代聚合器可以将每次超步的聚合结果合并起来,并使得在下次超步中可以访问它们。注册后的聚合器可以在用户定义的ComputeFunction的内部进行访问。

  • 广播变量: 可以使用addBroadcastSet()方法将数据集作为广播变量添加到ComputeFunction

  1. Graph<Long, Double, Double> graph = ...
  2. // configure the iteration
  3. VertexCentricConfiguration parameters = new VertexCentricConfiguration();
  4. // set the iteration name
  5. parameters.setName("Gelly Iteration");
  6. // set the parallelism
  7. parameters.setParallelism(16);
  8. // register an aggregator
  9. parameters.registerAggregator("sumAggregator", new LongSumAggregator());
  10. // run the vertex-centric iteration, also passing the configuration parameters
  11. Graph<Long, Long, Double> result =
  12. graph.runVertexCentricIteration(
  13. new Compute(), null, maxIterations, parameters);
  14. // user-defined function
  15. public static final class Compute extends ComputeFunction {
  16. LongSumAggregator aggregator = new LongSumAggregator();
  17. public void preSuperstep() {
  18. // retrieve the Aggregator
  19. aggregator = getIterationAggregator("sumAggregator");
  20. }
  21. public void compute(Vertex<Long, Long> vertex, MessageIterator inMessages) {
  22. //do some computation
  23. Long partialValue = ...
  24. // aggregate the partial value
  25. aggregator.aggregate(partialValue);
  26. // update the vertex value
  27. setNewVertexValue(...);
  28. }
  29. }
  1. val graph: Graph[Long, Long, Double] = ...
  2. val parameters = new VertexCentricConfiguration
  3. // set the iteration name
  4. parameters.setName("Gelly Iteration")
  5. // set the parallelism
  6. parameters.setParallelism(16)
  7. // register an aggregator
  8. parameters.registerAggregator("sumAggregator", new LongSumAggregator)
  9. // run the vertex-centric iteration, also passing the configuration parameters
  10. val result = graph.runVertexCentricIteration(new Compute, new Combiner, maxIterations, parameters)
  11. // user-defined function
  12. final class Compute extends ComputeFunction {
  13. var aggregator = new LongSumAggregator
  14. override def preSuperstep {
  15. // retrieve the Aggregator
  16. aggregator = getIterationAggregator("sumAggregator")
  17. }
  18. override def compute(vertex: Vertex[Long, Long], inMessages: MessageIterator[Long]) {
  19. //do some computation
  20. val partialValue = ...
  21. // aggregate the partial value
  22. aggregator.aggregate(partialValue)
  23. // update the vertex value
  24. setNewVertexValue(...)
  25. }
  26. }

Scatter Gather迭代计算

Scatter Gather模型,也称为”signal/collect”模型,通过图顶点的角度表达计算。该计算在迭代的每一步(称为超步)中同步地处理,在每个超步时,每个顶点为其他顶点产生消息,并基于它接受到的消息更新自己的值。在Gelly使用Scatter Gather迭代计算,用户只需要定义每次超步中每个顶点的行为即可:

  • Scatter: 产生一个顶点发送给其他顶点的消息。
  • Gather: 使用接收到的消息更新顶点的值。

Gelly为Scatter Gather迭代计算提供了方法,用户只需要实现两个函数,与scatter和gather阶段相对应。第一个函数是ScatterFunction,它允许顶点向其他顶点发送消息,消息将会在它们被发送所在的超步时被接收。第二个函数是GatherFunction,它定义了顶点如何基于接收到的消息更新自己的值。

这些函数和最大迭代次数通过Gelly的runScatterGatherIteration函数参数指定。该方法在输入图上执行Vertex Centric迭代计算,并输出更新后顶点值的新图。

一次Scatter Gather迭代可以使用类如总顶点数、入度和出度等信息进行扩展。

另外,Scatter Gather迭代运行使用的邻居类型(in/out/all)也可以被指定。默认情况下,来自输入邻居的更新将会修改当前顶点的状态,而消息将发送给输出的邻居。

让我们考虑使用Scatter Gather迭代模型计算单源点最短路径的算法(SSSP),下面的图中顶点1是原点。每次超步中,每个顶点发送候选的距离消息到它的邻居,消息的值是当前顶点值与连接到该顶点的边的权值之和。当收到候选的距离消息时,每个顶点计算最小的距离,如果更短的路径被计算出来,顶点的值就会被更新。如果顶点在超步中没有更改它的值,那么它就不会产生下次超步中发送给邻居的消息。该算法在没有顶点值更新时收敛。

Scatter-gather SSSP superstep 1

  1. // read the input graph
  2. Graph<Long, Double, Double> graph = ...
  3. // define the maximum number of iterations
  4. int maxIterations = 10;
  5. // Execute the scatter-gather iteration
  6. Graph<Long, Double, Double> result = graph.runScatterGatherIteration(
  7. new MinDistanceMessenger(), new VertexDistanceUpdater(), maxIterations);
  8. // Extract the vertices as the result
  9. DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices();
  10. // - - - UDFs - - - //
  11. // scatter: messaging
  12. public static final class MinDistanceMessenger extends ScatterFunction<Long, Double, Double, Double> {
  13. public void sendMessages(Vertex<Long, Double> vertex) {
  14. for (Edge<Long, Double> edge : getEdges()) {
  15. sendMessageTo(edge.getTarget(), vertex.getValue() + edge.getValue());
  16. }
  17. }
  18. }
  19. // gather: vertex update
  20. public static final class VertexDistanceUpdater extends GatherFunction<Long, Double, Double> {
  21. public void updateVertex(Vertex<Long, Double> vertex, MessageIterator<Double> inMessages) {
  22. Double minDistance = Double.MAX_VALUE;
  23. for (double msg : inMessages) {
  24. if (msg < minDistance) {
  25. minDistance = msg;
  26. }
  27. }
  28. if (vertex.getValue() > minDistance) {
  29. setNewVertexValue(minDistance);
  30. }
  31. }
  32. }
  1. // read the input graph
  2. val graph: Graph[Long, Double, Double] = ...
  3. // define the maximum number of iterations
  4. val maxIterations = 10
  5. // Execute the scatter-gather iteration
  6. val result = graph.runScatterGatherIteration(new MinDistanceMessenger, new VertexDistanceUpdater, maxIterations)
  7. // Extract the vertices as the result
  8. val singleSourceShortestPaths = result.getVertices
  9. // - - - UDFs - - - //
  10. // messaging
  11. final class MinDistanceMessenger extends ScatterFunction[Long, Double, Double, Double] {
  12. override def sendMessages(vertex: Vertex[Long, Double]) = {
  13. for (edge: Edge[Long, Double] <- getEdges) {
  14. sendMessageTo(edge.getTarget, vertex.getValue + edge.getValue)
  15. }
  16. }
  17. }
  18. // vertex update
  19. final class VertexDistanceUpdater extends GatherFunction[Long, Double, Double] {
  20. override def updateVertex(vertex: Vertex[Long, Double], inMessages: MessageIterator[Double]) = {
  21. var minDistance = Double.MaxValue
  22. while (inMessages.hasNext) {
  23. val msg = inMessages.next
  24. if (msg < minDistance) {
  25. minDistance = msg
  26. }
  27. }
  28. if (vertex.getValue > minDistance) {
  29. setNewVertexValue(minDistance)
  30. }
  31. }
  32. }

Scatter Gather迭代计算配置

Scatter Gather迭代可以使用ScatterGatherConfiguration对象进行配置。

目前有以下参数可以被指定:

  • 名称: Scatter Gather迭代计算的名称,该名称在日志和消息中显示,并可以使用setName()进行指定。

  • 并行度: 迭代计算的并行度,可以使用setParallelism()进行指定。

  • 堆内Solution Set: 定义了Solution Set是否保存在堆内内存(Flink内部对象的序列化方式)中,还是保存在简单的对象表中。默认情况下,Solution Set保存在堆内内存中,该属性可以通过setSolutionSetUnmanagedMemory()方法进行设置。

  • 聚合器: 迭代聚合器可以使用registerAggregator()方法进行注册,迭代聚合器可以将每次超步的聚合结果合并起来,并使得在下次超步中可以访问它们。注册后的聚合器可以在用户定义的ScatterFunctionGatherFunction的内部进行访问。

  • 广播变量: 可以分别使用addBroadcastSetForUpdateFunction()addBroadcastSetForMessagingFunction()方法将数据集作为广播变量添加到ScatterFunctionGatherFunction

  • 顶点数: 允许迭代内部访问顶点总数,该属性可以通过setOptNumVertices()方法设置。

在顶点更新函数和消息传播函数内可以使用getNumberOfVertices()函数访问顶点数。如果该选型未设置,方法返回-1。

  • : 允许迭代内部访问顶点的出度或入度,该属性可以通过setOptDegrees()方法设置。

在scatter和gather函数内可以使用getInDegree()getInDegree()函数访问顶点的入度和出度。如果度选项未设置,方法返回-1。

  • 消息传播方向: 默认情况下,顶点发送消息给它的邻居,并基于从邻居接收到的消息更新自身的值。该配置选项允许用户更改消息传播的方向,取值EdgeDirection.INEdgeDirection.OUTEdgeDirection.ALL。相应地,消息传播方向决定了更新接受消息的方向为EdgeDirection.OUTEdgeDirection.INEdgeDirection.ALL。该属性可以通过setDirection()`方法进行设置。
  1. Graph<Long, Double, Double> graph = ...
  2. // configure the iteration
  3. ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
  4. // set the iteration name
  5. parameters.setName("Gelly Iteration");
  6. // set the parallelism
  7. parameters.setParallelism(16);
  8. // register an aggregator
  9. parameters.registerAggregator("sumAggregator", new LongSumAggregator());
  10. // run the scatter-gather iteration, also passing the configuration parameters
  11. Graph<Long, Double, Double> result =
  12. graph.runScatterGatherIteration(
  13. new Messenger(), new VertexUpdater(), maxIterations, parameters);
  14. // user-defined functions
  15. public static final class Messenger extends ScatterFunction {...}
  16. public static final class VertexUpdater extends GatherFunction {
  17. LongSumAggregator aggregator = new LongSumAggregator();
  18. public void preSuperstep() {
  19. // retrieve the Aggregator
  20. aggregator = getIterationAggregator("sumAggregator");
  21. }
  22. public void updateVertex(Vertex<Long, Long> vertex, MessageIterator inMessages) {
  23. //do some computation
  24. Long partialValue = ...
  25. // aggregate the partial value
  26. aggregator.aggregate(partialValue);
  27. // update the vertex value
  28. setNewVertexValue(...);
  29. }
  30. }
  1. val graph: Graph[Long, Double, Double] = ...
  2. val parameters = new ScatterGatherConfiguration
  3. // set the iteration name
  4. parameters.setName("Gelly Iteration")
  5. // set the parallelism
  6. parameters.setParallelism(16)
  7. // register an aggregator
  8. parameters.registerAggregator("sumAggregator", new LongSumAggregator)
  9. // run the scatter-gather iteration, also passing the configuration parameters
  10. val result = graph.runScatterGatherIteration(new Messenger, new VertexUpdater, maxIterations, parameters)
  11. // user-defined functions
  12. final class Messenger extends ScatterFunction {...}
  13. final class VertexUpdater extends GatherFunction {
  14. var aggregator = new LongSumAggregator
  15. override def preSuperstep {
  16. // retrieve the Aggregator
  17. aggregator = getIterationAggregator("sumAggregator")
  18. }
  19. override def updateVertex(vertex: Vertex[Long, Long], inMessages: MessageIterator[Long]) {
  20. //do some computation
  21. val partialValue = ...
  22. // aggregate the partial value
  23. aggregator.aggregate(partialValue)
  24. // update the vertex value
  25. setNewVertexValue(...)
  26. }
  27. }

下面的例子展示了度数和顶点数选项的使用。

  1. Graph<Long, Double, Double> graph = ...
  2. // configure the iteration
  3. ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
  4. // set the number of vertices option to true
  5. parameters.setOptNumVertices(true);
  6. // set the degree option to true
  7. parameters.setOptDegrees(true);
  8. // run the scatter-gather iteration, also passing the configuration parameters
  9. Graph<Long, Double, Double> result =
  10. graph.runScatterGatherIteration(
  11. new Messenger(), new VertexUpdater(), maxIterations, parameters);
  12. // user-defined functions
  13. public static final class Messenger extends ScatterFunction {
  14. ...
  15. // retrieve the vertex out-degree
  16. outDegree = getOutDegree();
  17. ...
  18. }
  19. public static final class VertexUpdater extends GatherFunction {
  20. ...
  21. // get the number of vertices
  22. long numVertices = getNumberOfVertices();
  23. ...
  24. }
  1. val graph: Graph[Long, Double, Double] = ...
  2. // configure the iteration
  3. val parameters = new ScatterGatherConfiguration
  4. // set the number of vertices option to true
  5. parameters.setOptNumVertices(true)
  6. // set the degree option to true
  7. parameters.setOptDegrees(true)
  8. // run the scatter-gather iteration, also passing the configuration parameters
  9. val result = graph.runScatterGatherIteration(new Messenger, new VertexUpdater, maxIterations, parameters)
  10. // user-defined functions
  11. final class Messenger extends ScatterFunction {
  12. ...
  13. // retrieve the vertex out-degree
  14. val outDegree = getOutDegree
  15. ...
  16. }
  17. final class VertexUpdater extends GatherFunction {
  18. ...
  19. // get the number of vertices
  20. val numVertices = getNumberOfVertices
  21. ...
  22. }

下面的例子展示了边方向选项的使用,顶点更新的值包含它的输入邻居列表。

  1. Graph<Long, HashSet<Long>, Double> graph = ...
  2. // configure the iteration
  3. ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
  4. // set the messaging direction
  5. parameters.setDirection(EdgeDirection.IN);
  6. // run the scatter-gather iteration, also passing the configuration parameters
  7. DataSet<Vertex<Long, HashSet<Long>>> result =
  8. graph.runScatterGatherIteration(
  9. new Messenger(), new VertexUpdater(), maxIterations, parameters)
  10. .getVertices();
  11. // user-defined functions
  12. public static final class Messenger extends GatherFunction {...}
  13. public static final class VertexUpdater extends ScatterFunction {...}
  1. val graph: Graph[Long, HashSet[Long], Double] = ...
  2. // configure the iteration
  3. val parameters = new ScatterGatherConfiguration
  4. // set the messaging direction
  5. parameters.setDirection(EdgeDirection.IN)
  6. // run the scatter-gather iteration, also passing the configuration parameters
  7. val result = graph.runScatterGatherIteration(new Messenger, new VertexUpdater, maxIterations, parameters)
  8. .getVertices
  9. // user-defined functions
  10. final class Messenger extends ScatterFunction {...}
  11. final class VertexUpdater extends GatherFunction {...}

Gather Sum Apply迭代计算

就像Scatter Gather模型,Gather Sum Apply也在迭代步骤中同步执行,即超步。每个超步包含以下部分:

  • Gather: 一个在边和对应顶点邻居上并行调用的UDF,产生一个局部值。
  • Sum: 用户定义的聚合函数,讲Gather产生的局部值合并为一个单一值。
  • Apply: 通过该函数将Sum产生的聚合值更新到每个顶点的当前值。

让我们考虑使用GSA迭代模型计算单源点最短路径的算法(SSSP),下面的图中顶点1是原点。在Gather阶段,我们通过累加每个顶点的值和边的权值计算新的候选距离。在Sum阶段,候选距离通过顶点ID分组,并选择最小的距离。在Apply阶段,新计算出来的距离将会和当前顶点的值,二者之间的最小值将会被设为顶点的新值。

GSA SSSP superstep 1

需要注意的是,如果顶点在超步中没有更新它的值,将不会在下次超步中计算候选距离。当没有顶点值发生更新时,算法收敛。

使用Gelly GSA实现该例子,用户只需要在输入图上调用runGatherSumApplyIteration方法,并提供GatherFunctionSumFunctionApplyFunction用户定义函数。迭代的同步、分组、值更新和收敛由框架处理。

  1. // read the input graph
  2. Graph<Long, Double, Double> graph = ...
  3. // define the maximum number of iterations
  4. int maxIterations = 10;
  5. // Execute the GSA iteration
  6. Graph<Long, Double, Double> result = graph.runGatherSumApplyIteration(
  7. new CalculateDistances(), new ChooseMinDistance(), new UpdateDistance(), maxIterations);
  8. // Extract the vertices as the result
  9. DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices();
  10. // - - - UDFs - - - //
  11. // Gather
  12. private static final class CalculateDistances extends GatherFunction<Double, Double, Double> {
  13. public Double gather(Neighbor<Double, Double> neighbor) {
  14. return neighbor.getNeighborValue() + neighbor.getEdgeValue();
  15. }
  16. }
  17. // Sum
  18. private static final class ChooseMinDistance extends SumFunction<Double, Double, Double> {
  19. public Double sum(Double newValue, Double currentValue) {
  20. return Math.min(newValue, currentValue);
  21. }
  22. }
  23. // Apply
  24. private static final class UpdateDistance extends ApplyFunction<Long, Double, Double> {
  25. public void apply(Double newDistance, Double oldDistance) {
  26. if (newDistance < oldDistance) {
  27. setResult(newDistance);
  28. }
  29. }
  30. }
  1. // read the input graph
  2. val graph: Graph[Long, Double, Double] = ...
  3. // define the maximum number of iterations
  4. val maxIterations = 10
  5. // Execute the GSA iteration
  6. val result = graph.runGatherSumApplyIteration(new CalculateDistances, new ChooseMinDistance, new UpdateDistance, maxIterations)
  7. // Extract the vertices as the result
  8. val singleSourceShortestPaths = result.getVertices
  9. // - - - UDFs - - - //
  10. // Gather
  11. final class CalculateDistances extends GatherFunction[Double, Double, Double] {
  12. override def gather(neighbor: Neighbor[Double, Double]): Double = {
  13. neighbor.getNeighborValue + neighbor.getEdgeValue
  14. }
  15. }
  16. // Sum
  17. final class ChooseMinDistance extends SumFunction[Double, Double, Double] {
  18. override def sum(newValue: Double, currentValue: Double): Double = {
  19. Math.min(newValue, currentValue)
  20. }
  21. }
  22. // Apply
  23. final class UpdateDistance extends ApplyFunction[Long, Double, Double] {
  24. override def apply(newDistance: Double, oldDistance: Double) = {
  25. if (newDistance < oldDistance) {
  26. setResult(newDistance)
  27. }
  28. }
  29. }

注意gatherNeighbor类型作为参数,这个方便的类型简单包装了顶点和它的邻边。

想要了解更多如何使用Gather Sum Apply模型实现算法,请查看{% gh_link /flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java “GSAPageRank” %} and {% gh_link /flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java “GSAConnectedComponents” %}提供的Gelly库方法。

Gather Sum Apply迭代计算配置

GSA迭代计算可以通过GSAConfiguration对象进行配置。

目前以下参数可以被指定:

  • 名称: GSA迭代计算的名称,该名称在日志和消息中显示,并可以使用setName()进行指定。

  • 并行度: 迭代计算的并行度,可以使用setParallelism()进行指定。

  • 堆内Solution Set: 定义了Solution Set是否保存在堆内内存(Flink内部对象的序列化方式)中,还是保存在简单的对象表中。默认情况下,Solution Set保存在堆内内存中,该属性可以通过setSolutionSetUnmanagedMemory()方法进行设置。

  • 聚合器: 迭代聚合器可以使用registerAggregator()方法进行注册,迭代聚合器可以将每次超步的聚合结果合并起来,并使得在下次超步中可以访问它们。注册后的聚合器可以在用户定义的GatherFunctionSumFunctionApplyFunction的内部进行访问。

  • 广播变量: 可以分别使用addBroadcastSetForGatherFunction()addBroadcastSetForSumFunction()addBroadcastSetForApplyFunction方法将数据集作为广播变量添加到GatherFunctionSumFunctionApplyFunction

  • 顶点数: 允许迭代内部访问顶点总数,该属性可以通过setOptNumVertices()方法设置。

在gather,sum和apply函数内可以使用getNumberOfVertices()函数访问顶点数。如果该选型未设置,方法返回-1。

  • 边方向: 默认情况下会收集顶点输出邻居的值,该方向可以使用setDirection()修改。

下面的例子展示了顶点数选项的使用。

  1. Graph<Long, Double, Double> graph = ...
  2. // configure the iteration
  3. GSAConfiguration parameters = new GSAConfiguration();
  4. // set the number of vertices option to true
  5. parameters.setOptNumVertices(true);
  6. // run the gather-sum-apply iteration, also passing the configuration parameters
  7. Graph<Long, Long, Long> result = graph.runGatherSumApplyIteration(
  8. new Gather(), new Sum(), new Apply(),
  9. maxIterations, parameters);
  10. // user-defined functions
  11. public static final class Gather {
  12. ...
  13. // get the number of vertices
  14. long numVertices = getNumberOfVertices();
  15. ...
  16. }
  17. public static final class Sum {
  18. ...
  19. // get the number of vertices
  20. long numVertices = getNumberOfVertices();
  21. ...
  22. }
  23. public static final class Apply {
  24. ...
  25. // get the number of vertices
  26. long numVertices = getNumberOfVertices();
  27. ...
  28. }
  1. val graph: Graph[Long, Double, Double] = ...
  2. // configure the iteration
  3. val parameters = new GSAConfiguration
  4. // set the number of vertices option to true
  5. parameters.setOptNumVertices(true)
  6. // run the gather-sum-apply iteration, also passing the configuration parameters
  7. val result = graph.runGatherSumApplyIteration(new Gather, new Sum, new Apply, maxIterations, parameters)
  8. // user-defined functions
  9. final class Gather {
  10. ...
  11. // get the number of vertices
  12. val numVertices = getNumberOfVertices
  13. ...
  14. }
  15. final class Sum {
  16. ...
  17. // get the number of vertices
  18. val numVertices = getNumberOfVertices
  19. ...
  20. }
  21. final class Apply {
  22. ...
  23. // get the number of vertices
  24. val numVertices = getNumberOfVertices
  25. ...
  26. }

下面的例子展示了边方向选项的使用。

  1. Graph<Long, HashSet<Long>, Double> graph = ...
  2. // configure the iteration
  3. GSAConfiguration parameters = new GSAConfiguration();
  4. // set the messaging direction
  5. parameters.setDirection(EdgeDirection.IN);
  6. // run the gather-sum-apply iteration, also passing the configuration parameters
  7. DataSet<Vertex<Long, HashSet<Long>>> result =
  8. graph.runGatherSumApplyIteration(
  9. new Gather(), new Sum(), new Apply(), maxIterations, parameters)
  10. .getVertices();
  1. val graph: Graph[Long, HashSet[Long], Double] = ...
  2. // configure the iteration
  3. val parameters = new GSAConfiguration
  4. // set the messaging direction
  5. parameters.setDirection(EdgeDirection.IN)
  6. // run the gather-sum-apply iteration, also passing the configuration parameters
  7. val result = graph.runGatherSumApplyIteration(new Gather, new Sum, new Apply, maxIterations, parameters)
  8. .getVertices()

迭代计算抽象对比

虽然Gelly提供的三种迭代计算抽象看起来很相似,然而理解它们之间的差别可以为程序提供更高的性能和可维护性。

以上三者之中,Vertex Centric模型是最通用的模型,并支持任意的计算和消息发送。Scatter Gather模型中,生产消息的逻辑和更顶点新的逻辑解耦。因此使用Scatter Gather模型开发的程序更容易跟进和维护。

将消息发送阶段和顶点更新逻辑分开,不仅让程序更容易跟进,更能对性能产生积极的影响。因为不需要同时访问存储接收消息和发送消息的数据结构,传统方式实现的Scatter Gather模型有更低的内存需求。然而,这个特性也限制了算法的表达能力,使得一些计算模式变得不够直观。自然地,如果一个算法需要顶点同时访问接收消息和发送消息的存储,那么使用Scatter Gather进行表示将会出现问题。强连通分量和近似最大权匹配算法就是类似的算法。这个限制的直接后果,就是顶点在同一个阶段中,无法既生成消息又更新状态。从而,要决定是否传播消息,就需要存储点的值,以便下一次迭代的gather阶段可以访问得到。如果顶点更新逻辑包含对邻边权值的计算,这就需要内部包含一个从scatter到gather阶段的特殊消息。因此,通常的解决办法将会导致更高的内存消耗,降低代码的优雅性,从而导致算法实现更难被理解。

GSA迭代模型和Scatter Gather模型也很相似。实际上,任何使用GSA表达的算法都可以使用Scatter Gather模型实现。Scatter Gather模型的消息发送阶段和GSA的Gather和Sum阶段等价:Gather可以被看作消息的生产阶段,Sum可以被看作消息路由到目标顶点的阶段。而顶点更新阶段和Apply步骤相对应。

二者实现的最主要的差别是GSA的Gather阶段是基于边的并行计算,而Scatter Gather的消息发送阶段是基于顶点的并行计算。结合上边SSSP的例子,我们可以看到在Scatter Gather例子的第一个超不中,顶点1、2、3并行产生消息,顶点1生产了3条消息,而顶点2和3各生产了一条消息。而在GSA的例子中,计算是基于边并行的:顶点1的三个候选的距离值是并行产生的。因此,如果Gather阶段包含了大量计算,使用GSA进行传播计算将是更好的方案,而不是加重一个顶点的计算。另外一种情况就是当输入图是倾斜(一些顶点相对于其他顶点拥有更多的邻居)的时候,基于边的并行计算会更加高效。

两者的另外一个区别就是Scatter Gather模型内部使用了coGroup,而GSA使用reduce进行计算。因此,如果计算中合并消息的函数需要全组的值,就应该使用Scatter Gather模型。如果更新函数是可结合、可交换的,那么GSA的聚合操作将是更高效的实现,因为它可以利用组合的特性。

另外需要注意的是,GSA需要严格地在邻居上执行,而Vertex Centric和Scatter Gather模型,顶点可以给任何已知ID的顶点发送消息,而不管这个顶点是否是自己的邻居。最后,在Gelly的Scatter Gather实现中,用户可以选择消息传播的方向,而GSA尚不能支持该特性,因此每个顶点只能基于它的输入邻居的值进行更新。

Gelly的迭代计算模型之间的主要差别如下表所示。

Iteration Model Update Function Update Logic Communication Scope Communication Logic
Vertex-Centric arbitrary arbitrary any vertex arbitrary
Scatter-Gather arbitrary based on received messages any vertex based on vertex state
Gather-Sum-Apply associative and commutative based on neighbors’ values neighborhood based on vertex state