Classer de grandes quantités de données sur Apache Spark à l'aide de modèles d'apprentissage automatique arbitraires

Partie 1: Énoncé du problème


Bonjour, Habr! Je suis architecte de solution chez CleverDATA. Aujourd'hui, je vais parler de la façon dont nous classons de grandes quantités de données à l'aide de modèles construits à l'aide de presque toutes les bibliothèques d'apprentissage automatique disponibles. Dans cette série en deux parties, nous examinerons les questions suivantes.

  • Comment présenter un modèle de machine learning en tant que service (Model as a Service)?
  • Comment les tâches de traitement distribué de grandes quantités de données sont-elles physiquement effectuées à l'aide d'Apache Spark?
  • Quels problèmes surviennent lorsque Apache Spark interagit avec des services externes?
  • Comment organiser l'interaction d'Apache Spark avec des services externes en utilisant les bibliothèques akka-streams et akka-http, ainsi que l'approche Reactive Streams?

Au départ, j'avais prévu d'écrire un article, mais comme le volume de matériel s'est avéré assez important, j'ai décidé de le diviser en deux parties. Aujourd'hui, dans la première partie, nous examinerons l'énoncé général du problème, ainsi que les principaux problèmes qui doivent être résolus pendant la mise en œuvre. Dans la deuxième partie, nous parlerons de l'implémentation pratique de la solution à ce problème en utilisant l'approche Reactive Streams.


Notre société CleverDATA dispose d'une équipe d'analystes de données qui, à l'aide d'une large gamme d'outils (tels que scikit-learn, facebook fastText, xgboost, tensorFlow, etc.), forment des modèles d'apprentissage automatique. Le langage de programmation de base utilisé par les analystes est Python. Presque toutes les bibliothèques d'apprentissage automatique, même implémentées à l'origine dans d'autres langages, ont une interface Python et sont intégrées aux principales bibliothèques Python (principalement avec NumPy).

D'autre part, l'écosystème Hadoop est largement utilisé pour stocker et traiter de grandes quantités de données non structurées. Dans ce document, les données sont stockées sur le système de fichiers HDFS sous la forme de blocs répliqués distribués d'une certaine taille (généralement 128 Mo, mais il est possible de configurer). Les algorithmes de traitement des données distribués les plus efficaces tentent de minimiser l'interaction réseau entre les machines du cluster. Pour ce faire, les données doivent être traitées sur les mêmes machines où elles sont stockées.

Bien sûr, dans de nombreux cas, l'interaction réseau ne peut pas être complètement évitée, mais, néanmoins, vous devez essayer d'effectuer toutes les tâches localement et minimiser la quantité de données qui devront être transmises sur le réseau.

Ce principe de traitement des données distribuées est appelé «rapprocher les calculs des données». Tous les principaux frameworks, principalement Hadoop MapReduce et Apache Spark, adhèrent à ce principe. Ils déterminent la composition et la séquence d'opérations spécifiques qui devront être exécutées sur des machines où les blocs de données requis sont stockés.

Figure 1. Le cluster HDFS se compose de plusieurs machines, dont l'une est un nœud de nom et le reste est un nœud de données. Le nœud de nom stocke des informations sur les fichiers qui composent leurs blocs et sur les machines sur lesquelles ils se trouvent physiquement. Les blocs eux-mêmes sont stockés sur le nœud de données, qui sont répliqués sur plusieurs machines pour augmenter la fiabilité. Le nœud de données exécute également des tâches de traitement des données. Les tâches consistent en le processus principal (Master, M), qui coordonne le lancement des processus de travail (Worker, W) sur les machines où sont stockés les blocs de données nécessaires.

Presque tous les composants de l'écosystème Hadoop sont lancés à l'aide de la machine virtuelle Java (JVM) et sont étroitement intégrés les uns aux autres. Par exemple, pour exécuter des tâches écrites à l'aide d'Apache Spark pour travailler avec des données stockées sur HDFS, presque aucune manipulation supplémentaire n'est requise: le framework fournit cette fonctionnalité prête à l'emploi.

