Classer de grandes quantités de données sur Apache Spark à l'aide de modèles d'apprentissage automatique arbitraires

Partie 2: Solution


Bonjour encore! Aujourd'hui, je vais continuer mon histoire sur la façon dont nous classons de grandes quantités de données sur Apache Spark à l'aide de modèles d'apprentissage automatique arbitraires. Dans la première partie de l'article, nous avons examiné l'énoncé du problème lui-même, ainsi que les principaux problèmes qui se posent lors de l'organisation de l'interaction entre le cluster sur lequel les données initiales sont stockées et traitées, et le service de classification externe. Dans la deuxième partie, nous considérerons l'une des options pour résoudre ce problème en utilisant l'approche Reactive Streams et son implémentation en utilisant la bibliothèque akka-streams.


Concept de flux réactifs


Pour résoudre les problèmes décrits dans la première partie, vous pouvez utiliser l'approche, appelée Reactive Streams . Il vous permet de contrôler le processus de transfert des flux de données entre les étapes de traitement, fonctionnant à différentes vitesses et indépendamment les uns des autres sans avoir besoin de mise en mémoire tampon. Si l'une des étapes de traitement est plus lente que la précédente, il est nécessaire de signaler l'étape la plus rapide sur la quantité de données d'entrée qu'elle est prête à traiter pour le moment. Cette interaction est appelée contre-pression. Elle consiste dans le fait que les étapes les plus rapides traitent exactement autant d'éléments que nécessaire pour l'étape la plus lente, et pas plus, puis libèrent des ressources informatiques.

En général, Reactive Streams est une spécification pour implémenter le modèle Publisher-Subscriber . Cette spécification définit un ensemble de quatre interfaces (éditeur, abonné, processeur et abonnement) et un contrat pour leurs méthodes.

Examinons ces interfaces plus en détail:

public interface Publisher<T> { public void subscribe(Subscriber<? super T> s); } public interface Subscriber<T> { public void onSubscribe(Subscription s); public void onNext(T t); public void onError(Throwable t); public void onComplete(); } public interface Subscription { public void request(long n); public void cancel(); } public interface Processor<T, R> extends Subscriber<T>, Publisher<R> { } 

Le modèle Publisher-Subscriber a deux côtés: la transmission et la réception. Lors de l'implémentation de Reactive Streams, la classe qui implémente l'interface Publisher est responsable du transfert de données et l'Abonné est responsable de la réception. Pour établir une communication entre eux, l'Abonné doit être enregistré auprès de Publisher en appelant sa méthode d'abonnement. Selon la spécification, après avoir enregistré un abonné, Publisher doit appeler ses méthodes dans l'ordre suivant:

  1. onSubscribe. Cette méthode est appelée immédiatement après l'enregistrement de l'abonné auprès de Publisher. En tant que paramètre, un objet Subscription lui est transmis via lequel l'Abonné demandera des données à Publisher. Cet objet doit être stocké et appelé uniquement dans le contexte de cet abonné.
  2. Une fois que l'Abonné a demandé des données à Publisher en appelant la méthode de demande sur l'objet Subscription correspondant, Publisher peut appeler la méthode Subscriber onNext, en passant l'élément suivant.
  3. L'abonné peut ensuite appeler périodiquement la méthode de demande sur l'abonnement, mais Publisher ne peut pas appeler la méthode onNext plus que le total demandé via la méthode de demande.
  4. Si le flux de données est fini, après avoir passé tous les éléments via la méthode onNext, Publisher doit appeler la méthode onComplete.
  5. Si une erreur s'est produite dans Publisher et qu'un traitement ultérieur des éléments n'est pas possible, il doit appeler la méthode onError
  6. Après avoir appelé les méthodes onComplete ou onError, l'interaction ultérieure de Publisher avec l'abonné doit être exclue.

Les appels de méthode peuvent être considérés comme l'envoi de signaux entre l'éditeur et l'abonné. L'abonné signale à Publisher le nombre d'éléments qu'il est prêt à traiter, et Publisher, à son tour, lui signale qu'il existe soit l'élément suivant, soit qu'il n'y a plus d'éléments, ou qu'une erreur s'est produite.

Afin d'exclure une autre influence de Publisher et Subscriber l'un sur l'autre, les appels à toutes les méthodes qui implémentent les interfaces Reactive Streams doivent être non bloquants. Dans ce cas, l'interaction entre eux sera complètement asynchrone.

