Verteilte Grafikverarbeitung mit Spark GraphX

Bild

"Einfachheit ist Voraussetzung für Zuverlässigkeit" von Edsger Dijkstra

Prolog


Grafiken sind eine so klare und leicht verständliche Datenstruktur, dass sie seit der Zeit von Leonhard Euler gezwungen sind, den Verstand der Menschheit über heterogene Aufgaben zu brechen, wie man alle sieben Brücken von Königsberg durchqueren kann, ohne sie zweimal zu durchlaufen, oder als reisender Vermittler am meisten zu finden profitable Route.

Seit Euler hat sich viel geändert: Transistoren, Programmiersprachen und verteiltes Rechnen sind erschienen. Es ist der letzte Teil dieser Liste, der das Speichern und Verarbeiten von Diagrammen erheblich vereinfacht hat. Genau das wird in diesem Artikel besprochen.

Wenn Sie mit den grundlegenden Apache Spark-Konzepten wie RDD, Treiberprogramm, Worker-Knoten usw. nicht vertraut sind, empfehlen wir Ihnen, die Dokumentation von Databricks zu lesen, bevor Sie mit diesem Artikel fortfahren.

Für mich ist der beste Weg, mit einer Technologie umzugehen, zu versuchen, etwas darauf zu schreiben. In diesem Artikel werden wir die Ähnlichkeit eines „sozialen Netzwerks“ anhand der Grundkonzepte der Graphentheorie analysieren.

Einen Code schreiben


Die von mir gewählte Methode zum Speichern unseres „sozialen Netzwerks“ war äußerst einfach und intuitiv: tsv-Dateien auf der Festplatte, natürlich können dies Dateien in jedem anderen Format wie Parquet, Avro sein. Der Speicherort der Dateien spielt keine Rolle, ob es sich um HDFS oder S3 handelt, auch wenn wir etwas ändern müssen. Spark SQL erledigt die Hauptarbeit für uns. Die Netzwerkstruktur sieht wie folgt aus: Die erste Datei besteht aus zwei Benutzer-IDs und deren Namen, die zweite Benutzer-ID-Datei und einer Liste von Peers. Apache Spark unterstützt die folgenden Programmiersprachen Java, Scala und Python als APIs. Ich habe den zweiten gewählt.

Ich möchte sofort die beliebte Frage beantworten, ob es sich lohnt, Spark GraphX ​​zum Speichern von Diagrammen zu verwenden, wenn Sie viele Einfüge- / Aktualisierungsvorgänge haben. Die Antwort lautet: Nein, alle RDD-Änderungsvorgänge erzwingen das Ändern des gesamten RDD im Cluster. Dies ist nicht die optimale Lösung. Spezielle sind für diesen Fall geeignet NoSql-Lösung wie Neo4J, Titan oder sogar Cassandra, Hbase. Nichts hindert Sie daran, Spark GraphX ​​speziell für die Verarbeitung von Diagrammen zu verwenden und die Daten selbst aus der Datenbank zu laden, z. B. per Sheduler oder in einem ereignisgesteuerten Stil.

Dann fangen wir an, Code zu schreiben. Zuerst müssen wir das Diagramm in den Speicher laden, die Quelldateien nehmen und die erforderlichen Eckpunkte und Kanten herausziehen (hier sind die Hauptpunkte, ein Link zur vollständigen Liste mit dem Quellcode befindet sich am Ende des Artikels):

def verts: RDD[(VertexId, String)] = sc.textFile(USER_NAMES) .flatMap(InputDataFlow.parseNames) def edges: RDD[Edge[PartitionID]] = sc.textFile(USER_GRAPH) .flatMap(InputDataFlow.makeEdges) 

Pregel


Der Hauptmechanismus für die Graphiteration in GraphX ​​ist der Pregel-Algorithmus. Der Algorithmus wurde von Google entwickelt. Das Pregel-Modell verwendet die Übertragung von Nachrichten zwischen den Scheitelpunkten im Diagramm. Die Nachricht, die eine Reihe von Iterationen durchläuft, die als Supersteps bezeichnet werden, ist die Hauptidee dieses Algorithmus. Die Hauptidee kann auch wie folgt beschrieben werden: „Denken Sie wie ein Scheitelpunkt“ , dh der Zustand des aktuellen Scheitelpunkts hängt nur vom Zustand seiner Nachbarn ab.

