Klassifizieren Sie große Datenmengen in Apache Spark mithilfe beliebiger Modelle für maschinelles Lernen

Teil 2: Lösung


Hallo nochmal! Heute werde ich meine Geschichte darüber fortsetzen, wie wir große Datenmengen in Apache Spark mithilfe beliebiger Modelle für maschinelles Lernen klassifizieren. Im ersten Teil des Artikels haben wir die Darstellung des Problems selbst sowie die Hauptprobleme untersucht, die bei der Organisation der Interaktion zwischen dem Cluster, in dem die Anfangsdaten gespeichert und verarbeitet werden, und dem externen Klassifizierungsdienst auftreten. Im zweiten Teil werden wir eine der Optionen zur Lösung dieses Problems mithilfe des Reactive Streams-Ansatzes und seiner Implementierung mithilfe der akka-Streams-Bibliothek betrachten.


Konzept für reaktive Streams


Um die im ersten Teil beschriebenen Probleme zu lösen, können Sie den Ansatz Reactive Streams verwenden . Sie können damit den Prozess der Übertragung von Datenströmen zwischen Verarbeitungsstufen steuern, die mit unterschiedlichen Geschwindigkeiten und unabhängig voneinander arbeiten, ohne dass eine Pufferung erforderlich ist. Wenn eine der Verarbeitungsstufen langsamer als die vorherige ist, muss der schnelleren Stufe signalisiert werden, wie viele Eingabedaten derzeit verarbeitet werden können. Diese Wechselwirkung nennt man Gegendruck. Es besteht in der Tatsache, dass die schnelleren Stufen genau so viele Elemente verarbeiten, wie für die langsamere Stufe erforderlich sind, und nicht mehr, und dann Rechenressourcen freisetzen.

Im Allgemeinen ist Reactive Streams eine Spezifikation für die Implementierung der Publisher-Subscriber- Vorlage. Diese Spezifikation definiert einen Satz von vier Schnittstellen (Publisher, Subscriber, Processor und Subscription) und einen Vertrag für deren Methoden.

Betrachten wir diese Schnittstellen genauer:

public interface Publisher<T> { public void subscribe(Subscriber<? super T> s); } public interface Subscriber<T> { public void onSubscribe(Subscription s); public void onNext(T t); public void onError(Throwable t); public void onComplete(); } public interface Subscription { public void request(long n); public void cancel(); } public interface Processor<T, R> extends Subscriber<T>, Publisher<R> { } 

Das Publisher-Subscriber-Modell hat zwei Seiten: Senden und Empfangen. Bei der Implementierung von Reactive Streams ist die Klasse, die die Publisher-Schnittstelle implementiert, für die Datenübertragung und der Abonnent für den Empfang verantwortlich. Um die Kommunikation zwischen ihnen herzustellen, muss der Abonnent durch Aufrufen seiner Abonnementmethode bei Publisher registriert sein. Gemäß der Spezifikation muss der Publisher nach der Registrierung eines Abonnenten seine Methoden in der folgenden Reihenfolge aufrufen:

  1. onSubscribe. Diese Methode wird unmittelbar nach der Registrierung des Abonnenten bei Publisher aufgerufen. Als Parameter wird ein Abonnementobjekt an dieses übergeben, über das der Abonnent Daten vom Publisher anfordert. Dieses Objekt sollte nur im Kontext dieses Abonnenten gespeichert und aufgerufen werden.
  2. Nachdem der Abonnent Daten von Publisher angefordert hat, indem er die Anforderungsmethode für das entsprechende Abonnementobjekt aufruft, kann Publisher die Methode Subscriber onNext aufrufen und das nächste Element übergeben.
  3. Der Abonnent kann dann die Anforderungsmethode für das Abonnement regelmäßig aufrufen, aber der Publisher kann die onNext-Methode nicht mehr als die über die Anforderungsmethode angeforderte Summe aufrufen.
  4. Wenn der Datenstrom endlich ist, muss Publisher nach dem Übergeben aller Elemente durch die onNext-Methode die onComplete-Methode aufrufen.
  5. Wenn in Publisher ein Fehler aufgetreten ist und eine weitere Verarbeitung der Elemente nicht möglich ist, sollte die onError-Methode aufgerufen werden
  6. Nach dem Aufrufen der Methoden onComplete oder onError sollte die weitere Interaktion des Publishers mit dem Abonnenten ausgeschlossen werden.

