5.2.1 存储模式

5.2.1.1 图存储模式

PowerGraph 巨型图的存储总体上有边分割和点分割两种存储方式。

  • 边分割(Edge-Cut)每个顶点都存储一次,但有的边会被打断分到两台机器上。这样做的好处是节省存储空间;坏处是对图进行基于边的计算时,对于一条两个顶点被分到不同机器上的边来说,要跨机器通信传输数据,内网通信流量大。
  • 点分割(Vertex-Cut)每条边只存储一次,都只会出现在一台机器上。邻居多的点会被复制到多台机器上,增加了存储开销,同时会引发数据同步问题。好处是可以大幅减少内网通信量。

5.2 Spark GraphX 解析 - 图1

虽然两种方法互有利弊,但现在是点分割占上风,各种分布式图计算框架都将自己底层的存储形式变成了点分割。主要原因有以下两个:

  • 磁盘价格下降,存储空间不再是问题,而内网的通信资源没有突破性进展,集群计算时内网带宽是宝贵的,时间比磁盘更珍贵。这点就类似于常见的空间换时间的策略。
  • 在当前的应用场景中,绝大多数网络都是 “无尺度网络”,遵循幂律分布,不同点的邻居数量相差非常悬殊。而边分割会使那些多邻居的点所相连的边大多数被分到不同的机器上,这样的数据分布会使得内网带宽更加捉襟见肘,于是边分割存储方式被渐渐抛弃了

5.2.1.2 GraphX 存储模式

Graphx 借鉴 PowerGraph,使用的是 Vertex-Cut(点分割) 方式存储图,用三个 RDD 存储图数据信息

  • VertexTable(id, data):id 为顶点 id, data 为顶点属性。
  • EdgeTable(pid, src, dst, data):pid 为分区 id ,src 为源顶点 id ,dst 为目的顶点 id,data 为边属性。
  • RoutingTable(id, pid):id 为顶点 id ,pid 为分区 id。

点分割存储实现如下图所示:

5.2 Spark GraphX 解析 - 图2

GraphX 在进行图分割时,有几种不同的分区 (partition) 策略,它通过 PartitionStrategy 专门定义这些策略。在 PartitionStrategy 中,总共定义了 EdgePartition2D、EdgePartition1D、RandomVertexCut 以及 CanonicalRandomVertexCut 这四种不同的分区策略。下面分别介绍这几种策略。

RandomVertexCut
**

  1. case object RandomVertexCut extends PartitionStrategy {
  2. override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
  3. math.abs((src, dst).hashCode()) % numParts
  4. }
  5. }

这个方法比较简单,通过取源顶点和目标顶点 id 的哈希值来将边分配到不同的分区。这个方法会产生一个随机的边分割,两个顶点之间相同方向的边会分配到同一个分区

CanonicalRandomVertexCut

  1. case object CanonicalRandomVertexCut extends PartitionStrategy {
  2. override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
  3. if (src < dst) {
  4. math.abs((src, dst).hashCode()) % numParts
  5. } else {
  6. math.abs((dst, src).hashCode()) % numParts
  7. }
  8. }
  9. }

这种分割方法和前一种方法没有本质的不同。不同的是,哈希值的产生带有确定的方向(即两个顶点中较小 id 的顶点在前)两个顶点之间所有的边都会分配到同一个分区,而不管方向如何。

EdgePartition1D

  1. case object EdgePartition1D extends PartitionStrategy {
  2. override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
  3. val mixingPrime: VertexId = 1125899906842597L
  4. (math.abs(src * mixingPrime) % numParts).toInt
  5. }
  6. }

这种方法仅仅根据源顶点 id 来将边分配到不同的分区。有相同源顶点的边会分配到同一分区。

EdgePartition2D

  1. case object EdgePartition2D extends PartitionStrategy {
  2. override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
  3. val ceilSqrtNumParts: PartitionID = math.ceil(math.sqrt(numParts)).toInt
  4. val mixingPrime: VertexId = 1125899906842597L
  5. if (numParts == ceilSqrtNumParts * ceilSqrtNumParts) {
  6. // Use old method for perfect squared to ensure we get same results
  7. val col: PartitionID = (math.abs(src * mixingPrime) % ceilSqrtNumParts).toInt
  8. val row: PartitionID = (math.abs(dst * mixingPrime) % ceilSqrtNumParts).toInt
  9. (col * ceilSqrtNumParts + row) % numParts
  10. } else {
  11. // Otherwise use new method
  12. val cols = ceilSqrtNumParts
  13. val rows = (numParts + cols - 1) / cols
  14. val lastColRows = numParts - rows * (cols - 1)
  15. val col = (math.abs(src * mixingPrime) % numParts / rows).toInt
  16. val row = (math.abs(dst * mixingPrime) % (if (col < cols - 1) rows else lastColRows)).toInt
  17. col * rows + row
  18. }
  19. }
  20. }

这种分割方法同时使用到了源顶点 id 和目的顶点 id它使用稀疏边连接矩阵的 2 维区分来将边分配到不同的分区,从而保证顶点的备份数不大于 **2 * sqrt(numParts)** 的限制。这里 numParts 表示分区数。 这个方法的实现分两种情况,即分区数能完全开方和不能完全开方两种情况。

5.2.2 vertices、edges 以及 triplets

vertices、edges 以及 triplets 是 GraphX 中三个非常重要的概念。我们在前文 GraphX 介绍中对这三个概念有初步的了解。

5.2.2.1 vertices

在 GraphX 中,vertices 对应着名称为 VertexRDD 的 RDD。这个 RDD 有顶点 id 和顶点属性两个成员变量。它的源码如下所示:

  1. abstract class VertexRDD[VD](sc: SparkContext, deps: Seq[Dependency[_]]) extends RDD[(VertexId, VD)](sc, deps)

从源码中我们可以看到,VertexRDD 继承自 RDD[(VertexId, VD)],这里 VertexId 表示顶点 id,VD 表示顶点所带的属性的类别。这从另一个角度也说明 VertexRDD 拥有顶点 id 和顶点属性

5.2.2.2 edges

在 GraphX 中,edges 对应着 EdgeRDD。这个 RDD 拥有三个成员变量,分别是源顶点 id、目标顶点 id 以及边属性。它的源码如下所示:

  1. abstract class EdgeRDD[ED](sc: SparkContext, deps: Seq[Dependency[_]]) extends RDD[Edge[ED]](sc, deps)

从源码中我们可以看到,EdgeRDD 继承自 RDD[Edge[ED]],即类型为 Edge[ED] 的 RDD。

5.2.2.3 triplets

在 GraphX 中,triplets 对应着 EdgeTriplet。它是一个三元组视图,这个视图逻辑上将顶点和边的属性保存为一个 RDD[EdgeTriplet[VD, ED]]。可以通过下面的 Sql 表达式表示这个三元视图的含义:

  1. SELECT
  2. src.id ,
  3. dst.id ,
  4. src.attr ,
  5. e.attr ,
  6. dst.attr
  7. FROM
  8. edges AS e
  9. LEFT JOIN vertices AS src ,
  10. vertices AS dst ON e.srcId = src.Id
  11. AND e.dstId = dst.Id

同样,也可以通过下面图解的形式来表示它的含义:

5.2 Spark GraphX 解析 - 图3

EdgeTriplet 的源代码如下所示:

  1. class EdgeTriplet[VD, ED] extends Edge[ED] {
  2. //源顶点属性
  3. var srcAttr: VD = _ // nullValue[VD]
  4. //目标顶点属性
  5. var dstAttr: VD = _ // nullValue[VD]
  6. protected[spark] def set(other: Edge[ED]): EdgeTriplet[VD, ED] = {
  7. srcId = other.srcId
  8. dstId = other.dstId
  9. attr = other.attr
  10. this
  11. }
  12. }

EdgeTriplet 类继承自 Edge 类,我们来看看这个父类:

  1. case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED] (var srcId: VertexId = 0, var dstId: VertexId = 0, var attr: ED = null.asInstanceOf[ED]) extends Serializable

Edge 类中包含源顶点 id,目标顶点 id 以及边的属性。所以从源代码中我们可以知道,triplets 既包含了边属性也包含了源顶点的 id 和属性、目标顶点的 id 和属性

5.2.3 图的构建

GraphX 的 Graph 对象是用户操作图的入口。前面的章节我们介绍过,它包含了边 (edges)、顶点(vertices) 以及 triplets 三部分,并且这三部分都包含相应的属性,可以携带额外的信息。

5.2.3.1 构建图的方法

构建图的入口方法有两种,分别是根据边构建和根据边的两个顶点构建。

根据边构建图 (**Graph.fromEdges**)
**
对于顶点的属性是使用提供的默认属性:

  1. def fromEdges[VD: ClassTag, ED: ClassTag](
  2. edges: RDD[Edge[ED]],
  3. defaultValue: VD,
  4. edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
  5. vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED] = {
  6. GraphImpl(edges, defaultValue, edgeStorageLevel, vertexStorageLevel)
  7. }

根据边的两个顶点数据构建 (**Graph.fromEdgeTuples**)
**
对于顶点的属性是使用提供的默认属性,对于边的属性是相同边的数量:

  1. def fromEdgeTuples[VD: ClassTag](
  2. rawEdges: RDD[(VertexId, VertexId)],
  3. defaultValue: VD,
  4. uniqueEdges: Option[PartitionStrategy] = None,
  5. edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
  6. vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, Int] =
  7. {
  8. val edges = rawEdges.map(p => Edge(p._1, p._2, 1))
  9. val graph = GraphImpl(edges, defaultValue, edgeStorageLevel, vertexStorageLevel)
  10. uniqueEdges match {
  11. case Some(p) => graph.partitionBy(p).groupEdges((a, b) => a + b)
  12. case None => graph
  13. }
  14. }

从上面的代码我们知道,不管是根据边构建图还是根据边的两个顶点数据构建,最终都是使用 GraphImpl 来构建的,即调用了 GraphImpl 的 apply 方法。

