Das Buch „Effective Spark. Skalierung und Optimierung "

Bild In diesem Beitrag werden wir uns mit dem Zugriff auf die Spark-API aus verschiedenen Programmiersprachen in der JVM sowie einigen Leistungsproblemen befassen, die über die Scala-Sprache hinausgehen. Selbst wenn Sie außerhalb der JVM arbeiten, kann dieser Abschnitt hilfreich sein, da Nicht-JVM-Sprachen häufig von der Java-API und nicht von der Scala-API abhängen.

Das Arbeiten in anderen Programmiersprachen bedeutet nicht immer, dass Sie über die JVM hinausgehen müssen, und das Arbeiten in der JVM bietet viele Vorteile in Bezug auf die Leistung - hauptsächlich aufgrund der Tatsache, dass Sie keine Daten kopieren müssen. Obwohl es nicht erforderlich ist, spezielle Bindungsbibliotheken oder Adapter zu verwenden, um von außerhalb der Scala-Sprache auf Spark zuzugreifen, kann es schwierig sein, Scala-Code aus anderen Programmiersprachen aufzurufen. Das Spark-Framework unterstützt die Verwendung von Java 8 in Lambda-Ausdrücken, und diejenigen, die ältere Versionen des JDK verwenden, haben die Möglichkeit, die entsprechende Schnittstelle aus dem Paket org.apache.spark.api.java.function zu implementieren. Selbst in Fällen, in denen Sie keine Daten kopieren müssen, kann die Arbeit in einer anderen Programmiersprache kleine, aber wichtige Leistungsnuancen aufweisen.

