Dynamische DAG-Erzeugung im Luftstrom

Hallo allerseits! Mein Name ist Anton, in Rostelecom entwickle ich ein zentrales Data Warehouse. Unser Repository besteht aus Modulen, deren Orchestrator mehrere Informatica-Instanzen verwendet, von denen einige im Rahmen des Übergangs zu Open-Source-Lösungen auf Airflow übertragen werden sollen. Da Informatica und Airflow grundsätzlich unterschiedliche Tools sind, ist es nicht so einfach, eine vorhandene Implementierung zu übernehmen und zu wiederholen. Wir wollten einerseits einen Workflow erhalten, der der aktuellen Implementierung so nahe wie möglich kommt, und andererseits das interessanteste erste Airflow-Prinzip verwenden - Dynamik, die Flexibilität bietet.


In diesem kurzen Artikel möchte ich über die wirklich dynamische Erzeugung von DAGs in Airflow sprechen. Zu diesem Thema enthält das Internet hauptsächlich viele Artikel von Entwicklern aus Indien, die Materialien der Form "Sie können Dags in Airflow dynamisch generieren, hier ein Beispiel: <Beispiel zum Generieren von 10 HelloWorld-Aufgaben / Dags>" sind . Wir waren jedoch an der Erzeugung von Dags interessiert, die sich mit der Zeit mit einer variablen Anzahl und Namen von Aufgaben ändern werden.



Derzeit wird Airflow implementiert, um ein Modul zu starten, das Datenpakete auf Remote-Quellservern für den weiteren Upload in das Repository generiert. Es läuft nach einem einfachen Zeitplan, es ist nicht sehr interessant, es im Detail zu untersuchen. Außerdem wird in Kürze eine Orchestrierung über das Airflow-Modul eingeführt, das Datenpakete zum weiteren Laden durch Schichten in Zwischenstufen bereitstellt. Hier warten wir auf eine Reihe von Rechen, deren Beschreibungen ich nirgendwo gefunden habe und die ich teilen möchte.


Auf Airflow on Habré gibt es einige Artikel von Entwicklern von Mail.ru, in denen grundlegende Dinge gut beschrieben werden:


Allgemeine Beschreibung des Luftstroms
Verzweigung, Parametrisierung über Jinja und Kommunikation innerhalb der DAG über Xcom


Kleines Glossar:


DAG / DAG ist ein gerichteter azyklischer Graph. In diesem Fall meinen wir eine Folge von Aktionen, die voneinander abhängen und keine Zyklen bilden.
SubDAG / Sabdag - das gleiche wie DAG, jedoch innerhalb einer anderen DAG, die als Teil der übergeordneten DAG gestartet wurde (d. H. Aufgabe ist) und keinen separaten Zeitplan hat.
Operator / Operator - ein bestimmter Schritt im Tag, der eine bestimmte Aktion ausführt. Zum Beispiel PythonOperator.
Aufgabe / Aufgabe - Eine bestimmte Instanz des Bedieners beim Starten der DAG wird als kleines Quadrat in der Weboberfläche angezeigt. Zum Beispiel PythonOperator, der run_task heißt und in der DAG check_dag ausgeführt wird .


Die Idee der dynamischen Aufgabengenerierung in dag, Probleme und Nachteile


Eingabedaten:


Es gibt eine Tabelle im Orchester-Repository, nennen wir sie PKG_TABLE.
Es gibt einen Mechanismus, der der Tabelle PKG_TABLE Einträge hinzufügt, für die das Datenpaket zum Herunterladen bereit ist.


Was wir wollten:


DAG, die für herunterladbare Pakete generiert wird und mit dem Herunterladen beginnt (Spoiler: Am Ende hat sich alles herausgestellt).


Mit dem folgenden Code generieren wir einen Tag, der aus der LatestOnlyOperator-Task und ihrer abhängigen Task besteht. Diese wird erstellt, wenn die Funktion pkg_subdag_factory ausgeführt wird, die eine Liste von Paketen aus der Tabelle PKG_TABLE empfängt und mehrere PythonOperators generiert. Wenn keine Pakete zum Herunterladen vorhanden sind, wird ein DummyOperator generiert.


