Geração dinâmica de DAG no fluxo de ar

Olá pessoal! Meu nome é Anton, na Rostelecom estou desenvolvendo um data warehouse central. Nosso armazenamento consiste em módulos, cujo orquestrador usa várias instâncias da Informatica, algumas das quais queremos transferir para o Airflow como parte da transição para soluções de código aberto. Como a Informatica e o Airflow são ferramentas fundamentalmente diferentes, pegar e repetir uma implementação existente não é tão simples. Queríamos obter um fluxo de trabalho, por um lado, o mais próximo possível da implementação atual e, por outro lado, usar o primeiro princípio mais interessante do fluxo de ar - o dinamismo, que oferece flexibilidade.


Neste breve artigo, quero falar sobre a geração verdadeiramente dinâmica de DAGs no Airflow. Neste tópico, a Internet contém principalmente muitos artigos de desenvolvedores da Índia, que são materiais da forma "você pode gerar dinamicamente dags no Airflow, aqui está um exemplo: <exemplo para gerar 10 tarefas / dags do HelloWorld>" . Mas estávamos interessados ​​na geração de dags, que mudarão com o tempo com um número variável e nomes de tarefas.



Atualmente, o Airflow é implementado para iniciar um módulo que gera pacotes de dados em servidores de origem remota para posterior upload no repositório. Ele é executado de acordo com um cronograma simples; não é muito interessante examiná-lo em detalhes. Além disso, em breve será introduzida uma orquestração através do módulo Airflow, que fornece pacotes de dados para carregamento adicional camada por camada no preparo intermediário. Aqui estamos aguardando uma série de ancinhos, descrições que não encontrei em lugar nenhum e quero compartilhar minha experiência.


No Airflow on Habré, existem alguns artigos de desenvolvedores do Mail.ru nos quais coisas básicas são bem descritas:


Descrição geral do Airflow
Ramificação, parametrização via jinja e comunicação dentro do DAG através do Xcom


Pequeno glossário:


DAG / DAG é um gráfico acíclico direcionado. Nesse caso, queremos dizer uma sequência de ações que dependem uma da outra e não formam ciclos.
SubDAG / Sabdag - o mesmo que o DAG, mas localizado dentro de outro DAG, iniciado como parte do DAG pai (ou seja, sendo tarefa) e sem um cronograma separado.
Operador / Operador - uma etapa específica do dag, executando uma ação específica. Por exemplo, PythonOperator.
Tarefa / Tarefa - uma instância específica do operador ao iniciar o DAG, é visualizada como um pequeno quadrado na interface da web. Por exemplo, PythonOperator, chamado run_task e executado no DAG check_dag .


A idéia de geração dinâmica de tarefas em dag, problemas e desvantagens


Dados de entrada:


Há uma tabela no repositório da orquestra, vamos chamá-la de PKG_TABLE.
Existe um mecanismo que adiciona entradas à tabela PKG_TABLE que o pacote de dados está pronto para download.


O que queríamos:


DAG, que será gerado para pacotes prontos para download e começará a baixá-los (spoiler: no final, tudo acabou).


Usando o código abaixo, geramos um dag que consiste na tarefa LatestOnlyOperator e sua tarefa dependente, criada quando a função pkg_subdag_factory é executada, que recebe uma lista de pacotes da tabela PKG_TABLE e gera vários PythonOperators. Se não houver pacotes para baixar, um DummyOperator será gerado.


Eles decidiram criar a primeira versão com um PythonOperator, redistribuindo-a em um fluxo de trabalho detalhado usando o Airflow.


# -*- coding: utf-8 -*- """  DAG    """ from airflow.models import DAG from airflow.operators.python_operator import PythonOperator from airflow.operators.subdag_operator import SubDagOperator from airflow.operators.dummy_operator import DummyOperator from airflow.operators.latest_only_operator import LatestOnlyOperator from airflow.hooks.oracle_hook import OracleHook from datetime import datetime, timedelta import logging from scripts.lib import run_load, select_pkg_data def pkg_subdag_factory( oracle_hook, parent_dag_name, child_dag_name, start_date, schedule_interval, param_dict): """ ,  DAG    PythonOperator\` (1  - 1 PythonOperator)  : oracle_hook - airflow.hooks.oracle_hook.OracleHook parent_dag_name -  ""  child_dag_name -    start_date -       schedule_interval -      param_dict -     """ dag = DAG( '%s.%s' % (parent_dag_name, child_dag_name), schedule_interval=schedule_interval, start_date=start_date, catchup=False ) logging.info('selecting pkg data...') pkg_set = select_pkg_data(oracle_hook) if len(pkg_set): logging.info('pkg_set:') logging.info(pkg_set) for pkg in pkg_set: pkg_id = pkg[1] pkg_dict = {'pkg_data_' + str(pkg_id): pkg} param_dict.update(pkg_dict) task_name = 'pkg_' + str(pkg_id) PythonOperator( task_id=task_name, python_callable=run_load, op_kwargs={ 'oracle_hook': oracle_hook, 'param_dict': param_dict, 'pkg_id': pkg_id }, retries=0, dag=dag ) else: logging.info('Undelivered packages not found') DummyOperator(task_id='no_packages_dummy', retries=0, dag=dag) return dag interval = '*/10 * * * *' args = { 'owner': 'airflow', 'start_date': datetime(2018, 11, 12) } oracle_hook = OracleHook('ora_meta') main_dag_name = 'load' load_dag_name = 'load_packages' param_dict = { #       } main_dag = DAG( dag_id=main_dag_name, default_args=args, schedule_interval=interval, catchup=False ) subdag = SubDagOperator( subdag=pkg_subdag_factory( oracle_hook, main_dag_name, load_dag_name, args['start_date'], interval, param_dict ), task_id=load_dag_name, dag=main_dag ) #  ,       latest_only = LatestOnlyOperator(task_id='latest_only', dag=main_dag) subdag.set_upstream(latest_only) 

