Spark SQL. Um pouco sobre o otimizador de consultas

Olá pessoal. Como introdução, quero lhe contar como cheguei a essa vida.


Antes de conhecer o Big Data e o Spark, em particular, eu tinha muitas e muitas vezes otimização de consultas SQL, primeiro para MSSQL, depois para Oracle, e agora me deparei com o SparkSQL.


E se já existem muitos bons livros para o DBMS que descrevem a metodologia e as "canetas" que você pode girar para obter o plano de consulta ideal, então eu não vi esses livros para o Spark. Me deparei com mais artigos e conjuntos de práticas, mais relacionados ao trabalho através da API RDD / Dataset, em vez de SQL puro. Para mim, um dos livros de referência sobre otimização de SQL é o livro de J. Lewis, Oracle. Noções básicas de otimização de custos ". Eu procurei algo semelhante em profundidade de estudo. Por que o assunto da pesquisa foi especificamente o SparkSQL, e não a API subjacente? Então o interesse foi causado pelos recursos do projeto em que estou trabalhando.




Para um de nossos clientes, nossa empresa está desenvolvendo um data warehouse, cuja camada detalhada e parte dos casos de exibição estão no cluster Hadoop, e os casos de exibição finais estão no Oracle. Este projeto envolve uma extensa camada de conversão de dados, que é implementada no Spark. Para acelerar o desenvolvimento e a conectividade dos desenvolvedores de ETL que não estão familiarizados com as complexidades das tecnologias de Big Data, mas estão familiarizados com as ferramentas SQL e ETL, foi desenvolvida uma ferramenta que lembra ideologicamente outras ferramentas de ETL, por exemplo, a Informatica, e permite projetar visualmente os processos de ETL com a geração subsequente código para Spark. Devido à complexidade dos algoritmos e ao grande número de transformações, os desenvolvedores usam principalmente consultas SparkSQL.


E é aí que a história começa, pois tive que responder a um grande número de perguntas no formulário “Por que a solicitação não funciona / funciona lentamente / funciona de forma diferente da Oracle?”. Essa foi a parte mais interessante para mim: “Por que funciona devagar?”. Além disso, ao contrário do DBMS com o qual trabalhei antes, você pode entrar no código-fonte e obter a resposta para suas perguntas.


Limitações e premissas


O Spark 2.3.0 é usado para executar exemplos e analisar o código-fonte.
Supõe-se que o leitor esteja familiarizado com a arquitetura Spark e os princípios gerais do otimizador de consulta para um dos DBMSs. No mínimo, a frase "plano de consulta" certamente não deve ser surpreendente.


Além disso, este artigo tenta não se tornar uma tradução do código do otimizador Spark para o russo; portanto, para coisas que são muito interessantes do ponto de vista do otimizador, mas que podem ser lidas no código-fonte, elas serão simplesmente mencionadas aqui brevemente com links para as classes correspondentes.


Vá para o estudo


Vamos começar com uma pequena consulta para explorar os estágios básicos pelos quais ela passa da análise para a execução.


scala> spark.read.orc("/user/test/balance").createOrReplaceTempView("bal") scala> spark.read.orc("/user/test/customer").createOrReplaceTempView("cust") scala> val df = spark.sql(""" | select bal.account_rk, cust.full_name | from bal | join cust | on bal.party_rk = cust.party_rk | and bal.actual_date = cust.actual_date | where bal.actual_date = cast('2017-12-31' as date) | """) df: org.apache.spark.sql.DataFrame = [account_rk: decimal(38,18), full_name: string] scala> df.explain(true) 

O principal módulo responsável por analisar o SQL e otimizar o plano de execução da consulta é o Spark Catalyst.