Sie beschlossen, die erste Version mit einem PythonOperator zu erstellen und sie mithilfe von Airflow in einen detaillierten Workflow umzuverteilen.


# -*- coding: utf-8 -*- """  DAG    """ from airflow.models import DAG from airflow.operators.python_operator import PythonOperator from airflow.operators.subdag_operator import SubDagOperator from airflow.operators.dummy_operator import DummyOperator from airflow.operators.latest_only_operator import LatestOnlyOperator from airflow.hooks.oracle_hook import OracleHook from datetime import datetime, timedelta import logging from scripts.lib import run_load, select_pkg_data def pkg_subdag_factory( oracle_hook, parent_dag_name, child_dag_name, start_date, schedule_interval, param_dict): """ ,  DAG    PythonOperator\` (1  - 1 PythonOperator)  : oracle_hook - airflow.hooks.oracle_hook.OracleHook parent_dag_name -  ""  child_dag_name -    start_date -       schedule_interval -      param_dict -     """ dag = DAG( '%s.%s' % (parent_dag_name, child_dag_name), schedule_interval=schedule_interval, start_date=start_date, catchup=False ) logging.info('selecting pkg data...') pkg_set = select_pkg_data(oracle_hook) if len(pkg_set): logging.info('pkg_set:') logging.info(pkg_set) for pkg in pkg_set: pkg_id = pkg[1] pkg_dict = {'pkg_data_' + str(pkg_id): pkg} param_dict.update(pkg_dict) task_name = 'pkg_' + str(pkg_id) PythonOperator( task_id=task_name, python_callable=run_load, op_kwargs={ 'oracle_hook': oracle_hook, 'param_dict': param_dict, 'pkg_id': pkg_id }, retries=0, dag=dag ) else: logging.info('Undelivered packages not found') DummyOperator(task_id='no_packages_dummy', retries=0, dag=dag) return dag interval = '*/10 * * * *' args = { 'owner': 'airflow', 'start_date': datetime(2018, 11, 12) } oracle_hook = OracleHook('ora_meta') main_dag_name = 'load' load_dag_name = 'load_packages' param_dict = { #       } main_dag = DAG( dag_id=main_dag_name, default_args=args, schedule_interval=interval, catchup=False ) subdag = SubDagOperator( subdag=pkg_subdag_factory( oracle_hook, main_dag_name, load_dag_name, args['start_date'], interval, param_dict ), task_id=load_dag_name, dag=main_dag ) #  ,       latest_only = LatestOnlyOperator(task_id='latest_only', dag=main_dag) subdag.set_upstream(latest_only) 

Die folgenden Screenshots zeigen, wie dies als Ergebnis aussieht.
Aussehen der DAG:



Aussehen des Subtags bei Fehlen von Paketen zur Lieferung:



Aussehen des Subtags bei Vorhandensein von Paketen zur Lieferung:



Probleme und Nuancen


  • Catchup funktionierte nicht wie erwartet: Nach dem Einschalten des ausgeschalteten Dags kam es zu mehreren Starts (nicht für den gesamten Zeitraum des Zeitplans, sondern 2-3 gleichzeitig). Aus diesem Grund musste ich LatestOnlyOperator hinzufügen, damit alle Starts mit Ausnahme des letzten im Leerlauf erfolgen.
  • Wenn Sie ein Subdag erstellen, müssen Sie es explizit über die Befehlszeile mit dem Befehl "airflow unpause <subdag_name>" aktivieren. Andernfalls wird es nicht gestartet, und Sie müssen dies beim Erstellen jedes neuen Subdags (eines Subdags mit einem neuen Namen) tun, was die dynamische Generierung sehr unpraktisch macht . Wenn Sie in der Luftstromkonfiguration ($ airflow_home / airflow.cfg) den Parameter "dags_are_paused_at_creation" = false setzen, ist dies nicht erforderlich, kann jedoch zu unangenehmen Konsequenzen beim automatischen automatischen Starten eines neuen Tages führen - es scheint mir, dass Sie neue Tage manuell starten müssen.

