《有效火花》一书。 缩放和优化”

图片 在本文中,我们将探讨如何从JVM中的各种编程语言访问Spark API,以及超越Scala语言时的一些性能问题。 即使您在JVM之外工作,本节也会很有用,因为非JVM语言通常取决于Java API,而不取决于Scala API。

使用其他编程语言并不总是意味着您需要超越JVM,并且在JVM方面在性能方面具有许多优势-主要是因为您不需要复制数据。 尽管不必使用特殊的绑定库或适配器从Scala语言外部访问Spark,但是从其他编程语言调用Scala代码可能很困难。 Spark框架支持在lambda表达式中使用Java 8,使用JDK的较早版本的人有机会从org.apache.spark.api.java.function包中实现适当的接口。 即使在不需要复制数据的情况下,使用另一种编程语言进行的工作也可能与性能有微小但重要的细微差别。

当使用类标签调用函数或使用隐式类型转换提供的属性时(例如,与Double和Tuple类相关的RDD集的所有功能),访问各种Scala API的困难尤其明显。 对于依赖隐式类型转换的机制,通常会提供等效的具体类以及对它们的显式转换。 虚拟类标签(例如,AnyRef)可以传递给依赖于类标签的函数(通常适配器自动执行此操作)。 使用特定的类而不是隐式类型转换通常不会导致额外的开销,但是伪类标记可以对某些编译器优化施加限制。

Java API在属性方面与Scala API并没有太大区别,只是偶尔缺少某些功能或开发人员API。 使用各种Java API(而不是直接调用Scala API) 支持其他JVM编程语言,例如带有DSL Flambo的Clojure语言和闪闪发光的库。 由于大多数语言绑定,甚至非JVM语言(如Python和R)都通过Java API进行处理,因此处理它非常有用。

Java API与Scala API非常相似,尽管它们独立于类标记和隐式转换。 缺少后者意味着必须使用显式类型转换函数(例如mapToDouble或mapToPair),而不是将元组或双精度对象的RDD集自动转换为具有附加功能的特殊类。 指定的功能仅为Java RDD集定义; 幸运的是,出于兼容性考虑,这些特殊类型仅是Scala RDD集的适配器。 另外,这些特殊功能返回各种数据类型,例如JavaDoubleRDD和JavaPairRDD,以及隐式Scala语言转换提供的功能。

让我们再次转到使用Java API进行字计数的规范示例(示例7.1)。 由于从Java调用Scala API有时可能很困难,因此Spark框架Java API几乎都是用Scala语言实现的,具有隐藏的类标记和隐式转换。 因此,Java适配器是一个非常薄的层,平均仅包含几行代码,并且重写它们实际上是毫不费力的。

范例7.1 字数统计(Java)

import scala.Tuple2;  import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaPairRDD import org.apache.spark.api.java.JavaSparkContext;  import java.util.regex.Pattern; import java.util.Arrays;  public final class WordCount { private static final Pattern pattern = Pattern.compile(" ");  public static void main(String[] args) throws Exception { JavaSparkContext jsc = new JavaSparkContext(); JavaRDD<String> lines = jsc.textFile(args[0]); JavaRDD<String> words = lines.flatMap(e -> Arrays.asList(                                           pattern.split(e)).iterator()); JavaPairRDD<String, Integer> wordsIntial = words.mapToPair(  e -> new Tuple2<String, Integer>(e, 1));   } } 

有时您可能需要将Java RDD转换为Scala RDD,反之亦然。 对于需要输入或返回Scala RDD集的库,这是最经常需要的,但是有时基本的Spark属性可能在Java API中不可用。 将Java RDD转换为Scala RDD是使用这些新功能的最简单方法。

如果您需要将Java RDD集转移到Scala库(需要在输入中使用常规RDD Spark),则可以使用rdd()方法访问基础的RDD Scala。 通常,这足以将最终的RDD转移到任何所需的Scala库中。 值得注意的例外是Scala库,该库在其工作中依赖于内容类型集类型或类标签信息的隐式转换。 在这种情况下,访问隐式转换的最简单方法是在Scala中编写一个小型适配器。 如果无法使用Scala shell,则可以调用JavaConverters类的相应函数并形成一个虚拟类标签。

