Aprenda Metaflow em 10 minutos

O Metaflow é uma estrutura Python criada na Netflix e focada no campo da Ciência de Dados. Ou seja, ele foi projetado para criar projetos destinados a trabalhar com dados e gerenciar tais projetos. Recentemente, a empresa o transferiu para a categoria de código aberto. A estrutura do Metaflow tem sido amplamente usada na Netflix nos últimos 2 anos. Em particular, ele permitiu reduzir significativamente o tempo necessário para a conclusão de projetos em produção.



O material que estamos traduzindo hoje é um guia rápido para o Metaflow.

O que é o Metaflow?


Abaixo está um gráfico que ilustra a implementação da estrutura Metaflow na Netflix.


Implementação de Metaflow na Netflix

Em novembro de 2018, esse quadro foi utilizado em 134 projetos da empresa.

O Metaflow é uma estrutura para criar e executar fluxos de trabalho de ciência de dados. Possui os seguintes recursos:

  • Gerenciamento de recursos de computação.
  • Início da tarefa em contêiner.
  • Gerenciando dependências externas.
  • Controle de versão, reexecutando tarefas, continuando a execução de tarefas suspensas.
  • API do cliente para examinar os resultados das tarefas que podem ser usadas no ambiente do Jupyter Notebook.
  • Suporte para execução de tarefas locais (por exemplo, em um laptop) e remotas (na nuvem). Capacidade de alternar entre esses modos.

O usuário vtuulos escreveu no Ycombinator que o Metaflow pode criar automaticamente snapshots (snapshots) de código, dados e dependências. Tudo isso é colocado em um repositório com endereçamento por conteúdo, que geralmente é baseado no S3, embora o sistema de arquivos local também seja suportado. Isso permite que você continue executando tarefas interrompidas, reproduza resultados obtidos anteriormente e explore tudo relacionado a tarefas, por exemplo, no Jupyter Notebook.

Em geral, podemos dizer que o Metaflow visa aumentar a produtividade dos cientistas de dados. Isso é feito devido ao fato de a estrutura permitir que eles se envolvam exclusivamente no trabalho com dados, sem se distrair com a resolução de tarefas relacionadas. Além disso, o Metaflow acelera a retirada de projetos baseados nele na produção.


As necessidades de um cientista de dados relacionadas às suas responsabilidades diretas e a solução de tarefas auxiliares relacionadas à infraestrutura na qual os cálculos são realizados

Cenários de fluxo de trabalho com Metaflow


Aqui estão alguns cenários de fluxo de trabalho que você pode organizar usando o Metaflow:

  • Colaboração Um cientista de dados quer ajudar outro a encontrar a fonte do erro. Ao mesmo tempo, o assistente gostaria de baixar para o computador todo o ambiente em que a tarefa que falhou funcionava.
  • Continuação das tarefas paradas a partir do local em que foram interrompidas. Algumas tarefas foram interrompidas com um erro (ou foram intencionalmente interrompidas). O erro foi corrigido (ou o código foi editado). É necessário reiniciar a tarefa para que seu trabalho continue do local em que falhou (ou foi interrompido).
  • Execução de tarefas híbridas. Você precisa executar uma determinada etapa do fluxo de trabalho localmente (talvez seja a etapa de baixar dados de um arquivo armazenado em uma pasta no computador) e outra etapa que exija grandes recursos computacionais (talvez este seja o treinamento do modelo) deve ser executada na nuvem.
  • Exame dos metadados obtidos após a conclusão de uma tarefa. Três dados Os cientistas estão envolvidos na seleção de hiperparâmetros do mesmo modelo, tentando melhorar a precisão desse modelo. Depois disso, é necessário analisar os resultados da conclusão das tarefas de treinamento do modelo e selecionar o conjunto de hiperparâmetros que provaram ser os melhores.
  • Usando várias versões do mesmo pacote. No projeto, você precisa usar versões diferentes, por exemplo, sklearn libraries. Durante o pré-processamento, sua versão 0.20 é necessária e, durante a modelagem, a versão 0.22.

Fluxo de trabalho típico do Metaflow


