Presentación de Airflow para administrar Spark Jobs en ivi: esperanzas y muletas

La tarea de desplegar modelos de aprendizaje automático en la producción siempre es dolorosa, porque es muy incómodo salir de una acogedora computadora portátil Jupyter al mundo de la monitorización y la tolerancia a fallas.

Ya escribimos sobre la primera iteración de refactorizar el sistema de recomendación del cine en línea ivi. Durante el año pasado, casi no finalizamos la arquitectura de la aplicación (de global, solo pasamos de python 2.7 y python 3.4 obsoletos a python 3.6 "nuevo"), pero agregamos algunos modelos nuevos de ML e inmediatamente nos encontramos con el problema de implementar nuevos algoritmos en producción. En el artículo, contaré sobre nuestra experiencia en la implementación de una herramienta de gestión de flujo de tareas como Apache Airflow: por qué el equipo tenía esta necesidad, qué no se adaptaba a la solución existente, qué muletas tenían que cortarse en el camino y qué surgió de ella.

→ La versión de video del informe se puede ver en YouTube (a partir de las 03:00:00) aquí .




Equipo Hydra


Te contaré un poco sobre el proyecto: ivi es varias decenas de miles de unidades de contenido, tenemos uno de los directorios legales más grandes de RuNet. La página principal de la versión web de ivi es un corte personalizado del catálogo, que está diseñado para proporcionar al usuario el contenido más rico y relevante en función de sus comentarios (vistas, calificaciones, etc.).


La parte en línea del sistema de recomendación es una aplicación de fondo de Flask con una carga de hasta 600 RPS. Fuera de línea, el modelo está entrenado en más de 250 millones de vistas de contenido por mes. Los canales de preparación de datos para la capacitación se implementan en Spark, que se ejecuta en la parte superior del repositorio de Hive.

El equipo ahora tiene 7 desarrolladores que se dedican tanto a crear modelos como a implementarlos en producción; este es un equipo bastante grande que requiere herramientas convenientes para administrar los flujos de tareas.

Arquitectura sin conexión


A continuación puede ver el diagrama de infraestructura de flujo de datos para el sistema de recomendación.


Aquí se muestran dos almacenamientos de datos: Hive para comentarios de los usuarios (vistas, calificaciones) y Postgres para diversa información comercial (tipos de monetización de contenido, etc.), mientras se ajusta la transferencia de Postgres a Hive. Un paquete de aplicaciones Spark absorbe datos de Hive: y entrena a nuestros modelos con estos datos (ALS para recomendaciones personales, varios modelos colaborativos de similitud de contenido).

Las aplicaciones de Spark se han gestionado tradicionalmente desde una máquina virtual dedicada, a la que llamamos Hydra-Updater utilizando un montón de scripts cron + shell. Este paquete fue creado en el departamento de operaciones de ivi en tiempos inmemoriales y funcionó muy bien. Shell-script fue un punto de entrada único para lanzar aplicaciones de chispa, es decir, cada nuevo modelo comenzó a girar en el producto solo después de que los administradores terminaron este script.

Algunos de los artefactos de la capacitación de modelos se almacenan en HDFS para el almacenamiento eterno (y esperando que alguien los descargue de allí y los transfiera al servidor donde gira la parte en línea), y algunos se escriben directamente desde el controlador Spark al almacenamiento rápido de Redis, que usamos como general memoria para varias docenas de procesos python de la parte en línea.

Tal arquitectura ha acumulado una serie de desventajas con el tiempo:


El diagrama muestra que los flujos de datos tienen una estructura bastante complicada y complicada: sin una herramienta simple y clara para administrar este bien, el desarrollo y la operación se convertirán en horror, decadencia y sufrimiento.

Además de administrar aplicaciones de chispa, el script de administración hace muchas cosas útiles: reiniciar servicios en batalla, un volcado de Redis y otras cosas del sistema. Obviamente, durante un largo período de operación, el script ha crecido con muchas funciones, ya que cada nuevo modelo nuestro generó un par de docenas de líneas. El script comenzó a verse demasiado sobrecargado en términos de funcionalidad, por lo tanto, como equipo del sistema de recomendación, queríamos eliminar en algún lugar una parte de la funcionalidad que se refiere al lanzamiento y administración de aplicaciones Spark. Para estos fines, decidimos usar Airflow.

