Traitement graphique distribué avec Spark GraphX

image

"La simplicité est une condition préalable à la fiabilité" par Edsger Dijkstra

Prologue


Les graphiques sont une structure de données tellement claire et facile à comprendre, car depuis l'époque de Leonhard Euler, elle a forcé l'esprit de l'humanité à travers des tâches hétérogènes, telles que la façon dont vous pouvez traverser les sept ponts de Königsberg sans passer par l'un d'eux deux fois ou en tant qu'intermédiaire itinérant, trouver le plus itinéraire rentable.

Beaucoup de choses ont changé depuis Euler: les transistors, les langages de programmation et l'informatique distribuée sont apparus. C'est le dernier de cette liste qui a considérablement simplifié le stockage et le traitement des graphiques. En fait, c'est ce qui sera discuté dans cet article.

Si vous n'êtes pas familier avec les concepts de base d'Apache Spark tels que RDD, programme pilote, nœud de travail, etc., avant de poursuivre cet article, je vous recommande de lire la documentation de Databricks.

Quant à moi, la meilleure façon de gérer une technologie est d'essayer d'écrire quelque chose dessus. Dans cet article, nous ferons une analyse de la similitude d'un «réseau social» en utilisant les concepts de base de la théorie des graphes.

Écrire un code


La méthode de stockage de notre «réseau social» que j'ai choisie était extrêmement simple et intuitive: les fichiers tsv sur disque, bien sûr, il pouvait s'agir de fichiers de tout autre format comme Parquet, Avro. L'emplacement de stockage des fichiers n'a pas d'importance s'il peut s'agir de HDFS ou S3, même si nous devons changer quelque chose, alors Spark SQL fera le travail principal pour nous. La structure du réseau sera la suivante: le premier fichier est une paire d'ID utilisateur et son nom, le deuxième fichier d'ID utilisateur et une liste de pairs. Apache Spark prend en charge les langages de programmation Java, Scala et Python suivants en tant qu'API. J'ai choisi le second.

Immédiatement, je veux répondre à la question populaire de savoir s'il vaut la peine d'utiliser Spark GraphX ​​pour stocker des graphiques lorsque vous avez de nombreuses opérations d'insertion / mise à jour - la réponse est non, toutes les opérations de changement de RDD forcent à changer l'ensemble du RDD dans le cluster, ce qui n'est pas la solution optimale, des solutions spéciales conviennent à ce cas Solution NoSql comme Neo4J, Titanium ou encore Cassandra, Hbase. Rien ne vous empêche d'utiliser Spark GraphX ​​avec eux spécifiquement pour traiter des graphiques, charger les données elles-mêmes depuis la base de données, par exemple, par sheduler ou dans un style événementiel.

Eh bien, commençons à écrire du code. Tout d'abord, nous devons charger le graphique en mémoire, prendre les fichiers source et extraire les sommets et bords nécessaires (voici les principaux points, un lien vers la liste complète avec le code source se trouve à la fin de l'article):

def verts: RDD[(VertexId, String)] = sc.textFile(USER_NAMES) .flatMap(InputDataFlow.parseNames) def edges: RDD[Edge[PartitionID]] = sc.textFile(USER_GRAPH) .flatMap(InputDataFlow.makeEdges) 

Pregel


Le principal mécanisme d'itération des graphes dans GraphX ​​est l'algorithme Pregel. L'algorithme a été développé par Google, le modèle Pregel utilise le transfert de messages entre les sommets du graphe. Le message passant par une série d'itérations appelées supersteps est l'idée principale derrière cet algorithme. De plus, l'idée principale peut être décrite comme suit: «penser comme un sommet» , c'est-à-dire que l'état du sommet actuel ne dépend que de l'état de ses voisins.

Pregel devient extrêmement nécessaire lorsque la résolution d'un problème avec un MapReduce régulier devient un processus extrêmement difficile. Fait intéressant, le nom Pregel vient du nom de la rivière, qui enjambait les sept ponts de Koenigsberg.

La principale primitive pour parcourir un graphe est un triplet - il se compose des composants suivants: le sommet actuel (un sommet source), le sommet auquel nous passons (un sommet de destination) et le bord entre eux (un bord se connectant) - tout est clair: où allez où nous allons et où nous allons. De plus, pour Pregel, vous devez spécifier la distance par défaut entre les sommets, en règle générale, c'est une fonction PositiveInfinity, UDF (fonction définie par l'utilisateur) pour chaque sommet pour traiter le message entrant et calculer le sommet suivant, et UDF pour fusionner les deux messages entrants, cette fonction doit être commutative et associatif. Puisque Scala est un langage fonctionnel, les deux dernières fonctions seront représentées comme deux expressions lambda.

