Procesamiento gráfico distribuido con Spark GraphX

imagen

"La simplicidad es un requisito previo para la fiabilidad" por Edsger Dijkstra

Prologo


Los gráficos son una estructura de datos tan clara y fácil de entender, ya que desde la época de Leonhard Euler se ha visto obligado a romper las mentes de la humanidad sobre tareas heterogéneas, como la forma de atravesar los siete puentes de Königsberg sin atravesar ninguno de ellos dos veces o, como intermediario de viaje, encontrar el mayor ruta rentable

Mucho ha cambiado desde Euler: han aparecido transistores, lenguajes de programación y computación distribuida. Es el último de esta lista que simplificó drásticamente el almacenamiento y el procesamiento de gráficos. En realidad, esto es lo que se discutirá en este artículo.

Si no está familiarizado con los conceptos básicos de Apache Spark, como RDD, el programa Driver, el nodo Worker, etc., entonces, antes de continuar con este artículo, le recomendaría que lea la documentación de Databricks.

En cuanto a mí, la mejor manera de lidiar con una tecnología es intentar escribir algo en ella. En este artículo haremos un análisis de la similitud de una "red social" utilizando los conceptos básicos de la teoría de grafos.

Escribir un código


El método de almacenamiento de nuestra "red social" que elegí fue extremadamente simple e intuitivo: archivos tsv en disco, naturalmente, estos podrían ser archivos de cualquier otro formato como Parquet, Avro. La ubicación de almacenamiento de los archivos no importa si podría ser HDFS o S3, incluso si necesitamos cambiar algo, entonces Spark SQL hará el trabajo principal por nosotros. La estructura de la red será la siguiente: el primer archivo es un par de identificación de usuario y su nombre, el segundo archivo de identificación de usuario y una lista de pares. Apache Spark admite los siguientes lenguajes de programación Java, Scala y Python como API. Elegí el segundo.

Inmediatamente quiero responder la pregunta popular de si vale la pena usar Spark GraphX ​​para almacenar gráficos cuando tiene muchas operaciones de inserción / actualización: la respuesta es no, todas las operaciones de cambio de RDD obligan a cambiar todo el RDD en el clúster, que no es la solución óptima, las especiales son adecuadas para este caso Solución NoSql como Neo4J, Titanio o incluso Cassandra, Hbase. Nada le impide usar Spark GraphX ​​con ellos específicamente para procesar gráficos, cargar los datos de la base de datos, por ejemplo, por sheduler o en un estilo basado en eventos.

Bueno, entonces, comencemos a escribir código. Primero, necesitamos cargar el gráfico en la memoria, tomar los archivos fuente y extraer los vértices y bordes necesarios (aquí están los puntos principales, al final del artículo se puede encontrar un enlace a la lista completa con el código fuente):

def verts: RDD[(VertexId, String)] = sc.textFile(USER_NAMES) .flatMap(InputDataFlow.parseNames) def edges: RDD[Edge[PartitionID]] = sc.textFile(USER_GRAPH) .flatMap(InputDataFlow.makeEdges) 

Pregel


El mecanismo principal para la iteración de gráficos en GraphX ​​es el algoritmo Pregel. El algoritmo fue desarrollado por Google, el modelo Pregel utiliza la transferencia de mensajes entre los vértices en el gráfico. El mensaje que pasa por una serie de iteraciones llamadas superpasos es la idea principal detrás de este algoritmo. Además, la idea principal se puede describir de la siguiente manera: "pensar como un vértice" , es decir, el estado del vértice actual depende solo del estado de sus vecinos.

Pregel se vuelve extremadamente necesario cuando resolver un problema con un MapReduce regular se convierte en un proceso extremadamente difícil. Curiosamente, el nombre Pregel proviene del nombre del río, que abarca los siete puentes de Koenigsberg.

La primitiva principal para atravesar un gráfico es un triplete: consta de los siguientes componentes: el vértice actual (un vértice de origen), el vértice al que pasamos (un vértice de destino) y el borde entre ellos (un borde de conexión): todo está claro: donde ir a donde vamos y hacia dónde vamos. Además, para Pregel, debe especificar la distancia predeterminada entre los vértices, como regla general, es una función PositiveInfinity, UDF (función definida por el usuario) para cada vértice para procesar el mensaje entrante y calcular el siguiente vértice, y UDF para fusionar los dos mensajes entrantes, esta función debe ser conmutativa y asociativo Como Scala es un lenguaje funcional, las dos últimas funciones se representarán como dos expresiones lambda.