从具有属性的顶点和边的 RDD 构建(Graph()
**

  1. def apply[VD: ClassTag, ED: ClassTag](
  2. vertices: RDD[(VertexId, VD)],
  3. edges: RDD[Edge[ED]],
  4. defaultVertexAttr: VD = null.asInstanceOf[VD],
  5. edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
  6. vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]

5.2.3.2 构建图的过程

(注意这里讲的是底层构图原理)构建图的过程很简单,分为三步,它们分别是构建边 EdgeRDD构建顶点 VertexRDD生成 Graph 对象。下面分别介绍这三个步骤:

Step1:构建边 EdgeRDD
**
从源代码看 构建边 EdgeRDD 也分为三步,下图的例子详细说明了这些步骤。

5.2 Spark GraphX 解析 - 图4

(1)从文件中加载信息,转换成 tuple 的形式,即 (srcId, dstId)

  1. def fromEdgeTuples[VD: ClassTag](
  2. rawEdges: RDD[(VertexId, VertexId)],
  3. defaultValue: VD,
  4. uniqueEdges: Option[PartitionStrategy] = None,
  5. edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
  6. vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, Int] =
  7. {
  8. val edges = rawEdges.map(p => Edge(p._1, p._2, 1))
  9. val graph = GraphImpl(edges, defaultValue, edgeStorageLevel, vertexStorageLevel)
  10. uniqueEdges match {
  11. case Some(p) => graph.partitionBy(p).groupEdges((a, b) => a + b)
  12. case None => graph
  13. }
  14. }

(2)入口,调用 Graph.fromEdgeTuples(rawEdgesRdd)

源数据为分割的两个点 ID,把源数据映射成 Edge(srcId, dstId, attr) 对象, attr 默认为 1。这样元数据就构建成了 RDD[Edge[ED]],如下面的代码:

  1. val edges = rawEdges.map(p => Edge(p._1, p._2, 1))

(3)将 RDD[Edge[ED]] 进一步转化成 EdgeRDDImpl[ED, VD]

第二步构建完 RDD[Edge[ED]] 之后,GraphX 通过调用 GraphImplapply 方法来构建 Graph。如下所示:

  1. val graph = GraphImpl(edges, defaultValue, edgeStorageLevel, vertexStorageLevel)
  2. def apply[VD: ClassTag, ED: ClassTag](
  3. edges: RDD[Edge[ED]],
  4. defaultVertexAttr: VD,
  5. edgeStorageLevel: StorageLevel,
  6. vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = {
  7. fromEdgeRDD(EdgeRDD.fromEdges(edges), defaultVertexAttr, edgeStorageLevel, vertexStorageLevel)
  8. }

在 apply 调用 fromEdgeRDD 之前,代码会调用 **EdgeRDD.fromEdges(edges)**RDD[Edge[ED]] 转化成 EdgeRDDImpl[ED, VD]。如下所示:

  1. def fromEdges[ED: ClassTag, VD: ClassTag](edges: RDD[Edge[ED]]): EdgeRDDImpl[ED, VD] = {
  2. val edgePartitions = edges.mapPartitionsWithIndex { (pid, iter) =>
  3. val builder = new EdgePartitionBuilder[ED, VD]
  4. iter.foreach { e =>
  5. builder.add(e.srcId, e.dstId, e.attr)
  6. }
  7. Iterator((pid, builder.toEdgePartition))
  8. }
  9. EdgeRDD.fromEdgePartitions(edgePartitions)
  10. }

程序遍历 RDD[Edge[ED]] 的每个分区,并调用 builder.toEdgePartition 对分区内的边作相应的处理。如下所示:

  1. def toEdgePartition: EdgePartition[ED, VD] = {
  2. val edgeArray = edges.trim().array
  3. new Sorter(Edge.edgeArraySortDataFormat[ED])
  4. .sort(edgeArray, 0, edgeArray.length, Edge.lexicographicOrdering)
  5. val localSrcIds = new Array[Int](edgeArray.size)
  6. val localDstIds = new Array[Int](edgeArray.size)
  7. val data = new Array[ED](edgeArray.size)
  8. val index = new GraphXPrimitiveKeyOpenHashMap[VertexId, Int]
  9. val global2local = new GraphXPrimitiveKeyOpenHashMap[VertexId, Int]
  10. val local2global = new PrimitiveVector[VertexId]
  11. var vertexAttrs = Array.empty[VD]
  12. //采用列式存储的方式,节省了空间
  13. if (edgeArray.length > 0) {
  14. index.update(edgeArray(0).srcId, 0)
  15. var currSrcId: VertexId = edgeArray(0).srcId
  16. var currLocalId = -1
  17. var i = 0
  18. while (i < edgeArray.size) {
  19. val srcId = edgeArray(i).srcId
  20. val dstId = edgeArray(i).dstId
  21. localSrcIds(i) = global2local.changeValue(srcId,
  22. { currLocalId += 1; local2global += srcId; currLocalId }, identity)
  23. localDstIds(i) = global2local.changeValue(dstId,
  24. { currLocalId += 1; local2global += dstId; currLocalId }, identity)
  25. data(i) = edgeArray(i).attr
  26. //相同顶点srcId中第一个出现的srcId与其下标
  27. if (srcId != currSrcId) {
  28. currSrcId = srcId
  29. index.update(currSrcId, i)
  30. }
  31. i += 1
  32. }
  33. vertexAttrs = new Array[VD](currLocalId + 1)
  34. }
  35. new EdgePartition(
  36. localSrcIds, localDstIds, data, index, global2local, local2global.trim().array, vertexAttrs,
  37. None)
  38. }

说明:

  • toEdgePartition 的第一步就是对边进行排序
    • 按照 srcId 从小到大排序。排序是为了遍历时顺序访问,加快访问速度。采用数组而不是 Map,是因为数组是连续的内存单元,具有原子性,避免了 Map 的 hash 问题,访问速度快。
  • toEdgePartition 的第二步就是填充相关属性。包括 localSrcIds, localDstIds, data, index, global2local, local2global, vertexAttrs。
    • 数组 localSrcIds, localDstIds 中保存的是通过 global2local.changeValue(srcId/dstId) 转换而成的分区本地索引。可以通过 localSrcIds、localDstIds 数组中保存的索引位从 local2global 中查到具体的 VertexId
    • global2local 是一个简单的,key 值非负的快速 hash map:GraphXPrimitiveKeyOpenHashMap,保存 vertextId 和本地索引的映射关系。global2local 中包含当前 partition 所有 srcId、dstId 与本地索引的映射关系。
  • data 就是当前分区的 attr 属性数组
    • 我们知道相同的 srcId 可能对应不同的 dstId。按照 srcId 排序之后,相同的 srcId 会出现多行,如上图中的 index desc 部分。index 中记录的是相同 srcId 中第一个出现的 srcId 与其下标。
  • local2global 记录的是所有的 VertexId 信息的数组。形如:srcId, dstId, srcId, dstId, srcId, dstId, srcId, dstId。其中会包含相同的 srcId。即:当前分区所有 vertextId 的顺序实际值。

我们可以通过根据本地下标取 VertexId,也可以根据 VertexId 取本地下标,取相应的属性。如下所示:

  1. // 根据本地下标取 VertexId
  2. localSrcIds/localDstIds -> index -> local2global -> VertexId
  3. // 根据 VertexId 取本地下标,取属性
  4. VertexId -> global2local -> index -> data -> attr object

Step2:构建顶点 VertexRDD
**
紧接着上面构建边 RDD 的代码,我们看看方法 fromEdgeRDD 的实现:

  1. private def fromEdgeRDD[VD: ClassTag, ED: ClassTag](
  2. edges: EdgeRDDImpl[ED, VD],
  3. defaultVertexAttr: VD,
  4. edgeStorageLevel: StorageLevel,
  5. vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = {
  6. val edgesCached = edges.withTargetStorageLevel(edgeStorageLevel).cache()
  7. val vertices = VertexRDD.fromEdges(edgesCached, edgesCached.partitions.size, defaultVertexAttr)
  8. .withTargetStorageLevel(vertexStorageLevel)
  9. fromExistingRDDs(vertices, edgesCached)
  10. }

从上面的代码我们可以知道,GraphX 使用 VertexRDD.fromEdges 构建顶点 VertexRDD,当然我们把边 RDD 作为参数传入。如下所示:

  1. def fromEdges[VD: ClassTag](edges: EdgeRDD[_], numPartitions: Int, defaultVal: VD): VertexRDD[VD] = {
  2. // 1 创建路由表
  3. val routingTables = createRoutingTables(edges, new HashPartitioner(numPartitions))
  4. // 2 根据路由表生成分区对象vertexPartitions
  5. val vertexPartitions = routingTables.mapPartitions({ routingTableIter =>
  6. val routingTable =
  7. if (routingTableIter.hasNext) routingTableIter.next() else RoutingTablePartition.empty
  8. Iterator(ShippableVertexPartition(Iterator.empty, routingTable, defaultVal))
  9. }, preservesPartitioning = true)
  10. // 3 创建VertexRDDImpl对象
  11. new VertexRDDImpl(vertexPartitions)
  12. }

构建顶点 VertexRDD 的过程分为三步,如上代码中的注释。它的构建过程如下图所示:

5.2 Spark GraphX 解析 - 图5

Step3:创建路由表

为了能通过点找到边,每个点需要保存点到边的信息,这些信息保存在
RoutingTablePartition **中。createRoutingTables 方法如下所示:

  1. private[graphx] def createRoutingTables(edges: EdgeRDD[_], vertexPartitioner: Partitioner): RDD[RoutingTablePartition] = {
  2. // 将edge partition中的数据转换成RoutingTableMessage类型,
  3. val vid2pid = edges.partitionsRDD.mapPartitions(_.flatMap(
  4. Function.tupled(RoutingTablePartition.edgePartitionToMsgs)))
  5. }

上述程序首先将边分区中的数据转换成 RoutingTableMessage 类型,即 tuple(VertexId,Int) 类型。

  1. def edgePartitionToMsgs(pid: PartitionID, edgePartition: EdgePartition[_, _]): Iterator[RoutingTableMessage] = {
  2. val map = new GraphXPrimitiveKeyOpenHashMap[VertexId, Byte]
  3. edgePartition.iterator.foreach { e =>
  4. map.changeValue(e.srcId, 0x1, (b: Byte) => (b | 0x1).toByte)
  5. map.changeValue(e.dstId, 0x2, (b: Byte) => (b | 0x2).toByte)
  6. }
  7. map.iterator.map { vidAndPosition =>
  8. val vid = vidAndPosition._1
  9. val position = vidAndPosition._2
  10. toMessage(vid, pid, position)
  11. }
  12. }
  13. //`30-0`比特位表示边分区`ID`,`32-31`比特位表示标志位
  14. private def toMessage(vid: VertexId, pid: PartitionID, position: Byte): RoutingTableMessage = {
  15. val positionUpper2 = position << 30
  16. val pidLower30 = pid & 0x3FFFFFFF
  17. (vid, positionUpper2 | pidLower30)
  18. }

根据代码,我们可以知道程序使用 int 的 32-31 比特位表示标志位,即 01: isSrcId ,10: isDstId。30-0 比特位表示边分区 ID。这样做可以节省内存。RoutingTableMessage 表达的信息是:顶点 id 和它相关联的边的分区 id 是放在一起的, 所以任何时候,我们都可以通过 RoutingTableMessage 找到顶点关联的边

Step4:根据路由表生成分区对象

  1. private[graphx] def createRoutingTables(edges: EdgeRDD[_], vertexPartitioner: Partitioner): RDD[RoutingTablePartition] = {
  2. // 将edge partition中的数据转换成RoutingTableMessage类型,
  3. val numEdgePartitions = edges.partitions.size
  4. vid2pid.partitionBy(vertexPartitioner).mapPartitions(
  5. iter => Iterator(RoutingTablePartition.fromMsgs(numEdgePartitions, iter)),
  6. preservesPartitioning = true)
  7. }

我们将第 1 步生成的 vid2pid 按照 HashPartitioner 重新分区。我们看看 RoutingTablePartition.fromMsgs 方法:

  1. def fromMsgs(numEdgePartitions: Int, iter: Iterator[RoutingTableMessage]): RoutingTablePartition = {
  2. val pid2vid = Array.fill(numEdgePartitions)(new PrimitiveVector[VertexId])
  3. val srcFlags = Array.fill(numEdgePartitions)(new PrimitiveVector[Boolean])
  4. val dstFlags = Array.fill(numEdgePartitions)(new PrimitiveVector[Boolean])
  5. for (msg <- iter) {
  6. val vid = vidFromMessage(msg)
  7. val pid = pidFromMessage(msg)
  8. val position = positionFromMessage(msg)
  9. pid2vid(pid) += vid
  10. srcFlags(pid) += (position & 0x1) != 0
  11. dstFlags(pid) += (position & 0x2) != 0
  12. }
  13. new RoutingTablePartition(pid2vid.zipWithIndex.map {
  14. case (vids, pid) => (vids.trim().array, toBitSet(srcFlags(pid)), toBitSet(dstFlags(pid)))
  15. })
  16. }

该方法从 RoutingTableMessage 获取数据,将 vid, 边 pid, isSrcId/isDstId 重新封装到 pid2vid,srcFlags,dstFlags 这三个数据结构中。它们表示当前顶点分区中的点在边分区的分布。想象一下,重新分区后,新分区中的点可能来自于不同的边分区,所以一个点要找到边,就需要先确定边的分区号 pid, 然后在确定的边分区中确定是 srcId 还是 dstId, 这样就找到了边。新分区中保存 vids.trim().array, toBitSet(srcFlags(pid)), toBitSet(dstFlags(pid)) 这样的记录。这里转换为 toBitSet 保存是为了节省空间。

根据上文生成的 routingTables,重新封装路由表里的数据结构为 ShippableVertexPartition。ShippableVertexPartition 会合并相同重复点的属性 attr 对象,补全缺失的 attr 对象。r如下所示:

  1. def apply[VD: ClassTag](iter: Iterator[(VertexId, VD)], routingTable: RoutingTablePartition, defaultVal: VD, mergeFunc: (VD, VD) => VD): ShippableVertexPartition[VD] = {
  2. val map = new GraphXPrimitiveKeyOpenHashMap[VertexId, VD]
  3. // 合并顶点
  4. iter.foreach { pair =>
  5. map.setMerge(pair._1, pair._2, mergeFunc)
  6. }
  7. // 不全缺失的属性值
  8. routingTable.iterator.foreach { vid =>
  9. map.changeValue(vid, defaultVal, identity)
  10. }
  11. new ShippableVertexPartition(map.keySet, map._values, map.keySet.getBitSet, routingTable)
  12. }
  13. //ShippableVertexPartition定义
  14. ShippableVertexPartition[VD: ClassTag](
  15. val index: VertexIdToIndexMap,
  16. val values: Array[VD],
  17. val mask: BitSet,
  18. val routingTable: RoutingTablePartition)

map 就是映射 vertexId->attr,index 就是顶点集合,values 就是顶点集对应的属性集,mask 指顶点集的 BitSet。

Step5:生成 Graph 对象

2223.webp

使用上述构建的 edgeRDD vertexRDD,使用 new GraphImpl(vertices, new ReplicatedVertexView(edges.asInstanceOf[EdgeRDDImpl[ED, VD]])) 就可以生成 Graph 对象。ReplicatedVertexView 是点和边的视图,用来管理运送 (shipping) 顶点属性到 EdgeRDD 的分区。当顶点属性改变时,我们需要运送它们到边分区来更新保存在边分区的顶点属性。

注意:在 ReplicatedVertexView 中不要保存一个对边的引用,因为在属性运送等级升级后,这个引用可能会发生改变。

  1. class ReplicatedVertexView[VD: ClassTag, ED: ClassTag](var edges: EdgeRDDImpl[ED, VD], var hasSrcId: Boolean = false, var hasDstId: Boolean = false)

5.2.4 计算模式

5.2.4.1 BSP 计算模式

目前基于图的并行计算框架已经有很多,比如来自 Google 的 Pregel、来自 Apache 开源的图计算框架 Giraph/HAMA 以及最为著名的 GraphLab,其中 PregelHAMA Giraph 都是非常类似的,都是基于 BSP(Bulk Synchronous Parallell)模式。 Bulk Synchronous Parallell,即整体同步并行。

在 BSP 中,一次计算过程由一系列全局超步组成,每一个超步由并发计算、通信和同步三个步骤组成。同步完成,标志着这个超步的完成及下一个超步的开始。 BSP 模式的准则是批量同步 (bulk synchrony),其独特之处在于超步 (superstep) 概念的引入。一个 BSP 程序同时具有水平和垂直两个方面的结构。从垂直上看, 一个 BSP 程序由一系列串行的超步 (superstep) 组成,如图所示:

5.2 Spark GraphX 解析 - 图7

从水平上看,在一个超步中,所有的进程并行执行局部计算。一个超步可分为三个阶段,如图所示:

5.2 Spark GraphX 解析 - 图8

三个阶段:

  • 本地计算阶段,每个处理器只对存储在本地内存中的数据进行本地计算。
  • 全局通信阶段,对任何非本地数据进行操作。
  • 栅栏同步阶段,等待所有通信行为的结束。

BSP 模型有如下几个特点:

  • 将计算划分为一个一个的超步 (superstep),有效避免死锁。
  • 将处理器和路由器分开,强调了计算任务和通信任务的分开,而路由器仅仅完成点到点的消息传递,不提供组合、复制和广播等功能,这样做既掩盖具体的互连网络拓扑,又简化了通信协议。
  • 采用障碍同步的方式、以硬件实现的全局同步是可控的粗粒度级,提供了执行紧耦合同步式并行算法的有效方式。

5.2.4.2 图操作一览

正如 RDDs 有基本的操作 map、filter 和 reduceByKey 一样,属性图也有基本的集合操作,这些操作采用用户自定义的函数并产生包含转换特征和结构的新图。定义在 Graph 中的核心操作是经过优化的实现。表示为核心操作的组合的便捷操作定义在 GraphOps 中。然而,因为有 Scala 的隐式转换,定义在 GraphOps 中的操作可以作为 Graph 的成员自动使用。例如,我们可以通过下面的方式计算每个顶点 (定义在 GraphOps 中) 的入度。如下所示:

  1. val graph: Graph[(String, String), String]
  2. // Use the implicit GraphOps.inDegrees operator
  3. val inDegrees: VertexRDD[Int] = graph.inDegrees

区分核心图操作和 GraphOps 的原因是为了在将来支持不同的图表示。每个图表示都必须提供核心操作的实现并重用很多定义在 GraphOps 中的有用操作。

5.2 Spark GraphX 解析 - 图9

5.2.4.3 基本信息操作

以下是定义在 Graph 和 GraphOps 中(为了简单起见,表现为图的成员)的功能的快速浏览。注意,某些函数签名已经简化(如默认参数和类型的限制已删除),一些更高级的功能已经被删除,所以请参阅 API 文档了解官方的操作列表。

5.2 Spark GraphX 解析 - 图10

图属性操作总结:

  1. import org.apache.spark._
  2. import org.apache.spark.graphx._
  3. import org.apache.spark.rdd.RDD
  4. val users: VertexRDD[(String, String)] = VertexRDD[(String, String)](sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")), (5L, ("franklin", "prof")), (2L, ("istoica", "prof")))))
  5. val relationships: RDD[Edge[String]] = sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"), Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
  6. val graph = Graph(users, relationships)
  7. /** 图属性操做总结 */
  8. class Graph[VD, ED] {
  9. // 图信息操做
  10. // 获取边的数量
  11. val numEdges: Long
  12. // 获取顶点的数量
  13. val numVertices: Long
  14. // 获取全部顶点的入度
  15. val inDegrees: VertexRDD[Int]
  16. // 获取全部顶点的出度
  17. val outDegrees: VertexRDD[Int]
  18. // 获取全部顶点入度与出度之和
  19. val degrees: VertexRDD[Int]
  20. // 获取全部顶点的集合
  21. val vertices: VertexRDD[VD]
  22. // 获取全部边的集合
  23. val edges: EdgeRDD[ED]
  24. // 获取全部triplets表示的集合
  25. val triplets: RDD[EdgeTriplet[VD, ED]]
  26. // 缓存操做
  27. def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
  28. def cache(): Graph[VD, ED]
  29. // 取消缓存
  30. def unpersist(blocking: Boolean = true): Graph[VD, ED]
  31. // 图重新分区
  32. def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
  33. // 顶点和边属性转换
  34. def mapVertices[VD2](map: (VertexID, VD) => VD2): Graph[VD2, ED]
  35. def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
  36. def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
  37. def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
  38. def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2]): Graph[VD, ED2]
  39. // 修改图结构
  40. // 反转图
  41. def reverse: Graph[VD, ED]
  42. // 获取子图
  43. def subgraph(epred: EdgeTriplet[VD, ED] => Boolean = (x => true), vpred: (VertexID, VD) => Boolean = ((v, d) => true)): Graph[VD, ED]
  44. def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
  45. def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
  46. // Join RDDs with the graph
  47. def joinVertices[U](table: RDD[(VertexID, U)])(mapFunc: (VertexID, VD, U) => VD): Graph[VD, ED]
  48. def outerJoinVertices[U, VD2](other: RDD[(VertexID, U)]) (mapFunc: (VertexID, VD, Option[U]) => VD2): Graph[VD2, ED]
  49. // Aggregate information about adjacent triplets
  50. def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexID]]
  51. def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexID, VD)]]
  52. def aggregateMessages[Msg: ClassTag](sendMsg: EdgeContext[VD, ED, Msg] => Unit, mergeMsg: (Msg, Msg) => Msg, tripletFields: TripletFields = TripletFields.All): VertexRDD[A]
  53. // Iterative graph-parallel computation
  54. def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(vprog: (VertexID, VD, A) => VD, sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)], mergeMsg: (A, A) => A): Graph[VD, ED]
  55. // Basic graph algorithms
  56. def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
  57. def connectedComponents(): Graph[VertexID, ED]
  58. def triangleCount(): Graph[Int, ED]
  59. def stronglyConnectedComponents(numIter: Int): Graph[VertexID, ED]
  60. }

