Comment créer un déclencheur DAG dans Airflow à l'aide de l'API expérimentale

Lors de la préparation de nos programmes éducatifs, nous rencontrons périodiquement des difficultés de travail avec certains outils. Et au moment où nous les rencontrons, il n'y a pas toujours assez de documentation et d'articles qui pourraient aider à faire face à ce problème.


Ce fut le cas, par exemple, en 2015, et nous, dans le cadre du programme Big Data Specialist, avons utilisé un cluster Hadoop avec Spark pour 35 utilisateurs simultanés. Comment le faire cuire sous un tel cas d'utilisation en utilisant YARN n'était pas clair. En conséquence, après avoir compris et avoir parcouru le chemin par eux-mêmes, ils ont fait un post sur Habré et ont également joué au Meetup Moscow Spark .


Contexte


Cette fois, nous allons parler d'un autre programme - Data Engineer . Nos participants y construisent deux types d'architecture: lambda et kappa. Et dans l'architecture lamdba, dans le cadre du traitement par lots, Airflow est utilisé pour transférer les journaux de HDFS vers ClickHouse.


Tout est bon en général. Laissez-les construire leurs pipelines. Cependant, il y a un «mais»: tous nos programmes sont technologiques en termes de processus d'apprentissage lui-même. Pour vérifier le laboratoire, nous utilisons des vérificateurs automatiques: le participant doit accéder à son compte personnel, cliquer sur le bouton «Vérifier» et après un certain temps, il voit une sorte de rétroaction étendue sur ce qu'il a fait. Et c'est à ce moment que nous commençons à aborder notre problème.


La vérification de ce laboratoire est organisée comme suit: nous envoyons un paquet de données de contrôle à Kafka du participant, puis Gobblin transfère le paquet de données à HDFS, puis Airflow prend ce paquet de données et le place dans ClickHouse. L'astuce est qu'Airflow ne doit pas le faire en temps réel, il le fait selon un calendrier: une fois en 15 minutes, il prend un tas de fichiers et le jette.


Il s'avère que nous devons d'une manière ou d'une autre déclencher leur DAG à la demande du vérificateur ici et maintenant. Sur Google, nous avons découvert que pour les versions ultérieures d'Airflow, il existe ce qu'on appelle l' API expérimentale . Le mot experimental , bien sûr, semble effrayant, mais que faire ... Soudain, il s'envolera.


Ensuite, nous décrivons tout le chemin: de l'installation d'Airflow à la génération d'une demande POST qui déclenche un DAG à l'aide de l'API expérimentale. Nous travaillerons avec Ubuntu 16.04.


1. Installation d'Airflow


Vérifions que nous avons Python 3 et virtualenv.


 $ python3 --version Python 3.6.6 $ virtualenv --version 15.2.0 

Si l'un de ces éléments manque, installez-le.


Créez maintenant un répertoire dans lequel nous continuerons à travailler avec Airflow.


 $ mkdir <your name of directory> $ cd /path/to/your/new/directory $ virtualenv -p which python3 venv $ source venv/bin/activate (venv) $ 

Installez Airflow:


 (venv) $ pip install airflow 

La version sur laquelle nous avons travaillé: 1.10.


Maintenant, nous devons créer le répertoire airflow_home où les fichiers DAG et les plugins Airflow seront situés. Après avoir créé le répertoire, définissez la variable d'environnement AIRFLOW_HOME .


 (venv) $ cd /path/to/my/airflow/workspace (venv) $ mkdir airflow_home (venv) $ export AIRFLOW_HOME=<path to airflow_home> 

L'étape suivante consiste à exécuter la commande qui créera et initialisera la base de données de flux de données dans SQLite:


 (venv) $ airflow initdb 

La base de données sera créée par défaut dans airflow.db .


Vérifiez si Airflow est installé:


 $ airflow version [2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor [2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt [2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt ____________ _____________ ____ |__( )_________ __/__ /________ __ ____ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / / ___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ / _/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/ v1.10.0 

Si la commande a fonctionné, Airflow a créé son fichier de configuration airflow.cfg dans AIRFLOW_HOME :


 $ tree . ├── airflow.cfg └── unittests.cfg 

Airflow possède une interface Web. Il peut être lancé en exécutant la commande:


 (venv) $ airflow webserver --port 8081 

Vous pouvez maintenant accéder à l'interface Web dans un navigateur sur le port 8081 sur l'hôte sur lequel Airflow a été lancé, par exemple: <hostname:8081> .


2. Travailler avec l'API expérimentale


Sur ce point, Airflow est configuré et prêt à fonctionner. Cependant, nous devons également exécuter l'API expérimentale. Nos vérificateurs sont écrits en Python, donc toutes les demandes y seront en utilisant la bibliothèque de requests .


En fait, l'API fonctionne déjà pour les requêtes simples. Par exemple, une telle requête vous permet de tester son fonctionnement:


 >>> import requests >>> host = <your hostname> >>> airflow_port = 8081 #   ,    8080 >>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text 'OK' 

Si vous avez reçu un tel message en réponse, cela signifie que tout fonctionne.


Cependant, lorsque nous voulons activer DAG, nous devrons faire face Ă  ce type de demande ne peut pas ĂŞtre faite sans authentification.


Pour ce faire, vous devrez effectuer un certain nombre d'actions.


Tout d'abord, vous devez ajouter ceci Ă  la configuration:


 [api] auth_backend = airflow.contrib.auth.backends.password_auth 

Ensuite, vous devez créer votre utilisateur avec des droits d'administrateur:


 >>> import airflow >>> from airflow import models, settings >>> from airflow.contrib.auth.backends.password_auth import PasswordUser >>> user = PasswordUser(models.Admin()) >>> user.username = 'new_user_name' >>> user.password = 'set_the_password' >>> session = settings.Session() >>> session.add(user) >>> session.commit() >>> session.close() >>> exit() 

Ensuite, vous devez créer un utilisateur avec des droits normaux, qui sera autorisé à effectuer un déclencheur DAG.


 >>> import airflow >>> from airflow import models, settings >>> from airflow.contrib.auth.backends.password_auth import PasswordUser >>> user = PasswordUser(models.User()) >>> user.username = 'newprolab' >>> user.password = 'Newprolab2019!' >>> session = settings.Session() >>> session.add(user) >>> session.commit() >>> session.close() >>> exit() 

Maintenant, tout est prĂŞt.


3. Lancer une requĂŞte POST


La demande POST elle-mĂŞme ressemblera Ă  ceci:


 >>> dag_id = newprolab >>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs') >>> data = {"conf":"{\"key\":\"value\"}"} >>> headers = {'Content-type': 'application/json'} >>> auth = ('newprolab', 'Newprolab2019!') >>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth) >>> uri.text '{\n "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"\n}\n' 

Demande traitée avec succès.


Par conséquent, nous accordons un peu de temps au DAG pour le traitement et faisons une demande à la table ClickHouse, essayant d'attraper un paquet de contrôle de données.


Vérification terminée.

Source: https://habr.com/ru/post/fr445852/


All Articles