A saída expandida na descrição do plano de solicitação (df.explain (true)) permite rastrear todos os estágios pelos quais a solicitação passa:


  • Plano lógico analisado - obtenha após analisar o SQL. Nesse estágio, apenas a correção sintática da solicitação é verificada.

 == Parsed Logical Plan == 'Project ['bal.account_rk, 'cust.full_name] +- 'Filter ('bal.actual_date = cast(2017-12-31 as date)) +- 'Join Inner, (('bal.party_rk = 'cust.party_rk) && ('bal.actual_date = 'cust.actual_date)) :- 'UnresolvedRelation `bal` +- 'UnresolvedRelation `cust` 

  • Plano Lógico Analisado - nesta etapa, são adicionadas informações sobre a estrutura das entidades utilizadas, a correspondência da estrutura e os atributos solicitados.

 == Analyzed Logical Plan == account_rk: decimal(38,18), full_name: string Project [account_rk#1, full_name#59] +- Filter (actual_date#27 = cast(2017-12-31 as date)) +- Join Inner, ((party_rk#18 = party_rk#57) && (actual_date#27 = actual_date#88)) :- SubqueryAlias bal : +- Relation[ACTUAL_END_DATE#0,ACCOUNT_RK#1,... 4 more fields] orc +- SubqueryAlias cust +- Relation[ACTUAL_END_DATE#56,PARTY_RK#57... 9 more fields] orc 

  • O plano lógico otimizado é o mais interessante para nós. Nesse estágio, a árvore de consulta resultante é convertida com base nas regras de otimização disponíveis.

 == Optimized Logical Plan == Project [account_rk#1, full_name#59] +- Join Inner, ((party_rk#18 = party_rk#57) && (actual_date#27 = actual_date#88)) :- Project [ACCOUNT_RK#1, PARTY_RK#18, ACTUAL_DATE#27] : +- Filter ((isnotnull(actual_date#27) && (actual_date#27 = 17531)) && isnotnull(party_rk#18)) : +- Relation[ACTUAL_END_DATE#0,ACCOUNT_RK#1,... 4 more fields] orc +- Project [PARTY_RK#57, FULL_NAME#59, ACTUAL_DATE#88] +- Filter ((isnotnull(actual_date#88) && isnotnull(party_rk#57)) && (actual_date#88 = 17531)) +- Relation[ACTUAL_END_DATE#56,PARTY_RK#57,... 9 more fields] orc 

  • Plano físico - os recursos de acesso aos dados de origem estão começando a ser levados em consideração, incluindo otimizações para filtrar partições e dados para minimizar o conjunto de dados resultante. A estratégia de execução da junção está selecionada (para obter mais detalhes sobre as opções disponíveis, veja abaixo).

 == Physical Plan == *(2) Project [account_rk#1, full_name#59] +- *(2) BroadcastHashJoin [party_rk#18, actual_date#27], [party_rk#57, actual_date#88], Inner, BuildRight :- *(2) Project [ACCOUNT_RK#1, PARTY_RK#18, ACTUAL_DATE#27] : +- *(2) Filter isnotnull(party_rk#18) : +- *(2) FileScan orc [ACCOUNT_RK#1,PARTY_RK#18,ACTUAL_DATE#27] Batched: false, Format: ORC, Location: InMemoryFileIndex[hdfs://cluster:8020/user/test/balance], PartitionCount: 1, PartitionFilters: [isnotnull(ACTUAL_DATE#27), (ACTUAL_DATE#27 = 17531)], PushedFilters: [IsNotNull(PARTY_RK)], ReadSchema: struct<ACCOUNT_RK:decimal(38,18),PARTY_RK:decimal(38,18)> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, decimal(38,18), true], input[2, date, true])) +- *(1) Project [PARTY_RK#57, FULL_NAME#59, ACTUAL_DATE#88] +- *(1) Filter isnotnull(party_rk#57) +- *(1) FileScan orc [PARTY_RK#57,FULL_NAME#59,ACTUAL_DATE#88] Batched: false, Format: ORC, Location: InMemoryFileIndex[hdfs://cluster:8020/user/test/customer], PartitionCount: 1, PartitionFilters: [isnotnull(ACTUAL_DATE#88), (ACTUAL_DATE#88 = 17531)], PushedFilters: [IsNotNull(PARTY_RK)], ReadSchema: struct<PARTY_RK:decimal(38,18),FULL_NAME:string> 

Os seguintes estágios de otimização e execução (por exemplo, WholeStageCodegen) estão além do escopo deste artigo, mas são descritos em grande detalhe (assim como os estágios descritos acima) no Mastering Spark Sql .


A leitura do plano de execução da consulta geralmente acontece “de dentro” e “de baixo para cima”, ou seja, as partes mais aninhadas são executadas primeiro e avançam gradualmente para a projeção final localizada na parte superior.


Tipos de otimizadores de consulta


Dois tipos de otimizadores de consulta podem ser distinguidos:


  • Otimizadores baseados em regras (RBOs).
  • Otimizadores com base em uma estimativa do custo de execução da consulta (otimizador baseado em custo, CBO).

Os primeiros são focados no uso de um conjunto de regras fixas, por exemplo, na aplicação de condições de filtragem a partir de onde, em estágios anteriores, se possível, no cálculo de constantes, etc.


Para avaliar a qualidade do plano resultante, o otimizador CBO usa uma função de custo, que geralmente depende da quantidade de dados processados, do número de linhas que se enquadram nos filtros e do custo de executar determinadas operações.


Para saber mais sobre a especificação de design do CBO para o Apache Spark, siga os links: a especificação e a principal tarefa do JIRA para implementação .


O ponto de partida para explorar toda a gama de otimizações existentes é o código Optimizer.scala.


Aqui está um pequeno trecho de uma longa lista de otimizações disponíveis:


 def batches: Seq[Batch] = { val operatorOptimizationRuleSet = Seq( // Operator push down PushProjectionThroughUnion, ReorderJoin, EliminateOuterJoin, PushPredicateThroughJoin, PushDownPredicate, LimitPushDown, ColumnPruning, InferFiltersFromConstraints, // Operator combine CollapseRepartition, CollapseProject, CollapseWindow, CombineFilters, CombineLimits, CombineUnions, // Constant folding and strength reduction NullPropagation, ConstantPropagation, ........ 

Deve-se observar que a lista dessas otimizações inclui otimizações baseadas em regras e otimizações baseadas em estimativas de custo de consulta, que serão discutidas abaixo.


Um recurso do CBO é que, para a operação correta, ele precisa conhecer e armazenar informações sobre as estatísticas dos dados usados ​​na consulta - o número de registros, tamanho do registro, histogramas de distribuição de dados nas colunas da tabela.


Para coletar estatísticas, um conjunto de comandos SQL ANALYZE TABLE ... COMPUTE STATISTICS é usado, além disso, é necessário um conjunto de tabelas para armazenar informações, a API é fornecida por ExternalCatalog, mais precisamente por HiveExternalCatalog.


Como o CBO está atualmente desativado por padrão, a ênfase principal será colocada na pesquisa da otimização e nuances disponíveis do RBO.


Tipos e escolha da estratégia de junção


No estágio de formação do plano físico para executar a solicitação, a estratégia de junção é selecionada. As seguintes opções estão atualmente disponíveis no Spark (você pode começar a aprender código a partir do código no SparkStrategies.scala).


Associação de hash de transmissão


A melhor opção é se uma das partes de junção for pequena o suficiente (o critério de suficiência é definido pelo parâmetro spark.sql.autoBroadcastJoinThreshold no SQLConf). Nesse caso, esse lado é completamente copiado para todos os executores, onde há uma junção de hash com a tabela principal. Além do tamanho, deve-se observar que, no caso de junção externa, apenas o lado externo pode ser copiado; portanto, se possível, como a tabela principal no caso de junção externa, você deve usar a tabela com a maior quantidade de dados.


   ,    ,     SQL      Oracle,   /*+ broadcast(t1, t2) */ 

Classificar junção de mesclagem


Com spark.sql.join.preferSortMergeJoin ativado por padrão, esse método é aplicado por padrão se as chaves para junção puderem ser classificadas.
Dos recursos, pode-se notar que, diferentemente do método anterior, a otimização da geração de código para executar a operação está disponível apenas para junção interna.


Misturar aleatoriamente o hash


Se as chaves não puderem ser classificadas ou a opção de seleção da junção de mesclagem de classificação padrão estiver desativada, o Catalyst tentará aplicar uma junção de hash aleatória. Além de verificar as configurações, também é verificado se o Spark possui memória suficiente para criar um mapa de hash local para uma partição (o número total de partições é definido pela configuração de spark.sql.shuffle.partitions )


BroadcastNestedLoopJoin e CartesianProduct


No caso em que não há possibilidade de comparação direta por chave (por exemplo, uma condição como) ou não há chaves para unir tabelas, dependendo do tamanho das tabelas, esse tipo ou Produto Cartesiano está selecionado.


A ordem de especificar tabelas em join'ah


Em qualquer caso, a junção requer tabelas aleatórias por chave. Portanto, no momento, a ordem de especificação das tabelas, especialmente no caso de várias junções seguidas, é importante (se você é um orifício, se o CBO não estiver ativado e a configuração JOIN_REORDER_ENABLED não estiver ativada).


Se possível, a ordem das tabelas de junção deve minimizar o número de operações aleatórias para tabelas grandes, para as quais as junções na mesma chave devem ser seqüenciais. Além disso, não se esqueça de minimizar os dados para ingresso, para ativar o ingresso de transmissão de hash.


Aplicação transitiva das condições do filtro


Considere a seguinte consulta:


 select bal.account_rk, cust.full_name from balance bal join customer cust on bal.party_rk = cust.party_rk and bal.actual_date = cust.actual_date where bal.actual_date = cast('2017-12-31' as date) 

Aqui, conectamos duas tabelas particionadas da mesma maneira, de acordo com o campo actual_date e aplicamos um filtro explícito apenas à partição de acordo com a tabela de saldo.


Como pode ser visto no plano de consulta otimizado, o filtro por data também é aplicado ao cliente e, no momento da leitura dos dados do disco, é determinado que exatamente uma partição é necessária.


 == Optimized Logical Plan == Project [account_rk#1, full_name#59] +- Join Inner, ((party_rk#18 = party_rk#57) && (actual_date#27 = actual_date#88)) :- Project [ACCOUNT_RK#1, PARTY_RK#18, ACTUAL_DATE#27] : +- Filter ((isnotnull(actual_date#27) && (actual_date#27 = 17531)) && isnotnull(party_rk#18)) : +- Relation[,... 4 more fields] orc +- Project [PARTY_RK#57, FULL_NAME#59, ACTUAL_DATE#88] +- Filter (((actual_date#88 = 17531) && isnotnull(actual_date#88)) && isnotnull(party_rk#57)) +- Relation[,... 9 more fields] orc 

Mas você só precisa substituir a junção interna pela externa esquerda na consulta, pois o predicado push da tabela do cliente cai imediatamente e ocorre uma varredura completa, o que é um efeito indesejável.


 == Optimized Logical Plan == Project [account_rk#1, full_name#59] +- Join LeftOuter, ((party_rk#18 = party_rk#57) && (actual_date#27 = actual_date#88)) :- Project [ACCOUNT_RK#1, PARTY_RK#18, ACTUAL_DATE#27] : +- Filter (isnotnull(actual_date#27) && (actual_date#27 = 17531)) : +- Relation[,... 4 more fields] orc +- Project [PARTY_RK#57, FULL_NAME#59, ACTUAL_DATE#88] +- Relation[,... 9 more fields] orc 

Conversão de tipo


Considere um exemplo simples de seleção de uma tabela com filtragem por tipo de cliente. No esquema, o tipo do campo party_type é string.


 select party_rk, full_name from cust where actual_date = cast('2017-12-31' as date) and party_type = 101 --   -- and party_type = '101' --     

E compare os dois planos resultantes, o primeiro - quando nos referimos ao tipo incorreto (haverá uma conversão implícita para int), o segundo - quando o tipo corresponde ao esquema.


 PushedFilters: [IsNotNull(PARTY_TYPE)] //            . PushedFilters: [IsNotNull(PARTY_TYPE), EqualTo(PARTY_TYPE,101)] //             . 

Um problema semelhante é observado no caso de comparar datas com uma sequência, haverá um filtro para comparar as sequências. Um exemplo:


 where OPER_DATE = '2017-12-31' Filter (isnotnull(oper_date#0) && (cast(oper_date#0 as string) = 2017-12-31) PushedFilters: [IsNotNull(OPER_DATE)] where OPER_DATE = cast('2017-12-31' as date) PushedFilters: [IsNotNull(OPER_DATE), EqualTo(OPER_DATE,2017-12-31)] 

No caso em que uma conversão implícita de tipo é possível, por exemplo, int -> decimal, o otimizador faz isso por conta própria.


Pesquisa adicional


Muitas informações interessantes sobre os "botões" que podem ser usados ​​para ajustar o Catalyst, bem como sobre as possibilidades (presente e futura) do otimizador, podem ser obtidas no SQLConf.scala.


Em particular, como você pode ver por padrão, o otimizador de custos ainda está desativado no momento.


 val CBO_ENABLED = buildConf("spark.sql.cbo.enabled") .doc("Enables CBO for estimation of plan statistics when set true.") .booleanConf .createWithDefault(false) 

Bem como suas otimizações dependentes associadas à reordenação de join'ov.


 val JOIN_REORDER_ENABLED = buildConf("spark.sql.cbo.joinReorder.enabled") .doc("Enables join reorder in CBO.") .booleanConf .createWithDefault(false) 

ou


 val STARSCHEMA_DETECTION = buildConf("spark.sql.cbo.starSchemaDetection") .doc("When true, it enables join reordering based on star schema detection. ") .booleanConf .createWithDefault(false) 

Breve resumo


Apenas uma pequena parte das otimizações existentes foi tocada; experimentos com otimização de custos, que podem dar muito mais espaço para a conversão de consultas, estão à frente. Além disso, uma questão interessante e separada é a comparação de um conjunto de otimizações ao ler arquivos do Parquet e Orc, a julgar pelo jira do projeto, trata-se de paridade, mas é mesmo?


Além disso:


  • A análise e otimização de solicitações é interessante e empolgante, principalmente considerando a disponibilidade de códigos-fonte.
  • A inclusão do CBO fornecerá espaço para novas otimizações e pesquisas.
  • É necessário monitorar a aplicabilidade das regras básicas que permitem filtrar o máximo de dados "extras" possível nos estágios mais iniciais possíveis.
  • A associação é um mal necessário, mas, se possível, vale a pena minimizá-los e acompanhar qual implementação é usada sob o capô.

Source: https://habr.com/ru/post/pt417103/


All Articles