引入Airflow在ivi中管理Spark作业:希望与拐杖

在生产中部署机器学习模型的任务总是很痛苦,因为要离开舒适的Jupyter笔记本进入监视和容错的世界是非常不舒服的。

我们已经写过关于重构在线电影院ivi推荐系统的第一次迭代的文章 。 在过去的一年中,我们几乎没有最终确定应用程序体系结构(从全局-仅从过时的python 2.7和python 3.4过渡到“新鲜的” python 3.6),但是我们添加了一些新的ML模型,并立即遇到了在生产中推出新算法的问题。 在本文中,我将介绍我们在实现诸如Apache Airflow之类的任务流管理工具方面的经验:为什么团队有此需求,不适合现有解决方案的东西,必须解决的关键问题以及随之而来的事情。

→可以在YouTube上观看报告的视频版本(从03:00:00开始)。




九头蛇队


我将稍微介绍一下该项目:ivi是数以万计的内容单元,我们拥有RuNet中最大的法律目录之一。 ivi网络版本的主页是从目录中进行的个性化剪裁,旨在根据用户的反馈(视图,等级等)为用户提供最丰富,最相关的内容。


推荐系统的在线部分是Flask后端应用程序,负载高达600 RPS。 在离线状态下,该模型每月接受超过2.5亿次内容观看的培训。 在Hive存储库顶部运行的Spark上实现了用于培训的数据准备管道。

该团队现在有7个开发人员,他们既要创建模型又要把它们推广到生产中-这是一个相当大的团队,需要方便的工具来管理任务流。

离线架构


在下面,您可以看到推荐系统的数据流基础结构图。


这里描述了两个数据存储-Hive用于用户反馈(视图,评级)和Postgres用于各种业务信息(内容获利的类型等),而从Postgres到Hive的传输已进行了调整。 一包Spark应用程序从Hive吸取数据:并在此数据上训练我们的模型(用于个人推荐的ALS,各种内容相似性的协作模型)。

传统上,Spark应用程序是通过专用虚拟机进行管理的,我们使用一堆cron + shell脚本将其称为hydra-updater。 这个捆绑包是在ivi运营部门创建的,而且时间紧迫。 Shell脚本是启动Spark应用程序的单个入口点-也就是说,每个新模型只有在管理员完成此脚本后才开始在产品中运行。

模型训练的某些工件存储在HDFS中以进行永久存储(并等待有人从那里下载它们并传输到在线部分正在旋转的服务器中),而另一些则直接从Spark驱动程序写入Redis快速存储中,我们通常将其使用在线部分的数十个python进程的内存。

随着时间的流逝,这种架构累积了许多缺点:


该图显示数据流具有相当复杂的结构-如果没有简单明了的工具来管理这种产品,开发和操作将变成恐怖,衰败和痛苦。

除了管理Spark应用程序外,管理脚本还执行许多有用的操作:在战斗中重新启动服务,Redis转储和其他系统功能。 显然,在长期的运行过程中,该脚本具有许多功能,因为我们的每个新模型都在其中生成了几十行。 在功能方面,该脚本开始显得过于繁琐,因此,作为推荐系统的团队,我们希望在某些地方删除与启动和管理Spark应用程序有关的功能。 为此,我们决定使用Airflow。

拐杖


当然,除了解决所有这些问题外,在我们为自己创建新问题的方式上,部署Airflow来启动和监视Spark应用程序也很困难。

主要的困难是没有人会为我们重塑整个基础架构,因为 开发资源是一件稀缺的事情。 因此,我们不仅必须实施Airflow,还必须将其集成到现有系统中,这从头开始很难看。

我想谈谈我们在实施过程中遇到的痛苦,以及为了获得Airflow而必须付出的艰辛。

第一个也是主要的难题 :如何将Airflow集成到运营部门的大型Shell脚本中。

在这里,解决方案是最明显的-我们开始使用气流二进制文件和trigger_dag键直接从Shell脚本触发图形。 通过这种方法,我们不使用Airflow sheduler,实际上,Spark应用程序是用相同的表冠启动的-从宗教上讲这不是很正确。 但是我们与现有解决方案实现了无缝集成。 这是从我们主要的Spark应用程序的shell脚本开始的样子,该脚本以前称为hydramatrices。

log "$FUNCNAME started" local RETVAL=0 export AIRFLOW_CONFIG=/opt/airflow/airflow.cfg AIRFLOW_API=api/dag_last_run/hydramatrices/all log "run /var/www/airflow/bin/airflow trigger_dag hydramatrices" /var/www/airflow/bin/airflow trigger_dag hydramatrices 2>&1 | tee -a $LOGFILE 

痛苦:运营部门外壳脚本必须以某种方式确定气流图的状态,以便控制其自身的执行流程。