Methodenaufrufe können als Sendesignale zwischen Publisher und Subscriber betrachtet werden. Der Abonnent signalisiert dem Publisher, wie viele Elemente verarbeitet werden können, und der Publisher signalisiert ihm wiederum, dass entweder das nächste Element vorhanden ist oder keine weiteren Elemente vorhanden sind oder ein Fehler aufgetreten ist.

Um einen weiteren Einfluss von Publisher und Subscriber auf einander auszuschließen, dürfen Aufrufe aller Methoden, die Reactive Streams-Schnittstellen implementieren, nicht blockierend sein. In diesem Fall ist die Interaktion zwischen ihnen vollständig asynchron.

Weitere Details zur Spezifikation für Reactive Streams-Schnittstellen finden Sie hier .

Durch Verknüpfen des ursprünglichen und der resultierenden Iteratoren durch Konvertieren in Publisher bzw. Subscriber können wir die im vorherigen Teil des Artikels identifizierten Probleme lösen. Das Problem des Pufferüberlaufs zwischen den Stufen wird gelöst, indem eine bestimmte Anzahl von Elementen vom Abonnenten angefordert wird. Das Problem des erfolgreichen oder erfolglosen Abschlusses wird gelöst, indem Signale über die Methoden onComplete bzw. onError an den Abonnenten gesendet werden. Der Publisher ist für das Senden dieser Signale verantwortlich. In unserem Fall muss gesteuert werden, wie viele HTTP-Anforderungen gesendet wurden und wie viele von ihnen Antworten erhalten haben. Nachdem die letzte Antwort empfangen und alle darin enthaltenen Ergebnisse verarbeitet wurden, sollte ein onComplete-Signal gesendet werden. Falls eine der Anforderungen fehlschlägt, sollte sie ein onError-Signal senden und das Senden weiterer Elemente an den Abonnenten beenden sowie Elemente vom ursprünglichen Iterator subtrahieren.

Der resultierende Iterator sollte als Abonnent implementiert werden. In diesem Fall können wir nicht auf einen Puffer verzichten, in den Elemente geschrieben werden, wenn die onNext-Methode von der Subscriber-Schnittstelle aufgerufen und mit der hasNext- und der next-Methode von der Iterator-Schnittstelle subtrahiert wird. Als Pufferimplementierung können Sie eine Blockierungswarteschlange verwenden, z. B. LinkedBlockedQueue.

Ein aufmerksamer Leser wird sofort die Frage stellen: Warum befindet sich die Blockierungswarteschlange, da gemäß der Reactive Streams-Spezifikation die Implementierung aller Methoden nicht blockierend sein sollte? Aber hier ist alles in Ordnung: Da wir Publisher nach einer genau definierten Anzahl von Elementen fragen, wird die onNext-Methode nicht öfter aufgerufen, und die Warteschlange kann jederzeit ein neues Element hinzufügen, ohne es zu blockieren.

Andererseits kann es zu einer Blockierung kommen, wenn die hasNext-Methode im Falle einer leeren Warteschlange aufgerufen wird. Dies ist jedoch in Ordnung: Die hasNext-Methode ist nicht Bestandteil des Vertrags der Subscriber-Schnittstelle, sondern wird in der Iterator-Schnittstelle definiert, die, wie bereits erläutert, eine blockierende Datenstruktur darstellt. Wenn wir die nächste Methode aufrufen, subtrahieren wir das nächste Element von der Warteschlange, und wenn seine Größe einen bestimmten Schwellenwert unterschreitet, müssen wir den nächsten Teil der Elemente durch einen Aufruf der Anforderungsmethode anfordern.

Abbildung 7. Asynchrone Interaktion mit einem externen Dienst unter Verwendung des Reactive Streams-Ansatzes