5.2.4.4 转换操作

GraphX 中的转换操作主要有 mapVertices、mapEdges 和 mapTriplets 三个,它们在 Graph 文件中定义,在 GraphImpl 文件中实现。下面分别介绍这三个方法。

mapVertices

mapVertices 用来更新顶点属性**。从图的构建那章我们知道,顶点属性保存在边分区中,所以我们需要改变的是边分区中的属性。对当前图每一个顶点应用提供的 map 函数来修改顶点的属性,返回一个新的图。

  1. override def mapVertices[VD2: ClassTag]
  2. (f: (VertexId, VD) => VD2)(implicit eq: VD =:= VD2 = null): Graph[VD2, ED] = {
  3. if (eq != null) {
  4. vertices.cache()
  5. // 使用方法 f 处理 vertices
  6. val newVerts = vertices.mapVertexPartitions(_.map(f)).cache()
  7. // 获得两个不同 vertexRDD 的不同
  8. val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts)
  9. // 更新 ReplicatedVertexView
  10. val newReplicatedVertexView = replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2, ED]]
  11. .updateVertices(changedVerts)
  12. new GraphImpl(newVerts, newReplicatedVertexView)
  13. } else {
  14. GraphImpl(vertices.mapVertexPartitions(_.map(f)), replicatedVertexView.edges)
  15. }
  16. }

上面的代码中,当 VD 和 VD2 类型相同时,我们可以重用没有发生变化的点,否则需要重新创建所有的点。我们分析 VD 和 VD2 相同的情况,分四步处理。

(1)使用方法 f 处理 vertices,获得新的 VertexRDD。

(2)使用在 VertexRDD 中定义的 diff 方法求出新 VertexRDD 和源 VertexRDD 的不同。

  1. override def diff(other: VertexRDD[VD]): VertexRDD[VD] = {
  2. val otherPartition = other match {
  3. case other: VertexRDD[_] if this.partitioner == other.partitioner =>
  4. other.partitionsRDD
  5. case _ =>
  6. VertexRDD(other.partitionBy(this.partitioner.get)).partitionsRDD
  7. }
  8. val newPartitionsRDD = partitionsRDD.zipPartitions(
  9. otherPartition, preservesPartitioning = true
  10. ) { (thisIter, otherIter) =>
  11. val thisPart = thisIter.next()
  12. val otherPart = otherIter.next()
  13. Iterator(thisPart.diff(otherPart))
  14. }
  15. this.withPartitionsRDD(newPartitionsRDD)
  16. }

这个方法首先处理新生成的 VertexRDD 的分区,如果它的分区和源 VertexRDD 的分区一致,那么直接取出它的 partitionsRDD,否则重新分区后取出它的 partitionsRDD。 针对新旧两个 VertexRDD 的所有分区,调用 VertexPartitionBaseOps 中的 diff 方法求得分区的不同。

  1. def diff(other: Self[VD]): Self[VD] = {
  2. // 首先判断
  3. if (self.index != other.index) {
  4. diff(createUsingIndex(other.iterator))
  5. } else {
  6. val newMask = self.mask & other.mask
  7. var i = newMask.nextSetBit(0)
  8. while (i >= 0) {
  9. if (self.values(i) == other.values(i)) {
  10. newMask.unset(i)
  11. }
  12. i = newMask.nextSetBit(i + 1)
  13. }
  14. this.withValues(other.values).withMask(newMask)
  15. }
  16. }

该方法隐藏两个 VertexRDD 中相同的顶点信息,得到一个新的 VertexRDD。

(3)更新 ReplicatedVertexView。

  1. def updateVertices(updates: VertexRDD[VD]): ReplicatedVertexView[VD, ED] = {
  2. // 生成一个 VertexAttributeBlock
  3. val shippedVerts = updates.shipVertexAttributes(hasSrcId, hasDstId)
  4. .setName("ReplicatedVertexView.updateVertices - shippedVerts %s %s (broadcast)".format(
  5. hasSrcId, hasDstId))
  6. .partitionBy(edges.partitioner.get)
  7. // 生成新的边 RDD
  8. val newEdges = edges.withPartitionsRDD(edges.partitionsRDD.zipPartitions(shippedVerts) {
  9. (ePartIter, shippedVertsIter) => ePartIter.map {
  10. case (pid, edgePartition) =>
  11. (pid, edgePartition.updateVertices(shippedVertsIter.flatMap(_._2.iterator)))
  12. }
  13. })
  14. new ReplicatedVertexView(newEdges, hasSrcId, hasDstId)
  15. }

updateVertices 方法返回一个新的 ReplicatedVertexView,它更新了边分区中包含的顶点属性。我们看看它的实现过程。首先看 shipVertexAttributes 方法的调用。调用 shipVertexAttributes 方法会生成一个 VertexAttributeBlock,VertexAttributeBlock 包含当前分区的顶点属性,这些属性可以在特定的边分区使用。

  1. def shipVertexAttributes(
  2. shipSrc: Boolean, shipDst: Boolean): Iterator[(PartitionID, VertexAttributeBlock[VD])] = {
  3. Iterator.tabulate(routingTable.numEdgePartitions) { pid =>
  4. val initialSize = if (shipSrc && shipDst) routingTable.partitionSize(pid) else 64
  5. val vids = new PrimitiveVector[VertexId](initialSize)
  6. val attrs = new PrimitiveVector[VD](initialSize)
  7. var i = 0
  8. routingTable.foreachWithinEdgePartition(pid, shipSrc, shipDst) { vid =>
  9. if (isDefined(vid)) {
  10. vids += vid
  11. attrs += this(vid)
  12. }
  13. i += 1
  14. }
  15. // (边分区id, VertexAttributeBlock (顶点id, 属性))
  16. (pid, new VertexAttributeBlock(vids.trim().array, attrs.trim().array))
  17. }
  18. }

