Spark SQL. Un peu sur l'optimiseur de requêtes

Bonjour à tous. En guise d'introduction, je veux vous dire comment je suis arrivé à une telle vie.


Avant de rencontrer Big Data et Spark, en particulier, j'avais beaucoup et souvent à optimiser les requêtes SQL, d'abord pour MSSQL, puis pour Oracle, et maintenant je suis tombé sur SparkSQL.


Et s'il existe déjà de nombreux bons livres pour le SGBD qui décrivent la méthodologie et les «stylos» que vous pouvez tourner pour obtenir le plan de requête optimal, alors je n'ai pas vu de tels livres pour Spark. Je suis tombé sur plus d'articles et d'ensembles de pratiques, plus liés à l'utilisation de l'API RDD / Dataset qu'à du SQL pur. Pour moi, l'un des ouvrages de référence sur l'optimisation SQL est le livre de J. Lewis, Oracle. Bases de l'optimisation des coûts. " J'ai cherché quelque chose de similaire en profondeur d'étude. Pourquoi l'objet de la recherche était-il spécifiquement SparkSQL, et non l'API sous-jacente? L'intérêt a ensuite été provoqué par les caractéristiques du projet sur lequel je travaille.




Pour l'un de nos clients, notre société développe un entrepôt de données, dont une couche détaillée et une partie des vitrines sont dans le cluster Hadoop, et les vitrines finales sont dans Oracle. Ce projet implique une couche de conversion de données étendue, qui est implémentée sur Spark. Pour accélérer le développement et la connectivité des développeurs ETL qui ne connaissent pas les subtilités des technologies Big Data, mais qui connaissent les outils SQL et ETL, un outil a été développé qui rappelle idéologiquement d'autres outils ETL, par exemple Informatica, et vous permet de concevoir visuellement des processus ETL avec la génération suivante code pour Spark. En raison de la complexité des algorithmes et du grand nombre de transformations, les développeurs utilisent principalement des requêtes SparkSQL.


Et c'est là que commence l'histoire, car j'ai dû répondre à un grand nombre de questions du formulaire «Pourquoi la demande ne fonctionne-t-elle pas / fonctionne-t-elle lentement / fonctionne-t-elle différemment d'Oracle?». Celui-ci s'est avéré être la partie la plus intéressante pour moi: "Pourquoi ça marche lentement?". De plus, contrairement au SGBD avec lequel j'ai travaillé auparavant, vous pouvez entrer dans le code source et obtenir la réponse à vos questions.


Limitations et hypothèses


Spark 2.3.0 est utilisé pour exécuter des exemples et analyser le code source.
Il est supposé que le lecteur connaît l'architecture Spark et les principes généraux de l'optimiseur de requêtes pour l'un des SGBD. Au minimum, l'expression «plan de requête» ne devrait certainement pas être surprenante.


De plus, cet article essaie de ne pas devenir une traduction du code de l'optimiseur Spark en russe, donc pour les choses très intéressantes du point de vue de l'optimiseur, mais qui peuvent être lues dans le code source, elles seront simplement brièvement mentionnées ici avec des liens vers les classes correspondantes.


Passez à l'étude


Commençons par une petite requête pour explorer les étapes de base à travers lesquelles elle passe de l'analyse à l'exécution.


scala> spark.read.orc("/user/test/balance").createOrReplaceTempView("bal") scala> spark.read.orc("/user/test/customer").createOrReplaceTempView("cust") scala> val df = spark.sql(""" | select bal.account_rk, cust.full_name | from bal | join cust | on bal.party_rk = cust.party_rk | and bal.actual_date = cust.actual_date | where bal.actual_date = cast('2017-12-31' as date) | """) df: org.apache.spark.sql.DataFrame = [account_rk: decimal(38,18), full_name: string] scala> df.explain(true) 

Le module principal chargé d'analyser SQL et d'optimiser le plan d'exécution des requêtes est Spark Catalyst.