In diesem Fall werden wir blockierende Anrufe natürlich nicht vollständig los. Dies wird durch eine Nichtübereinstimmung der Paradigmen zwischen reaktiven Streams, die eine vollständig asynchrone Interaktion voraussetzen, und einem Iterator verursacht, der beim Aufrufen der hasNext-Methode trueN oder false aufrufen muss. Im Gegensatz zur synchronen Interaktion mit einem externen Dienst können Ausfallzeiten aufgrund von Sperren durch Erhöhen der Gesamtlast der Prozessorkerne erheblich reduziert werden.

Es wäre praktisch, wenn Apache Spark-Entwickler in zukünftigen Versionen ein Analogon der mapPartitions-Methode implementieren würden, die mit Publisher und Subscriber funktioniert. Dies würde eine vollständig asynchrone Interaktion ermöglichen und somit die Möglichkeit des Blockierens von Threads ausschließen.

Akka-Streams und Akka-http als Implementierung der Reactive Streams-Spezifikation


Derzeit gibt es bereits mehr als ein Dutzend Implementierungen der Reactive Streams-Spezifikation. Eine solche Implementierung ist das Akka-Streams-Modul aus der Akka- Bibliothek. In der Welt von JVM hat sich akka als eines der effektivsten Mittel zum Schreiben paralleler und verteilter Systeme etabliert. Dies wird durch die Tatsache erreicht, dass das Grundprinzip, das in seiner Grundlage festgelegt ist, das Akteurmodell ist , mit dem Sie wettbewerbsfähige Anwendungen ohne direkte Kontrolle über Threads und deren Pools schreiben können.

Es wurde viel Literatur über die Umsetzung des Konzepts der Akteure in Akka geschrieben, daher werden wir hier nicht aufhören (die offizielle Website von Akka ist eine sehr gute Informationsquelle, ich empfehle auch die Akka im Aktionsbuch ). Hier werden wir uns die technologische Seite der Implementierung im Rahmen der JVM genauer ansehen.

Im Allgemeinen existieren Akteure nicht für sich, sondern bilden ein hierarchisches System. Um ein Akteursystem zu erstellen, müssen Sie Ressourcen dafür zuweisen. Der erste Schritt bei der Arbeit mit akka besteht darin, eine Instanz des ActorSystem-Objekts zu erstellen. Beim Start von ActorSystem wird ein separater Thread-Pool namens Dispatcher erstellt, in dem der gesamte in den Akteuren definierte Code ausgeführt wird. In der Regel führt ein einzelner Thread den Code mehrerer Akteure aus. Bei Bedarf können Sie jedoch einen separaten Dispatcher für eine bestimmte Gruppe von Akteuren konfigurieren (z. B. für Akteure, die direkt mit einer blockierenden API interagieren).

Eine der häufigsten Aufgaben, die mit Akteuren gelöst werden, ist die sequentielle Verarbeitung von Datenströmen. Zuvor war es dafür erforderlich, Ketten von Akteuren manuell zu erstellen und sicherzustellen, dass keine Engpässe zwischen ihnen bestehen (wenn beispielsweise ein Akteur Nachrichten schneller als der nächste verarbeitet, kann es zu einem Überlauf der Warteschlange für eingehende Nachrichten kommen, was zu einem OutOfMemoryError-Fehler führt).

Ab Version 2.4 wurde akka das Modul akka-streams hinzugefügt, mit dem Sie den Datenverarbeitungsprozess deklarativ definieren und anschließend die erforderlichen Akteure für seine Ausführung erstellen können. Akka-Streams implementiert auch das Prinzip des Gegendrucks, wodurch die Möglichkeit eines Überlaufens der Warteschlange eingehender Nachrichten für alle an der Verarbeitung beteiligten Akteure ausgeschlossen wird.

Die Hauptelemente zum Definieren des Datenflussverarbeitungsschemas in Akka-Streams sind Source, Flow und Sink. Durch die Kombination erhalten wir ein ausführbares Diagramm. Um den Verarbeitungsprozess zu starten, wird ein Materializer verwendet, der Akteure erstellt, die gemäß dem von uns definierten Diagramm (der Materializer-Schnittstelle und ihrer Implementierung ActorMaterializer) arbeiten.

Betrachten wir die Phasen Quelle, Fluss und Senke genauer. Quelle definiert die Datenquelle. Akka-Streams unterstützt über ein Dutzend verschiedene Möglichkeiten zum Erstellen von Quellen, einschließlich eines Iterators:

 val featuresSource: Source[Array[Float], NotUsed] = Source.fromIterator { () => featuresIterator } 

