C贸mo hacer que un DAG se dispare en Airflow usando la API experimental

Al preparar nuestros programas educativos, peri贸dicamente encontramos dificultades en t茅rminos de trabajo con algunas herramientas. Y en ese momento cuando los encontramos, no siempre hay suficiente documentaci贸n y art铆culos que ayuden a hacer frente a este problema.


Este fue el caso, por ejemplo, en 2015, y nosotros, en el programa Big Data Specialist, utilizamos un cl煤ster Hadoop con Spark para 35 usuarios simult谩neos. C贸mo cocinarlo en un caso de usuario de este tipo usando YARN no estaba claro. Como resultado, despu茅s de descubrir y caminar por el camino por su cuenta, hicieron una publicaci贸n en Habr茅 y tambi茅n se presentaron en el Moscow Spark Meetup .


Antecedentes


Esta vez hablaremos de otro programa: ingeniero de datos . Nuestros participantes construyen dos tipos de arquitectura sobre ella: lambda y kappa. Y en la arquitectura lamdba, como parte del procesamiento por lotes, Airflow se usa para transferir registros de HDFS a ClickHouse.


Todo est谩 bien en general. D茅jelos construir sus tuber铆as. Sin embargo, existe un "pero": todos nuestros programas son tecnol贸gicos en t茅rminos del proceso de aprendizaje en s铆. Para verificar el laboratorio, utilizamos verificadores autom谩ticos: el participante debe ir a su cuenta personal, hacer clic en el bot贸n "Verificar", y despu茅s de un tiempo ve alg煤n tipo de retroalimentaci贸n extendida sobre lo que ha hecho. Y es en este momento que comenzamos a abordar nuestro problema.


La verificaci贸n de este laboratorio se organiza de la siguiente manera: enviamos un paquete de datos de control a Kafka del participante, luego Gobblin transfiere el paquete de datos a HDFS, luego Airflow toma este paquete de datos y lo coloca en ClickHouse. El truco es que Airflow no debe hacer esto en tiempo real, lo hace en un horario: cada 15 minutos toma un mont贸n de archivos y los arroja.


Resulta que necesitamos de alguna manera activar su DAG por nuestra cuenta a petici贸n del verificador aqu铆 y ahora. Buscando en Google, descubrimos que para las versiones posteriores de Airflow existe la llamada API Experimental . La palabra experimental , por supuesto, suena aterradora, pero qu茅 hacer ... De repente, volar谩.


A continuaci贸n, describimos todo el camino: desde la instalaci贸n de Airflow hasta la generaci贸n de una solicitud POST que desencadena un DAG utilizando la API experimental. Trabajaremos con Ubuntu 16.04.


1. Instalaci贸n del flujo de aire


Vamos a comprobar que tenemos Python 3 y virtualenv.


 $ python3 --version Python 3.6.6 $ virtualenv --version 15.2.0 

Si falta algo de esto, inst谩lelo.


Ahora cree un directorio en el que continuaremos trabajando con Airflow.


 $ mkdir <your name of directory> $ cd /path/to/your/new/directory $ virtualenv -p which python3 venv $ source venv/bin/activate (venv) $ 

Instalar flujo de aire:


 (venv) $ pip install airflow 

La versi贸n en la que trabajamos: 1.10.


Ahora necesitamos crear el directorio airflow_home donde se airflow_home los archivos DAG y los complementos de Airflow. Despu茅s de crear el directorio, establezca la variable de entorno AIRFLOW_HOME .


 (venv) $ cd /path/to/my/airflow/workspace (venv) $ mkdir airflow_home (venv) $ export AIRFLOW_HOME=<path to airflow_home> 

El siguiente paso es ejecutar el comando que crear谩 e inicializar谩 la base de datos de flujo de datos en SQLite:


 (venv) $ airflow initdb 

La base de datos se crear谩 en airflow.db de manera predeterminada.


Compruebe si Airflow est谩 instalado:


 $ airflow version [2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor [2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt [2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt ____________ _____________ ____ |__( )_________ __/__ /________ __ ____ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / / ___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ / _/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/ v1.10.0 

Si el comando funcion贸, Airflow cre贸 su archivo de configuraci贸n airflow.cfg en AIRFLOW_HOME :


 $ tree . 鈹溾攢鈹 airflow.cfg 鈹斺攢鈹 unittests.cfg 

Airflow tiene una interfaz web. Se puede iniciar ejecutando el comando:


 (venv) $ airflow webserver --port 8081 

Ahora puede acceder a la interfaz web en un navegador en el puerto 8081 en el host donde se lanz贸 Airflow, por ejemplo: <hostname:8081> .


2. Trabajando con la API Experimental


En esto, Airflow est谩 configurado y listo para funcionar. Sin embargo, tambi茅n necesitamos ejecutar la API experimental. Nuestras fichas est谩n escritas en Python, por lo que todas las solicitudes se incluir谩n en la biblioteca de requests .


De hecho, la API ya est谩 funcionando para consultas simples. Por ejemplo, una solicitud de este tipo le permite probar su funcionamiento:


 >>> import requests >>> host = <your hostname> >>> airflow_port = 8081 #   ,    8080 >>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text 'OK' 

Si recibi贸 un mensaje de este tipo en respuesta, esto significa que todo funciona.


Sin embargo, cuando queremos activar DAG, nos enfrentaremos al hecho de que este tipo de solicitud no se puede realizar sin autenticaci贸n.


Para hacer esto, deber谩 realizar una serie de acciones.


En primer lugar, debe agregar esto a la configuraci贸n:


 [api] auth_backend = airflow.contrib.auth.backends.password_auth 

Luego, debe crear su usuario con derechos de administrador:


 >>> import airflow >>> from airflow import models, settings >>> from airflow.contrib.auth.backends.password_auth import PasswordUser >>> user = PasswordUser(models.Admin()) >>> user.username = 'new_user_name' >>> user.password = 'set_the_password' >>> session = settings.Session() >>> session.add(user) >>> session.commit() >>> session.close() >>> exit() 

Luego, debe crear un usuario con derechos normales, a quien se le permitir谩 hacer un disparador DAG.


 >>> import airflow >>> from airflow import models, settings >>> from airflow.contrib.auth.backends.password_auth import PasswordUser >>> user = PasswordUser(models.User()) >>> user.username = 'newprolab' >>> user.password = 'Newprolab2019!' >>> session = settings.Session() >>> session.add(user) >>> session.commit() >>> session.close() >>> exit() 

Ahora todo est谩 listo.


3. Inicio de una solicitud POST


La solicitud POST en s铆 se ver谩 as铆:


 >>> dag_id = newprolab >>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs') >>> data = {"conf":"{\"key\":\"value\"}"} >>> headers = {'Content-type': 'application/json'} >>> auth = ('newprolab', 'Newprolab2019!') >>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth) >>> uri.text '{\n "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"\n}\n' 

Solicitud procesada con 茅xito.


En consecuencia, le damos algo de tiempo al DAG para el procesamiento y hacemos una solicitud a la tabla ClickHouse, tratando de capturar un paquete de datos de control.


Verificaci贸n completada.

Source: https://habr.com/ru/post/445852/


All Articles