Considere um fluxo de trabalho típico do Metaflow de uma perspectiva conceitual e de programação.

LookVista conceitual do fluxo de trabalho do Metaflow


Do ponto de vista conceitual, os fluxos de trabalho do Metaflow (cadeias de tarefas) são representados por gráficos acíclicos direcionados (DAGs). As ilustrações abaixo ajudarão você a entender melhor essa ideia.


Gráfico acíclico linear


Gráfico acíclico com caminhos "paralelos"

Cada nó do gráfico representa uma etapa de processamento de dados no fluxo de trabalho.

Em cada etapa da cadeia de tarefas, o Metaflow executa o código Python regular sem nenhuma alteração especial. O código é executado em contêineres separados nos quais o código é compactado junto com suas dependências.

Um aspecto fundamental da arquitetura Metaflow é representado pelo fato de permitir a implementação de quase todas as bibliotecas externas do ecossistema conda em projetos baseados nela, sem o uso de plug-ins. Isso distingue o Metaflow de outras soluções similares de uso geral. Por exemplo - do Airflow.

Flow Fluxo de trabalho do Metaflow em termos de programação


Cada cadeia de tarefas (fluxo) pode ser representada como uma classe Python padrão (os nomes dessas classes geralmente têm a palavra Flow ) se ela atender aos seguintes requisitos mínimos:

  • A classe é descendente da classe Metaflow FlowSpec .
  • Cada função que representa uma etapa na cadeia de tarefas tem o decorador @step .
  • No final de cada função @step , deve haver uma indicação de uma função semelhante que a segue. Isso pode ser feito usando uma construção desse tipo: self.next(self.function_name_here) .
  • A classe implementa as funções start e end .

Considere um exemplo de uma cadeia mínima de tarefas que consiste em três nós.

Seu esquema é assim:

 start → process_message → end 

Aqui está o código dela:

 from metaflow import FlowSpec, step class LinearFlow(FlowSpec):         """     ,      Metaflow.    """       #         @step    def start(self):        self.message = 'Thanks for reading.'        self.next(self.process_message)    @step    def process_message(self):        print('the message is: %s' % self.message)        self.next(self.end)    @step    def end(self):        print('the message is still: %s' % self.message) if __name__ == '__main__':    LinearFlow() 

Instruções de instalação do Metaflow


RunInstalação e teste


Aqui está a sequência de etapas que você precisa executar para instalar e iniciar o Metaflow:

  • Instale o Metaflow (Python 3 recomendado): pip3 install metaflow .
  • Coloque o fragmento de código acima ( aqui está no GitHub) no arquivo linear_flow.py .
  • Para examinar a arquitetura da cadeia de tarefas implementada por este código, use o comando python3 linear_flow.py show .
  • Para iniciar o fluxo, execute o python3 linear_flow.py run .

Você deve obter algo semelhante ao mostrado abaixo.


Verificação de integridade bem-sucedida do Metaflow

Aqui vale a pena prestar atenção em algumas coisas. A estrutura do Metaflow cria um .metaflow dados .metaflow local. Lá, ele armazena todos os metadados relacionados à execução de tarefas e instantâneos associados às sessões de execução de tarefas. Se você definiu as configurações do Metaflow relacionadas ao armazenamento de dados na nuvem, os instantâneos serão armazenados no AWS S3 Bucket e os metadados relacionados às ativações de tarefas irão para o serviço Metadata, com base no RDS (Relational Data Store, Relational Data Store). Mais tarde, falaremos sobre como explorar esses metadados usando a API do cliente. Outra ninharia, embora importante, que vale a pena prestar atenção, é que os identificadores de processo (pid, IDs de processo) anexados a diferentes etapas diferem. Lembre-se - dissemos acima que o Metaflow armazena independentemente cada etapa da cadeia de tarefas e executa cada etapa em seu próprio ambiente (passando apenas dados entre as etapas).

AndInstalação e configuração do conda (se você planeja implementar dependências)


Siga estas etapas para instalar o conda:


Agora você está pronto para incorporar dependências conda em suas cadeias de tarefas. Detalhes deste processo serão discutidos abaixo.