Die Quelle kann auch durch Konvertieren einer vorhandenen Quelle erhalten werden:

 val newSource: Source[String, NotUsed] = source.map(item => transform(item)) 

Wenn die Transformation eine nicht triviale Operation ist, kann sie als Flow-Entität dargestellt werden. Akka-Streams unterstützen viele verschiedene Möglichkeiten, um Flow zu erstellen. Der einfachste Weg ist, aus einer Funktion zu erstellen:

 val someFlow: Flow[String, Int, NotUsed] = Flow.fromFunction((x: String) => x.length) 

Durch die Kombination von Quelle und Fluss erhalten wir eine neue Quelle.

 val newSource: Source[Int, NotUsed] = oldSource.via(someFlow) 

Die Spüle wird als letzte Stufe der Datenverarbeitung verwendet. Wie im Fall von Source bietet akka-stream mehr als ein Dutzend verschiedene Sink-Optionen. Beispielsweise führt Sink.foreach für jedes Element eine bestimmte Operation aus, Sink.seq sammelt alle Elemente in einer Sammlung usw.

 val printSink: Sink[Any, Future[Done]] = Sink.foreach(println) 

Quelle, Durchfluss und Senke werden durch die Arten von Eingabe- und / oder Ausgabeelementen parametrisiert. Darüber hinaus kann jede Verarbeitungsstufe ein Ergebnis ihrer Arbeit haben. Zu diesem Zweck werden Quelle, Durchfluss und Senke auch durch einen zusätzlichen Typ parametrisiert, der das Ergebnis der Operation bestimmt. Dieser Typ wird als Typ des materialisierten Werts bezeichnet. Wenn die Operation nicht das Vorhandensein eines zusätzlichen Ergebnisses ihrer Arbeit impliziert, z. B. wenn wir den Durchfluss durch eine Funktion definieren, wird der Typ NotUsed als materialisierter Wert verwendet.

Durch die Kombination von Source, Flow und Sink erhalten wir RunnableGraph. Es wird durch einen Typ parametrisiert, der den Werttyp bestimmt, der als Ergebnis der Ausführung dieses Diagramms erhalten wird. Bei Bedarf können Sie beim Kombinieren der Stufen angeben, welches der Stufen das Ergebnis des gesamten Operationsdiagramms sein soll. Standardmäßig wird das Ergebnis der Quellstufe verwendet:

 val graph: RunnableGraph[NotUsed] = someSource.to(Sink.foreach(println)) 

Wenn jedoch das Ergebnis der Sink-Phase für uns wichtiger ist, müssen wir dies ausdrücklich angeben:

 val graph: RunnableGraph[Future[Done]] = someSource.toMat(Sink.foreach(println))(Keep.right) 

Nachdem wir den Operationsgraphen definiert haben, müssen wir ihn ausführen. Dazu muss runnableGraph die run-Methode aufrufen. Als Parameter verwendet diese Methode ein ActorMaterializer-Objekt (das sich auch in einem impliziten Bereich befinden kann), das für die Erstellung von Akteuren verantwortlich ist, die Operationen ausführen. In der Regel wird ein ActorMaterializer unmittelbar nach der Erstellung eines ActorSystems erstellt, das an seinen Lebenszyklus angehängt ist, und verwendet es zum Erstellen von Akteuren. Betrachten Sie ein Beispiel:

 //   ActorSystem,       implicit val system = ActorSystem(“system name”) // ,       ActorSystem implicit val materializer = ActorMaterializer() //    ,       Sink val graph: RunnableGraph[Future[immutable.Seq[Int]]] = Source.fromIterator(() => (1 to 10).iterator).toMat(Sink.seq)(Keep.right) //   ,    implicit scope. val result: Future[immutable.Seq[Int]] = graph.run() 

