
En esta publicación, analizaremos el acceso a la API de Spark desde varios lenguajes de programación en la JVM, así como algunos problemas de rendimiento al ir más allá del lenguaje Scala. Incluso si trabaja fuera de la JVM, esta sección puede ser útil, ya que los lenguajes que no son JVM a menudo dependen de la API de Java y no de la API de Scala.
Trabajar en otros lenguajes de programación no siempre significa que deba ir más allá de la JVM, y trabajar en la JVM tiene muchas ventajas en términos de rendimiento, principalmente debido al hecho de que no necesita copiar datos. Aunque no es necesario utilizar bibliotecas o adaptadores de enlace especiales para acceder a Spark desde fuera del lenguaje Scala, invocar el código Scala desde otros lenguajes de programación puede ser difícil. El marco de Spark admite el uso de Java 8 en expresiones lambda, y aquellos que usan versiones anteriores de JDK tienen la oportunidad de implementar la interfaz adecuada desde el paquete org.apache.spark.api.java.function. Incluso en los casos en que no necesite copiar datos, trabajar en otro lenguaje de programación puede tener pequeños pero importantes matices relacionados con el rendimiento.
Particularmente llamativas son las dificultades para acceder a varias API de Scala al invocar funciones con etiquetas de clase o al usar propiedades proporcionadas mediante conversiones de tipo implícitas (por ejemplo, toda la funcionalidad de conjuntos RDD relacionados con las clases Double y Tuple). Para los mecanismos que dependen de conversiones de tipo implícito, a menudo se proporcionan clases concretas equivalentes junto con conversiones explícitas a ellas. Las etiquetas de clase ficticias (digamos, AnyRef) se pueden pasar a funciones que dependen de etiquetas de clase (a menudo los adaptadores lo hacen automáticamente. El uso de clases específicas en lugar de conversiones de tipo implícitas generalmente no genera gastos generales adicionales, pero las etiquetas de clase ficticias pueden imponer restricciones en algunas optimizaciones del compilador.
La API de Java no es muy diferente de la API de Scala en términos de propiedades, solo ocasionalmente faltan algunas funcionalidades o API de desarrollador. Otros lenguajes de programación JVM, como el lenguaje Clojure con DSL
Flambo y la biblioteca
brillante , son
compatibles con varias API de Java en lugar de llamar directamente a la API de Scala. Dado que la mayoría de los enlaces de idiomas, incluso los lenguajes que no son JVM como Python y R, pasan por la API de
Java , será útil tratar con ellos.
Las API de Java son muy similares a las API de Scala, aunque son independientes de las etiquetas de clase y las conversiones implícitas. La ausencia de este último significa que, en lugar de convertir automáticamente los conjuntos RDD de Tuple u objetos dobles en clases especiales con funciones adicionales, debe usar funciones de conversión de tipo explícito (por ejemplo, mapToDouble o mapToPair). Las funciones especificadas se definen solo para conjuntos Java RDD; Afortunadamente para la compatibilidad, estos tipos especiales son solo adaptadores para conjuntos Scala RDD. Además, estas funciones especiales devuelven varios tipos de datos, como JavaDoubleRDD y JavaPairRDD, con características proporcionadas por transformaciones de lenguaje Scala implícitas.
Pasemos nuevamente al ejemplo canónico del conteo de palabras usando la API de Java (Ejemplo 7.1). Dado que llamar a la API de Scala desde Java a veces puede ser una tarea desalentadora, casi todas las API de framework de Spark Java se implementan en el lenguaje Scala con etiquetas de clase ocultas y conversiones implícitas. Debido a esto, los adaptadores Java son una capa muy delgada, que en promedio consta de solo unas pocas líneas de código, y reescribirlos es prácticamente sin esfuerzo.
Ejemplo 7.1 Recuento de palabras (Java)
import scala.Tuple2; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaPairRDD import org.apache.spark.api.java.JavaSparkContext; import java.util.regex.Pattern; import java.util.Arrays; public final class WordCount { private static final Pattern pattern = Pattern.compile(" "); public static void main(String[] args) throws Exception { JavaSparkContext jsc = new JavaSparkContext(); JavaRDD<String> lines = jsc.textFile(args[0]); JavaRDD<String> words = lines.flatMap(e -> Arrays.asList( pattern.split(e)).iterator()); JavaPairRDD<String, Integer> wordsIntial = words.mapToPair( e -> new Tuple2<String, Integer>(e, 1)); } }
A veces puede necesitar convertir RDD de Java a RDD de Scala o viceversa. Esto se necesita con mayor frecuencia para las bibliotecas que requieren entrada o devuelven conjuntos Scala RDD, pero a veces las propiedades básicas de Spark aún no están disponibles en la API Java. La conversión de RDD de Java a RDD de Scala es la forma más fácil de usar estas nuevas funciones.
Si necesita transferir el conjunto RDD de Java a la biblioteca Scala, que espera un RDD Spark regular en la entrada, puede acceder al RDD Scala subyacente utilizando el método rdd (). Muy a menudo, esto es suficiente para transferir el RDD final a cualquier biblioteca Scala deseada; Entre las excepciones notables se encuentran las bibliotecas Scala, que se basan en conversiones implícitas de tipos de tipos de contenido o información de etiquetas de clase en su trabajo. En este caso, la forma más fácil de acceder a las conversiones implícitas es escribir un pequeño adaptador en Scala. Si no se pueden usar los shells de Scala, puede llamar a la función correspondiente de la clase
JavaConverters y formar una etiqueta de clase ficticia.
Para crear una etiqueta de clase ficticia, puede usar el método scala.reflect.ClassTag $ .MODULE $ .AnyRef () u obtener el real usando scala.reflect.ClassTag $ .MODULE $ .apply (CLASS), como se muestra en los ejemplos 7.2 y 7.3.
Para convertir de Scala RDD a RDD Java, la información de etiqueta de clase suele ser más importante que la mayoría de las bibliotecas de Spark. La razón es que, aunque varias clases JavaRDD proporcionan constructores de acceso público que toman Scala RDD como argumentos, están destinados a ser llamados desde el código Scala y, por lo tanto, requieren información sobre la etiqueta de clase.
Las etiquetas de clase ficticias se usan con mayor frecuencia en código genérico o de plantilla, donde los tipos exactos son desconocidos en el momento de la compilación. Tales etiquetas son a menudo suficientes, aunque existe la posibilidad de perder algunos matices en el lado del código Scala; en casos muy raros, el código Scala requiere información precisa de la etiqueta de clase. En este caso, tendrá que usar una etiqueta real. En la mayoría de los casos, esto no requiere mucho esfuerzo y mejora el rendimiento, por lo tanto, intente utilizar dichas etiquetas siempre que sea posible.
Ejemplo 7.2. Hacer que Java / Scala RDD sea compatible con una etiqueta de clase ficticia
public static JavaPairRDD wrapPairRDDFakeCt( RDD<Tuple2<String, Object>> RDD) { // AnyRef — // , // , // // ClassTag<Object> fake = ClassTag$.MODULE$.AnyRef(); return new JavaPairRDD(rdd, fake, fake); }
Ejemplo 7.3. Garantizar la compatibilidad de Java / Scala RDD
public static JavaPairRDD wrapPairRDD( RDD<Tuple2<String, Object>> RDD) { // ClassTag<String> strCt = ClassTag$.MODULE$.apply(String.class); ClassTag<Long> longCt = ClassTag$.MODULE$.apply(scala.Long.class); return new JavaPairRDD(rdd, strCt, longCt); }
Tanto las API de canalización de Spark SQL como las de ML se hicieron en su mayor parte consistentes en Java y Scala. Sin embargo, existen funciones auxiliares específicas de Java, y las funciones de Scala equivalentes a ellas no son fáciles de llamar. Estos son sus ejemplos: varias funciones numéricas, como más, menos, etc., para la clase Columna. Es difícil llamar a sus equivalentes sobrecargados del lenguaje Scala (+, -). En lugar de utilizar JavaDataFrame y JavaSQLContext, los métodos requeridos por Java están disponibles en SQLContext y en conjuntos de DataFrame regulares. Esto puede confundirlo, porque algunos de los métodos mencionados en la documentación de Java no se pueden usar desde el código de Java, pero en tales casos se proporcionan funciones con los mismos nombres para llamar desde Java.
Las funciones definidas por el usuario (UDF) en el lenguaje Java, y para el caso, en la mayoría de los otros idiomas, excepto Scala, requieren especificar el tipo del valor devuelto por la función, ya que no se puede deducir lógicamente, de forma similar a cómo se realiza en el lenguaje Scala (ejemplo 7.4) .
Ejemplo 7.4. Muestra UDF para Java
sqlContext.udf() .register("strlen", (String s) -> s.length(), DataTypes.StringType);
Aunque los tipos requeridos por las API de Scala y Java son diferentes, envolver los tipos de colección Java no requiere copia adicional. En el caso de los iteradores, la conversión de tipo requerida para el adaptador se realiza de manera retardada a medida que se accede a los elementos, lo que permite que el marco de Spark descargue datos si es necesario (como se discutió en la sección "Realización de transformaciones de iterador-iterador usando la función mapPartitions" en la página 121). Esto es muy importante porque para muchas operaciones simples el costo de copiar datos puede ser mayor que el costo del cálculo en sí.
Más allá de Scala y JVM
Si no se limita a la JVM, entonces la cantidad de lenguajes de programación disponibles para trabajar aumenta dramáticamente. Sin embargo, con la arquitectura actual de Spark, trabajar fuera de la JVM, especialmente en los nodos de trabajo, puede generar un aumento significativo de los costos debido a la copia de datos en los nodos de trabajo entre la JVM y el código de idioma de destino. En operaciones complejas, la parte del costo de copiar datos es relativamente pequeña, pero en operaciones simples puede conducir fácilmente a una duplicación del costo computacional total.
El primer lenguaje de programación no JVM que se admite directamente fuera de Spark es Python, su API e interfaz se han convertido en el modelo en el que se basan las implementaciones para otros lenguajes de programación no JVM.
Cómo funciona PySpark
PySpark se conecta a JVM Spark utilizando una combinación de canales en los trabajadores y Py4J, una biblioteca especializada que proporciona interacción Python / Java, en el controlador. Bajo esto, a primera vista, la arquitectura simple esconde muchos matices complejos, gracias a los cuales funciona PySpark, como se muestra en la Fig. 7.1. Uno de los principales problemas: incluso cuando los datos se copian de un trabajador de Python a la JVM, no es en la forma en que una máquina virtual puede analizar fácilmente. Se requieren esfuerzos especiales tanto del trabajador de Python como de Java para garantizar que la JVM tenga suficiente información para operaciones como la partición.
Kits PySpark RDD
El costo de los recursos para transferir datos hacia y desde la JVM, así como para ejecutar el ejecutor de Python, es significativo. Puede evitar muchos problemas de rendimiento con las API de PySpark RDD Suite utilizando las API DataFrame / Dataset, porque los datos permanecen en la JVM durante el mayor tiempo posible.
La copia de datos de la JVM a Python se realiza mediante sockets y bytes serializados. Una versión más general para interactuar con programas en otros idiomas está disponible a través de la interfaz PipedRDD, cuya aplicación se muestra en la subsección "Uso de la tubería".
La organización de canales para el intercambio de datos (en dos direcciones) para cada transformación sería demasiado costosa. Como resultado, PySpark organiza (si es posible) la tubería de transformación de Python dentro del intérprete de Python, encadenando la operación de filtro, y luego el mapa, en el iterador de objetos de Python usando la clase especializada PipelinedRDD. Incluso cuando necesite mezclar datos y PySpark no pueda encadenar conversiones en la máquina virtual de un trabajador individual, puede reutilizar el intérprete de Python, por lo que el costo de iniciar el intérprete no disminuirá aún más.
Esto es solo una parte del rompecabezas. Los PipedRDD normales funcionan con el tipo String, que no es tan fácil de mezclar debido a la falta de una clave natural. En PySpark, y en su imagen y similitud en las bibliotecas vinculadas a muchos otros lenguajes de programación, se utiliza un tipo especial de PairwiseRDD, donde la clave es un entero largo, y su deserialización se realiza mediante código de usuario en el lenguaje Scala, destinado a analizar los valores de Python. El costo de esta deserialización no es demasiado alto, pero demuestra que Scala en el marco de Spark básicamente considera que los resultados del código Python funcionan como conjuntos de bytes "opacos".
Para toda su simplicidad, este enfoque de integración funciona sorprendentemente bien, y la mayoría de las operaciones en conjuntos Scala RDD están disponibles en Python. En algunos de los lugares más difíciles del código, se accede a las bibliotecas, por ejemplo, MLlib, así como a cargar / guardar datos de varias fuentes.
Trabajar con varios formatos de datos también impone sus limitaciones, ya que una parte importante del código para cargar / guardar datos del marco de Spark se basa en las interfaces Java de Hadoop. Esto significa que todos los datos cargados se cargan primero en la JVM, y solo luego se mueven a Python.
Por lo general, se utilizan dos enfoques para interactuar con MLlib: PySpark usa un tipo de datos especializado con conversiones de tipo Scala o el algoritmo se vuelve a implementar en Python. Estos problemas se pueden evitar con el paquete Spark ML, que utiliza la interfaz DataFrame / Dataset, que generalmente almacena datos en la JVM.
PySpark DataFrame y kits de conjuntos de datos
Los conjuntos de DataFrame y Dataset no tienen muchos problemas de rendimiento con las API de Python RDD Set porque almacenan datos en la JVM durante el mayor tiempo posible. La misma prueba de rendimiento que realizamos para ilustrar la superioridad de los conjuntos de DataFrame sobre los conjuntos RDD (consulte la Figura 3.1) muestra diferencias significativas cuando se ejecuta en Python (Figura 7.2).
Para muchas operaciones con conjuntos de DataFrame y Dataset, es posible que no necesite mover datos de la JVM, aunque usar varias expresiones lambda UDF, UDAF y Python naturalmente requiere mover algunos de los datos a la JVM. Esto lleva al siguiente esquema simplificado para muchas operaciones, que se parece al que se muestra en la Fig. 7.3.
Acceso a objetos Java subyacentes y código mixto en Scala
Una consecuencia importante de la arquitectura PySpark es que muchas de las clases de framework de Spark Python son en realidad adaptadores para traducir llamadas del código de Python a una forma JVM comprensible.
Si trabaja con desarrolladores de Scala / Java y desea interactuar con su código, de antemano no habrá adaptadores para acceder a su código, pero puede registrar su UDF de Java / Scala y usarlos desde el código Python. Comenzando con Spark 2.1, esto se puede hacer usando el método registerJavaFunction del objeto sqlContext.
A veces, estos adaptadores no tienen todos los mecanismos necesarios, y dado que Python no tiene una protección sólida contra la llamada a métodos privados, puede recurrir inmediatamente a la JVM. La misma técnica le permitirá acceder a su propio código en la JVM y, con poco esfuerzo, volver a convertir los resultados en objetos de Python.
En la subsección "Grandes planes de consulta y algoritmos iterativos" en la pág. 91 notamos la importancia de usar la versión JVM de los conjuntos DataFrame y RDD para reducir el plan de consulta. Esta es una solución alternativa, porque cuando los planes de consulta se vuelven demasiado grandes para ser procesados por el optimizador Spark SQL, el optimizador SQL, al colocar el conjunto RDD en el medio, pierde la capacidad de mirar más allá del momento en que los datos aparecen en RDD. Lo mismo se puede lograr con la ayuda de las API públicas de Python, sin embargo, muchas de las ventajas de los conjuntos de DataFrame se perderán, porque todos los datos tendrán que ir y venir a través de los nodos de trabajo de Python. En cambio, puede reducir el gráfico de origen si continúa almacenando datos en la JVM (como se muestra en el Ejemplo 7.5).
Ejemplo 7.5 Recortar un plan de consulta grande para un DataFrame usando Python
def cutLineage(df): """ DataFrame — .. : >>> df = RDD.toDF() >>> cutDf = cutLineage(df) >>> cutDf.count() 3 """ jRDD = df._jdf.toJavaRDD() jSchema = df._jdf.schema() jRDD.cache() sqlCtx = df.sql_ctx try: javaSqlCtx = sqlCtx._jsqlContext except: javaSqlCtx = sqlCtx._ssql_ctx newJavaDF = javaSqlCtx.createDataFrame(jRDD, jSchema) newDF = DataFrame(newJavaDF, sqlCtx) return newDF
En términos generales, por convención, la sintaxis _j [nombre_ abreviado] se utiliza para acceder a las versiones internas de Java de la mayoría de los objetos de Python. Entonces, por ejemplo, el objeto SparkContext tiene _jsc, que le permite obtener el objeto interno SparkContext Java. Esto solo es posible en el programa controlador, por lo que cuando envíe objetos PySpark a nodos de trabajo, no podrá acceder al componente interno de Java y la mayoría de las API no funcionarán.
Para acceder a la clase Spark en la JVM, que no tiene un adaptador Python, puede usar la puerta de enlace Py4J en el controlador. El objeto SparkContext contiene un enlace a la puerta de enlace en la propiedad _gateway. La sintaxis sc._gateway.jvm. [Full_class_name_in_JVM] permitirá el acceso a cualquier objeto Java.
Una técnica similar funcionará para sus propias clases Scala si están organizadas de acuerdo con el classpath. Puede agregar archivos JAR al classpath usando el comando spark-submit con el parámetro --jars o estableciendo las propiedades de configuración spark.driver.extraClassPath. Ejemplo 7.6, que ayudó a generar arroz. 7.2, está diseñado intencionalmente para generar datos para pruebas de rendimiento utilizando el código Scala existente.
Ejemplo 7.6 Llamando a clases que no son Spark-JVM usando Py4J
sc = sqlCtx._sc # SQL Context, 2.1, 2.0 , # 2.0, — , :p try: try: javaSqlCtx = sqlCtx._jsqlContext except: javaSqlCtx = sqlCtx._ssql_ctx except: javaSqlCtx = sqlCtx._jwrapped jsc = sc._jsc scalasc = jsc.sc() gateway = sc._gateway # java-, RDD JVM- # Row (Int, Double). RDD Python # RDD Java ( Row), # , . # Java-RDD Row — # DataFrame, # RDD Row. java_rdd = (gateway.jvm.com.highperformancespark.examples. tools.GenerateScalingData. generateMiniScaleRows(scalasc, rows, numCols)) # JSON . # Python- Java-. schema = StructType([ StructField("zip", IntegerType()), StructField("fuzzyness", DoubleType())]) # 2.1 / 2.1 try: jschema = javaSqlCtx.parseDataType(schema.json()) except: jschema = sqlCtx._jsparkSession.parseDataType(schema.json()) # RDD (Java) DataFrame (Java) java_dataframe = javaSqlCtx.createDataFrame(java_rdd, jschema) # DataFrame (Java) DataFrame (Python) python_dataframe = DataFrame(java_dataframe, sqlCtx) # DataFrame (Python) RDD pairRDD = python_dataframe.rdd.map(lambda row: (row[0], row[1])) return (python_dataframe, pairRDD)
Aunque muchas clases de Python son simplemente adaptadores de objetos de Java, no todos los objetos de Java pueden envolverse en objetos de Python y luego usarse en Spark. Por ejemplo, los objetos en los conjuntos PySpark RDD se representan como cadenas serializadas, que solo se pueden analizar fácilmente en el código Python. Afortunadamente, los objetos DataFrame están estandarizados entre diferentes lenguajes de programación, por lo que si puede convertir sus datos en conjuntos de DataFrame, puede envolverlos en objetos Python y usarlos directamente como un Python DataFrame o convertir un Python DataFrame en un RDD de este mismo idioma
»Se puede encontrar más información sobre el libro en
el sitio web del editor»
Contenidos»
Extracto20% de descuento en cupones para pulverizadores -
Spark