Bei der Vorbereitung unserer Bildungsprogramme stoßen wir regelmäßig auf Schwierigkeiten bei der Arbeit mit einigen Tools. Und in dem Moment, in dem wir ihnen begegnen, gibt es nicht immer genug Dokumentationen und Artikel, um dieses Problem zu lösen.
Dies war beispielsweise 2015 der Fall, und wir haben im Rahmen des Big Data Specialist-Programms einen Hadoop-Cluster mit Spark für 35 gleichzeitige Benutzer verwendet. Wie man es unter einem solchen Anwenderfall mit YARN kocht, war nicht klar. Nachdem sie es herausgefunden und den Weg alleine gegangen waren, machten sie einen Beitrag auf Habré und traten auch beim Moscow Spark Meetup auf .
Hintergrund
Dieses Mal werden wir über ein anderes Programm sprechen - Data Engineer . Unsere Teilnehmer bauen zwei Arten von Architektur darauf: Lambda und Kappa. In der Lamdba-Architektur wird Airflow im Rahmen der Stapelverarbeitung zum Übertragen von Protokollen von HDFS zu ClickHouse verwendet.
Im Allgemeinen ist alles gut. Lassen Sie sie ihre Pipelines bauen. Es gibt jedoch ein „Aber“: Alle unsere Programme sind im Hinblick auf den Lernprozess selbst technologisch. Um das Labor zu überprüfen, verwenden wir automatische Prüfer: Der Teilnehmer muss zu seinem persönlichen Konto gehen, auf die Schaltfläche „Prüfen“ klicken und nach einer Weile sieht er eine Art erweitertes Feedback zu dem, was er getan hat. Und in diesem Moment beginnen wir, uns unserem Problem zu nähern.
Die Überprüfung dieses Labors ist wie folgt organisiert: Wir senden ein Kontrolldatenpaket an Kafka des Teilnehmers, dann überträgt Gobblin das Datenpaket an HDFS, dann nimmt Airflow dieses Datenpaket und legt es in ClickHouse ab. Der Trick ist, dass Airflow dies nicht in Echtzeit tun sollte, sondern nach einem Zeitplan: Alle 15 Minuten nimmt es eine Reihe von Dateien und wirft sie ein.
Es stellt sich heraus, dass wir auf Anfrage des Prüfers hier und jetzt irgendwie ihre DAG selbst auslösen müssen. Beim Googeln haben wir herausgefunden, dass es für die späteren Versionen von Airflow die sogenannte experimentelle API gibt . Das Wort experimental
klingt natürlich beängstigend, aber was zu tun ist ... Plötzlich wird es hochfliegen.
Als Nächstes beschreiben wir den gesamten Weg: von der Installation von Airflow bis zur Generierung einer POST-Anforderung, die mithilfe der experimentellen API eine DAG auslöst. Wir werden mit Ubuntu 16.04 arbeiten.
1. Installieren des Luftstroms
Lassen Sie uns überprüfen, ob wir Python 3 und virtualenv haben.
$ python3 --version Python 3.6.6 $ virtualenv --version 15.2.0
Wenn etwas davon fehlt, installieren Sie es.
Erstellen Sie nun ein Verzeichnis, in dem wir weiterhin mit Airflow arbeiten werden.
$ mkdir <your name of directory> $ cd /path/to/your/new/directory $ virtualenv -p which python3 venv $ source venv/bin/activate (venv) $
Luftstrom installieren:
(venv) $ pip install airflow
Die Version, an der wir gearbeitet haben: 1.10.
Jetzt müssen wir das Verzeichnis airflow_home
erstellen, in dem sich die DAG-Dateien und Airflow-Plugins befinden. AIRFLOW_HOME
nach dem Erstellen des Verzeichnisses die Umgebungsvariable AIRFLOW_HOME
.
(venv) $ cd /path/to/my/airflow/workspace (venv) $ mkdir airflow_home (venv) $ export AIRFLOW_HOME=<path to airflow_home>
Der nächste Schritt besteht darin, den Befehl auszuführen, mit dem die Datenstromdatenbank in SQLite erstellt und initialisiert wird:
(venv) $ airflow initdb
Die Datenbank wird standardmäßig in airflow.db
erstellt.
Überprüfen Sie, ob Airflow installiert ist:
$ airflow version [2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor [2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt [2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt ____________ _____________ ____ |__( )_________ __/__ /________ __ ____ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / / ___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ / _/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/ v1.10.0
Wenn der Befehl funktioniert hat, hat Airflow seine Konfigurationsdatei airflow.cfg
in AIRFLOW_HOME
:
$ tree . ├── airflow.cfg └── unittests.cfg
Airflow verfügt über eine Webschnittstelle. Es kann durch Ausführen des folgenden Befehls gestartet werden:
(venv) $ airflow webserver --port 8081
Jetzt können Sie in einem Browser auf Port 8081 auf dem Host, auf dem Airflow gestartet wurde, auf die Weboberfläche zugreifen, z. B.: <hostname:8081>
.
2. Arbeiten mit der experimentellen API
In diesem Fall ist Airflow konfiguriert und betriebsbereit. Wir müssen jedoch auch die experimentelle API ausführen. Unsere Prüfer sind in Python geschrieben, sodass alle Anfragen mithilfe der requests
bearbeitet werden.
Tatsächlich funktioniert die API bereits für einfache Abfragen. Mit einer solchen Anforderung können Sie beispielsweise den Betrieb testen:
>>> import requests >>> host = <your hostname> >>> airflow_port = 8081
Wenn Sie eine solche Nachricht als Antwort erhalten haben, bedeutet dies, dass alles funktioniert.
Wenn wir jedoch die DAG aktivieren möchten, werden wir feststellen, dass diese Art von Anfrage nicht ohne Authentifizierung gestellt werden kann.
Dazu müssen Sie eine Reihe von Aktionen ausführen.
Zunächst müssen Sie dies zur Konfiguration hinzufügen:
[api] auth_backend = airflow.contrib.auth.backends.password_auth
Anschließend müssen Sie Ihren Benutzer mit Administratorrechten erstellen:
>>> import airflow >>> from airflow import models, settings >>> from airflow.contrib.auth.backends.password_auth import PasswordUser >>> user = PasswordUser(models.Admin()) >>> user.username = 'new_user_name' >>> user.password = 'set_the_password' >>> session = settings.Session() >>> session.add(user) >>> session.commit() >>> session.close() >>> exit()
Anschließend müssen Sie einen Benutzer mit normalen Rechten erstellen, der einen DAG-Trigger ausführen darf.
>>> import airflow >>> from airflow import models, settings >>> from airflow.contrib.auth.backends.password_auth import PasswordUser >>> user = PasswordUser(models.User()) >>> user.username = 'newprolab' >>> user.password = 'Newprolab2019!' >>> session = settings.Session() >>> session.add(user) >>> session.commit() >>> session.close() >>> exit()
Jetzt ist alles fertig.
3. Starten einer POST-Anforderung
Die POST-Anfrage selbst sieht folgendermaßen aus:
>>> dag_id = newprolab >>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs') >>> data = {"conf":"{\"key\":\"value\"}"} >>> headers = {'Content-type': 'application/json'} >>> auth = ('newprolab', 'Newprolab2019!') >>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth) >>> uri.text '{\n "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"\n}\n'
Anfrage erfolgreich bearbeitet.
Dementsprechend geben wir der DAG etwas Zeit für die Verarbeitung und stellen eine Anfrage an die ClickHouse-Tabelle, um zu versuchen, ein Steuerdatenpaket abzufangen.
Überprüfung abgeschlossen.