Cuando separamos los componentes principales de Pregel, vale la pena venir a practicar. El primer algoritmo que implementaremos es el algoritmo de Dijkstra para encontrar la ruta más corta desde un vértice arbitrario a todos los demás.

 def dijkstraShortestPath[VT](graph: GenericGraph[VT], sourceId: VertexId) = { val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity) val sssp = initialGraph.pregel(Double.PositiveInfinity)( (_, dist, newDist) => math.min(dist, newDist), triplet => { //Distance accumulator if (triplet.srcAttr + triplet.attr < triplet.dstAttr) { Iterator((triplet.dstId, triplet.srcAttr + triplet.attr)) } else { Iterator.empty } }, (a, b) => math.min(a, b) ) sssp.vertices.sortByKey(ascending = true).collect.mkString("\n") } 

Aquí todo es obvio: comenzamos desde un vértice dado, usamos la función mínima para determinar la distancia mínima en cada paso. La primera función utilizada por Pregel conserva la distancia más corta entre el mensaje entrante y el vértice actual. La segunda función distribuye mensajes a los vecinos mientras mantiene la distancia. La última función, que es un análogo de la etapa Reducir, selecciona el valor mínimo en el caso de varios mensajes entrantes. A continuación, simplemente formamos una salida gráfica conveniente.

Grado de separación


Estoy seguro de que muchos lectores de este artículo han escuchado sobre la teoría de los seis apretones de manos ( Seis grados de separación ): esta es una teoría no probada según la cual dos personas están separadas por no más de cinco niveles de conocidos comunes, es decir, se necesita un máximo de 6 apretones de manos para conectar dos arbitrarios hombre en la tierra En términos de teoría de grafos, suena así: el diámetro del gráfico de datación no excede de 6 para dos personas en la Tierra.

Comencemos a escribir el código con lo siguiente, necesitamos una búsqueda de amplitud en el gráfico para buscar contactos del vértice indicado, para esto necesitamos modificar el código del algoritmo Dijkstra:

 def getBFS(root: VertexId) = { val initialGraph = graph.mapVertices((id, _) => if (id == root) 0.0 else Double.PositiveInfinity) val bfs = initialGraph.pregel(Double.PositiveInfinity, maxIterations = 10)( (_, attr, msg) => math.min(attr, msg), triplet => { if (triplet.srcAttr != Double.PositiveInfinity) { Iterator((triplet.dstId, triplet.srcAttr + 1)) } else { Iterator.empty } }, (a, b) => math.min(a, b)).cache() bfs } 

Todo es muy similar a lo anterior, pero ya indicamos el número de iteraciones; para su gráfico, este puede ser un número diferente, 10 para mi gráfico que obtuve empíricamente. Luego, únase con los nombres de usuario y tome los primeros 100 valores para un usuario arbitrario:

 def degreeOfSeparation(root: VertexId): Array[(VertexId, DegreeOfSeparation)] = { getBFS(root).vertices.join(verts).take(100) } 

Ahora estamos buscando el grado de separación del vértice dado a todos los demás, también puede buscar el grado de separación de dos vértices arbitrarios:

 def degreeOfSeparationTwoUser(firstUser: VertexId, secondUser: VertexId) = { getBFS(firstUser) .vertices .filter { case (vertexId, _) => vertexId == secondUser } .collect.map { case (_, degree) => degree } } 

Spark GraphX ​​del cuadro le brinda la oportunidad de obtener mucha información sobre el gráfico, por ejemplo, para obtener el componente conectado del gráfico (componente conectado):

 def getMostConnectedUsers(amount: Int): Array[(VertexId, ConnectedUser)] = { graph.degrees.join(verts) .sortBy({ case (_, (userName, _)) => userName }, ascending = false) .take(amount) } 

O obtenga una métrica como el número de triángulos en el gráfico (recuento de triángulos):

 def socialGraphTriangleCount = graph.triangleCount() 

Rango de página


El algoritmo de PageRank surgió gracias a los estudiantes graduados de Stanford, Larry Page y Sergey Brin. Para cada vértice del gráfico, el algoritmo asigna importancia entre todos los demás. Por ejemplo, si un usuario de Twitter tiene una gran cantidad de suscripciones de otros usuarios, tendrá una calificación alta, por lo tanto, puede ser fácilmente encontrado en el motor de búsqueda.

GraphX ​​tiene una versión estática y dinámica de la implementación de PageRank. La versión estática tiene un número fijo de iteraciones, mientras que la versión dinámica funcionará hasta que la calificación comience a converger al valor dado.

Para nuestro gráfico, esto será lo siguiente:

 def dynamicRanks(socialGraph: SocialGraph, tolerance: Double) = socialGraph.graph.pageRank(tol = tolerance).vertices def staticRanks(socialGraph: SocialGraph, tolerance: Double) = socialGraph.graph.staticPageRank(numIter = 20).vertices 

Conclusión


Un lector atento señaló que el tema de este artículo es el procesamiento distribuido de gráficos, pero al escribir el código, no hicimos nada para que el procesamiento se distribuye realmente. Y aquí deberíamos recordar la cita de Edsger Dijkstra al principio. Spark simplifica drásticamente nuestras vidas al asumir la carga y la carga de la informática distribuida. Escribir código que se ejecutará en un clúster distribuido no es una tarea tan difícil como podría haber parecido al principio. Y aquí hay incluso varias opciones para administrar recursos de clúster: Hadoop YARN, Apache Mesos (personalmente, mi opción favorita), y recientemente hay soporte para Kubernetes. Todo el código fuente que se analizó en este artículo se puede encontrar en el github .

Source: https://habr.com/ru/post/es415939/


All Articles