使用Spark GraphX进行分布式图形处理

图片

Edsger Dijkstra的“简单是可靠性的前提”

序言


自从Leonhard Euler时代以来,图形就已经清晰而易于理解,它迫使人们在各种各样的任务上打动了人类的思维,例如如何穿越Königsberg的所有七座桥梁而又没有经历两次或作为旅行调解人有利可图的路线。

自Euler以来发生了许多变化:晶体管,编程语言和分布式计算已经出现。 这是该列表的最后一个,它极大地简化了图形的存储和处理。 实际上,这就是本文将要讨论的内容。

如果您不熟悉基本的Apache Spark概念,例如RDD,Driver程序,Worker节点等,那么在继续本文之前,建议您阅读Databricks的文档

对我而言,处理技术的最佳方法是尝试在其上编写一些东西。 在本文中,我们将使用图论的基本概念对“社交网络”的相似性进行分析。

编写代码


我选择的存储“社交网络”的方法非常简单直观:tsv文件在磁盘上,自然可以是任何其他格式的文件,例如Parquet,Avro。 文件的存储位置无关紧要,无论是HDFS还是S3,即使我们需要更改某些内容,Spark SQL也会为我们完成主要工作。 网络结构如下:第一个文件是一对用户ID及其名称,第二个用户ID文件是一个对等方列表。 Apache Spark支持以下Java,Scala和Python编程语言作为API。 我选择了第二个。

我立即想回答一个普遍的问题,即当您执行许多插入/更新操作时,是否值得使用Spark GraphX存储图-答案是否定的,所有RDD更改操作都会强制更改集群中的整个RDD,这不是最佳解决方案,特殊的解决方案适合这种情况NoSql解决方案,例如Neo4J,Titanium甚至Cassandra,Hbase。 没有什么可以阻止您将Spark GraphX与它们一起专门用于处理图形,例如通过sheduler或事件驱动的样式从数据库加载数据本身。

好了,让我们开始编写代码。 首先,我们需要将图形加载到内存中,获取源文件,并拉出必要的顶点和边线(这是要点,可以在本文结尾找到包含源代码的完整列表的链接):

def verts: RDD[(VertexId, String)] = sc.textFile(USER_NAMES) .flatMap(InputDataFlow.parseNames) def edges: RDD[Edge[PartitionID]] = sc.textFile(USER_GRAPH) .flatMap(InputDataFlow.makeEdges) 

普雷格尔


GraphX中图迭代的主要机制是Pregel算法。 该算法由Google开发,Pregel模型使用图中顶点之间的消息传递 。 通过一系列称为超级步骤的迭代传递的消息是此算法的主要思想。 而且,主要思想可以描述如下: “像顶点一样思考” ,即当前顶点的状态仅取决于其相邻节点的状态。

当使用常规的MapReduce解决问题变得非常困难时,Pregel变得极为必要。 有趣的是,普雷格尔(Pregel)的名字来自于横跨科尼斯堡(Koenigsberg)七座桥梁的河流的名字。

遍历图形的主要图元是一个三元组-它由以下组件组成:当前顶点(源顶点),我们要传递到的顶点(目标顶点)以及它们之间的边(连接边)-一切都很清楚:去我们去的地方,去哪条路。 另外,对于Pregel,您必须指定顶点之间的默认距离,通常,它是一个PositiveInfinity,每个顶点的UDF(用户定义函数)函数以处理传入消息并计算下一个顶点,并且UDF合并两个传入消息,此函数应是可交换的,并且联想的。 由于Scala是一种功能语言,因此后两个函数将表示为两个lambda表达式。