Bei einfachen Kombinationen können Sie auf einen separaten RunnableGraph verzichten, aber einfach Source mit Sink verbinden und diese starten, indem Sie die runWith-Methode für Source aufrufen. Diese Methode setzt auch voraus, dass ein ActorMaterializer-Objekt im impliziten Bereich vorhanden ist. In diesem Fall wird außerdem der in Sink definierte materialisierte Wert verwendet. Mit dem folgenden Code können wir beispielsweise Source in Publisher aus der Reactive Streams-Spezifikation konvertieren:

 val source: Source[Score, NotUsed] = Source.fromIterator(() => sourceIterator).map(item => transform(item)) val publisher: Publisher[Score] = source.runWith(Sink.asPublisher(false)) 

Nun haben wir gezeigt, wie Sie Reactive Streams Publisher erhalten können, indem Sie eine Quelle aus dem Quelliterator erstellen und einige Transformationen an seinen Elementen durchführen. Jetzt können wir es einem Abonnenten zuordnen, der dem resultierenden Iterator Daten liefert. Es bleibt die letzte Frage zu prüfen: Wie organisiert man die HTTP-Interaktion mit einem externen Dienst?

Die Struktur von akka umfasst das akka-http- Modul, mit dem Sie die asynchrone, nicht blockierende Kommunikation über HTTP organisieren können. Darüber hinaus basiert dieses Modul auf Akka-Streams, mit denen Sie die HTTP-Interaktion als zusätzlichen Schritt im Diagramm der Datenflussverarbeitungsvorgänge hinzufügen können.

Um eine Verbindung zu externen Diensten herzustellen, bietet akka-http drei verschiedene Schnittstellen.

  1. API auf Anforderungsebene - ist die einfachste Option für den Fall einzelner Anforderungen an einen beliebigen Computer. Auf dieser Ebene werden HTTP-Verbindungen vollständig automatisch verwaltet, und bei jeder Anforderung muss die vollständige Adresse des Computers übertragen werden, an den die Anforderung gesendet wird.
  2. Host-Level-API - geeignet, wenn wir wissen, auf welchen Port auf welchem ​​Computer wir zugreifen werden. In diesem Fall übernimmt akka-http die Kontrolle über den Pool von HTTP-Verbindungen. Bei Anforderungen reicht es aus, den relativen Pfad zur angeforderten Ressource anzugeben.
  3. API auf Verbindungsebene - Ermöglicht Ihnen die vollständige Kontrolle über die Verwaltung von HTTP-Verbindungen, dh das Öffnen, Schließen und Verteilen von Anforderungen über Verbindungen.

In unserem Fall ist uns die Adresse des Klassifizierungsdienstes im Voraus bekannt, daher ist es erforderlich, die HTTP-Interaktion nur mit diesem bestimmten Computer zu organisieren. Daher ist die Host-Level-API für uns am besten geeignet. Lassen Sie uns nun sehen, wie der Pool von HTTP-Verbindungen bei Verwendung erstellt wird:

 val httpFlow: Flow[(HttpRequest,Id), (Try[HttpResponse],Id), Http.HostConnectionPool] = Http().cachedHostConnectionPool[Id](hostAddress, portNumber) 

Beim Aufruf von Http (). CachedHostConnectionPool [T] (hostAddress, portNumber) im ActorSystem, das sich in einem impliziten Bereich befindet, werden Ressourcen zugewiesen, um einen Verbindungspool zu erstellen, aber die Verbindungen selbst werden nicht hergestellt. Als Ergebnis dieses Aufrufs wird Flow zurückgegeben, der ein Paar einer HTTP-Anforderung und eines ID-Identifikationsobjekts als Eingabe empfängt. Das Identifikationsobjekt wird benötigt, um die Anforderung mit der entsprechenden Antwort abzugleichen, da der akka-http-HTTP-Aufruf eine asynchrone Operation ist und die Reihenfolge, in der die Antworten empfangen werden, nicht unbedingt der Reihenfolge entspricht, in der die Anforderungen gesendet werden. Daher gibt Flow am Ausgang ein paar Ergebnisse der Abfrage und das entsprechende Identifikationsobjekt aus.

Direkt werden HTTP-Verbindungen hergestellt, wenn ein Diagramm (einschließlich dieses Flusses) gestartet (materialisiert) wird. Akka-http ist so implementiert, dass innerhalb eines ActorSystems immer ein gemeinsamer Pool von HTTP-Verbindungen vorhanden ist, der von allen Materialisierungen verwendet wird, unabhängig davon, wie oft die Diagramme mit httpFlow materialisiert wurden. Auf diese Weise können Sie die Verwendung von Netzwerkressourcen besser steuern und eine Überlastung vermeiden.

