
Nesta postagem, veremos como acessar a API Spark de várias linguagens de programação na JVM, bem como alguns problemas de desempenho ao ir além da linguagem Scala. Mesmo se você trabalhar fora da JVM, esta seção poderá ser útil, pois as linguagens que não são da JVM geralmente dependem da API Java e não da API Scala.
Trabalhar em outras linguagens de programação nem sempre significa que você precisa ir além da JVM, e trabalhar na JVM tem muitas vantagens em termos de desempenho - principalmente devido ao fato de você não precisar copiar dados. Embora não seja necessário usar bibliotecas de ligação ou adaptadores especiais para acessar o Spark de fora da linguagem Scala, pode ser difícil chamar o código Scala de outras linguagens de programação. A estrutura Spark suporta o uso do Java 8 em expressões lambda, e aqueles que usam versões mais antigas do JDK têm a oportunidade de implementar a interface apropriada no pacote org.apache.spark.api.java.function. Mesmo nos casos em que você não precisa copiar dados, o trabalho em outra linguagem de programação pode ter pequenas mas importantes nuances relacionadas ao desempenho.
As dificuldades em acessar várias APIs Scala são especialmente pronunciadas ao chamar funções com tags de classe ou ao usar propriedades fornecidas usando conversões implícitas de tipo (por exemplo, toda a funcionalidade dos conjuntos RDD relacionados às classes Double e Tuple). Para mecanismos que dependem de conversões implícitas de tipo, classes concretas equivalentes geralmente são fornecidas junto com conversões explícitas para elas. Tags de classe fictícias (digamos, AnyRef) podem ser passadas para funções que dependem de tags de classe (geralmente os adaptadores fazem isso automaticamente. O uso de classes específicas em vez de conversões implícitas de tipo geralmente não gera sobrecarga adicional, mas as tags de classe falsas podem impor restrições a algumas otimizações do compilador.
A API Java não é muito diferente da API Scala em termos de propriedades, apenas ocasionalmente algumas funcionalidades ou APIs de desenvolvedor estão ausentes. Outras linguagens de programação da JVM, como a linguagem Clojure com DSL
Flambo e a biblioteca
espumante , são
suportadas usando várias APIs Java em vez de chamar diretamente a API Scala. Como a maioria das ligações de idiomas, mesmo as linguagens não JVM, como Python e R, passam pela API
Java , será útil lidar com isso.
As APIs Java são muito semelhantes às APIs Scala, embora sejam independentes de tags de classe e conversões implícitas. A ausência deste último significa que, em vez de converter automaticamente os conjuntos RDD de tupla ou de objetos duplos em classes especiais com funções adicionais, é necessário usar funções explícitas de conversão de tipo (por exemplo, mapToDouble ou mapToPair). As funções especificadas são definidas apenas para conjuntos Java RDD; Felizmente pela compatibilidade, esses tipos especiais são apenas adaptadores para os conjuntos Scala RDD. Além disso, essas funções especiais retornam vários tipos de dados, como JavaDoubleRDD e JavaPairRDD, com recursos fornecidos por transformações implícitas da linguagem Scala.
Vamos voltar ao exemplo canônico de contagem de palavras usando a API Java (Exemplo 7.1). Como chamar a API Scala de Java às vezes pode ser difícil, as APIs Java da estrutura Spark são quase todas implementadas na linguagem Scala com tags de classe ocultas e conversões implícitas. Por esse motivo, os adaptadores Java são uma camada muito fina, consistindo, em média, de apenas algumas linhas de código, e reescrevê-los é praticamente fácil.
Exemplo 7.1 Contagem de palavras (Java)
import scala.Tuple2; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaPairRDD import org.apache.spark.api.java.JavaSparkContext; import java.util.regex.Pattern; import java.util.Arrays; public final class WordCount { private static final Pattern pattern = Pattern.compile(" "); public static void main(String[] args) throws Exception { JavaSparkContext jsc = new JavaSparkContext(); JavaRDD<String> lines = jsc.textFile(args[0]); JavaRDD<String> words = lines.flatMap(e -> Arrays.asList( pattern.split(e)).iterator()); JavaPairRDD<String, Integer> wordsIntial = words.mapToPair( e -> new Tuple2<String, Integer>(e, 1)); } }
Às vezes, pode ser necessário converter RDDs Java em RDDs Scala ou vice-versa. Isso geralmente é necessário para bibliotecas que requerem entrada ou retorno de conjuntos Scala RDD, mas às vezes as propriedades básicas do Spark ainda não estão disponíveis na API Java. A conversão de RDDs Java em RDDs Scala é a maneira mais fácil de usar esses novos recursos.
Se você precisar transferir o conjunto Java RDD para a biblioteca Scala, que espera um RDD Spark regular na entrada, poderá acessar o RDD Scala subjacente usando o método rdd (). Geralmente, isso é suficiente para transferir o RDD final para qualquer biblioteca Scala desejada; Entre as exceções notáveis estão as bibliotecas Scala, que dependem de conversões implícitas de tipos de tipos de conteúdo ou informações de tags de classe em seus trabalhos. Nesse caso, a maneira mais fácil de acessar conversões implícitas é escrever um pequeno adaptador no Scala. Se os shells Scala não puderem ser utilizados, você poderá chamar a função correspondente da classe
JavaConverters e formar uma tag de classe fictícia.
Para criar uma tag de classe fictícia, você pode usar o método scala.reflect.ClassTag $ .MODULE $ .AnyRef () ou obter o real usando o método scala.reflect.ClassTag $ .MODULE $ .apply (CLASS), conforme mostrado nos exemplos 7.2 e 7.3.
Para converter de Scala RDD para RDD Java, as informações de tags de classe geralmente são mais importantes que a maioria das bibliotecas Spark. O motivo é que, embora várias classes JavaRDD forneçam construtores acessíveis ao público que usam o Scala RDD como argumentos, eles devem ser chamados a partir do código Scala e, portanto, requerem informações sobre a tag da classe.
As tags de classe fictícia são usadas com mais frequência no código genérico ou de modelo, onde os tipos exatos são desconhecidos no momento da compilação. Essas tags costumam ser suficientes, embora exista a possibilidade de perder algumas nuances ao lado do código Scala; em casos muito raros, o código Scala requer informações precisas sobre tags de classe. Nesse caso, você precisará usar uma tag real. Na maioria dos casos, isso não requer muito esforço e melhora o desempenho; portanto, tente usar essas tags sempre que possível.
Exemplo 7.2 Tornando Java / Scala RDD Compatível com um Tag de Classe Dummy
public static JavaPairRDD wrapPairRDDFakeCt( RDD<Tuple2<String, Object>> RDD) { // AnyRef — // , // , // // ClassTag<Object> fake = ClassTag$.MODULE$.AnyRef(); return new JavaPairRDD(rdd, fake, fake); }
Exemplo 7.3 Garantindo a compatibilidade com Java / Scala RDD
public static JavaPairRDD wrapPairRDD( RDD<Tuple2<String, Object>> RDD) { // ClassTag<String> strCt = ClassTag$.MODULE$.apply(String.class); ClassTag<Long> longCt = ClassTag$.MODULE$.apply(scala.Long.class); return new JavaPairRDD(rdd, strCt, longCt); }
As APIs de pipeline Spark SQL e ML foram, na maioria das vezes, tornadas consistentes em Java e Scala. No entanto, existem funções auxiliares específicas de Java e as funções Scala equivalentes a elas não são fáceis de chamar. Aqui estão seus exemplos: várias funções numéricas, como mais, menos, etc., para a classe Column. É difícil chamar seus equivalentes sobrecarregados do idioma Scala (+, -). Em vez de usar JavaDataFrame e JavaSQLContext, os métodos necessários para Java são disponibilizados no SQLContext e nos conjuntos regulares de DataFrame. Isso pode confundir você, porque alguns dos métodos mencionados na documentação Java não podem ser usados no código Java, mas, nesses casos, funções com os mesmos nomes são fornecidas para a chamada do Java.
UDFs (funções definidas pelo usuário) na linguagem Java e, na verdade, na maioria das outras linguagens, exceto Scala, exigem a especificação do tipo de valor retornado pela função, uma vez que não pode ser deduzido logicamente, semelhante à maneira como é executado na linguagem Scala (exemplo 7.4) .
Exemplo 7.4 Amostra de UDF para Java
sqlContext.udf() .register("strlen", (String s) -> s.length(), DataTypes.StringType);
Embora os tipos exigidos pelas APIs Scala e Java sejam diferentes, agrupar os tipos de coleção Java não requer cópia adicional. No caso de iteradores, a conversão de tipo necessária para o adaptador é realizada de maneira retardada à medida que os elementos são acessados, o que permite que a estrutura do Spark despeje dados, se necessário (conforme discutido na seção "Executando transformações de iterador-iterador usando a função mapPartitions" na página 121). Isso é muito importante porque, para muitas operações simples, o custo da cópia de dados pode ser maior que o custo do próprio cálculo.
Além de Scala e JVM
Se você não se limitar à JVM, o número de linguagens de programação disponíveis para o trabalho aumentará dramaticamente. No entanto, com a arquitetura atual do Spark, trabalhar fora da JVM - especialmente em nós de trabalho - pode levar a aumentos de custos significativos devido à cópia de dados nos nós de trabalho entre a JVM e o código do idioma de destino. Em operações complexas, a parcela do custo de cópia de dados é relativamente pequena, mas em operações simples, pode facilmente levar a uma duplicação do custo computacional total.
A primeira linguagem de programação não JVM diretamente suportada fora do Spark é o Python, sua API e interface se tornaram o modelo no qual as implementações para outras linguagens de programação não JVM são baseadas.
Como funciona o PySpark
O PySpark se conecta à JVM Spark usando uma mistura de canais nos trabalhadores e o Py4J, uma biblioteca especializada que fornece interação Python / Java, no driver. Sob esse aspecto, à primeira vista, a arquitetura simples oculta muitas nuances complexas, graças às quais o PySpark funciona, como mostra a Fig. 7.1 Um dos principais problemas: mesmo quando os dados são copiados de um trabalhador Python para a JVM, não é da forma que uma máquina virtual pode analisar facilmente. Esforços especiais são necessários por parte do trabalhador Python e Java para garantir que a JVM tenha informações suficientes para operações como particionamento.
Kits PySpark RDD
O custo dos recursos para transferir dados de e para a JVM, bem como para executar o executor Python, é significativo. Você pode evitar muitos problemas de desempenho com as APIs do PySpark RDD Suite usando as APIs DataFrame / Dataset, porque os dados permanecem na JVM pelo maior tempo possível.
A cópia de dados da JVM para Python é feita usando soquetes e bytes serializados. Uma versão mais geral para interagir com programas em outros idiomas está disponível na interface do PipedRDD, cuja aplicação é mostrada na subseção “Usando pipe”.
A organização de canais para troca de dados (em duas direções) para cada transformação seria muito cara. Como resultado, o PySpark organiza (se possível) o pipeline de transformação Python dentro do interpretador Python, encadeando a operação de filtro e, depois dele, o mapa, no iterador de objetos Python, usando a classe PipelinedRDD especializada. Mesmo quando você precisa embaralhar dados e o PySpark não conseguir encadear conversões na máquina virtual de um trabalhador individual, você pode reutilizar o interpretador Python, para que o custo de iniciar o intérprete não diminua ainda mais.
Isso é apenas parte do quebra-cabeça. Os PipedRDDs regulares funcionam com o tipo String, que não é tão fácil de embaralhar devido à falta de uma chave natural. No PySpark, e em sua imagem e semelhança nas bibliotecas vinculadas a muitas outras linguagens de programação, é usado um tipo especial de PairwiseRDD, em que a chave é um número inteiro longo e sua desserialização é realizada pelo código do usuário na linguagem Scala, destinado à análise dos valores do Python. O custo dessa desserialização não é muito alto, mas demonstra que o Scala na estrutura do Spark basicamente considera os resultados do código Python para trabalhar como matrizes de bytes "opacas".
Por toda a sua simplicidade, essa abordagem de integração funciona surpreendentemente bem, e a maioria das operações nos conjuntos Scala RDD estão disponíveis no Python. Em alguns dos lugares mais difíceis do código, as bibliotecas são acessadas, por exemplo, MLlib, além de carregar / salvar dados de várias fontes.
Trabalhar com vários formatos de dados também impõe suas limitações, pois uma parte significativa do código para carregar / salvar dados da estrutura Spark é baseada nas interfaces Java do Hadoop. Isso significa que todos os dados carregados são carregados primeiro na JVM e somente depois são movidos para Python.
Normalmente, duas abordagens são usadas para interagir com o MLlib: ou o PySpark usa um tipo de dados especializado com conversões do tipo Scala ou o algoritmo é reimplementado no Python. Esses problemas podem ser evitados com o pacote Spark ML, que usa a interface DataFrame / Dataset, que geralmente armazena dados na JVM.
Kits de DataFrame e Conjunto de Dados PySpark
Os conjuntos DataFrame e Conjunto de dados não apresentam muitos problemas de desempenho com as APIs do Python RDD Set, porque armazenam dados na JVM pelo maior tempo possível. O mesmo teste de desempenho que realizamos para ilustrar a superioridade dos conjuntos DataFrame sobre os conjuntos RDD (veja a Figura 3.1) mostra diferenças significativas ao executar no Python (Figura 7.2).
Para muitas operações com conjuntos de conjuntos de dados e conjuntos de dados, talvez não seja necessário mover os dados da JVM, embora o uso de várias expressões lambda UDF, UDAF e Python exija naturalmente mover alguns dados para a JVM. Isso leva ao seguinte esquema simplificado para muitas operações, que se parece com o mostrado na Fig. 7.3
Acesso a objetos Java subjacentes e código misto no Scala
Uma conseqüência importante da arquitetura PySpark é que muitas das classes de estrutura do Spark Python são realmente adaptadores para converter chamadas do código Python em um formato JVM compreensível.
Se você trabalha com desenvolvedores Scala / Java e deseja interagir com seu código, então não haverá adaptadores para acessar seu código, mas é possível registrar seu UDF Java / Scala e usá-los no código Python. A partir do Spark 2.1, isso pode ser feito usando o método registerJavaFunction do objeto sqlContext.
Às vezes, esses adaptadores não possuem todos os mecanismos necessários e, como o Python não possui proteção forte contra a chamada de métodos privados, você pode recorrer imediatamente à JVM. A mesma técnica permitirá acessar seu próprio código na JVM e, com pouco esforço, converter os resultados novamente em objetos Python.
Na subseção "Grandes planos de consulta e algoritmos iterativos" na p. 91, observamos a importância de usar a versão JVM dos conjuntos DataFrame e RDD para reduzir o plano de consulta. Essa é uma solução alternativa, porque quando os planos de consulta se tornam muito grandes para serem processados pelo otimizador Spark SQL, o otimizador SQL, devido à colocação do RDD definido no meio, perde a capacidade de olhar além do momento em que os dados aparecem no RDD. O mesmo pode ser alcançado com a ajuda das APIs públicas do Python, no entanto, muitas das vantagens dos conjuntos de DataFrame serão perdidas, porque todos os dados precisarão ir e voltar pelos nós de trabalho do Python. Em vez disso, você pode reduzir o gráfico de origem, continuando a armazenar dados na JVM (como mostrado no Exemplo 7.5).
Exemplo 7.5 Cortando um grande plano de consulta para um DataFrame usando Python
def cutLineage(df): """ DataFrame — .. : >>> df = RDD.toDF() >>> cutDf = cutLineage(df) >>> cutDf.count() 3 """ jRDD = df._jdf.toJavaRDD() jSchema = df._jdf.schema() jRDD.cache() sqlCtx = df.sql_ctx try: javaSqlCtx = sqlCtx._jsqlContext except: javaSqlCtx = sqlCtx._ssql_ctx newJavaDF = javaSqlCtx.createDataFrame(jRDD, jSchema) newDF = DataFrame(newJavaDF, sqlCtx) return newDF
De um modo geral, por convenção, a sintaxe _j [nome abreviado] é usada para acessar as versões Java internas da maioria dos objetos Python. Portanto, por exemplo, o objeto SparkContext possui _jsc, que permite obter o objeto Java interno do SparkContext. Isso é possível apenas no programa do driver, portanto, quando você envia objetos PySpark para nós de trabalho, não poderá acessar o componente Java interno e a maior parte da API não funcionará.
Para acessar a classe Spark na JVM, que não possui um adaptador Python, você pode usar o gateway Py4J no driver. O objeto SparkContext contém um link para o gateway na propriedade _gateway. A sintaxe sc._gateway.jvm. [Full_class_name_in_JVM] permitirá acesso a qualquer objeto Java.
Uma técnica semelhante funcionará para suas próprias classes Scala se elas forem organizadas de acordo com o caminho da classe. É possível incluir arquivos JAR no caminho de classe usando o comando spark-submit com o parâmetro --jars ou definindo as propriedades de configuração spark.driver.extraClassPath. Exemplo 7.6, que ajudou a gerar arroz. 7.2, foi intencionalmente projetado para gerar dados para teste de desempenho usando o código Scala existente.
Exemplo 7.6 Chamando classes que não são Spark-JVM usando Py4J
sc = sqlCtx._sc # SQL Context, 2.1, 2.0 , # 2.0, — , :p try: try: javaSqlCtx = sqlCtx._jsqlContext except: javaSqlCtx = sqlCtx._ssql_ctx except: javaSqlCtx = sqlCtx._jwrapped jsc = sc._jsc scalasc = jsc.sc() gateway = sc._gateway # java-, RDD JVM- # Row (Int, Double). RDD Python # RDD Java ( Row), # , . # Java-RDD Row — # DataFrame, # RDD Row. java_rdd = (gateway.jvm.com.highperformancespark.examples. tools.GenerateScalingData. generateMiniScaleRows(scalasc, rows, numCols)) # JSON . # Python- Java-. schema = StructType([ StructField("zip", IntegerType()), StructField("fuzzyness", DoubleType())]) # 2.1 / 2.1 try: jschema = javaSqlCtx.parseDataType(schema.json()) except: jschema = sqlCtx._jsparkSession.parseDataType(schema.json()) # RDD (Java) DataFrame (Java) java_dataframe = javaSqlCtx.createDataFrame(java_rdd, jschema) # DataFrame (Java) DataFrame (Python) python_dataframe = DataFrame(java_dataframe, sqlCtx) # DataFrame (Python) RDD pairRDD = python_dataframe.rdd.map(lambda row: (row[0], row[1])) return (python_dataframe, pairRDD)
Embora muitas classes Python sejam simplesmente adaptadoras de objetos Java, nem todos os objetos Java podem ser agrupados em objetos Python e usados no Spark. Por exemplo, objetos nos conjuntos PySpark RDD são representados como seqüências serializadas, que podem ser analisadas apenas facilmente no código Python. Felizmente, os objetos DataFrame são padronizados entre diferentes linguagens de programação; portanto, se você pode converter seus dados em conjuntos de DataFrame, pode envolvê-los em objetos Python e usá-los diretamente como um DataFrame do Python, ou converter um DataFrame do Python em um RDD. mesmo idioma.
»Mais informações sobre o livro podem ser encontradas no
site do editor»
Conteúdo»
Trecho20% de desconto em cupons para Pulverizadores -
Spark