剖析Pregel的主要成分后,值得实践。 我们将要实现的第一个算法是Dijkstra算法,用于找到从任意顶点到所有其他顶点的最短路径。

 def dijkstraShortestPath[VT](graph: GenericGraph[VT], sourceId: VertexId) = { val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity) val sssp = initialGraph.pregel(Double.PositiveInfinity)( (_, dist, newDist) => math.min(dist, newDist), triplet => { //Distance accumulator if (triplet.srcAttr + triplet.attr < triplet.dstAttr) { Iterator((triplet.dstId, triplet.srcAttr + triplet.attr)) } else { Iterator.empty } }, (a, b) => math.min(a, b) ) sssp.vertices.sortByKey(ascending = true).collect.mkString("\n") } 

一切都显而易见:我们从给定的顶点开始,使用最小函数确定每一步的最小距离。 Pregel使用的第一个功能将保留传入消息和当前顶点之间的最短距离。 第二个功能是在保持距离的同时将消息分发给邻居。 最后一个功能-这是Reduce阶段的类似功能-在有多个传入消息的情况下选择最小值。 接下来,我们简单地形成一个方便的图形输出。

分离度


我敢肯定,本文的许多读者都听说过六次握手( 六度分离 )的理论-这是一种未经证实的理论,根据该理论,任何两个人之间的相识程度均不超过五级,也就是说,最多需要六次握手才能连接两个任意人在地球上。 就图论而言,这听起来像是:地球上任何两个人的约会图的直径不超过6。

让我们开始使用以下代码编写代码,我们需要在图形上进行广度优先搜索,以搜索指定顶点的接触,为此,我们需要修改Dijkstra算法代码:

 def getBFS(root: VertexId) = { val initialGraph = graph.mapVertices((id, _) => if (id == root) 0.0 else Double.PositiveInfinity) val bfs = initialGraph.pregel(Double.PositiveInfinity, maxIterations = 10)( (_, attr, msg) => math.min(attr, msg), triplet => { if (triplet.srcAttr != Double.PositiveInfinity) { Iterator((triplet.dstId, triplet.srcAttr + 1)) } else { Iterator.empty } }, (a, b) => math.min(a, b)).cache() bfs } 

一切都与上面的非常相似,但是我们已经指出了迭代次数-对于您的图形,这可能是不同的数字-根据经验我的图形为10。 接下来,加入用户名并为任意用户获取前100个值:

 def degreeOfSeparation(root: VertexId): Array[(VertexId, DegreeOfSeparation)] = { getBFS(root).vertices.join(verts).take(100) } 

现在,我们正在寻找从给定顶点到所有其他顶点的分离度,您还可以搜索两个任意顶点的分离度:

 def degreeOfSeparationTwoUser(firstUser: VertexId, secondUser: VertexId) = { getBFS(firstUser) .vertices .filter { case (vertexId, _) => vertexId == secondUser } .collect.map { case (_, degree) => degree } } 

框中的Spark GraphX使您有机会获得有关图的很多信息,例如,获得图的连接组件(connected component):

 def getMostConnectedUsers(amount: Int): Array[(VertexId, ConnectedUser)] = { graph.degrees.join(verts) .sortBy({ case (_, (userName, _)) => userName }, ascending = false) .take(amount) } 

或获取指标,例如图中的三角形数量(三角形计数):

 def socialGraphTriangleCount = graph.triangleCount() 

页面等级


PageRank算法的出现要归功于斯坦福大学的研究生Larry Page和Sergey Brin。 对于图的每个顶点,算法在所有其他顶点之间分配重要性。 例如,如果一个Twitter用户有来自其他用户的大量订阅,那么他将获得很高的评价,因此,可以在搜索引擎中轻松找到他。

GraphX具有PageRank实现的静态和动态版本。 静态版本具有固定的迭代次数,而动态版本将起作用,直到评级开始收敛到给定值为止。

对于我们的图形,将如下所示:

 def dynamicRanks(socialGraph: SocialGraph, tolerance: Double) = socialGraph.graph.pageRank(tol = tolerance).vertices def staticRanks(socialGraph: SocialGraph, tolerance: Double) = socialGraph.graph.staticPageRank(numIter = 20).vertices 

结论


一位细心的读者注意到,本文的主题是图形的分布式处理,但是在编写代码时,我们没有做任何使处理真正分散的事情。 在这里,我们应该回想起Edsger Dijkstra一开始的话。 通过承担分布式计算的负担,Spark大大简化了我们的生活。 编写将在分布式集群上执行的代码并不像一开始那样困难。 这里甚至还有几个用于管理集群资源的选项 :Hadoop YARN,Apache Mesos(个人,我最喜欢的选项),以及最近对Kubernetes的支持。 本文中分析的所有源代码都可以在github上找到。

Source: https://habr.com/ru/post/zh-CN415939/


All Articles