Somit ist der Lebenszyklus des HTTP-Verbindungspools an das ActorSystem gebunden. Wie bereits erwähnt, ist auch der Lebenszyklus des Thread-Pools damit verbunden, in dem die in den Akteuren definierten Operationen (oder in unserem Fall als die Stufen akka-Streams und akka-http definiert) ausgeführt werden. Um maximale Effizienz zu erzielen, müssen wir daher eine Instanz von ActorSystem innerhalb desselben JVM-Prozesses wiederverwenden.

Alles zusammen: Ein Beispiel für die Implementierung der Interaktion mit dem Klassifizierungsdienst


Jetzt können wir mit der Klassifizierung großer Mengen verteilter Daten in Apache Spark mithilfe der asynchronen Interaktion mit externen Diensten fortfahren. Das allgemeine Schema dieser Wechselwirkung wurde bereits in Abbildung 7 gezeigt.

Angenommen, wir haben einen anfänglichen Datensatz [Features] definiert. Wenn Sie die mapPartitions-Operation darauf anwenden, sollten Sie einen Datensatz erhalten, in dem jede ID aus dem Quellensatz mit einem bestimmten Wert versehen ist, der als Ergebnis der Klassifizierung erhalten wurde (Datensatz [Score]). Um die asynchrone Verarbeitung auf Executoren zu organisieren, müssen wir die Quelle und die resultierenden Iteratoren in Publisher bzw. Subscriber aus der Reactive Streams-Spezifikation umschließen und miteinander verknüpfen.

 case class Features(id: String, vector: Array[Float]) case class Score(id: String, score: Float) //(1) val batchesRequestCount = config.getInt(“scoreService. batchesRequestCount”)//(2) //... val scoreDs: Dataset[Score] = featuresDs.mapPartitions { fi: Iterator[Features] => val publisher: Publisher[Iterable[Score]] = createPublisher(fi) //(3) val iteratorSubscriber: Iterator[Score] = new IteratorSubscriber(batchesRequestCount) //(4) publisher.subscribe(batchesRequestCount) //(5) iteratorSubscriber //(6) } 

Bei dieser Implementierung wird berücksichtigt, dass der Klassifizierungsdienst für einen Aufruf eine Gruppe von Merkmalsvektoren gleichzeitig verarbeiten kann. Daher ist das Klassifizierungsergebnis nach einem Aufruf auch sofort für die gesamte Gruppe verfügbar. Daher haben wir als Parametertyp für Publisher nicht nur Score, wie Sie vielleicht erwarten, sondern Iterable [Score]. Daher senden wir die Klassifizierungsergebnisse für diese Gruppe durch einen einzelnen Aufruf der onNext-Methode an den resultierenden Iterator (der auch ein Abonnent ist). Dies ist viel effizienter als der Aufruf von onNext für jedes Element. Jetzt werden wir diesen Code genauer analysieren.

  1. Wir bestimmen die Struktur der Eingabe- und Ausgabedaten. Als Eingabe haben wir eine Reihe von ID-Bezeichnern mit einem Merkmalsvektor, und als Ausgabe haben wir eine Reihe von Bezeichnern mit einem numerischen Wert, der als Ergebnis der Klassifizierung erhalten wird.
  2. Wir bestimmen die Anzahl der Gruppen, die der Abonnent gleichzeitig vom Publisher anfordert. Da davon ausgegangen wird, dass diese Werte im Puffer liegen und warten, bis sie aus dem resultierenden Iterator gelesen werden, hängt dieser Wert von der dem Executor zugewiesenen Speichermenge ab.
  3. Erstellen Sie Publisher aus dem Quelliterator. Er ist für die Interaktion mit dem Klassifizierungsdienst verantwortlich. Die Funktion createPublisher wird unten erläutert.
  4. Erstellen Sie einen Abonnenten, der der resultierende Iterator ist. Der IteratorSubscriber-Klassencode ist ebenfalls unten angegeben.
  5. Abonnenten beim Publisher registrieren.
  6. Gibt IteratorSubscriber als Ergebnis der mapPartitions-Operation zurück.

