Generación DAG dinámica en flujo de aire

Hola a todos! Mi nombre es Anton, en Rostelecom estoy desarrollando un almacén de datos central. Nuestro almacenamiento consta de módulos, cuyo orquestador utiliza varias instancias de Informatica, algunas de las cuales queremos transferir a Airflow como parte de la transición a soluciones de código abierto. Dado que Informatica y Airflow son herramientas fundamentalmente diferentes, tomar y repetir una implementación existente no es tan simple. Queríamos obtener un flujo de trabajo, por un lado, lo más cercano posible a la implementación actual y, por otro lado, utilizando el primer principio de flujo de aire más interesante: el dinamismo, que brinda flexibilidad.


En este breve artículo, quiero hablar sobre la generación verdaderamente dinámica de DAG en Airflow. Sobre este tema, Internet contiene principalmente muchos artículos de desarrolladores de la India, que son materiales de la forma "puede generar dinámicamente dags en Airflow, aquí hay un ejemplo: <ejemplo para generar 10 tareas / dags HelloWorld>" . Pero estábamos interesados ​​en la generación de dags, que cambiarán con el tiempo con un número variable y nombres de tareas.



Actualmente, Airflow se implementa para lanzar un módulo que genera paquetes de datos en servidores de origen remoto para su posterior carga en el repositorio. Se ejecuta de acuerdo con un cronograma simple; no es muy interesante examinarlo en detalle. Además, pronto se introducirá una orquestación a través del módulo Airflow, que entrega paquetes de datos para la carga adicional capa por capa en etapas intermedias. Aquí estamos esperando una serie de rastrillos, descripciones de las cuales no he encontrado en ningún lado y quiero compartir mi experiencia.


En Airflow en Habré hay un par de artículos de desarrolladores de Mail.ru en los que las cosas básicas están bien descritas:


Descripción general del flujo de aire
Ramificación, parametrización a través de jinja y comunicación dentro del DAG a través de Xcom


Pequeño glosario:


DAG / DAG es un gráfico acíclico dirigido. En este caso, nos referimos a una secuencia de acciones que dependen unas de otras y no forman ciclos.
SubDAG / Sabdag : lo mismo que DAG, pero ubicado dentro de otro DAG, lanzado como parte del DAG principal (es decir, que es una tarea) y que no tiene un horario separado.
Operador / Operador : un paso específico en el dag, que realiza una acción específica. Por ejemplo, PythonOperator.
Tarea / Tarea : una instancia específica del operador al iniciar el DAG se visualiza como un pequeño cuadrado en la interfaz web. Por ejemplo, PythonOperator, que se llama run_task y se ejecuta en el DAG check_dag .


La idea de la generación dinámica de tareas en dag, problemas y desventajas.


Datos de entrada:


Hay una tabla en el repositorio de orquesta, llamémosla PKG_TABLE.
Hay un mecanismo que agrega entradas a la tabla PKG_TABLE de que el paquete de datos está listo para descargar.


Lo que queríamos:


DAG, que se generará para paquetes listos para descargar y comenzará a descargarlos (spoiler: al final, todo resultó).


Usando el siguiente código, generamos un dag que consta de la tarea LatestOnlyOperator y su tarea dependiente, que se crea cuando se ejecuta la función pkg_subdag_factory, que recibe una lista de paquetes de la tabla PKG_TABLE y genera varios PythonOperators. Si no hay paquetes para descargar, se genera un DummyOperator.


Decidieron hacer la primera versión con un PythonOperator, redistribuyéndolo en un flujo de trabajo detallado usando Airflow.


# -*- coding: utf-8 -*- """  DAG    """ from airflow.models import DAG from airflow.operators.python_operator import PythonOperator from airflow.operators.subdag_operator import SubDagOperator from airflow.operators.dummy_operator import DummyOperator from airflow.operators.latest_only_operator import LatestOnlyOperator from airflow.hooks.oracle_hook import OracleHook from datetime import datetime, timedelta import logging from scripts.lib import run_load, select_pkg_data def pkg_subdag_factory( oracle_hook, parent_dag_name, child_dag_name, start_date, schedule_interval, param_dict): """ ,  DAG    PythonOperator\` (1  - 1 PythonOperator)  : oracle_hook - airflow.hooks.oracle_hook.OracleHook parent_dag_name -  ""  child_dag_name -    start_date -       schedule_interval -      param_dict -     """ dag = DAG( '%s.%s' % (parent_dag_name, child_dag_name), schedule_interval=schedule_interval, start_date=start_date, catchup=False ) logging.info('selecting pkg data...') pkg_set = select_pkg_data(oracle_hook) if len(pkg_set): logging.info('pkg_set:') logging.info(pkg_set) for pkg in pkg_set: pkg_id = pkg[1] pkg_dict = {'pkg_data_' + str(pkg_id): pkg} param_dict.update(pkg_dict) task_name = 'pkg_' + str(pkg_id) PythonOperator( task_id=task_name, python_callable=run_load, op_kwargs={ 'oracle_hook': oracle_hook, 'param_dict': param_dict, 'pkg_id': pkg_id }, retries=0, dag=dag ) else: logging.info('Undelivered packages not found') DummyOperator(task_id='no_packages_dummy', retries=0, dag=dag) return dag interval = '*/10 * * * *' args = { 'owner': 'airflow', 'start_date': datetime(2018, 11, 12) } oracle_hook = OracleHook('ora_meta') main_dag_name = 'load' load_dag_name = 'load_packages' param_dict = { #       } main_dag = DAG( dag_id=main_dag_name, default_args=args, schedule_interval=interval, catchup=False ) subdag = SubDagOperator( subdag=pkg_subdag_factory( oracle_hook, main_dag_name, load_dag_name, args['start_date'], interval, param_dict ), task_id=load_dag_name, dag=main_dag ) #  ,       latest_only = LatestOnlyOperator(task_id='latest_only', dag=main_dag) subdag.set_upstream(latest_only) 

