气流中动态生成DAG

大家好! 我叫Anton,在Rostelecom中,我正在开发一个中央数据仓库。 我们的存储由模块组成,其中的协调器使用多个Informatica实例,作为向开源解决方案过渡的一部分,我们希望将其中一些实例转移到Airflow。 由于Informatica和Airflow在根本上是不同的工具,因此采用和重复现有的实现方式并不是那么简单。 一方面,我们希望获得一种工作流程,使其尽可能接近当前的实现方式;另一方面,我们要使用最有趣的第一项Airflow原理 -动态性,从而赋予灵活性。


在这篇简短的文章中,我想谈谈在Airflow中真正动态生成DAG的问题。 在此主题上,Internet主要包含来自印度开发人员的许多文章,这些文章的形式为“您可以在Airflow中动态生成dag,这是一个示例:<生成10个HelloWorld任务/ dags的示例>” 。 但是我们对dag的生成很感兴趣,它们会随着任务的数目和名称的变化而及时变化。



当前,已实现Airflow以启动一个模块,该模块在远程源服务器上生成数据包,以进一步上传到存储库。 它按照简单的时间表运行;详细检查它不是很有趣。 而且,很快将通过Airflow模块引入业务流程,该模块将数据包传递给中间阶段中的进一步加载。 在这里,我们正在等待一系列耙,这些耙的描述我在任何地方都找不到,并希望分享我的经验。


在Habré上的Airflow上,Mail.ru的开发人员提供了几篇文章,其中对基本内容进行了很好的描述:


气流概述
通过jinja进行分支,参数化,以及通过Xcom在DAG中进行通信


小词汇表:


DAG / DAG是有向无环图。 在这种情况下,我们指的是一系列相互依赖且不构成周期的动作。
SubDAG / Sabdag-与DAG相同,但位于另一个DAG内,作为父DAG的一部分启动(即作为任务),并且没有单独的时间表。
操作员/操作员 -在dag中执行特定操作的特定步骤。 例如,PythonOperator。
任务/任务 -启动DAG时操作员的特定实例,在Web界面中显示为小方块。 例如,名为run_task的PythonOperator可以在DAG check_dag中运行。


动态任务生成的想法,问题和缺点


输入数据:


乐团存储库中有一个表,我们称它为PKG_TABLE。
有一种机制可以将条目添加到PKG_TABLE表中,以便可以下载数据包。


我们想要的是:


DAG,将为准备下载的软件包生成并开始下载DAG(破坏者:最后,一切都证明了)。


使用下面的代码,我们生成一个由LatestOnlyOperator任务及其相关任务组成的dag,该任务是在运行pkg_subdag_factory函数时创建的,该函数从PKG_TABLE表中接收软件包列表并生成多个PythonOperator。 如果没有要下载的软件包,则会生成DummyOperator。


他们决定使用一个PythonOperator制作第一个版本,并使用Airflow将其重新分配到详细的工作流程中。


# -*- coding: utf-8 -*- """  DAG    """ from airflow.models import DAG from airflow.operators.python_operator import PythonOperator from airflow.operators.subdag_operator import SubDagOperator from airflow.operators.dummy_operator import DummyOperator from airflow.operators.latest_only_operator import LatestOnlyOperator from airflow.hooks.oracle_hook import OracleHook from datetime import datetime, timedelta import logging from scripts.lib import run_load, select_pkg_data def pkg_subdag_factory( oracle_hook, parent_dag_name, child_dag_name, start_date, schedule_interval, param_dict): """ ,  DAG    PythonOperator\` (1  - 1 PythonOperator)  : oracle_hook - airflow.hooks.oracle_hook.OracleHook parent_dag_name -  ""  child_dag_name -    start_date -       schedule_interval -      param_dict -     """ dag = DAG( '%s.%s' % (parent_dag_name, child_dag_name), schedule_interval=schedule_interval, start_date=start_date, catchup=False ) logging.info('selecting pkg data...') pkg_set = select_pkg_data(oracle_hook) if len(pkg_set): logging.info('pkg_set:') logging.info(pkg_set) for pkg in pkg_set: pkg_id = pkg[1] pkg_dict = {'pkg_data_' + str(pkg_id): pkg} param_dict.update(pkg_dict) task_name = 'pkg_' + str(pkg_id) PythonOperator( task_id=task_name, python_callable=run_load, op_kwargs={ 'oracle_hook': oracle_hook, 'param_dict': param_dict, 'pkg_id': pkg_id }, retries=0, dag=dag ) else: logging.info('Undelivered packages not found') DummyOperator(task_id='no_packages_dummy', retries=0, dag=dag) return dag interval = '*/10 * * * *' args = { 'owner': 'airflow', 'start_date': datetime(2018, 11, 12) } oracle_hook = OracleHook('ora_meta') main_dag_name = 'load' load_dag_name = 'load_packages' param_dict = { #       } main_dag = DAG( dag_id=main_dag_name, default_args=args, schedule_interval=interval, catchup=False ) subdag = SubDagOperator( subdag=pkg_subdag_factory( oracle_hook, main_dag_name, load_dag_name, args['start_date'], interval, param_dict ), task_id=load_dag_name, dag=main_dag ) #  ,       latest_only = LatestOnlyOperator(task_id='latest_only', dag=main_dag) subdag.set_upstream(latest_only) 

