Einführung von Airflow zur Verwaltung von Spark-Jobs in ivi: Hoffnungen und Krücken

Die Aufgabe, Modelle für maschinelles Lernen in der Produktion einzusetzen, ist immer schmerzhaft und schmerzhaft, da es sehr unangenehm ist, aus einem gemütlichen Jupyter-Notebook in die Welt der Überwachung und Fehlertoleranz zu gelangen.

Wir haben bereits über die erste Iteration der Überarbeitung des Empfehlungssystems des Online-Kinos ivi geschrieben. Im vergangenen Jahr haben wir die Anwendungsarchitektur fast nicht fertiggestellt (von global - nur von veraltetem Python 2.7 und Python 3.4 zu „frischem“ Python 3.6), aber wir haben einige neue ML-Modelle hinzugefügt und sind sofort auf das Problem gestoßen, neue Algorithmen in der Produktion einzuführen. In dem Artikel werde ich über unsere Erfahrungen bei der Implementierung eines solchen Tools für das Task-Flow-Management wie Apache Airflow berichten: Warum das Team diesen Bedarf hatte, was nicht zu der vorhandenen Lösung passte, welche Krücken auf dem Weg geschnitten werden mussten und was daraus wurde.

→ Die Videoversion des Berichts kann hier auf YouTube (ab 03:00:00 Uhr) angesehen werden .




Hydra Team


Ich erzähle Ihnen ein wenig über das Projekt: ivi besteht aus mehreren Zehntausenden von Inhaltseinheiten, wir haben eines der größten legalen Verzeichnisse in RuNet. Die Hauptseite der ivi-Webversion ist ein personalisierter Ausschnitt aus dem Katalog, der dem Benutzer basierend auf seinem Feedback (Ansichten, Bewertungen usw.) den umfangreichsten und relevantesten Inhalt bietet.


Der Online-Teil des Empfehlungssystems ist eine Flask-Backend-Anwendung mit einer Last von bis zu 600 RPS. Offline wird das Modell auf mehr als 250 Millionen Inhaltsaufrufe pro Monat trainiert. Die Datenaufbereitungs-Pipelines für das Training sind in Spark implementiert, das über dem Hive-Repository ausgeführt wird.

Das Team hat jetzt 7 Entwickler, die sowohl Modelle erstellen als auch in die Produktion einführen - dies ist ein ziemlich großes Team, das praktische Tools für die Verwaltung von Aufgabenabläufen benötigt.

Offline-Architektur


Unten sehen Sie das Diagramm der Datenflussinfrastruktur für das Empfehlungssystem.


Hier sind zwei Datenspeicher dargestellt: Hive für Benutzerfeedback (Ansichten, Bewertungen) und Postgres für verschiedene Geschäftsinformationen (Arten der Monetarisierung von Inhalten usw.), während die Übertragung von Postgres zu Hive angepasst wurde. Ein Paket von Spark-Anwendungen saugt Daten aus Hive: und schult unsere Modelle anhand dieser Daten (ALS für persönliche Empfehlungen, verschiedene kollaborative Modelle für die Ähnlichkeit von Inhalten).

Spark-Anwendungen werden traditionell von einer dedizierten virtuellen Maschine verwaltet, die wir Hydra-Updater mit einer Reihe von Cron + Shell-Skripten nennen. Dieses Bundle wurde seit jeher in der ivi-Betriebsabteilung erstellt und hat hervorragend funktioniert. Das Shell-Skript war ein einziger Einstiegspunkt für das Starten von Spark-Anwendungen - das heißt, jedes neue Modell begann sich erst im Produkt zu drehen, nachdem die Administratoren dieses Skript fertiggestellt hatten.

Einige der Artefakte des Modelltrainings werden in HDFS zur ewigen Speicherung gespeichert (und warten darauf, dass jemand sie von dort herunterlädt und auf den Server überträgt, auf dem sich der Online-Teil dreht), und andere werden direkt vom Spark-Treiber in den Redis-Schnellspeicher geschrieben, den wir allgemein verwenden Speicher für mehrere Dutzend Python-Prozesse des Online-Teils.

Eine solche Architektur hat im Laufe der Zeit eine Reihe von Nachteilen angehäuft:


Das Diagramm zeigt, dass Datenflüsse eine ziemlich komplizierte und komplizierte Struktur haben - ohne ein einfaches und klares Werkzeug zur Verwaltung dieses Gutes werden Entwicklung und Betrieb zu Horror, Verfall und Leiden.