Betrachten Sie nun die Implementierung der Funktion createPublisher.

 type Ids = Seq[String] //(1) val batchSize = config.getInt("scoreService.batchSize") val parallelismLevel = config.getInt("scoreService.parallelismLevel") //(2) //... def createPublisher(fi: Iterator[Features]): Publisher[Iterable[Score]] = { import ActorSystemHolder._ //(3) Source .fromIterator(() => fi) //(4) .grouped(batchSize) //(5) .map { groupedFeatures: Seq[Features] => val request: (HttpRequest, Ids) = createHttpRequest(groupedFeatures) //(6) logger.debug(s"Sending request for the first id: ${request._2(0)}") request } .via(httpFlow) //(7) .flatMapMerge(parallelismLevel, { //(8) case (Success(response), ids) if response.status.isSuccess() => logger.debug(s"Processing successful result for the first id: ${ids(0)}") val resultSource: Source[Iterable[Score], _] = response.entity.dataBytes.reduce(_ ++ _).map { responseBytes => processSuccessfulResponse(responseBytes, ids) } //(9) resultScore case (Success(response), ids) => logger.warn( s"Failed result for the first id: ${ids(0)}, HTTP status: ${response.status}" ) response.discardEntityBytes() Source.failed( new IOException(s"Non-successful HTTP status: ${response.status}") ) //(10) case (Failure(ex), ids) => logger.warn(s"Failed result: an exception has occured", ex) Source.failed(ex) //(11) }) .runWith(Sink.asPublisher(false)) //(12) } def createHttpRequest(featuresSeq: Seq[Features]): (HttpRequest, ProfileIds) = { val requestBytes: Array[Byte] = featuresToMatrixBytes(featuresSeq) val ids: ProfileIds = extractIds(featuresSeq) val httpRequest = HttpRequest( method = HttpMethods.PUT, uri = "/score", entity = requestBytes ) httpRequest -> ids } 

  1. - , . httpFlow, .
  2. : , (batchSize) (parallelismLevel).
  3. implicit scope ActorSystem, ActorMaterializer httpFlow. Spark-. ActorSystemHolder .
  4. akka-streams . Source[Features] .
  5. batchSize .
  6. HttpRequest . HttpRequest createHttpRequest. createPublisher. feature-, , ( predict). , HTTP-. , HTTP-, HTTP-, URI .
  7. httpFlow.
  8. , . flatMapMerge, akka-http Source[ByteString], , . . parallelismLevel , ( ). HTTP-: , , , .
  9. : . akka ByteString. , ByteString O(1), ByteString . , , . , .
  10. HTTP- , Stream . , discardEntityBytes , , .
  11. . akka-http , .
  12. , Publisher, . , . false Sink.asPublisher , Publisher Subscriber-.

