Apache Spark, évaluation paresseuse et requêtes SQL multi-pages

Le fameux: spark fonctionne avec des dataframes, qui sont des algorithmes de transformation. L'algorithme est lancé au tout dernier moment afin de "donner plus d'espace" à l'optimisation et grâce à l'optimisation de l'exécuter le plus efficacement possible.


Sous la coupe, nous analyserons comment décomposer une requête SQL de plusieurs pages en atomes (sans perte d'efficacité) et comment réduire considérablement le temps d'exécution du pipeline ETL à cause de cela.


Évaluation paresseuse


L'évaluation paresseuse est une caractéristique fonctionnelle intéressante de spark: les transformations ne sont exécutées que lorsque les actions sont terminées. Comment ça marche (grosso modo): les algorithmes de construction des trames de données précédant l'action sont «collés ensemble», l'optimiseur construit l'algorithme final le plus efficace de son point de vue, qui démarre et donne le résultat (celui qui a été demandé par l'action).


Ce qui est intéressant ici dans le cadre de notre présentation: toute requête complexe peut être décomposée en «atomes» sans perte d'efficacité. Analysons un peu plus loin.


SQL multipage


Il existe de nombreuses raisons pour lesquelles nous écrivons des requêtes SQL «multi-pages», l'une des principales réticences, probablement, à créer des objets intermédiaires (réticence renforcée par des exigences d'efficacité). Ce qui suit est un exemple de requête relativement complexe (bien sûr, c'est même très simple, mais pour les besoins d'une présentation plus approfondie, nous en aurons assez).


qSel = """ select con.contract_id as con_contract_id, con.begin_date as con_begin_date, con.product_id as con_product_id, cst.contract_status_type_id as cst_status_type_id, sbj.subject_id as sbj_subject_id, sbj.subject_name as sbj_subject_name, pp.birth_date as pp_birth_date from kasko.contract con join kasko.contract_status cst on cst.contract_status_id = con.contract_status_id join kasko.subject sbj on sbj.subject_id = con.owner_subject_id left join kasko.physical_person pp on pp.subject_id = con.owner_subject_id """ dfSel = sp.sql(qSel) 

Que voyons-nous:


  • les données sont sélectionnées à partir de plusieurs tableaux
  • différents types de jointure sont utilisés
  • les colonnes sélectionnables sont distribuées par partie sélectionnée, partie jointe (et où partie, mais ici elle n'est pas ici - je l'ai supprimée pour plus de simplicité)

Cette requête peut être décomposée en requêtes simples (par exemple, combinez d'abord les tables contract et contract_status, enregistrez le résultat dans une table temporaire, puis combinez-le avec le sujet, enregistrez également le résultat dans une table temporaire, etc.). Certes, lorsque nous créons des requêtes vraiment complexes, nous le faisons, juste après - après le débogage - nous collectons tout cela dans un bloc de plusieurs pages.


Qu'est-ce qui est mauvais ici? Rien, en fait, tout le monde travaille comme ça et y est habitué.


Mais il y a des inconvénients - ou plutôt ce qu'il faut améliorer - lisez la suite.


La même requête dans spark


Lorsque vous utilisez spark pour la transformation, bien sûr, vous pouvez simplement prendre et exécuter cette demande (et ce sera bien, en fait, nous l'exécuterons aussi), mais vous pouvez aller dans l'autre sens, essayons-le.


Décomposons cette requête «complexe» en «atomes» - des trames de données élémentaires. Nous en obtiendrons autant que le nombre de tables impliquées dans la requête (dans ce cas, 4).


Les voici - des «atomes»:


 dfCon = sp.sql("""select contract_id as con_contract_id, begin_date as con_begin_date, product_id as con_product_id, owner_subject_id as con_owner_subject_id, contract_status_id as con_contract_status_id from kasko.contract""") dfCStat = sp.sql("""select contract_status_id as cst_status_id, contract_status_type_id as cst_status_type_id from kasko.contract_status""") dfSubj = sp.sql("""select subject_id as sbj_subject_id, subject_type_id as sbj_subject_type_id, subject_name as sbj_subject_name from kasko.subject""") dfPPers = sp.sql("""select subject_id as pp_subject_id, birth_date as pp_birth_date from kasko.physical_person""") 

Spark vous permet de les joindre en utilisant des expressions séparées des «atomes» réels, faisons ceci:


 con_stat = f.col("cst_status_id")==f.col("con_contract_status_id") con_subj_own = f.col("con_owner_subject_id")==f.col("sbj_subject_id") con_ppers_own = f.col("con_owner_subject_id")==f.col("pp_subject_id") 

Ensuite, notre «requête complexe» ressemblera à ceci:


 dfAtom = dfCon.join(dfCStat,con_stat, "inner")\ .join(dfSubj,con_subj_own,"inner") \ .join(dfPPers,con_ppers_own, "left") \ .drop("con_contract_status_id","sbj_subject_type_id", "pp_subject_id","con_owner_subject_id","cst_status_id") 

Qu'est-ce qui est bon ici? À première vue, ce n’est rien, bien au contraire: par un SQL «complexe», vous pouvez comprendre ce qui se passe, par notre requête «atomique», il est plus difficile à comprendre, vous devez regarder les «atomes» et les expressions.


Assurons-nous d'abord que ces requêtes sont équivalentes - dans le livre jupyter par référence, j'ai donné des plans pour répondre aux deux requêtes (les curieux peuvent trouver 10 différences, mais l'essence - l'équivalence - est évidente). Bien sûr, ce n'est pas un miracle, il devrait en être ainsi (voir ci-dessus pour une évaluation et une optimisation paresseuses).


Ce que nous avons en fin de compte - la demande «multi-page» et la demande «atomique» fonctionnent avec la même efficacité (c'est important, sans que ces considérations supplémentaires perdent partiellement leur sens).


Eh bien, trouvons maintenant le bon dans la manière «atomique» de construire des requêtes.


Ce qui est un «atome» (trame de données élémentaires) est notre connaissance d'un sous-ensemble du domaine (partie de la table relationnelle). En isolant de tels «atomes», nous sélectionnons automatiquement (et, surtout, de façon algorithmique et reproductible) une partie importante de ce qui est sans limites pour nous appelé le «modèle de données physiques».


Quelle est l'expression que nous avons utilisée lors de l'adhésion? Il s'agit également de connaissances sur le domaine - c'est ainsi (comme indiqué dans l'expression) que les entités du domaine (tableaux de la base de données) sont interconnectées.


Je répète - c'est important - cette «connaissance» (atomes et expressions) est matérialisée dans le code exécutable (pas dans le diagramme ou la description verbale), c'est le code qui est exécuté chaque fois que le pipeline ETL est exécuté (l'exemple est pris, en passant, de la vie réelle).


Le code exécutable - comme nous le savons par un codeur propre - est l'un des deux artefacts objectivement existants qui prétendent être le "titre" de la documentation. Autrement dit, l'utilisation des «atomes» nous permet de faire un pas en avant dans un processus aussi important que la documentation des données.


Que peut-on trouver d'autre dans «l'atomicité»?


Optimisation des convoyeurs


Dans la vraie vie, un ingénieur de données - en passant, je ne me suis pas présenté - un pipeline ETL se compose de dizaines de transformations similaires à celles ci-dessus. Les tableaux y sont très souvent répétés (je les ai en quelque sorte calculés dans Excel - certains tableaux sont utilisés dans 40% des requêtes).


Que se passe-t-il en termes d'efficacité? Mess - le même tableau est lu plusieurs fois depuis la source ...


Comment l'améliorer? Spark a un mécanisme de mise en cache des trames de données - nous pouvons spécifier explicitement quelles trames de données et combien nous voulons conserver dans le cache.


Ce que nous devons faire pour cela est de sélectionner des tables en double et de créer des requêtes de manière à minimiser la taille totale du cache (car toutes les tables, par définition, n'y rentreront pas, alors il y a des données volumineuses).


Cela peut-il être fait à l'aide de requêtes SSQ de plusieurs pages? Oui, mais ... un peu compliqué (nous n'avons pas vraiment de trames de données là-bas, seulement des tables, ils peuvent également être mis en cache - la communauté spark travaille sur cela).


Cela peut-il être fait en utilisant des requêtes atomiques? Oui! Et ce n'est pas difficile, il suffit de généraliser les «atomes» - ajoutez-y les colonnes utilisées dans toutes les requêtes de notre pipeline. Si vous y réfléchissez, c'est «correct» du point de vue de la documentation: si une colonne est utilisée dans une requête (même si dans la partie where), elle fait partie des données du domaine qui nous intéresse.


Et puis tout est simple - nous mettons en cache les atomes répétitifs (trames de données), nous construisons la chaîne de transformations de sorte que l'intersection des trames de données mises en cache soit minimale (ce n'est pas trivial, mais algorithmisable, soit dit en passant).


Et nous obtenons le convoyeur le plus efficace complètement «gratuit». Et en plus de cela, un artefact utile et important est la «préparation» pour la documentation des données sur le sujet.


Robotisation et automatisation


Les atomes sont plus sensibles au traitement automatique que les «grands et puissants SQL» - leur structure est simple et claire, spark fait de l'analyse syntaxique pour nous (pour cela, merci à lui), il construit également des plans de requête, analysant que vous pouvez réorganiser automatiquement la séquence de traitement des requêtes.


Ici, vous pouvez jouer quelque chose.


En conclusion


Je suis peut-être trop optimiste - il me semble que ce chemin (atomisation de requête) fonctionne plus que d'essayer de décrire une source de données après coup. De plus - à propos, à quoi servent les «additifs» - nous obtenons une augmentation de l’efficacité. Pourquoi est-ce que je considère que l'approche atomique "fonctionne"? Cela fait partie du processus normal, ce qui signifie que les artefacts décrits ont une réelle chance d'être pertinents à long terme.


J'ai probablement raté quelque chose - aider à trouver (dans les commentaires)?

Source: https://habr.com/ru/post/fr481924/


All Articles