Neben der Verwaltung von Spark-Anwendungen bietet das Administrationsskript viele nützliche Funktionen: Neustarten von Diensten im Kampf, Redis-Dump und andere Systemaufgaben. Offensichtlich ist das Skript über einen langen Zeitraum mit vielen Funktionen überfüllt, da jedes neue Modell von uns ein paar Dutzend Zeilen darin generiert hat. Das Skript sah in Bezug auf die Funktionalität zu überladen aus. Daher wollten wir als Team des Empfehlungssystems irgendwo einen Teil der Funktionalität entfernen, die das Starten und Verwalten von Spark-Anwendungen betrifft. Für diese Zwecke haben wir uns für Airflow entschieden.

Krücken für Luftstrom


Neben der Lösung all dieser Probleme haben wir natürlich auch neue für uns erstellt. Die Bereitstellung von Airflow zum Starten und Überwachen von Spark-Anwendungen erwies sich als schwierig.

Die Hauptschwierigkeit bestand darin, dass niemand die gesamte Infrastruktur für uns umbauen würde, weil Devops Ressource ist eine knappe Sache. Aus diesem Grund mussten wir Airflow nicht nur implementieren, sondern in das bestehende System integrieren, was von Grund auf viel schwieriger zu erkennen ist.

Ich möchte über die Schmerzen sprechen, die wir während des Implementierungsprozesses hatten, und über die Krücken, die wir einschlagen mussten, um den Luftstrom zu erhalten.

Der erste und wichtigste Punkt : Wie integriert man Airflow in ein großes Shell-Skript der Betriebsabteilung?

Hier ist die Lösung am offensichtlichsten: Wir haben begonnen, Diagramme direkt aus dem Shell-Skript mithilfe der Luftstrom-Binärdatei mit dem Schlüssel trigger_dag auszulösen. Bei diesem Ansatz verwenden wir nicht den Airflow-Sheduler, und tatsächlich wird die Spark-Anwendung mit derselben Krone gestartet - dies ist religiös nicht sehr korrekt. Wir haben jedoch eine nahtlose Integration in eine vorhandene Lösung erhalten. So sieht der Start aus dem Shell-Skript unserer Hauptanwendung Spark aus, die historisch als Hydramatrices bezeichnet wurde.

log "$FUNCNAME started" local RETVAL=0 export AIRFLOW_CONFIG=/opt/airflow/airflow.cfg AIRFLOW_API=api/dag_last_run/hydramatrices/all log "run /var/www/airflow/bin/airflow trigger_dag hydramatrices" /var/www/airflow/bin/airflow trigger_dag hydramatrices 2>&1 | tee -a $LOGFILE 

Schmerz: Das Shell-Skript der Betriebsabteilung muss irgendwie den Status des Luftstromdiagramms bestimmen, um seinen eigenen Ausführungsfluss zu steuern.

Crutch: Wir haben die Airflow REST-API um einen Endpunkt für die DAG-Überwachung direkt in Shell-Skripten erweitert. Jetzt hat jeder Graph drei Zustände: RUNNING, SUCCEED, FAILED.

Nachdem wir die Berechnungen in Airflow gestartet haben, rufen wir einfach regelmäßig das laufende Diagramm ab: Wir schreiben die GET-Anforderung auf, um festzustellen, ob die DAG abgeschlossen ist oder nicht. Wenn der Überwachungsendpunkt auf die erfolgreiche Ausführung des Diagramms antwortet, führt das Shell-Skript seinen Ablauf weiter aus.
Ich möchte sagen, dass die Airflow REST-API nur eine feurige Sache ist, mit der Sie Ihre Pipelines flexibel konfigurieren können - zum Beispiel können Sie POST-Parameter an Diagramme weiterleiten.

Die Airflow-API-Erweiterung ist nur eine Python-Klasse, die ungefähr so ​​aussieht:

 import json import os from airflow import settings from airflow.models import DagBag, DagRun from flask import Blueprint, request, Response airflow_api_blueprint = Blueprint('airflow_api', __name__, url_prefix='/api') AIRFLOW_DAGS = '{}/dags'.format( os.path.dirname(os.path.dirname(os.path.abspath(__file__))) ) class ApiResponse: """    GET """ STATUS_OK = 200 STATUS_NOT_FOUND = 404 def __init__(self): pass @staticmethod def standard_response(status: int, payload: dict) -> Response: json_data = json.dumps(payload) resp = Response(json_data, status=status, mimetype='application/json') return resp def success(self, payload: dict) -> Response: return self.standard_response(self.STATUS_OK, payload) def error(self, status: int, message: str) -> Response: return self.standard_response(status, {'error': message}) def not_found(self, message: str = 'Resource not found') -> Response: return self.error(self.STATUS_NOT_FOUND, message) 