获得新的顶点属性之后,我们就可以调用 updateVertices 更新边中顶点的属性了,如下面代码所示:

  1. edgePartition.updateVertices(shippedVertsIter.flatMap(_._2.iterator))
  2. //更新EdgePartition的属性
  3. def updateVertices(iter: Iterator[(VertexId, VD)]): EdgePartition[ED, VD] = {
  4. val newVertexAttrs = new Array[VD](vertexAttrs.length)
  5. System.arraycopy(vertexAttrs, 0, newVertexAttrs, 0, vertexAttrs.length)
  6. while (iter.hasNext) {
  7. val kv = iter.next()
  8. //global2local获得顶点的本地index
  9. newVertexAttrs(global2local(kv._1)) = kv._2
  10. }
  11. new EdgePartition(
  12. localSrcIds, localDstIds, data, index, global2local, local2global, newVertexAttrs,
  13. activeSet)
  14. }

例子:将顶点的 2 个属性合并为 1 个属性(即将字符串合并)。

  1. scala> graph.vertices.collect.foreach(println _)
  2. (5,(franklin,prof))
  3. (2,(istoica,prof))
  4. (3,(rxin,student))
  5. (7,(jgonzal,postdoc))
  6. scala> graph.mapVertices{ case (vid, (attr1,attr2)) => attr1 + attr2 } // 或者 graph.mapVertices((VertexId, VD) => VD._1 + VD._2)
  7. res20: org.apache.spark.graphx.Graph[String,String] = org.apache.spark.graphx.impl.GraphImpl@4c819eab
  8. scala> res20.vertices.collect.foreach(println _)
  9. (5,franklinprof)
  10. (2,istoicaprof)
  11. (3,rxinstudent)
  12. (7,jgonzalpostdoc)

mapEdges

mapEdges 用来更新边属性**。对当前图每一条边应用提供的 map 函数来修改边的属性,返回一个新图。

  1. override def mapEdges[ED2: ClassTag](f: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2] = {
  2. val newEdges = replicatedVertexView.edges
  3. .mapEdgePartitions((pid, part) => part.map(f(pid, part.iterator)))
  4. new GraphImpl(vertices, replicatedVertexView.withEdges(newEdges))
  5. }

相比于 mapVertices,mapEdges 显然要简单得多,它只需要根据方法 f 生成新的 EdgeRDD,然后再初始化即可。

例子:将边的属性都加一个前缀。

  1. scala> graph.edges.collect.foreach(println _)
  2. Edge(3,7,collab)
  3. Edge(5,3,advisor)
  4. Edge(2,5,colleague)
  5. Edge(5,7,pi)
  6. scala> graph.mapEdges(edge => "name:" + edge.attr)
  7. res29: org.apache.spark.graphx.Graph[(String, String),String] = org.apache.spark.graphx.impl.GraphImpl@72da828b
  8. scala> res29.edges.collect.foreach(println _)
  9. Edge(3,7,name:collab)
  10. Edge(5,3,name:advisor)
  11. Edge(2,5,name:colleague)
  12. Edge(5,7,name:pi)

mapTriplets

mapTriplets 用来更新边属性。与 mapEdges 不同的地方仅仅在于可以使用的作为 map 条件的东西多了邻近的顶点的属性**,最终改变的东西仍然是 edge 的属性。

  1. override def mapTriplets[ED2: ClassTag](
  2. f: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2],
  3. tripletFields: TripletFields): Graph[VD, ED2] = {
  4. vertices.cache()
  5. replicatedVertexView.upgrade(vertices, tripletFields.useSrc, tripletFields.useDst)
  6. val newEdges = replicatedVertexView.edges.mapEdgePartitions { (pid, part) =>
  7. part.map(f(pid, part.tripletIterator(tripletFields.useSrc, tripletFields.useDst)))
  8. }
  9. new GraphImpl(vertices, replicatedVertexView.withEdges(newEdges))
  10. }

这段代码中,replicatedVertexView 调用 upgrade 方法修改当前的 ReplicatedVertexView,使调用者可以访问到指定级别的边信息(如仅仅可以读源顶点的属性)。

  1. def upgrade(vertices: VertexRDD[VD], includeSrc: Boolean, includeDst: Boolean) {
  2. // 判断传递级别
  3. val shipSrc = includeSrc && !hasSrcId
  4. val shipDst = includeDst && !hasDstId
  5. if (shipSrc || shipDst) {
  6. val shippedVerts: RDD[(Int, VertexAttributeBlock[VD])] =
  7. vertices.shipVertexAttributes(shipSrc, shipDst)
  8. .setName("ReplicatedVertexView.upgrade(%s, %s) - shippedVerts %s %s (broadcast)".format(
  9. includeSrc, includeDst, shipSrc, shipDst))
  10. .partitionBy(edges.partitioner.get)
  11. val newEdges = edges.withPartitionsRDD(edges.partitionsRDD.zipPartitions(shippedVerts) {
  12. (ePartIter, shippedVertsIter) => ePartIter.map {
  13. case (pid, edgePartition) =>
  14. (pid, edgePartition.updateVertices(shippedVertsIter.flatMap(_._2.iterator)))
  15. }
  16. })
  17. edges = newEdges
  18. hasSrcId = includeSrc
  19. hasDstId = includeDst
  20. }
  21. }

最后,用 f 处理边,生成新的 RDD,最后用新的数据初始化图。

例子:边属性添加前缀。

  1. scala> graph.edges.collect.foreach(println _)
  2. Edge(3,7,collab)
  3. Edge(5,3,advisor)
  4. Edge(2,5,colleague)
  5. Edge(5,7,pi)
  6. scala> graph.mapTriplets(tri => "name:" + tri.attr).triplets
  7. res37: org.apache.spark.rdd.RDD[org.apache.spark.graphx.EdgeTriplet[(String, String),String]] = MapPartitionsRDD[80] at mapPartitions at GraphImpl.scala:48
  8. scala> graph.mapTriplets(tri => "name:" + tri.attr).triplets.collect
  9. res39: Array[org.apache.spark.graphx.EdgeTriplet[(String, String),String]] = Array(((3,(rxin,student)),(7,(jgonzal,postdoc)),name:collab), ((5,(franklin,prof)),(3,(rxin,student)),name:advisor), ((2,(istoica,prof)),(5,(franklin,prof)),name:colleague), ((5,(franklin,prof)),(7,(jgonzal,postdoc)),name:pi))
  10. scala> graph.mapTriplets(tri => "name:" + tri.attr).triplets.collect.foreach(println _)
  11. ((3,(rxin,student)),(7,(jgonzal,postdoc)),name:collab)
  12. ((5,(franklin,prof)),(3,(rxin,student)),name:advisor)
  13. ((2,(istoica,prof)),(5,(franklin,prof)),name:colleague)
  14. ((5,(franklin,prof)),(7,(jgonzal,postdoc)),name:pi)

5.2.4.5 结构操作

当前的 GraphX 仅仅支持一组简单的常用结构性操作。下面是基本的结构性操作列表:

  1. class Graph[VD, ED] {
  2. def reverse: Graph[VD, ED]
  3. def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,
  4. vpred: (VertexId, VD) => Boolean): Graph[VD, ED]
  5. def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
  6. def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]
  7. }

下面分别介绍这四种函数的原理。

reverse

reverse 操作返回一个新的图,这个图的边的方向都是反转的**。例如,这个操作可以用来计算反转的 PageRank。因为反转操作没有修改顶点或者边的属性或者改变边的数量,所以我们可以在不移动或者复制数据的情况下有效地实现它。

  1. override def reverse: Graph[VD, ED] = {
  2. new GraphImpl(vertices.reverseRoutingTables(), replicatedVertexView.reverse())
  3. }
  4. def reverse(): ReplicatedVertexView[VD, ED] = {
  5. val newEdges = edges.mapEdgePartitions((pid, part) => part.reverse)
  6. new ReplicatedVertexView(newEdges, hasDstId, hasSrcId)
  7. }
  8. // EdgePartition 中的 reverse
  9. def reverse: EdgePartition[ED, VD] = {
  10. val builder = new ExistingEdgePartitionBuilder[ED, VD](
  11. global2local, local2global, vertexAttrs, activeSet, size)
  12. var i = 0
  13. while (i < size) {
  14. val localSrcId = localSrcIds(i)
  15. val localDstId = localDstIds(i)
  16. val srcId = local2global(localSrcId)
  17. val dstId = local2global(localDstId)
  18. val attr = data(i)
  19. // 将源顶点和目标顶点换位置
  20. builder.add(dstId, srcId, localDstId, localSrcId, attr)
  21. i += 1
  22. }
  23. builder.toEdgePartition
  24. }

subgraph

subgraph 操作利用顶点和边的判断式(predicates),返回的图仅仅包含满足顶点判断式的顶点、满足边判断式的边以及满足顶点判断式的 triple**。subgraph 操作可以用于很多场景,如获取 感兴趣的顶点和边组成的图或者获取清除断开连接后的图。

  1. override def subgraph(
  2. epred: EdgeTriplet[VD, ED] => Boolean = x => true,
  3. vpred: (VertexId, VD) => Boolean = (a, b) => true): Graph[VD, ED] = {
  4. vertices.cache()
  5. // 过滤 vertices,重用 partitione和索引
  6. val newVerts = vertices.mapVertexPartitions(_.filter(vpred))
  7. // 过滤 triplets
  8. replicatedVertexView.upgrade(vertices, true, true)
  9. val newEdges = replicatedVertexView.edges.filter(epred, vpred)
  10. new GraphImpl(newVerts, replicatedVertexView.withEdges(newEdges))
  11. }
  12. // 该代码显示,subgraph 方法的实现分两步:先过滤 VertexRDD,然后再过滤 EdgeRDD。如上,过滤 VertexRDD 比较简单,我们重点看过滤 EdgeRDD 的过程。
  13. def filter(
  14. epred: EdgeTriplet[VD, ED] => Boolean,
  15. vpred: (VertexId, VD) => Boolean): EdgeRDDImpl[ED, VD] = {
  16. mapEdgePartitions((pid, part) => part.filter(epred, vpred))
  17. }
  18. // EdgePartition 中的 filter 方法
  19. def filter(
  20. epred: EdgeTriplet[VD, ED] => Boolean,
  21. vpred: (VertexId, VD) => Boolean): EdgePartition[ED, VD] = {
  22. val builder = new ExistingEdgePartitionBuilder[ED, VD](global2local, local2global, vertexAttrs, activeSet)
  23. var i = 0
  24. while (i < size) {
  25. // The user sees the EdgeTriplet, so we can't reuse it and must create one per edge.
  26. val localSrcId = localSrcIds(i)
  27. val localDstId = localDstIds(i)
  28. val et = new EdgeTriplet[VD, ED]
  29. et.srcId = local2global(localSrcId)
  30. et.dstId = local2global(localDstId)
  31. et.srcAttr = vertexAttrs(localSrcId)
  32. et.dstAttr = vertexAttrs(localDstId)
  33. et.attr = data(i)
  34. if (vpred(et.srcId, et.srcAttr) && vpred(et.dstId, et.dstAttr) && epred(et)) {
  35. builder.add(et.srcId, et.dstId, localSrcId, localDstId, et.attr)
  36. }
  37. i += 1
  38. }
  39. builder.toEdgePartition
  40. }

因为用户可以看到 EdgeTriplet 的信息,所以我们不能重用 EdgeTriplet,需要重新创建一个,然后在用 epred 函数处理。

例子:

  1. scala> graph.subgraph(Triplet => Triplet.attr.startsWith("c"), (VertexId, VD) => VD._2.startsWith("pro"))
  2. res3: org.apache.spark.graphx.Graph[(String, String),String] = org.apache.spark.graphx.impl.GraphImpl@49db5438
  3. scala> res3.vertices.collect
  4. res4: Array[(org.apache.spark.graphx.VertexId, (String, String))] = Array((2,(istoica,prof)), (5,(franklin,prof)))
  5. scala> res3.edges.collect
  6. res5: Array[org.apache.spark.graphx.Edge[String]] = Array(Edge(2,5,colleague))

mask

mask 操作构造一个子图,类似于交集,这个子图包含输入图中包含的顶点和边**。它的实现很简单,顶点和边均做 inner join 操作即可。这个操作可以和 subgraph 操作相结合,基于另外一个相关图的特征去约束一个图。

  1. override def mask[VD2: ClassTag, ED2: ClassTag] (
  2. other: Graph[VD2, ED2]): Graph[VD, ED] = {
  3. val newVerts = vertices.innerJoin(other.vertices) { (vid, v, w) => v }
  4. val newEdges = replicatedVertexView.edges.innerJoin(other.edges) { (src, dst, v, w) => v }
  5. new GraphImpl(newVerts, replicatedVertexView.withEdges(newEdges))
  6. }

