距离问题详解
边的属性代表距离,箭头代表方向,如只能从5到2,不能从2到5,计算顶点5到各点的距离
// 顶点5到其他各顶点的最短距离。聚合操作(Pregel API)// 消息按照我的理解,就是A点如果更新了,就会向在其他机子上的A点传递自己的信息val sourceId: VertexId = 5L//初始化,5到5的距离是零,5到其各个点的距离是无穷val initailGraph: Graph[Double, Int] = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)//pregerl api/*def pregel[A: ClassTag](//初始值,或者看成初始消息initialMsg: A,maxIterations: Int = Int.MaxValue,//边的方向,选择out,则只计算出边,either 出边或入边activeDirection: EdgeDirection = EdgeDirection.out)(// 顶点接收到消息后的计算,第一次所有点都调用,传入初始消息,后续只有接收到消息的点会调用vprog: (VertexId, VD, A) => VD,// 只有在当前迭代中接收到消息的点会调用,由于第一次所有点都有消息,所以所有点都会调用,调用时会沿出边调用tripletsendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],// 合并消息,如果一个点有多个消息,先合并mergeMsg: (A, A) => A): Graph[VD, ED] = {Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg)}*/val disGraph: Graph[Double, Int] = initailGraph.pregel(Double.PositiveInfinity, maxIterations = Int.MaxValue,activeDirection = EdgeDirection.Out)(// 两个消息来的时候,取其中的最小路径(id, dist, newDist) => math.min(dist, newDist),// Send Message 函数triplet => {println(triplet)if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))} elseIterator.empty},// mergeMsg(dista, distb) => math.min(dista, distb))disGraph.vertices.foreach(println)
可以比较一下EdgeDirection = EdgeDirection.Out和EdgeDirection = EdgeDirection.Either的打印结果
当点更新后,就会向另一个地方的自己发送消息,从而触发triplet的迭代,
最终结果一致,但out的迭代次数更少
