Apache Spark, avaliação lenta e consultas SQL de várias páginas

O famoso: spark trabalha com quadros de dados, que são algoritmos de transformação. O algoritmo é lançado no último momento para "dar mais espaço" à otimização e devido à otimização para executá-lo da maneira mais eficiente possível.


De acordo com o corte, analisaremos como decompor uma consulta SQL de várias páginas em átomos (sem perda de eficiência) e como reduzir significativamente o tempo de execução do pipeline ETL devido a isso.


Avaliação preguiçosa


Um recurso funcional interessante do spark é a avaliação lenta: as transformações são executadas apenas quando as ações são concluídas. Como funciona (aproximadamente): os algoritmos para a construção dos quadros de dados que precedem a ação são "presos", o otimizador cria o algoritmo final mais eficiente do seu ponto de vista, que inicia e fornece o resultado (o que foi solicitado pela ação).


O que é interessante aqui no contexto de nossa apresentação: qualquer consulta complexa pode ser decomposta em "átomos" sem perda de eficiência. Vamos analisar um pouco mais.


SQL de várias páginas


Há muitas razões pelas quais escrevemos consultas SQL de "várias páginas", uma das principais, provavelmente, a relutância em criar objetos intermediários (relutância apoiada por requisitos de eficiência). A seguir, é apresentado um exemplo de uma consulta relativamente complexa (é claro, é até muito simples, mas, para fins de apresentação adicional, teremos o suficiente).


qSel = """ select con.contract_id as con_contract_id, con.begin_date as con_begin_date, con.product_id as con_product_id, cst.contract_status_type_id as cst_status_type_id, sbj.subject_id as sbj_subject_id, sbj.subject_name as sbj_subject_name, pp.birth_date as pp_birth_date from kasko.contract con join kasko.contract_status cst on cst.contract_status_id = con.contract_status_id join kasko.subject sbj on sbj.subject_id = con.owner_subject_id left join kasko.physical_person pp on pp.subject_id = con.owner_subject_id """ dfSel = sp.sql(qSel) 

O que vemos:


  • os dados são selecionados em várias tabelas
  • diferentes tipos de junção são usados
  • colunas selecionáveis ​​são distribuídas por parte selecionada, junção de parte (e onde parte, mas aqui não está aqui - eu a removi por simplicidade)

Essa consulta pode ser decomposta em simples (por exemplo, primeiro combine as tabelas contract e contract_status, salve o resultado em uma tabela temporária, depois combine-a com o assunto, salve também o resultado em uma tabela temporária etc.). Certamente, quando criamos consultas realmente complexas, fazemos isso, só então - após a depuração - coletamos tudo isso em um bloco de várias páginas.


O que há de ruim aqui? Nada, de fato, todo mundo trabalha assim e está acostumado a isso.


Mas há desvantagens - ou melhor, o que melhorar - continue lendo.


A mesma consulta no spark


Ao usar o spark para transformação, é claro, você pode simplesmente aceitar e executar essa solicitação (e será bom, de fato, nós também a executaremos), mas você pode seguir o outro caminho, vamos tentar.


Vamos decompor essa consulta "complexa" em "átomos" - quadros de dados elementares. Obteremos o número de tabelas envolvidas na consulta (neste caso, 4).


Aqui estão eles - "átomos":


 dfCon = sp.sql("""select contract_id as con_contract_id, begin_date as con_begin_date, product_id as con_product_id, owner_subject_id as con_owner_subject_id, contract_status_id as con_contract_status_id from kasko.contract""") dfCStat = sp.sql("""select contract_status_id as cst_status_id, contract_status_type_id as cst_status_type_id from kasko.contract_status""") dfSubj = sp.sql("""select subject_id as sbj_subject_id, subject_type_id as sbj_subject_type_id, subject_name as sbj_subject_name from kasko.subject""") dfPPers = sp.sql("""select subject_id as pp_subject_id, birth_date as pp_birth_date from kasko.physical_person""") 

O Spark permite que você se junte a eles usando expressões separadas dos "átomos" reais; vamos fazer o seguinte:


 con_stat = f.col("cst_status_id")==f.col("con_contract_status_id") con_subj_own = f.col("con_owner_subject_id")==f.col("sbj_subject_id") con_ppers_own = f.col("con_owner_subject_id")==f.col("pp_subject_id") 

Em seguida, nossa "consulta complexa" ficará assim:


 dfAtom = dfCon.join(dfCStat,con_stat, "inner")\ .join(dfSubj,con_subj_own,"inner") \ .join(dfPPers,con_ppers_own, "left") \ .drop("con_contract_status_id","sbj_subject_type_id", "pp_subject_id","con_owner_subject_id","cst_status_id") 

