Spark SQL。 关于查询优化器的一些知识

大家好 作为介绍,我想告诉你我是如何生活的。


特别是在与大数据和Spark会面之前,我经常要优化SQL查询,首先是针对MSSQL,然后针对Oracle,然后遇到了SparkSQL。


而且,如果已经有很多关于DBMS的好书来描述方法论和“笔”,您可以将它们弄弯以获得最佳的查询计划,那么我还没有看到关于Spark的这些书。 我遇到了更多的文章和实践集,它们与通过RDD / Dataset API而非纯SQL进行工作有关。 对我而言,关于SQL优化的参考书之一是J. Lewis的书Oracle。 成本优化的基础。” 我在寻找深度相似的东西。 为什么研究主题专门是SparkSQL,而不是基础API? 然后,兴趣是由我正在从事的项目的功能引起的。




对于我们的一位客户,我们公司正在开发一个数据仓库,该仓库的详细层和部分展示用例在Hadoop集群中,而最终展示用例在Oracle中。 该项目涉及广泛的数据转换层,该层在Spark上实现。 为了加快不熟悉复杂的大数据技术但熟悉SQL和ETL工具的ETL开发人员的开发和连接速度,开发了一种工具,该工具在意识形态上提醒其他ETL工具,例如Informatica,并允许您在可视化的情况下设计下一代ETL流程。 Spark的代码。 由于算法的复杂性和大量的转换,开发人员主要使用SparkSQL查询。


这就是故事的开始,因为我不得不回答许多形式的问题:“为什么查询不能像在Oracle中那样工作/工作缓慢/不能像在Oracle中那样工作?”。 这对我来说是最有趣的部分:“为什么它工作缓慢?”。 此外,与我之前使用过的DBMS不同,您可以进入源代码并获得问题的答案。


局限性和假设


Spark 2.3.0用于运行示例和分析源代码。
假定读者熟悉Spark架构以及其中一种DBMS的查询优化器的一般原理。 至少,“查询计划”这一短语当然应该不足为奇。


此外,本文也尝试不将Spark优化器代码翻译成俄文,因此对于从优化器的角度来看非常有趣但可以在源代码中读取的内容,此处将简单地通过引用相应类的链接进行简要介绍。


继续学习


让我们从一个小的查询开始,探索它从解析到执行的基本阶段。


scala> spark.read.orc("/user/test/balance").createOrReplaceTempView("bal") scala> spark.read.orc("/user/test/customer").createOrReplaceTempView("cust") scala> val df = spark.sql(""" | select bal.account_rk, cust.full_name | from bal | join cust | on bal.party_rk = cust.party_rk | and bal.actual_date = cust.actual_date | where bal.actual_date = cast('2017-12-31' as date) | """) df: org.apache.spark.sql.DataFrame = [account_rk: decimal(38,18), full_name: string] scala> df.explain(true) 

负责解析SQL和优化查询执行计划的主要模块是Spark Catalyst。