Pregel wird extrem notwendig, wenn das Lösen eines Problems mit einem regulären MapReduce zu einem extrem schwierigen Prozess wird. Interessanterweise leitet sich der Name Pregel vom Namen des Flusses ab, der die sieben Brücken von Königsberg überspannte.

Das Hauptprimitiv für das Durchlaufen eines Graphen ist ein Triplett - es besteht aus den folgenden Komponenten: dem aktuellen Scheitelpunkt (einem Quellscheitelpunkt), dem Scheitelpunkt, an den wir übergeben (einem Zielscheitelpunkt) und der Kante zwischen ihnen (einer Kantenverbindung) - alles ist klar: wo Geh wohin wir gehen und welchen Weg wir gehen. Außerdem müssen Sie für Pregel den Standardabstand zwischen den Scheitelpunkten angeben. In der Regel handelt es sich um eine PositiveInfinity, UDF-Funktion (benutzerdefinierte Funktion) für jeden Scheitelpunkt, um die eingehende Nachricht zu verarbeiten und den nächsten Scheitelpunkt zu berechnen, und UDF, um die beiden eingehenden Nachrichten zusammenzuführen. Diese Funktion sollte kommutativ sein assoziativ. Da Scala eine funktionale Sprache ist, werden die letzten beiden Funktionen als zwei Lambda-Ausdrücke dargestellt.