La sortie développée dans la description du plan de demande (df.explain (true)) vous permet de suivre toutes les étapes de la demande:


  • Plan logique analysé - obtenez après l'analyse SQL. À ce stade, seule l'exactitude syntaxique de la demande est vérifiée.

 == Parsed Logical Plan == 'Project ['bal.account_rk, 'cust.full_name] +- 'Filter ('bal.actual_date = cast(2017-12-31 as date)) +- 'Join Inner, (('bal.party_rk = 'cust.party_rk) && ('bal.actual_date = 'cust.actual_date)) :- 'UnresolvedRelation `bal` +- 'UnresolvedRelation `cust` 

  • Plan logique analysé - à ce stade, des informations sur la structure des entités utilisées sont ajoutées, la correspondance de la structure et les attributs demandés sont vérifiés.

 == Analyzed Logical Plan == account_rk: decimal(38,18), full_name: string Project [account_rk#1, full_name#59] +- Filter (actual_date#27 = cast(2017-12-31 as date)) +- Join Inner, ((party_rk#18 = party_rk#57) && (actual_date#27 = actual_date#88)) :- SubqueryAlias bal : +- Relation[ACTUAL_END_DATE#0,ACCOUNT_RK#1,... 4 more fields] orc +- SubqueryAlias cust +- Relation[ACTUAL_END_DATE#56,PARTY_RK#57... 9 more fields] orc 

  • Le plan logique optimisé est le plus intéressant pour nous. À ce stade, l'arbre de requête résultant est converti en fonction des règles d'optimisation disponibles.

 == Optimized Logical Plan == Project [account_rk#1, full_name#59] +- Join Inner, ((party_rk#18 = party_rk#57) && (actual_date#27 = actual_date#88)) :- Project [ACCOUNT_RK#1, PARTY_RK#18, ACTUAL_DATE#27] : +- Filter ((isnotnull(actual_date#27) && (actual_date#27 = 17531)) && isnotnull(party_rk#18)) : +- Relation[ACTUAL_END_DATE#0,ACCOUNT_RK#1,... 4 more fields] orc +- Project [PARTY_RK#57, FULL_NAME#59, ACTUAL_DATE#88] +- Filter ((isnotnull(actual_date#88) && isnotnull(party_rk#57)) && (actual_date#88 = 17531)) +- Relation[ACTUAL_END_DATE#56,PARTY_RK#57,... 9 more fields] orc 

  • Plan physique - les caractéristiques de l'accès aux données sources commencent à être prises en compte, y compris les optimisations pour filtrer les partitions et les données afin de minimiser l'ensemble de données résultant. La stratégie d'exécution de jointure est sélectionnée (pour plus de détails sur les options disponibles, voir ci-dessous).

 == Physical Plan == *(2) Project [account_rk#1, full_name#59] +- *(2) BroadcastHashJoin [party_rk#18, actual_date#27], [party_rk#57, actual_date#88], Inner, BuildRight :- *(2) Project [ACCOUNT_RK#1, PARTY_RK#18, ACTUAL_DATE#27] : +- *(2) Filter isnotnull(party_rk#18) : +- *(2) FileScan orc [ACCOUNT_RK#1,PARTY_RK#18,ACTUAL_DATE#27] Batched: false, Format: ORC, Location: InMemoryFileIndex[hdfs://cluster:8020/user/test/balance], PartitionCount: 1, PartitionFilters: [isnotnull(ACTUAL_DATE#27), (ACTUAL_DATE#27 = 17531)], PushedFilters: [IsNotNull(PARTY_RK)], ReadSchema: struct<ACCOUNT_RK:decimal(38,18),PARTY_RK:decimal(38,18)> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, decimal(38,18), true], input[2, date, true])) +- *(1) Project [PARTY_RK#57, FULL_NAME#59, ACTUAL_DATE#88] +- *(1) Filter isnotnull(party_rk#57) +- *(1) FileScan orc [PARTY_RK#57,FULL_NAME#59,ACTUAL_DATE#88] Batched: false, Format: ORC, Location: InMemoryFileIndex[hdfs://cluster:8020/user/test/customer], PartitionCount: 1, PartitionFilters: [isnotnull(ACTUAL_DATE#88), (ACTUAL_DATE#88 = 17531)], PushedFilters: [IsNotNull(PARTY_RK)], ReadSchema: struct<PARTY_RK:decimal(38,18),FULL_NAME:string> 

Les étapes suivantes d'optimisation et d'exécution (par exemple, WholeStageCodegen) dépassent le cadre de cet article, mais sont décrites en détail (ainsi que les étapes décrites ci-dessus) dans Mastering Spark Sql .


La lecture du plan d'exécution des requêtes se produit généralement «de l'intérieur» et «de bas en haut», c'est-à-dire que les parties les plus imbriquées sont exécutées en premier et progressent progressivement vers la projection finale située tout en haut.


Types d'optimiseurs de requête


On peut distinguer deux types d'optimiseurs de requête:


  • Optimiseurs basés sur des règles (RBO).
  • Optimiseurs basés sur une estimation du coût d'exécution des requêtes (Optimiseur basé sur les coûts, CBO).

Les premiers se concentrent sur l'utilisation d'un ensemble de règles fixes, par exemple, l'application de conditions de filtrage d'où aux stades antérieurs, si possible, le calcul des constantes, etc.


Pour évaluer la qualité du plan résultant, l'optimiseur CBO utilise une fonction de coût, qui dépend généralement de la quantité de données traitées, du nombre de lignes qui tombent sous les filtres et du coût d'exécution de certaines opérations.


Pour en savoir plus sur la spécification de conception CBO pour Apache Spark, veuillez suivre les liens: la spécification et la tâche JIRA principale pour l'implémentation .


Le point de départ pour explorer la gamme complète des optimisations existantes est le code Optimizer.scala.


Voici un court extrait d'une longue liste d'optimisations disponibles:


 def batches: Seq[Batch] = { val operatorOptimizationRuleSet = Seq( // Operator push down PushProjectionThroughUnion, ReorderJoin, EliminateOuterJoin, PushPredicateThroughJoin, PushDownPredicate, LimitPushDown, ColumnPruning, InferFiltersFromConstraints, // Operator combine CollapseRepartition, CollapseProject, CollapseWindow, CombineFilters, CombineLimits, CombineUnions, // Constant folding and strength reduction NullPropagation, ConstantPropagation, ........ 

Il convient de noter que la liste de ces optimisations comprend à la fois des optimisations basées sur des règles et des optimisations basées sur des estimations de coût de requête, qui seront discutées ci-dessous.


Une caractéristique de CBO est que pour un fonctionnement correct, il doit connaître et stocker des informations sur les statistiques des données utilisées dans la requête - le nombre d'enregistrements, la taille des enregistrements, les histogrammes de distribution des données dans les colonnes du tableau.


Pour collecter des statistiques, un ensemble de commandes SQL ANALYZE TABLE ... COMPUTE STATISTICS est utilisé, en outre, un ensemble de tables est nécessaire pour stocker des informations, l'API est fournie via ExternalCatalog, plus précisément via HiveExternalCatalog.


Puisque CBO est actuellement désactivé par défaut, l'accent sera mis sur la recherche de l'optimisation et des nuances disponibles de RBO.


Types et choix de stratégie de jointure


Au stade de la formation du plan physique d'exécution de la demande, la stratégie de jointure est sélectionnée. Les options suivantes sont actuellement disponibles dans Spark (vous pouvez commencer à apprendre le code à partir du code dans SparkStrategies.scala).


Rejoindre le hachage de diffusion


La meilleure option est si l'une des parties de jointure est suffisamment petite (le critère de suffisance est défini par le paramètre spark.sql.autoBroadcastJoinThreshold dans SQLConf). Dans ce cas, ce côté est entièrement copié vers tous les exécuteurs, où il existe une jointure de hachage avec la table principale. En plus de la taille, il convient de noter que dans le cas d'une jointure externe, seul le côté externe peut être copié.Par conséquent, si possible, en tant que table principale dans le cas d'une jointure externe, vous devez utiliser la table avec la plus grande quantité de données.


   ,    ,     SQL      Oracle,   /*+ broadcast(t1, t2) */ 

Trier la jointure de fusion


Avec spark.sql.join.preferSortMergeJoin activé par défaut, cette méthode est appliquée par défaut si les clés de jointure peuvent être triées.
Parmi les fonctionnalités, on peut noter que, contrairement à la méthode précédente, l'optimisation de la génération de code pour effectuer l'opération n'est disponible que pour la jointure interne.


Shuffle hash join


Si les clés ne peuvent pas être triées ou si l'option de sélection de jointure de fusion par défaut est désactivée, Catalyst essaie d'appliquer une jointure de hachage aléatoire. Outre la vérification des paramètres, il est également vérifié que Spark a suffisamment de mémoire pour créer une carte de hachage locale pour une partition (le nombre total de partitions est défini en définissant spark.sql.shuffle.partitions )


BroadcastNestedLoopJoin et CartésienProduit


Dans le cas où il n'y a pas de possibilité de comparaison directe par clé (par exemple, une condition comme) ou il n'y a pas de clés pour joindre des tables, selon la taille des tables, ce type ou CartesianProduct est sélectionné.


L'ordre de spécification des tables dans join'ah


Dans tous les cas, la jointure nécessite de mélanger les tables par clé. Par conséquent, pour le moment, l'ordre de spécification des tables, en particulier dans le cas de l'exécution de plusieurs jointures consécutives, est important (si vous êtes un alésage, alors si CBO n'est pas activé et le paramètre JOIN_REORDER_ENABLED n'est pas activé).


Si possible, l'ordre de jointure des tables doit minimiser le nombre d'opérations de mélange pour les grandes tables, pour lesquelles les jointures sur la même clé doivent être séquentielles. N'oubliez pas non plus de minimiser les données pour la jointure, pour activer Broadcast Hash Join.


Application transitive des conditions de filtrage


Considérez la requête suivante:


 select bal.account_rk, cust.full_name from balance bal join customer cust on bal.party_rk = cust.party_rk and bal.actual_date = cust.actual_date where bal.actual_date = cast('2017-12-31' as date) 

Ici, nous connectons deux tables qui sont partitionnées de la même manière, selon le champ actual_date et appliquons un filtre explicite uniquement à la partition en fonction de la table de solde.


Comme le montre le plan de requête optimisé, le filtre par date est également appliqué au client, et au moment de la lecture des données du disque, il est déterminé qu’une seule partition est nécessaire.


 == Optimized Logical Plan == Project [account_rk#1, full_name#59] +- Join Inner, ((party_rk#18 = party_rk#57) && (actual_date#27 = actual_date#88)) :- Project [ACCOUNT_RK#1, PARTY_RK#18, ACTUAL_DATE#27] : +- Filter ((isnotnull(actual_date#27) && (actual_date#27 = 17531)) && isnotnull(party_rk#18)) : +- Relation[,... 4 more fields] orc +- Project [PARTY_RK#57, FULL_NAME#59, ACTUAL_DATE#88] +- Filter (((actual_date#88 = 17531) && isnotnull(actual_date#88)) && isnotnull(party_rk#57)) +- Relation[,... 9 more fields] orc 

Mais il vous suffit de remplacer la jointure interne par la gauche externe dans la requête, car le prédicat push pour la table client tombe immédiatement et une analyse complète se produit, ce qui est un effet indésirable.


 == Optimized Logical Plan == Project [account_rk#1, full_name#59] +- Join LeftOuter, ((party_rk#18 = party_rk#57) && (actual_date#27 = actual_date#88)) :- Project [ACCOUNT_RK#1, PARTY_RK#18, ACTUAL_DATE#27] : +- Filter (isnotnull(actual_date#27) && (actual_date#27 = 17531)) : +- Relation[,... 4 more fields] orc +- Project [PARTY_RK#57, FULL_NAME#59, ACTUAL_DATE#88] +- Relation[,... 9 more fields] orc 

Conversion de type


Prenons un exemple simple de sélection à partir d'une table avec filtrage par type de client, dans le schéma le type du champ party_type est string.


 select party_rk, full_name from cust where actual_date = cast('2017-12-31' as date) and party_type = 101 --   -- and party_type = '101' --     

Et comparez les deux plans résultants, le premier - lorsque nous nous référons au type incorrect (il y aura une conversion implicite en int), le second - lorsque le type correspond au schéma.


 PushedFilters: [IsNotNull(PARTY_TYPE)] //            . PushedFilters: [IsNotNull(PARTY_TYPE), EqualTo(PARTY_TYPE,101)] //             . 

Un problème similaire est observé dans le cas de la comparaison de dates avec une chaîne, il y aura un filtre pour comparer les chaînes. Un exemple:


 where OPER_DATE = '2017-12-31' Filter (isnotnull(oper_date#0) && (cast(oper_date#0 as string) = 2017-12-31) PushedFilters: [IsNotNull(OPER_DATE)] where OPER_DATE = cast('2017-12-31' as date) PushedFilters: [IsNotNull(OPER_DATE), EqualTo(OPER_DATE,2017-12-31)] 

Dans le cas où une conversion de type implicite est possible, par exemple, int -> décimal, l'optimiseur le fait de lui-même.


Recherche complémentaire


De nombreuses informations intéressantes sur les «boutons» pouvant être utilisés pour affiner Catalyst, ainsi que sur les possibilités (présentes et futures) de l'optimiseur, peuvent être obtenues à partir de SQLConf.scala.


En particulier, comme vous pouvez le voir par défaut, l'optimiseur de coût est toujours désactivé pour le moment.


 val CBO_ENABLED = buildConf("spark.sql.cbo.enabled") .doc("Enables CBO for estimation of plan statistics when set true.") .booleanConf .createWithDefault(false) 

Ainsi que ses optimisations dépendantes associées à la réorganisation de join'ov.


 val JOIN_REORDER_ENABLED = buildConf("spark.sql.cbo.joinReorder.enabled") .doc("Enables join reorder in CBO.") .booleanConf .createWithDefault(false) 

ou


 val STARSCHEMA_DETECTION = buildConf("spark.sql.cbo.starSchemaDetection") .doc("When true, it enables join reordering based on star schema detection. ") .booleanConf .createWithDefault(false) 

Bref résumé


Seule une petite partie des optimisations existantes a été touchée, des expériences d'optimisation des coûts, qui peuvent donner beaucoup plus de place à la conversion des requêtes, sont à venir. De plus, une autre question intéressante est la comparaison d'un ensemble d'optimisations lors de la lecture de fichiers de Parquet et Orc, à en juger par la jira du projet, il s'agit de parité, mais est-ce vraiment le cas?


De plus:


  • L'analyse et l'optimisation des demandes sont intéressantes et passionnantes, surtout compte tenu de la disponibilité des codes sources.
  • L'inclusion de CBO offrira des possibilités d'optimisation et de recherche supplémentaires.
  • Il est nécessaire de contrôler l'applicabilité des règles de base qui vous permettent de filtrer autant de données «supplémentaires» que possible, le plus tôt possible.
  • Rejoindre est un mal nécessaire, mais si possible, il vaut la peine de les minimiser et de garder une trace de l'implémentation utilisée sous le capot.

Source: https://habr.com/ru/post/fr417103/


All Articles