要创建一个虚拟类标记,可以使用scala.reflect.ClassTag $ .MODULE $ .AnyRef()方法,或者使用scala.reflect.ClassTag $ .MODULE $ .apply(CLASS)获得真实的标记,如示例7.2和7.3所示。

要从Scala RDD转换为RDD Java,类标记信息通常比大多数Spark库更重要。 原因是,尽管各种JavaRDD类提供了以Scala RDD作为参数的可公开访问的构造函数,但是它们打算从Scala代码中调用,因此需要有关类标记的信息。

虚拟类标记最常用于通用代码或模板代码,在编译时确切的类型未知。 这样的标签通常就足够了,尽管可能会失去Scala代码方面的细微差别。 在极少数情况下,Scala代码需要准确的类标签信息。 在这种情况下,您将必须使用真实标签。 在大多数情况下,这不需要太多的工作并可以提高性能,因此请尽可能使用此类标签。

示例7.2 使Java / Scala RDD与虚拟类标签兼容

 public static JavaPairRDD wrapPairRDDFakeCt( RDD<Tuple2<String, Object>> RDD) { //       AnyRef —   //        , //        , //        //    ClassTag<Object> fake = ClassTag$.MODULE$.AnyRef(); return new JavaPairRDD(rdd, fake, fake); } 

示例7.3 确保Java / Scala RDD兼容性

 public static JavaPairRDD wrapPairRDD( RDD<Tuple2<String, Object>> RDD) { //    ClassTag<String> strCt = ClassTag$.MODULE$.apply(String.class); ClassTag<Long> longCt = ClassTag$.MODULE$.apply(scala.Long.class); return new JavaPairRDD(rdd, strCt, longCt); } 

大多数情况下,Spark SQL和ML管道API在Java和Scala中都保持一致。 但是,存在特定于Java的帮助器函数,与它们等效的Scala函数不容易调用。 这是它们的示例:Column类的各种数值函数,例如加号,减号等。 从Scala语言(+,-)很难调用它们的重载等效项。 代替使用JavaDataFrame和JavaSQLContext,在SQLContext和常规DataFrame集中提供了Java必需的方法。 这可能会使您感到困惑,因为无法从Java代码中使用Java文档中提到的某些方法,但是在这种情况下,提供了具有相同名称的函数以从Java进行调用。

Java语言的用户定义函数(UDF),就此而言,在除Scala之外的其他大多数语言中,它们都需要指定函数返回的值的类型,因为它无法逻辑推断,类似于在Scala语言中执行的方式(示例7.4) 。

示例7.4 Java UDF示例

 sqlContext.udf() .register("strlen", (String s) -> s.length(), DataTypes.StringType); 

尽管Scala和Java API要求的类型不同,但是包装Java集合类型不需要额外的复制。 对于迭代器,适配器所需的类型转换将在访问元素时以延迟的方式执行,这将使Spark框架在必要时转储数据(如第121页的“使用mapPartitions函数执行迭代器-迭代器转换”一节中所述)。 这非常重要,因为对于许多简单的操作而言,复制数据的成本可能会高于计算本身的成本。

超越Scala和JVM


如果您不局限于JVM,那么可用于工作的编程语言的数量将急剧增加。 但是,对于当前的Spark架构,由于在JVM和目标语言代码之间复制工作节点中的数据,因此在JVM外部(尤其是在工作节点上)进行工作可能会导致成本显着增加。 在复杂的操作中,复制数据成本的份额相对较小,但是在简单的操作中,它很容易导致总计算成本加倍。

Spark外部直接支持的第一种非JVM编程语言是Python,其API和接口已成为其他非JVM编程语言的实现所基于的模型。

PySpark如何工作


PySpark使用worker上的多个通道和驱动程序上提供Python / Java交互的专用库Py4J连接到JVM Spark。 在此之下,乍看之下,简单的架构隐藏了许多复杂的细微差别,这要归功于PySpark的工作,如图1所示。 7.1。 主要问题之一:即使将数据从Python worker复制到JVM,虚拟机也不容易解析。 Python和Java worker都需要付出特殊的努力,以确保JVM具有足够的信息来进行分区等操作。
图片

PySpark RDD套件


用于往返于JVM的数据传输以及运行Python执行器的资源成本非常可观。 您可以使用DataFrame / Dataset API避免PySpark RDD Suite API的许多性能问题,因为数据会尽可能长时间地保留在JVM中。

使用套接字和序列化字节将数据从JVM复制到Python。 可通过PipedRDD界面获得用于与其他语言的程序进行交互的更通用版本,其应用程序在“使用管道”部分中显示。

每次转换的数据交换渠道的组织(在两个方向上)将过于昂贵。 结果,PySpark使用专用的PipelinedRDD类在Python解释器内部组织了Python转换管道(如果可能的话),链接了过滤操作,并在其后映射了Python对象的迭代器上的映射。 即使您需要重新整理数据并且PySpark无法在单个工作人员的虚拟机中链接转换,您也可以重用Python解释器,因此启动解释器的成本不会进一步降低。

这只是难题的一部分。 常规PipedRDD与String类型一起使用,由于缺少自然键,因此不太容易随机播放。 在PySpark中,以及与绑定到许多其他编程语言的库中的图像和相似性中,使用了一种特殊的PairwiseRDD类型,其中键是一个长整数,并且其反序列化由Scala语言中的用户代码执行,旨在解析Python值。 这种反序列化的代价不是很高,但表明Spark框架中的Scala基本上将Python代码的结果视为“不透明”字节数组。

尽管非常简单,但是这种集成方法的效果出奇地好,并且Scala RDD集上的大多数操作都可以在Python中使用。 在代码中某些最困难的地方,可以访问库,例如MLlib,以及从各种来源加载/保存数据。

使用各种数据格式也有其局限性,因为从Spark框架加载/保存数据的代码中有很大一部分是基于Hadoop Java接口的。 这意味着所有加载的数据都首先加载到JVM中,然后才移至Python。

通常使用两种方法与MLlib进行交互:PySpark使用具有Scala类型转换的专用数据类型,或者在Python中重新实现该算法。 使用ML软件包可以避免这些问题,该软件包使用DataFrame / Dataset接口,该接口通常将数据存储在JVM中。

PySpark DataFrame和数据集套件


Python RDD Set API的DataFrame和Dataset集没有很多性能问题,因为它们将数据尽可能长时间地存储在JVM中。 为了说明DataFrame集相对于RDD集的优越性而进行的性能测试(参见图3.1),在Python中运行时(图7.2)存在显着差异。
图片

对于许多使用DataFrame和Dataset集的操作,您可能根本不需要从JVM移动数据,尽管使用各种UDF,UDAF和Python lambda表达式自然需要将某些数据移入JVM。 这导致了针对许多操作的以下简化方案,看起来类似于图1中所示的方案。 7.3。

图片

访问Scala中的基础Java对象和混合代码


PySpark架构的一个重要结果是,许多Spark Python框架类实际上是用于将来自Python代码的调用转换为可理解的JVM形式的适配器。

如果您与Scala / Java开发人员一起工作并且想与他们的代码进行交互,那么事先没有适配器可以访问您的代码,但是您可以注册Java / Scala UDF并从Python代码中使用它们。 从Spark 2.1开始,可以使用sqlContext对象的registerJavaFunction方法完成此操作。

有时,这些适配器没有所有必要的机制,并且由于Python没有针对调用私有方法的强大保护,因此您可以立即转向JVM。 相同的技术将使您可以在JVM中访问自己的代码,并且不费吹灰之力即可将结果转换回Python对象。

在p的“大查询计划和迭代算法”小节中。 91我们注意到使用DataFrame和RDD集的JVM版本来减少查询计划的重要性。 这是一种解决方法,因为当查询计划变得太大而无法由Spark SQL优化器进行处理时,由于将RDD集放在中间,SQL优化器将失去查看超出数据出现在RDD中的能力。 借助公共Python API可以实现相同的目的,但是,同时,由于所有数据都必须在Python的工作节点之间来回传递,因此DataFrame集的许多优点将丢失。 相反,您可以通过继续将数据存储在JVM中来减少源图(如示例7.5所示)。

示例7.5 使用Python整理DataFrame的大型查询计划

 def cutLineage(df): """    DataFrame —     .. :              >>> df = RDD.toDF() >>> cutDf = cutLineage(df) >>> cutDf.count() 3 """ jRDD = df._jdf.toJavaRDD() jSchema = df._jdf.schema() jRDD.cache() sqlCtx = df.sql_ctx try: javaSqlCtx = sqlCtx._jsqlContext except: javaSqlCtx = sqlCtx._ssql_ctx newJavaDF = javaSqlCtx.createDataFrame(jRDD, jSchema) newDF = DataFrame(newJavaDF, sqlCtx) return newDF 

一般来说,按照惯例,_j [abbreviated_name]语法用于访问大多数Python对象的内部Java版本。 因此,例如,SparkContext对象具有_jsc,它使您可以获取内部SparkContext Java对象。 这仅在驱动程序中是可能的,因此,当您将PySpark对象发送到工作节点时,将无法访问内部Java组件,并且大多数API将无法工作。

要在没有Python适配器的JVM中访问Spark类,可以在驱动程序上使用Py4J网关。 SparkContext对象在_gateway属性中包含指向网关的链接。 语法sc._gateway.jvm。[Full_class_name_in_JVM]将允许访问任何Java对象。

如果您的Scala类是根据类路径进行排列的,则类似的技术也适用于您自己的Scala类。 您可以使用带有--jars参数的spark-submit命令或通过设置spark.driver.extraClassPath配置属性,将JAR文件添加到类路径。 例7.6,它有助于产生水稻。 7.2是有意设计的,用于使用现有Scala代码生成用于性能测试的数据。

示例7.6 使用Py4J调用非Spark-JVM类

 sc = sqlCtx._sc #  SQL Context,   2.1, 2.0   , #  2.0, —  ,   :p try: try: javaSqlCtx = sqlCtx._jsqlContext except: javaSqlCtx = sqlCtx._ssql_ctx except: javaSqlCtx = sqlCtx._jwrapped jsc = sc._jsc scalasc = jsc.sc() gateway = sc._gateway #  java-,   RDD JVM- # Row (Int, Double).   RDD  Python   #  RDD  Java (   Row),   # ,      . #   Java-RDD  Row —     #    DataFrame,     #    RDD  Row. java_rdd = (gateway.jvm.com.highperformancespark.examples. tools.GenerateScalingData. generateMiniScaleRows(scalasc, rows, numCols)) #     JSON     . #  Python-     Java-. schema = StructType([ StructField("zip", IntegerType()), StructField("fuzzyness", DoubleType())]) #   2.1 /  2.1 try: jschema = javaSqlCtx.parseDataType(schema.json()) except: jschema = sqlCtx._jsparkSession.parseDataType(schema.json()) #  RDD (Java)  DataFrame (Java) java_dataframe = javaSqlCtx.createDataFrame(java_rdd, jschema) #  DataFrame (Java)  DataFrame (Python) python_dataframe = DataFrame(java_dataframe, sqlCtx) #  DataFrame (Python)   RDD pairRDD = python_dataframe.rdd.map(lambda row: (row[0], row[1])) return (python_dataframe, pairRDD) 


尽管许多Python类只是Java对象的适配器,但并非所有Java对象都可以包装在Python对象中,然后在Spark中使用。 例如,PySpark RDD集中的对象表示为序列化的字符串,这些字符串只能在Python代码中轻松解析。 幸运的是,DataFrame对象在不同的​​编程语言之间是标准化的,因此,如果您可以将数据转换为DataFrame集,则可以将它们包装在Python对象中,或者直接将它们用作Python DataFrame,或者将Python DataFrame转换为此RDD相同的语言。

»这本书的更多信息可以在出版商的网站上找到
» 目录
» 摘录

喷雾器-Spark优惠券20%折扣

Source: https://habr.com/ru/post/zh-CN414525/


All Articles