大家好! 我叫Anton,在Rostelecom中,我正在开发一个中央数据仓库。 我们的存储由模块组成,其中的协调器使用多个Informatica实例,作为向开源解决方案过渡的一部分,我们希望将其中一些实例转移到Airflow。 由于Informatica和Airflow在根本上是不同的工具,因此采用和重复现有的实现方式并不是那么简单。 一方面,我们希望获得一种工作流程,使其尽可能接近当前的实现方式;另一方面,我们要使用最有趣的第一项Airflow原理 -动态性,从而赋予灵活性。
在这篇简短的文章中,我想谈谈在Airflow中真正动态生成DAG的问题。 在此主题上,Internet主要包含来自印度开发人员的许多文章,这些文章的形式为“您可以在Airflow中动态生成dag,这是一个示例:<生成10个HelloWorld任务/ dags的示例>” 。 但是我们对dag的生成很感兴趣,它们会随着任务的数目和名称的变化而及时变化。

当前,已实现Airflow以启动一个模块,该模块在远程源服务器上生成数据包,以进一步上传到存储库。 它按照简单的时间表运行;详细检查它不是很有趣。 而且,很快将通过Airflow模块引入业务流程,该模块将数据包传递给中间阶段中的进一步加载。 在这里,我们正在等待一系列耙,这些耙的描述我在任何地方都找不到,并希望分享我的经验。
在Habré上的Airflow上,Mail.ru的开发人员提供了几篇文章,其中对基本内容进行了很好的描述:
气流概述
通过jinja进行分支,参数化,以及通过Xcom在DAG中进行通信
小词汇表:
DAG / DAG是有向无环图。 在这种情况下,我们指的是一系列相互依赖且不构成周期的动作。
SubDAG / Sabdag-与DAG相同,但位于另一个DAG内,作为父DAG的一部分启动(即作为任务),并且没有单独的时间表。
操作员/操作员 -在dag中执行特定操作的特定步骤。 例如,PythonOperator。
任务/任务 -启动DAG时操作员的特定实例,在Web界面中显示为小方块。 例如,名为run_task的PythonOperator可以在DAG check_dag中运行。
动态任务生成的想法,问题和缺点
输入数据:
乐团存储库中有一个表,我们称它为PKG_TABLE。
有一种机制可以将条目添加到PKG_TABLE表中,以便可以下载数据包。
我们想要的是:
DAG,将为准备下载的软件包生成并开始下载DAG(破坏者:最后,一切都证明了)。
使用下面的代码,我们生成一个由LatestOnlyOperator任务及其相关任务组成的dag,该任务是在运行pkg_subdag_factory函数时创建的,该函数从PKG_TABLE表中接收软件包列表并生成多个PythonOperator。 如果没有要下载的软件包,则会生成DummyOperator。
他们决定使用一个PythonOperator制作第一个版本,并使用Airflow将其重新分配到详细的工作流程中。
以下屏幕截图显示了结果。
DAG外观:

在没有要交付的包裹的情况下,subdag的外观:

分包在要交付的包装中的外观:

问题与细微之处
- 赶上未如我们预期的那样工作:打开已关闭的dag后,发生了多次启动(不是在整个计划期间,而是在同一时间2-3次)。 因此,我必须添加LatestOnlyOperator,以便除最后一个启动之外的所有启动都将处于空闲状态。
- 如果创建子子目录,则需要通过命令行使用“ airflow unpause <subdag_name>”命令显式启用它,否则它不会启动,并且在创建每个新子子目录(具有新名称的子子目录)时需要执行此操作,这将使动态生成非常不便。 如果在气流配置($ airflow_home / airflow.cfg)中将参数“ dags_are_paused_at_creation”设置为false,则没有必要,但这会导致自动自动启动新dag带来不愉快的后果-在我看来,您需要手动启动新的dagas。
正如文档所述 ,“ Airflow的一项关键功能是这些DAG运行是原子的,幂等项,...”,这意味着:“据了解,dag是不变生成的。” 由于我们违反了此“关键功能”,因此我们学到了一些东西:
- 空的dag(无任务)开始并且无法结束,从而阻塞了所有可能的并行操作。 如果启动dag时没有下载包,就会发生这种情况。 要解决此问题,将创建一个DummyOperator。
- 如果在工作期间重新生成任务dag,并且更新的dag中不再包含此任务,它将在运行过程中断的情况下停止。 而且,这在sheduler的每个步骤中都会发生,但发生的频率不超过气流配置($ airflow_home / airflow.cfg)中的min_file_process_interval参数所指示的次数。 为了避免这种情况,我们不仅根据“准备下载”的状态来生成任务包,还根据“正在加载”的状态来生成任务包,以便在下载进行时继续生成任务包。
- 如果当前版本的dag没有以前的任何任务-例如,有一个名为“ pkg_123”的任务已被较早加载,并且未在当前版本的dag中创建,则您无法在Web界面中看到有关此任务的统计信息。 尽管所有信息都存储在气流数据库中,并且在此基础上,也可以通过外部方式为旧发射建立漂亮的仪表板。 当出现有关更新DAG的频率和禁用它的能力的问题时,您可以在此处阅读有关内容。
- 由于task_id是动态生成的,因此您必须将包含所有当前包的数据的字典以及当前包的ID以及当前包的ID放入字典中,以便在函数本身起作用时,根据包ID从同一字典中选择必要的数据。 否则,所有任务都针对同一软件包启动。
日志中的Execution_date和实际开始时间
最后,我将介绍Airflow的另一个细微差别,它一开始会造成混淆,在其他文章中没有用简单的词来描述-执行日期(在所有日志,界面等中显示)和实际开始时间。 原则上,描述在气流和FAQ的文档中 ,但结果并不明显,因此在我看来,需要进行澄清。
文档 :“计划员会在期末启动工作”
结果 :如果您创建一个带有日程表的dag,例如@daily,则运行期为execution_date“ 2018-01-01 00:00:00”的运行实际上将运行“ 2018-02-01 00:00:00”。
有用的链接:
赶超文档
LatestOnlyOperator文档
有关LatestOnlyOperator的更多文档
LatestOnlyOperator的用法示例
一些细微差别
有关对先前启动的依赖关系的问题
一个有关动态生成的小例子
一个关于动态生成的问题