5.1.1 什么是 Spark GraphX
Spark GraphX 是一个分布式图处理框架,它是基于 Spark 平台提供对图计算和图挖掘简洁易用的而丰富的接口,极大的方便了对分布式图处理的需求。那么什么是图,都计算些什么?众所周知社交网络中人与人之间有很多关系链,例如 Twitter、Facebook、微博和微信等,数据中出现网状结构关系都需要图计算。
GraphX 是一个新的 Spark API,它用于图和分布式图 (graph-parallel) 的计算。GraphX 通过引入弹性分布式属性图(Resilient Distributed Property Graph): 顶点和边均有属性的有向多重图,来扩展 Spark RDD。为了支持图计算,GraphX 开发了一组基本的功能操作以及一个优化过的 Pregel API。另外,GraphX 也包含了一个快速增长的图算法和图 builders 的集合,用以简化图分析任务。
从社交网络到语言建模,不断增长的数据规模以及图形数据的重要性已经推动了许多新的分布式图系统的发展。通过限制计算类型以及引入新的技术来切分和分配图,这些系统可以高效地执行复杂的图形算法,比一般的分布式数据计算(data-parallel,如 spark、MapReduce)快很多。
分布式图(graph-parallel)计算和分布式数据(data-parallel)计算类似,分布式数据计算采用了一种 record-centric(以记录为中心)的集合视图,而分布式图计算采用了一种 vertex-centric(以顶点为中心)的图视图。分布式数据计算通过同时处理独立的数据来获得并发的目的,分布式图计算则是通过对图数据进行分区(即切分)来获得并发的目的。更准确的说,分布式图计算递归地定义特征的转换函数(这种转换函数作用于邻居特征),通过并发地执行这些转换函数来获得并发的目的。
分布式图计算比分布式数据计算更适合图的处理,但是在典型的图处理流水线中,它并不能很好地处理所有操作。例如,虽然分布式图系统可以很好的计算 PageRank 等算法,但是它们不适合从不同的数据源构建图或者跨过多个图计算特征。更准确的说,分布式图系统提供的更窄的计算视图无法处理那些构建和转换图结构以及跨越多个图的需求。分布式图系统中无法提供的这些操作需要数据在图本体之上移动并且需要一个图层面而不是单独的顶点或边层面的计算视图。例如,我们可能想限制我们的分析到几个子图上,然后比较结果。 这不仅需要改变图结构,还需要跨多个图计算。
我们如何处理数据取决于我们的目标,有时同一原始数据可能会处理成许多不同表和图的视图,并且图和表之间经常需要能够相互移动。如下图所示:
所以我们的图流水线必须通过组合 graph-parallel 和 data- parallel 来实现。但是这种组合必然会导致大量的数据移动以及数据复制,同时这样的系统也非常复杂。例如,在传统的图计算流水线中,在 Table View 视图下,可能需要 Spark 或者 Hadoop 的支持,在 Graph View 这种视图下,可能需要 Prege 或者 GraphLab 的支持。也就是把图和表分在不同的系统中分别处理。不同系统之间数据的移动和通信会成为很大的负担。
GraphX 项目将 graph-parallel 和 data-parallel 统一到一个系统中,并提供了一个唯一的组合 API。GraphX 允许用户把数据当做一个图和一个集合(RDD),而不需要数据移动或者复制。也就是说 GraphX 统一了 Graph View 和 Table View,可以非常轻松的做 pipeline 操作。
5.1.2 弹性分布式属性图
GraphX 的核心抽象是弹性分布式属性图,它是一个有向多重图,带有连接到每个顶点和边的用户定义的对象。有向多重图中多个并行的边共享相同的源和目的顶点。支持并行边的能力简化了建模场景,相同的顶点可能存在多种关系 (例如 co-worker 和 friend)。 每个顶点用一个唯一的 64 位长的标识符(VertexID)作为 key。GraphX 并没有对顶点标识强加任何排序。同样,边拥有相应的源和目的顶点标识符。
属性图扩展了 Spark RDD 的抽象,有 Table 和 Graph 两种视图,但是只需要一份物理存储。两种视图都有自己独有的操作符,从而使我们同时获得了操作的灵活性和执行的高效率。属性图以 vertex(VD) 和 edge(ED) 类型作为参数类型,这些类型分别是顶点和边相关联的对象的类型。
在某些情况下,在同样的图中,我们可能希望拥有不同属性类型的顶点。这可以通过继承完成。例如,将用户和产品建模成一个二分图,我们可以用如下方式:
class VertexProperty()
case class UserProperty(val name: String) extends VertexProperty
case class ProductProperty(val name: String, val price: Double) extends VertexProperty
// The graph might then have the type:
var graph: Graph[VertexProperty, String] = null
和 RDD 一样,属性图是不可变的、分布式的、容错的。图的值或者结构的改变需要生成一个新的图来实现。注意,原始图中不受影响的部分都可以在新图中重用,用来减少存储的成本。执行者使用一系列顶点分区方法来对图进行分区。如 RDD 一样,图的每个分区可以在发生故障的情况下被重新创建在不同的机器上。
逻辑上,属性图对应于一对类型化的集合 (RDD),这个集合包含每一个顶点和边的属性。因此,图的类中包含访问图中顶点和边的成员变量。如下所示:
class Graph[VD, ED] {
val vertices: VertexRDD[VD]
val edges: EdgeRDD[ED]
}
VertexRDD[VD] 和 EdgeRDD[ED] 类是 RDD[(VertexID, VD)] 和 RDD[Edge[ED]] 的继承和优化版本。VertexRDD[VD] 和 EdgeRDD[ED] 都提供了额外的图计算功能并提供内部优化功能。如下图所示:
源码如下:
abstract class VertexRDD[VD](sc: SparkContext, deps: Seq[Dependency[_]]) extends RDD[(VertexId, VD)](sc, deps)
abstract class EdgeRDD[ED](sc: SparkContext, deps: Seq[Dependency[_]]) extends RDD[Edge[ED]](sc, deps)
GraphX 的底层设计有以下几个关键点:
- 对 Graph 视图的所有操作,最终都会转换成其关联的 Table 视图的 RDD 操作来完成。这样对一个图的计算,最终在逻辑上,等价于一系列 RDD 的转换过程。因此,Graph 最终具备了 RDD 的 3 个关键特性:Immutable、Distributed 和 Fault-Tolerant,其中最关键的是 Immutable(不变性)。逻辑上,所有图的转换和操作都产生了一个新图;物理上,GraphX 会有一定程度的不变顶点和边的复用优化,对用户透明。
- 两种视图底层共用的物理数据,由 RDD[Vertex-Partition] 和 RDD[Edge-Partition] 这两个 RDD 组成。点和边实际都不是以表 Collection[tuple] 的形式存储的,而是由 VertexPartition/EdgePartition 在内部存储一个带索引结构的分片数据块,以加速不同视图下的遍历速度。不变的索引结构在 RDD 转换过程中是共用的,降低了计算和存储开销。
- 图的分布式存储采用点分割模式,而且使用 partitionBy 方法,由用户指定不同的划分策略(PartitionStrategy)。划分策略会将边分配到各个 EdgePartition,顶点分配到各个 VertexPartition,EdgePartition 也会缓存本地边关联点的 Ghost 副本。划分策略的不同会影响到所需要缓存的 Ghost 副本数量,以及每个 EdgePartition 分配的边的均衡程度,需要根据图的结构特征选取最佳策略。目前有 EdgePartition2d、EdgePartition1d、RandomVertexCut 和 CanonicalRandomVertexCut 这四种策略。
5.1.3 运行图计算程序
假设我们想构造一个包括不同合作者的属性图。顶点属性可能包含用户名和职业。我们可以用描述合作者之间关系的字符串标注边缘。
Step1:开始的第一步是引入 Spark 和 GraphX 到你的项目中。
如下面所示:
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
Step2:如果你没有用到 Spark shell,你还将需要 SparkContext。
所得的图形将具有类型签名:val userGraph: Graph[(String, String), String]
。
有很多方式从一个原始文件、RDD 构造一个属性图。最一般的方法是利用 Graph object。下面的代码从 RDD 集合生成属性图:
// 创建 SparkConf() 并设置 App 名称
val conf = new SparkConf().setMaster("local[3]").setAppName("WC")
// 创建 SparkContext,该对象是提交 spark App 的入口
val sc = new SparkContext(conf)
// Create an RDD for the vertices (顶点),这里的顶点属性是 一个二元组
val users: RDD[(VertexId, (String, String))] = sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")), (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
// Create an RDD for edges (边),这里的边属性是 String 类型
val relationships: RDD[Edge[String]] = sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"), Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
val defaultUser = ("John Doe", "Missing") // 缺省属性
// Build the initial Graph (图)
val graph = Graph(users, relationships, defaultUser)
Step3:在上面的例子中,我们用到了 Edge 样本类。
边有一个 srcId 和 dstId 分别对应于源和目标顶点的标示符。另外,Edge 类有一个 attr 成员用来存储边属性。
我们可以分别用 graph.vertices
和 graph.edges
成员将一个图解构为相应的顶点和边。代码如下:
// Count all users which are postdocs (过滤图上的所有顶点,统计顶点的属性的第二个值是 postdoc 的个数)
val verticesCount = graph.vertices.filter { case (id, (name, pos)) => pos == "postdoc" }.count
println(verticesCount)
// Count all the edges where src > dst (统计图上满足边的 源顶点ID > 目标顶点ID 的个数)
val edgeCount = graph.edges.filter(e => e.srcId > e.dstId).count
println(edgeCount)
graph.vertices
返回一个 VertexRDD[(String, String)],它继承于 RDD[(VertexID, (String, String))]。所以我们可以用 scala 的 case 表达式解构这个元组。另一方面,graph.edges
返回一个包含 Edge[String] 对象的 EdgeRDD。我们也可以用到 case 类的类型构造器,如下例所示:
graph.edges.filter { case Edge(src, dst, prop) => src > dst }.count
Step4:除了属性图的顶点和边视图,GraphX 也包含了一个三元组视图。
三元视图逻辑上将顶点和边的属性保存为一个 RDD[EdgeTriplet[VD, ED]],它包含 EdgeTriplet 类的实例。可以通过下面的 Sql 表达式表示这个连接:
SELECT
src.id ,
dst.id ,
src.attr ,
e.attr ,
dst.attr
FROM
edges AS e
LEFT JOIN vertices AS src ,
vertices AS dst ON e.srcId = src.Id
AND e.dstId = dst.Id
或者通过下面的图来表示:
Step5:**EdgeTriplet 类继承于 Edge 类,并且加入了 srcAttr 和 dstAttr** 成员,这两个成员分别包含源和目的的属性。
我们可以用一个三元组视图渲染字符串集合用来描述用户之间的关系:
val graph: Graph[(String, String), String] // Constructed from above
// Use the triplets view to create an RDD of facts.
val facts: RDD[String] = graph.triplets.map(triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1)
facts.collect.foreach(println(_))