O que é bom aqui? À primeira vista, não é nada, muito pelo contrário: usando SQL "complexo", você pode entender o que está acontecendo; por nossa consulta "atômica", é mais difícil de entender, é preciso olhar para "átomos" e expressões.


Primeiro, certifique-se de que essas consultas sejam equivalentes - no livro jupiter, por referência , dei planos para atender as duas consultas (os curiosos podem encontrar 10 diferenças, mas a essência - equivalência - é óbvia). Isso, é claro, não é um milagre, deve ser assim (veja acima para avaliação e otimização preguiçosas).


O que temos no final - a solicitação de "várias páginas" e a solicitação "atômica" funcionam com a mesma eficiência (isso é importante, sem que essas considerações adicionais percam parcialmente seu significado).


Bem, agora vamos encontrar o lado bom da maneira "atômica" de criar consultas.


O que é um "átomo" (quadro de dados elementar) é o nosso conhecimento de um subconjunto da área de assunto (parte da tabela relacional). Ao isolar esses "átomos", automaticamente (e, o que é mais importante, algorítmica e reprodutível), selecionamos uma parte significativa da coisa sem limites para nós chamada "modelo de dados físicos".


Qual é a expressão que usamos quando ingressamos? Isso também é conhecimento sobre a área de assunto - é assim que (como indicado na expressão) as entidades da área de assunto (tabelas no banco de dados) estão interconectadas.


Repito - isso é importante - esse "conhecimento" (átomos e expressões) é materializado no código executável (não no diagrama ou na descrição verbal), este é o código que é executado toda vez que o pipeline ETL é executado (o exemplo, a propósito, é tirado da vida real).


O código executável - como sabemos pelo codificador limpo - é um dos dois artefatos objetivamente existentes que afirmam ser o "título" da documentação. Ou seja, o uso de "átomos" nos permite dar um passo adiante em um processo tão importante como a documentação de dados.


O que mais pode ser encontrado em "atomicidade"?


Otimização do transportador


Na vida real, um engenheiro de dados - a propósito, eu não me apresentei - um pipeline de ETL consiste em dezenas de transformações semelhantes às anteriores. As tabelas são muitas vezes repetidas nelas (calculei de alguma forma no Excel - algumas tabelas são usadas em 40% das consultas).


O que acontece em termos de eficiência? Confusão - a mesma tabela é lida várias vezes a partir da fonte ...


Como melhorá-lo? O Spark possui um mecanismo para armazenar em cache os quadros de dados - podemos especificar explicitamente quais quadros de dados e quanto queremos manter no cache.


O que temos que fazer para isso é selecionar tabelas duplicadas e criar consultas de forma a minimizar o tamanho total do cache (porque todas as tabelas, por definição, não se encaixam nela, então há big data).


Isso pode ser feito usando consultas SSQ de várias páginas? Sim, mas ... um pouco complicado (não temos realmente quadros de dados lá, apenas tabelas, eles também podem ser armazenados em cache - a comunidade spark está trabalhando nisso).


Isso pode ser feito usando consultas atômicas? Sim E não é difícil, precisamos apenas generalizar os "átomos" - adicione as colunas usadas em todas as consultas do nosso pipeline a eles. Se você pensar bem, isso é "correto" do ponto de vista da documentação: se uma coluna é usada em alguma consulta (mesmo na parte where), ela faz parte dos dados da área de interesse que são interessantes para nós.


E então tudo é simples - armazenamos em cache átomos repetidos (quadros de dados), construímos uma cadeia de transformações para que a interseção de quadros de dados em cache seja mínima (a propósito, isso não é trivial, mas sim programável).


E obtemos o transportador mais eficiente completamente "gratuito". Além disso, um artefato útil e importante é a "preparação" para documentação de dados na área de assunto.


Robótica e Automação


Os átomos são mais suscetíveis ao processamento automático do que o “grande e poderoso SQL” - sua estrutura é simples e clara, o spark faz uma análise para nós (pelos quais agradecemos especialmente), ele também cria planos de consulta, analisando o que você pode reordenar automaticamente a sequência do processamento da consulta.


Então aqui você pode tocar alguma coisa.


Em conclusão


Talvez eu esteja otimista demais - parece-me que esse caminho (atomização de consulta) está mais funcionando do que tentar descrever uma fonte de dados após o fato. Além disso - a propósito, qual é o uso de "aditivos" - obtemos um aumento de eficiência. Por que considero a abordagem atômica "funcional"? Faz parte do processo regular, o que significa que os artefatos descritos têm uma chance real de serem relevantes a longo prazo.


Provavelmente perdi alguma coisa - ajuda a encontrar (nos comentários)?

Source: https://habr.com/ru/post/pt481924/


All Articles