groupEdges

groupEdges 操作合并多重图中的并行边 (如顶点对之间重复的边),并传入一个函数来合并两个边的属性。在大量的应用程序中,并行的边可以合并(它们的权重合并)为一条边从而降低图的大小。

  1. override def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED] = {
  2. val newEdges = replicatedVertexView.edges.mapEdgePartitions(
  3. (pid, part) => part.groupEdges(merge))
  4. new GraphImpl(vertices, replicatedVertexView.withEdges(newEdges))
  5. }
  6. def groupEdges(merge: (ED, ED) => ED): EdgePartition[ED, VD] = {
  7. val builder = new ExistingEdgePartitionBuilder[ED, VD](
  8. global2local, local2global, vertexAttrs, activeSet)
  9. var currSrcId: VertexId = null.asInstanceOf[VertexId]
  10. var currDstId: VertexId = null.asInstanceOf[VertexId]
  11. var currLocalSrcId = -1
  12. var currLocalDstId = -1
  13. var currAttr: ED = null.asInstanceOf[ED]
  14. // 迭代处理所有的边
  15. var i = 0
  16. while (i < size) {
  17. // 如果源顶点和目的顶点都相同
  18. if (i > 0 && currSrcId == srcIds(i) && currDstId == dstIds(i)) {
  19. // 合并属性
  20. currAttr = merge(currAttr, data(i))
  21. } else {
  22. // This edge starts a new run of edges
  23. if (i > 0) {
  24. // 添加到 builder 中
  25. builder.add(currSrcId, currDstId, currLocalSrcId, currLocalDstId, currAttr)
  26. }
  27. // Then start accumulating for a new run
  28. currSrcId = srcIds(i)
  29. currDstId = dstIds(i)
  30. currLocalSrcId = localSrcIds(i)
  31. currLocalDstId = localDstIds(i)
  32. currAttr = data(i)
  33. }
  34. i += 1
  35. }
  36. if (size > 0) {
  37. builder.add(currSrcId, currDstId, currLocalSrcId, currLocalDstId, currAttr)
  38. }
  39. builder.toEdgePartition
  40. }

在图构建那章我们说明过,存储的边按照源顶点 id 排过序,所以上面的代码可以通过一次迭代完成对所有相同边的处理。

应用举例
**

  1. // Create an RDD for the vertices
  2. val users: RDD[(VertexId, (String, String))] =
  3. sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
  4. (5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
  5. (4L, ("peter", "student"))))
  6. // Create an RDD for edges
  7. val relationships: RDD[Edge[String]] =
  8. sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
  9. Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
  10. Edge(4L, 0L, "student"), Edge(5L, 0L, "colleague")))
  11. // Define a default user in case there are relationship with missing user
  12. val defaultUser = ("John Doe", "Missing")
  13. // Build the initial Graph
  14. val graph = Graph(users, relationships, defaultUser)
  15. // Notice that there is a user 0 (for which we have no information) connected to users
  16. // 4 (peter) and 5 (franklin).
  17. graph.triplets.map(
  18. triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
  19. ).collect.foreach(println(_))
  20. // Remove missing vertices as well as the edges to connected to them
  21. val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
  22. // The valid subgraph will disconnect users 4 and 5 by removing user 0
  23. validGraph.vertices.collect.foreach(println(_))
  24. validGraph.triplets.map(
  25. triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
  26. ).collect.foreach(println(_))
  27. // Run Connected Components
  28. val ccGraph = graph.connectedComponents() // No longer contains missing field
  29. // Remove missing vertices as well as the edges to connected to them
  30. val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
  31. // Restrict the answer to the valid subgraph
  32. val validCCGraph = ccGraph.mask(validGraph)

5.2.4.6 顶点关联操作

在许多情况下,有必要将外部数据加入到图中。例如,我们可能有额外的用户属性需要合并到已有的图中或者我们可能想从一个图中取出顶点特征加入到另外一个图中。这些任务可以用 join 操作完成。主要的 join 操作如下所示:

  1. class Graph[VD, ED] {
  2. def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD): Graph[VD, ED]
  3. def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2): Graph[VD2, ED]
  4. }

joinVertices 操作 join 输入 RDD 和顶点,返回一个新的带有顶点特征的图。这些特征是通过在连接顶点的结果上使用用户定义的 map 函数获得的。没有匹配的顶点保留其原始值。下面详细地来分析这两个函数。

joinVertices

joinVertices 来 join 相同 ID 的顶点数据**。

  1. def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED] = {
  2. val uf = (id: VertexId, data: VD, o: Option[U]) => {
  3. o match {
  4. case Some(u) => mapFunc(id, data, u)
  5. case None => data
  6. }
  7. }
  8. graph.outerJoinVertices(table)(uf)
  9. }

我们可以看到,joinVertices 的实现是通过 outerJoinVertices 来实现的。这是因为 join 本来就是 outer join 的一种特例。

例子:

  1. scala> graph.vertices.collect.foreach(println _)
  2. (5,(franklin,prof))
  3. (2,(istoica,prof))
  4. (3,(rxin,student))
  5. (7,(jgonzal,postdoc))
  6. scala> val join = sc.parallelize(Array((3L, "123")))
  7. join: org.apache.spark.rdd.RDD[(Long, String)] = ParallelCollectionRDD[137] at parallelize at <console>:31
  8. scala> graph.joinVertices(join)((VertexId, VD, U) => (VD._1, VD._2 + U))
  9. res33: org.apache.spark.graphx.Graph[(String, String),String] = org.apache.spark.graphx.impl.GraphImpl@4e5b8728
  10. scala> res33.vertices.collect.foreach(println _)
  11. (7,(jgonzal,postdoc))
  12. (2,(istoica,prof))
  13. (3,(rxin,student123))
  14. (5,(franklin,prof))

outerJoinVertices

跟 JOIN 类似,只不过 table 中没有的顶点默认值为 None**。

  1. override def outerJoinVertices[U: ClassTag, VD2: ClassTag]
  2. (other: RDD[(VertexId, U)])
  3. (updateF: (VertexId, VD, Option[U]) => VD2)
  4. (implicit eq: VD =:= VD2 = null): Graph[VD2, ED] = {
  5. if (eq != null) {
  6. vertices.cache()
  7. // updateF preserves type, so we can use incremental replication
  8. val newVerts = vertices.leftJoin(other)(updateF).cache()
  9. val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts)
  10. val newReplicatedVertexView = replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2, ED]]
  11. .updateVertices(changedVerts)
  12. new GraphImpl(newVerts, newReplicatedVertexView)
  13. } else {
  14. // updateF does not preserve type, so we must re-replicate all vertices
  15. val newVerts = vertices.leftJoin(other)(updateF)
  16. GraphImpl(newVerts, replicatedVertexView.edges)
  17. }
  18. }

通过以上的代码我们可以看到,如果 updateF 不改变类型,我们只需要创建改变的顶点即可,否则我们要重新创建所有的顶点。我们讨论不改变类型的情况。 这种情况分三步。

(1)修改顶点属性值。

  1. val newVerts = vertices.leftJoin(other)(updateF).cache()

这一步会用顶点 RDD join 传入的 RDD,然后用 updateF 作用 joinRDD 中的所有顶点,改变它们的值。

(2)找到发生改变的顶点。

  1. val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts)

(3)更新 newReplicatedVertexView 中边分区中的顶点属性。

  1. val newReplicatedVertexView = replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2, ED]].updateVertices(changedVerts)

例子:

  1. scala> graph.vertices.collect.foreach(println _)
  2. (5,(franklin,prof))
  3. (2,(istoica,prof))
  4. (3,(rxin,student))
  5. (7,(jgonzal,postdoc))
  6. scala> graph.outerJoinVertices(join)((VertexId, VD, U) => (VD._1, VD._2 + U))
  7. res35: org.apache.spark.graphx.Graph[(String, String),String] = org.apache.spark.graphx.impl.GraphImpl@7c542a14
  8. scala> res35.vertices.collect.foreach(println _)
  9. (7,(jgonzal,postdocNone))
  10. (2,(istoica,profNone))
  11. (3,(rxin,studentSome(123)))
  12. (5,(franklin,profNone))

5.2.4.7 聚合操作

GraphX 中提供的聚合操作有 aggregateMessages、collectNeighborIds 和 collectNeighbors 三个,其中 aggregateMessages 在 GraphImpl 中实现,collectNeighborIds 和 collectNeighbors 在 GraphOps 中实现。下面分别介绍这几个方法。

aggregateMessages

aggregateMessages 接口:aggregateMessages 是 GraphX 最重要的 API,用于替换 mapReduceTriplets。目前 mapReduceTriplets 最终也是通过 aggregateMessages 来实现的。它主要功能是向邻边发消息,合并邻边收到的消息,返回 messageRDD

aggregateMessages 的接口如下:

  1. def aggregateMessages[A: ClassTag](
  2. sendMsg: EdgeContext[VD, ED, A] => Unit,
  3. mergeMsg: (A, A) => A,
  4. tripletFields: TripletFields = TripletFields.All): VertexRDD[A] = {
  5. aggregateMessagesWithActiveSet(sendMsg, mergeMsg, tripletFields, None)
  6. }

该接口有三个参数,分别为发消息函数、合并消息函数以及发消息的方向。

sendMsg: 发消息函数

  1. private def sendMsg(ctx: EdgeContext[KCoreVertex, Int, Map[Int, Int]]): Unit = {
  2. ctx.sendToDst(Map(ctx.srcAttr.preKCore -> -1, ctx.srcAttr.curKCore -> 1))
  3. ctx.sendToSrc(Map(ctx.dstAttr.preKCore -> -1, ctx.dstAttr.curKCore -> 1))
  4. }

mergeMsg:合并消息函数

该函数用于在 Map 阶段每个 edge 分区中每个点收到的消息合并,并且它还用于 reduce 阶段,合并不同分区的消息。合并 vertexId 相同的消息。

tripletFields:定义发消息的方向

aggregateMessages 处理流程:aggregateMessages 方法分为 Map 和 Reduce 两个阶段,下面我们分别就这两个阶段说明。

Map 阶段
**
从入口函数进入 aggregateMessagesWithActiveSet 函数,该函数首先使用 VertexRDD[VD] 更新 replicatedVertexView, 只更新其中 vertexRDD 中 attr 对象。如构建图中介绍的,replicatedVertexView 是点和边的视图,点的属性有变化,要更新边中包含的点的 attr。

  1. replicatedVertexView.upgrade(vertices, tripletFields.useSrc, tripletFields.useDst)
  2. val view = activeSetOpt match {
  3. case Some((activeSet, _)) =>
  4. // 返回只包含活跃顶点的 replicatedVertexView
  5. replicatedVertexView.withActiveSet(activeSet)
  6. case None =>
  7. replicatedVertexView
  8. }

程序然后会对 replicatedVertexView 的 edgeRDD 做 mapPartitions 操作,所有的操作都在每个边分区的迭代中完成,如下面的代码:

  1. val preAgg = view.edges.partitionsRDD.mapPartitions(_.flatMap {
  2. case (pid, edgePartition) =>
  3. // 选择 scan 方法
  4. val activeFraction = edgePartition.numActives.getOrElse(0) / edgePartition.indexSize.toFloat
  5. activeDirectionOpt match {
  6. case Some(EdgeDirection.Both) =>
  7. if (activeFraction < 0.8) {
  8. edgePartition.aggregateMessagesIndexScan(sendMsg, mergeMsg, tripletFields,
  9. EdgeActiveness.Both)
  10. } else {
  11. edgePartition.aggregateMessagesEdgeScan(sendMsg, mergeMsg, tripletFields,
  12. EdgeActiveness.Both)
  13. }
  14. case Some(EdgeDirection.Either) =>
  15. edgePartition.aggregateMessagesEdgeScan(sendMsg, mergeMsg, tripletFields,
  16. EdgeActiveness.Either)
  17. case Some(EdgeDirection.Out) =>
  18. if (activeFraction < 0.8) {
  19. edgePartition.aggregateMessagesIndexScan(sendMsg, mergeMsg, tripletFields,
  20. EdgeActiveness.SrcOnly)
  21. } else {
  22. edgePartition.aggregateMessagesEdgeScan(sendMsg, mergeMsg, tripletFields,
  23. EdgeActiveness.SrcOnly)
  24. }
  25. case Some(EdgeDirection.In) =>
  26. edgePartition.aggregateMessagesEdgeScan(sendMsg, mergeMsg, tripletFields,
  27. EdgeActiveness.DstOnly)
  28. case _ => // None
  29. edgePartition.aggregateMessagesEdgeScan(sendMsg, mergeMsg, tripletFields,
  30. EdgeActiveness.Neither)
  31. }
  32. })

