La tâche de déployer des modèles d'apprentissage automatique en production est toujours pénible, car il est très inconfortable de sortir d'un ordinateur portable jupyter confortable dans le monde de la surveillance et de la tolérance aux pannes.
Nous avons déjà écrit sur la
première itération de refactorisation du système de recommandation du cinéma en ligne ivi. Au cours de la dernière année, nous n'avons presque pas finalisé l'architecture de l'application (de global - passant seulement de python 2.7 et python 3.4 obsolète à python 3.6 «frais»), mais nous avons ajouté quelques nouveaux modèles ML et nous sommes immédiatement heurtés au problème du déploiement de nouveaux algorithmes en production. Dans l'article, je vais parler de notre expérience dans la mise en œuvre d'un outil de gestion de flux de tâches comme Apache Airflow: pourquoi l'équipe avait ce besoin, ce qui ne convenait pas à la solution existante, quelles béquilles devaient être coupées en cours de route et ce qui en était le résultat.
→ La version vidéo du rapport peut être visionnée sur YouTube (à partir de 03:00:00)
ici .
Équipe Hydra
Je vais vous parler un peu du projet: ivi, c'est plusieurs dizaines de milliers d'unités de contenu, nous avons l'un des plus grands répertoires juridiques de RuNet. La page principale de la version Web ivi est une coupe personnalisée du catalogue, qui est conçue pour fournir à l'utilisateur le contenu le plus riche et le plus pertinent en fonction de ses commentaires (vues, notes, etc.).
La partie en ligne du système de recommandation est une application backend Flask avec une charge allant jusqu'à 600 RPS. Hors ligne, le modèle est formé sur plus de 250 millions de vues de contenu par mois. Les pipelines de préparation des données pour la formation sont implémentés sur Spark, qui s'exécute au-dessus du référentiel Hive.
L'équipe compte maintenant 7 développeurs qui sont engagés à la fois dans la création de modèles et leur déploiement en production - il s'agit d'une équipe assez importante qui nécessite des outils pratiques pour gérer les flux de tâches.
Architecture hors ligne
Ci-dessous, vous voyez le diagramme d'infrastructure de flux de données pour le système de recommandation.
Deux stockages de données sont illustrés ici - Hive pour les commentaires des utilisateurs (vues, évaluations) et Postgres pour diverses informations commerciales (types de monétisation de contenu, etc.), tandis que le transfert de Postgres à Hive est ajusté. Un pack d'applications Spark aspire les données de Hive: et forme nos modèles sur ces données (ALS pour les recommandations personnelles, divers modèles collaboratifs de similitude de contenu).
Les applications Spark sont traditionnellement gérées à partir d'une machine virtuelle dédiée, que nous appelons hydra-updater en utilisant un tas de scripts cron + shell. Ce bundle a été créé dans le département des opérations ivi dans des temps immémoriaux et a très bien fonctionné. Shell-script était un point d'entrée unique pour lancer des applications spark - c'est-à -dire que chaque nouveau modèle a commencé à tourner dans la prod uniquement après que les administrateurs ont terminé ce script.
Certains des artefacts de la formation des modèles sont stockés dans HDFS pour un stockage éternel (et attendent que quelqu'un les télécharge à partir de là et les transfère vers le serveur où la partie en ligne tourne), et certains sont écrits directement du pilote Spark vers le stockage rapide Redis, que nous utilisons comme général mémoire pour plusieurs dizaines de processus python de la partie en ligne.
Une telle architecture a accumulé un certain nombre d'inconvénients au fil du temps:
Le diagramme montre que les flux de données ont une structure plutôt compliquée et compliquée - sans un outil simple et clair pour gérer ce bien, le développement et le fonctionnement se transformeront en horreur, en décadence et en souffrance.
En plus de gérer les applications spark, le script d'administration fait beaucoup de choses utiles: redémarrage des services dans la bataille, un vidage Redis et d'autres choses système. De toute évidence, sur une longue période de fonctionnement, le script a envahi de nombreuses fonctions, car chaque nouveau modèle de la nôtre a généré quelques dizaines de lignes. Le script a commencé à sembler trop surchargé en termes de fonctionnalités.Par conséquent, en tant qu'équipe du système de recommandation, nous voulions retirer quelque part une partie des fonctionnalités qui concernent le lancement et la gestion des applications Spark. À ces fins, nous avons décidé d'utiliser Airflow.
Béquilles pour Airflow
En plus de résoudre tous ces problèmes, bien sûr, nous en avons créé de nouveaux pour nous - déployer Airflow pour lancer et surveiller les applications Spark s'est avéré difficile.
La principale difficulté était que personne ne remodelerait l'ensemble de l'infrastructure pour nous, car Devops Resource est une chose rare. Pour cette raison, nous avons dû non seulement implémenter Airflow, mais l'intégrer dans le système existant, ce qui est beaucoup plus difficile à voir à partir de zéro.
Je veux parler des douleurs que nous avons rencontrées pendant le processus de mise en œuvre et des béquilles que nous avons dû entailler pour obtenir Airflow.
La première et principale douleur : comment intégrer Airflow dans un grand script shell du département des opérations.
Ici, la solution est la plus évidente - nous avons commencé à déclencher des graphiques directement à partir du script shell en utilisant le binaire airflow avec la clé trigger_dag. Avec cette approche, nous n'utilisons pas le sheduler Airflow, et en fait l'application Spark est lancée avec la même couronne - ce n'est religieusement pas très correct. Mais nous avons obtenu une intégration transparente avec une solution existante. Voici à quoi ressemble le début du script shell de notre principale application Spark, qui est historiquement appelée hydramatrices.
log "$FUNCNAME started" local RETVAL=0 export AIRFLOW_CONFIG=/opt/airflow/airflow.cfg AIRFLOW_API=api/dag_last_run/hydramatrices/all log "run /var/www/airflow/bin/airflow trigger_dag hydramatrices" /var/www/airflow/bin/airflow trigger_dag hydramatrices 2>&1 | tee -a $LOGFILE
Douleur: Le script shell du département des opérations doit en quelque sorte déterminer l'état du graphique Airflow afin de contrôler son propre flux d'exécution.
Crutch: nous avons étendu l'API REST Airflow avec un point de terminaison pour la surveillance DAG directement dans les scripts shell. Maintenant, chaque graphique a trois états: RUNNING, SUCCEED, FAILED.
En fait, après avoir démarré les calculs dans Airflow, nous interrogeons simplement régulièrement le graphique en cours d'exécution: nous bullet la requête GET pour déterminer si le DAG s'est terminé ou non. Lorsque le point de terminaison de surveillance répond de l'exécution réussie du graphique, le script shell continue d'exécuter son flux.
Je veux dire que l'API Airflow REST est juste une chose ardente qui vous permet de configurer de manière flexible vos pipelines - par exemple, vous pouvez transmettre les paramètres POST aux graphiques.
L'extension API Airflow est juste une classe Python qui ressemble Ă ceci:
import json import os from airflow import settings from airflow.models import DagBag, DagRun from flask import Blueprint, request, Response airflow_api_blueprint = Blueprint('airflow_api', __name__, url_prefix='/api') AIRFLOW_DAGS = '{}/dags'.format( os.path.dirname(os.path.dirname(os.path.abspath(__file__))) ) class ApiResponse: """ GET """ STATUS_OK = 200 STATUS_NOT_FOUND = 404 def __init__(self): pass @staticmethod def standard_response(status: int, payload: dict) -> Response: json_data = json.dumps(payload) resp = Response(json_data, status=status, mimetype='application/json') return resp def success(self, payload: dict) -> Response: return self.standard_response(self.STATUS_OK, payload) def error(self, status: int, message: str) -> Response: return self.standard_response(status, {'error': message}) def not_found(self, message: str = 'Resource not found') -> Response: return self.error(self.STATUS_NOT_FOUND, message)
Nous utilisons l'API dans le script shell - nous interrogeons le point de terminaison toutes les 10 minutes:
TRIGGER=$? [ "$TRIGGER" -eq "0" ] && log "trigger airflow DAG succeeded" || { log "trigger airflow DAG failed"; return 1; } CMD="curl -s http://$HYDRA_SERVER/$AIRFLOW_API | jq .dag_last_run.state" STATE=$(eval $CMD) while [ $STATE == \"running\" ]; do log "Generating matrices in progress..." sleep 600 STATE=$(eval $CMD) done [ $STATE == \"success\" ] && RETVAL=0 || RETVAL=1 [ $RETVAL -eq 0 ] && log "$FUNCNAME succeeded" || log "$FUNCNAME failed" return $RETVAL
Douleur : si vous exécutez un travail Spark à l'aide de spark-submit en mode cluster, vous savez que les journaux dans STDOUT sont une feuille non informative avec les lignes «SPARK APPLICATION_ID IS RUNNING». Les journaux de l'application Spark elle-même peuvent être affichés, par exemple, à l'aide de la commande Yarn logs. Dans le script shell, ce problème a été résolu simplement: un tunnel SSH a été ouvert à l'une des machines du cluster et spark-submit a été exécuté en mode client pour cette machine. Dans ce cas, STDOUT aura des journaux lisibles et compréhensibles. Dans Airflow, nous avons décidé de toujours utiliser cluster-decide, et un tel nombre ne fonctionnera pas.
Crutch: une fois que spark-submit a fonctionné, nous extrayons les journaux du pilote de HDFS par application_id et les affichons dans l'interface Airflow via l'opérateur Python print (). Le seul point négatif - dans l'interface Airflow, les journaux n'apparaissent qu'après que le spark-submit a fonctionné, vous devez surveiller le temps réel dans d'autres endroits - par exemple, le museau Web YARN.
def get_logs(config: BaseConfig, app_id: str) -> None: """ :param config: :param app_id: """ hdfs = HDFSInteractor(config) logs_path = '/tmp/logs/{username}/logs/{app_id}'.format(username=config.CURRENT_USERNAME, app_id=app_id) logs_files = hdfs.files_in_folder(logs_path) logs_files = [file for file in logs_files if file[-4:] != '.tmp'] for file in logs_files: with hdfs.hdfs_client.read(os.path.join(logs_path, file), encoding='utf-8', delimiter='\n') as reader: print_line = False for line in reader: if re.search('stdout', line) and len(line) > 30: print_line = True if re.search('stderr', line): print_line = False if print_line: print(line)
Douleur : pour les testeurs et les développeurs, ce serait bien d'avoir un banc de test Airflow, mais nous économisons des ressources Devops, nous avons donc réfléchi à la façon de déployer l'environnement de test pendant longtemps.
Béquille: nous avons emballé Airflow dans un conteneur Docker et Dockerfile l'a placé directement dans le référentiel avec des tâches d'allumage. Ainsi, chaque développeur ou testeur peut augmenter son propre flux d'air sur une machine locale. En raison du fait que les applications s'exécutent en mode cluster, les ressources locales pour docker ne sont presque pas nécessaires.
Une installation locale de l'étincelle était cachée à l'intérieur du conteneur docker et de sa configuration entière via des variables d'environnement - vous n'avez plus besoin de passer plusieurs heures à configurer l'environnement. Ci-dessous, j'ai donné un exemple avec un fragment de fichier docker pour un conteneur avec Airflow, où vous pouvez voir comment Airflow est configuré à l'aide de variables d'environnement:
FROM ubuntu:16.04 ARG AIRFLOW_VERSION=1.9.0 ARG AIRFLOW_HOME ARG USERNAME=airflow ARG USER_ID ARG GROUP_ID ARG LOCALHOST ARG AIRFLOW_PORT ARG PIPENV_PATH ARG PROJECT_HYDRAMATRICES_DOCKER_PATH RUN apt-get update \ && apt-get install -y \ python3.6 \ python3.6-dev \ && update-alternatives --install /usr/bin/python3 python3.6 /usr/bin/python3.6 0 \ && apt-get -y install python3-pip RUN mv /root/.pydistutils.cf /root/.pydistutils.cfg RUN pip3 install pandas==0.20.3 \ apache-airflow==$AIRFLOW_VERSION \ psycopg2==2.7.5 \ ldap3==2.5.1 \ cryptography
Grâce à la mise en œuvre d'Airflow, nous avons obtenu les résultats suivants:
- Réduction du cycle de publication: le déploiement d'un nouveau modèle (ou d'un pipeline de préparation de données) se résume désormais à l'écriture d'un nouveau graphique Airflow, les graphiques eux-mêmes sont stockés dans le référentiel et déployés avec le code. Ce processus est entièrement entre les mains du développeur. Les administrateurs sont contents, on ne les tire plus sur des bagatelles.
- Les journaux des applications Spark qui allaient directement en enfer sont désormais stockés dans Aiflow avec une interface d'accès pratique. Vous pouvez voir les journaux de n'importe quel jour sans les sélectionner dans les répertoires HDFS.
- Le calcul qui a échoué peut être redémarré avec un seul bouton dans l'interface, c'est très pratique, même June peut le gérer.
- Vous pouvez puiser des tâches spark à partir de l'interface sans avoir à exécuter les paramètres Spark sur la machine locale. Les testeurs sont satisfaits - tous les paramètres pour que spark-submit fonctionne correctement sont déjà définis dans Dockerfile
- Petits pains standard Aiflow - planifications, redémarrage des travaux abandonnés, beaux graphiques (par exemple, temps d'exécution de l'application, statistiques des lancements réussis et infructueux).
Où aller ensuite? Nous avons maintenant un grand nombre de sources et de puits de données, dont le nombre augmentera. Les changements dans n'importe quelle classe de référentiel hydramatrices peuvent planter dans un autre pipeline (ou même dans la partie en ligne):
- Clickhouse déborde → Hive
- prétraitement des données: Hive → Hive
- déployer des modèles c2c: Hive → Redis
- préparation d'annuaires (comme le type de monétisation de contenu): Postgres → Redis
- préparation du modèle: FS local → HDFS
Dans une telle situation, nous avons vraiment besoin d'un stand pour les tests automatiques des pipelines dans la préparation des données. Cela réduira considérablement le coût des tests de modifications dans le référentiel, accélérera le déploiement de nouveaux modèles en production et augmentera considérablement le niveau d'endorphines dans les testeurs. Mais sans Airflow, il serait impossible de déployer un stand pour ce genre de test automatique!
J'ai écrit cet article pour parler de notre expérience dans la mise en œuvre d'Airflow, qui peut être utile à d'autres équipes dans une situation similaire - vous avez déjà un grand système de travail et vous voulez essayer quelque chose de nouveau, à la mode et jeune. Pas besoin d'avoir peur des mises à jour du système de travail, vous devez essayer et expérimenter - ces expériences ouvrent généralement de nouveaux horizons pour un développement ultérieur.