Lorsque nous avons déjà démonté les principaux composants de Pregel, cela vaut la peine de venir pratiquer. Le premier algorithme que nous allons implémenter est l'algorithme de Dijkstra pour trouver le chemin le plus court d'un sommet arbitraire à tous les autres.

 def dijkstraShortestPath[VT](graph: GenericGraph[VT], sourceId: VertexId) = { val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity) val sssp = initialGraph.pregel(Double.PositiveInfinity)( (_, dist, newDist) => math.min(dist, newDist), triplet => { //Distance accumulator if (triplet.srcAttr + triplet.attr < triplet.dstAttr) { Iterator((triplet.dstId, triplet.srcAttr + triplet.attr)) } else { Iterator.empty } }, (a, b) => math.min(a, b) ) sssp.vertices.sortByKey(ascending = true).collect.mkString("\n") } 

Tout est évident ici: on part d'un sommet donné, on utilise la fonction minimum pour déterminer la distance minimum à chaque pas. La première fonction utilisée par Pregel conserve la distance la plus courte entre le message entrant et le sommet actuel. La deuxième fonction distribue des messages aux voisins tout en maintenant la distance. La dernière fonction - c'est un analogue de l'étape Réduire - sélectionne la valeur minimale dans le cas de plusieurs messages entrants. Ensuite, nous formons simplement une sortie graphique pratique.

Degré de séparation


Je suis sûr que de nombreux lecteurs de cet article ont entendu parler de la théorie des six poignées de main ( Six degrés de séparation ) - c'est une théorie non prouvée selon laquelle deux personnes sont séparées par pas plus de cinq niveaux de connaissances communes, c'est-à-dire, un maximum de 6 poignées de main est nécessaire pour connecter deux arbitraires homme sur terre. En termes de théorie des graphes, cela ressemble à ceci: le diamètre du graphique de datation ne dépasse pas 6 pour deux personnes sur Terre.

Commençons à écrire le code avec ce qui suit, nous avons besoin d'une recherche en largeur sur le graphique pour rechercher des contacts du sommet indiqué, pour cela, nous devons modifier le code de l'algorithme de Dijkstra:

 def getBFS(root: VertexId) = { val initialGraph = graph.mapVertices((id, _) => if (id == root) 0.0 else Double.PositiveInfinity) val bfs = initialGraph.pregel(Double.PositiveInfinity, maxIterations = 10)( (_, attr, msg) => math.min(attr, msg), triplet => { if (triplet.srcAttr != Double.PositiveInfinity) { Iterator((triplet.dstId, triplet.srcAttr + 1)) } else { Iterator.empty } }, (a, b) => math.min(a, b)).cache() bfs } 

Tout est très similaire à ce qui était ci-dessus, mais nous indiquons déjà le nombre d'itérations - pour votre graphique, cela peut être un nombre différent - 10 pour mon graphique, j'ai obtenu empiriquement. Ensuite, joignez-vous aux noms d'utilisateur et prenez les 100 premières valeurs pour un utilisateur arbitraire:

 def degreeOfSeparation(root: VertexId): Array[(VertexId, DegreeOfSeparation)] = { getBFS(root).vertices.join(verts).take(100) } 

Maintenant, nous recherchons le degré de séparation du sommet donné à tous les autres, vous pouvez également rechercher le degré de séparation pour deux sommets arbitraires:

 def degreeOfSeparationTwoUser(firstUser: VertexId, secondUser: VertexId) = { getBFS(firstUser) .vertices .filter { case (vertexId, _) => vertexId == secondUser } .collect.map { case (_, degree) => degree } } 

Spark GraphX ​​de la boîte vous donne la possibilité d'obtenir beaucoup d'informations sur le graphique, par exemple, pour obtenir le composant connecté du graphique (composant connecté):

 def getMostConnectedUsers(amount: Int): Array[(VertexId, ConnectedUser)] = { graph.degrees.join(verts) .sortBy({ case (_, (userName, _)) => userName }, ascending = false) .take(amount) } 

Ou obtenez une métrique telle que le nombre de triangles dans le graphique (nombre de triangles):

 def socialGraphTriangleCount = graph.triangleCount() 

Classement des pages


L'algorithme PageRank est né grâce aux étudiants diplômés de Stanford Larry Page et Sergey Brin. Pour chaque sommet du graphique, l'algorithme attribue une importance parmi tous les autres. Par exemple, si un utilisateur Twitter a un grand nombre d'abonnements d'autres utilisateurs, alors il aura une note élevée, par conséquent, il peut être facilement trouvé dans le moteur de recherche.

GraphX ​​a une version statique et dynamique de l'implémentation du PageRank. La version statique a un nombre fixe d'itérations, tandis que la version dynamique fonctionnera jusqu'à ce que la cote commence à converger vers la valeur donnée.

Pour notre graphique, ce sera comme suit:

 def dynamicRanks(socialGraph: SocialGraph, tolerance: Double) = socialGraph.graph.pageRank(tol = tolerance).vertices def staticRanks(socialGraph: SocialGraph, tolerance: Double) = socialGraph.graph.staticPageRank(numIter = 20).vertices 

Conclusion


Un lecteur attentif a noté que le sujet de cet article est le traitement distribué des graphiques, mais lors de l'écriture de code, nous n'avons rien fait pour que le traitement soit vraiment distribué. Et ici, nous devons rappeler la citation d'Edsger Dijkstra au tout début. Spark simplifie considérablement nos vies en assumant le fardeau et les charges de l'informatique distribuée. Écrire du code qui s'exécutera sur un cluster distribué n'est pas une tâche aussi difficile qu'elle aurait pu l'être au début. Et ici, il existe même plusieurs options pour gérer les ressources du cluster: Hadoop YARN, Apache Mesos (personnellement, mon option préférée), et plus récemment, il existe un support pour Kubernetes. Tout le code source qui a été analysé dans cet article peut être trouvé sur le github .

Source: https://habr.com/ru/post/fr415939/


All Articles