, akka ActorSystem, . , Spark , . Spark JVM , , , ActorSystem ActorMatrializer httpFlow.

 object ActorSystemHolder { implicit lazy val actorSystem: ActorSystem = { //(1) val actorSystemName = s"score-service-client" logger.debug(s"Creating actor system $actorSystemName") val as = ActorSystem(actorSystemName) //(2) logger.debug("Adding shutdown hook for the actor system") scala.sys.addShutdownHook { //(3) logger.debug(s"Terminating actor system $actorSystemName") Await.result(as.terminate(), 30.seconds) //to Mars :) logger.debug(s"The actor system $actorSystemName has been terminated") } as } implicit lazy val materializer: ActorMaterializer = { //(4) logger.debug(s"Creating actor materializer for actor system ${actorSystem.name}") ActorMaterializer() } lazy val httpFlow: Flow[ (HttpRequest,ProfileIds), (Try[HttpResponse], ProfileIds), Http.HostConnectionPool] = { //(5) val httpFlowSettings = ConnectionPoolSettings(actorSystem) logger.debug(s"Creating http flow with settings $httpFlowSettings") Http().cachedHostConnectionPool[ProfileIds]( config.getString("scoreService.host"), config.getInt("scoreService.int"), settings = httpFlowSettings ) } } 

  1. , , , .
  2. ActorSystem .
  3. , , ActorSystem, terminate, , , , . , JVM-.
  4. ActorMaterializer, akka-streams, ActorSystem.
  5. , httpFlow . , HTTP- ActorSystem.

Subscriber- HTTP-.

 sealed trait QueueItem[+T] case class Item[+T](item: T) extends QueueItem[T] case object Done extends QueueItem[Nothing] case class Failure(cause: Throwable) extends QueueItem[Nothing] //(1) class StreamErrorCompletionException(cause: Throwable) extends Exception(cause) //(2) class IteratorSubscriber[T](requestSize: Int) extends Subscriber[Iterable[T]] with Iterator[T] { //(3) private val buffer: BlockingQueue[QueueItem[Iterable[T]]] = new LinkedBlockingQueue[QueueItem[Iterable[T]]]() //(4) private var expecting: Int = 0 //(5) private val subscriptionPromise: Promise[Subscription] = Promise() private lazy val subscription: Subscription = Await.result(subscriptionPromise.future, 5.minutes) //(6) private var currentIterator: Iterator[T] = Iterator.empty //(7) private var isDone = false //(8) override def onSubscribe(s: Subscription): Unit = { subscriptionPromise.success(s) //(9) logger.trace("The iterator has been subscribed") } override def onNext(t: Iterable[T]): Unit = { logger.trace("Putting a next batch to the buffer") buffer.put(Item(t)) //(10) } override def onComplete(): Unit = { logger.debug("The stream has been succesfully completed") buffer.put(Done) //(11) } override def onError(t: Throwable): Unit = { logger.warn("The stream has been completed with error", t) buffer.put(Failure(t)) //(12) } override def hasNext: Boolean = { logger.trace("Asking hasNext") if (currentIterator.hasNext) { //(13) true } else if (isDone) { //(14) false } else { if (expecting < requestSize) { requestNextBatches() //(15) } buffer.take() match { //(16) case Item(batch) => currentIterator = batch.iterator expecting -= 1 this.hasNext //(17) case Done => isDone = true false //(18) case Failure(exception) => throw new StreamErrorCompletionException(exception) //(19) } } } override def next(): T = { val out = currentIterator.next() logger.trace("The next element is {}", out) out //(20) } private def requestNextBatches(): Unit = { logger.debug(s"Requesting {} batches", requestSize) subscription.request(requestSize) expecting += requestSize //(21) } } 

IteratorSubscriber Producer-Consumer. , Subscriber, Producer-, , Iterator, – Consumer-. , . Iterator Apache Spark, Subscriber – , ActorSystem.

IteratorSubscriber .

  1. . , Done, , Throwable, .
  2. , hasNext .
  3. , , Publisher-.
  4. , . LinkedBlockingQueue, . , .
  5. , . , , Publisher-. , , Publisher- . hasNext next ( requestNextBatches hasNext), , .
  6. subscriptionPromise subscription Subscription, Publisher onSubscribe. , Reactive Streams Subscriber- Publisher- , , hasNext , onSubscribe. , subscription, Publisher-. lazy subscription, Promise.
  7. . hasNext next, , .
  8. , , hasNext false . hasNext, .
  9. onSubscribe Publisher- Subscription Promise, subscription.
  10. onNext Publisher-, . .
  11. Publisher onComplete, Done.
  12. Publisher onError. .
  13. hasNext , . , true, . , .
  14. , false.
  15. , , requestSize, Publisher. , , , Publisher- , HTTP- .
  16. . , , , . , , ( , , subscription), , , , .
  17. , currentIterator. , . , hasNext , ( , ), .
  18. , false hasNext. , isDone, , . - , hasNext , false. , hasNext , false , . , .
  19. , , , .
  20. next . , hasNext, next .
  21. Publisher- , , subscription, Publisher-. requestSize. .

, , , :

8. .

:


, , . , HTTP , . .

– . , , Hadoop , . , , - . , , hdfs, , , , .

, . , akka-http , . , -, - Apache Spark , , , -.

, , . , , http-, , .

, . , , . , . , .

, . , , Hadoop , , .

, , Hadoop- , , .

, , CleverDATA . . , , , , , . , .

Source: https://habr.com/ru/post/de413141/


All Articles