分享

连通组件 · spark

 樊清波 2016-12-22
import scala.reflect.ClassTag

import org.apache.spark.graphx._

/** Connected components algorithm. */
object ConnectedComponents {

  /**
   * Compute the connected component membership of each vertex and return a graph with the vertex
   * value containing the lowest vertex id in the connected component containing that vertex.
   *
   * Edges are treated as undirected: on every edge whose endpoint labels differ, the smaller label
   * is sent to the endpoint holding the larger one. The minimum vertex id therefore spreads one
   * hop per iteration. If `maxIterations` is too small for labels to travel across a whole
   * component, the returned labels may not yet identify complete components.
   *
   * @tparam VD the vertex attribute type (discarded in the computation)
   * @tparam ED the edge attribute type (preserved in the computation)
   * @param graph the graph for which to compute the connected components
   * @param maxIterations the maximum number of iterations to run for; must be positive
   * @return a graph with vertex attributes containing the smallest vertex in each
   *         connected component
   * @throws IllegalArgumentException if `maxIterations` is not greater than 0
   */
  def run[VD: ClassTag, ED: ClassTag](
      graph: Graph[VD, ED],
      maxIterations: Int): Graph[VertexId, ED] = {
    require(maxIterations > 0, s"Maximum of iterations must be greater than 0," +
      s" but got ${maxIterations}")

    // Every vertex starts out labelled with its own id; the original attribute is dropped.
    val ccGraph = graph.mapVertices { case (vid, _) => vid }

    // Push the smaller label across the edge to the endpoint whose label is larger.
    // Edges whose endpoints already agree emit nothing, so converged regions generate no traffic.
    def sendMessage(edge: EdgeTriplet[VertexId, ED]): Iterator[(VertexId, VertexId)] = {
      if (edge.srcAttr < edge.dstAttr) {
        Iterator((edge.dstId, edge.srcAttr))
      } else if (edge.srcAttr > edge.dstAttr) {
        Iterator((edge.srcId, edge.dstAttr))
      } else {
        Iterator.empty
      }
    }

    // Long.MaxValue is the identity for math.min, so the initial vprog call keeps each
    // vertex's own id as its label.
    val initialMessage = Long.MaxValue
    val pregelGraph = Pregel(ccGraph, initialMessage, maxIterations, EdgeDirection.Either)(
      vprog = (_, attr, msg) => math.min(attr, msg),
      sendMsg = sendMessage,
      mergeMsg = (a, b) => math.min(a, b))

    // Release the intermediate label graph. NOTE(review): this mirrors upstream Spark and assumes
    // Pregel has already derived and materialized its own graph from ccGraph; confirm for the
    // Spark version in use.
    ccGraph.unpersist()
    pregelGraph
  }

  /**
   * Compute the connected component membership of each vertex and return a graph with the vertex
   * value containing the lowest vertex id in the connected component containing that vertex.
   *
   * Equivalent to calling the two-argument overload with `maxIterations = Int.MaxValue`, which in
   * practice imposes no iteration cap.
   *
   * @tparam VD the vertex attribute type (discarded in the computation)
   * @tparam ED the edge attribute type (preserved in the computation)
   * @param graph the graph for which to compute the connected components
   * @return a graph with vertex attributes containing the smallest vertex in each
   *         connected component
   */
  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED] =
    run(graph, Int.MaxValue)
}

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0)

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多