在分区内,根据 activeFraction 的大小选择是进入 aggregateMessagesEdgeScan 还是 aggregateMessagesIndexScan 处理。aggregateMessagesEdgeScan 会顺序地扫描所有的边, 而 aggregateMessagesIndexScan 会先过滤源顶点索引,然后再扫描。我们重点去分析 aggregateMessagesEdgeScan。

  1. def aggregateMessagesEdgeScan[A: ClassTag](
  2. sendMsg: EdgeContext[VD, ED, A] => Unit,
  3. mergeMsg: (A, A) => A,
  4. tripletFields: TripletFields,
  5. activeness: EdgeActiveness): Iterator[(VertexId, A)] = {
  6. var ctx = new AggregatingEdgeContext[VD, ED, A](mergeMsg, aggregates, bitset)
  7. var i = 0
  8. while (i < size) {
  9. val localSrcId = localSrcIds(i)
  10. val srcId = local2global(localSrcId)
  11. val localDstId = localDstIds(i)
  12. val dstId = local2global(localDstId)
  13. val srcAttr = if (tripletFields.useSrc) vertexAttrs(localSrcId) else null.asInstanceOf[VD]
  14. val dstAttr = if (tripletFields.useDst) vertexAttrs(localDstId) else null.asInstanceOf[VD]
  15. ctx.set(srcId, dstId, localSrcId, localDstId, srcAttr, dstAttr, data(i))
  16. sendMsg(ctx)
  17. i += 1
  18. }
  19. }

该方法由两步组成,分别是获得顶点相关信息,以及发送消息。

获取顶点相关信息:在前文介绍 edge partitio n 时,我们知道它包含 localSrcIds, localDstIds, data, index, global2local, local2global, vertexAttrs 这几个重要的数据结构。其中 localSrcIds, localDstIds 分别表示源顶点、目的顶点在当前分区中的索引。所以我们可以遍历 localSrcIds,根据其下标去 localSrcIds 中拿到 srcId 在全局 local2global 中的索引,最后拿到 srcId。通过 vertexAttrs 拿到顶点属性。通过 data 拿到边属性。

发送消息:发消息前会根据接口中定义的 tripletFields,拿到发消息的方向。发消息的过程就是遍历到一条边,向 localSrcIds/localDstIds 中添加数据,如果 localSrcIds/localDstIds 中已经存在该数据,则执行合并函数 mergeMsg。

  1. override def sendToSrc(msg: A) {
  2. send(_localSrcId, msg)
  3. }
  4. override def sendToDst(msg: A) {
  5. send(_localDstId, msg)
  6. }
  7. @inline private def send(localId: Int, msg: A) {
  8. if (bitset.get(localId)) {
  9. aggregates(localId) = mergeMsg(aggregates(localId), msg)
  10. } else {
  11. aggregates(localId) = msg
  12. bitset.set(localId)
  13. }
  14. }

每个点之间在发消息的时候是独立的,即:点单纯根据方向,向以相邻点的以 localId 为下标的数组中插数据,互相独立,可以并行运行。Map 阶段最后返回消息 RDD messages: RDD[(VertexId, VD2)]。

Map 阶段的执行流程如下例所示:

5.2 Spark GraphX 解析 - 图11

Reduce 阶段
**
Reduce 阶段的实现就是调用下面的代码:

  1. vertices.aggregateUsingIndex(preAgg, mergeMsg)
  2. override def aggregateUsingIndex[VD2: ClassTag](
  3. messages: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] = {
  4. val shuffled = messages.partitionBy(this.partitioner.get)
  5. val parts = partitionsRDD.zipPartitions(shuffled, true) { (thisIter, msgIter) =>
  6. thisIter.map(_.aggregateUsingIndex(msgIter, reduceFunc))
  7. }
  8. this.withPartitionsRDD[VD2](parts)
  9. }

面的代码通过两步实现:

  1. 对 messages 重新分区,分区器使用 VertexRDD 的 partitioner。然后使用 zipPartitions 合并两个分区。
  2. 对等合并 attr, 聚合函数使用传入的 mergeMs g 函数。

根据传参,我们知道上面的代码迭代的是 messagePartition,并不是每个节点都会收到消息,所以 messagePartition 集合最小,迭代速度会快。

  1. def aggregateUsingIndex[VD2: ClassTag](
  2. iter: Iterator[Product2[VertexId, VD2]],
  3. reduceFunc: (VD2, VD2) => VD2): Self[VD2] = {
  4. val newMask = new BitSet(self.capacity)
  5. val newValues = new Array[VD2](self.capacity)
  6. iter.foreach { product =>
  7. val vid = product._1
  8. val vdata = product._2
  9. val pos = self.index.getPos(vid)
  10. if (pos >= 0) {
  11. if (newMask.get(pos)) {
  12. newValues(pos) = reduceFunc(newValues(pos), vdata)
  13. } else { // otherwise just store the new value
  14. newMask.set(pos)
  15. newValues(pos) = vdata
  16. }
  17. }
  18. }
  19. this.withValues(newValues).withMask(newMask)
  20. }

这段代码表示,我们根据 vetexId 从 index 中取到其下标 pos,再根据下标,从 values 中取到 attr,存在 attr 就用 mergeMsg 合并 attr,不存在就直接赋值。

Reduce 阶段的过程如下图所示:

5.2 Spark GraphX 解析 - 图12

举例:下面的例子计算比用户年龄大的追随者(即 followers)的平均年龄。

  1. // Import random graph generation library
  2. import org.apache.spark.graphx.util.GraphGenerators
  3. // Create a graph with "age" as the vertex property. Here we use a random graph for simplicity.
  4. val graph: Graph[Double, Int] =
  5. GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices( (id, _) => id.toDouble )
  6. // Compute the number of older followers and their total age
  7. val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](
  8. triplet => { // Map Function
  9. if (triplet.srcAttr > triplet.dstAttr) {
  10. // Send message to destination vertex containing counter and age
  11. triplet.sendToDst(1, triplet.srcAttr)
  12. }
  13. },
  14. // Add counter and age
  15. (a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
  16. )
  17. // Divide total age by number of older followers to get average age of older followers
  18. val avgAgeOfOlderFollowers: VertexRDD[Double] =
  19. olderFollowers.mapValues( (id, value) => value match { case (count, totalAge) => totalAge / count } )
  20. // Display the results
  21. avgAgeOfOlderFollowers.collect.foreach(println(_))

collectNeighbors

该方法的作用是收集每个顶点的邻居顶点的顶点 id 和顶点属性。需要指定方向**。

  1. def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]] = {
  2. val nbrs = edgeDirection match {
  3. case EdgeDirection.Either =>
  4. graph.aggregateMessages[Array[(VertexId, VD)]](
  5. ctx => {
  6. ctx.sendToSrc(Array((ctx.dstId, ctx.dstAttr)))
  7. ctx.sendToDst(Array((ctx.srcId, ctx.srcAttr)))
  8. },
  9. (a, b) => a ++ b, TripletFields.All)
  10. case EdgeDirection.In =>
  11. graph.aggregateMessages[Array[(VertexId, VD)]](
  12. ctx => ctx.sendToDst(Array((ctx.srcId, ctx.srcAttr))),
  13. (a, b) => a ++ b, TripletFields.Src)
  14. case EdgeDirection.Out =>
  15. graph.aggregateMessages[Array[(VertexId, VD)]](
  16. ctx => ctx.sendToSrc(Array((ctx.dstId, ctx.dstAttr))),
  17. (a, b) => a ++ b, TripletFields.Dst)
  18. case EdgeDirection.Both =>
  19. throw new SparkException("collectEdges does not support EdgeDirection.Both. Use" +
  20. "EdgeDirection.Either instead.")
  21. }
  22. graph.vertices.leftJoin(nbrs) { (vid, vdata, nbrsOpt) =>
  23. nbrsOpt.getOrElse(Array.empty[(VertexId, VD)])
  24. }
  25. }

从上面的代码中,第一步是根据 EdgeDirection 来确定调用哪个 aggregateMessages 实现聚合操作。我们用满足条件 EdgeDirection.Either 的情况来说明。可以看到 aggregateMessages 的方式消息的函数为:

  1. ctx => {
  2. ctx.sendToSrc(Array((ctx.dstId, ctx.dstAttr)))
  3. ctx.sendToDst(Array((ctx.srcId, ctx.srcAttr)))
  4. }

这个函数在处理每条边时都会同时向源顶点和目的顶点发送消息,消息内容分别为(目的顶点 id,目的顶点属性)、(源顶点 id,源顶点属性)。为什么会这样处理呢? 我们知道,每条边都由两个顶点组成,对于这个边,我需要向源顶点发送目的顶点的信息来记录它们之间的邻居关系,同理向目的顶点发送源顶点的信息来记录它们之间的邻居关系。

Merge 函数是一个集合合并操作,它合并同同一个顶点对应的所有目的顶点的信息。如下所示:

  1. (a, b) => a ++ b

通过 aggregateMessages 获得包含邻居关系信息的 VertexRDD 后,把它和现有的 vertices 作 join 操作,得到每个顶点的邻居消息。

collectNeighborIds

该方法的作用是收集每个顶点的邻居顶点的顶点 id。它的实现和 collectNeighbors 非常相同。需要指定方向**。

  1. def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]] = {
  2. val nbrs =
  3. if (edgeDirection == EdgeDirection.Either) {
  4. graph.aggregateMessages[Array[VertexId]](
  5. ctx => { ctx.sendToSrc(Array(ctx.dstId)); ctx.sendToDst(Array(ctx.srcId)) },
  6. _ ++ _, TripletFields.None)
  7. } else if (edgeDirection == EdgeDirection.Out) {
  8. graph.aggregateMessages[Array[VertexId]](
  9. ctx => ctx.sendToSrc(Array(ctx.dstId)),
  10. _ ++ _, TripletFields.None)
  11. } else if (edgeDirection == EdgeDirection.In) {
  12. graph.aggregateMessages[Array[VertexId]](
  13. ctx => ctx.sendToDst(Array(ctx.srcId)),
  14. _ ++ _, TripletFields.None)
  15. } else {
  16. throw new SparkException("It doesn't make sense to collect neighbor ids without a " +
  17. "direction. (EdgeDirection.Both is not supported; use EdgeDirection.Either instead.)")
  18. }
  19. graph.vertices.leftZipJoin(nbrs) { (vid, vdata, nbrsOpt) =>
  20. nbrsOpt.getOrElse(Array.empty[VertexId])
  21. }
  22. }

和 collectNeighbors 的实现不同的是,aggregateMessages 函数中的 sendMsg 函数只发送顶点 Id 到源顶点和目的顶点。其它的实现基本一致。

  1. ctx => { ctx.sendToSrc(Array(ctx.dstId)); ctx.sendToDst(Array(ctx.srcId)) }

5.2.4.8 缓存操作

在 Spark 中,RDD 默认是不缓存的。为了避免重复计算,当需要多次利用它们时,我们必须显示地缓存它们。GraphX 中的图也有相同的方式。当利用到图多次时,确保首先访问 Graph.cache() 方法。

在迭代计算中,为了获得最佳的性能,不缓存可能是必须的。默认情况下,缓存的 RDD 和图会一直保留在内存中直到因为内存压力迫使它们以 LRU 的顺序删除。对于迭代计算,先前的迭代的中间结果将填充到缓存中。虽然它们最终会被删除,但是保存在内存中的不需要的数据将会减慢垃圾回收。只有中间结果不需要,不缓存它们是更高效的。然而,因为图是由多个 RDD 组成的,正确的不持久化它们是困难的。对于迭代计算,我们建议使用 Pregel API,它可以正确的不持久化中间结果

GraphX 中的缓存操作有 cache, persist, unpersistunpersistVertices。它们的接口分别是:

  1. def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
  2. def cache(): Graph[VD, ED]
  3. def unpersist(blocking: Boolean = true): Graph[VD, ED]
  4. def unpersistVertices(blocking: Boolean = true): Graph[VD, ED]

5.2.5 Pregel API


在Hadoop兴起之后,google 又发布了三篇研究论文,分别阐述了了 Caffeine、Pregel、Dremel 三种技术,这三种技术也被成为 google 的新“三驾马车”,
其中的 Pregel 是 google 提出的用于大规模分布式图计算框架。主要用于图遍历(BFS)、最短路径(SSSP)、PageRank计算等等计算