Wir verwenden die API im Shell-Skript - wir fragen den Endpunkt alle 10 Minuten ab:

  TRIGGER=$? [ "$TRIGGER" -eq "0" ] && log "trigger airflow DAG succeeded" || { log "trigger airflow DAG failed"; return 1; } CMD="curl -s http://$HYDRA_SERVER/$AIRFLOW_API | jq .dag_last_run.state" STATE=$(eval $CMD) while [ $STATE == \"running\" ]; do log "Generating matrices in progress..." sleep 600 STATE=$(eval $CMD) done [ $STATE == \"success\" ] && RETVAL=0 || RETVAL=1 [ $RETVAL -eq 0 ] && log "$FUNCNAME succeeded" || log "$FUNCNAME failed" return $RETVAL 

Schmerz : Wenn Sie jemals einen Spark-Job mit Spark-Submit im Cluster-Modus ausführen, wissen Sie, dass die Protokolle in STDOUT ein nicht informatives Blatt mit den Zeilen "SPARK APPLICATION_ID IS RUNNING" sind. Die Protokolle der Spark-Anwendung selbst können beispielsweise mit dem Befehl Garnprotokolle angezeigt werden. In einem Shell-Skript wurde dieses Problem einfach gelöst: Ein SSH-Tunnel wurde für einen der Cluster-Computer geöffnet und die Funkenübermittlung wurde im Client-Modus für diesen Computer ausgeführt. In diesem Fall verfügt STDOUT über lesbare und verständliche Protokolle. In Airflow haben wir uns für die Cluster-Entscheidung entschieden, und eine solche Nummer funktioniert nicht.

Crutch: Nachdem Spark-Submit funktioniert hat, ziehen wir die Treiberprotokolle mit application_id aus HDFS und zeigen sie in der Airflow-Oberfläche einfach über den Python print () -Operator an. Das einzig Negative - in der Airflow-Oberfläche werden die Protokolle erst angezeigt, nachdem die Funkenübermittlung funktioniert hat. Sie müssen die Echtzeit an anderen Stellen verfolgen - beispielsweise an der YARN-Webmündung.

 def get_logs(config: BaseConfig, app_id: str) -> None: """   :param config: :param app_id: """ hdfs = HDFSInteractor(config) logs_path = '/tmp/logs/{username}/logs/{app_id}'.format(username=config.CURRENT_USERNAME, app_id=app_id) logs_files = hdfs.files_in_folder(logs_path) logs_files = [file for file in logs_files if file[-4:] != '.tmp'] for file in logs_files: with hdfs.hdfs_client.read(os.path.join(logs_path, file), encoding='utf-8', delimiter='\n') as reader: print_line = False for line in reader: if re.search('stdout', line) and len(line) > 30: print_line = True if re.search('stderr', line): print_line = False if print_line: print(line) 

Schmerz : Für Tester und Entwickler wäre es schön, einen Airflow-Prüfstand zu haben, aber wir sparen Entwicklungsressourcen, daher haben wir lange darüber nachgedacht, wie die Testumgebung bereitgestellt werden kann.

Crutch: Wir haben Airflow in einen Docker-Container gepackt und Dockerfile hat es mit Spark-Jobs direkt in das Repository gestellt. Somit kann jeder Entwickler oder Tester seinen eigenen Luftstrom auf einem lokalen Computer erhöhen. Aufgrund der Tatsache, dass Anwendungen im Cluster-Modus ausgeführt werden, sind lokale Ressourcen für Docker fast nicht erforderlich.

Eine lokale Installation des Funkens wurde im Docker-Container und seiner gesamten Konfiguration über Umgebungsvariablen versteckt - Sie müssen nicht mehr mehrere Stunden damit verbringen, die Umgebung einzurichten. Unten habe ich ein Beispiel mit einem Docker-Dateifragment für einen Container mit Airflow gegeben, in dem Sie sehen können, wie Airflow mithilfe von Umgebungsvariablen konfiguriert wird:

 FROM ubuntu:16.04 ARG AIRFLOW_VERSION=1.9.0 ARG AIRFLOW_HOME ARG USERNAME=airflow ARG USER_ID ARG GROUP_ID ARG LOCALHOST ARG AIRFLOW_PORT ARG PIPENV_PATH ARG PROJECT_HYDRAMATRICES_DOCKER_PATH RUN apt-get update \ && apt-get install -y \ python3.6 \ python3.6-dev \ && update-alternatives --install /usr/bin/python3 python3.6 /usr/bin/python3.6 0 \ && apt-get -y install python3-pip RUN mv /root/.pydistutils.cf /root/.pydistutils.cfg RUN pip3 install pandas==0.20.3 \ apache-airflow==$AIRFLOW_VERSION \ psycopg2==2.7.5 \ ldap3==2.5.1 \ cryptography #   ,       ENV PROJECT_HYDRAMATRICES_DOCKER_PATH=${PROJECT_HYDRAMATRICES_DOCKER_PATH} ENV PIPENV_PATH=${PIPENV_PATH} ENV SPARK_HOME=/usr/lib/spark2 ENV HADOOP_CONF_DIR=$PROJECT_HYDRAMATRICES_DOCKER_PATH/etc/hadoop-conf-preprod ENV PYTHONPATH=${SPARK_HOME}/python/lib/py4j-0.10.4-src.zip:${SPARK_HOME}/python/lib/pyspark.zip:${SPARK_HOME}/python/lib ENV PIP_NO_BINARY=numpy ENV AIRFLOW_HOME=${AIRFLOW_HOME} ENV AIRFLOW_DAGS=${AIRFLOW_HOME}/dags ENV AIRFLOW_LOGS=${AIRFLOW_HOME}/logs ENV AIRFLOW_PLUGINS=${AIRFLOW_HOME}/plugins #      Airflow (log url) BASE_URL="http://${AIRFLOW_CURRENT_HOST}:${AIRFLOW_PORT}" ; #   Airflow ENV AIRFLOW__WEBSERVER__BASE_URL=${BASE_URL} ENV AIRFLOW__WEBSERVER__ENDPOINT_URL=${BASE_URL} ENV AIRFLOW__CORE__AIRFLOW_HOME=${AIRFLOW_HOME} ENV AIRFLOW__CORE__DAGS_FOLDER=${AIRFLOW_DAGS} ENV AIRFLOW__CORE__BASE_LOG_FOLDER=${AIRFLOW_LOGS} ENV AIRFLOW__CORE__PLUGINS_FOLDER=${AIRFLOW_PLUGINS} ENV AIRFLOW__SCHEDULER__CHILD_PROCESS_LOG_DIRECTORY=${AIRFLOW_LOGS}/scheduler 

Durch die Implementierung von Airflow haben wir folgende Ergebnisse erzielt:

  • Reduzierter Release-Zyklus: Beim Rollout eines neuen Modells (oder einer neuen Datenaufbereitungs-Pipeline) muss nun ein neues Airflow-Diagramm geschrieben werden. Die Diagramme selbst werden im Repository gespeichert und mit dem Code bereitgestellt. Dieser Prozess liegt vollständig in den Händen des Entwicklers. Admins sind glücklich, wir ziehen sie nicht mehr an Kleinigkeiten.
  • Spark-Anwendungsprotokolle, die früher direkt zur Hölle gingen, werden jetzt in Aiflow mit einer praktischen Zugriffsoberfläche gespeichert. Sie können die Protokolle für jeden Tag anzeigen, ohne sie in HDFS-Verzeichnissen auszuwählen.
  • Die fehlgeschlagene Berechnung kann mit einer Schaltfläche in der Benutzeroberfläche neu gestartet werden. Dies ist sehr praktisch, selbst June kann damit umgehen.
  • Sie können Spark-Jobs über die Benutzeroberfläche aufzeichnen, ohne die Spark-Einstellungen auf dem lokalen Computer ausführen zu müssen. Tester sind zufrieden - alle Einstellungen für die ordnungsgemäße Funktion von Spark-Submit wurden bereits in Dockerfile vorgenommen
  • Aiflow-Standardbrötchen - Zeitpläne, Neustart fehlgeschlagener Jobs, schöne Diagramme (z. B. Ausführungszeit der Anwendung, Statistiken über erfolgreiche und erfolglose Starts).

Wohin als nächstes? Jetzt haben wir eine große Anzahl von Datenquellen und -senken, deren Anzahl zunehmen wird. Änderungen in einer Hydramatrices-Repository-Klasse können in einer anderen Pipeline (oder sogar im Online-Teil) abstürzen:

  • Clickhouse läuft über → Hive
  • Datenvorverarbeitung: Hive → Hive
  • Bereitstellen von c2c-Modellen: Hive → Redis
  • Erstellung von Verzeichnissen (wie die Art der Monetarisierung von Inhalten): Postgres → Redis
  • Modellvorbereitung: Lokale FS → HDFS

In einer solchen Situation brauchen wir wirklich einen Stand für das automatische Testen von Pipelines bei der Datenaufbereitung. Dies wird die Kosten für das Testen von Änderungen im Repository erheblich senken, die Einführung neuer Modelle in der Produktion beschleunigen und den Endorphinspiegel in Testern drastisch erhöhen. Aber ohne Airflow wäre es unmöglich, einen Ständer für diese Art von Autotest aufzustellen!

Ich habe diesen Artikel geschrieben, um über unsere Erfahrungen bei der Implementierung von Airflow zu berichten, die für andere Teams in einer ähnlichen Situation nützlich sein können. Sie haben bereits ein großes Arbeitssystem und möchten etwas Neues, Modisches und Jugendliches ausprobieren. Sie müssen keine Angst vor Aktualisierungen des Arbeitssystems haben, sondern müssen versuchen, zu experimentieren. Solche Experimente eröffnen normalerweise neue Horizonte für die weitere Entwicklung.

Source: https://habr.com/ru/post/de456630/


All Articles