Wenn wir die Hauptkomponenten von Pregel auseinander genommen haben, lohnt es sich, zum Üben zu kommen. Der erste Algorithmus, den wir implementieren werden, ist der Dijkstra-Algorithmus zum Finden des kürzesten Wegs von einem beliebigen Scheitelpunkt zu allen anderen.

 def dijkstraShortestPath[VT](graph: GenericGraph[VT], sourceId: VertexId) = { val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity) val sssp = initialGraph.pregel(Double.PositiveInfinity)( (_, dist, newDist) => math.min(dist, newDist), triplet => { //Distance accumulator if (triplet.srcAttr + triplet.attr < triplet.dstAttr) { Iterator((triplet.dstId, triplet.srcAttr + triplet.attr)) } else { Iterator.empty } }, (a, b) => math.min(a, b) ) sssp.vertices.sortByKey(ascending = true).collect.mkString("\n") } 

Hier ist alles klar: Wir gehen von einem bestimmten Scheitelpunkt aus und bestimmen mit der Minimalfunktion den Mindestabstand bei jedem Schritt. Die erste von Pregel verwendete Funktion behält den kürzesten Abstand zwischen der eingehenden Nachricht und dem aktuellen Scheitelpunkt bei. Die zweite Funktion verteilt Nachrichten an Nachbarn, während der Abstand eingehalten wird. Die letzte Funktion - dies ist ein Analogon der Stufe Reduzieren - wählt bei mehreren eingehenden Nachrichten den Mindestwert aus. Als nächstes bilden wir einfach eine bequeme Grafikausgabe.

Trennungsgrad


Ich bin sicher, dass viele Leser dieses Artikels von der Theorie der sechs Handshakes ( sechs Trennungsgrade ) gehört haben - dies ist eine unbewiesene Theorie, nach der zwei Personen durch nicht mehr als fünf Ebenen gemeinsamer Bekanntschaften getrennt sind, dh es sind maximal 6 Handshakes erforderlich, um zwei willkürliche zu verbinden Mann auf Erden. In Bezug auf die Graphentheorie klingt dies so: Der Durchmesser des Datierungsgraphen überschreitet 6 für zwei Menschen auf der Erde nicht.

Beginnen wir mit dem Schreiben des Codes wie folgt: Wir benötigen eine Breitensuche im Diagramm, um nach Kontakten des angegebenen Scheitelpunkts zu suchen. Dazu müssen wir den Dijkstra-Algorithmuscode ändern:

 def getBFS(root: VertexId) = { val initialGraph = graph.mapVertices((id, _) => if (id == root) 0.0 else Double.PositiveInfinity) val bfs = initialGraph.pregel(Double.PositiveInfinity, maxIterations = 10)( (_, attr, msg) => math.min(attr, msg), triplet => { if (triplet.srcAttr != Double.PositiveInfinity) { Iterator((triplet.dstId, triplet.srcAttr + 1)) } else { Iterator.empty } }, (a, b) => math.min(a, b)).cache() bfs } 

Alles ist sehr ähnlich zu dem, was oben war, aber wir geben bereits die Anzahl der Iterationen an - für Ihr Diagramm kann dies eine andere Zahl sein - 10 für mein Diagramm, das ich empirisch erhalten habe. Als nächstes verbinden Sie sich mit Benutzernamen und nehmen die ersten 100 Werte für einen beliebigen Benutzer:

 def degreeOfSeparation(root: VertexId): Array[(VertexId, DegreeOfSeparation)] = { getBFS(root).vertices.join(verts).take(100) } 

Jetzt suchen wir nach dem Trennungsgrad vom angegebenen Scheitelpunkt zu allen anderen. Sie können auch nach dem Trennungsgrad für zwei beliebige Scheitelpunkte suchen:

 def degreeOfSeparationTwoUser(firstUser: VertexId, secondUser: VertexId) = { getBFS(firstUser) .vertices .filter { case (vertexId, _) => vertexId == secondUser } .collect.map { case (_, degree) => degree } } 

Spark GraphX ​​aus der Box bietet Ihnen die Möglichkeit, viele Informationen über das Diagramm abzurufen, z. B. um die verbundene Komponente des Diagramms (verbundene Komponente) abzurufen:

 def getMostConnectedUsers(amount: Int): Array[(VertexId, ConnectedUser)] = { graph.degrees.join(verts) .sortBy({ case (_, (userName, _)) => userName }, ascending = false) .take(amount) } 

Oder erhalten Sie eine Metrik wie die Anzahl der Dreiecke im Diagramm (Anzahl der Dreiecke):

 def socialGraphTriangleCount = graph.triangleCount() 

Seitenrang


Der PageRank-Algorithmus wurde dank der Stanford-Studenten Larry Page und Sergey Brin entwickelt. Für jeden Scheitelpunkt des Graphen weist der Algorithmus allen anderen eine Bedeutung zu. Wenn ein Twitter-Benutzer beispielsweise eine große Anzahl von Abonnements von anderen Benutzern hat, hat er eine hohe Bewertung, sodass er leicht in der Suchmaschine gefunden werden kann.

GraphX ​​verfügt über eine statische und dynamische Version der PageRank-Implementierung. Die statische Version hat eine feste Anzahl von Iterationen, während die dynamische Version so lange funktioniert, bis die Bewertung auf den angegebenen Wert konvergiert.

Für unser Diagramm ist dies wie folgt:

 def dynamicRanks(socialGraph: SocialGraph, tolerance: Double) = socialGraph.graph.pageRank(tol = tolerance).vertices def staticRanks(socialGraph: SocialGraph, tolerance: Double) = socialGraph.graph.staticPageRank(numIter = 20).vertices 

Fazit


Ein aufmerksamer Leser stellte fest, dass das Thema dieses Artikels die verteilte Verarbeitung von Diagrammen ist. Beim Schreiben von Code haben wir jedoch nichts unternommen, um die Verarbeitung wirklich zu verteilen. Und hier sollten wir uns ganz am Anfang an das Zitat von Edsger Dijkstra erinnern. Spark vereinfacht unser Leben dramatisch, indem es die Last und die Lasten des verteilten Rechnens übernimmt. Das Schreiben von Code, der in einem verteilten Cluster ausgeführt wird, ist keine so schwierige Aufgabe, wie es am Anfang schien. Und hier gibt es sogar mehrere Optionen zum Verwalten von Clusterressourcen: Hadoop YARN, Apache Mesos (persönlich meine Lieblingsoption) und in jüngerer Zeit gibt es Unterstützung für Kubernetes. Der gesamte Quellcode, der in diesem Artikel analysiert wurde, befindet sich auf dem Github .

Source: https://habr.com/ru/post/de415939/


All Articles