图本质上是一种递归的数据结构,可以使用 Spark GraphX 的 PregelAPI 接口对图数据进行批量计算

图本身是递归数据结构,顶点的属性依赖于它们邻居的属性,这些邻居的属性又依赖于自己邻居的属性。所以许多重要的图算法都是迭代的重新计算每个顶点的属性直到满足某个确定的条件一系列的图并发 (graph-parallel) 抽象已经被提出来用来表达这些迭代算法。GraphX 公开了一个类似 Pregel 的操作,它是广泛使用的 Pregel 和 GraphLab 抽象的一个融合。

GraphX 中实现的这个更高级的 Pregel 操作是一个约束到图拓扑的批量同步(bulk-synchronous)并行消息抽象。Pregel 操作者执行一系列的超步(super steps),在这些步骤中,顶点从之前的超步中接收进入 (inbound) 消息的总和,为顶点属性计算一个新的值,然后在以后的超步中发送消息到邻居顶点。不像 Pregel 而更像 GraphLab,消息通过边 triplet 的一个函数被并行计算,消息的计算既会访问源顶点特征也会访问目的顶点特征。在超步中,没有收到消息的顶点会被跳过。当没有消息遗留时,Pregel 操作停止迭代并返回最终的图。

注意:与标准的 Pregel 实现不同的是,GraphX 中的顶点仅仅能发送信息给邻居顶点,并且可以利用用户自定义的消息函数并行地构造消息。这些限制允许对 GraphX 进行额外的优化。

下面的代码是 pregel 的具体实现:

  1. def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
  2. (graph: Graph[VD, ED],
  3. initialMsg: A,
  4. maxIterations: Int = Int.MaxValue,
  5. activeDirection: EdgeDirection = EdgeDirection.Either)
  6. (vprog: (VertexId, VD, A) => VD,
  7. sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
  8. mergeMsg: (A, A) => A): Graph[VD, ED] =
  9. {
  10. var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()
  11. // 计算消息
  12. var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
  13. var activeMessages = messages.count()
  14. // 迭代
  15. var prevG: Graph[VD, ED] = null
  16. var i = 0
  17. while (activeMessages > 0 && i < maxIterations) {
  18. // 接收消息并更新顶点
  19. prevG = g
  20. g = g.joinVertices(messages)(vprog).cache()
  21. val oldMessages = messages
  22. // 发送新消息
  23. messages = g.mapReduceTriplets(
  24. sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()
  25. activeMessages = messages.count()
  26. i += 1
  27. }
  28. g
  29. }

5.2.5.1 pregel 计算模型

**
图本身是递归数据结构,顶点的属性依赖于它们邻居的属性,这些邻居的属性又依赖于自己邻居的属性。所以许多重要的图算法都是迭代的重新计算每个顶点的属性,直到满足某个确定的条件。

Pregel 计算模式中,输入是一个有向图,该有向图的每一个顶点都有一个相应的独一无二的顶点 id (vertex identifier)。每一个顶点都有一些属性,这些属性可以被修改,其初始值由用户定义。每一条有向边都和其源顶点关联,并且也拥有一些用户定义的属性和值,并同时还记录了其目的顶点的 ID。

一个典型的 Pregel 计算过程如下:

  1. 读取输入。
  2. 初始化该图。
  3. 当图被初始化好后,运行一系列的 supersteps,每一次 superstep 都在全局的角度上独立运行,直到整个计算结束。
  4. 输出结果。

image.png

在 pregel 中顶点有两种状态:活跃状态(active)和不活跃状态(halt)如果某一个顶点接收到了消息并且需要执行计算那么它就会将自己设置为活跃状态。如果没有接收到消息或者接收到消息,但是发现自己不需要进行计算,那么就会将自己设置为不活跃状态。

Pregel中的计算分为一个个“superstep”,这些”superstep”中执行流程如下:

  1. 首先输入图数据,并进行初始化。
  2. 将每个节点均设置为活跃状态。每个节点根据预先定义好的 sendMsg 函数,以及方向(边的正向、反向或者双向)向周围的节点发送信息。
  3. 每个节点接收信息如果发现需要计算则根据预先定义好的计算函数对接收到的信息进行处理,这个过程可能会更新自己的信息。如果接收到消息但是不需要计算则将自己状态设置为不活跃。
  4. 每个活跃节点按照 sendMsg 函数向周围节点发送消息。
  5. 下一个 sendMsg 开始,像步骤3一样继续计算,直到所有节点都变成不活跃状态,整个计算过程结束

Pregel 计算模型中有三个重要的函数,分别是 vertexProgram、sendMessage 和 messageCombiner:

  • vertexProgram:用户定义的顶点运行程序。它作用于每一个顶点,负责接收进来的信息,并计算新的顶点值。
  • sendMsg:发送消息。
  • mergeMsg:合并消息。

我们具体分析它的实现。根据代码可以知道,这个实现是一个迭代的过程。在开始迭代之前,先完成一些初始化操作:

  1. var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()
  2. // 计算消息
  3. var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
  4. var activeMessages = messages.count()

程序首先用 vprog 函数处理图中所有的顶点,生成新的图。然后用生成的图调用聚合操作(mapReduceTriplets,实际的实现是我们前面章节讲到的 aggregateMessagesWithActiveSet 函数)获取聚合后的消息。 activeMessages 指 messages 这个 VertexRDD 中的顶点数。

下面就开始迭代操作了。在迭代内部,分为二步。

(1)接收消息,并更新顶点。

  1. g = g.joinVertices(messages)(vprog).cache()
  2. // joinVertices 的定义
  3. def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD)
  4. : Graph[VD, ED] = {
  5. val uf = (id: VertexId, data: VD, o: Option[U]) => {
  6. o match {
  7. case Some(u) => mapFunc(id, data, u)
  8. case None => data
  9. }
  10. }
  11. graph.outerJoinVertices(table)(uf)
  12. }

这一步实际上是使用 outerJoinVertices 来更新顶点属性。outerJoinVertices 在关联操作中有详细介绍。

(2)发送新消息。

  1. messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()

注意:在上面的代码中,mapReduceTriplets 多了一个参数 Some((oldMessages, activeDirection))。这个参数的作用是:它使我们在发送新的消息时,会忽略掉那些两端都没有接收到消息的边,减少计算量。

image.png

从左到右依次为第 1、2、3、4 个节点,圈中的数字为节点的属性值,实线代表节点之间的边,虚线是不同超步之间的信息发送,带阴影的圈是不活跃的节点。我们的目的是让图中所有节点的属性值都变成最大的那个属性值。

Pregel 计算过程:

  1. superstep 0:首先所有节点设置为活跃,并且沿正向边向相邻节点发送自身的属性值。
  2. Superstep 1:所有节点接收到信息,节点 1 和节点 4 发现自己接受到的值比自己的大,所以更新自己的节点(这个过程可以看做是计算),并保持活跃。节点 2 和 3 没有接收到比自己大的值,所以不计算、不更新。活跃节点继续向相邻节点发送当前自己的属性值。
  3. Superstep 2:节点 3 接受信息并计算,其它节点没接收到信息或者接收到但是不计算,所以接下来只有节点 3 活跃并发送消息。
  4. Superstep 3:节点 2 和 4 接受到消息但是不计算所以不活跃,所有节点均不活跃,所以计算结束。

5.2.5.2 pregel 实现最短路径

**
最短路径的算法解析:

  1. Superstep 0:首先将所有除了源顶点的其它顶点的属性值设置为无穷大,源顶点的属性值设置为 0,然后对所有顶点用 initialmsg 进行初始化,实际上这次初始化并没有改变什么。
  2. Superstep 1:这一步执行完后图中满足条件的 attr 都成了 1 而且成为获跃节点,其它点的 attr 不变同时变成不活跃节点。活跃结点根据 triplet.srcAttr+triplet.attr< triplet.dstAttr 继续发消息,mergeMsg 函数会对发送到同一节点的多个消息进行聚合,聚合的结果就是最小的那个值。
  3. Superstep 2:所有收到消息的节点比较自己的 attr 和发过来的 attr,将较小的值作为自己的 attr。然后自己成为活节点继续向周围的节点发送 attr+1 这个消息,然后再聚合。直到没有节点的 attr 被更新,不再满足活跃节点数为大于 0 且没有达到最大允许迭代次数。这时就得到最短路径了,这个路径值保存在其它节点的 attr 中。

代码如下:

  1. import org.apache.spark.graphx._
  2. import org.apache.spark.graphx.util.GraphGenerators
  3. val graph: Graph[Long, Double] = GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
  4. val sourceId: VertexId = 42 // The ultimate source
  5. // 初始化图
  6. val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)
  7. val sssp = initialGraph.pregel(Double.PositiveInfinity)(
  8. (id, dist, newDist) => math.min(dist, newDist), // Vertex Program
  9. triplet => { // Send Message
  10. if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
  11. Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
  12. } else {
  13. Iterator.empty
  14. }
  15. },
  16. (a,b) => math.min(a,b) // Merge Message
  17. )
  18. println(sssp.vertices.collect.mkString("\n"))

上面的例子中,Vertex Program 函数定义如下:

  1. (id, dist, newDist) => math.min(dist, newDist)

这个函数的定义显而易见,当两个消息来的时候,取它们当中路径的最小值。同理 Merge Message 函数也是同样的含义。Send Message 函数中,会首先比较 triplet.srcAttr + triplet.attr 和 triplet.dstAttr,即比较加上边的属性后,这个值是否小于目的节点的属性,如果小于,则发送消息到目的顶点。

5.2.6 GraphX 实例

下图中有 6 个人,每个人有名字和年龄,这些人根据社会关系形成 8 条边,每条边有其属性。在以下例子演示中将构建顶点、边和图,打印图的属性、转换操作、结构操作、连接操作、聚合操作,并结合实际要求进行演示。

5.2 Spark GraphX 解析 - 图15