Las siguientes capturas de pantalla muestran cómo se ve esto como resultado.
Apariencia de DAG:



Apariencia del subdag en ausencia de paquetes para la entrega:



Apariencia del subdag en presencia de paquetes para entrega:



Problemas y matices


  • Catchup no funcionó como esperábamos: después de encender el dag apagado, se produjeron varios lanzamientos (no durante todo el período del cronograma, sino 2-3 al mismo tiempo). Debido a esto, tuve que agregar LatestOnlyOperator, para que todos los lanzamientos, excepto el último, ocurran inactivos.
  • Si crea un subdag, debe habilitarlo explícitamente a través de la línea de comando con el comando "airflow unpause <subdag_name>", de lo contrario no se inicia, y debe hacerlo al crear cada subdag nuevo (un subdag con un nuevo nombre), lo que hará que sea muy inconveniente generar dinámicamente . Si establece el parámetro "dags_are_paused_at_creation" = false en la configuración del flujo de aire ($ airflow_home / airflow.cfg), esto no será necesario, pero puede tener consecuencias desagradables con el lanzamiento automático automático de un nuevo dag; me parece que necesita comenzar nuevos dags explícitamente manualmente.

Como dice la documentación , "Una capacidad clave de Airflow es que estas ejecuciones de DAG son elementos atómicos, idempotentes, <...>", lo que significa: "Se entiende que el dag se genera sin cambios". Debido al hecho de que violamos esta "capacidad clave", aprendimos algunas cosas:


  • Un dag vacío (sin tareas) comienza y no puede terminar, obstruyendo todos los paralelos posibles. Esto sucedió si no había paquetes de descarga en el momento en que se lanzó el dag. Para evitar esto, se crea un DummyOperator.
  • Si durante el trabajo la tarea dag se regenera y ya no hay esta tarea en la dag actualizada, se detendrá con la interrupción del proceso en ejecución. Y esto sucede con cada paso del sheduler, pero no con mayor frecuencia que la indicada en el parámetro min_file_process_interval en la configuración de flujo de aire ($ airflow_home / airflow.cfg). Para evitar esto, hicimos la generación de paquetes de tareas no solo por el estado de "listo para descargar", sino también por el estado de "carga en proceso" para que continúe siendo generado mientras la descarga está en progreso.
  • Si la versión actual del dag no tiene ninguna tarea anterior, por ejemplo, hubo una tarea con el nombre "pkg_123" que se cargó anteriormente y no se creó en la versión actual del dag, no puede ver estadísticas sobre esta tarea en la interfaz web. Aunque toda la información se almacena en la base de datos de flujo de aire y, sobre esta base, es posible construir un hermoso tablero para lanzamientos antiguos por medios externos. Cuando surge la pregunta sobre la frecuencia de actualización de DAG y la capacidad de deshabilitarlo, puede leer sobre esto aquí .
  • Debido a la generación dinámica de task_id, es necesario lanzar un diccionario con datos para todos los paquetes actuales en cada tarea, así como la identificación del paquete actual, de modo que cuando la función en sí misma funcione, seleccione los datos necesarios del mismo diccionario por la identificación del paquete. De lo contrario, todas las tareas comenzaron para el mismo paquete.

Execution_date en registros y hora de inicio real


Terminaré con otro matiz de Airflow, que al principio confunde y no se describe con palabras simples en otros artículos: fecha de ejecución (que se muestra en todos los registros, en la interfaz, etc.) y la hora de inicio real. En principio, la descripción se encuentra en la documentación del flujo de aire y las preguntas frecuentes , pero el resultado no es obvio, por lo que me parece que se requiere aclaración.


Documentación : "El programador inicia su trabajo al final del período"
Resultado : si crea un dag con una programación, por ejemplo, @daily, una ejecución con execute_date "2018-01-01 00:00:00" realmente ejecutará "2018-02-01 00:00:00".


Enlaces utiles:


Documentación de actualización
LatestOnlyOperator Documentation
Más documentación sobre LatestOnlyOperator
Ejemplo de uso de LatestOnlyOperator
Algunos matices
Pregunta sobre dependencias en el lanzamiento anterior
Un pequeño ejemplo sobre la generación dinámica.
Una pregunta sobre la generación dinámica con una pequeña descripción.

Source: https://habr.com/ru/post/es435746/


All Articles