A tarefa de implantar modelos de aprendizado de máquina na produção é sempre dolorosa, porque é muito desconfortável sair de um notebook jupyter acolhedor para o mundo do monitoramento e da tolerância a falhas.
Já escrevemos sobre a
primeira iteração de refatoração do sistema de recomendação do cinema online ivi. No ano passado, quase não finalizamos a arquitetura do aplicativo (do global - apenas passando do obsoleto python 2.7 e python 3.4 para o "fresh" python 3.6), mas adicionamos alguns novos modelos de ML e imediatamente enfrentamos o problema de implantar novos algoritmos na produção. No artigo, contarei sobre nossa experiência na implementação de uma ferramenta de gerenciamento de fluxo de tarefas como o Apache Airflow: por que a equipe teve essa necessidade, o que não se adequava à solução existente, quais muletas tiveram que ser cortadas ao longo do caminho e o que veio dela.
→ A versão em vídeo do relatório pode ser assistida no YouTube (a partir das 03:00:00)
aqui .
Hydra Team
Vou falar um pouco sobre o projeto: ivi é várias dezenas de milhares de unidades de conteúdo, temos um dos maiores diretórios jurídicos do RuNet. A página principal da versão web ivi é um recorte personalizado do catálogo, projetado para fornecer ao usuário o conteúdo mais rico e relevante com base em seus comentários (visualizações, classificações e assim por diante).
A parte on-line do sistema de recomendação é um aplicativo de back-end do Flask com uma carga de até 600 RPS. Off-line, o modelo é treinado em mais de 250 milhões de visualizações de conteúdo por mês. Os pipelines de preparação de dados para treinamento são implementados no Spark, que é executado no topo do repositório Hive.
A equipe agora tem 7 desenvolvedores que estão envolvidos na criação de modelos e na sua implantação em produção - essa é uma equipe bastante grande que requer ferramentas convenientes para gerenciar fluxos de tarefas.
Arquitetura offline
Abaixo, você vê o diagrama da infraestrutura de fluxo de dados para o sistema de recomendação.
Dois armazenamentos de dados são representados aqui - Hive para feedback do usuário (visualizações, classificações) e Postgres para várias informações comerciais (tipos de monetização de conteúdo etc.), enquanto a transferência do Postgres para o Hive é ajustada. Um pacote de aplicativos Spark suga dados do Hive: e treina nossos modelos nesses dados (ALS para recomendações pessoais, vários modelos colaborativos de similaridade de conteúdo).
Os aplicativos Spark são tradicionalmente gerenciados a partir de uma máquina virtual dedicada, que chamamos de hydra-updater usando um monte de scripts cron + shell. Este pacote foi criado no departamento de operações da ivi em tempos imemoriais e funcionou muito bem. O script de shell era um ponto de entrada único para o lançamento de aplicativos spark - ou seja, cada novo modelo começava a girar no produto somente depois que os administradores terminavam esse script.
Alguns dos artefatos do treinamento do modelo são armazenados no HDFS para armazenamento eterno (e aguardando alguém fazer o download deles e transferi-los para o servidor em que a parte on-line está girando), e alguns são gravados diretamente do driver Spark para o armazenamento rápido Redis, que geralmente usamos memória para várias dezenas de processos python da parte online.
Essa arquitetura acumulou uma série de desvantagens ao longo do tempo:
O diagrama mostra que os fluxos de dados têm uma estrutura bastante complicada - sem uma ferramenta simples e clara para gerenciar esse bem, o desenvolvimento e a operação se transformarão em horror, decadência e sofrimento.
Além de gerenciar aplicativos spark, o script admin faz muitas coisas úteis: reiniciar os serviços em batalha, um despejo de Redis e outras coisas do sistema. Obviamente, durante um longo período de operação, o script cresceu demais com muitas funções, pois cada novo modelo nosso gerou algumas dezenas de linhas. O script começou a parecer sobrecarregado demais em termos de funcionalidade; portanto, como uma equipe do sistema de recomendação, queríamos remover em algum lugar parte da funcionalidade que diz respeito ao lançamento e gerenciamento de aplicativos Spark. Para esses propósitos, decidimos usar o Airflow.
Muletas para fluxo de ar
Além de resolver todos esses problemas, é claro, a maneira como criamos novos para nós mesmos - implantar o Airflow para iniciar e monitorar os aplicativos Spark acabou sendo difícil.
A principal dificuldade era que ninguém iria remodelar toda a infraestrutura para nós, porque O recurso devops é uma coisa escassa. Por esse motivo, tivemos que não apenas implementar o Airflow, mas integrá-lo ao sistema existente, o que é muito mais difícil de ser visto do zero.
Quero falar sobre as dores que encontramos durante o processo de implementação e as muletas que tivemos que cortar para obter o Airflow.
A primeira e principal dor : como integrar o Airflow em um grande script de shell do departamento de operações.
Aqui a solução é a mais óbvia - começamos a disparar gráficos diretamente do script shell usando o binário do fluxo de ar com a tecla trigger_dag. Com essa abordagem, não usamos o sheduler Airflow e, de fato, o aplicativo Spark é iniciado com a mesma coroa - isso não é religiosamente muito correto. Mas conseguimos uma integração perfeita com uma solução existente. Aqui está a aparência do início do script de shell do nosso principal aplicativo Spark, que é chamado historicamente de hidramatrizes.
log "$FUNCNAME started" local RETVAL=0 export AIRFLOW_CONFIG=/opt/airflow/airflow.cfg AIRFLOW_API=api/dag_last_run/hydramatrices/all log "run /var/www/airflow/bin/airflow trigger_dag hydramatrices" /var/www/airflow/bin/airflow trigger_dag hydramatrices 2>&1 | tee -a $LOGFILE
Dor: O script de shell do departamento de operações deve, de alguma forma, determinar o status do gráfico Airflow para controlar seu próprio fluxo de execução.
Muleta: estendemos a API REST do Airflow com um ponto de extremidade para o monitoramento do DAG dentro dos scripts do shell. Agora, cada gráfico possui três estados: EXECUTANDO, SUCEDIDO, FALHADO.
De fato, depois de iniciar os cálculos no Airflow, simplesmente pesquisamos regularmente o gráfico em execução: marcamos a solicitação GET para determinar se o DAG foi concluído ou não. Quando o terminal de monitoramento responde sobre a execução bem-sucedida do gráfico, o script de shell continua executando seu fluxo.
Quero dizer que a API REST do Airflow é apenas uma coisa impetuosa que permite configurar flexivelmente seus pipelines - por exemplo, você pode encaminhar parâmetros POST para gráficos.
A extensão da API Airflow é apenas uma classe Python que se parece com isso:
import json import os from airflow import settings from airflow.models import DagBag, DagRun from flask import Blueprint, request, Response airflow_api_blueprint = Blueprint('airflow_api', __name__, url_prefix='/api') AIRFLOW_DAGS = '{}/dags'.format( os.path.dirname(os.path.dirname(os.path.abspath(__file__))) ) class ApiResponse: """ GET """ STATUS_OK = 200 STATUS_NOT_FOUND = 404 def __init__(self): pass @staticmethod def standard_response(status: int, payload: dict) -> Response: json_data = json.dumps(payload) resp = Response(json_data, status=status, mimetype='application/json') return resp def success(self, payload: dict) -> Response: return self.standard_response(self.STATUS_OK, payload) def error(self, status: int, message: str) -> Response: return self.standard_response(status, {'error': message}) def not_found(self, message: str = 'Resource not found') -> Response: return self.error(self.STATUS_NOT_FOUND, message)
Usamos a API no script de shell - pesquisamos o endpoint a cada 10 minutos:
TRIGGER=$? [ "$TRIGGER" -eq "0" ] && log "trigger airflow DAG succeeded" || { log "trigger airflow DAG failed"; return 1; } CMD="curl -s http://$HYDRA_SERVER/$AIRFLOW_API | jq .dag_last_run.state" STATE=$(eval $CMD) while [ $STATE == \"running\" ]; do log "Generating matrices in progress..." sleep 600 STATE=$(eval $CMD) done [ $STATE == \"success\" ] && RETVAL=0 || RETVAL=1 [ $RETVAL -eq 0 ] && log "$FUNCNAME succeeded" || log "$FUNCNAME failed" return $RETVAL
Problema : se você executar um trabalho do Spark usando o envio de spark no modo de cluster, saberá que os logs no STDOUT são uma folha não informativa com as linhas "SPARK APPLICATION_ID IS RUNNING". Os logs do próprio aplicativo Spark podem ser visualizados, por exemplo, usando o comando yarn logs. No script de shell, esse problema foi resolvido simplesmente: um túnel SSH foi aberto em uma das máquinas de cluster e o envio de spark foi executado no modo cliente para esta máquina. Nesse caso, o STDOUT terá logs legíveis e compreensíveis. No Airflow, decidimos sempre usar a decisão de cluster e esse número não funcionará.
Muleta: após o envio do spark, nós extraímos os logs do driver do HDFS por application_id e o exibimos na interface Airflow simplesmente através do operador Python print (). O único aspecto negativo - na interface Airflow, os logs aparecem somente após o envio do spark, você precisa seguir o tempo real em outros lugares - por exemplo, o focinho da web do YARN.
def get_logs(config: BaseConfig, app_id: str) -> None: """ :param config: :param app_id: """ hdfs = HDFSInteractor(config) logs_path = '/tmp/logs/{username}/logs/{app_id}'.format(username=config.CURRENT_USERNAME, app_id=app_id) logs_files = hdfs.files_in_folder(logs_path) logs_files = [file for file in logs_files if file[-4:] != '.tmp'] for file in logs_files: with hdfs.hdfs_client.read(os.path.join(logs_path, file), encoding='utf-8', delimiter='\n') as reader: print_line = False for line in reader: if re.search('stdout', line) and len(line) > 30: print_line = True if re.search('stderr', line): print_line = False if print_line: print(line)
Dor : para testadores e desenvolvedores, seria bom ter uma bancada de testes do Airflow, mas estamos economizando recursos de devops, por isso pensamos em como implantar o ambiente de teste por um longo tempo.
Muleta: empacotamos o Airflow em um contêiner de encaixe e o Dockerfile o colocou no repositório com trabalhos de faísca. Assim, cada desenvolvedor ou testador pode aumentar seu próprio fluxo de ar em uma máquina local. Devido ao fato de os aplicativos serem executados no modo de cluster, quase não são necessários recursos locais para o docker.
Uma instalação local do spark estava oculta no contêiner do docker e em toda a sua configuração por meio de variáveis de ambiente - você não precisa mais gastar várias horas configurando o ambiente. Abaixo, dei um exemplo com um fragmento de arquivo docker para um contêiner com Airflow, onde você pode ver como o Airflow é configurado usando variáveis de ambiente:
FROM ubuntu:16.04 ARG AIRFLOW_VERSION=1.9.0 ARG AIRFLOW_HOME ARG USERNAME=airflow ARG USER_ID ARG GROUP_ID ARG LOCALHOST ARG AIRFLOW_PORT ARG PIPENV_PATH ARG PROJECT_HYDRAMATRICES_DOCKER_PATH RUN apt-get update \ && apt-get install -y \ python3.6 \ python3.6-dev \ && update-alternatives --install /usr/bin/python3 python3.6 /usr/bin/python3.6 0 \ && apt-get -y install python3-pip RUN mv /root/.pydistutils.cf /root/.pydistutils.cfg RUN pip3 install pandas==0.20.3 \ apache-airflow==$AIRFLOW_VERSION \ psycopg2==2.7.5 \ ldap3==2.5.1 \ cryptography
Como resultado da implementação do Airflow, alcançamos os seguintes resultados:
- Reduzido o ciclo de lançamento: a implantação de um novo modelo (ou um pipeline de preparação de dados) agora se resume à criação de um novo gráfico do Airflow; os próprios gráficos são armazenados no repositório e implantados com o código. Esse processo está inteiramente nas mãos do desenvolvedor. Os administradores estão felizes, já não os usamos em ninharias.
- Os logs de aplicativos Spark que costumavam ir direto ao inferno agora são armazenados no Aiflow com uma interface de acesso conveniente. Você pode ver os logs de qualquer dia sem escolher os diretórios HDFS.
- O cálculo com falha pode ser reiniciado com um botão na interface, é muito conveniente, até junho pode lidar com isso.
- Você pode marcar tarefas do spark na interface sem precisar executar as configurações do Spark na máquina local. Os testadores estão satisfeitos - todas as configurações do envio de spark para funcionar corretamente já foram feitas no Dockerfile
- Aiflow standard buns - agenda, reinicia trabalhos interrompidos, belos gráficos (por exemplo, tempo de execução do aplicativo, estatísticas de lançamentos bem-sucedidos e sem êxito).
Para onde ir a seguir? Agora, temos um grande número de fontes de dados e sumidouros, cujo número aumentará. Alterações em qualquer classe de repositório hydramatrices podem falhar em outro pipeline (ou mesmo na parte online):
- Estouros de Clickhouse → Hive
- pré-processamento de dados: Hive → Hive
- implantar modelos c2c: Hive → Redis
- preparação de diretórios (como o tipo de monetização do conteúdo): Postgres → Redis
- preparação do modelo: FS local → HDFS
Em tal situação, precisamos realmente de um suporte para testes automáticos de tubulações na preparação de dados. Isso reduzirá bastante o custo das alterações de teste no repositório, acelerará o lançamento de novos modelos na produção e aumentará drasticamente o nível de endorfinas nos testadores. Mas sem o Airflow, seria impossível implantar um suporte para esse tipo de teste automático!
Eu escrevi este artigo para contar sobre a nossa experiência na implementação do Airflow, que pode ser útil para outras equipes em uma situação semelhante - você já possui um grande sistema de trabalho e deseja experimentar algo novo, elegante e jovem. Não é preciso ter medo de nenhuma atualização do sistema em funcionamento; você precisa experimentar - essas experiências geralmente abrem novos horizontes para o desenvolvimento.