如何使用实验性API在Airflow中触发DAG触发器

在准备我们的教育计划时,我们在使用某些工具方面会定期遇到困难。 并且在我们遇到它们的那一刻,总是没有足够的文档和文章来帮助解决这个问题。


例如,在2015年就是这种情况,在大数据专家计划中,我们为35个同时使用的用户使用了带有Spark的Hadoop集群。 目前尚不清楚如何在这种情况下使用YARN进行烹饪。 结果,他们自己弄清了自己的路,就在哈布雷(Habré)上发 ,还参加了莫斯科星火聚会Moscow Spark Meetup)


背景知识


这次我们将讨论另一个程序- 数据工程师 。 我们的参与者在其上构建两种类型的体系结构:lambda和kappa。 在lamdba架构中,作为批处理的一部分,Airflow用于将日志从HDFS传输到ClickHouse。


总的来说一切都很好。 让他们建立管道。 但是,有一个“但是”:就学习过程本身而言,我们所有的程序都是技术性的。 要检查实验室,我们使用自动检查器:参与者需要进入他的个人帐户,单击“检查”按钮,过了一会儿,他会看到一些关于自己所做工作的扩展反馈。 正是在这一刻,我们开始解决我们的问题。


检查此实验室的组织方式如下:我们将控制数据包发送到参与者的Kafka,然后Gobblin将数据包传输到HDFS,然后Airflow将此数据包放入ClickHouse。 诀窍是,Airflow不应实时执行此操作,而应按计划执行:每15分钟一次需要花费一堆文件并将其丢弃。


事实证明,我们需要在此处和现在的检查程序的要求下自行触发DAG。 谷歌搜索,我们发现对于更高版本的Airflow,存在所谓的实验API 。 “ experimental ”一词听起来确实很吓人,但是该怎么做……突然之间,它会飞起来。


接下来,我们描述整个方法:从安装Airflow到使用实验性API生成触发DAG的POST请求。 我们将使用Ubuntu 16.04。


1.安装气流


让我们检查一下是否有Python 3和virtualenv。


 $ python3 --version Python 3.6.6 $ virtualenv --version 15.2.0 

如果缺少任何一个,请安装。


现在创建一个目录,我们将在其中继续使用Airflow。


 $ mkdir <your name of directory> $ cd /path/to/your/new/directory $ virtualenv -p which python3 venv $ source venv/bin/activate (venv) $ 

安装气流:


 (venv) $ pip install airflow 

我们使用的版本:1.10。


现在,我们需要创建airflow_home目录,DAG文件和Airflow插件将位于该目录中。 创建目录后,设置环境变量AIRFLOW_HOME


 (venv) $ cd /path/to/my/airflow/workspace (venv) $ mkdir airflow_home (venv) $ export AIRFLOW_HOME=<path to airflow_home> 

下一步是执行将在SQLite中创建和初始化数据流数据库的命令:


 (venv) $ airflow initdb 

默认情况下,将在airflow.db创建数据库。


检查是否安装了气流:


 $ airflow version [2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor [2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt [2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt ____________ _____________ ____ |__( )_________ __/__ /________ __ ____ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / / ___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ / _/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/ v1.10.0 

如果该命令有效,则Airflow在airflow.cfg中创建其配置文件AIRFLOW_HOME


 $ tree . ├── airflow.cfg └── unittests.cfg 

Airflow具有Web界面。 可以通过运行以下命令来启动它:


 (venv) $ airflow webserver --port 8081 

现在,您可以在启动了Airflow的主机上的端口8081上的浏览器中访问Web界面,例如: <hostname:8081>


2.使用实验性API


在此基础上,配置气流并准备就绪。 但是,我们还需要运行实验API。 我们的检查程序是用Python编写的,因此所有其他请求都将使用requests库包含在上面。


实际上,该API已经可以用于简单查询。 例如,这样的请求使您可以测试其操作:


 >>> import requests >>> host = <your hostname> >>> airflow_port = 8081 #   ,    8080 >>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text 'OK' 

如果您收到这样的消息作为响应,则表示一切正常。


但是,当我们想打开DAG时,我们将面临这样的事实,即如果没有身份验证,就无法发出这种请求。


为此,您将需要执行许多操作。


首先,您需要将其添加到配置中:


 [api] auth_backend = airflow.contrib.auth.backends.password_auth 

然后,您需要创建具有管理员权限的用户:


 >>> import airflow >>> from airflow import models, settings >>> from airflow.contrib.auth.backends.password_auth import PasswordUser >>> user = PasswordUser(models.Admin()) >>> user.username = 'new_user_name' >>> user.password = 'set_the_password' >>> session = settings.Session() >>> session.add(user) >>> session.commit() >>> session.close() >>> exit() 

然后,您需要创建一个具有正常权限的用户,该用户将被允许创建DAG触发器。


 >>> import airflow >>> from airflow import models, settings >>> from airflow.contrib.auth.backends.password_auth import PasswordUser >>> user = PasswordUser(models.User()) >>> user.username = 'newprolab' >>> user.password = 'Newprolab2019!' >>> session = settings.Session() >>> session.add(user) >>> session.commit() >>> session.close() >>> exit() 

现在一切就绪。


3.启动POST请求


POST请求本身将如下所示:


 >>> dag_id = newprolab >>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs') >>> data = {"conf":"{\"key\":\"value\"}"} >>> headers = {'Content-type': 'application/json'} >>> auth = ('newprolab', 'Newprolab2019!') >>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth) >>> uri.text '{\n "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"\n}\n' 

请求已成功处理。


因此,然后我们给DAG一些时间进行处理,并向ClickHouse表发出请求,以尝试捕获数据的控制包。


验证完成。

Source: https://habr.com/ru/post/zh-CN445852/


All Articles