Exemplo de fluxo de trabalho realista


Acima, falamos sobre como instalar o Metaflow e como garantir que o sistema esteja operacional. Além disso, discutimos o básico da arquitetura de fluxo de trabalho e analisamos um exemplo simples. Aqui, examinamos um exemplo mais complexo, enquanto revelamos alguns dos conceitos do Metaflow.

▍Job


Crie um fluxo de trabalho usando o Metaflow que implemente as seguintes funções:

  • Carregando dados de filme CSV em um dataframe do Pandas.
  • Cálculo paralelo de quartis para gêneros.
  • Salvando um dicionário com os resultados dos cálculos.

Chain Cadeia de tarefas


O esqueleto da classe GenreStatsFlow é mostrado GenreStatsFlow . Após analisá-lo, você entenderá a essência da abordagem implementada aqui para resolver nosso problema.

 from metaflow import FlowSpec, step, catch, retry, IncludeFile, Parameter class GenreStatsFlow(FlowSpec):  """    ,  ,   .         :    1)  CSV-   Pandas.    2)     .    3)     .  """   @step  def start(self):    """         :        1)      Pandas.        2)    .        3)        .    """       # TODO:  CSV         self.genres = []    self.next(self.compute_statistics, foreach='genres') #  1     @catch(var='compute_failed') #  2  @retry(times=1) #  3  @step  def compute_statistics(self):    """    .   ."""    self.genre = self.input #  4    # TODO:        self.next(self.join)     @step  def join(self, inputs):    """       ."""    # TODO:      self.next(self.end)     @step  def end(self):      """End the flow."""      pass   if __name__ == '__main__':  GenreStatsFlow() 

Considere algumas partes importantes deste exemplo. O código contém comentários do formulário # n , ao qual nos referiremos abaixo.

  • No 1 , na etapa start , preste atenção ao parâmetro foreach . Graças a isso, cópias das etapas compute_statistics são compute_statistics em compute_statistics em um for each loop for each entrada na lista de genres .
  • No 2 decorador @catch(var='compute_failed') captura qualquer exceção que compute_statistics etapa compute_statistics e a grava na variável compute_failed (pode ser lida na próxima etapa).
  • No 3 decorador @retry(times=1) faz exatamente o que o nome sugere. Ou seja, quando ocorrem erros, ele repete a etapa.
  • De onde self.input 4 , em compute_statistics , self.input ? A questão é que input é uma variável de classe fornecida pelo Metaflow. Ele contém dados aplicáveis ​​a uma instância específica de compute_statistics (quando há várias cópias de uma função executada em paralelo). Essa variável é adicionada pelo Metaflow somente quando os nós são representados por vários processos paralelos ou quando vários nós são combinados.
  • Aqui está um exemplo de como executar a mesma função em paralelo - compute_statistics . Mas, se necessário, você pode executar simultaneamente funções completamente diferentes que não estão relacionadas uma à outra. Para fazer isso, altere o que é mostrado no 1 para algo como self.next(self.func1, self.function2, self.function3) . Obviamente, com essa abordagem, será necessário reescrever a etapa de join , possibilitando processar os resultados de várias funções nela.

Veja como imaginar a classe de esqueleto acima.


Representação visual da classe GenreStatsFlow

EadLeia o arquivo de dados e os parâmetros de transferência


  • Faça o download deste arquivo CSV do filme.
  • Agora você precisa equipar o programa com suporte para a possibilidade de transferir dinamicamente o caminho para o arquivo movie_data e o valor max_genres para a max_genres . O mecanismo de argumentos externos nos ajudará nisso. O Metaflow permite passar argumentos para o programa usando sinalizadores adicionais no comando start do fluxo de trabalho. Por exemplo, pode ser assim: python3 tutorial_flow.py run --movie_data=path/to/movies.csv --max_genres=5 .
  • O Metaflow fornece ao desenvolvedor os objetos IncludeFile e Parameter que permitem ler a entrada no código do fluxo de trabalho. Nos referimos aos argumentos passados ​​ao atribuir objetos IncludeFile e Parameter a variáveis ​​de classe. Depende do que exatamente queremos ler - o arquivo ou o valor usual.