As seguintes capturas de tela mostram como isso fica como resultado.
Aparência do DAG:



Aparência do subdag na ausência de pacotes para entrega:



Aparência do subdag na presença de pacotes para entrega:



Problemas e nuances


  • O catchup não funcionou como esperávamos: depois de ligar o dag desligado, ocorreram vários lançamentos (não durante todo o período do cronograma, mas 2-3 ao mesmo tempo). Por isso, tive que adicionar o LatestOnlyOperator, para que todos os lançamentos, exceto o último, fiquem ociosos.
  • Se você criar um subdag, precisará habilitá-lo explicitamente pela linha de comando com o comando "airflow unpause <subdag_name>"; caso contrário, ele não será iniciado e você deverá fazer isso ao criar cada novo subdag (um subdag com um novo nome), o que tornará muito inconveniente gerar dinamicamente . Se você definir o parâmetro "dags_are_paused_at_creation" = false na configuração do fluxo de ar ($ airflow_home / airflow.cfg), não será necessário, mas poderá levar a conseqüências desagradáveis ​​com o lançamento automático automático de um novo dag - parece-me que você precisa iniciar novos dagas manualmente.

Como a documentação diz, "Um recurso importante do Airflow é que essas execuções do DAG são itens atômicos e idempotentes, <...>", o que significa: "Entende-se que o dag é gerado inalterado". Devido ao fato de termos violado esse "recurso essencial", aprendemos algumas coisas:


  • Um dag vazio (sem tarefas) inicia e não pode terminar, obstruindo todos os paralelos possíveis. Isso aconteceu se não houvesse pacotes de download no momento em que o dag foi lançado. Para contornar isso, um DummyOperator é criado.
  • Se durante o trabalho, o dag da tarefa for regenerado e não houver mais essa tarefa no dag atualizado - ele será interrompido com a interrupção do processo em execução. E isso acontece com todas as etapas do sheduler, mas não mais frequentemente do que o indicado no parâmetro min_file_process_interval na configuração do fluxo de ar ($ airflow_home / airflow.cfg). Para contornar isso, geramos pacotes de tarefas não apenas pelo status de "pronto para download", mas também pelo status de "carregamento em processo", para que continue a ser gerado enquanto o download estiver em andamento.
  • Se a versão atual do dag não tiver nenhuma tarefa anterior - por exemplo, houve uma tarefa com o nome "pkg_123" que foi carregada anteriormente e não é criada na versão atual do dag, você não pode ver estatísticas sobre essa tarefa na interface da web. Embora todas as informações sejam armazenadas no banco de dados de fluxo de ar e, com base nisso, é possível criar um belo painel para lançamentos antigos por meios externos. Quando surgir a pergunta sobre a frequência da atualização de DAGs e a capacidade de desativá-lo, você pode ler sobre isso aqui .
  • Devido à geração dinâmica de task_id, é necessário lançar um dicionário com dados para todos os pacotes atuais em cada uma dessas tarefas, bem como o ID do pacote atual, para que, quando a função em si funcione, selecione os dados necessários do mesmo dicionário por ID do pacote. Caso contrário, todas as tarefas foram iniciadas para o mesmo pacote.

Execution_date nos logs e hora de início real


Termino com outra nuance do Airflow, que a princípio confunde e não é descrita em palavras simples em outros artigos - signature_date (que é exibida em todos os logs, na interface etc.) e na hora de início real. Em princípio, a descrição está na documentação do fluxo de ar e no FAQ , mas o resultado não é óbvio, portanto parece-me que é necessário esclarecer.


Documentação : "O Agendador inicia seu trabalho no final do período"
Resultado : se você criar um dag com um agendamento, por exemplo, @daily, uma execução com a data de execução "2018-01-01 00:00:00" realmente executará "2018-02-01 00:00:00".


Links úteis:


Documentação de recuperação
Documentação LatestOnlyOperator
Mais documentação em LatestOnlyOperator
Exemplo de uso de LatestOnlyOperator
Algumas nuances
Pergunta sobre dependências no lançamento anterior
Um pequeno exemplo sobre geração dinâmica
Uma pergunta sobre geração dinâmica com uma pequena descrição

Source: https://habr.com/ru/post/pt435746/


All Articles