Bonjour à tous! Mon nom est Anton, à Rostelecom je développe un entrepôt de données central. Notre stockage se compose de modules, dont l'orchestrateur utilise plusieurs instances Informatica, dont certaines que nous souhaitons transférer vers Airflow dans le cadre de la transition vers des solutions open source. Étant donné qu'Informatica et Airflow sont des outils fondamentalement différents, prendre et répéter une implémentation existante n'est pas si simple. Nous voulions obtenir un workflow, d'une part, aussi proche que possible de l'implémentation actuelle et, d'autre part, en utilisant le premier principe d'Airflow le plus intéressant - le dynamisme, ce qui donne de la flexibilité.
Dans ce court article, je veux parler de la génération vraiment dynamique des DAG dans Airflow. Sur ce sujet, Internet contient principalement de nombreux articles de développeurs indiens, qui sont des matériaux de la forme "vous pouvez générer dynamiquement des dags dans Airflow, voici un exemple: <exemple pour générer 10 tâches / dags HelloWorld>" . Mais nous étions intéressés par la génération de dags, qui évolueront dans le temps avec un nombre et des noms de tâches variables.

Actuellement, Airflow est implémenté pour lancer un module qui génère des paquets de données sur des serveurs source distants pour un téléchargement ultérieur vers le référentiel. Il se déroule selon un planning simple, il n'est pas très intéressant de l'examiner en détail. En outre, une orchestration sera bientôt introduite via le module Airflow, qui fournit des paquets de données pour un chargement supplémentaire couche par couche dans le transfert intermédiaire. Ici, nous attendons une série de râteaux, dont je n'ai trouvé la description nulle part et je veux partager mon expérience.
Sur Airflow sur Habré, il y a quelques articles de développeurs de Mail.ru dans lesquels les choses de base sont bien décrites:
Description générale d'Airflow
Branchement, paramétrage via jinja et communication au sein du DAG via Xcom
Petit glossaire:
DAG / DAG est un graphique acyclique dirigé. Dans ce cas, nous entendons une séquence d'actions qui dépendent les unes des autres et ne forment pas de cycles.
SubDAG / Sabdag - le même que DAG, mais situé à l'intérieur d'un autre DAG, lancé dans le cadre du DAG parent (c'est-à-dire, étant une tâche) et n'ayant pas de calendrier séparé.
Opérateur / Opérateur - une étape spécifique du dag, effectuant une action spécifique. Par exemple, PythonOperator.
Tâche / Tâche - une instance spécifique de l'opérateur lors du démarrage du DAG, est visualisée comme un petit carré dans l'interface Web. Par exemple, PythonOperator, qui est appelé run_task et s'exécute dans le DAG check_dag .
L'idée de la génération dynamique de tâches en dag, problèmes et inconvénients
Données d'entrée:
Il y a une table dans le référentiel d'orchestre, appelons-la PKG_TABLE.
Il existe un mécanisme qui ajoute des entrées à la table PKG_TABLE que le paquet de données est prêt à télécharger.
Ce que nous voulions:
DAG, qui sera généré pour les packages prêts à télécharger et commencer à les télécharger (spoiler: à la fin, tout s'est avéré).
En utilisant le code ci-dessous, nous générons un dag composé de la tâche LatestOnlyOperator et de sa tâche dépendante, qui est créée lorsque la fonction pkg_subdag_factory est exécutée, qui reçoit une liste de packages de la table PKG_TABLE et génère plusieurs PythonOperators. S'il n'y a aucun package à télécharger, un DummyOperator est généré.
Ils ont décidé de créer la première version avec un PythonOperator, en la redistribuant dans un workflow détaillé à l'aide d'Airflow.
Les captures d'écran suivantes montrent à quoi cela ressemble en conséquence.
Apparence de DAG:

Apparence du sous-dag en l'absence de colis pour la livraison:

Apparition du sous-dag en présence de colis pour la livraison:

Problèmes et nuances
- Le rattrapage n'a pas fonctionné comme prévu: après avoir allumé le dag éteint, plusieurs lancements ont eu lieu (pas pour toute la période du calendrier, mais 2-3 en même temps). Pour cette raison, j'ai dû ajouter LatestOnlyOperator, afin que tous les lancements, sauf le dernier, deviennent inactifs.
- Si vous créez un sous-dag, vous devez l'activer explicitement via la ligne de commande avec la commande "airflow unpause <subdag_name>", sinon il ne démarre pas, et vous devez le faire lors de la création de chaque nouveau sous-dag (un sous-dag avec un nouveau nom), ce qui rendra très gênant la génération dynamique . Si vous définissez le paramètre "dags_are_paused_at_creation" = false dans la configuration de l'airflow ($ airflow_home / airflow.cfg), cela ne sera pas nécessaire, mais cela peut entraîner des conséquences désagréables avec le lancement automatique automatique d'un nouveau dag - il me semble que vous devez démarrer de nouveaux dagas manuellement.
Comme le dit la documentation , "Une des principales capacités d'Airflow est que ces DAG Runs sont des éléments atomiques et idempotents, <...>", ce qui signifie: "Il est entendu que le DAG est généré inchangé." Du fait que nous avons violé cette "capacité clé", nous avons appris certaines choses:
- Un dag vide (sans tâches) commence et ne peut pas se terminer, obstruant tous les parallèles possibles. Cela s'est produit s'il n'y avait aucun package de téléchargement au moment du lancement du fichier. Pour contourner ce problème, un DummyOperator est créé.
- Si pendant le travail, le fichier de tâches est régénéré et qu'il n'y a plus de tâche dans le fichier de données mis à jour - il s'arrêtera avec interruption du processus en cours. Et cela se produit à chaque étape du sheduler, mais pas plus souvent qu'indiqué dans le paramètre min_file_process_interval dans la configuration de l'airflow ($ airflow_home / airflow.cfg). Pour contourner cela, nous avons fait la génération de packs de tâches non seulement par le statut "prêt à télécharger", mais aussi par le statut "chargement en cours" afin qu'il continue d'être généré pendant le téléchargement.
- Si la version actuelle du dag ne contient aucune tâche antérieure - par exemple, une tâche portant le nom "pkg_123" a été chargée plus tôt et n'est pas créée dans la version actuelle du dag, vous ne pouvez pas voir de statistiques sur cette tâche dans l'interface Web. Bien que toutes les informations soient stockées dans la base de données de flux d'air et sur sa base, il est possible de construire un beau tableau de bord pour les lancements anciens par des moyens externes. Lorsque la question se pose sur la fréquence de mise à jour des DAG et la possibilité de le désactiver, vous pouvez lire à ce sujet ici .
- En raison de la génération dynamique de task_id, vous devez lancer un dictionnaire avec les données de tous les packages actuels dans chacune de ces tâches, ainsi que l'ID du package actuel, de sorte que lorsque la fonction elle-même fonctionne, sélectionnez les données nécessaires dans le même dictionnaire par ID de package. Sinon, toutes les tâches ont démarré pour le même package.
Date d'exécution dans les journaux et heure de début réelle
Je terminerai par une autre nuance d'Airflow, qui au début confond et n'est pas décrite en termes simples dans d'autres articles - date_exécution (qui est affichée dans tous les journaux, dans l'interface, etc.) et l'heure de début réelle. En principe, la description se trouve dans la documentation du flux d'air et la FAQ , mais le résultat n'est pas évident, il me semble donc qu'une clarification est nécessaire.
Documentation : "Le planificateur lance votre travail à la fin de la période"
Résultat : si vous créez un dag avec une planification, par exemple @daily, une exécution avec la date d'exécution "2018-01-01 00:00:00" s'exécutera réellement "2018-02-01 00:00:00".
Liens utiles:
Documentation de rattrapage
Documentation LatestOnlyOperator
Plus de documentation sur LatestOnlyOperator
Exemple d'utilisation de LatestOnlyOperator
Quelques nuances
Question sur les dépendances par rapport au lancement précédent
Un petit exemple de génération dynamique
Une question sur la génération dynamique avec une petite description