Malheureusement, la plupart des bibliothèques conçues pour l'apprentissage automatique supposent que les données sont stockées et traitées localement. En même temps, il existe des bibliothèques étroitement intégrées à l'écosystème Hadoop, par exemple Spark ML ou Apache Mahout. Cependant, ils présentent un certain nombre d'inconvénients importants. Premièrement, ils fournissent beaucoup moins d'implémentations d'algorithmes d'apprentissage automatique. Deuxièmement, tous les analystes de données ne peuvent pas travailler avec eux. Les avantages de ces bibliothèques incluent le fait qu'elles peuvent être utilisées pour former des modèles sur de grands volumes de données en utilisant l'informatique distribuée.

Cependant, les analystes de données utilisent souvent des méthodes alternatives pour former des modèles, en particulier des bibliothèques qui permettent l'utilisation de GPU. Je ne considérerai pas les problèmes de formation des modèles dans cet article, car je veux me concentrer sur l'utilisation de modèles prêts à l'emploi construits à l'aide de toute bibliothèque d'apprentissage automatique disponible pour classer de grandes quantités de données.

Ainsi, la tâche principale que nous essayons de résoudre ici est d'appliquer des modèles d'apprentissage automatique à de grandes quantités de données stockées sur HDFS. Si nous pouvions utiliser le module SparkML de la bibliothèque Apache Spark, qui implémente les algorithmes d'apprentissage automatique de base, alors classer de grandes quantités de données serait une tâche triviale:

val model: LogisticRegressionModel = LogisticRegressionModel.load("/path/to/model") val dataset = spark.read.parquet("/path/to/data") val result = model.transform(dataset) 

Malheureusement, cette approche ne fonctionne que pour les algorithmes implémentés dans le module SparkML (une liste complète peut être trouvée ici ). Dans le cas de l'utilisation d'autres bibliothèques, d'ailleurs, non implémentées sur la JVM, tout devient beaucoup plus compliqué.

Pour résoudre ce problème, nous avons décidé d'envelopper le modèle dans un service REST. En conséquence, lors du démarrage de la tâche de classification des données stockées sur HDFS, il est nécessaire d'organiser l'interaction entre les machines sur lesquelles les données sont stockées et la machine (ou cluster de machines) sur laquelle le service de classification s'exécute.

Figure 2. Le concept de modèle en tant que service

Description du service de classification Python


Afin de présenter le modèle en tant que service, il est nécessaire de résoudre les tâches suivantes:

  1. implémenter un accès efficace au modèle via HTTP;
  2. assurer l'utilisation la plus efficace des ressources de la machine (principalement tous les cœurs de processeur et la mémoire);
  3. fournir une résistance aux charges élevées;
  4. offrent la possibilité d'évoluer horizontalement.

L'accès au modèle via HTTP est assez simple à implémenter: un grand nombre de bibliothèques ont été développées pour Python qui vous permettent d'implémenter un point d'accès REST en utilisant une petite quantité de code. L'une de ces microframes est Flask . La mise en œuvre du service de classification sur Flask est la suivante:

 from flask import Flask, request, Response model = load_model() n_features = 100 app = Flask(__name__) @app.route("/score", methods=['PUT']) def score(): inp = np.frombuffer(request.data, dtype='float32').reshape(-1, n_features) result = model.predict(inp) return Response(result.tobytes(), mimetype='application/octet-stream') if __name__ == "__main__": app.run() 

Ici, lorsque le service démarre, nous chargeons le modèle en mémoire, puis nous l'utilisons lors de l'appel de la méthode de classification. La fonction load_model charge le modèle à partir d'une source externe, que ce soit le système de fichiers, le stockage de valeurs-clés, etc.

