Apache Spark, Lazy Evaluation und mehrseitige SQL-Abfragen

Das berühmte: spark arbeitet mit Datenrahmen, bei denen es sich um Transformationsalgorithmen handelt. Der Algorithmus wird im allerletzten Moment gestartet, um der Optimierung "mehr Platz" zu geben und sie aufgrund der Optimierung so effizient wie möglich auszuführen.


Im Folgenden wird analysiert, wie eine mehrseitige SQL-Abfrage in Atome zerlegt werden kann (ohne Einbußen bei der Effizienz) und wie die Ausführungszeit der ETL-Pipeline aufgrund dessen erheblich verkürzt werden kann.


Faule Bewertung


Ein interessantes Funktionsmerkmal von spark ist die verzögerte Auswertung: Transformationen werden nur ausgeführt, wenn die Aktionen abgeschlossen sind. So funktioniert es (grob): Die Algorithmen zum Erstellen der Datenrahmen vor der Aktion sind „zusammengeklebt“. Der Optimierer erstellt aus seiner Sicht den effizientesten endgültigen Algorithmus, der startet und das Ergebnis liefert (das von der Aktion angeforderte).


Was hier im Zusammenhang mit unserer Präsentation interessant ist: Jede komplexe Abfrage kann ohne Effizienzverlust in „Atome“ zerlegt werden. Lassen Sie uns etwas weiter analysieren.


Mehrseitiges SQL


Es gibt viele Gründe, warum wir "mehrseitige" SQL-Abfragen schreiben, eine der Hauptursachen für die Zurückhaltung, Zwischenobjekte zu erstellen (Zurückhaltung, die durch Effizienzanforderungen gestützt wird). Das folgende Beispiel zeigt eine relativ komplexe Abfrage (es ist natürlich auch sehr einfach, aber für die weitere Darstellung haben wir genug).


qSel = """ select con.contract_id as con_contract_id, con.begin_date as con_begin_date, con.product_id as con_product_id, cst.contract_status_type_id as cst_status_type_id, sbj.subject_id as sbj_subject_id, sbj.subject_name as sbj_subject_name, pp.birth_date as pp_birth_date from kasko.contract con join kasko.contract_status cst on cst.contract_status_id = con.contract_status_id join kasko.subject sbj on sbj.subject_id = con.owner_subject_id left join kasko.physical_person pp on pp.subject_id = con.owner_subject_id """ dfSel = sp.sql(qSel) 

Was sehen wir:


  • Daten werden aus mehreren Tabellen ausgewählt
  • Es werden verschiedene Join-Typen verwendet
  • Auswählbare Spalten werden nach Select Part, Join Part (und Where Part, aber hier ist es nicht hier - ich habe es der Einfachheit halber entfernt) verteilt.

Diese Abfrage kann in einfache Abfragen zerlegt werden (z. B. zuerst die Tabellen contract und contract_status verknüpfen, das Ergebnis in einer temporären Tabelle speichern und dann mit subject kombinieren, das Ergebnis auch in einer temporären Tabelle speichern usw.). Wenn wir wirklich komplexe Abfragen erstellen, tun wir dies sicherlich, und dann - nach dem Debuggen - sammeln wir all dies in einem mehrseitigen Block.


Was ist schlecht hier? Nichts, in der Tat, jeder arbeitet so und ist daran gewöhnt.


Aber es gibt Nachteile - oder besser gesagt, was zu verbessern ist - lesen Sie weiter.


Die gleiche Abfrage im Funken


Wenn Sie spark für die Transformation verwenden, können Sie diese Anforderung natürlich einfach annehmen und ausführen (und es ist gut, wir werden sie auch ausführen), aber Sie können auch in die andere Richtung gehen, probieren wir es aus.


Zerlegen wir diese "komplexe" Abfrage in "Atome" - elementare Datenrahmen. Wir erhalten so viele von ihnen wie Tabellen an der Abfrage beteiligt sind (in diesem Fall 4).


Hier sind sie - "Atome":


 dfCon = sp.sql("""select contract_id as con_contract_id, begin_date as con_begin_date, product_id as con_product_id, owner_subject_id as con_owner_subject_id, contract_status_id as con_contract_status_id from kasko.contract""") dfCStat = sp.sql("""select contract_status_id as cst_status_id, contract_status_type_id as cst_status_type_id from kasko.contract_status""") dfSubj = sp.sql("""select subject_id as sbj_subject_id, subject_type_id as sbj_subject_type_id, subject_name as sbj_subject_name from kasko.subject""") dfPPers = sp.sql("""select subject_id as pp_subject_id, birth_date as pp_birth_date from kasko.physical_person""") 

Mit Spark können Sie sie mit Ausdrücken verbinden, die von den tatsächlichen „Atomen“ getrennt sind.


 con_stat = f.col("cst_status_id")==f.col("con_contract_status_id") con_subj_own = f.col("con_owner_subject_id")==f.col("sbj_subject_id") con_ppers_own = f.col("con_owner_subject_id")==f.col("pp_subject_id") 

Dann sieht unsere "komplexe Abfrage" so aus:


 dfAtom = dfCon.join(dfCStat,con_stat, "inner")\ .join(dfSubj,con_subj_own,"inner") \ .join(dfPPers,con_ppers_own, "left") \ .drop("con_contract_status_id","sbj_subject_type_id", "pp_subject_id","con_owner_subject_id","cst_status_id") 

Was ist hier gut? Auf den ersten Blick ist es nichts, ganz im Gegenteil: Mit "komplexem" SQL können Sie verstehen, was passiert, mit unserer "atomaren" Abfrage ist es schwieriger zu verstehen, Sie müssen "Atome" und Ausdrücke betrachten.


