分享

结构操作 · spark

 樊清波 2016-12-22

结构操作

当前的GraphX仅仅支持一组简单的常用结构性操作。下面是基本的结构性操作列表。

+

class Graph[VD, ED] {
  def reverse: Graph[VD, ED]
  def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,
               vpred: (VertexId, VD) => Boolean): Graph[VD, ED]
  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
  def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]
}

下面分别介绍这四种函数的原理。

+

1 reverse

reverse操作返回一个新的图,这个图的边的方向都是反转的。例如,这个操作可以用来计算反转的PageRank。因为反转操作没有修改顶点或者边的属性或者改变边的数量,所以我们可以 在不移动或者复制数据的情况下有效地实现它。

+

override def reverse: Graph[VD, ED] = {
    new GraphImpl(vertices.reverseRoutingTables(), replicatedVertexView.reverse())
}
def reverse(): ReplicatedVertexView[VD, ED] = {
    val newEdges = edges.mapEdgePartitions((pid, part) => part.reverse)
    new ReplicatedVertexView(newEdges, hasDstId, hasSrcId)
}
//EdgePartition中的reverse
def reverse: EdgePartition[ED, VD] = {
    val builder = new ExistingEdgePartitionBuilder[ED, VD](
      global2local, local2global, vertexAttrs, activeSet, size)
    var i = 0
    while (i < size) {
      val localSrcId = localSrcIds(i)
      val localDstId = localDstIds(i)
      val srcId = local2global(localSrcId)
      val dstId = local2global(localDstId)
      val attr = data(i)
      //将源顶点和目标顶点换位置
      builder.add(dstId, srcId, localDstId, localSrcId, attr)
      i += 1
    }
    builder.toEdgePartition
  }

2 subgraph

subgraph操作利用顶点和边的判断式(predicates),返回的图仅仅包含满足顶点判断式的顶点、满足边判断式的边以及满足顶点判断式的triplesubgraph操作可以用于很多场景,如获取 感兴趣的顶点和边组成的图或者获取清除断开连接后的图。

+

override def subgraph(
      epred: EdgeTriplet[VD, ED] => Boolean = x => true,
      vpred: (VertexId, VD) => Boolean = (a, b) => true): Graph[VD, ED] = {
    vertices.cache()
    // 过滤vertices, 重用partitioner和索引
    val newVerts = vertices.mapVertexPartitions(_.filter(vpred))
    // 过滤 triplets
    replicatedVertexView.upgrade(vertices, true, true)
    val newEdges = replicatedVertexView.edges.filter(epred, vpred)
    new GraphImpl(newVerts, replicatedVertexView.withEdges(newEdges))
  }

该代码显示,subgraph方法的实现分两步:先过滤VertexRDD,然后再过滤EdgeRDD。如上,过滤VertexRDD比较简单,我们重点看过滤EdgeRDD的过程。

+

def filter(
      epred: EdgeTriplet[VD, ED] => Boolean,
      vpred: (VertexId, VD) => Boolean): EdgeRDDImpl[ED, VD] = {
    mapEdgePartitions((pid, part) => part.filter(epred, vpred))
  }
//EdgePartition中的filter方法
def filter(
      epred: EdgeTriplet[VD, ED] => Boolean,
      vpred: (VertexId, VD) => Boolean): EdgePartition[ED, VD] = {
    val builder = new ExistingEdgePartitionBuilder[ED, VD](
      global2local, local2global, vertexAttrs, activeSet)
    var i = 0
    while (i < size) {
      // The user sees the EdgeTriplet, so we can't reuse it and must create one per edge.
      val localSrcId = localSrcIds(i)
      val localDstId = localDstIds(i)
      val et = new EdgeTriplet[VD, ED]
      et.srcId = local2global(localSrcId)
      et.dstId = local2global(localDstId)
      et.srcAttr = vertexAttrs(localSrcId)
      et.dstAttr = vertexAttrs(localDstId)
      et.attr = data(i)
      if (vpred(et.srcId, et.srcAttr) && vpred(et.dstId, et.dstAttr) && epred(et)) {
        builder.add(et.srcId, et.dstId, localSrcId, localDstId, et.attr)
      }
      i += 1
    }
    builder.toEdgePartition
  }