程序代码如下:

  1. import org.apache.log4j.{Level, Logger}
  2. import org.apache.spark.graphx.{Edge, _}
  3. import org.apache.spark.rdd.RDD
  4. import org.apache.spark.{SparkConf, SparkContext}
  5. object Practice extends App {
  6. // 屏蔽日志
  7. Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
  8. Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
  9. //设定一个 SparkConf
  10. val conf = new SparkConf().setAppName("SimpleGraphX").setMaster("local[4]")
  11. val sc = new SparkContext(conf)
  12. // 初始化顶点集合
  13. val vertexArray = Array(
  14. (1L, ("Alice", 28)),
  15. (2L, ("Bob", 27)),
  16. (3L, ("Charlie", 65)),
  17. (4L, ("David", 42)),
  18. (5L, ("Ed", 55)),
  19. (6L, ("Fran", 50))
  20. )
  21. // 创建顶点的 RDD 表示
  22. val vertexRDD: RDD[(Long, (String, Int))] = sc.parallelize(vertexArray)
  23. // 初始化边的集合
  24. val edgeArray = Array(
  25. Edge(2L, 1L, 7),
  26. Edge(2L, 4L, 2),
  27. Edge(3L, 2L, 4),
  28. Edge(3L, 6L, 3),
  29. Edge(4L, 1L, 1),
  30. Edge(2L, 5L, 2),
  31. Edge(5L, 3L, 8),
  32. Edge(5L, 6L, 3)
  33. )
  34. // 创建边的 RDD 表示
  35. val edgeRDD: RDD[Edge[Int]] = sc.parallelize(edgeArray)
  36. // 创建一个图
  37. val graph: Graph[(String, Int), Int] = Graph(vertexRDD, edgeRDD)
  38. //*************************** 图的属性 ***************************
  39. println("属性演示")
  40. println("**********************************************************")
  41. println("找出图中年龄大于30的顶点:")
  42. graph.vertices.filter { case (id, (name, age)) => age > 30 }.collect.foreach {
  43. case (id, (name, age)) => println(s"$name is $age")
  44. }
  45. println
  46. println("找出图中属性大于 5 的边:")
  47. graph.edges.filter(e => e.attr > 5).collect.foreach(e => println(s"${e.srcId} to ${e.dstId} att ${e.attr}"))
  48. println
  49. // triplets 操作,((srcId, srcAttr), (dstId, dstAttr), attr)
  50. println("列出边属性 >5 的 tripltes:")
  51. for (triplet <- graph.triplets.filter(t => t.attr > 5).collect) {
  52. println(s"${triplet.srcAttr._1} likes ${triplet.dstAttr._1}")
  53. }
  54. println
  55. // degrees 操作
  56. println("找出图中最大的出度、入度、度数:")
  57. def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
  58. if (a._2 > b._2) a else b
  59. }
  60. println("max of outDegrees:" + graph.outDegrees.reduce(max) + " max of inDegrees:" + graph.inDegrees.reduce(max) + " max of Degrees:" + graph.degrees.reduce(max))
  61. println
  62. //*************************** 转换操作 ****************************************
  63. println("转换操作")
  64. println("**********************************************************")
  65. println("顶点的转换操作,顶点age + 10:")
  66. graph.mapVertices { case (id, (name, age)) => (id, (name, age + 10)) }.vertices.collect.foreach(v => println(s"${v._2._1} is ${v._2._2}"))
  67. println
  68. println("边的转换操作,边的属性*2:")
  69. graph.mapEdges(e => e.attr * 2).edges.collect.foreach(e => println(s"${e.srcId} to ${e.dstId} att ${e.attr}"))
  70. println
  71. println("三元组的转换操作,边的属性为端点的age相加:")
  72. graph.mapTriplets(tri => tri.srcAttr._2 * tri.dstAttr._2).triplets.collect.foreach(e => println(s"${e.srcId} to ${e.dstId} att ${e.attr}"))
  73. println
  74. //*************************** 结构操作 ****************************************
  75. println("结构操作")
  76. println("**********************************************************")
  77. println("顶点年纪 >30 的子图:")
  78. val subGraph = graph.subgraph(vpred = (id, vd) => vd._2 >= 30)
  79. println("子图所有顶点:")
  80. subGraph.vertices.collect.foreach(v => println(s"${v._2._1} is ${v._2._2}"))
  81. println
  82. println("子图所有边:")
  83. subGraph.edges.collect.foreach(e => println(s"${e.srcId} to ${e.dstId} att ${e.attr}"))
  84. println
  85. println("反转整个图:")
  86. val reverseGraph = graph.reverse
  87. println("子图所有顶点:")
  88. reverseGraph.vertices.collect.foreach(v => println(s"${v._2._1} is ${v._2._2}"))
  89. println
  90. println("子图所有边:")
  91. reverseGraph.edges.collect.foreach(e => println(s"${e.srcId} to ${e.dstId} att ${e.attr}"))
  92. println
  93. //*************************** 连接操作 ****************************************
  94. println("连接操作")
  95. println("**********************************************************")
  96. val inDegrees: VertexRDD[Int] = graph.inDegrees
  97. case class User(name: String, age: Int, inDeg: Int, outDeg: Int)
  98. // 创建一个新图,顶类点VD的数据型为 User,并从 graph 做类型转换
  99. val initialUserGraph: Graph[User, Int] = graph.mapVertices { case (id, (name, age)) => User(name, age, 0, 0) }
  100. // initialUserGraph 与 inDegrees、outDegrees(RDD)进行连接,并修改 initialUserGraph 中 inDeg 值、outDeg 值
  101. val userGraph = initialUserGraph.outerJoinVertices(initialUserGraph.inDegrees) {
  102. case (id, u, inDegOpt) => User(u.name, u.age, inDegOpt.getOrElse(0), u.outDeg)
  103. }.outerJoinVertices(initialUserGraph.outDegrees) {
  104. case (id, u, outDegOpt) => User(u.name, u.age, u.inDeg, outDegOpt.getOrElse(0))
  105. }
  106. println("连接图的属性:")
  107. userGraph.vertices.collect.foreach(v => println(s"${v._2.name} inDeg: ${v._2.inDeg} outDeg: ${v._2.outDeg}"))
  108. println
  109. println("出度和入读相同的人员:")
  110. userGraph.vertices.filter {
  111. case (id, u) => u.inDeg == u.outDeg
  112. }.collect.foreach {
  113. case (id, property) => println(property.name)
  114. }
  115. println
  116. //*************************** 聚合操作 ***************************
  117. println("聚合操作")
  118. println("**********************************************************")
  119. println("collectNeighbors:获取当前节点source节点的id和属性")
  120. graph.collectNeighbors(EdgeDirection.In).collect.foreach(v => {
  121. println(s"id: ${v._1}");
  122. for (arr <- v._2) {
  123. println(s" ${arr._1} (name: ${arr._2._1} age: ${arr._2._2})")
  124. }
  125. })
  126. println("aggregateMessages版本:")
  127. graph.aggregateMessages[Array[(VertexId, (String, Int))]](ctx => ctx.sendToDst(Array((ctx.srcId.toLong, (ctx.srcAttr._1, ctx.srcAttr._2)))), _ ++ _).collect.foreach(v => {
  128. println(s"id: ${v._1}");
  129. for (arr <- v._2) {
  130. println(s" ${arr._1} (name: ${arr._2._1} age: ${arr._2._2})")
  131. }
  132. })
  133. println("聚合操作")
  134. println("**********************************************************")
  135. println("找出年纪最大的追求者:")
  136. val oldestFollower: VertexRDD[(String, Int)] = userGraph.aggregateMessages[(String, Int)](
  137. // 将源顶点的属性发送给目标顶点,map 过程
  138. ctx => ctx.sendToDst((ctx.srcAttr.name, ctx.srcAttr.age)),
  139. // 得到最大追求者,reduce 过程
  140. (a, b) => if (a._2 > b._2) a else b
  141. )
  142. userGraph.vertices.leftJoin(oldestFollower) { (id, user, optOldestFollower) =>
  143. optOldestFollower match {
  144. case None => s"${user.name} does not have any followers."
  145. case Some((name, age)) => s"${name} is the oldest follower of ${user.name}."
  146. }
  147. }.collect.foreach { case (id, str) => println(str) }
  148. println
  149. //*************************** 实用操作 ***************************
  150. println("聚合操作")
  151. println("**********************************************************")
  152. val sourceId: VertexId = 5L // 定义源点
  153. val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)
  154. initialGraph.triplets.collect().foreach(println)
  155. println("找出5到各顶点的最短距离:")
  156. val sssp = initialGraph.pregel(Double.PositiveInfinity, Int.MaxValue, EdgeDirection.Out)(
  157. (id, dist, newDist) => {
  158. println("||||" + id);
  159. math.min(dist, newDist)
  160. },
  161. triplet => { // 计算权重
  162. println(">>>>" + triplet.srcId)
  163. if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
  164. // 发送成功
  165. Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
  166. } else {
  167. // 发送不成功
  168. Iterator.empty
  169. }
  170. },
  171. (a, b) => math.min(a, b) // 当前节点所有输入的最短距离
  172. )
  173. sssp.triplets.collect().foreach(println)
  174. println(sssp.vertices.collect.mkString("\n"))
  175. sc.stop()
  176. }

运行结果如下:

  1. 属性演示
  2. **********************************************************
  3. 找出图中年龄大于30的顶点:
  4. David is 42
  5. Ed is 55
  6. Fran is 50
  7. Charlie is 65
  8. 找出图中属性大于 5 的边:
  9. 2 to 1 att 7
  10. 5 to 3 att 8
  11. 列出边属性 >5 tripltes
  12. Bob likes Alice
  13. Ed likes Charlie
  14. 找出图中最大的出度、入度、度数:
  15. max of outDegrees:(2,3) max of inDegrees:(6,2) max of Degrees:(2,4)
  16. 转换操作
  17. **********************************************************
  18. 顶点的转换操作,顶点age + 10
  19. 4 is (David,52)
  20. 1 is (Alice,38)
  21. 5 is (Ed,65)
  22. 6 is (Fran,60)
  23. 2 is (Bob,37)
  24. 3 is (Charlie,75)
  25. 边的转换操作,边的属性*2
  26. 2 to 1 att 14
  27. 2 to 4 att 4
  28. 3 to 2 att 8
  29. 3 to 6 att 6
  30. 2 to 5 att 4
  31. 4 to 1 att 2
  32. 5 to 3 att 16
  33. 5 to 6 att 6
  34. 三元组的转换操作,边的属性为端点的age相加:
  35. 2 to 1 att 756
  36. 2 to 4 att 1134
  37. 3 to 2 att 1755
  38. 3 to 6 att 3250
  39. 2 to 5 att 1485
  40. 4 to 1 att 1176
  41. 5 to 3 att 3575
  42. 5 to 6 att 2750
  43. 结构操作
  44. **********************************************************
  45. 顶点年纪 >30 的子图:
  46. 子图所有顶点:
  47. David is 42
  48. Ed is 55
  49. Fran is 50
  50. Charlie is 65
  51. 子图所有边:
  52. 3 to 6 att 3
  53. 5 to 3 att 8
  54. 5 to 6 att 3
  55. 反转整个图:
  56. 子图所有顶点:
  57. David is 42
  58. Alice is 28
  59. Ed is 55
  60. Fran is 50
  61. Bob is 27
  62. Charlie is 65
  63. 子图所有边:
  64. 1 to 2 att 7
  65. 4 to 2 att 2
  66. 2 to 3 att 4
  67. 6 to 3 att 3
  68. 1 to 4 att 1
  69. 5 to 2 att 2
  70. 3 to 5 att 8
  71. 6 to 5 att 3
  72. 连接操作
  73. **********************************************************
  74. 连接图的属性:
  75. David inDeg: 1 outDeg: 1
  76. Alice inDeg: 2 outDeg: 0
  77. Ed inDeg: 1 outDeg: 2
  78. Fran inDeg: 2 outDeg: 0
  79. Bob inDeg: 1 outDeg: 3
  80. Charlie inDeg: 1 outDeg: 2
  81. 出度和入读相同的人员:
  82. David
  83. 聚合操作
  84. **********************************************************
  85. collectNeighbors:获取当前节点source节点的id和属性
  86. id: 4
  87. 2 (name: Bob age: 27)
  88. id: 1
  89. 2 (name: Bob age: 27)
  90. 4 (name: David age: 42)
  91. id: 5
  92. 2 (name: Bob age: 27)
  93. id: 6
  94. 3 (name: Charlie age: 65)
  95. 5 (name: Ed age: 55)
  96. id: 2
  97. 3 (name: Charlie age: 65)
  98. id: 3
  99. 5 (name: Ed age: 55)
  100. aggregateMessages版本:
  101. id: 4
  102. 2 (name: Bob age: 27)
  103. id: 1
  104. 2 (name: Bob age: 27)
  105. 4 (name: David age: 42)
  106. id: 5
  107. 2 (name: Bob age: 27)
  108. id: 6
  109. 3 (name: Charlie age: 65)
  110. 5 (name: Ed age: 55)
  111. id: 2
  112. 3 (name: Charlie age: 65)
  113. id: 3
  114. 5 (name: Ed age: 55)
  115. 聚合操作
  116. **********************************************************
  117. 找出年纪最大的追求者:
  118. Bob is the oldest follower of David.
  119. David is the oldest follower of Alice.
  120. Bob is the oldest follower of Ed.
  121. Charlie is the oldest follower of Fran.
  122. Charlie is the oldest follower of Bob.
  123. Ed is the oldest follower of Charlie.
  124. 聚合操作
  125. **********************************************************
  126. ((2,Infinity),(1,Infinity),7)
  127. ((2,Infinity),(4,Infinity),2)
  128. ((3,Infinity),(2,Infinity),4)
  129. ((3,Infinity),(6,Infinity),3)
  130. ((2,Infinity),(5,0.0),2)
  131. ((4,Infinity),(1,Infinity),1)
  132. ((5,0.0),(3,Infinity),8)
  133. ((5,0.0),(6,Infinity),3)
  134. 找出5到各顶点的最短距离:
  135. ||||6
  136. ||||1
  137. ||||3
  138. ||||4
  139. ||||5
  140. ||||2
  141. >>>>5
  142. >>>>2
  143. >>>>3
  144. >>>>2
  145. >>>>4
  146. >>>>5
  147. >>>>3
  148. >>>>2
  149. ||||3
  150. ||||6
  151. >>>>3
  152. >>>>3
  153. ||||2
  154. >>>>2
  155. >>>>2
  156. >>>>2
  157. ||||1
  158. ||||4
  159. >>>>4
  160. ||||1
  161. ((2,12.0),(1,15.0),7)
  162. ((2,12.0),(4,14.0),2)
  163. ((3,8.0),(2,12.0),4)
  164. ((3,8.0),(6,3.0),3)
  165. ((2,12.0),(5,0.0),2)
  166. ((4,14.0),(1,15.0),1)
  167. ((5,0.0),(3,8.0),8)
  168. ((5,0.0),(6,3.0),3)
  169. (4,14.0)
  170. (1,15.0)
  171. (5,0.0)
  172. (6,3.0)
  173. (2,12.0)
  174. (3,8.0)