以下屏幕截图显示了结果。
DAG外观:



在没有要交付的包裹的情况下,subdag的外观:



分包在要交付的包装中的外观:



问题与细微之处


  • 赶上未如我们预期的那样工作:打开已关闭的dag后,发生了多次启动(不是在整个计划期间,而是在同一时间2-3次)。 因此,我必须添加LatestOnlyOperator,以便除最后一个启动之外的所有启动都将处于空闲状态。
  • 如果创建子子目录,则需要通过命令行使用“ airflow unpause <subdag_name>”命令显式启用它,否则它不会启动,并且在创建每个新子子目录(具有新名称的子子目录)时需要执行此操作,这将使动态生成非常不便。 如果在气流配置($ airflow_home / airflow.cfg)中将参数“ dags_are_paused_at_creation”设置为false,则没有必要,但这会导致自动自动启动新dag带来不愉快的后果-在我看来,您需要手动启动新的dagas。

正如文档所述 ,“ Airflow的一项关键功能是这些DAG运行是原子的,幂等项,...”,这意味着:“据了解,dag是不变生成的。” 由于我们违反了此“关键功能”,因此我们学到了一些东西:


  • 空的dag(无任务)开始并且无法结束,从而阻塞了所有可能的并行操作。 如果启动dag时没有下载包,就会发生这种情况。 要解决此问题,将创建一个DummyOperator。
  • 如果在工作期间重新生成任务dag,并且更新的dag中不再包含此任务,它将在运行过程中断的情况下停止。 而且,这在sheduler的每个步骤中都会发生,但发生的频率不超过气流配置($ airflow_home / airflow.cfg)中的min_file_process_interval参数所指示的次数。 为了避免这种情况,我们不仅根据“准备下载”的状态来生成任务包,还根据“正在加载”的状态来生成任务包,以便在下载进行时继续生成任务包。
  • 如果当前版本的dag没有以前的任何任务-例如,有一个名为“ pkg_123”的任务已被较早加载,并且未在当前版本的dag中创建,则您无法在Web界面中看到有关此任务的统计信息。 尽管所有信息都存储在气流数据库中,并且在此基础上,也可以通过外部方式为旧发射建立漂亮的仪表板。 当出现有关更新DAG的频率和禁用它的能力的问题时,您可以在此处阅读有关内容。
  • 由于task_id是动态生成的,因此您必须将包含所有当前包的数据的字典以及当前包的ID以及当前包的ID放入字典中,以便在函数本身起作用时,根据包ID从同一字典中选择必要的数据。 否则,所有任务都针对同一软件包启动。

日志中的Execution_date和实际开始时间


最后,我将介绍Airflow的另一个细微差别,它一开始会造成混淆,在其他文章中没有用简单的词来描述-执行日期(在所有日志,界面等中显示)和实际开始时间。 原则上,描述在气流FAQ文档中 ,但结果并不明显,因此在我看来,需要进行澄清。


文档 :“计划员会在期末启动工作”
结果 :如果您创建一个带有日程表的dag,例如@daily,则运行期为execution_date“ 2018-01-01 00:00:00”的运行实际上将运行“ 2018-02-01 00:00:00”。


有用的链接:


赶超文档
LatestOnlyOperator文档
有关LatestOnlyOperator的更多文档
LatestOnlyOperator的用法示例
一些细微差别
有关对先前启动的依赖关系的问题
一个有关动态生成的小例子
一个关于动态生成的问题

Source: https://habr.com/ru/post/zh-CN435746/


All Articles