Muletas para flujo de aire


Además de resolver todos estos problemas, por supuesto, en la forma en que creamos otros nuevos para nosotros: implementar Airflow para iniciar y monitorear aplicaciones Spark resultó ser difícil.

La principal dificultad fue que nadie nos remodelaría toda la infraestructura, porque El recurso devops es algo escaso. Por esta razón, tuvimos que no solo implementar Airflow, sino integrarlo en el sistema existente, que es mucho más difícil de ver desde cero.

Quiero hablar sobre los dolores que encontramos durante el proceso de implementación, y las muletas que tuvimos que cortar para obtener Airflow.

El primer y principal dolor : cómo integrar Airflow en un gran script de shell del departamento de operaciones.

Aquí la solución es la más obvia: comenzamos a activar gráficos directamente desde el script de shell utilizando el binario de flujo de aire con la tecla trigger_dag. Con este enfoque, no utilizamos el programador Airflow y, de hecho, la aplicación Spark se inicia con la misma corona; esto es religiosamente poco correcto. Pero obtuvimos una integración perfecta con una solución existente. Así es como se ve el comienzo del script de shell de nuestra aplicación principal de Spark, que históricamente se llama hidramatrices.

log "$FUNCNAME started" local RETVAL=0 export AIRFLOW_CONFIG=/opt/airflow/airflow.cfg AIRFLOW_API=api/dag_last_run/hydramatrices/all log "run /var/www/airflow/bin/airflow trigger_dag hydramatrices" /var/www/airflow/bin/airflow trigger_dag hydramatrices 2>&1 | tee -a $LOGFILE 

Dolor: el script de shell del departamento de operaciones debe determinar de alguna manera el estado del gráfico Airflow para controlar su propio flujo de ejecución.

Crutch: ampliamos la API REST de Airflow con un punto final para la supervisión de DAG dentro de los scripts de shell. Ahora cada gráfico tiene tres estados: CORRER, EXITAR, FALLAR.

De hecho, después de comenzar los cálculos en Airflow, simplemente sondeamos regularmente el gráfico en ejecución: enviamos la solicitud GET para determinar si el DAG se ha completado o no. Cuando el punto final de supervisión responde sobre la ejecución exitosa del gráfico, el script de shell continúa ejecutando su flujo.
Quiero decir que la API REST de Airflow es solo una cosa ardiente que le permite configurar de manera flexible sus tuberías; por ejemplo, puede reenviar parámetros POST a gráficos.

La extensión API de Airflow es solo una clase de Python que se parece a esto:

 import json import os from airflow import settings from airflow.models import DagBag, DagRun from flask import Blueprint, request, Response airflow_api_blueprint = Blueprint('airflow_api', __name__, url_prefix='/api') AIRFLOW_DAGS = '{}/dags'.format( os.path.dirname(os.path.dirname(os.path.abspath(__file__))) ) class ApiResponse: """    GET """ STATUS_OK = 200 STATUS_NOT_FOUND = 404 def __init__(self): pass @staticmethod def standard_response(status: int, payload: dict) -> Response: json_data = json.dumps(payload) resp = Response(json_data, status=status, mimetype='application/json') return resp def success(self, payload: dict) -> Response: return self.standard_response(self.STATUS_OK, payload) def error(self, status: int, message: str) -> Response: return self.standard_response(status, {'error': message}) def not_found(self, message: str = 'Resource not found') -> Response: return self.error(self.STATUS_NOT_FOUND, message) 