Die Schwierigkeiten beim Zugriff auf verschiedene Scala-APIs sind besonders ausgeprägt, wenn Funktionen mit Klassen-Tags aufgerufen werden oder wenn Eigenschaften verwendet werden, die mithilfe impliziter Typkonvertierungen bereitgestellt werden (z. B. alle Funktionen von RDD-Sets, die sich auf die Double- und Tuple-Klassen beziehen). Für Mechanismen, die von impliziten Typkonvertierungen abhängen, werden häufig äquivalente konkrete Klassen zusammen mit expliziten Konvertierungen für diese bereitgestellt. Dummy-Klassen-Tags (z. B. AnyRef) können an Funktionen übergeben werden, die von Klassen-Tags abhängen (Adapter tun dies häufig automatisch. Die Verwendung bestimmter Klassen anstelle impliziter Typkonvertierungen führt normalerweise nicht zu zusätzlichem Overhead. Dummy-Klassen-Tags können jedoch einige Compiler-Optimierungen einschränken.

Die Java-API unterscheidet sich in ihren Eigenschaften nicht allzu sehr von der Scala-API. Nur gelegentlich fehlen einige Funktionen oder Entwickler-APIs. Andere JVM-Programmiersprachen wie die Clojure-Sprache mit DSL Flambo und die Sparkling- Bibliothek werden mithilfe verschiedener Java-APIs unterstützt, anstatt die Scala-API direkt aufzurufen. Da die meisten Sprachbindungen, auch Nicht-JVM-Sprachen wie Python und R, die Java- API durchlaufen, ist es hilfreich, damit umzugehen.

Die Java-APIs sind den Scala-APIs sehr ähnlich, obwohl sie unabhängig von Klassen-Tags und impliziten Konvertierungen sind. Das Fehlen des letzteren bedeutet, dass Sie anstelle der automatischen Konvertierung der RDD-Sätze von Tupel- oder Doppelobjekten in spezielle Klassen mit zusätzlichen Funktionen Funktionen der expliziten Typkonvertierung verwenden müssen (z. B. mapToDouble oder mapToPair). Die angegebenen Funktionen sind nur für Java-RDD-Sätze definiert. Zum Glück aus Kompatibilitätsgründen sind diese speziellen Typen nur Adapter für Scala RDD-Sets. Darüber hinaus geben diese Sonderfunktionen verschiedene Datentypen zurück, z. B. JavaDoubleRDD und JavaPairRDD, mit Funktionen, die durch implizite Scala-Sprachtransformationen bereitgestellt werden.

Wenden wir uns noch einmal dem kanonischen Beispiel der Wortzählung mit der Java-API zu (Beispiel 7.1). Da das Aufrufen der Scala-API von Java aus manchmal eine entmutigende Aufgabe sein kann, sind fast alle Spark-Java-Framework-APIs in der Scala-Sprache mit versteckten Klassen-Tags und impliziten Konvertierungen implementiert. Aus diesem Grund sind Java-Adapter eine sehr dünne Schicht, die im Durchschnitt nur aus wenigen Codezeilen besteht, und das Umschreiben ist praktisch mühelos.

Beispiel 7.1 Wortzählung (Java)

import scala.Tuple2;  import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaPairRDD import org.apache.spark.api.java.JavaSparkContext;  import java.util.regex.Pattern; import java.util.Arrays;  public final class WordCount { private static final Pattern pattern = Pattern.compile(" ");  public static void main(String[] args) throws Exception { JavaSparkContext jsc = new JavaSparkContext(); JavaRDD<String> lines = jsc.textFile(args[0]); JavaRDD<String> words = lines.flatMap(e -> Arrays.asList(                                           pattern.split(e)).iterator()); JavaPairRDD<String, Integer> wordsIntial = words.mapToPair(  e -> new Tuple2<String, Integer>(e, 1));   } } 

Manchmal müssen Sie möglicherweise Java-RDDs in Scala-RDDs konvertieren oder umgekehrt. Dies wird am häufigsten für Bibliotheken benötigt, die Eingaben benötigen oder Scala-RDD-Sätze zurückgeben. Manchmal sind die grundlegenden Spark-Eigenschaften jedoch noch nicht in der Java-API verfügbar. Das Konvertieren von Java-RDDs in Scala-RDDs ist der einfachste Weg, diese neuen Funktionen zu verwenden.

Wenn Sie den Java-RDD-Satz in die Scala-Bibliothek übertragen müssen, die einen regulären RDD-Spark an der Eingabe erwartet, können Sie mit der Methode rdd () auf die zugrunde liegende RDD-Scala zugreifen. Meistens reicht dies aus, um die endgültige RDD in eine beliebige gewünschte Scala-Bibliothek zu übertragen. Zu den bemerkenswerten Ausnahmen zählen Scala-Bibliotheken, die in ihrer Arbeit auf impliziten Konvertierungen von Arten von Inhaltssätzen oder Klassen-Tag-Informationen beruhen. In diesem Fall ist der einfachste Weg, auf implizite Konvertierungen zuzugreifen, das Schreiben eines kleinen Adapters in Scala. Wenn Scala-Shells nicht verwendet werden können, können Sie die entsprechende Funktion der JavaConverters- Klasse aufrufen und ein Dummy-Klassen-Tag bilden.

Um ein Dummy-Klassen-Tag zu erstellen, können Sie die Methode scala.reflect.ClassTag $ .MODULE $ .AnyRef () verwenden oder die echte Methode mit scala.reflect.ClassTag $ .MODULE $ .apply (CLASS) abrufen, wie in den Beispielen 7.2 und 7.3 gezeigt.

Für die Konvertierung von Scala RDD nach RDD Java sind Klassen-Tag-Informationen häufig wichtiger als die meisten Spark-Bibliotheken. Der Grund dafür ist, dass verschiedene JavaRDD-Klassen zwar öffentlich zugängliche Konstruktoren bereitstellen, die Scala RDD als Argumente verwenden, diese jedoch aus Scala-Code aufgerufen werden sollen und daher Informationen zum Klassen-Tag benötigen.

Dummy-Klassen-Tags werden am häufigsten in generischem Code oder Vorlagencode verwendet, bei dem die genauen Typen zum Zeitpunkt der Kompilierung unbekannt sind. Solche Tags sind oft genug, obwohl die Möglichkeit besteht, dass einige Nuancen auf der Seite des Scala-Codes verloren gehen. In sehr seltenen Fällen erfordert der Scala-Code genaue Informationen zu Klassen-Tags. In diesem Fall müssen Sie ein echtes Tag verwenden. In den meisten Fällen ist dies nicht sehr aufwändig und verbessert die Leistung. Versuchen Sie daher, solche Tags nach Möglichkeit zu verwenden.

Beispiel 7.2. Kompatibilität von Java / Scala RDD mit einem Dummy-Klassen-Tag

 public static JavaPairRDD wrapPairRDDFakeCt( RDD<Tuple2<String, Object>> RDD) { //       AnyRef —   //        , //        , //        //    ClassTag<Object> fake = ClassTag$.MODULE$.AnyRef(); return new JavaPairRDD(rdd, fake, fake); } 

Beispiel 7.3. Sicherstellen der Java / Scala RDD-Kompatibilität

 public static JavaPairRDD wrapPairRDD( RDD<Tuple2<String, Object>> RDD) { //    ClassTag<String> strCt = ClassTag$.MODULE$.apply(String.class); ClassTag<Long> longCt = ClassTag$.MODULE$.apply(scala.Long.class); return new JavaPairRDD(rdd, strCt, longCt); } 

Sowohl die Spark SQL- als auch die ML-Pipeline-APIs wurden in Java und Scala größtenteils konsistent gemacht. Es gibt jedoch Java-spezifische Hilfsfunktionen, und die ihnen entsprechenden Scala-Funktionen sind nicht einfach aufzurufen. Hier sind ihre Beispiele: verschiedene numerische Funktionen wie Plus, Minus usw. für die Column-Klasse. Es ist schwierig, ihre überladenen Entsprechungen aus der Scala-Sprache (+, -) zu bezeichnen. Anstatt JavaDataFrame und JavaSQLContext zu verwenden, werden Java-erforderliche Methoden in SQLContext und regulären DataFrame-Sets verfügbar gemacht. Dies kann Sie verwirren, da einige der in der Java-Dokumentation genannten Methoden nicht aus Java-Code verwendet werden können. In solchen Fällen werden jedoch Funktionen mit demselben Namen zum Aufrufen aus Java bereitgestellt.

Benutzerdefinierte Funktionen (UDFs) in der Java-Sprache und in den meisten anderen Sprachen außer Scala erfordern die Angabe des Typs des von der Funktion zurückgegebenen Werts, da dieser nicht logisch abgeleitet werden kann, ähnlich wie er in der Scala-Sprache ausgeführt wird (Beispiel 7.4). .

Beispiel 7.4. UDF-Beispiel für Java

 sqlContext.udf() .register("strlen", (String s) -> s.length(), DataTypes.StringType); 

Obwohl die für die Scala- und Java-APIs erforderlichen Typen unterschiedlich sind, erfordert das Umschließen von Java-Sammlungstypen kein zusätzliches Kopieren. Bei Iteratoren wird die für den Adapter erforderliche Typkonvertierung beim Zugriff auf die Elemente verzögert ausgeführt, sodass das Spark-Framework bei Bedarf Daten sichern kann (wie im Abschnitt "Durchführen von Iterator-Iterator-Transformationen mit der Funktion mapPartitions" auf Seite 121 erläutert). Dies ist sehr wichtig, da für viele einfache Vorgänge die Kosten für das Kopieren von Daten höher sein können als die Kosten für die Berechnung selbst.

Jenseits von Scala und JVM


Wenn Sie sich nicht auf die JVM beschränken, steigt die Anzahl der für die Arbeit verfügbaren Programmiersprachen dramatisch an. Bei der aktuellen Spark-Architektur kann das Arbeiten außerhalb der JVM - insbesondere auf Arbeitsknoten - zu erheblichen Kostensteigerungen führen, da Daten in Arbeitsknoten zwischen der JVM und dem Zielsprachencode kopiert werden. Bei komplexen Vorgängen ist der Anteil der Kosten für das Kopieren von Daten relativ gering, bei einfachen Vorgängen kann dies jedoch leicht zu einer Verdoppelung der gesamten Rechenkosten führen.

Die erste Nicht-JVM-Programmiersprache, die direkt außerhalb von Spark unterstützt wird, ist Python. Die API und die Schnittstelle sind das Modell, auf dem Implementierungen für andere Nicht-JVM-Programmiersprachen basieren.

Wie PySpark funktioniert


PySpark stellt eine Verbindung zu JVM Spark her, indem eine Mischung aus Kanälen für Worker und Py4J, eine spezialisierte Bibliothek, die Python / Java-Interaktion bietet, für den Treiber verwendet werden. Darunter verbirgt die einfache Architektur auf den ersten Blick viele komplexe Nuancen, dank derer PySpark funktioniert, wie in Abb. 7.1. Eines der Hauptprobleme: Selbst wenn die Daten von einem Python-Worker in die JVM kopiert werden, können sie von einer virtuellen Maschine nicht einfach analysiert werden. Sowohl der Python- als auch der Java-Mitarbeiter müssen besondere Anstrengungen unternehmen, um sicherzustellen, dass die JVM über genügend Informationen für Vorgänge wie die Partitionierung verfügt.
Bild

PySpark RDD Kits


Die Kosten für Ressourcen zum Übertragen von Daten zur und von der JVM sowie zum Ausführen des Python-Executors sind erheblich. Sie können viele Leistungsprobleme mit den PySpark RDD Suite-APIs mithilfe der DataFrame / Dataset-APIs vermeiden, da die Daten so lange wie möglich in der JVM verbleiben.

Das Kopieren von Daten von der JVM nach Python erfolgt mithilfe von Sockets und serialisierten Bytes. Eine allgemeinere Version für die Interaktion mit Programmen in anderen Sprachen ist über die PipedRDD-Schnittstelle verfügbar, deren Anwendung im Unterabschnitt „Using Pipe“ gezeigt wird.

Die Organisation von Kanälen für den Datenaustausch (in zwei Richtungen) für jede Transformation wäre zu teuer. Infolgedessen organisiert PySpark (wenn möglich) die Python-Transformationspipeline im Python-Interpreter und verkettet die Filteroperation und anschließend die Zuordnung auf dem Python-Objektiterator mithilfe der speziellen PipelinedRDD-Klasse. Selbst wenn Sie Daten mischen müssen und PySpark keine Konvertierungen in der virtuellen Maschine eines einzelnen Mitarbeiters verketten kann, können Sie den Python-Interpreter wiederverwenden, sodass sich die Kosten für das Starten des Interpreters nicht weiter verlangsamen.

Dies ist nur ein Teil des Puzzles. Normale PipedRDDs arbeiten mit dem String-Typ, der aufgrund des Fehlens eines natürlichen Schlüssels nicht so einfach zu mischen ist. In PySpark und in seinem Image und seiner Ähnlichkeit in Bibliotheken, die an viele andere Programmiersprachen gebunden sind, wird ein spezieller Typ von PairwiseRDD verwendet, bei dem der Schlüssel eine lange Ganzzahl ist und dessen Deserialisierung durch Benutzercode in der Scala-Sprache durchgeführt wird, der zum Parsen von Python-Werten vorgesehen ist. Die Kosten für diese Deserialisierung sind nicht zu hoch, aber es zeigt, dass Scala im Spark-Framework die Ergebnisse von Python-Code grundsätzlich als "undurchsichtige" Byte-Arrays betrachtet.

Bei aller Einfachheit funktioniert dieser Integrationsansatz überraschend gut, und die meisten Operationen an Scala-RDD-Sets sind in Python verfügbar. An einigen der schwierigsten Stellen im Code wird auf Bibliotheken zugegriffen, z. B. auf MLlib, und es werden Daten aus verschiedenen Quellen geladen / gespeichert.

Das Arbeiten mit verschiedenen Datenformaten ist ebenfalls mit Einschränkungen verbunden, da ein wesentlicher Teil des Codes zum Laden / Speichern von Daten aus dem Spark-Framework auf den Hadoop-Java-Schnittstellen basiert. Dies bedeutet, dass alle geladenen Daten zuerst in die JVM geladen und erst dann nach Python verschoben werden.

Für die Interaktion mit MLlib werden normalerweise zwei Ansätze verwendet: Entweder verwendet PySpark einen speziellen Datentyp mit Scala-Typkonvertierungen, oder der Algorithmus wird in Python erneut implementiert. Diese Probleme können mit dem Spark ML-Paket vermieden werden, das die DataFrame / Dataset-Schnittstelle verwendet, die normalerweise Daten in der JVM speichert.

PySpark DataFrame- und Dataset-Kits


Die DataFrame- und Dataset-Sets weisen mit den Python-RDD-Set-APIs nicht viele Leistungsprobleme auf, da sie Daten so lange wie möglich in der JVM speichern. Der gleiche Leistungstest, den wir durchgeführt haben, um die Überlegenheit von DataFrame-Sets gegenüber RDD-Sets zu veranschaulichen (siehe Abbildung 3.1), zeigt signifikante Unterschiede bei der Ausführung in Python (Abbildung 7.2).
Bild

Bei vielen Vorgängen mit DataFrame- und Dataset-Sätzen müssen Sie möglicherweise überhaupt keine Daten aus der JVM verschieben, obwohl für die Verwendung verschiedener UDF-, UDAF- und Python-Lambda-Ausdrücke natürlich einige der Daten in die JVM verschoben werden müssen. Dies führt zu dem folgenden vereinfachten Schema für viele Operationen, das wie das in Fig. 1 gezeigte aussieht. 7.3.

Bild

Zugriff auf zugrunde liegende Java-Objekte und gemischten Code in Scala


Eine wichtige Konsequenz der PySpark-Architektur ist, dass viele der Spark Python-Framework-Klassen tatsächlich Adapter sind, um Aufrufe aus Python-Code in eine verständliche JVM-Form zu übersetzen.

Wenn Sie mit Scala / Java-Entwicklern zusammenarbeiten und mit deren Code interagieren möchten, gibt es im Voraus keine Adapter für den Zugriff auf Ihren Code. Sie können jedoch Ihre Java / Scala-UDF registrieren und sie aus Python-Code verwenden. Ab Spark 2.1 kann dies mit der registerJavaFunction-Methode des sqlContext-Objekts erfolgen.

Manchmal verfügen diese Adapter nicht über alle erforderlichen Mechanismen, und da Python keinen starken Schutz gegen das Aufrufen privater Methoden bietet, können Sie sich sofort an die JVM wenden. Mit derselben Technik können Sie auf Ihren eigenen Code in der JVM zugreifen und die Ergebnisse mit geringem Aufwand wieder in Python-Objekte konvertieren.

Im Unterabschnitt "Große Abfragepläne und iterative Algorithmen" auf S. 22. Wir haben festgestellt, wie wichtig es ist, die JVM-Version der DataFrame- und RDD-Sets zu verwenden, um den Abfrageplan zu reduzieren. Dies ist eine Problemumgehung, da der SQL-Optimierer aufgrund der Platzierung des RDD-Satzes in der Mitte die Möglichkeit verliert, über den Moment hinaus zu schauen, in dem die Daten in RDD angezeigt werden, wenn die Abfragepläne für die Verarbeitung durch den Spark SQL-Optimierer zu groß werden. Dasselbe kann mit Hilfe öffentlicher Python-APIs erreicht werden. Gleichzeitig gehen jedoch viele Vorteile von DataFrame-Sets verloren, da alle Daten über die Arbeitsknoten von Python hin und her gehen müssen. Stattdessen können Sie das Ursprungsdiagramm reduzieren, indem Sie weiterhin Daten in der JVM speichern (wie in Beispiel 7.5 gezeigt).

Beispiel 7.5 Trimmen eines großen Abfrageplans für einen DataFrame mit Python

 def cutLineage(df): """    DataFrame —     .. :              >>> df = RDD.toDF() >>> cutDf = cutLineage(df) >>> cutDf.count() 3 """ jRDD = df._jdf.toJavaRDD() jSchema = df._jdf.schema() jRDD.cache() sqlCtx = df.sql_ctx try: javaSqlCtx = sqlCtx._jsqlContext except: javaSqlCtx = sqlCtx._ssql_ctx newJavaDF = javaSqlCtx.createDataFrame(jRDD, jSchema) newDF = DataFrame(newJavaDF, sqlCtx) return newDF 

Im Allgemeinen wird gemäß der Konvention die Syntax _j [abgekürzter_Name] verwendet, um auf die internen Java-Versionen der meisten Python-Objekte zuzugreifen. So verfügt das SparkContext-Objekt beispielsweise über _jsc, mit dem Sie das interne SparkContext-Java-Objekt abrufen können. Dies ist nur im Treiberprogramm möglich. Wenn Sie also PySpark-Objekte an Arbeitsknoten senden, können Sie nicht auf die interne Java-Komponente zugreifen und der größte Teil der API funktioniert nicht.

Um auf die Spark-Klasse in der JVM zuzugreifen, die keinen Python-Adapter hat, können Sie das Py4J-Gateway auf dem Treiber verwenden. Das SparkContext-Objekt enthält einen Link zum Gateway in der Eigenschaft _gateway. Die Syntax sc._gateway.jvm. [Full_class_name_in_JVM] ermöglicht den Zugriff auf jedes Java-Objekt.

Eine ähnliche Technik funktioniert für Ihre eigenen Scala-Klassen, wenn sie gemäß dem Klassenpfad angeordnet sind. Sie können dem Klassenpfad JAR-Dateien hinzufügen, indem Sie den Befehl spark-submit mit dem Parameter --jars verwenden oder die Konfigurationseigenschaften spark.driver.extraClassPath festlegen. Beispiel 7.6, das zur Erzeugung von Reis beitrug. 7.2 wurde absichtlich entwickelt, um Daten für Leistungstests unter Verwendung des vorhandenen Scala-Codes zu generieren.

Beispiel 7.6 Aufrufen von Nicht-Spark-JVM-Klassen mit Py4J

 sc = sqlCtx._sc #  SQL Context,   2.1, 2.0   , #  2.0, —  ,   :p try: try: javaSqlCtx = sqlCtx._jsqlContext except: javaSqlCtx = sqlCtx._ssql_ctx except: javaSqlCtx = sqlCtx._jwrapped jsc = sc._jsc scalasc = jsc.sc() gateway = sc._gateway #  java-,   RDD JVM- # Row (Int, Double).   RDD  Python   #  RDD  Java (   Row),   # ,      . #   Java-RDD  Row —     #    DataFrame,     #    RDD  Row. java_rdd = (gateway.jvm.com.highperformancespark.examples. tools.GenerateScalingData. generateMiniScaleRows(scalasc, rows, numCols)) #     JSON     . #  Python-     Java-. schema = StructType([ StructField("zip", IntegerType()), StructField("fuzzyness", DoubleType())]) #   2.1 /  2.1 try: jschema = javaSqlCtx.parseDataType(schema.json()) except: jschema = sqlCtx._jsparkSession.parseDataType(schema.json()) #  RDD (Java)  DataFrame (Java) java_dataframe = javaSqlCtx.createDataFrame(java_rdd, jschema) #  DataFrame (Java)  DataFrame (Python) python_dataframe = DataFrame(java_dataframe, sqlCtx) #  DataFrame (Python)   RDD pairRDD = python_dataframe.rdd.map(lambda row: (row[0], row[1])) return (python_dataframe, pairRDD) 


Obwohl viele Python-Klassen lediglich Adapter von Java-Objekten sind, können nicht alle Java-Objekte in Python-Objekte eingeschlossen und dann in Spark verwendet werden. Beispielsweise werden Objekte in PySpark-RDD-Sets als serialisierte Zeichenfolgen dargestellt, die nur in Python-Code einfach analysiert werden können. Glücklicherweise sind DataFrame-Objekte zwischen verschiedenen Programmiersprachen standardisiert. Wenn Sie also Ihre Daten in DataFrame-Sets konvertieren können, können Sie sie in Python-Objekte einbinden und sie entweder direkt als Python DataFrame verwenden oder einen Python DataFrame in eine RDD davon konvertieren gleiche Sprache.

»Weitere Informationen zum Buch finden Sie auf der Website des Herausgebers
» Inhalt
» Auszug

20% Rabatt auf Gutscheine für Sprayers - Spark

Source: https://habr.com/ru/post/de414525/


All Articles