拐杖:我们在外壳程序脚本中使用了DAG监视端点来扩展了Airflow REST API。 现在,每个图形都具有三种状态:运行,成功,失败。

实际上,在Airflow中开始计算之后,我们只需定期轮询运行中的图形:我们将GET请求设为项目符号,以确定DAG是否已完成。 当监视端点对图的成功执行作出响应时,shell脚本将继续执行其流程。
我想说的是,Airflow REST API只是一个火热的事情,它允许您灵活地配置管道-例如,您可以将POST参数转发到图形。

Airflow API扩展仅仅是一个Python类,看起来像这样:

 import json import os from airflow import settings from airflow.models import DagBag, DagRun from flask import Blueprint, request, Response airflow_api_blueprint = Blueprint('airflow_api', __name__, url_prefix='/api') AIRFLOW_DAGS = '{}/dags'.format( os.path.dirname(os.path.dirname(os.path.abspath(__file__))) ) class ApiResponse: """    GET """ STATUS_OK = 200 STATUS_NOT_FOUND = 404 def __init__(self): pass @staticmethod def standard_response(status: int, payload: dict) -> Response: json_data = json.dumps(payload) resp = Response(json_data, status=status, mimetype='application/json') return resp def success(self, payload: dict) -> Response: return self.standard_response(self.STATUS_OK, payload) def error(self, status: int, message: str) -> Response: return self.standard_response(status, {'error': message}) def not_found(self, message: str = 'Resource not found') -> Response: return self.error(self.STATUS_NOT_FOUND, message) 