Plus de détails sur la spécification des interfaces Reactive Streams peuvent être trouvés ici .

Ainsi, en reliant les itérateurs d'origine et résultants en les convertissant respectivement en Publisher et Subscriber, nous pouvons résoudre les problèmes identifiés dans la partie précédente de l'article. Le problème de dépassement de tampon entre les étapes est résolu en demandant un certain nombre d'éléments à l'Abonné. Le problème de l'achèvement réussi ou échoué est résolu en envoyant des signaux à l'Abonné via les méthodes onComplete ou onError, respectivement. L'éditeur devient responsable de l'envoi de ces signaux, qui dans notre cas doivent contrôler le nombre de requêtes HTTP envoyées et le nombre de réponses reçues. Après avoir reçu la dernière réponse et traité tous les résultats qui y sont entrés, il doit envoyer un signal onComplete. En cas d'échec de l'une des demandes, il doit envoyer un signal onError et cesser d'envoyer d'autres éléments à l'Abonné, ainsi que de soustraire des éléments de l'itérateur d'origine.

L'itérateur résultant doit être implémenté en tant qu'abonné. Dans ce cas, nous ne pouvons pas nous passer d'un tampon dans lequel les éléments seront écrits lorsque la méthode onNext est appelée à partir de l'interface Subscriber et soustraits à l'aide des méthodes hasNext et next de l'interface Iterator. En tant qu'implémentation de tampon, vous pouvez utiliser une file d'attente de blocage, par exemple, LinkedBlockedQueue.

Un lecteur attentif posera immédiatement la question: pourquoi la file d'attente est-elle bloquante, car selon la spécification Reactive Streams, l'implémentation de toutes les méthodes devrait être non bloquante? Mais tout va bien ici: puisque nous demandons à Publisher un nombre d'éléments strictement défini, la méthode onNext ne sera pas appelée plus que ce nombre de fois, et la file d'attente peut toujours ajouter un nouvel élément sans bloquer.

En revanche, un blocage peut se produire lorsque la méthode hasNext est appelée en cas de file d'attente vide. Cependant, tout va bien: la méthode hasNext ne fait pas partie du contrat de l'interface Abonné, elle est définie dans l'interface Iterator, qui, comme nous l'avons expliqué précédemment, est une structure de données bloquante. Lors de l'appel de la méthode suivante, nous soustrayons l'élément suivant de la file d'attente, et lorsque sa taille devient inférieure à un certain seuil, nous devrons demander la partie suivante des éléments via un appel à la méthode request.

Figure 7. Interaction asynchrone avec un service externe à l'aide de l'approche Reactive Streams

Bien sûr, dans ce cas, nous ne nous débarrasserons pas complètement du blocage des appels. Cela est dû à une incompatibilité de paradigmes entre les flux réactifs, qui supposent une interaction complètement asynchrone, et un itérateur, qui doit appeler trueN ou false lors de l'appel de la méthode hasNext. Cependant, contrairement à l'interaction synchrone avec un service externe, les temps d'arrêt dus aux verrous peuvent être considérablement réduits en augmentant la charge globale des cœurs de processeur.

Il serait pratique que les développeurs Apache Spark dans les futures versions implémentent un analogue de la méthode mapPartitions, qui fonctionne avec Publisher et Subscriber. Cela permettrait une interaction complètement asynchrone, éliminant ainsi la possibilité de bloquer les threads.

Akka-streams et akka-http comme implémentation de la spécification Reactive Streams


Actuellement, il existe déjà plus d'une douzaine d'implémentations de la spécification Reactive Streams. Une telle implémentation est le module akka-streams de la bibliothèque akka . Dans le monde de la JVM, akka s'est imposé comme l'un des moyens les plus efficaces pour écrire des systèmes parallèles et distribués. Ceci est obtenu grâce au fait que le principe de base énoncé dans sa fondation est le modèle d'acteur , qui vous permet d'écrire des applications hautement compétitives sans contrôle direct des threads et de leurs pools.

Beaucoup de littérature a été écrite sur la mise en œuvre du concept d'acteurs dans l'akka, donc nous ne nous arrêterons pas là (le site officiel de l' akka est une très bonne source d'information, je recommande également l' akka dans le livre d' action ). Ici, nous allons examiner de plus près le côté technologique de la mise en œuvre dans le cadre de la JVM.