Un modèle est un objet doté d'une méthode de prédiction. Dans le cas de la classification, il prend une entrée à un vecteur d'entité d'une certaine taille et produit soit une valeur booléenne indiquant si le vecteur spécifié convient à ce modèle, soit une valeur de 0 à 1, à laquelle vous pouvez ensuite appliquer le seuil de coupure: tout ce qui dépasse le seuil, est un résultat positif du classement, le reste ne l'est pas.

Le vecteur de caractéristiques que nous devons classer est transmis sous forme binaire et désérialisé en un tableau numpy. Ce serait une surcharge de faire une requête HTTP pour chaque vecteur. Par exemple, dans le cas d'un vecteur à 100 dimensions et en utilisant des valeurs de type float32, une requête HTTP complète, y compris les en-têtes, ressemblerait à ceci:

 PUT /score HTTP/1.1 Host: score-node-1:8099 User-Agent: curl/7.58.0 Accept: */* Content-Type: application/binary Content-Length: 400 [400 bytes of data] 

Comme vous pouvez le constater, l'efficacité d'une telle requête est très faible (400 octets de charge utile / (en-tête de 133 octets + corps de 400 octets) = 75%). Heureusement, dans presque toutes les bibliothèques, la méthode de prédiction vous permet de recevoir non pas le vecteur [1 xn], mais la matrice [mxn] et, en conséquence, de générer immédiatement le résultat pour m valeurs d'entrée.

De plus, la bibliothèque numpy est optimisée pour travailler avec de grandes matrices, vous permettant d'utiliser efficacement toutes les ressources machine disponibles. Ainsi, nous pouvons envoyer non pas un mais un nombre assez important de vecteurs de caractéristiques en une seule demande, les désérialiser en une matrice numpy de taille [mxn], classer et renvoyer le vecteur [mx 1] à partir de valeurs booléennes ou float32. Par conséquent, l'efficacité de l'interaction HTTP lors de l'utilisation d'une matrice de 1000 lignes devient presque égale à 100%. Dans ce cas, la taille des en-têtes HTTP peut être négligée.

Pour tester le service Flask sur la machine locale, vous pouvez l'exécuter à partir de la ligne de commande. Cependant, cette méthode est totalement inadaptée à une utilisation industrielle. Le fait est que Flask est monothread et, si nous regardons le diagramme de charge du processeur pendant que le service est en cours d'exécution, nous verrons qu'un cœur est chargé à 100% et les autres sont inactifs. Heureusement, il existe des moyens d'utiliser tous les noyaux de la machine: pour cela, Flask doit être exécuté via le serveur d'applications Web uwsgi. Il vous permet de configurer de manière optimale le nombre de processus et de threads afin d'assurer une charge uniforme sur tous les cœurs de processeur. Vous trouverez plus de détails sur toutes les options de configuration d'uwsgi ici .

Il est préférable d'utiliser nginx comme point d'entrée HTTP, car uwsgi peut fonctionner de manière instable en cas de charges élevées. Nginx, d'autre part, prend sur lui le flux d'entrée complet des demandes, filtre les demandes invalides et dose la charge sur uwsgi. Nginx communique avec uwsgi via des sockets linux en utilisant un fichier de processus. Un exemple de configuration nginx est illustré ci-dessous:

 server { listen 80; server_name 127.0.0.1; location / { try_files $uri @score; } location @score { include uwsgi_params; uwsgi_pass unix:/tmp/score.sock; } } 

Comme nous pouvons le voir, cela s'est avéré être une configuration assez compliquée pour une machine. Si nous devons classer de grandes quantités de données, un nombre élevé de demandes arrivera à ce service, et cela peut devenir un goulot d'étranglement. La solution à ce problème est la mise à l'échelle horizontale.

Pour plus de commodité, nous emballons le service dans un conteneur Docker, puis le déployons sur le nombre requis de machines. Si vous le souhaitez, vous pouvez utiliser des outils de déploiement automatisé tels que Kubernetes. Un exemple de structure Dockerfile pour créer un conteneur avec un service est donné ci-dessous.

 FROM ubuntu #Installing required ubuntu and python modules RUN apt-get update RUN apt-get -y install python3 python3-pip nginx RUN update-alternatives --install /usr/bin/python python /usr/bin/python3 1 RUN update-alternatives --install /usr/bin/pip pip /usr/bin/pip3 1 RUN pip install uwsgi flask scipy scikit-learn #copying script files WORKDIR /etc/score COPY score.py . COPY score.ini . COPY start.sh . RUN chmod +x start.sh RUN rm /etc/nginx/sites-enabled/default COPY score.nginx /etc/nginx/sites-enabled/ EXPOSE 80 ENTRYPOINT ["./start.sh"] 

Ainsi, la structure du service de classement est la suivante:

Figure 3. Schéma de service pour la classification

Un bref résumé du travail d'Apache Spark dans l'écosystème Hadoop


Considérons maintenant le processus de traitement des données stockées sur HDFS. Comme je l'ai noté précédemment, le principe du transfert des calculs vers les données est utilisé pour cela. Pour commencer le traitement des tâches, vous devez savoir sur quelles machines les blocs de données dont nous avons besoin sont stockés afin d'exécuter des processus directement impliqués dans leur traitement. Il est également nécessaire de coordonner le lancement de ces processus, de les redémarrer en cas d'urgence, si nécessaire, d'agréger les résultats des différentes sous-tâches, etc.

Toutes ces tâches sont accomplies par une variété de cadres travaillant avec l'écosystème Hadoop. Apache Spark est l'un des plus populaires et des plus pratiques. Le concept principal autour duquel l'ensemble du cadre est construit est RDD (Resilient Distributed Dataset). En général, RDD peut être considéré comme une collection distribuée qui résiste aux chutes. RDD peut être obtenu de deux manières principales:

  1. création à partir d'une source externe, telle qu'une collection en mémoire, un fichier ou un répertoire sur le système de fichiers, etc .;
  2. conversion à partir d'un autre RDD en appliquant des opérations de transformation. RDD prend en charge toutes les opérations de base de l'utilisation des collections, telles que map, flatMap, filter, groupBy, join, etc.

Il est important de comprendre que RDD, contrairement aux collections, n'est pas directement des données, mais une séquence d'opérations qui doivent être effectuées sur les données. Par conséquent, lorsque les opérations de transformation sont appelées, aucun travail ne se produit réellement et nous obtenons simplement un nouveau RDD, qui contiendra une opération de plus que dans la précédente. Le travail lui-même commence lorsque les opérations ou actions dites terminales sont appelées. Il s'agit notamment de l'enregistrement dans un fichier, de l'enregistrement dans une collection en mémoire, du comptage du nombre d'éléments, etc.

Lors du démarrage d'une opération de terminal, Spark crée un graphique d'opération acyclique (DAG, Directed Acyclic Graph) basé sur le RDD résultant et les exécute séquentiellement sur le cluster en fonction du graphique reçu. Lors de la construction d'un DAG basé sur RDD, Spark effectue un certain nombre d'optimisations, par exemple, si possible, combine plusieurs transformations successives en une seule opération.

RDD était la principale unité d'interaction avec l'API Spark dans les versions de Spark 1.x. Dans Spark 2.x, les développeurs ont déclaré que le principal concept d'interaction est désormais Dataset. Dataset est un module complémentaire pour RDD avec prise en charge d'une interaction de type SQL. Lorsque vous utilisez l'API Dataset, Spark vous permet d'utiliser un large éventail d'optimisations, y compris celles de niveau assez bas. Mais en général, les principes de base qui s'appliquent aux RDD s'appliquent également au Dataset.

Plus de détails sur le travail de Spark peuvent être trouvés dans la documentation sur le site officiel .

Prenons un exemple de la classification la plus simple sur Spark sans utiliser de services externes. Un algorithme plutôt dénué de sens est implémenté ici, qui prend en compte la proportion de chacune des lettres latines dans le texte, puis considère l'écart-type. Ici, tout d'abord, il est important de faire attention directement aux étapes de base utilisées lors de l'utilisation de Spark.

 case class Data(id: String, text: String) case class Features(id: String, vector: Array[Float]) case class Score(id: String, score: Float) //(1) def std(vector: Array[Float]): Float = ??? //(2) val ds: Dataset[Data] = spark.read.parquet("/path/to/data").as[Data] //(3) val result: Dataset[Score] = ds.map {d: Data => //(4) val filteredText = d.text.toLowerCase.filter { letter => 'a' <= letter && letter <= 'z' } val featureVector = new Array[Float](26) if (filteredText.nonEmpty) { filteredText.foreach(letter => featureVector(letter) += 1) featureVector.indicies.foreach { i => featureVector(i) = featureVector(i) / filteredText.length() } } Features(d.id, featureVector) }.map {f: Features => Score(f.id, std(f.vector)) //(5) } result.write.parquet("/path/to/result") //(6) 

Dans cet exemple, nous:

  1. nous déterminons la structure des données d'entrée, intermédiaires et de sortie (les données d'entrée sont définies comme du texte auquel un certain identifiant est associé, les données intermédiaires correspondent à l'identifiant avec le vecteur de caractéristiques et la sortie correspond à l'identifiant avec une certaine valeur numérique);
  2. nous définissons une fonction pour calculer la valeur résultante par un vecteur caractéristique (par exemple, écart type, implémentation non représentée);
  3. définir le jeu de données d'origine comme des données stockées sur HDFS au format parquet le long du chemin / chemin / vers / données;
  4. Définissez un Dataset intermédiaire comme une carte bitmap à partir du Dataset d'origine.
  5. De même, nous déterminons l'ensemble de données résultant par une transformation au niveau du bit à partir de l'intermédiaire;
  6. enregistrer le jeu de données résultant dans HDFS au format parquet le long du chemin / chemin / vers / résultat. L'enregistrement dans un fichier étant une opération terminale, les calculs eux-mêmes sont lancés précisément à ce stade.

Apache Spark fonctionne sur le principe du maître-ouvrier. Lorsque l'application démarre, le processus principal, appelé pilote, démarre. Il exécute le code responsable de la formation du RDD, sur la base duquel les calculs seront effectués.

Lorsqu'une opération de terminal est appelée, le pilote génère un DAG basé sur le RDD résultant. Ensuite, le pilote lance le lancement de workflows appelés exécuteurs, dans lesquels les données seront traitées directement. Après avoir démarré les workflows, le pilote leur transmet le bloc exécutable qui doit être exécuté et indique également à quelle partie des données il doit être appliqué.

Ci-dessous est le code de notre exemple, dans lequel les sections de code exécutées sur l'exécuteur (entre les lignes début de la partie exécuteur et fin de la partie exécuteur) sont mises en évidence. Le reste du code est exécuté sur le pilote.

 case class Data(id: String, text: String) case class Features(id: String, vector: Array[Float]) case class Score(id: String, score: Float) def std(vector: Array[Float]): Float = ??? val ds: Dataset[Data] = spark.read.parquet("/path/to/data").as[Data] val result: Dataset[Score] = ds.map { // --------------- EXECUTOR PART BEGIN ----------------------- d: Data => val filteredText = d.text.toLowerCase.filter { letter => 'a' <= letter && letter <= 'z' } val featureVector = new Array[Float](26) if (filteredText.nonEmpty) { filteredText.foreach(letter => featureVector(letter) += 1) featureVector.indicies.foreach { i => featureVector(i) = featureVector(i) / filteredText.length() } } Features(d.id, featureVector) // --------------- EXECUTOR PART END ----------------------- }.map { // --------------- EXECUTOR PART BEGIN ----------------------- f: Features => Score(f.id, std(f.vector)) // --------------- EXECUTOR PART END ----------------------- } result.write.parquet(“/path/to/result”) 

Dans l'écosystème Hadoop, toutes les applications s'exécutent dans des conteneurs. Un conteneur est un processus s'exécutant sur l'une des machines d'un cluster à laquelle est allouée une certaine quantité de ressources. Le lancement des conteneurs est géré par le YARN Resource Manager. Il détermine laquelle des machines a un nombre suffisant de cœurs de processeur et de RAM, ainsi que si elle contient les blocs de données nécessaires au traitement.

Lors du lancement de l'application Spark, YARN crée et exécute le conteneur sur l'une des machines de cluster dans laquelle il lance le pilote. Ensuite, lorsque le pilote prépare le DAG à partir d'opérations qui doivent être exécutées sur les exécuteurs, YARN lance des conteneurs supplémentaires sur les machines souhaitées.

En règle générale, il suffit que le pilote alloue un cœur et une petite quantité de mémoire (à moins, bien sûr, que le résultat du calcul ne soit agrégé sur le pilote en mémoire). Pour les exécuteurs, afin d'optimiser les ressources et de réduire le nombre total de processus dans le système, plusieurs cœurs peuvent être distingués: dans ce cas, l'exécuteur pourra effectuer plusieurs tâches simultanément.

Mais ici, il est important de comprendre qu'en cas de défaillance de l'une des tâches en cours d'exécution dans le conteneur ou en cas de ressources insuffisantes, YARN peut décider d'arrêter le conteneur, puis toutes les tâches qui y ont été exécutées devront être redémarrées sur un autre artiste. De plus, si nous allouons un nombre suffisamment important de cœurs par conteneur, il est probable que YARN ne pourra pas le démarrer. Par exemple, si nous avons deux machines sur lesquelles deux cœurs restent inutilisés, nous pouvons alors démarrer sur chaque conteneur qui nécessite deux cœurs, mais nous ne pouvons pas démarrer un conteneur qui nécessite quatre cœurs.

Voyons maintenant comment le code de notre exemple sera exécuté directement sur le cluster. Imaginez que la taille des données source soit de 2 téraoctets. Par conséquent, si la taille de bloc sur HDFS est de 128 mégaoctets, il y aura 16384 blocs au total. Chaque bloc est répliqué sur plusieurs machines pour garantir la fiabilité. Pour simplifier, nous prenons le facteur de réplication égal à deux, c'est-à-dire qu'il y aura au total 32 768 blocs disponibles. Supposons que nous utilisons un cluster de 16 machines pour le stockage. En conséquence, sur chacune des machines en cas de distribution uniforme, il y aura environ 2048 blocs, soit 256 gigaoctets par machine. Sur chacune des machines, nous avons 8 cœurs de processeur et 64 gigaoctets de RAM.

Pour notre tâche, le pilote n'a pas besoin de beaucoup de ressources, nous allons donc lui allouer 1 cœur et 1 Go de mémoire. Nous donnerons aux interprètes 2 cœurs et 4 Go de mémoire. Supposons que nous voulons maximiser l'utilisation des ressources du cluster. Ainsi, nous obtenons 64 conteneurs: un pour le conducteur et 63 pour les interprètes.

Figure 4. Processus exécutés sur le nœud de données et les ressources qu'ils utilisent.

Étant donné que dans notre cas, nous n'utilisons que des opérations cartographiques, notre DAG consistera en une seule opération. Il comprend les actions suivantes:

  1. prendre un bloc de données du disque dur local,
  2. Convertir des données
  3. enregistrez le résultat dans un nouveau bloc sur votre propre disque local.

Au total, nous devons traiter 16384 blocs, donc chaque exécuteur doit effectuer 16384 / (63 exécuteurs * 2 cœurs) = 130 opérations. Ainsi, le cycle de vie de l'exécuteur testamentaire en tant que processus distinct (au cas où tout se passe sans chutes) se présentera comme suit.

  1. Lancement de conteneurs.
  2. Recevoir du conducteur une tâche dans laquelle il y aura un identifiant de bloc et l'opération nécessaire. Puisque nous avons alloué deux cœurs au conteneur, l'exécuteur reçoit deux tâches à la fois.
  3. Exécution d'une tâche et envoi du résultat au pilote.
  4. Récupération de la tâche suivante à partir du pilote et répétition des étapes 2 et 3 jusqu'à ce que tous les blocs de cette machine locale soient traités.
  5. Arrêt de conteneur

Remarque : des DAG plus complexes sont obtenus si les données intermédiaires sont redistribuées entre les machines, généralement pour le regroupement (groupBy, ReduceByKey, etc.) et les opérations de jointure, qui dépassent le cadre de cet article.

Les principaux problèmes d'interaction entre Apache Spark et les services externes


Si, dans le cadre de l'opération de cartographie, nous avons besoin d'accéder à un service externe, la tâche devient moins triviale. Supposons qu'un objet de la classe ExternalServiceClient soit responsable de l'interaction avec un service externe. En général, avant de commencer le travail, nous devons l'initialiser, puis l'appeler si nécessaire:

 val client = ExternalServiceClient.create() // val score = client.score(featureVector) // . 

Habituellement, l'initialisation du client prend un certain temps, par conséquent, en règle générale, elle est initialisée au démarrage de l'application, puis elle est utilisée pour obtenir une instance client à partir d'un contexte ou d'un pool global. Par conséquent, lorsqu'un conteneur avec Spark executor reçoit une tâche qui nécessite une interaction avec un service externe, il serait intéressant d'obtenir un client déjà initialisé avant de commencer à travailler sur le tableau de données, puis de le réutiliser pour chaque élément.

Il existe deux façons de procéder dans Spark. Premièrement, si le client est sérialisable (le client lui-même et tous ses champs doivent étendre l'interface java.io.Serializable), il peut alors être initialisé sur le pilote puis transmis aux exécuteurs via le mécanisme de variable de diffusion .

 val client = ExternalServiceClient.create() val clientBroadcast = sparkContext.broadcast(client) ds.map { f: Features => val score = clientBroadcast.value.score(f.vector) Score(f.id, score) } 

Dans le cas où le client n'est pas sérialisable, ou l'initialisation du client est un processus qui dépend des paramètres de la machine particulière sur laquelle il s'exécute (par exemple, pour équilibrer, les demandes d'une partie des machines doivent aller vers la première machine de service et pour l'autre vers la seconde), alors le client peut être initialisé directement sur l'exécuteur.

Pour ce faire, RDD (et Dataset) a une opération mapPartitions, qui est une version généralisée de l'opération map (si vous regardez le code source de la classe RDD, l'opération map est implémentée via mapPartitions). La fonction passée à l'opération mapPartitions est exécutée une fois pour chaque bloc. , , , :

 ds.mapPartitions {fi: Iterator[Features] => val client = ExternalServiceClient.create() fi.map { f: Features => val score = client.score(f.vector) Score(f.id, score) } } 

. , , , , . , , , .

. , hasNext next:

 while (i.hasNext()) { val item = i.next() … } 

, , . , 8 , YARN 4 2 , , 8 . , . .

. , , , , . : , , . , hasNext , . (, , ) , , , . , .

5. , , mapPartitions, . .

, , . , , , .

6.

, , , -, , , , -, , .


, . , . , . , . , , , , , , .

.

  1. , , , .
  2. , , . , . , .
  3. , hasNext false, , , , . : hasNext = false, , , . , , , .

, . Stay tuned!

Source: https://habr.com/ru/post/fr413137/


All Articles