我们在外壳程序脚本中使用API​​-我们每10分钟轮询一次端点:

  TRIGGER=$? [ "$TRIGGER" -eq "0" ] && log "trigger airflow DAG succeeded" || { log "trigger airflow DAG failed"; return 1; } CMD="curl -s http://$HYDRA_SERVER/$AIRFLOW_API | jq .dag_last_run.state" STATE=$(eval $CMD) while [ $STATE == \"running\" ]; do log "Generating matrices in progress..." sleep 600 STATE=$(eval $CMD) done [ $STATE == \"success\" ] && RETVAL=0 || RETVAL=1 [ $RETVAL -eq 0 ] && log "$FUNCNAME succeeded" || log "$FUNCNAME failed" return $RETVAL 

痛苦 :如果您曾经在集群模式下使用spark-submit运行Spark作业,那么您知道STDOUT中的日志是一个没有信息的工作表,其行为“ SPARK APPLICATION_ID IS RUNNING”。 例如,可以使用yarn yarn命令查看Spark应用程序本身的日志。 在shell脚本中,此问题得以简单解决:向其中一台群集计算机打开了SSH隧道,并在客户端模式下为此计算机执行了spark-submit。 在这种情况下,STDOUT将具有可读且可理解的日志。 在Airflow中,我们决定始终使用cluster-decide,而这样的数字将不起作用。

拐杖:提交火花后,我们通过application_id从HDFS中获取驱动程序日志,并通过Python print()运算符将其显示在Airflow界面中。 唯一的缺点-在Airflow界面中,日志仅在执行火花提交后才显示,您必须在其他地方监视实时-例如,YARN网络枪口。

 def get_logs(config: BaseConfig, app_id: str) -> None: """   :param config: :param app_id: """ hdfs = HDFSInteractor(config) logs_path = '/tmp/logs/{username}/logs/{app_id}'.format(username=config.CURRENT_USERNAME, app_id=app_id) logs_files = hdfs.files_in_folder(logs_path) logs_files = [file for file in logs_files if file[-4:] != '.tmp'] for file in logs_files: with hdfs.hdfs_client.read(os.path.join(logs_path, file), encoding='utf-8', delimiter='\n') as reader: print_line = False for line in reader: if re.search('stdout', line) and len(line) > 30: print_line = True if re.search('stderr', line): print_line = False if print_line: print(line) 

痛苦 :对于测试人员和开发人员来说,拥有一个Airflow测试平台会很好,但是我们节省了devops资源,因此很长一段时间以来,我们一直在考虑如何部署测试环境。

拐杖:我们将Airflow打包在一个Docker容器中,而Dockerfile将它放置在带有Spark作业的存储库中。 因此,每个开发人员或测试人员都可以在本地计算机上提高自己的气流。 由于应用程序以集群模式运行,因此几乎不需要docker的本地资源。

spark的本地安装通过环境变量隐藏在docker容器及其整个配置中-您不再需要花费几个小时来设置环境。 下面我给出了一个示例,其中包含带有Airflow的容器的docker文件片段,您可以在其中看到如何使用环境变量配置Airflow:

 FROM ubuntu:16.04 ARG AIRFLOW_VERSION=1.9.0 ARG AIRFLOW_HOME ARG USERNAME=airflow ARG USER_ID ARG GROUP_ID ARG LOCALHOST ARG AIRFLOW_PORT ARG PIPENV_PATH ARG PROJECT_HYDRAMATRICES_DOCKER_PATH RUN apt-get update \ && apt-get install -y \ python3.6 \ python3.6-dev \ && update-alternatives --install /usr/bin/python3 python3.6 /usr/bin/python3.6 0 \ && apt-get -y install python3-pip RUN mv /root/.pydistutils.cf /root/.pydistutils.cfg RUN pip3 install pandas==0.20.3 \ apache-airflow==$AIRFLOW_VERSION \ psycopg2==2.7.5 \ ldap3==2.5.1 \ cryptography #   ,       ENV PROJECT_HYDRAMATRICES_DOCKER_PATH=${PROJECT_HYDRAMATRICES_DOCKER_PATH} ENV PIPENV_PATH=${PIPENV_PATH} ENV SPARK_HOME=/usr/lib/spark2 ENV HADOOP_CONF_DIR=$PROJECT_HYDRAMATRICES_DOCKER_PATH/etc/hadoop-conf-preprod ENV PYTHONPATH=${SPARK_HOME}/python/lib/py4j-0.10.4-src.zip:${SPARK_HOME}/python/lib/pyspark.zip:${SPARK_HOME}/python/lib ENV PIP_NO_BINARY=numpy ENV AIRFLOW_HOME=${AIRFLOW_HOME} ENV AIRFLOW_DAGS=${AIRFLOW_HOME}/dags ENV AIRFLOW_LOGS=${AIRFLOW_HOME}/logs ENV AIRFLOW_PLUGINS=${AIRFLOW_HOME}/plugins #      Airflow (log url) BASE_URL="http://${AIRFLOW_CURRENT_HOST}:${AIRFLOW_PORT}" ; #   Airflow ENV AIRFLOW__WEBSERVER__BASE_URL=${BASE_URL} ENV AIRFLOW__WEBSERVER__ENDPOINT_URL=${BASE_URL} ENV AIRFLOW__CORE__AIRFLOW_HOME=${AIRFLOW_HOME} ENV AIRFLOW__CORE__DAGS_FOLDER=${AIRFLOW_DAGS} ENV AIRFLOW__CORE__BASE_LOG_FOLDER=${AIRFLOW_LOGS} ENV AIRFLOW__CORE__PLUGINS_FOLDER=${AIRFLOW_PLUGINS} ENV AIRFLOW__SCHEDULER__CHILD_PROCESS_LOG_DIRECTORY=${AIRFLOW_LOGS}/scheduler 

通过实施Airflow,我们获得了以下结果:

  • 缩短了发布周期:推出新模型(或数据准备管道)现在可以编写新的Airflow图,这些图本身存储在存储库中并与代码一起部署。 此过程完全在开发人员手中。 管理员很高兴,我们不再为琐事而烦恼。
  • 曾经直奔地狱的Spark应用程序日志现在通过方便的访问界面存储在Aiflow中。 您可以查看任何一天的日志,而无需选择HDFS目录。
  • 可以通过界面中的一个按钮重新启动失败的计算,这非常方便,即使June也可以处理它。
  • 您可以从界面中执行Spark作业,而不必在本地计算机上运行Spark设置。 测试人员很高兴-已在Dockerfile中进行了火花提交正常工作的所有设置
  • Aiflow标准包子-计划,重新启动失败的作业,精美的图表(例如,应用程序执行时间,启动成功与失败的统计信息)。

接下来要去哪里? 现在,我们拥有大量的数据源和接收器,并且其数量将不断增长。 任何水文计量库类中的更改都可能在另一个管道(甚至在在线部分)中崩溃:

  • Clickhouse溢出→Hive
  • 数据预处理:Hive→Hive
  • 部署c2c模型:Hive→Redis
  • 准备目录(例如内容货币化的类型):Postgres→Redis
  • 模型准备:本地FS→HDFS

在这种情况下,我们确实需要一个在数据准备中自动测试管道的支架。 这将大大降低测试存储库中更改的成本,加快在生产中推出新模型的速度,并显着提高测试人员中内啡肽的水平。 但是如果没有气流,就不可能部署这种自动测试的支架!

我写这篇文章是为了介绍我们在实施Airflow方面的经验,这对于处于类似情况的其他团队可能很有用-您已经拥有庞大的工作系统,并且想要尝试一些新颖,时尚和年轻的事物。 无需担心会对工作系统进行任何更新,您需要尝试一下-这种实验通常为进一步的开发打开了新的视野。

Source: https://habr.com/ru/post/zh-CN456630/


All Articles