Hallo an alle. Als Einführung möchte ich Ihnen erzählen, wie ich zu einem solchen Leben gekommen bin.
Insbesondere vor dem Treffen mit Big Data und Spark musste ich häufig SQL-Abfragen optimieren, zuerst für MSSQL, dann für Oracle, und jetzt bin ich auf SparkSQL gestoßen.
Und wenn es bereits viele gute Bücher für das DBMS gibt, die die Methodik und die „Stifte“ beschreiben, die Sie drehen können, um den optimalen Abfrageplan zu erhalten, habe ich solche Bücher für Spark nicht gesehen. Ich bin auf mehr Artikel und Vorgehensweisen gestoßen, die sich eher auf die Arbeit mit der RDD / Dataset-API als auf reines SQL beziehen. Für mich ist eines der Nachschlagewerke zur SQL-Optimierung das Buch Oracle von J. Lewis. Grundlagen der Kostenoptimierung. " Ich suchte nach etwas Ähnlichem in der Tiefe des Studiums. Warum war das Forschungsgebiet speziell SparkSQL und nicht die zugrunde liegende API? Dann wurde das Interesse durch die Funktionen des Projekts verursacht, an dem ich arbeite.

Für einen unserer Kunden entwickelt unser Unternehmen ein Data Warehouse, von dem sich eine detaillierte Schicht und ein Teil der Vitrinen im Hadoop-Cluster und die endgültigen Vitrinen in Oracle befinden. Dieses Projekt umfasst eine umfangreiche Datenkonvertierungsschicht, die auf Spark implementiert ist. Um die Entwicklung und Konnektivität von ETL-Entwicklern zu beschleunigen, die nicht mit den Feinheiten der Big Data-Technologien vertraut sind, aber mit SQL- und ETL-Tools vertraut sind, wurde ein Tool entwickelt, das andere ETL-Tools, z. B. Informatica, ideologisch erinnert und es Ihnen ermöglicht, ETL-Prozesse mit nachfolgender Generation visuell zu entwerfen Code für Spark. Aufgrund der Komplexität der Algorithmen und der großen Anzahl von Transformationen verwenden Entwickler hauptsächlich SparkSQL-Abfragen.
Und hier beginnt die Geschichte, da ich eine große Anzahl von Fragen des Formulars „Warum funktioniert die Anfrage nicht / arbeitet langsam / funktioniert sie anders als Oracle?“ Beantworten musste. Dieser erwies sich für mich als der interessanteste Teil: „Warum funktioniert er langsam?“. Im Gegensatz zu dem DBMS, mit dem ich zuvor gearbeitet habe, können Sie außerdem in den Quellcode gelangen und die Antwort auf Ihre Fragen erhalten.
Einschränkungen und Annahmen
Mit Spark 2.3.0 werden Beispiele ausgeführt und der Quellcode analysiert.
Es wird davon ausgegangen, dass der Leser mit der Spark-Architektur und den allgemeinen Prinzipien des Abfrageoptimierers für eines der DBMS vertraut ist. Zumindest sollte der Ausdruck "Abfrageplan" sicherlich nicht überraschend sein.
Außerdem versucht dieser Artikel, keine Übersetzung des Spark-Optimierungscodes ins Russische zu werden. Für Dinge, die aus Sicht des Optimierers sehr interessant sind, aber im Quellcode gelesen werden können, werden sie hier einfach kurz mit Links zu den entsprechenden Klassen erwähnt.
Mach weiter mit dem Lernen
Beginnen wir mit einer kleinen Abfrage, um die grundlegenden Phasen zu untersuchen, die vom Parsen bis zur Ausführung durchlaufen werden.
scala> spark.read.orc("/user/test/balance").createOrReplaceTempView("bal") scala> spark.read.orc("/user/test/customer").createOrReplaceTempView("cust") scala> val df = spark.sql(""" | select bal.account_rk, cust.full_name | from bal | join cust | on bal.party_rk = cust.party_rk | and bal.actual_date = cust.actual_date | where bal.actual_date = cast('2017-12-31' as date) | """) df: org.apache.spark.sql.DataFrame = [account_rk: decimal(38,18), full_name: string] scala> df.explain(true)
Das Hauptmodul, das für das Parsen von SQL und die Optimierung des Abfrageausführungsplans verantwortlich ist, ist Spark Catalyst.
Mit der erweiterten Ausgabe in der Beschreibung des Anforderungsplans (df.explain (true)) können Sie alle Phasen verfolgen, die die Anforderung durchläuft:
- Analysierter logischer Plan - Nach dem Parsen von SQL abrufen. In dieser Phase wird nur die syntaktische Richtigkeit der Anforderung überprüft.
== Parsed Logical Plan == 'Project ['bal.account_rk, 'cust.full_name] +- 'Filter ('bal.actual_date = cast(2017-12-31 as date)) +- 'Join Inner, (('bal.party_rk = 'cust.party_rk) && ('bal.actual_date = 'cust.actual_date)) :- 'UnresolvedRelation `bal` +- 'UnresolvedRelation `cust`
- Analysierter logischer Plan - In dieser Phase werden Informationen zur Struktur der verwendeten Entitäten hinzugefügt, die Übereinstimmung der Struktur und der angeforderten Attribute überprüft.
== Analyzed Logical Plan == account_rk: decimal(38,18), full_name: string Project [account_rk#1, full_name#59] +- Filter (actual_date#27 = cast(2017-12-31 as date)) +- Join Inner, ((party_rk#18 = party_rk#57) && (actual_date#27 = actual_date#88)) :- SubqueryAlias bal : +- Relation[ACTUAL_END_DATE#0,ACCOUNT_RK#1,... 4 more fields] orc +- SubqueryAlias cust +- Relation[ACTUAL_END_DATE#56,PARTY_RK#57... 9 more fields] orc
- Der optimierte logische Plan ist für uns am interessantesten. In dieser Phase wird der resultierende Abfragebaum basierend auf den verfügbaren Optimierungsregeln konvertiert.
== Optimized Logical Plan == Project [account_rk#1, full_name#59] +- Join Inner, ((party_rk#18 = party_rk#57) && (actual_date#27 = actual_date#88)) :- Project [ACCOUNT_RK#1, PARTY_RK#18, ACTUAL_DATE#27] : +- Filter ((isnotnull(actual_date#27) && (actual_date#27 = 17531)) && isnotnull(party_rk#18)) : +- Relation[ACTUAL_END_DATE#0,ACCOUNT_RK#1,... 4 more fields] orc +- Project [PARTY_RK#57, FULL_NAME#59, ACTUAL_DATE#88] +- Filter ((isnotnull(actual_date#88) && isnotnull(party_rk#57)) && (actual_date#88 = 17531)) +- Relation[ACTUAL_END_DATE#56,PARTY_RK#57,... 9 more fields] orc
- Physischer Plan - Funktionen für den Zugriff auf Quelldaten werden zunehmend berücksichtigt, einschließlich Optimierungen zum Filtern von Partitionen und Daten, um den resultierenden Datensatz zu minimieren. Die Join-Ausführungsstrategie ist ausgewählt (mehr zu den unten verfügbaren Optionen).
== Physical Plan == *(2) Project [account_rk#1, full_name#59] +- *(2) BroadcastHashJoin [party_rk#18, actual_date#27], [party_rk#57, actual_date#88], Inner, BuildRight :- *(2) Project [ACCOUNT_RK#1, PARTY_RK#18, ACTUAL_DATE#27] : +- *(2) Filter isnotnull(party_rk#18) : +- *(2) FileScan orc [ACCOUNT_RK#1,PARTY_RK#18,ACTUAL_DATE#27] Batched: false, Format: ORC, Location: InMemoryFileIndex[hdfs://cluster:8020/user/test/balance], PartitionCount: 1, PartitionFilters: [isnotnull(ACTUAL_DATE#27), (ACTUAL_DATE#27 = 17531)], PushedFilters: [IsNotNull(PARTY_RK)], ReadSchema: struct<ACCOUNT_RK:decimal(38,18),PARTY_RK:decimal(38,18)> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, decimal(38,18), true], input[2, date, true])) +- *(1) Project [PARTY_RK#57, FULL_NAME#59, ACTUAL_DATE#88] +- *(1) Filter isnotnull(party_rk#57) +- *(1) FileScan orc [PARTY_RK#57,FULL_NAME#59,ACTUAL_DATE#88] Batched: false, Format: ORC, Location: InMemoryFileIndex[hdfs://cluster:8020/user/test/customer], PartitionCount: 1, PartitionFilters: [isnotnull(ACTUAL_DATE#88), (ACTUAL_DATE#88 = 17531)], PushedFilters: [IsNotNull(PARTY_RK)], ReadSchema: struct<PARTY_RK:decimal(38,18),FULL_NAME:string>
Die folgenden Phasen der Optimierung und Ausführung (z. B. WholeStageCodegen) gehen über den Rahmen dieses Artikels hinaus, werden jedoch (sowie die oben beschriebenen Phasen) in Mastering Spark Sql ausführlich beschrieben .
Das Lesen des Abfrageausführungsplans erfolgt normalerweise „von innen“ und „von unten nach oben“, dh die am meisten verschachtelten Teile werden zuerst ausgeführt und gelangen schrittweise zur endgültigen Projektion ganz oben.
Arten von Abfrageoptimierern
Es können zwei Arten von Abfrageoptimierern unterschieden werden:
- Regelbasierte Optimierer (RBOs).
- Optimierer basierend auf einer Schätzung der Kosten für die Ausführung von Abfragen (Kostenbasierter Optimierer, CBO).
Die ersten konzentrieren sich auf die Verwendung eines Satzes fester Regeln, zum Beispiel die Anwendung von Filterbedingungen, aus denen in früheren Stadien, wenn möglich, die Berechnung von Konstanten usw.
Um die Qualität des resultierenden Plans zu bewerten, verwendet der CBO-Optimierer eine Kostenfunktion, die normalerweise von der Menge der verarbeiteten Daten, der Anzahl der Zeilen, die unter die Filter fallen, und den Kosten für die Ausführung bestimmter Vorgänge abhängt.
Um mehr über die CBO-Designspezifikation für Apache Spark zu erfahren, folgen Sie bitte den Links: der Spezifikation und der Hauptaufgabe von JIRA für die Implementierung .
Der Ausgangspunkt für die Erkundung aller vorhandenen Optimierungen ist der Code Optimizer.scala.
Hier ist ein kurzer Auszug aus einer langen Liste verfügbarer Optimierungen:
def batches: Seq[Batch] = { val operatorOptimizationRuleSet = Seq( // Operator push down PushProjectionThroughUnion, ReorderJoin, EliminateOuterJoin, PushPredicateThroughJoin, PushDownPredicate, LimitPushDown, ColumnPruning, InferFiltersFromConstraints, // Operator combine CollapseRepartition, CollapseProject, CollapseWindow, CombineFilters, CombineLimits, CombineUnions, // Constant folding and strength reduction NullPropagation, ConstantPropagation, ........
Es ist zu beachten, dass die Liste dieser Optimierungen sowohl regelbasierte Optimierungen als auch Optimierungen basierend auf Abfragekostenschätzungen enthält, die nachstehend erläutert werden.
Ein Merkmal von CBO ist, dass es für einen korrekten Betrieb Informationen über die Statistiken der in der Abfrage verwendeten Daten kennen und speichern muss - Anzahl der Datensätze, Datensatzgröße, Histogramme der Datenverteilung in den Tabellenspalten.
Zum Sammeln von Statistiken wird eine Reihe von SQL-Befehlen ANALYZE TABLE ... COMPUTE STATISTICS verwendet. Außerdem wird eine Reihe von Tabellen zum Speichern von Informationen benötigt. Die API wird über ExternalCatalog bereitgestellt, genauer gesagt über HiveExternalCatalog.
Da CBO derzeit standardmäßig deaktiviert ist, liegt der Schwerpunkt auf der Untersuchung der verfügbaren Optimierung und Nuancen von RBO.
Typen und Auswahl der Join-Strategie
In der Phase der Erstellung des physischen Plans zur Ausführung der Anforderung wird die Verbindungsstrategie ausgewählt. Die folgenden Optionen sind derzeit in Spark verfügbar (Sie können Code aus dem Code in SparkStrategies.scala lernen).
Broadcast-Hash-Join
Die beste Option ist, wenn eine der Join-Parteien klein genug ist (das Suffizienzkriterium wird durch den Parameter spark.sql.autoBroadcastJoinThreshold in SQLConf festgelegt). In diesem Fall wird diese Seite vollständig auf alle Executoren kopiert, bei denen ein Hash-Join mit der Haupttabelle vorhanden ist. Zusätzlich zur Größe sollte beachtet werden, dass im Fall einer äußeren Verknüpfung nur die Außenseite kopiert werden kann. Wenn möglich, müssen Sie als führende Tabelle im Fall einer äußeren Verknüpfung die Tabelle mit der größten Datenmenge verwenden.
, , SQL Oracle,
Sort Merge Join
Wenn spark.sql.join.preferSortMergeJoin standardmäßig aktiviert ist, wird diese Methode standardmäßig angewendet, wenn die Schlüssel für den Join sortiert werden können.
Von den Merkmalen kann angemerkt werden, dass im Gegensatz zum vorherigen Verfahren die Codegenerierungsoptimierung zum Ausführen der Operation nur für die innere Verknüpfung verfügbar ist.
Shuffle Hash Join
Wenn die Schlüssel nicht sortiert werden können oder die Standardauswahloption für Sortierzusammenführungsverknüpfungen deaktiviert ist, versucht Catalyst, einen Shuffle-Hash-Join anzuwenden. Zusätzlich zur Überprüfung der Einstellungen wird auch überprüft, ob Spark über genügend Speicher verfügt, um eine lokale Hash-Map für eine Partition zu erstellen (die Gesamtzahl der Partitionen wird durch Festlegen von spark.sql.shuffle.partitions festgelegt ).
BroadcastNestedLoopJoin und CartesianProduct
In dem Fall, in dem keine Möglichkeit eines direkten Vergleichs nach Schlüsseln besteht (z. B. eine Bedingung wie) oder keine Schlüssel zum Verknüpfen von Tabellen vorhanden sind, wird abhängig von der Größe der Tabellen entweder dieser Typ oder CartesianProduct ausgewählt.
Die Reihenfolge der Angabe von Tabellen in join'ah
In jedem Fall erfordert der Join das Mischen von Tabellen nach Schlüssel. Daher ist derzeit die Reihenfolge der Angabe von Tabellen wichtig, insbesondere bei der Ausführung mehrerer Verknüpfungen in einer Reihe (wenn Sie eine Bohrung sind, ist CBO nicht aktiviert und die Einstellung JOIN_REORDER_ENABLED ist nicht aktiviert).
Wenn möglich, sollte die Reihenfolge der Verknüpfungstabellen die Anzahl der Mischvorgänge für große Tabellen minimieren, für die Verknüpfungen mit demselben Schlüssel nacheinander ausgeführt werden sollten. Vergessen Sie auch nicht, die Daten für die Verknüpfung zu minimieren, um die Broadcast-Hash-Verknüpfung zu aktivieren.
Transitive Anwendung von Filterbedingungen
Betrachten Sie die folgende Abfrage:
select bal.account_rk, cust.full_name from balance bal join customer cust on bal.party_rk = cust.party_rk and bal.actual_date = cust.actual_date where bal.actual_date = cast('2017-12-31' as date)
Hier verbinden wir zwei Tabellen, die gemäß dem Feld actual_date auf dieselbe Weise partitioniert sind, und wenden einen expliziten Filter nur auf die Partition gemäß der Balance-Tabelle an.
Wie aus dem optimierten Abfrageplan hervorgeht, wird der Filter nach Datum auch auf den Kunden angewendet, und zum Zeitpunkt des Lesens von Daten von der Festplatte wird festgestellt, dass genau eine Partition benötigt wird.
== Optimized Logical Plan == Project [account_rk#1, full_name#59] +- Join Inner, ((party_rk#18 = party_rk#57) && (actual_date#27 = actual_date#88)) :- Project [ACCOUNT_RK#1, PARTY_RK#18, ACTUAL_DATE#27] : +- Filter ((isnotnull(actual_date#27) && (actual_date#27 = 17531)) && isnotnull(party_rk#18)) : +- Relation[,... 4 more fields] orc +- Project [PARTY_RK#57, FULL_NAME#59, ACTUAL_DATE#88] +- Filter (((actual_date#88 = 17531) && isnotnull(actual_date#88)) && isnotnull(party_rk#57)) +- Relation[,... 9 more fields] orc
Sie müssen jedoch nur den inneren Join durch den linken äußeren in der Abfrage ersetzen, da das Push-Prädikat für die Kundentabelle sofort abfällt und ein vollständiger Scan erfolgt, was ein unerwünschter Effekt ist.
== Optimized Logical Plan == Project [account_rk#1, full_name#59] +- Join LeftOuter, ((party_rk#18 = party_rk#57) && (actual_date#27 = actual_date#88)) :- Project [ACCOUNT_RK#1, PARTY_RK#18, ACTUAL_DATE#27] : +- Filter (isnotnull(actual_date#27) && (actual_date#27 = 17531)) : +- Relation[,... 4 more fields] orc +- Project [PARTY_RK#57, FULL_NAME#59, ACTUAL_DATE#88] +- Relation[,... 9 more fields] orc
Typkonvertierung
Betrachten Sie ein einfaches Beispiel für die Auswahl aus einer Tabelle mit Filterung nach Clienttyp. Im Schema ist der Typ des Felds party_type string.
select party_rk, full_name from cust where actual_date = cast('2017-12-31' as date) and party_type = 101
Und vergleichen Sie die beiden resultierenden Pläne, den ersten - wenn wir uns auf den falschen Typ beziehen (es wird eine implizite Umwandlung in int geben), den zweiten - wenn der Typ dem Schema entspricht.
PushedFilters: [IsNotNull(PARTY_TYPE)] // . PushedFilters: [IsNotNull(PARTY_TYPE), EqualTo(PARTY_TYPE,101)] // .
Ein ähnliches Problem wird beim Vergleichen von Datumsangaben mit einer Zeichenfolge beobachtet. Es gibt einen Filter zum Vergleichen von Zeichenfolgen. Ein Beispiel:
where OPER_DATE = '2017-12-31' Filter (isnotnull(oper_date#0) && (cast(oper_date#0 as string) = 2017-12-31) PushedFilters: [IsNotNull(OPER_DATE)] where OPER_DATE = cast('2017-12-31' as date) PushedFilters: [IsNotNull(OPER_DATE), EqualTo(OPER_DATE,2017-12-31)]
Für den Fall, dass eine implizite Typkonvertierung möglich ist, z. B. int -> decimal, führt der Optimierer dies selbst aus.
Weitere Forschung
Viele interessante Informationen zu den „Reglern“, mit denen Catalyst fein eingestellt werden kann, sowie zu den Möglichkeiten (Gegenwart und Zukunft) des Optimierers erhalten Sie bei SQLConf.scala.
Wie Sie standardmäßig sehen können, ist das Kostenoptimierungsprogramm derzeit noch deaktiviert.
val CBO_ENABLED = buildConf("spark.sql.cbo.enabled") .doc("Enables CBO for estimation of plan statistics when set true.") .booleanConf .createWithDefault(false)
Sowie seine abhängigen Optimierungen im Zusammenhang mit der Neuordnung von join'ov.
val JOIN_REORDER_ENABLED = buildConf("spark.sql.cbo.joinReorder.enabled") .doc("Enables join reorder in CBO.") .booleanConf .createWithDefault(false)
oder
val STARSCHEMA_DETECTION = buildConf("spark.sql.cbo.starSchemaDetection") .doc("When true, it enables join reordering based on star schema detection. ") .booleanConf .createWithDefault(false)
Kurze Zusammenfassung
Nur ein kleiner Teil der vorhandenen Optimierungen wurde berührt. Experimente zur Kostenoptimierung, die viel mehr Raum für die Abfragekonvertierung bieten können, stehen an. Eine weitere interessante Frage ist der Vergleich einer Reihe von Optimierungen beim Lesen von Dateien aus Parkett und Ork. Nach dem Jira des Projekts geht es um Parität, aber ist es wirklich so?
Außerdem:
- Die Analyse und Optimierung von Anfragen ist interessant und aufregend, insbesondere angesichts der Verfügbarkeit von Quellcodes.
- Die Einbeziehung von CBO bietet Raum für weitere Optimierungen und Forschungsarbeiten.
- Es ist notwendig, die Anwendbarkeit der Grundregeln zu überwachen, die es Ihnen ermöglichen, so viele "zusätzliche" Daten wie möglich zum frühestmöglichen Zeitpunkt herauszufiltern.
- Join ist ein notwendiges Übel, aber wenn möglich, lohnt es sich, sie zu minimieren und zu verfolgen, welche Implementierung unter der Haube verwendet wird.