Vergewissern wir uns zunächst, dass diese Abfragen gleichwertig sind - im Jupyter-Buch habe ich Pläne für die Erfüllung beider Abfragen angegeben (die Neugierigen können 10 Unterschiede feststellen, aber das Wesentliche - die Gleichwertigkeit - liegt auf der Hand). Dies ist natürlich kein Wunder, es sollte auch so sein (siehe oben für eine verzögerte Auswertung und Optimierung).


Was wir am Ende haben - die "mehrseitige" Anfrage und die "atomare" Anfrage arbeiten mit der gleichen Effizienz (dies ist wichtig, ohne dass weitere Überlegungen teilweise ihre Bedeutung verlieren).


Nun, lassen Sie uns das Gute in der "atomaren" Art der Erstellung von Abfragen finden.


Was ist ein "Atom" (elementarer Datenrahmen) ist unser Wissen über eine Teilmenge des Themenbereichs (Teil der relationalen Tabelle). Durch die Isolierung solcher „Atome“ wählen wir automatisch (und vor allem algorithmisch und reproduzierbar) einen wesentlichen Teil des für uns grenzenlosen Objekts aus, das als „physikalisches Datenmodell“ bezeichnet wird.


Was ist der Ausdruck, den wir beim Beitritt verwendet haben? Dies ist auch Wissen über den Themenbereich - auf diese Weise werden (wie im Ausdruck angegeben) die Entitäten des Themenbereichs (Tabellen in der Datenbank) miteinander verbunden.


Ich wiederhole - das ist wichtig - dieses „Wissen“ (Atome und Ausdrücke) materialisiert sich im ausführbaren Code (nicht im Diagramm oder in der verbalen Beschreibung). Dies ist der Code, der jedes Mal ausgeführt wird, wenn die ETL-Pipeline ausgeführt wird (das Beispiel stammt übrigens aus dem wirklichen Leben).


Der ausführbare Code - wie wir ihn von CleanCoder kennen - ist eines von zwei objektiv vorhandenen Artefakten, die behaupten, der "Titel" der Dokumentation zu sein. Das heißt, die Verwendung von „Atomen“ ermöglicht es uns, in einem so wichtigen Prozess wie der Dokumentation von Daten einen Schritt nach vorne zu machen.


Was kann man sonst noch in „Atomizität“ finden?


Fördereroptimierung


Im wirklichen Leben besteht eine ETL-Pipeline aus Dutzenden von Transformationen, die den oben genannten ähnlich sind. Dies habe ich mir übrigens nicht vorgestellt. Tabellen werden in ihnen sehr oft wiederholt (ich habe sie irgendwie in Excel berechnet - einige Tabellen werden in 40% der Abfragen verwendet).


Was passiert in Bezug auf Effizienz? Durcheinander - dieselbe Tabelle wird mehrmals aus der Quelle gelesen ...


Wie kann man es verbessern? Spark hat einen Mechanismus zum Zwischenspeichern von Datenrahmen - wir können explizit angeben, welche Datenrahmen und wie viel im Cache gespeichert werden sollen.


Dazu müssen wir doppelte Tabellen auswählen und Abfragen so erstellen, dass die Gesamt-Cache-Größe minimiert wird (da per Definition nicht alle Tabellen hineinpassen, gibt es große Datenmengen).


Kann dies mit mehrseitigen SSQ-Abfragen erfolgen? Ja, aber ... ein bisschen kompliziert (wir haben dort nicht wirklich Datenrahmen, nur Tabellen, sie können auch zwischengespeichert werden - die Spark-Community arbeitet daran).


Kann dies mit atomaren Abfragen erfolgen? Ja Und es ist nicht schwierig, wir müssen nur die „Atome“ verallgemeinern - fügen Sie die Spalten hinzu, die in allen Abfragen unserer Pipeline verwendet werden. Wenn Sie darüber nachdenken, ist dies unter dem Gesichtspunkt der Dokumentation „richtig“: Wenn in einer Abfrage eine Spalte verwendet wird (auch im Where-Teil), ist dies ein Teil der Daten des Themenbereichs, der für uns interessant ist.


Und dann ist alles einfach - wir zwischenspeichern sich wiederholende Atome (Datenrahmen), wir bilden eine Kette von Transformationen, so dass die Schnittmenge zwischengespeicherter Datenrahmen minimal ist (dies ist übrigens nicht trivial, aber algorithmisierbar).


Und wir bekommen den effizientesten Förderer komplett „gratis“. Ein nützliches und wichtiges Artefakt ist darüber hinaus die „Vorbereitung“ zur Dokumentation von Daten zum Themenbereich.


Robotisierung und Automatisierung


Atome sind anfälliger für die automatische Verarbeitung als "großes und mächtiges SQL" - ihre Struktur ist einfach und klar, Spark parst für uns (wofür er sich besonders bedankt), er erstellt auch Abfragepläne und analysiert, mit denen Sie die Reihenfolge der Abfrageverarbeitung automatisch neu ordnen können.


Hier können Sie also etwas spielen.


Abschließend


Vielleicht bin ich zu optimistisch - es scheint mir, dass dieser Pfad (Query Atomization) mehr funktioniert, als zu versuchen, eine Datenquelle nachträglich zu beschreiben. Übrigens, was ist die Verwendung von "Additiven"? Wir erzielen außerdem eine Effizienzsteigerung. Warum betrachte ich den atomaren Ansatz als "funktionierend"? Es ist Teil des regulären Prozesses, was bedeutet, dass die beschriebenen Artefakte eine echte Chance haben, auf lange Sicht relevant zu sein.


Ich habe wahrscheinlich etwas verpasst - Hilfe finden (in den Kommentaren)?

Source: https://habr.com/ru/post/de481924/


All Articles