因为用户可以看到EdgeTriplet的信息,所以我们不能重用EdgeTriplet,需要重新创建一个,然后在用epred函数处理。这里localSrcIds,localDstIds,local2global等前文均有介绍,在此不再赘述。

+

3 mask

mask操作构造一个子图,这个子图包含输入图中包含的顶点和边。它的实现很简单,顶点和边均做inner join操作即可。这个操作可以和subgraph操作相结合,基于另外一个相关图的特征去约束一个图。

+

override def mask[VD2: ClassTag, ED2: ClassTag] (
      other: Graph[VD2, ED2]): Graph[VD, ED] = {
    val newVerts = vertices.innerJoin(other.vertices) { (vid, v, w) => v }
    val newEdges = replicatedVertexView.edges.innerJoin(other.edges) { (src, dst, v, w) => v }
    new GraphImpl(newVerts, replicatedVertexView.withEdges(newEdges))
  }

4 groupEdges

groupEdges操作合并多重图中的并行边(如顶点对之间重复的边)。在大量的应用程序中,并行的边可以合并(它们的权重合并)为一条边从而降低图的大小。

+

 override def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED] = {
    val newEdges = replicatedVertexView.edges.mapEdgePartitions(
      (pid, part) => part.groupEdges(merge))
    new GraphImpl(vertices, replicatedVertexView.withEdges(newEdges))
  }
 def groupEdges(merge: (ED, ED) => ED): EdgePartition[ED, VD] = {
     val builder = new ExistingEdgePartitionBuilder[ED, VD](
       global2local, local2global, vertexAttrs, activeSet)
     var currSrcId: VertexId = null.asInstanceOf[VertexId]
     var currDstId: VertexId = null.asInstanceOf[VertexId]
     var currLocalSrcId = -1
     var currLocalDstId = -1
     var currAttr: ED = null.asInstanceOf[ED]
     // 迭代处理所有的边
     var i = 0
     while (i < size) {
       //如果源顶点和目的顶点都相同
       if (i > 0 && currSrcId == srcIds(i) && currDstId == dstIds(i)) {
         // 合并属性
         currAttr = merge(currAttr, data(i))
       } else {
         // This edge starts a new run of edges
         if (i > 0) {
           // 添加到builder中
           builder.add(currSrcId, currDstId, currLocalSrcId, currLocalDstId, currAttr)
         }
         // Then start accumulating for a new run
         currSrcId = srcIds(i)
         currDstId = dstIds(i)
         currLocalSrcId = localSrcIds(i)
         currLocalDstId = localDstIds(i)
         currAttr = data(i)
       }
       i += 1
     }
     if (size > 0) {
       builder.add(currSrcId, currDstId, currLocalSrcId, currLocalDstId, currAttr)
     }
     builder.toEdgePartition
   }

图构建那章我们说明过,存储的边按照源顶点id排过序,所以上面的代码可以通过一次迭代完成对所有相同边的处理。

+

5 应用举例

// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                       (5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
                       (4L, ("peter", "student"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
  sc.parallelize(Array(Edge(3L, 7L, "collab"),    Edge(5L, 3L, "advisor"),
                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
                       Edge(4L, 0L, "student"),   Edge(5L, 0L, "colleague")))
// Define a default user in case there are relationship with missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
// Notice that there is a user 0 (for which we have no information) connected to users
// 4 (peter) and 5 (franklin).
graph.triplets.map(
    triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
  ).collect.foreach(println(_))
// Remove missing vertices as well as the edges to connected to them
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
// The valid subgraph will disconnect users 4 and 5 by removing user 0
validGraph.vertices.collect.foreach(println(_))
validGraph.triplets.map(
    triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
  ).collect.foreach(println(_))

/ Run Connected Components
val ccGraph = graph.connectedComponents() // No longer contains missing field
// Remove missing vertices as well as the edges to connected to them
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
// Restrict the answer to the valid subgraph
val validCCGraph = ccGraph.mask(validGraph)

6 参考文献

【1】spark源码

+

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约