Teil 1: Problemstellung
Hallo Habr! Ich bin Lösungsarchitekt bei CleverDATA. Heute werde ich darüber sprechen, wie wir große Datenmengen mithilfe von Modellen klassifizieren, die mit fast jeder verfügbaren Bibliothek für maschinelles Lernen erstellt wurden. In dieser zweiteiligen Reihe werden wir die folgenden Fragen betrachten.
- Wie präsentiere ich ein Modell für maschinelles Lernen als Service (Model as a Service)?
- Wie werden die Aufgaben der verteilten Verarbeitung großer Datenmengen mit Apache Spark physisch ausgeführt?
- Welche Probleme treten auf, wenn Apache Spark mit externen Diensten interagiert?
- Wie kann die Apache Spark-Interaktion mit externen Diensten mithilfe der Akka-Streams und Akka-http-Bibliotheken sowie des Reactive Streams-Ansatzes organisiert werden?
Ursprünglich wollte ich einen Artikel schreiben, aber da sich herausstellte, dass das Materialvolumen ziemlich groß war, beschloss ich, es in zwei Teile zu teilen. Heute werden wir im ersten Teil die allgemeine Erklärung des Problems sowie die Hauptprobleme betrachten, die während der Implementierung gelöst werden müssen. Im
zweiten Teil werden wir über die praktische Umsetzung der Lösung dieses Problems unter Verwendung des Reactive Streams-Ansatzes sprechen.
Unsere Firma
CleverDATA verfügt über ein Team von Datenanalysten, die mithilfe einer Vielzahl von Tools (wie Scikit-Learn, Facebook FastText, XGBOST, TensorFlow usw.) Modelle für maschinelles Lernen trainieren. Die De-facto-Kernprogrammiersprache, die Analysten verwenden, ist Python. Fast alle Bibliotheken für maschinelles Lernen, auch ursprünglich in anderen Sprachen implementiert, verfügen über eine Python-Oberfläche und sind in die wichtigsten Python-Bibliotheken integriert (hauptsächlich in NumPy).
Andererseits wird das Hadoop-Ökosystem häufig zum Speichern und Verarbeiten großer Mengen unstrukturierter Daten verwendet. Darin werden Daten in Form von verteilten replizierten Blöcken einer bestimmten Größe (normalerweise 128 MB, aber es ist möglich, sie zu konfigurieren) im HDFS-Dateisystem gespeichert. Die effizientesten verteilten Datenverarbeitungsalgorithmen versuchen, die Netzwerkinteraktion zwischen Cluster-Computern zu minimieren. Dazu müssen die Daten auf denselben Computern verarbeitet werden, auf denen sie gespeichert sind.
In vielen Fällen kann die Netzwerkinteraktion natürlich nicht vollständig vermieden werden. Sie müssen jedoch versuchen, alle Aufgaben lokal auszuführen und die Datenmenge zu minimieren, die über das Netzwerk übertragen werden muss.
Dieses Prinzip der Verarbeitung verteilter Daten wird als "Verschieben von Berechnungen in die Nähe von Daten" bezeichnet. Alle wichtigen Frameworks, hauptsächlich Hadoop MapReduce und Apache Spark, halten sich an dieses Prinzip. Sie bestimmen die Zusammensetzung und Reihenfolge bestimmter Vorgänge, die auf Computern ausgeführt werden müssen, auf denen die erforderlichen Datenblöcke gespeichert sind.
Abbildung 1. Der HDFS-Cluster besteht aus mehreren Computern, von denen einer ein Namensknoten und der Rest ein Datenknoten ist. Der Namensknoten speichert Informationen zu den Dateien, aus denen ihre Blöcke bestehen, und zu den Computern, auf denen sie sich physisch befinden. Die Blöcke selbst werden auf dem Datenknoten gespeichert, der zur Erhöhung der Zuverlässigkeit auf mehrere Computer repliziert wird. Der Datenknoten führt auch Datenverarbeitungsaufgaben aus. Aufgaben bestehen aus dem Hauptprozess (Master, M), der den Start von Arbeitsprozessen (Worker, W) auf den Maschinen koordiniert, auf denen die erforderlichen Datenblöcke gespeichert sind.Fast alle Komponenten des Hadoop-Ökosystems werden mit der Java Virtual Machine (JVM) gestartet und sind eng miteinander integriert. Um beispielsweise mit Apache Spark geschriebene Aufgaben für die Arbeit mit in HDFS gespeicherten Daten auszuführen, sind fast keine zusätzlichen Manipulationen erforderlich: Das Framework bietet diese Funktionalität sofort.
Leider geht der Großteil der für maschinelles Lernen entwickelten Bibliotheken davon aus, dass Daten lokal gespeichert und verarbeitet werden. Gleichzeitig gibt es Bibliotheken, die eng in das Hadoop-Ökosystem integriert sind, z. B. Spark ML oder Apache Mahout. Sie weisen jedoch eine Reihe von erheblichen Nachteilen auf. Erstens bieten sie weitaus weniger Implementierungen von Algorithmen für maschinelles Lernen. Zweitens können nicht alle Datenanalysten mit ihnen arbeiten. Zu den Vorteilen dieser Bibliotheken gehört die Tatsache, dass sie zum Trainieren von Modellen für große Datenmengen mithilfe von verteiltem Computing verwendet werden können.
Datenanalysten verwenden jedoch häufig alternative Methoden, um Modelle zu trainieren, insbesondere Bibliotheken, die die Verwendung von GPUs ermöglichen. Ich werde die Probleme mit Trainingsmodellen in diesem Artikel nicht berücksichtigen, da ich mich auf die Verwendung von vorgefertigten Modellen konzentrieren möchte, die unter Verwendung einer verfügbaren Bibliothek für maschinelles Lernen erstellt wurden, um große Datenmengen zu klassifizieren.
Die Hauptaufgabe, die wir hier zu lösen versuchen, besteht darin, Modelle für maschinelles Lernen auf große Datenmengen anzuwenden, die in HDFS gespeichert sind. Wenn wir das SparkML-Modul aus der Apache Spark-Bibliothek verwenden könnten, das die grundlegenden Algorithmen für maschinelles Lernen implementiert, wäre die Klassifizierung großer Datenmengen eine triviale Aufgabe:
val model: LogisticRegressionModel = LogisticRegressionModel.load("/path/to/model") val dataset = spark.read.parquet("/path/to/data") val result = model.transform(dataset)
Leider funktioniert dieser Ansatz nur für Algorithmen, die im SparkML-Modul implementiert sind (eine vollständige Liste finden Sie
hier ). Bei Verwendung anderer Bibliotheken, die nicht in der JVM implementiert sind, wird alles viel komplizierter.
Um dieses Problem zu lösen, haben wir beschlossen, das Modell in einen REST-Service zu verpacken. Dementsprechend ist es beim Starten der Aufgabe zum Klassifizieren von in HDFS gespeicherten Daten erforderlich, die Interaktion zwischen den Computern, auf denen die Daten gespeichert sind, und dem Computer (oder Cluster von Computern), auf dem der Klassifizierungsdienst ausgeführt wird, zu organisieren.
Abbildung 2. Das Konzept von Model as a ServiceBeschreibung des Python-Klassifizierungsdienstes
Um das Modell als Service zu präsentieren, müssen folgende Aufgaben gelöst werden:
- Implementieren eines effizienten Zugriffs auf das Modell über HTTP;
- Gewährleistung der effizientesten Nutzung der Maschinenressourcen (hauptsächlich aller Prozessorkerne und des Arbeitsspeichers);
- bieten Widerstand gegen hohe Lasten;
- bieten die Möglichkeit, horizontal zu skalieren.
Der Zugriff auf das Modell über HTTP ist recht einfach zu implementieren: Für Python wurde eine große Anzahl von Bibliotheken entwickelt, mit denen Sie einen REST-Zugriffspunkt mit einer kleinen Menge Code implementieren können. Einer dieser Mikroframes ist
Flask . Die Implementierung des Klassifizierungsdienstes für Flask ist wie folgt:
from flask import Flask, request, Response model = load_model() n_features = 100 app = Flask(__name__) @app.route("/score", methods=['PUT']) def score(): inp = np.frombuffer(request.data, dtype='float32').reshape(-1, n_features) result = model.predict(inp) return Response(result.tobytes(), mimetype='application/octet-stream') if __name__ == "__main__": app.run()
Hier laden wir beim Start des Dienstes das Modell in den Speicher und verwenden es dann beim Aufrufen der Klassifizierungsmethode. Die Funktion load_model lädt das Modell von einer externen Quelle, sei es das Dateisystem, der Schlüsselwertspeicher usw.
Ein Modell ist ein Objekt mit einer Vorhersagemethode. Bei der Klassifizierung wird eine Eingabe in einen Merkmalsvektor einer bestimmten Größe vorgenommen und entweder ein Boolescher Wert erzeugt, der angibt, ob der angegebene Vektor für dieses Modell geeignet ist, oder ein Wert von 0 bis 1, auf den Sie dann den Grenzwert anwenden können: alles über dem Schwellenwert, ist ein positives Ergebnis der Klassifizierung, der Rest nicht.
Der zu klassifizierende Merkmalsvektor wird in binärer Form übergeben und in ein Numpy-Array deserialisiert. Es wäre ein Aufwand, für jeden Vektor eine HTTP-Anfrage zu stellen. Im Fall eines 100-dimensionalen Vektors und unter Verwendung von Werten vom Typ float32 würde eine vollständige HTTP-Anforderung, einschließlich Header, ungefähr so aussehen:
PUT /score HTTP/1.1 Host: score-node-1:8099 User-Agent: curl/7.58.0 Accept: */* Content-Type: application/binary Content-Length: 400 [400 bytes of data]
Wie Sie sehen können, ist die Effizienz einer solchen Anforderung sehr gering (400 Byte Nutzlast / (133 Byte Header + 400 Byte Body) = 75%). Glücklicherweise können Sie mit der Vorhersagemethode in fast allen Bibliotheken nicht den [1 xn] -Vektor, sondern die [mxn] -Matrix empfangen und dementsprechend das Ergebnis sofort für m Eingabewerte ausgeben.
Darüber hinaus ist die Numpy-Bibliothek für die Arbeit mit großen Matrizen optimiert, sodass Sie alle verfügbaren Maschinenressourcen effektiv nutzen können. Somit können wir nicht einen, sondern eine ziemlich große Anzahl von Merkmalsvektoren in einer Anforderung senden, sie in eine Numpy-Matrix der Größe [mxn] deserialisieren, klassifizieren und den Vektor [mx 1] von Booleschen oder float32-Werten zurückgeben. Infolgedessen wird die Effizienz der HTTP-Interaktion bei Verwendung einer Matrix mit 1000 Zeilen nahezu 100%. Die Größe der HTTP-Header kann in diesem Fall vernachlässigt werden.
Um den Flask-Dienst auf dem lokalen Computer zu testen, können Sie ihn über die Befehlszeile ausführen. Dieses Verfahren ist jedoch für den industriellen Einsatz völlig ungeeignet. Tatsache ist, dass Flask Single-Threaded ist. Wenn wir uns das Prozessor-Lastdiagramm ansehen, während der Dienst ausgeführt wird, werden wir feststellen, dass ein Kern zu 100% geladen ist und der Rest inaktiv ist. Glücklicherweise gibt es Möglichkeiten, alle Kernel des Computers zu verwenden: Dazu muss Flask über den uwsgi-Webanwendungsserver ausgeführt werden. Sie können die Anzahl der Prozesse und Threads optimal konfigurieren, um eine gleichmäßige Belastung aller Prozessorkerne sicherzustellen. Weitere Details zu allen Optionen zum Konfigurieren von uwsgi finden Sie
hier .
Es ist besser, nginx als HTTP-Einstiegspunkt zu verwenden, da uwsgi bei hohen Lasten instabil arbeiten kann. Nginx hingegen nimmt den gesamten Eingabestrom von Anforderungen auf sich, filtert ungültige Anforderungen heraus und dosiert die Last auf uwsgi. Nginx kommuniziert mit uwsgi über Linux-Sockets unter Verwendung einer Prozessdatei. Eine beispielhafte Nginx-Konfiguration ist unten dargestellt:
server { listen 80; server_name 127.0.0.1; location / { try_files $uri @score; } location @score { include uwsgi_params; uwsgi_pass unix:/tmp/score.sock; } }
Wie wir sehen können, stellte sich heraus, dass die Konfiguration für eine Maschine ziemlich kompliziert war. Wenn wir große Datenmengen klassifizieren müssen, wird eine große Anzahl von Anforderungen an diesen Dienst gesendet, und dies kann zu einem Engpass werden. Die Lösung für dieses Problem ist die horizontale Skalierung.
Der Einfachheit halber packen wir den Service in einen Docker-Container und stellen ihn dann auf der erforderlichen Anzahl von Computern bereit. Bei Bedarf können Sie automatisierte Bereitstellungstools wie Kubernetes verwenden. Eine beispielhafte Dockerfile-Struktur zum Erstellen eines Containers mit einem Dienst ist unten angegeben.
FROM ubuntu #Installing required ubuntu and python modules RUN apt-get update RUN apt-get -y install python3 python3-pip nginx RUN update-alternatives
Die Struktur des Dienstes für die Klassifizierung ist also wie folgt:
Abbildung 3. Serviceschema für die KlassifizierungEine kurze Zusammenfassung von Apache Spark im Hadoop-Ökosystem
Betrachten Sie nun den Prozess der Verarbeitung von in HDFS gespeicherten Daten. Wie bereits erwähnt, wird hierfür das Prinzip der Übertragung von Berechnungen auf Daten verwendet. Um mit der Verarbeitung von Aufgaben zu beginnen, müssen Sie wissen, auf welchen Computern die von uns benötigten Datenblöcke gespeichert sind, um Prozesse ausführen zu können, die direkt an der Verarbeitung beteiligt sind. Es ist auch notwendig, den Start dieser Prozesse zu koordinieren, sie im Notfall neu zu starten, gegebenenfalls die Ergebnisse verschiedener Unteraufgaben zu aggregieren usw.
All diese Aufgaben werden durch eine Vielzahl von Frameworks erfüllt, die mit dem Hadoop-Ökosystem zusammenarbeiten. Eines der beliebtesten und bequemsten ist Apache Spark. Das Hauptkonzept, auf dem das gesamte Framework basiert, ist RDD (Resilient Distributed Dataset). Im Allgemeinen kann RDD als verteilte Sammlung betrachtet werden, die tropfenresistent ist. RDD kann auf zwei Arten erhalten werden:
- Erstellung aus einer externen Quelle, z. B. einer Sammlung im Speicher, einer Datei oder einem Verzeichnis im Dateisystem usw.;
- Konvertierung von einem anderen RDD durch Anwenden von Transformationsoperationen. RDD unterstützt alle grundlegenden Vorgänge beim Arbeiten mit Sammlungen wie Map, FlatMap, Filter, GroupBy, Join usw.
Es ist wichtig zu verstehen, dass RDD im Gegensatz zu Sammlungen keine direkten Daten sind, sondern eine Folge von Operationen, die an den Daten ausgeführt werden müssen. Wenn die Transformationsoperationen aufgerufen werden, geschieht daher tatsächlich keine Arbeit, und wir erhalten nur eine neue RDD, die eine Operation mehr enthält als die vorherige. Die Arbeit selbst beginnt, wenn die sogenannten Terminaloperationen oder Aktionen aufgerufen werden. Dazu gehören das Speichern in einer Datei, das Speichern in einer Sammlung im Speicher, das Zählen der Anzahl der Elemente usw.
Beim Starten einer Terminaloperation erstellt Spark ein azyklisches Operationsdiagramm (DAG, Directed Acyclic Graph) basierend auf der resultierenden RDD und führt sie nacheinander im Cluster gemäß dem empfangenen Diagramm aus. Beim Erstellen einer DAG auf RDD-Basis führt Spark eine Reihe von Optimierungen durch, z. B. kombiniert er nach Möglichkeit mehrere aufeinanderfolgende Transformationen zu einer Operation.
RDD war die Haupteinheit für die Interaktion mit der Spark-API in Versionen von Spark 1.x. In Spark 2.x sagten die Entwickler, dass das Hauptkonzept für die Interaktion jetzt Dataset ist. Dataset ist ein Add-On für RDD mit Unterstützung für SQL-ähnliche Interaktion. Wenn Sie die Dataset-API verwenden, können Sie mit Spark eine Vielzahl von Optimierungen verwenden, einschließlich Optimierungen auf relativ niedriger Ebene. Im Allgemeinen gelten die für RDDs geltenden Grundprinzipien jedoch auch für Dataset.
Weitere Details zur Arbeit von Spark finden Sie in der
Dokumentation auf der offiziellen Website .
Betrachten wir ein Beispiel für die einfachste Klassifizierung von Spark ohne Verwendung externer Dienste. Hier wird ein ziemlich bedeutungsloser Algorithmus implementiert, der den Anteil jedes lateinischen Buchstabens im Text und dann die Standardabweichung berücksichtigt. Hierbei ist es zunächst wichtig, direkt auf die grundlegenden Schritte zu achten, die bei der Arbeit mit Spark ausgeführt werden.
case class Data(id: String, text: String) case class Features(id: String, vector: Array[Float]) case class Score(id: String, score: Float) //(1) def std(vector: Array[Float]): Float = ???
In diesem Beispiel haben wir:
- Wir bestimmen die Struktur der Eingabe-, Zwischen- und Ausgabedaten (die Eingabedaten sind als Text definiert, dem ein bestimmter Bezeichner zugeordnet ist, die Zwischendaten stimmen mit dem Bezeichner mit dem Merkmalsvektor überein und die Ausgabe stimmt mit dem Bezeichner mit einem numerischen Wert überein).
- Wir definieren eine Funktion zum Berechnen des resultierenden Werts durch einen Merkmalsvektor (zum Beispiel Standardabweichung, Implementierung nicht gezeigt).
- Definieren Sie den ursprünglichen Datensatz als Daten, die auf HDFS im Parkettformat entlang des Pfads / Pfads / zu / Daten gespeichert sind.
- Definieren Sie einen Zwischendatensatz als Bitmap-Map aus dem ursprünglichen Datensatz.
- In ähnlicher Weise bestimmen wir den resultierenden Datensatz durch eine bitweise Transformation vom Zwischenprodukt;
- Speichern Sie den resultierenden Datensatz in HDFS im Parkettformat entlang des Pfads / Pfads / zu / Ergebnis. Da das Speichern in einer Datei eine Terminaloperation ist, werden die Berechnungen selbst genau in diesem Stadium gestartet.
Apache Spark arbeitet nach dem Prinzip des Master-Workers. Wenn die Anwendung gestartet wird, wird der Hauptprozess gestartet, der als Treiber bezeichnet wird. Es führt den Code aus, der für die Generierung der RDD verantwortlich ist, auf deren Grundlage die Berechnungen durchgeführt werden.
Wenn eine Terminaloperation aufgerufen wird, generiert der Treiber eine DAG basierend auf der resultierenden RDD. Anschließend leitet der Treiber den Start von Workflows ein, die als Executoren bezeichnet werden und in denen Daten direkt verarbeitet werden. Nach dem Starten von Workflows übergibt der Treiber ihnen den ausführbaren Block, der ausgeführt werden muss, und gibt auch an, auf welchen Teil der Daten er angewendet werden muss.
Unten finden Sie den Code unseres Beispiels, in dem die Codeabschnitte hervorgehoben werden, die auf dem Executor ausgeführt werden (zwischen den Zeilen Beginn des Executor-Teils und Ende des Executor-Teils). Der Rest des Codes wird auf dem Treiber ausgeführt.
case class Data(id: String, text: String) case class Features(id: String, vector: Array[Float]) case class Score(id: String, score: Float) def std(vector: Array[Float]): Float = ??? val ds: Dataset[Data] = spark.read.parquet("/path/to/data").as[Data] val result: Dataset[Score] = ds.map {
Im Hadoop-Ökosystem werden alle Anwendungen in Containern ausgeführt. Ein Container ist ein Prozess, der auf einem der Computer in einem Cluster ausgeführt wird und dem eine bestimmte Menge an Ressourcen zugewiesen ist. Der Start des Containers wird vom YARN Resource Manager durchgeführt. Es bestimmt, welche der Maschinen über eine ausreichende Anzahl von Prozessorkernen und RAM verfügt und ob sie die für die Verarbeitung erforderlichen Datenblöcke enthält.
Beim Starten der Spark-Anwendung erstellt YARN den Container und führt ihn auf einem der Cluster-Computer aus, auf denen der Treiber gestartet wird. Wenn der Treiber die DAG auf Vorgänge vorbereitet, die auf den Ausführenden ausgeführt werden müssen, startet YARN zusätzliche Container auf den gewünschten Computern.
In der Regel reicht es aus, wenn der Treiber einen Kern und eine kleine Menge Speicher reserviert (es sei denn, das Berechnungsergebnis wird dann nicht auf dem Treiber im Speicher zusammengefasst). Für Ausführende kann zur Optimierung der Ressourcen und zur Reduzierung der Gesamtzahl der Prozesse im System mehr als ein Kern unterschieden werden. In diesem Fall kann der Ausführende mehrere Aufgaben gleichzeitig ausführen.
Hier ist es jedoch wichtig zu verstehen, dass YARN im Falle eines Ausfalls einer der im Container ausgeführten Aufgaben oder bei unzureichenden Ressourcen möglicherweise beschließt, den Container zu stoppen, und dass alle darin ausgeführten Aufgaben auf einem anderen Künstler erneut gestartet werden müssen. Wenn wir eine ausreichend große Anzahl von Kernen pro Container zuweisen, ist es außerdem wahrscheinlich, dass YARN nicht in der Lage ist, diese zu starten. Wenn wir beispielsweise zwei Maschinen haben, auf denen zwei Kerne nicht verwendet werden, können wir mit jedem Container beginnen, für den zwei Kerne erforderlich sind, aber nicht mit einem Container, für den vier Kerne erforderlich sind.
Nun wollen wir sehen, wie der Code aus unserem Beispiel direkt auf dem Cluster ausgeführt wird. Stellen Sie sich vor, die Größe der Quelldaten beträgt 2 Terabyte. Wenn die Blockgröße in HDFS 128 Megabyte beträgt, sind dementsprechend insgesamt 16384 Blöcke vorhanden. Jeder Block wird auf mehrere Computer repliziert, um die Zuverlässigkeit sicherzustellen. Der Einfachheit halber nehmen wir den Replikationsfaktor gleich zwei, dh es sind insgesamt 32768 Blöcke verfügbar. Angenommen, wir verwenden einen Cluster von 16 Maschinen als Speicher. Dementsprechend gibt es auf jeder der Maschinen im Falle einer gleichmäßigen Verteilung ungefähr 2048 Blöcke oder 256 Gigabyte pro Maschine. Auf jedem Computer befinden sich 8 Prozessorkerne und 64 Gigabyte RAM.
Für unsere Aufgabe benötigt der Treiber nicht viele Ressourcen, daher werden wir ihm 1 Kern und 1 GB Speicher zuweisen. Wir werden den Darstellern 2 Kerne und 4 GB Speicher geben. Angenommen, wir möchten die Verwendung von Clusterressourcen maximieren. Somit erhalten wir 64 Container: einen für den Fahrer und 63 für die Darsteller.
Abbildung 4. Auf dem Datenknoten ausgeführte Prozesse und die von ihnen verwendeten Ressourcen.Da wir in unserem Fall nur Kartenoperationen verwenden, besteht unsere DAG aus einer Operation. Es besteht aus folgenden Aktionen:
- Nehmen Sie einen Datenblock von der lokalen Festplatte.
- Daten konvertieren
- Speichern Sie das Ergebnis in einem neuen Block auf Ihrer eigenen lokalen Festplatte.
Insgesamt müssen 16384 Blöcke verarbeitet werden, sodass jeder Executor 16384 / (63 Executors * 2 Kerne) = 130 Operationen ausführen muss. Somit sieht der Lebenszyklus des Executors als separater Prozess (falls alles ohne Stürze abläuft) wie folgt aus.
- Containerstart.
- Empfangen einer Aufgabe vom Fahrer, in der eine Blockkennung und die erforderliche Operation vorhanden sind. Da wir dem Container zwei Kerne zugewiesen haben, erhält der Executor zwei Aufgaben gleichzeitig.
- Ausführen einer Aufgabe und Senden des Ergebnisses an den Treiber.
- Abrufen der nächsten Aufgabe vom Treiber und Wiederholen der Schritte 2 und 3, bis alle Blöcke für diesen lokalen Computer verarbeitet sind.
- Container Stop
Hinweis : Komplexere DAGs werden erhalten, wenn es erforderlich ist, Zwischendaten zwischen Maschinen neu zu verteilen, normalerweise für Gruppierungsvorgänge (groupBy, reductByKey usw.) und Verbindungen (join), deren Berücksichtigung den Rahmen dieses Artikels sprengt.
Die Hauptprobleme der Interaktion zwischen Apache Spark und externen Diensten
Wenn wir im Rahmen der Kartenoperation auf einen externen Dienst zugreifen müssen, wird die Aufgabe weniger trivial. Angenommen, ein Objekt der ExternalServiceClient-Klasse ist für die Interaktion mit einem externen Dienst verantwortlich. Im Allgemeinen müssen wir es vor Beginn der Arbeit initialisieren und dann nach Bedarf aufrufen:
val client = ExternalServiceClient.create()
Normalerweise dauert die Client-Initialisierung einige Zeit. Daher wird sie in der Regel beim Start der Anwendung initialisiert und dann zum Abrufen einer Client-Instanz aus einem globalen Kontext oder Pool verwendet. Wenn ein Container mit Spark Executor eine Aufgabe empfängt, die eine Interaktion mit einem externen Dienst erfordert, ist es daher hilfreich, einen bereits initialisierten Client zu erhalten, bevor Sie mit der Arbeit am Datenarray beginnen, und ihn dann für jedes Element wiederzuverwenden.
In Spark gibt es zwei Möglichkeiten, dies zu tun. Wenn der Client serialisierbar ist (der Client selbst und alle seine Felder müssen die Schnittstelle java.io.Serializable erweitern), kann er zunächst auf dem Treiber initialisiert und dann
über den Broadcast-Variablenmechanismus an die Ausführenden übergeben werden .
val client = ExternalServiceClient.create() val clientBroadcast = sparkContext.broadcast(client) ds.map { f: Features => val score = clientBroadcast.value.score(f.vector) Score(f.id, score) }
Für den Fall, dass der Client nicht serialisierbar ist oder die Initialisierung des Clients ein Prozess ist, der von den Einstellungen des jeweiligen Computers abhängt, auf dem er ausgeführt wird (zum Ausgleich müssen beispielsweise Anforderungen von einem Teil des Computers an den ersten Servicemaschinen und vom anderen an den zweiten gesendet werden). dann kann der Client direkt auf dem Executor initialisiert werden.
Zu diesem Zweck verfügt RDD (und Dataset) über eine mapPartitions-Operation, bei der es sich um eine verallgemeinerte Version der Map-Operation handelt (wenn Sie sich den Quellcode der RDD-Klasse ansehen, wird die Map-Operation über mapPartitions implementiert). Die an die Operation mapPartitions übergebene Funktion wird für jeden Block einmal ausgeführt. , , , :
ds.mapPartitions {fi: Iterator[Features] => val client = ExternalServiceClient.create() fi.map { f: Features => val score = client.score(f.vector) Score(f.id, score) } }
. , , , , . , , , .
. , hasNext next:
while (i.hasNext()) { val item = i.next() … }
, , . , 8 , YARN 4 2 , , 8 . , . .
. , , , , . : , , . , hasNext , . (, , ) , , , . ,
.
5. , , mapPartitions, . ., , . , , , .
6., , , -, , , , -, , .
, . , . , . , . , , , , , , .
.
- , , , .
- , , . , . , .
- , hasNext false, , , , . : hasNext = false, , , . , , , .
,
. Stay tuned!