In der Dokumentation heißt es: "Eine wichtige Funktion von Airflow besteht darin, dass diese DAG-Läufe atomare, idempotente Elemente sind, <...>" was bedeutet: "Es versteht sich, dass der Tag unverändert generiert wird." Aufgrund der Tatsache, dass wir diese "Schlüsselfähigkeit" verletzt haben, haben wir einige Dinge gelernt:


  • Ein leerer Tag (ohne Aufgaben) beginnt und kann nicht enden und verstopft alle möglichen Parallelen. Dies geschah, wenn zum Zeitpunkt des Starts des Dags keine Download-Pakete vorhanden waren. Um dies zu umgehen, wird ein DummyOperator erstellt.
  • Wenn während der Arbeit der Task-Tag neu generiert wird und diese Aufgabe nicht mehr im aktualisierten Tag enthalten ist, wird sie mit einer Unterbrechung des laufenden Prozesses beendet. Dies geschieht bei jedem Schritt des Shedulers, jedoch nicht häufiger als im Parameter min_file_process_interval in der Luftstromkonfiguration ($ airflow_home / airflow.cfg) angegeben. Um dies zu umgehen, haben wir die Erstellung von Task Packs nicht nur anhand des Status "Bereit zum Herunterladen", sondern auch anhand des Status "Laden in Bearbeitung" durchgeführt, damit diese während des Downloads weiterhin generiert werden.
  • Wenn die aktuelle Version des Dags keine Aufgabe enthält, die zuvor ausgeführt wurde, z. B. eine Aufgabe mit dem Namen "pkg_123", die zuvor geladen wurde und in der aktuellen Version des Dags nicht erstellt wurde, können Sie keine Statistiken zu dieser Aufgabe in der Weboberfläche anzeigen. Obwohl alle Informationen in der Luftstromdatenbank gespeichert sind und auf deren Grundlage ein externes Dashboard für alte Starts erstellt werden kann. Wenn sich die Frage nach der Häufigkeit der Aktualisierung von DAGs und der Möglichkeit zum Deaktivieren dieser DAGs stellt, können Sie hier darüber lesen.
  • Aufgrund der dynamischen Generierung von task_id ist es erforderlich, ein Wörterbuch mit Daten für alle aktuellen Pakete sowie die ID des aktuellen Pakets in jede dieser Aufgaben zu werfen, damit Sie, wenn die Funktion selbst funktioniert, die erforderlichen Daten aus demselben Wörterbuch nach Paket-ID auswählen. Andernfalls wurden alle Aufgaben für dasselbe Paket gestartet.

Ausführungsdatum in Protokollen und tatsächliche Startzeit


Ich werde mit einer weiteren Nuance von Airflow enden, die zunächst verwirrt und in anderen Artikeln nicht mit einfachen Worten beschrieben wird - Ausführungsdatum (das in allen Protokollen, in der Benutzeroberfläche usw. angezeigt wird) und die tatsächliche Startzeit. Im Prinzip befindet sich die Beschreibung in der Dokumentation des Luftstroms und in den FAQ , aber das Ergebnis ist nicht offensichtlich, so dass meines Erachtens eine Klärung erforderlich ist.


Dokumentation : "Scheduler startet Ihren Job am Ende des Zeitraums"
Ergebnis : Wenn Sie einen Tag mit einem Zeitplan erstellen, z. B. @daily, wird bei einem Lauf mit dem Ausführungsdatum "2018-01-01 00:00:00" tatsächlich "2018-02-01 00:00:00" ausgeführt.


Nützliche Links:


Nachholdokumentation
NeuesteOnlyOperator-Dokumentation
Weitere Dokumentation zu LatestOnlyOperator
Beispiel für die Verwendung von LatestOnlyOperator
Einige Nuancen
Frage zu Abhängigkeiten vom vorherigen Start
Ein kleines Beispiel zur dynamischen Erzeugung
Eine Frage zur dynamischen Erzeugung mit einer kleinen Beschreibung

Source: https://habr.com/ru/post/de435746/


All Articles