Veja como o código se parece com a leitura dos parâmetros passados ​​para o programa quando foi iniciado a partir da linha de comando:

     movie_data = IncludeFile("movie_data",                             help="The path to a movie metadata file.",                             default = 'movies.csv')                               max_genres = Parameter('max_genres',                help="The max number of genres to return statistics for",                default=5) 

ClusInclusão de conda na cadeia de tarefas


  • Se você ainda não instalou o conda, consulte a seção sobre instalação e configuração do conda neste artigo.
  • Adicione o decorador GenreStatsFlow fornecido pelo Metaflow à classe GenreStatsFlow. Este decorador espera receber a versão python. Pode ser definido no código ou obtido usando uma função auxiliar. Abaixo está o código que demonstra o uso do decorador e mostra uma função auxiliar.

     def get_python_version():    """     ,    python,       .         conda        python.    """    import platform    versions = {'2' : '2.7.15',                '3' : '3.7.4'}    return versions[platform.python_version_tuple()[0]] #       python. @conda_base(python=get_python_version()) class GenreStatsFlow(FlowSpec): 
  • Agora você pode adicionar o decorador @conda a qualquer etapa da cadeia de tarefas. Ele espera um objeto com dependências, que é passado a ele através do parâmetro libraries . O meta-fluxo, antes de iniciar a etapa, assumirá a tarefa de preparar o contêiner com as dependências especificadas. Se necessário, você pode usar com segurança versões diferentes de pacotes em etapas diferentes, pois o Metaflow inicia cada etapa em um contêiner separado.

         @conda(libraries={'pandas' : '0.24.2'})    @step    def start(self): 
  • Agora execute o seguinte comando: python3 tutorial_flow.py --environment=conda run .

Início da implementação


 @conda(libraries={'pandas' : '0.24.2'})    @step    def start(self):    """         :        1)      Pandas.        2)    .        3)        .    """        import pandas        from io import StringIO        #      Pandas.        self.dataframe = pandas.read_csv(StringIO(self.movie_data))        #   'genres'      .         #   .        self.genres = {genre for genres \                       in self.dataframe['genres'] \                       for genre in genres.split('|')}        self.genres = list(self.genres)        #        .        #  'foreach'             #          self.next(self.compute_statistics, foreach='genres') 

Considere alguns dos recursos deste código:

  • Observe que a expressão de importação de pandas está dentro da função que descreve a etapa. O fato é que essa dependência é introduzida pela conda apenas no escopo desta etapa.
  • Mas as variáveis ​​declaradas aqui ( dataframe e genres ) estão disponíveis mesmo no código das etapas executadas após esta etapa. O ponto é que o Metaflow funciona com base nos princípios de separação dos ambientes de execução de código, mas permite que os dados se movam naturalmente entre as etapas da cadeia de tarefas.

▍ Implementação da etapa compute_statistics


 @catch(var='compute_failed')    @retry    @conda(libraries={'pandas' : '0.25.3'})    @step    def compute_statistics(self):        """            .        """        #             # 'input'.        self.genre = self.input        print("Computing statistics for %s" % self.genre)        #         ,         #        .        selector = self.dataframe['genres'].\                   apply(lambda row: self.genre in row)        self.dataframe = self.dataframe[selector]        self.dataframe = self.dataframe[['movie_title', 'genres', 'gross']]        #     gross   .        points = [.25, .5, .75]        self.quartiles = self.dataframe['gross'].quantile(points).values        #  ,    .        self.next(self.join) 

Observe que nesta etapa nos referimos à variável dataframe que foi declarada na etapa start anterior. Estamos modificando essa variável. Ao passar para as próximas etapas, essa abordagem, que implica o uso de um novo objeto de dataframe modificado, permite organizar um trabalho eficiente com dados.

▍ Implementar a etapa de junção


 @conda(libraries={'pandas' : '0.25.3'})    @step    def join(self, inputs):        """               .        """        inputs = inputs[0:self.max_genres]        #   ,    .        self.genre_stats = {inp.genre.lower(): \                            {'quartiles': inp.quartiles,                             'dataframe': inp.dataframe} \                            for inp in inputs}        self.next(self.end) 