Usamos la API en el script de shell: sondeamos el punto final cada 10 minutos:

  TRIGGER=$? [ "$TRIGGER" -eq "0" ] && log "trigger airflow DAG succeeded" || { log "trigger airflow DAG failed"; return 1; } CMD="curl -s http://$HYDRA_SERVER/$AIRFLOW_API | jq .dag_last_run.state" STATE=$(eval $CMD) while [ $STATE == \"running\" ]; do log "Generating matrices in progress..." sleep 600 STATE=$(eval $CMD) done [ $STATE == \"success\" ] && RETVAL=0 || RETVAL=1 [ $RETVAL -eq 0 ] && log "$FUNCNAME succeeded" || log "$FUNCNAME failed" return $RETVAL 

Dolor : si alguna vez ejecuta un trabajo de Spark usando el envío de chispa en modo de clúster, entonces sabe que los registros en STDOUT son una hoja no informativa con las líneas "SPARK APPLICATION_ID IS RUNNING". Los registros de la aplicación Spark en sí podrían verse, por ejemplo, utilizando el comando yarn logs. En un script de shell, este problema se resolvió simplemente: se abrió un túnel SSH en una de las máquinas de clúster y se ejecutó el envío de chispas en modo cliente para esta máquina. En este caso, STDOUT tendrá registros legibles y comprensibles. En Airflow, decidimos usar siempre cluster-decide, y ese número no funcionará.

Muleta: después de que spark-submit ha funcionado, extraemos los registros del controlador de HDFS mediante application_id y lo mostramos en la interfaz Airflow simplemente a través del operador Python print (). Lo único negativo: en la interfaz Airflow, los registros aparecen solo después de que el envío de chispas ha funcionado, debe controlar el tiempo real en otros lugares, por ejemplo, el hocico web YARN.

 def get_logs(config: BaseConfig, app_id: str) -> None: """   :param config: :param app_id: """ hdfs = HDFSInteractor(config) logs_path = '/tmp/logs/{username}/logs/{app_id}'.format(username=config.CURRENT_USERNAME, app_id=app_id) logs_files = hdfs.files_in_folder(logs_path) logs_files = [file for file in logs_files if file[-4:] != '.tmp'] for file in logs_files: with hdfs.hdfs_client.read(os.path.join(logs_path, file), encoding='utf-8', delimiter='\n') as reader: print_line = False for line in reader: if re.search('stdout', line) and len(line) > 30: print_line = True if re.search('stderr', line): print_line = False if print_line: print(line) 

Dolor : para los probadores y desarrolladores, sería bueno tener un banco de pruebas de Airflow, pero estamos ahorrando recursos de DevOps, por lo que pensamos en cómo implementar el entorno de prueba durante mucho tiempo.

Muleta: empacamos Airflow en un contenedor acoplable, y Dockerfile lo colocó en el repositorio con trabajos de chispa. Por lo tanto, cada desarrollador o probador puede aumentar su propio flujo de aire en una máquina local. Debido al hecho de que las aplicaciones se ejecutan en modo de clúster, los recursos locales para Docker casi no son necesarios.

Una instalación local de la chispa estaba oculta dentro del contenedor acoplable y su configuración completa a través de variables de entorno: ya no necesita pasar varias horas configurando el entorno. A continuación, proporcioné un ejemplo con un fragmento de archivo acoplable para un contenedor con Airflow, donde puede ver cómo se configura Airflow utilizando variables de entorno:

 FROM ubuntu:16.04 ARG AIRFLOW_VERSION=1.9.0 ARG AIRFLOW_HOME ARG USERNAME=airflow ARG USER_ID ARG GROUP_ID ARG LOCALHOST ARG AIRFLOW_PORT ARG PIPENV_PATH ARG PROJECT_HYDRAMATRICES_DOCKER_PATH RUN apt-get update \ && apt-get install -y \ python3.6 \ python3.6-dev \ && update-alternatives --install /usr/bin/python3 python3.6 /usr/bin/python3.6 0 \ && apt-get -y install python3-pip RUN mv /root/.pydistutils.cf /root/.pydistutils.cfg RUN pip3 install pandas==0.20.3 \ apache-airflow==$AIRFLOW_VERSION \ psycopg2==2.7.5 \ ldap3==2.5.1 \ cryptography #   ,       ENV PROJECT_HYDRAMATRICES_DOCKER_PATH=${PROJECT_HYDRAMATRICES_DOCKER_PATH} ENV PIPENV_PATH=${PIPENV_PATH} ENV SPARK_HOME=/usr/lib/spark2 ENV HADOOP_CONF_DIR=$PROJECT_HYDRAMATRICES_DOCKER_PATH/etc/hadoop-conf-preprod ENV PYTHONPATH=${SPARK_HOME}/python/lib/py4j-0.10.4-src.zip:${SPARK_HOME}/python/lib/pyspark.zip:${SPARK_HOME}/python/lib ENV PIP_NO_BINARY=numpy ENV AIRFLOW_HOME=${AIRFLOW_HOME} ENV AIRFLOW_DAGS=${AIRFLOW_HOME}/dags ENV AIRFLOW_LOGS=${AIRFLOW_HOME}/logs ENV AIRFLOW_PLUGINS=${AIRFLOW_HOME}/plugins #      Airflow (log url) BASE_URL="http://${AIRFLOW_CURRENT_HOST}:${AIRFLOW_PORT}" ; #   Airflow ENV AIRFLOW__WEBSERVER__BASE_URL=${BASE_URL} ENV AIRFLOW__WEBSERVER__ENDPOINT_URL=${BASE_URL} ENV AIRFLOW__CORE__AIRFLOW_HOME=${AIRFLOW_HOME} ENV AIRFLOW__CORE__DAGS_FOLDER=${AIRFLOW_DAGS} ENV AIRFLOW__CORE__BASE_LOG_FOLDER=${AIRFLOW_LOGS} ENV AIRFLOW__CORE__PLUGINS_FOLDER=${AIRFLOW_PLUGINS} ENV AIRFLOW__SCHEDULER__CHILD_PROCESS_LOG_DIRECTORY=${AIRFLOW_LOGS}/scheduler 

Como resultado de la implementación de Airflow, logramos los siguientes resultados:

  • Reducción del ciclo de lanzamiento: la implementación de un nuevo modelo (o una tubería de preparación de datos) ahora se reduce a escribir un nuevo gráfico de Airflow, los gráficos se almacenan en el repositorio y se implementan con el código. Este proceso está completamente en manos del desarrollador. Los administradores están contentos, ya no los tiramos de bagatelas.
  • Los registros de aplicaciones de Spark que solían ir directamente al infierno ahora se almacenan en Aiflow con una interfaz de acceso conveniente. Puede ver los registros de cualquier día sin seleccionar en los directorios HDFS.
  • El cálculo fallido puede reiniciarse con un botón en la interfaz, es muy conveniente, incluso junio puede manejarlo.
  • Puede hacer viñetas de trabajos de chispa desde la interfaz sin tener que ejecutar la configuración de Spark en la máquina local. Los probadores están contentos: todas las configuraciones para que el envío de chispas funcione correctamente ya están hechas en Dockerfile
  • Bollos estándar de Aiflow: horarios, reinicio de trabajos caídos, gráficos hermosos (por ejemplo, tiempo de ejecución de la aplicación, estadísticas de lanzamientos exitosos y no exitosos).

¿A dónde ir después? Ahora tenemos una gran cantidad de fuentes de datos y sumideros, cuyo número crecerá. Los cambios en cualquier clase de repositorio de hidramatrices pueden bloquearse en otra tubería (o incluso en la parte en línea):

  • Clickhouse se desborda → Colmena
  • preprocesamiento de datos: Hive → Hive
  • implementar modelos c2c: Colmena → Redis
  • preparación de directorios (como el tipo de monetización de contenido): Postgres → Redis
  • preparación del modelo: FS local → HDFS

En tal situación, realmente necesitamos un soporte para las pruebas automáticas de tuberías en la preparación de datos. Esto reducirá en gran medida el costo de probar los cambios en el repositorio, acelerará la implementación de nuevos modelos en producción y aumentará drásticamente el nivel de endorfinas en los probadores. ¡Pero sin Airflow, sería imposible implementar un soporte para este tipo de prueba automática!

Escribí este artículo para hablar sobre nuestra experiencia en la implementación de Airflow, que puede ser útil para otros equipos en una situación similar: ya tiene un gran sistema de trabajo y desea probar algo nuevo, moderno y juvenil. No es necesario tener miedo a las actualizaciones del sistema de trabajo, debe probar y experimentar, tales experimentos generalmente abren nuevos horizontes para un mayor desarrollo.

Source: https://habr.com/ru/post/456630/


All Articles