距离问题详解
    image.png
    边的属性代表距离,箭头代表方向,如只能从5到2,不能从2到5,计算顶点5到各点的距离

    1. // 顶点5到其他各顶点的最短距离。聚合操作(Pregel API)
    2. // 消息按照我的理解,就是A点如果更新了,就会向在其他机子上的A点传递自己的信息
    3. val sourceId: VertexId = 5L
    4. //初始化,5到5的距离是零,5到其各个点的距离是无穷
    5. val initailGraph: Graph[Double, Int] = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)
    6. //pregerl api
    7. /*
    8. def pregel[A: ClassTag](
    9. //初始值,或者看成初始消息
    10. initialMsg: A,
    11. maxIterations: Int = Int.MaxValue,
    12. //边的方向,选择out,则只计算出边,either 出边或入边
    13. activeDirection: EdgeDirection = EdgeDirection.out)(
    14. // 顶点接收到消息后的计算,第一次所有点都调用,传入初始消息,后续只有接收到消息的点会调用
    15. vprog: (VertexId, VD, A) => VD,
    16. // 只有在当前迭代中接收到消息的点会调用,由于第一次所有点都有消息,所以所有点都会调用,调用时会沿出边调用triplet
    17. sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
    18. // 合并消息,如果一个点有多个消息,先合并
    19. mergeMsg: (A, A) => A)
    20. : Graph[VD, ED] = {
    21. Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg)
    22. }
    23. */
    24. val disGraph: Graph[Double, Int] = initailGraph.pregel(Double.PositiveInfinity, maxIterations = Int.MaxValue,activeDirection = EdgeDirection.Out)(
    25. // 两个消息来的时候,取其中的最小路径
    26. (id, dist, newDist) => math.min(dist, newDist),
    27. // Send Message 函数
    28. triplet => {
    29. println(triplet)
    30. if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
    31. Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    32. } else
    33. Iterator.empty
    34. },
    35. // mergeMsg
    36. (dista, distb) => math.min(dista, distb)
    37. )
    38. disGraph.vertices.foreach(println)

    可以比较一下EdgeDirection = EdgeDirection.Out和EdgeDirection = EdgeDirection.Either的打印结果
    当点更新后,就会向另一个地方的自己发送消息,从而触发triplet的迭代,
    最终结果一致,但out的迭代次数更少
    image.png