
Dans cet article, nous examinerons l'accès à l'API Spark à partir de divers langages de programmation dans la JVM, ainsi que certains problèmes de performances lorsque nous allons au-delà du langage Scala. Même si vous travaillez en dehors de la JVM, cette section peut être utile, car les langages non JVM dépendent souvent de l'API Java et non de l'API Scala.
Travailler dans d'autres langages de programmation ne signifie pas toujours que vous devez aller au-delà de la JVM, et travailler dans la JVM présente de nombreux avantages en termes de performances - principalement en raison du fait que vous n'avez pas besoin de copier les données. Bien qu'il ne soit pas nécessaire d'utiliser des bibliothèques de liaison ou des adaptateurs spéciaux pour accéder à Spark depuis l'extérieur du langage Scala, l'invocation de code Scala à partir d'autres langages de programmation peut être difficile. Le framework Spark prend en charge l'utilisation de Java 8 dans les expressions lambda, et ceux qui utilisent des versions plus anciennes du JDK ont la possibilité d'implémenter l'interface appropriée à partir du package org.apache.spark.api.java.function. Même dans les cas où vous n'avez pas besoin de copier de données, le travail dans un autre langage de programmation peut avoir des nuances petites mais importantes liées aux performances.
Les difficultés d'accès à diverses API Scala sont particulièrement prononcées lors de l'appel de fonctions avec des balises de classe ou lors de l'utilisation de propriétés fournies à l'aide de conversions de types implicites (par exemple, toutes les fonctionnalités des ensembles RDD liés aux classes Double et Tuple). Pour les mécanismes qui dépendent de conversions de types implicites, des classes concrètes équivalentes sont souvent fournies avec des conversions explicites. Les balises de classe factices (par exemple, AnyRef) peuvent être transmises à des fonctions qui dépendent des balises de classe (souvent les adaptateurs le font automatiquement. L'utilisation de classes spécifiques au lieu de conversions de types implicites n'entraîne généralement pas de surcharge supplémentaire, mais les balises de classe factices peuvent imposer des restrictions sur certaines optimisations du compilateur.
L'API Java n'est pas trop différente de l'API Scala en termes de propriétés, seulement occasionnellement certaines fonctionnalités ou API de développeur manquent. D'autres langages de programmation JVM, tels que le langage Clojure avec DSL
Flambo et la bibliothèque
étincelante , sont
pris en
charge à l' aide de diverses API Java au lieu d'appeler directement l'API Scala. Étant donné que la plupart des liaisons de langage, même les langages non JVM comme Python et R, passent par l'API
Java , il sera utile de les gérer.
Les API Java sont très similaires aux API Scala, bien qu'elles soient indépendantes des balises de classe et des conversions implicites. L'absence de ce dernier signifie qu'au lieu de convertir automatiquement les ensembles RDD d'objets Tuple ou doubles en classes spéciales avec des fonctions supplémentaires, vous devez utiliser des fonctions de conversion de type explicite (par exemple, mapToDouble ou mapToPair). Les fonctions spécifiées sont définies uniquement pour les ensembles RDD Java; heureusement pour la compatibilité, ces types spéciaux ne sont que des adaptateurs pour les ensembles Scala RDD. De plus, ces fonctions spéciales renvoient divers types de données, tels que JavaDoubleRDD et JavaPairRDD, avec des fonctionnalités fournies par des transformations implicites du langage Scala.
Reprenons l'exemple canonique du comptage de mots à l'aide de l'API Java (exemple 7.1). Étant donné que l'appel de l'API Scala à partir de Java peut parfois être une tâche ardue, presque toutes les API du framework Java Spark sont implémentées dans le langage Scala avec des balises de classe cachées et des conversions implicites. Pour cette raison, les adaptateurs Java sont une couche très mince, composée en moyenne de seulement quelques lignes de code, et leur réécriture est pratiquement sans effort.
Exemple 7.1 Comptage de mots (Java)
import scala.Tuple2; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaPairRDD import org.apache.spark.api.java.JavaSparkContext; import java.util.regex.Pattern; import java.util.Arrays; public final class WordCount { private static final Pattern pattern = Pattern.compile(" "); public static void main(String[] args) throws Exception { JavaSparkContext jsc = new JavaSparkContext(); JavaRDD<String> lines = jsc.textFile(args[0]); JavaRDD<String> words = lines.flatMap(e -> Arrays.asList( pattern.split(e)).iterator()); JavaPairRDD<String, Integer> wordsIntial = words.mapToPair( e -> new Tuple2<String, Integer>(e, 1)); } }
Parfois, vous devrez peut-être convertir des RDD Java en RDD Scala ou vice versa. Cela est le plus souvent nécessaire pour les bibliothèques nécessitant une entrée ou renvoyant des ensembles RDD Scala, mais parfois les propriétés de base de Spark peuvent ne pas encore être disponibles dans l'API Java. La conversion de RDD Java en RDD Scala est le moyen le plus simple d'utiliser ces nouvelles fonctionnalités.
Si vous devez transférer l'ensemble RDD Java vers la bibliothèque Scala, qui attend un étincelle RDD standard à l'entrée, vous pouvez accéder au Scala RDD sous-jacent à l'aide de la méthode rdd (). Le plus souvent, cela suffit pour transférer le RDD final vers n'importe quelle bibliothèque Scala souhaitée; Parmi les exceptions notables figurent les bibliothèques Scala, qui s'appuient sur des conversions implicites de types de jeux de types de contenu ou d'informations de balises de classe dans leur travail. Dans ce cas, le moyen le plus simple d'accéder aux conversions implicites consiste à écrire un petit adaptateur dans Scala. Si les shells Scala ne peuvent pas être utilisés, vous pouvez appeler la fonction correspondante de la classe
JavaConverters et former une balise de classe factice.
Pour créer une balise de classe factice, vous pouvez utiliser la méthode scala.reflect.ClassTag $ .MODULE $ .AnyRef () ou obtenir la vraie en utilisant scala.reflect.ClassTag $ .MODULE $ .apply (CLASS), comme illustré dans les exemples 7.2 et 7.3.
Pour convertir de Scala RDD en RDD Java, les informations de balise de classe sont souvent plus importantes que la plupart des bibliothèques Spark. La raison en est que bien que diverses classes JavaRDD fournissent des constructeurs accessibles au public qui prennent Scala RDD comme arguments, elles sont destinées à être appelées à partir du code Scala et nécessitent donc des informations sur la balise de classe.
Les balises de classe factices sont le plus souvent utilisées dans le code générique ou modèle, où les types exacts sont inconnus au moment de la compilation. De telles balises suffisent souvent, bien qu'il soit possible de perdre certaines nuances du côté du code Scala; dans de très rares cas, le code Scala nécessite des informations précises sur les balises de classe. Dans ce cas, vous devrez utiliser une vraie balise. Dans la plupart des cas, cela ne nécessite pas beaucoup d'efforts et améliore les performances, essayez donc d'utiliser ces balises dans la mesure du possible.
Exemple 7.2. Rendre Java / Scala RDD compatible avec une balise de classe factice
public static JavaPairRDD wrapPairRDDFakeCt( RDD<Tuple2<String, Object>> RDD) { // AnyRef — // , // , // // ClassTag<Object> fake = ClassTag$.MODULE$.AnyRef(); return new JavaPairRDD(rdd, fake, fake); }
Exemple 7.3. Garantir la compatibilité RDD Java / Scala
public static JavaPairRDD wrapPairRDD( RDD<Tuple2<String, Object>> RDD) { // ClassTag<String> strCt = ClassTag$.MODULE$.apply(String.class); ClassTag<Long> longCt = ClassTag$.MODULE$.apply(scala.Long.class); return new JavaPairRDD(rdd, strCt, longCt); }
Les API Spark SQL et ML pipeline ont été pour la plupart rendues cohérentes en Java et Scala. Cependant, il existe des fonctions d'assistance spécifiques à Java et les fonctions Scala équivalentes ne sont pas faciles à appeler. Voici leurs exemples: diverses fonctions numériques, telles que plus, moins, etc., pour la classe Column. Il est difficile d'appeler leurs équivalents surchargés de la langue Scala (+, -). Au lieu d'utiliser JavaDataFrame et JavaSQLContext, les méthodes requises par Java sont disponibles dans SQLContext et les ensembles DataFrame standard. Cela peut vous dérouter, car certaines des méthodes mentionnées dans la documentation Java ne peuvent pas être utilisées à partir du code Java, mais dans de tels cas, des fonctions portant les mêmes noms sont fournies pour appeler à partir de Java.
Les fonctions définies par l'utilisateur (UDF) dans le langage Java, et d'ailleurs, dans la plupart des autres langages à l'exception de Scala, elles nécessitent de spécifier le type de la valeur renvoyée par la fonction, car elle ne peut pas être déduite de manière logique, semblable à la façon dont elle est effectuée dans Scala (exemple 7.4) .
Exemple 7.4. Exemple UDF pour Java
sqlContext.udf() .register("strlen", (String s) -> s.length(), DataTypes.StringType);
Bien que les types requis par les API Scala et Java soient différents, l'encapsulation des types de collection Java ne nécessite pas de copie supplémentaire. Dans le cas des itérateurs, la conversion de type requise pour l'adaptateur est effectuée de manière retardée lors de l'accès aux éléments, ce qui permet au framework Spark de vider les données si nécessaire (comme expliqué dans la section "Effectuer des transformations itérateur-itérateur à l'aide de la fonction mapPartitions" à la page 125). Ceci est très important car pour de nombreuses opérations simples, le coût de la copie des données peut être supérieur au coût du calcul lui-même.
Au-delà de Scala et JVM
Si vous ne vous limitez pas à la JVM, le nombre de langages de programmation disponibles pour le travail augmente considérablement. Cependant, avec l'architecture Spark actuelle, travailler en dehors de la JVM - en particulier sur les nœuds de travail - peut entraîner des augmentations de coûts importantes en raison de la copie des données dans les nœuds de travail entre la JVM et le code de langue cible. Dans les opérations complexes, la part du coût de copie des données est relativement faible, mais dans les opérations simples, elle peut facilement conduire à un doublement du coût de calcul total.
Le premier langage de programmation non JVM directement pris en charge en dehors de Spark est Python, son API et son interface sont devenues le modèle sur lequel les implémentations pour d'autres langages de programmation non JVM sont basées.
Comment PySpark fonctionne
PySpark se connecte à JVM Spark en utilisant un mélange de canaux sur les travailleurs et Py4J, une bibliothèque spécialisée qui fournit une interaction Python / Java, sur le pilote. Sous cela, à première vue, une architecture simple cache beaucoup de nuances complexes, grâce auxquelles PySpark fonctionne, comme le montre la Fig. 7.1. L'un des principaux problèmes: même lorsque les données sont copiées d'un travailleur Python vers la JVM, ce n'est pas sous la forme qu'une machine virtuelle peut facilement analyser. Des efforts particuliers sont requis de la part du travailleur Python et Java pour garantir que la machine virtuelle Java dispose de suffisamment d'informations pour des opérations telles que le partitionnement.
Kits RDD PySpark
Le coût des ressources pour le transfert de données vers et depuis la JVM, ainsi que pour l'exécution de l'exécuteur Python, est important. Vous pouvez éviter de nombreux problèmes de performances avec les API PySpark RDD Suite à l'aide des API DataFrame / Dataset, car les données restent dans la JVM aussi longtemps que possible.
La copie des données de la JVM vers Python se fait à l'aide de sockets et d'octets sérialisés. Une version plus générale pour interagir avec des programmes dans d'autres langues est disponible via l'interface PipedRDD, dont l'application est présentée dans la sous-section «Utilisation de pipe».
L'organisation de canaux d'échange de données (dans les deux sens) pour chaque transformation serait trop coûteuse. Par conséquent, PySpark organise (si possible) le pipeline de transformation Python à l'intérieur de l'interpréteur Python, enchaînant l'opération de filtrage, puis la carte, sur l'itérateur d'objet Python à l'aide de la classe spécialisée PipelinedRDD. Même lorsque vous devez mélanger des données et que PySpark n'est pas en mesure de chaîner les conversions dans la machine virtuelle d'un travailleur individuel, vous pouvez réutiliser l'interpréteur Python, de sorte que le coût de démarrage de l'interpréteur ne ralentira pas davantage.
Ce n'est qu'une partie du puzzle. Les PipedRDD classiques fonctionnent avec le type String, qui n'est pas si facile à mélanger en raison de l'absence d'une clé naturelle. Dans PySpark, et dans son image et sa similitude dans les bibliothèques se liant à de nombreux autres langages de programmation, un type spécial de PairwiseRDD est utilisé, où la clé est un entier long, et sa désérialisation est effectuée par le code utilisateur dans le langage Scala, destiné à l'analyse des valeurs Python. Le coût de cette désérialisation n'est pas trop élevé, mais cela démontre que Scala dans le framework Spark considère essentiellement les résultats du code Python comme des tableaux d'octets «opaques».
Pour toute sa simplicité, cette approche d'intégration fonctionne étonnamment bien, et la plupart des opérations sur les ensembles Scala RDD sont disponibles en Python. Dans certains des endroits les plus difficiles du code, les bibliothèques sont accessibles, par exemple, MLlib, ainsi que le chargement / enregistrement des données à partir de diverses sources.
Travailler avec différents formats de données impose également ses limites, car une partie importante du code de chargement / enregistrement des données à partir du framework Spark est basée sur les interfaces Java Hadoop. Cela signifie que toutes les données chargées sont d'abord chargées dans la JVM, puis déplacées vers Python.
Deux approches sont généralement utilisées pour interagir avec MLlib: soit PySpark utilise un type de données spécialisé avec des conversions de type Scala, soit l'algorithme est réimplémenté en Python. Ces problèmes peuvent être évités avec le package Spark ML, qui utilise l'interface DataFrame / Dataset, qui stocke généralement les données dans la JVM.
Kits PySpark DataFrame et Dataset
Les ensembles DataFrame et Dataset n'ont pas beaucoup de problèmes de performances avec les API d'ensemble Python RDD car ils stockent les données dans la JVM aussi longtemps que possible. Le même test de performances que nous avons effectué pour illustrer la supériorité des ensembles DataFrame sur les ensembles RDD (voir figure 3.1) montre des différences significatives lors de l'exécution en Python (figure 7.2).
Pour de nombreuses opérations avec des ensembles DataFrame et Dataset, vous n'aurez peut-être pas besoin de déplacer les données de la JVM, bien que l'utilisation de diverses expressions lambda UDF, UDAF et Python nécessite naturellement de déplacer certaines données dans la JVM. Cela conduit au schéma simplifié suivant pour de nombreuses opérations, qui ressemble à celui illustré sur la Fig. 7.3.
Accès aux objets Java sous-jacents et au code mixte dans Scala
Une conséquence importante de l'architecture PySpark est que de nombreuses classes de framework Spark Python sont en fait des adaptateurs pour traduire les appels du code Python en une forme JVM compréhensible.
Si vous travaillez avec des développeurs Scala / Java et que vous souhaitez interagir avec leur code, à l'avance, il n'y aura pas d'adaptateurs pour accéder à votre code, mais vous pouvez enregistrer votre UDF Java / Scala et les utiliser à partir du code Python. À partir de Spark 2.1, cela peut être fait à l'aide de la méthode registerJavaFunction de l'objet sqlContext.
Parfois, ces adaptateurs ne disposent pas de tous les mécanismes nécessaires, et comme Python ne dispose pas d'une solide protection contre les appels de méthodes privées, vous pouvez immédiatement vous tourner vers la JVM. La même technique vous permettra d'accéder à votre propre code dans la JVM et, avec peu d'effort, de reconvertir les résultats en objets Python.
Dans la sous-section "Grands plans de requête et algorithmes itératifs" à la p. 91 nous avons noté l'importance d'utiliser la version JVM des ensembles DataFrame et RDD pour réduire le plan de requête. Il s'agit d'une solution de contournement, car lorsque les plans de requête deviennent trop volumineux pour être traités par l'optimiseur Spark SQL, l'optimiseur SQL, en raison du placement de l'ensemble RDD au milieu, perd la possibilité de regarder au-delà du moment où les données apparaissent dans RDD. La même chose peut être obtenue à l'aide des API Python publiques, cependant, de nombreux avantages des ensembles DataFrame seront perdus, car toutes les données devront aller et venir via les nœuds de travail de Python. Au lieu de cela, vous pouvez réduire le graphique d'origine en continuant à stocker des données dans la JVM (comme illustré dans l'exemple 7.5).
Exemple 7.5 Découpage d'un plan de requête volumineux pour un DataFrame à l'aide de Python
def cutLineage(df): """ DataFrame — .. : >>> df = RDD.toDF() >>> cutDf = cutLineage(df) >>> cutDf.count() 3 """ jRDD = df._jdf.toJavaRDD() jSchema = df._jdf.schema() jRDD.cache() sqlCtx = df.sql_ctx try: javaSqlCtx = sqlCtx._jsqlContext except: javaSqlCtx = sqlCtx._ssql_ctx newJavaDF = javaSqlCtx.createDataFrame(jRDD, jSchema) newDF = DataFrame(newJavaDF, sqlCtx) return newDF
De manière générale, par convention, la syntaxe _j [nom_abrégé] est utilisée pour accéder aux versions Java internes de la plupart des objets Python. Ainsi, par exemple, l'objet SparkContext a _jsc, ce qui vous permet d'obtenir l'objet Java interne SparkContext. Cela n'est possible que dans le programme du pilote, donc lorsque vous envoyez des objets PySpark aux nœuds de travail, vous ne pourrez pas accéder au composant Java interne et la plupart de l'API ne fonctionnera pas.
Pour accéder à la classe Spark dans la JVM, qui ne possède pas d'adaptateur Python, vous pouvez utiliser la passerelle Py4J sur le pilote. L'objet SparkContext contient un lien vers la passerelle dans la propriété _gateway. La syntaxe sc._gateway.jvm. [Full_class_name_in_JVM] permettra d'accéder à n'importe quel objet Java.
Une technique similaire fonctionnera pour vos propres classes Scala si elles sont organisées selon le chemin de classe. Vous pouvez ajouter des fichiers JAR au chemin de classe à l'aide de la commande spark-submit avec le paramètre --jars ou en définissant les propriétés de configuration spark.driver.extraClassPath. Exemple 7.6, qui a aidé à générer du riz. 7.2, est intentionnellement conçu pour générer des données pour les tests de performances en utilisant le code Scala existant.
Exemple 7.6 Appel de classes non Spark-JVM à l'aide de Py4J
sc = sqlCtx._sc # SQL Context, 2.1, 2.0 , # 2.0, — , :p try: try: javaSqlCtx = sqlCtx._jsqlContext except: javaSqlCtx = sqlCtx._ssql_ctx except: javaSqlCtx = sqlCtx._jwrapped jsc = sc._jsc scalasc = jsc.sc() gateway = sc._gateway # java-, RDD JVM- # Row (Int, Double). RDD Python # RDD Java ( Row), # , . # Java-RDD Row — # DataFrame, # RDD Row. java_rdd = (gateway.jvm.com.highperformancespark.examples. tools.GenerateScalingData. generateMiniScaleRows(scalasc, rows, numCols)) # JSON . # Python- Java-. schema = StructType([ StructField("zip", IntegerType()), StructField("fuzzyness", DoubleType())]) # 2.1 / 2.1 try: jschema = javaSqlCtx.parseDataType(schema.json()) except: jschema = sqlCtx._jsparkSession.parseDataType(schema.json()) # RDD (Java) DataFrame (Java) java_dataframe = javaSqlCtx.createDataFrame(java_rdd, jschema) # DataFrame (Java) DataFrame (Python) python_dataframe = DataFrame(java_dataframe, sqlCtx) # DataFrame (Python) RDD pairRDD = python_dataframe.rdd.map(lambda row: (row[0], row[1])) return (python_dataframe, pairRDD)
Bien que de nombreuses classes Python soient simplement des adaptateurs d'objets Java, tous les objets Java ne peuvent pas être encapsulés dans des objets Python, puis utilisés dans Spark. Par exemple, les objets des ensembles RDD PySpark sont représentés comme des chaînes sérialisées, qui ne peuvent être analysées facilement qu'en code Python. Heureusement, les objets DataFrame sont standardisés entre différents langages de programmation, donc si vous pouvez convertir vos données en ensembles DataFrame, vous pouvez ensuite les encapsuler dans des objets Python et les utiliser directement comme Python DataFrame, ou convertir un Python DataFrame en RDD de cette même langue.
»Plus d'informations sur le livre sont disponibles sur
le site Web de l'éditeur»
Contenu»
Extrait20% de réduction sur les coupons pour pulvérisateurs -
Spark