En général, les acteurs n'existent pas par eux-mêmes, mais forment un système hiérarchique. Afin de créer un système d'acteur, vous devez lui allouer des ressources, donc la première étape lorsque vous travaillez avec akka est de créer une instance de l'objet ActorSystem. Lorsque ActorSystem démarre, un pool séparé de threads est créé, appelé le répartiteur, dans lequel tout le code défini dans les acteurs est exécuté. En règle générale, un seul thread exécute le code de plusieurs acteurs, cependant, si nécessaire, vous pouvez configurer un répartiteur distinct pour un groupe spécifique d'acteurs (par exemple, pour les acteurs interagissant directement avec une API de blocage).

L'une des tâches les plus courantes résolues à l'aide d'acteurs est le traitement séquentiel des flux de données. Auparavant, pour cela, il était nécessaire de créer manuellement des chaînes d'acteurs et de s'assurer qu'il n'y avait pas de goulots d'étranglement entre eux (par exemple, si un acteur traite les messages plus rapidement que le suivant, il peut alors avoir un débordement de la file d'attente des messages entrants, conduisant à une erreur OutOfMemoryError).

À partir de la version 2.4, le module akka-streams a été ajouté à akka, ce qui vous permet de définir de manière déclarative le processus de traitement des données, puis de créer les acteurs nécessaires à son exécution. Akka-streams met également en œuvre le principe de la contre-pression, ce qui élimine la possibilité de déborder la file d'attente des messages entrants pour tous les acteurs impliqués dans le traitement.

Les principaux éléments permettant de définir le schéma de traitement des flux de données dans les akka-streams sont Source, Flow et Sink. En les combinant les uns avec les autres, nous obtenons un graphique exécutable. Pour démarrer le processus de traitement, un matérialiseur est utilisé, ce qui crée des acteurs travaillant selon le graphe défini par nous (l'interface Materializer et son implémentation ActorMaterializer).

Examinons plus en détail les étapes Source, Flow et Sink. La source définit la source de données. Akka-streams prend en charge plus d'une douzaine de façons différentes de créer des sources, y compris à partir d'un itérateur:

 val featuresSource: Source[Array[Float], NotUsed] = Source.fromIterator { () => featuresIterator } 

La source peut également être obtenue en convertissant une source existante:

 val newSource: Source[String, NotUsed] = source.map(item => transform(item)) 

Si la transformation est une opération non triviale, elle peut être représentée comme une entité Flow. Akka-streams prend en charge de nombreuses façons de créer Flow. La façon la plus simple est de créer à partir d'une fonction:

 val someFlow: Flow[String, Int, NotUsed] = Flow.fromFunction((x: String) => x.length) 

En combinant Source et Flow, nous obtenons une nouvelle Source.

 val newSource: Source[Int, NotUsed] = oldSource.via(someFlow) 

L'évier est utilisé comme étape finale du traitement des données. Comme dans le cas de Source, akka-streams fournit plus d'une douzaine d'options Sink différentes, par exemple, Sink.foreach effectue une certaine opération pour chaque élément, Sink.seq collecte tous les éléments d'une collection, etc.

 val printSink: Sink[Any, Future[Done]] = Sink.foreach(println) 

Source, Flow et Sink sont paramétrés respectivement par les types d'éléments d'entrée et / ou de sortie. De plus, chaque étape de traitement peut avoir un certain résultat de son travail. Pour cela, Source, Flow et Sink sont également paramétrés par un type supplémentaire qui détermine le résultat de l'opération. Ce type est appelé type de valeur matérialisée. Si l'opération n'implique pas la présence d'un résultat supplémentaire de son travail, par exemple, lorsque nous définissons Flow through a function, le type NotUsed est utilisé comme valeur matérialisée.

En combinant la source, le flux et le puits nécessaires, nous obtenons RunnableGraph. Il est paramétré par un type, qui détermine le type de valeur obtenu suite à l'exécution de ce graphe. Si nécessaire, lors de la combinaison des étapes, vous pouvez spécifier le résultat de laquelle des étapes sera le résultat de l'ensemble du graphique des opérations. Par défaut, le résultat de l'étape Source est pris:

 val graph: RunnableGraph[NotUsed] = someSource.to(Sink.foreach(println)) 

Cependant, si le résultat de l'étape Sink est plus important pour nous, alors nous devons l'indiquer explicitement:

 val graph: RunnableGraph[Future[Done]] = someSource.toMat(Sink.foreach(println))(Keep.right) 

Après avoir défini le graphe des opérations, nous devons l'exécuter. Pour ce faire, runnableGraph doit appeler la méthode run. En tant que paramètre, cette méthode prend un objet ActorMaterializer (qui peut également être dans une portée implicite), qui est responsable de la création d'acteurs qui effectueront des opérations. En règle générale, un ActorMaterializer est créé immédiatement après la création d'un ActorSystem, attaché à son cycle de vie, et l'utilise pour créer des acteurs. Prenons un exemple:

 //   ActorSystem,       implicit val system = ActorSystem(“system name”) // ,       ActorSystem implicit val materializer = ActorMaterializer() //    ,       Sink val graph: RunnableGraph[Future[immutable.Seq[Int]]] = Source.fromIterator(() => (1 to 10).iterator).toMat(Sink.seq)(Keep.right) //   ,    implicit scope. val result: Future[immutable.Seq[Int]] = graph.run() 

Dans le cas de combinaisons simples, vous pouvez vous passer de la création d'un RunnableGraph distinct, mais connectez simplement Source à Sink et démarrez-les en appelant la méthode runWith sur Source. Cette méthode suppose également qu'un objet ActorMaterializer est présent dans la portée implicite. De plus, dans ce cas, la valeur matérialisée définie dans Sink sera utilisée. Par exemple, en utilisant le code suivant, nous pouvons convertir Source en Publisher à partir de la spécification Reactive Streams:

 val source: Source[Score, NotUsed] = Source.fromIterator(() => sourceIterator).map(item => transform(item)) val publisher: Publisher[Score] = source.runWith(Sink.asPublisher(false)) 

Ainsi, nous avons maintenant montré comment obtenir Reactive Streams Publisher en créant une source à partir de l'itérateur source et en effectuant des transformations sur ses éléments. Nous pouvons maintenant l'associer à un abonné qui fournit des données à l'itérateur résultant. Reste à considérer la dernière question: comment organiser l'interaction HTTP avec un service externe.

La structure d'akka comprend le module akka-http , qui vous permet d'organiser une communication asynchrone non bloquante sur HTTP. De plus, ce module est construit sur la base de flux akka, ce qui vous permet d'ajouter une interaction HTTP comme étape supplémentaire dans le graphique des opérations de traitement des flux de données.

Pour se connecter à des services externes, akka-http propose trois interfaces différentes.

  1. API au niveau de la demande - est l'option la plus simple pour le cas de demandes uniques à une machine arbitraire. À ce niveau, les connexions HTTP sont gérées de manière entièrement automatique et, à chaque demande, il est nécessaire de transmettre l'adresse complète de la machine à laquelle la demande est envoyée.
  2. API au niveau de l'hôte - appropriée lorsque nous savons à quel port sur quelle machine nous allons accéder. Dans ce cas, akka-http prend le contrôle du pool de connexions HTTP, et dans les requêtes, il suffit de spécifier le chemin relatif vers la ressource demandée.
  3. API au niveau de la connexion - vous permet d'obtenir un contrôle total sur la gestion des connexions HTTP, c'est-à-dire l'ouverture, la fermeture et la distribution des demandes sur les connexions.

Dans notre cas, l'adresse du service de classification nous est connue à l'avance, il est donc nécessaire d'organiser l'interaction HTTP uniquement avec cette machine particulière. Par conséquent, l'API au niveau de l'hôte est la meilleure pour nous. Voyons maintenant comment le pool de connexions HTTP est créé lors de son utilisation:

 val httpFlow: Flow[(HttpRequest,Id), (Try[HttpResponse],Id), Http.HostConnectionPool] = Http().cachedHostConnectionPool[Id](hostAddress, portNumber) 

Lors de l'appel à Http (). CachedHostConnectionPool [T] (hostAddress, portNumber) dans l'ActorSystem, qui est dans une portée implicite, les ressources sont allouées pour créer un pool de connexions, mais les connexions elles-mêmes ne sont pas établies. À la suite de cet appel, Flow est renvoyé, qui reçoit une paire d'une demande HTTP et un objet d'identification Id en entrée. L'objet d'identification est nécessaire pour faire correspondre la demande avec la réponse correspondante, car l'appel HTTP dans akka-http est une opération asynchrone, et l'ordre dans lequel les réponses sont reçues ne correspond pas nécessairement à l'ordre dans lequel les demandes sont envoyées. Par conséquent, à la sortie, Flow donne un couple du résultat de la requête et de l'objet d'identification correspondant.

Directement, les connexions HTTP sont établies lorsqu'un graphique (y compris ce flux) est lancé (matérialisé). Akka-http est implémenté de telle manière que peu importe le nombre de fois où les graphiques contenant httpFlow ont été matérialisés, au sein du même ActorSystem, il y aura toujours un pool commun de connexions HTTP qui sera utilisé par toutes les matérialisations. Cela vous permet de mieux contrôler l'utilisation des ressources réseau et d'éviter de les surcharger.

Ainsi, le cycle de vie du pool de connexions HTTP est lié à l'ActorSystem. Comme déjà mentionné, le cycle de vie du pool de threads lui est également attaché, dans lequel les opérations définies dans les acteurs (ou dans notre cas, définies comme les étapes akka-streams et akka-http) sont effectuées. Par conséquent, pour atteindre une efficacité maximale, nous devons réutiliser une instance d'ActorSystem au sein du même processus JVM.

Mettre tout cela ensemble: un exemple de mise en œuvre de l'interaction avec le service de classification


Ainsi, nous pouvons maintenant passer au processus de classification de grands volumes de données distribuées sur Apache Spark en utilisant une interaction asynchrone avec des services externes. Le schéma général de cette interaction a déjà été illustré à la figure 7.

Supposons que nous ayons défini un [jeu de données] initial. En lui appliquant l'opération mapPartitions, nous devrions obtenir un ensemble de données, dans lequel chaque identifiant de l'ensemble source est estampillé avec une certaine valeur obtenue à la suite de la classification (ensemble de données [score]). Pour organiser le traitement asynchrone sur les exécuteurs, nous devons encapsuler la source et les itérateurs résultants dans Publisher et Subscriber, respectivement, à partir de la spécification des flux réactifs et les lier ensemble.

 case class Features(id: String, vector: Array[Float]) case class Score(id: String, score: Float) //(1) val batchesRequestCount = config.getInt(“scoreService. batchesRequestCount”)//(2) //... val scoreDs: Dataset[Score] = featuresDs.mapPartitions { fi: Iterator[Features] => val publisher: Publisher[Iterable[Score]] = createPublisher(fi) //(3) val iteratorSubscriber: Iterator[Score] = new IteratorSubscriber(batchesRequestCount) //(4) publisher.subscribe(batchesRequestCount) //(5) iteratorSubscriber //(6) } 

Dans cette implémentation, il est pris en compte que le service de classification pour un appel peut traiter un groupe de vecteurs de caractéristiques à la fois, par conséquent, le résultat de la classification après un appel à celui-ci sera également disponible immédiatement pour l'ensemble du groupe. Par conséquent, en tant que type de paramètre pour Publisher, nous n'avons pas seulement Score, comme vous pouvez vous y attendre, mais Iterable [Score]. Ainsi, nous envoyons les résultats de classification de ce groupe à l'itérateur résultant (qui est également un abonné) par un seul appel à la méthode onNext. C'est beaucoup plus efficace que d'appeler onNext pour chaque élément. Nous allons maintenant analyser ce code plus en détail.

  1. Nous déterminons la structure des données d'entrée et de sortie. En entrée, nous aurons un tas d'identifiant avec un vecteur d'entité, et en sortie, nous aurons un tas d'identifiant avec une valeur numérique obtenue à la suite de la classification.
  2. Nous déterminons le nombre de groupes que l'abonné demandera à l'éditeur à la fois. Comme il est supposé que ces valeurs se trouveront dans le tampon et attendront d'être lues dans l'itérateur résultant, cette valeur dépend de la quantité de mémoire allouée à l'exécuteur.
  3. Créez Publisher à partir de l'itérateur source. Il sera responsable de l'interaction avec le service de classification. La fonction createPublisher est décrite ci-dessous.
  4. Créez un abonné, qui sera l'itérateur résultant. Le code de la classe IteratorSubscriber est également donné ci-dessous.
  5. Enregistrement de l'abonné auprès de Publisher.
  6. Renvoie IteratorSubscriber comme résultat de l'opération mapPartitions.

Considérons maintenant l'implémentation de la fonction createPublisher.

 type Ids = Seq[String] //(1) val batchSize = config.getInt("scoreService.batchSize") val parallelismLevel = config.getInt("scoreService.parallelismLevel") //(2) //... def createPublisher(fi: Iterator[Features]): Publisher[Iterable[Score]] = { import ActorSystemHolder._ //(3) Source .fromIterator(() => fi) //(4) .grouped(batchSize) //(5) .map { groupedFeatures: Seq[Features] => val request: (HttpRequest, Ids) = createHttpRequest(groupedFeatures) //(6) logger.debug(s"Sending request for the first id: ${request._2(0)}") request } .via(httpFlow) //(7) .flatMapMerge(parallelismLevel, { //(8) case (Success(response), ids) if response.status.isSuccess() => logger.debug(s"Processing successful result for the first id: ${ids(0)}") val resultSource: Source[Iterable[Score], _] = response.entity.dataBytes.reduce(_ ++ _).map { responseBytes => processSuccessfulResponse(responseBytes, ids) } //(9) resultScore case (Success(response), ids) => logger.warn( s"Failed result for the first id: ${ids(0)}, HTTP status: ${response.status}" ) response.discardEntityBytes() Source.failed( new IOException(s"Non-successful HTTP status: ${response.status}") ) //(10) case (Failure(ex), ids) => logger.warn(s"Failed result: an exception has occured", ex) Source.failed(ex) //(11) }) .runWith(Sink.asPublisher(false)) //(12) } def createHttpRequest(featuresSeq: Seq[Features]): (HttpRequest, ProfileIds) = { val requestBytes: Array[Byte] = featuresToMatrixBytes(featuresSeq) val ids: ProfileIds = extractIds(featuresSeq) val httpRequest = HttpRequest( method = HttpMethods.PUT, uri = "/score", entity = requestBytes ) httpRequest -> ids } 

  1. - , . httpFlow, .
  2. : , (batchSize) (parallelismLevel).
  3. implicit scope ActorSystem, ActorMaterializer httpFlow. Spark-. ActorSystemHolder .
  4. akka-streams . Source[Features] .
  5. batchSize .
  6. HttpRequest . HttpRequest createHttpRequest. createPublisher. feature-, , ( predict). , HTTP-. , HTTP-, HTTP-, URI .
  7. httpFlow.
  8. , . flatMapMerge, akka-http Source[ByteString], , . . parallelismLevel , ( ). HTTP-: , , , .
  9. : . akka ByteString. , ByteString O(1), ByteString . , , . , .
  10. HTTP- , Stream . , discardEntityBytes , , .
  11. . akka-http , .
  12. , Publisher, . , . false Sink.asPublisher , Publisher Subscriber-.

, akka ActorSystem, . , Spark , . Spark JVM , , , ActorSystem ActorMatrializer httpFlow.

 object ActorSystemHolder { implicit lazy val actorSystem: ActorSystem = { //(1) val actorSystemName = s"score-service-client" logger.debug(s"Creating actor system $actorSystemName") val as = ActorSystem(actorSystemName) //(2) logger.debug("Adding shutdown hook for the actor system") scala.sys.addShutdownHook { //(3) logger.debug(s"Terminating actor system $actorSystemName") Await.result(as.terminate(), 30.seconds) //to Mars :) logger.debug(s"The actor system $actorSystemName has been terminated") } as } implicit lazy val materializer: ActorMaterializer = { //(4) logger.debug(s"Creating actor materializer for actor system ${actorSystem.name}") ActorMaterializer() } lazy val httpFlow: Flow[ (HttpRequest,ProfileIds), (Try[HttpResponse], ProfileIds), Http.HostConnectionPool] = { //(5) val httpFlowSettings = ConnectionPoolSettings(actorSystem) logger.debug(s"Creating http flow with settings $httpFlowSettings") Http().cachedHostConnectionPool[ProfileIds]( config.getString("scoreService.host"), config.getInt("scoreService.int"), settings = httpFlowSettings ) } } 

  1. , , , .
  2. ActorSystem .
  3. , , ActorSystem, terminate, , , , . , JVM-.
  4. ActorMaterializer, akka-streams, ActorSystem.
  5. , httpFlow . , HTTP- ActorSystem.

Subscriber- HTTP-.

 sealed trait QueueItem[+T] case class Item[+T](item: T) extends QueueItem[T] case object Done extends QueueItem[Nothing] case class Failure(cause: Throwable) extends QueueItem[Nothing] //(1) class StreamErrorCompletionException(cause: Throwable) extends Exception(cause) //(2) class IteratorSubscriber[T](requestSize: Int) extends Subscriber[Iterable[T]] with Iterator[T] { //(3) private val buffer: BlockingQueue[QueueItem[Iterable[T]]] = new LinkedBlockingQueue[QueueItem[Iterable[T]]]() //(4) private var expecting: Int = 0 //(5) private val subscriptionPromise: Promise[Subscription] = Promise() private lazy val subscription: Subscription = Await.result(subscriptionPromise.future, 5.minutes) //(6) private var currentIterator: Iterator[T] = Iterator.empty //(7) private var isDone = false //(8) override def onSubscribe(s: Subscription): Unit = { subscriptionPromise.success(s) //(9) logger.trace("The iterator has been subscribed") } override def onNext(t: Iterable[T]): Unit = { logger.trace("Putting a next batch to the buffer") buffer.put(Item(t)) //(10) } override def onComplete(): Unit = { logger.debug("The stream has been succesfully completed") buffer.put(Done) //(11) } override def onError(t: Throwable): Unit = { logger.warn("The stream has been completed with error", t) buffer.put(Failure(t)) //(12) } override def hasNext: Boolean = { logger.trace("Asking hasNext") if (currentIterator.hasNext) { //(13) true } else if (isDone) { //(14) false } else { if (expecting < requestSize) { requestNextBatches() //(15) } buffer.take() match { //(16) case Item(batch) => currentIterator = batch.iterator expecting -= 1 this.hasNext //(17) case Done => isDone = true false //(18) case Failure(exception) => throw new StreamErrorCompletionException(exception) //(19) } } } override def next(): T = { val out = currentIterator.next() logger.trace("The next element is {}", out) out //(20) } private def requestNextBatches(): Unit = { logger.debug(s"Requesting {} batches", requestSize) subscription.request(requestSize) expecting += requestSize //(21) } } 

IteratorSubscriber Producer-Consumer. , Subscriber, Producer-, , Iterator, – Consumer-. , . Iterator Apache Spark, Subscriber – , ActorSystem.

IteratorSubscriber .

  1. . , Done, , Throwable, .
  2. , hasNext .
  3. , , Publisher-.
  4. , . LinkedBlockingQueue, . , .
  5. , . , , Publisher-. , , Publisher- . hasNext next ( requestNextBatches hasNext), , .
  6. subscriptionPromise subscription Subscription, Publisher onSubscribe. , Reactive Streams Subscriber- Publisher- , , hasNext , onSubscribe. , subscription, Publisher-. lazy subscription, Promise.
  7. . hasNext next, , .
  8. , , hasNext false . hasNext, .
  9. onSubscribe Publisher- Subscription Promise, subscription.
  10. onNext Publisher-, . .
  11. Publisher onComplete, Done.
  12. Publisher onError. .
  13. hasNext , . , true, . , .
  14. , false.
  15. , , requestSize, Publisher. , , , Publisher- , HTTP- .
  16. . , , , . , , ( , , subscription), , , , .
  17. , currentIterator. , . , hasNext , ( , ), .
  18. , false hasNext. , isDone, , . - , hasNext , false. , hasNext , false , . , .
  19. , , , .
  20. next . , hasNext, next .
  21. Publisher- , , subscription, Publisher-. requestSize. .

, , , :

8. .

:


, , . , HTTP , . .

– . , , Hadoop , . , , - . , , hdfs, , , , .

, . , akka-http , . , -, - Apache Spark , , , -.

, , . , , http-, , .

, . , , . , . , .

, . , , Hadoop , , .

, , Hadoop- , , .

, , CleverDATA . . , , , , , . , .

Source: https://habr.com/ru/post/fr413141/


All Articles