Cómo hacer que un DAG se dispare en Airflow usando la API experimental

Al preparar nuestros programas educativos, periódicamente encontramos dificultades en términos de trabajo con algunas herramientas. Y en ese momento cuando los encontramos, no siempre hay suficiente documentación y artículos que ayuden a hacer frente a este problema.


Este fue el caso, por ejemplo, en 2015, y nosotros, en el programa Big Data Specialist, utilizamos un clúster Hadoop con Spark para 35 usuarios simultáneos. Cómo cocinarlo en un caso de usuario de este tipo usando YARN no estaba claro. Como resultado, después de descubrir y caminar por el camino por su cuenta, hicieron una publicación en Habré y también se presentaron en el Moscow Spark Meetup .


Antecedentes


Esta vez hablaremos de otro programa: ingeniero de datos . Nuestros participantes construyen dos tipos de arquitectura sobre ella: lambda y kappa. Y en la arquitectura lamdba, como parte del procesamiento por lotes, Airflow se usa para transferir registros de HDFS a ClickHouse.


Todo está bien en general. Déjelos construir sus tuberías. Sin embargo, existe un "pero": todos nuestros programas son tecnológicos en términos del proceso de aprendizaje en sí. Para verificar el laboratorio, utilizamos verificadores automáticos: el participante debe ir a su cuenta personal, hacer clic en el botón "Verificar", y después de un tiempo ve algún tipo de retroalimentación extendida sobre lo que ha hecho. Y es en este momento que comenzamos a abordar nuestro problema.


La verificación de este laboratorio se organiza de la siguiente manera: enviamos un paquete de datos de control a Kafka del participante, luego Gobblin transfiere el paquete de datos a HDFS, luego Airflow toma este paquete de datos y lo coloca en ClickHouse. El truco es que Airflow no debe hacer esto en tiempo real, lo hace en un horario: cada 15 minutos toma un montón de archivos y los arroja.


Resulta que necesitamos de alguna manera activar su DAG por nuestra cuenta a petición del verificador aquí y ahora. Buscando en Google, descubrimos que para las versiones posteriores de Airflow existe la llamada API Experimental . La palabra experimental , por supuesto, suena aterradora, pero qué hacer ... De repente, volará.


A continuación, describimos todo el camino: desde la instalación de Airflow hasta la generación de una solicitud POST que desencadena un DAG utilizando la API experimental. Trabajaremos con Ubuntu 16.04.


1. Instalación del flujo de aire


Vamos a comprobar que tenemos Python 3 y virtualenv.


 $ python3 --version Python 3.6.6 $ virtualenv --version 15.2.0 

Si falta algo de esto, instálelo.


Ahora cree un directorio en el que continuaremos trabajando con Airflow.


 $ mkdir <your name of directory> $ cd /path/to/your/new/directory $ virtualenv -p which python3 venv $ source venv/bin/activate (venv) $ 

Instalar flujo de aire:


 (venv) $ pip install airflow 

La versión en la que trabajamos: 1.10.


Ahora necesitamos crear el directorio airflow_home donde se airflow_home los archivos DAG y los complementos de Airflow. Después de crear el directorio, establezca la variable de entorno AIRFLOW_HOME .


 (venv) $ cd /path/to/my/airflow/workspace (venv) $ mkdir airflow_home (venv) $ export AIRFLOW_HOME=<path to airflow_home> 

El siguiente paso es ejecutar el comando que creará e inicializará la base de datos de flujo de datos en SQLite:


 (venv) $ airflow initdb 

La base de datos se creará en airflow.db de manera predeterminada.


Compruebe si Airflow está instalado:


 $ airflow version [2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor [2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt [2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt ____________ _____________ ____ |__( )_________ __/__ /________ __ ____ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / / ___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ / _/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/ v1.10.0 

Si el comando funcionó, Airflow creó su archivo de configuración airflow.cfg en AIRFLOW_HOME :


 $ tree . ├── airflow.cfg └── unittests.cfg 

Airflow tiene una interfaz web. Se puede iniciar ejecutando el comando:


 (venv) $ airflow webserver --port 8081 

Ahora puede acceder a la interfaz web en un navegador en el puerto 8081 en el host donde se lanzó Airflow, por ejemplo: <hostname:8081> .


2. Trabajando con la API Experimental


En esto, Airflow está configurado y listo para funcionar. Sin embargo, también necesitamos ejecutar la API experimental. Nuestras fichas están escritas en Python, por lo que todas las solicitudes se incluirán en la biblioteca de requests .


De hecho, la API ya está funcionando para consultas simples. Por ejemplo, una solicitud de este tipo le permite probar su funcionamiento:


 >>> import requests >>> host = <your hostname> >>> airflow_port = 8081 #   ,    8080 >>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text 'OK' 

Si recibió un mensaje de este tipo en respuesta, esto significa que todo funciona.


Sin embargo, cuando queremos activar DAG, nos enfrentaremos al hecho de que este tipo de solicitud no se puede realizar sin autenticación.


Para hacer esto, deberá realizar una serie de acciones.


En primer lugar, debe agregar esto a la configuración:


 [api] auth_backend = airflow.contrib.auth.backends.password_auth 

Luego, debe crear su usuario con derechos de administrador:


 >>> import airflow >>> from airflow import models, settings >>> from airflow.contrib.auth.backends.password_auth import PasswordUser >>> user = PasswordUser(models.Admin()) >>> user.username = 'new_user_name' >>> user.password = 'set_the_password' >>> session = settings.Session() >>> session.add(user) >>> session.commit() >>> session.close() >>> exit() 

Luego, debe crear un usuario con derechos normales, a quien se le permitirá hacer un disparador DAG.


 >>> import airflow >>> from airflow import models, settings >>> from airflow.contrib.auth.backends.password_auth import PasswordUser >>> user = PasswordUser(models.User()) >>> user.username = 'newprolab' >>> user.password = 'Newprolab2019!' >>> session = settings.Session() >>> session.add(user) >>> session.commit() >>> session.close() >>> exit() 

Ahora todo está listo.


3. Inicio de una solicitud POST


La solicitud POST en sí se verá así:


 >>> dag_id = newprolab >>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs') >>> data = {"conf":"{\"key\":\"value\"}"} >>> headers = {'Content-type': 'application/json'} >>> auth = ('newprolab', 'Newprolab2019!') >>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth) >>> uri.text '{\n "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"\n}\n' 

Solicitud procesada con éxito.


En consecuencia, le damos algo de tiempo al DAG para el procesamiento y hacemos una solicitud a la tabla ClickHouse, tratando de capturar un paquete de datos de control.


Verificación completada.

Source: https://habr.com/ru/post/445852/


All Articles