距离问题详解
边的属性代表距离,箭头代表方向,如只能从5到2,不能从2到5,计算顶点5到各点的距离
// 顶点5到其他各顶点的最短距离。聚合操作(Pregel API)
// 消息按照我的理解,就是A点如果更新了,就会向在其他机子上的A点传递自己的信息
val sourceId: VertexId = 5L
//初始化,5到5的距离是零,5到其各个点的距离是无穷
val initailGraph: Graph[Double, Int] = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)
//pregerl api
/*
def pregel[A: ClassTag](
//初始值,或者看成初始消息
initialMsg: A,
maxIterations: Int = Int.MaxValue,
//边的方向,选择out,则只计算出边,either 出边或入边
activeDirection: EdgeDirection = EdgeDirection.out)(
// 顶点接收到消息后的计算,第一次所有点都调用,传入初始消息,后续只有接收到消息的点会调用
vprog: (VertexId, VD, A) => VD,
// 只有在当前迭代中接收到消息的点会调用,由于第一次所有点都有消息,所以所有点都会调用,调用时会沿出边调用triplet
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
// 合并消息,如果一个点有多个消息,先合并
mergeMsg: (A, A) => A)
: Graph[VD, ED] = {
Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg)
}
*/
val disGraph: Graph[Double, Int] = initailGraph.pregel(Double.PositiveInfinity, maxIterations = Int.MaxValue,activeDirection = EdgeDirection.Out)(
// 两个消息来的时候,取其中的最小路径
(id, dist, newDist) => math.min(dist, newDist),
// Send Message 函数
triplet => {
println(triplet)
if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
} else
Iterator.empty
},
// mergeMsg
(dista, distb) => math.min(dista, distb)
)
disGraph.vertices.foreach(println)
可以比较一下EdgeDirection = EdgeDirection.Out和EdgeDirection = EdgeDirection.Either的打印结果
当点更新后,就会向另一个地方的自己发送消息,从而触发triplet的迭代,
最终结果一致,但out的迭代次数更少