Aqui vale destacar alguns pontos:

  • Nesta etapa, usamos uma versão completamente diferente da biblioteca de pandas.
  • Cada elemento na matriz de inputs é uma cópia dos compute_statistics executados anteriormente. Ele contém o estado da função correspondente executada, ou seja, os valores de várias variáveis. Assim, a input[0].quartiles pode conter quartis para o gênero de comedy , e a input[1].quartiles input[0].quartiles pode conter quartis para o gênero de sci-fi .

Draft Rascunho pronto


O código completo do projeto que acabamos de revisar pode ser encontrado aqui .

Para examinar como o fluxo de trabalho descrito no arquivo tutorial_flow.py , você precisa executar o seguinte comando:

 python3 tutorial_flow.py --environment=conda show 

Use o seguinte comando para iniciar o fluxo de trabalho:

 python3 tutorial_flow.py --environment=conda run --movie_data=path/to/movies.csv --max_genres=7 

Examinando os resultados da execução de um fluxo de trabalho usando a API do cliente


Para examinar instantâneos de dados e o status de lançamentos anteriores do fluxo de trabalho, você pode usar a API do cliente fornecida pelo Metaflow. Essa API é ideal para explorar os detalhes de experimentos realizados no ambiente do Jupyter Notebook.

Aqui está um exemplo simples da saída da variável genre_stats , extraída dos dados do último lançamento bem-sucedido do GenreStatsFlow .

 from metaflow import Flow, get_metadata #      print("Using metadata provider: %s" % get_metadata()) #     MovieStatsFlow. run = Flow('GenreStatsFlow').latest_successful_run print("Using analysis from '%s'" % str(run)) genre_stats = run.data.genre_stats print(genre_stats) 

Executando fluxos de trabalho na nuvem


Depois de criar e testar o fluxo de trabalho em um computador comum, é muito provável que você queira executar o código na nuvem para acelerar o trabalho.

Atualmente, o Metaflow suporta apenas a integração com a AWS. Na imagem a seguir, você pode ver um mapeamento dos recursos locais e de nuvem usados ​​pelo Metaflow.


Metaflow e integração da AWS

Para conectar o Metaflow à AWS, você deve concluir as seguintes etapas:

  • Primeiro, você precisa fazer uma configuração única da AWS, criando recursos com os quais o Metaflow pode trabalhar. Os mesmos recursos podem ser usados, por exemplo, por membros de uma equipe de trabalho que demonstram entre si os resultados dos fluxos de trabalho. Você pode encontrar instruções relevantes aqui. As configurações são rápidas o suficiente, porque o Metaflow possui um modelo de configurações do CloudFormation.
  • Em seguida, no computador local, você precisa executar o metaflow configure aws e inserir respostas para as perguntas do sistema. Com esses dados, o Metaflow poderá usar data warehouses baseados em nuvem.
  • Agora, para iniciar fluxos de trabalho locais na nuvem, adicione a chave --with batch ao --with batch start do fluxo de trabalho. Por exemplo, pode ser assim: python3 sample_flow.py run --with batch .
  • Para executar uma inicialização híbrida do fluxo de trabalho, ou seja, para executar algumas etapas localmente e outras na nuvem, você precisa adicionar o decorador do @batch às etapas que precisam ser executadas na nuvem. Por exemplo, assim: @batch(cpu=1, memory=500) .

Sumário


Aqui, gostaria de observar alguns recursos do Metaflow que podem ser considerados as vantagens e desvantagens dessa estrutura:

  • O Metaflow está totalmente integrado à AWS. Porém, nos planos de desenvolvimento da estrutura, há suporte para um número maior de provedores de nuvem.
  • Metaflow é uma ferramenta que suporta apenas a interface da linha de comandos. Ele não possui uma interface gráfica (diferente de outras estruturas universais para organizar processos de trabalho, como o Airflow).

Caros leitores! Você planeja usar o Metaflow?

Source: https://habr.com/ru/post/pt482462/


All Articles