请求计划说明中的扩展输出(df.explain(true))使您可以跟踪请求经过的所有阶段:


  • 解析的逻辑计划-解析SQL后获取。 在此阶段,仅检查请求的语法正确性。

 == Parsed Logical Plan == 'Project ['bal.account_rk, 'cust.full_name] +- 'Filter ('bal.actual_date = cast(2017-12-31 as date)) +- 'Join Inner, (('bal.party_rk = 'cust.party_rk) && ('bal.actual_date = 'cust.actual_date)) :- 'UnresolvedRelation `bal` +- 'UnresolvedRelation `cust` 

  • 分析的逻辑计划-在此阶段,将添加有关所用实体的结构的信息,并检查结构与所请求属性的对应关系。

 == Analyzed Logical Plan == account_rk: decimal(38,18), full_name: string Project [account_rk#1, full_name#59] +- Filter (actual_date#27 = cast(2017-12-31 as date)) +- Join Inner, ((party_rk#18 = party_rk#57) && (actual_date#27 = actual_date#88)) :- SubqueryAlias bal : +- Relation[ACTUAL_END_DATE#0,ACCOUNT_RK#1,... 4 more fields] orc +- SubqueryAlias cust +- Relation[ACTUAL_END_DATE#56,PARTY_RK#57... 9 more fields] orc 

  • 优化逻辑计划对我们来说是最有趣的。 在此阶段,将基于可用的优化规则来转换结果查询树。

 == Optimized Logical Plan == Project [account_rk#1, full_name#59] +- Join Inner, ((party_rk#18 = party_rk#57) && (actual_date#27 = actual_date#88)) :- Project [ACCOUNT_RK#1, PARTY_RK#18, ACTUAL_DATE#27] : +- Filter ((isnotnull(actual_date#27) && (actual_date#27 = 17531)) && isnotnull(party_rk#18)) : +- Relation[ACTUAL_END_DATE#0,ACCOUNT_RK#1,... 4 more fields] orc +- Project [PARTY_RK#57, FULL_NAME#59, ACTUAL_DATE#88] +- Filter ((isnotnull(actual_date#88) && isnotnull(party_rk#57)) && (actual_date#88 = 17531)) +- Relation[ACTUAL_END_DATE#56,PARTY_RK#57,... 9 more fields] orc 

  • 物理计划-开始考虑访问源数据的功能,包括优化过滤分区和数据以最小化结果数据集的功能。 选择了联接执行策略(有关可用选项的更多详细信息,请参见下文)。

 == Physical Plan == *(2) Project [account_rk#1, full_name#59] +- *(2) BroadcastHashJoin [party_rk#18, actual_date#27], [party_rk#57, actual_date#88], Inner, BuildRight :- *(2) Project [ACCOUNT_RK#1, PARTY_RK#18, ACTUAL_DATE#27] : +- *(2) Filter isnotnull(party_rk#18) : +- *(2) FileScan orc [ACCOUNT_RK#1,PARTY_RK#18,ACTUAL_DATE#27] Batched: false, Format: ORC, Location: InMemoryFileIndex[hdfs://cluster:8020/user/test/balance], PartitionCount: 1, PartitionFilters: [isnotnull(ACTUAL_DATE#27), (ACTUAL_DATE#27 = 17531)], PushedFilters: [IsNotNull(PARTY_RK)], ReadSchema: struct<ACCOUNT_RK:decimal(38,18),PARTY_RK:decimal(38,18)> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, decimal(38,18), true], input[2, date, true])) +- *(1) Project [PARTY_RK#57, FULL_NAME#59, ACTUAL_DATE#88] +- *(1) Filter isnotnull(party_rk#57) +- *(1) FileScan orc [PARTY_RK#57,FULL_NAME#59,ACTUAL_DATE#88] Batched: false, Format: ORC, Location: InMemoryFileIndex[hdfs://cluster:8020/user/test/customer], PartitionCount: 1, PartitionFilters: [isnotnull(ACTUAL_DATE#88), (ACTUAL_DATE#88 = 17531)], PushedFilters: [IsNotNull(PARTY_RK)], ReadSchema: struct<PARTY_RK:decimal(38,18),FULL_NAME:string> 

优化和执行的以下阶段(例如,WholeStageCodegen)不在本文讨论范围之内,但在Mastering Spark Sql中进行了详细描述(以及上述阶段)。


读取查询执行计划通常是“从内部”和“从底部到顶部”进行的,也就是说,最嵌套的部分首先执行,然后逐渐前进到位于最顶部的最终投影。


查询优化器的类型


可以区分两种类型的查询优化器:


  • 基于规则的优化器(RBO)。
  • 优化程序基于对查询执行成本的估算(基于成本的优化器,CBO)。

第一个集中在一组固定规则的使用上,例如,在较早阶段从那里过滤条件的应用,如果可能的话,常数的计算等。


为了评估结果计划的质量,CBO优化器使用了成本函数,该函数通常取决于处理的数据量,过滤器下的行数以及执行某些操作的成本。


要了解有关Apache Spark的CBO设计规范的更多信息,请访问以下链接: 规范和实现的主要JIRA任务


探索全部现有优化的起点是Optimizer.scala代码。


以下是一长串可用优化的摘录:


 def batches: Seq[Batch] = { val operatorOptimizationRuleSet = Seq( // Operator push down PushProjectionThroughUnion, ReorderJoin, EliminateOuterJoin, PushPredicateThroughJoin, PushDownPredicate, LimitPushDown, ColumnPruning, InferFiltersFromConstraints, // Operator combine CollapseRepartition, CollapseProject, CollapseWindow, CombineFilters, CombineLimits, CombineUnions, // Constant folding and strength reduction NullPropagation, ConstantPropagation, ........ 

应当注意,这些优化的列表包括基于规则的优化和基于查询成本估算的优化,这将在下面进行讨论。


CBO的一个功能是,为了进行正确的操作,它需要知道并存储有关查询中使用的数据的统计信息-记录数,记录大小,表列中数据分布的直方图。


为了收集统计信息,使用了一组SQL命令ANALYZE TABLE ... COMPUTE STATISTICS,此外,还需要一组表来存储信息,该API通过ExternalCatalog(更确切地说是通过HiveExternalCatalog)提供。


由于默认情况下当前禁用CBO,因此主要重点将放在研究RBO的可用优化和细微差别上。


加盟策略的类型和选择


在形成用于执行请求的物理计划的阶段,选择加入策略。 目前,Spark中提供以下选项(您可以从SparkStrategies.scala中的代码开始学习代码)。


广播哈希联接


最好的选择是如果其中一个参与方足够小(充分性标准由SQLConf中的spark.sql.autoBroadcastJoinThreshold参数设置)。 在这种情况下,这一面将完全复制到所有执行程序,在该执行程序中主表具有哈希连接。 除了大小外,还应注意,在外部联接的情况下,只能复制外侧,因此,如果可能,作为外部联接的前导表,必须使用数据量最大的表。


   ,    ,     SQL      Oracle,   /*+ broadcast(t1, t2) */ 

排序合并联接


默认情况下启用spark.sql.join.preferSortMergeJoin时,如果可以对连接的键进行排序,则默认情况下将应用此方法。
在这些功能中,可以注意到,与先前的方法不同,用于执行该操作的代码生成优化仅可用于内部联接。


随机哈希联接


如果无法对键进行排序,或者禁用了默认的排序合并联接选择选项,则Catalyst会尝试应用洗牌哈希联接。 除了检查设置之外,还检查Spark是否有足够的内存来为一个分区构建本地哈希映射(分区总数通过设置spark.sql.shuffle.partitions来设置)


广播嵌套循环加入和笛卡尔积


如果不可能通过键直接进行比较(例如,类似条件),或者没有用于连接表的键,则取决于表的大小,请选择此类型或CartesianProduct。


join'ah中指定表的顺序


在任何情况下,联接都需要按键对表进行洗牌。 因此,此刻,指定表的顺序很重要,尤其是在连续执行多个联接的情况下(如果您很无聊,则如果未启用CBO并且未启用JOIN_REORDER_ENABLED设置)。


如果可能的话,连接表的顺序应最大程度地减少大型表的混洗操作次数,对于大型表,应按顺序执行同一键上的连接。 另外,不要忘记最小化要加入的数据,以启用广播哈希加入。


过滤条件的传递应用


考虑以下查询:


 select bal.account_rk, cust.full_name from balance bal join customer cust on bal.party_rk = cust.party_rk and bal.actual_date = cust.actual_date where bal.actual_date = cast('2017-12-31' as date) 

在这里,我们根据actual_date字段连接以相同方式分区的两个表,并根据余额表仅对分区应用显式过滤器。


从优化的查询计划可以看出,按日期过滤也适用于客户,并且在从磁盘读取数据时,确定恰好需要一个分区。


 == Optimized Logical Plan == Project [account_rk#1, full_name#59] +- Join Inner, ((party_rk#18 = party_rk#57) && (actual_date#27 = actual_date#88)) :- Project [ACCOUNT_RK#1, PARTY_RK#18, ACTUAL_DATE#27] : +- Filter ((isnotnull(actual_date#27) && (actual_date#27 = 17531)) && isnotnull(party_rk#18)) : +- Relation[,... 4 more fields] orc +- Project [PARTY_RK#57, FULL_NAME#59, ACTUAL_DATE#88] +- Filter (((actual_date#88 = 17531) && isnotnull(actual_date#88)) && isnotnull(party_rk#57)) +- Relation[,... 9 more fields] orc 

但是,您只需要在查询中用左外部替换内部联接,因为customer表的推式谓词会立即消失,并且会进行全扫描,这是不希望的结果。


 == Optimized Logical Plan == Project [account_rk#1, full_name#59] +- Join LeftOuter, ((party_rk#18 = party_rk#57) && (actual_date#27 = actual_date#88)) :- Project [ACCOUNT_RK#1, PARTY_RK#18, ACTUAL_DATE#27] : +- Filter (isnotnull(actual_date#27) && (actual_date#27 = 17531)) : +- Relation[,... 4 more fields] orc +- Project [PARTY_RK#57, FULL_NAME#59, ACTUAL_DATE#88] +- Relation[,... 9 more fields] orc 

类型转换


考虑一个从表中进行选择的简单示例,该表按客户端类型进行过滤,在该方案中,party_type字段的类型为字符串。


 select party_rk, full_name from cust where actual_date = cast('2017-12-31' as date) and party_type = 101 --   -- and party_type = '101' --     

并比较两个结果计划,第一个-当我们引用不正确的类型(将隐式转换为int)时,第二个-当类型与方案相对应时。


 PushedFilters: [IsNotNull(PARTY_TYPE)] //            . PushedFilters: [IsNotNull(PARTY_TYPE), EqualTo(PARTY_TYPE,101)] //             . 

对于将日期与字符串进行比较的情况,也观察到类似的问题,将有一个用于比较字符串的过滤器。 一个例子:


 where OPER_DATE = '2017-12-31' Filter (isnotnull(oper_date#0) && (cast(oper_date#0 as string) = 2017-12-31) PushedFilters: [IsNotNull(OPER_DATE)] where OPER_DATE = cast('2017-12-31' as date) PushedFilters: [IsNotNull(OPER_DATE), EqualTo(OPER_DATE,2017-12-31)] 

对于可能进行隐式类型转换的情况,例如int->十进制,优化器将自行进行转换。


进一步研究


可以从SQLConf.scala获得许多有关可用于微调Catalyst的“旋钮”的有趣信息,以及有关优化器的可能性(现在和将来)的信息。


特别是,如您所见,默认情况下,成本优化器目前仍处于关闭状态。


 val CBO_ENABLED = buildConf("spark.sql.cbo.enabled") .doc("Enables CBO for estimation of plan statistics when set true.") .booleanConf .createWithDefault(false) 

以及与重新排序join'ov相关的依赖优化。


 val JOIN_REORDER_ENABLED = buildConf("spark.sql.cbo.joinReorder.enabled") .doc("Enables join reorder in CBO.") .booleanConf .createWithDefault(false) 


 val STARSCHEMA_DETECTION = buildConf("spark.sql.cbo.starSchemaDetection") .doc("When true, it enables join reordering based on star schema detection. ") .booleanConf .createWithDefault(false) 

简要总结


仅触及了现有优化的一小部分,即将进行成本优化的实验,该优化可以为查询转换提供更多的空间。 另外,另一个有趣的问题是,从Parquet和Orc读取文件时,根据项目的jira判断,一组优化的比较与奇偶性有关,但是真的吗?


另外:


  • 请求的分析和优化是有趣而令人兴奋的,尤其是考虑到源代码的可用性。
  • 包含CBO将为进一步的优化和研究提供范围。
  • 有必要监视基本规则的适用性,这些基本规则允许您在最早的阶段尽可能多地过滤掉“额外”数据。
  • 加入是必然的恶作剧,但如果可能的话,将它们最小化并跟踪在幕后使用哪种实现是值得的。

Source: https://habr.com/ru/post/zh-CN417103/


All Articles