Aprenda Metaflow en 10 minutos

Metaflow es un marco de Python creado en Netflix y enfocado en el campo de la ciencia de datos. A saber, está diseñado para crear proyectos destinados a trabajar con datos y para gestionar dichos proyectos. Recientemente, la compañía lo transfirió a la categoría de código abierto. El marco Metaflow ha sido ampliamente utilizado dentro de Netflix en los últimos 2 años. En particular, permitió reducir significativamente el tiempo requerido para la conclusión de proyectos en producción.



El material que estamos traduciendo hoy es una guía rápida de Metaflow.

¿Qué es el metaflujo?


A continuación se muestra un gráfico que ilustra la implementación del marco Metaflow en Netflix.


Implementación de Metaflow en Netflix

En noviembre de 2018, este marco se utilizó en 134 proyectos de la empresa.

Metaflow es un marco para crear y ejecutar flujos de trabajo de ciencia de datos. Cuenta con las siguientes características:

  • Gestión de recursos informáticos.
  • Lanzamiento de tareas en contenedores.
  • Gestión de dependencias externas.
  • Versionado, re-ejecución de tareas, ejecución continua de tareas suspendidas.
  • API de cliente para examinar los resultados de tareas que se pueden usar en el entorno de Jupyter Notebook.
  • Soporte para la ejecución de tareas locales (por ejemplo, en una computadora portátil) y remotas (en la nube). Posibilidad de cambiar entre estos modos.

El usuario vtuulos escribió en Ycombinator que Metaflow puede crear automáticamente instantáneas (instantáneas) de código, datos y dependencias. Todo esto se coloca en un repositorio con direccionamiento por contenido, que generalmente se basa en S3, aunque el sistema de archivos local también es compatible. Esto le permite continuar realizando tareas detenidas, reproducir resultados obtenidos previamente y explorar todo lo relacionado con las tareas, por ejemplo, en Jupyter Notebook.

En general, podemos decir que Metaflow tiene como objetivo aumentar la productividad de los científicos de datos. Esto se hace debido al hecho de que el marco les permite participar exclusivamente en el trabajo con datos, sin distraerse resolviendo tareas relacionadas. Además, Metaflow acelera la retirada de proyectos basados ​​en ella en producción.


Las necesidades de un científico de datos relacionadas con sus responsabilidades directas y la solución de tareas auxiliares relacionadas con la infraestructura en la que se realizan los cálculos.

Escenarios de flujo de trabajo con Metaflow


Aquí hay algunos escenarios de flujo de trabajo que puede organizar usando Metaflow:

  • Colaboración Un científico de datos quiere ayudar a otro a encontrar la fuente del error. Al mismo tiempo, al asistente le gustaría descargar a su computadora todo el entorno en el que funcionó la tarea que se bloqueó.
  • Continuación de las tareas detenidas desde el lugar donde fueron detenidas. Alguna tarea se detuvo con un error (o se detuvo intencionalmente). El error fue corregido (o el código fue editado). Es necesario reiniciar la tarea para que su trabajo continúe desde el lugar donde falló (o se detuvo).
  • Ejecución de tareas híbridas. Es necesario realizar un cierto paso del flujo de trabajo localmente (tal vez este es el paso de descargar datos de un archivo que está almacenado en una carpeta en la computadora), y otro paso que requiere grandes recursos computacionales (tal vez esto es entrenar el modelo) debe realizarse en la nube.
  • Examen de metadatos obtenidos después de completar una tarea. Tres científicos de datos participan en la selección de hiperparámetros del mismo modelo, tratando de mejorar la precisión de este modelo. Después de eso, debe analizar los resultados de completar las tareas de capacitación del modelo y seleccionar el conjunto de hiperparámetros que ha demostrado ser el mejor.
  • Usando múltiples versiones del mismo paquete. En el proyecto debe usar diferentes versiones, por ejemplo, bibliotecas de sklearn. Durante el preprocesamiento, se requiere su versión 0.20, y durante el modelado, se requiere la versión 0.22.

Flujo de trabajo típico de Metaflow


Considere un flujo de trabajo típico de Metaflow desde una perspectiva conceptual y de programación.

LookMira conceptual del flujo de trabajo Metaflow


Desde un punto de vista conceptual, los flujos de trabajo de Metaflow (cadenas de tareas) están representados por gráficos acíclicos dirigidos (DAG). Las siguientes ilustraciones lo ayudarán a comprender mejor esta idea.


Gráfico acíclico lineal


Gráfico acíclico con caminos "paralelos"

Cada nodo del gráfico representa un paso de procesamiento de datos en el flujo de trabajo.

En cada paso de la cadena de tareas, Metaflow ejecuta código Python regular sin ningún cambio especial. El código se ejecuta en contenedores separados en los que se empaqueta el código junto con sus dependencias.

Un aspecto clave de la arquitectura Metaflow está representado por el hecho de que le permite implementar casi cualquier biblioteca externa del ecosistema conda en proyectos basados ​​en ella sin usar complementos. Esto distingue a Metaflow de otras soluciones similares de uso general. Por ejemplo, de Airflow.

▍ Flujo de trabajo de Metaflow en términos de programación


Cada cadena de tareas (flujo) puede representarse como una clase estándar de Python (los nombres de tales clases generalmente tienen la palabra Flow ) si cumple los siguientes requisitos mínimos:

  • La clase es descendiente de la clase Metaflow FlowSpec .
  • Cada función que representa un paso en la cadena de tareas tiene el decorador @step .
  • Al final de cada función @step , debe haber una indicación de una función similar que le sigue. Esto se puede hacer usando una construcción de este tipo: self.next(self.function_name_here) .
  • La clase implementa las funciones de start y end .

Considere un ejemplo de una cadena mínima de tareas que consta de tres nodos.

Su esquema se ve así:

 start → process_message → end 

Aquí está su código:

 from metaflow import FlowSpec, step class LinearFlow(FlowSpec):         """     ,      Metaflow.    """       #         @step    def start(self):        self.message = 'Thanks for reading.'        self.next(self.process_message)    @step    def process_message(self):        print('the message is: %s' % self.message)        self.next(self.end)    @step    def end(self):        print('the message is still: %s' % self.message) if __name__ == '__main__':    LinearFlow() 

Instrucciones de instalación de Metaflow


▍Instalación y ejecución de prueba


Aquí está la secuencia de pasos que debe realizar para instalar y ejecutar Metaflow por primera vez:

  • Instalar Metaflow (se recomienda Python 3): pip3 install metaflow .
  • Coloque el fragmento de código anterior ( aquí está en GitHub) en el archivo linear_flow.py .
  • Para ver la arquitectura de la cadena de tareas implementada por este código, utilice el comando python3 linear_flow.py show .
  • Para iniciar la secuencia, ejecute el python3 linear_flow.py run .

Debería obtener algo similar al que se muestra a continuación.


Verificación exitosa del estado de Metaflow

Aquí vale la pena prestar atención a algunas cosas. El marco Metaflow crea un .metaflow datos .metaflow local. Allí, almacena todos los metadatos relacionados con la ejecución de tareas y las instantáneas asociadas con las sesiones de ejecución de tareas. Si ha configurado los ajustes de Metaflow relacionados con el almacenamiento en la nube, las instantáneas se almacenarán en AWS S3 Bucket, y los metadatos relacionados con el inicio de tareas irán al servicio de metadatos, basado en RDS (Relational Data Store, almacén de datos relacionales). Más adelante hablaremos sobre cómo explorar estos metadatos utilizando la API del cliente. Otro truco, aunque importante, al que vale la pena prestar atención, es que los identificadores de proceso (pid, ID de proceso) adjuntos a los diferentes pasos difieren. Recuerde: dijimos anteriormente que Metaflow contiene de forma independiente cada paso de la cadena de tareas y realiza cada paso en su propio entorno (pasando solo datos entre los pasos).

▍Instalación y configuración de conda (si planea implementar dependencias)


Siga estos pasos para instalar conda:


Ahora está listo para incrustar dependencias de conda en sus cadenas de tareas. Los detalles de este proceso se discutirán a continuación.

Ejemplo de flujo de trabajo realista


Arriba, hablamos sobre cómo instalar Metaflow y cómo asegurarse de que el sistema esté operativo. Además, discutimos los conceptos básicos de la arquitectura del flujo de trabajo y observamos un ejemplo simple. Aquí observamos un ejemplo más complejo, al tiempo que revelamos algunos de los conceptos de Metaflow.

▍Trabajo


Cree un flujo de trabajo utilizando Metaflow que implemente las siguientes funciones:

  • Carga de datos de películas CSV en un marco de datos de Pandas.
  • Cálculo paralelo de cuartiles para géneros.
  • Guardar un diccionario con los resultados de los cálculos.

▍ Cadena de tareas


El esqueleto de la clase GenreStatsFlow se muestra a GenreStatsFlow . Después de analizarlo, comprenderá la esencia del enfoque implementado aquí para resolver nuestro problema.

 from metaflow import FlowSpec, step, catch, retry, IncludeFile, Parameter class GenreStatsFlow(FlowSpec):  """    ,  ,   .         :    1)  CSV-   Pandas.    2)     .    3)     .  """   @step  def start(self):    """         :        1)      Pandas.        2)    .        3)        .    """       # TODO:  CSV         self.genres = []    self.next(self.compute_statistics, foreach='genres') #  1     @catch(var='compute_failed') #  2  @retry(times=1) #  3  @step  def compute_statistics(self):    """    .   ."""    self.genre = self.input #  4    # TODO:        self.next(self.join)     @step  def join(self, inputs):    """       ."""    # TODO:      self.next(self.end)     @step  def end(self):      """End the flow."""      pass   if __name__ == '__main__':  GenreStatsFlow() 

Considere algunas partes importantes de este ejemplo. El código contiene comentarios de la forma # n , a los que nos referiremos a continuación.

  • En el 1 , en el paso de start , preste atención al parámetro foreach . Gracias a él, las copias de los pasos compute_statistics se compute_statistics en compute_statistics en un for each ciclo for each entrada en la lista de genres .
  • En el 2 decorador @catch(var='compute_failed') detectará cualquier excepción que compute_statistics paso compute_statistics y lo escribirá en la variable compute_failed (se puede leer en el siguiente paso).
  • En el 3 decorador @retry(times=1) hace exactamente lo que su nombre sugiere. Es decir, cuando se producen errores, repite el paso.
  • ¿De self.input 4 , en compute_statistics , self.input ? La cuestión es que la input es una variable de clase proporcionada por Metaflow. Contiene datos aplicables a una instancia particular de compute_statistics (cuando hay múltiples copias de una función ejecutada en paralelo). Metaflow agrega esta variable solo cuando los nodos están representados por varios procesos paralelos, o cuando se combinan varios nodos.
  • Aquí hay un ejemplo de ejecución de la misma función en paralelo: compute_statistics . Pero, si es necesario, puede ejecutar simultáneamente funciones completamente diferentes que no están relacionadas entre sí. Para hacer esto, cambie lo que se muestra en el 1 a algo como self.next(self.func1, self.function2, self.function3) . Por supuesto, con este enfoque, también será necesario reescribir el paso de join , lo que permitirá procesar los resultados de varias funciones en él.

Aquí se explica cómo imaginar la clase de esqueleto anterior.


Representación visual de la clase GenreStatsFlow.

▍Lea el archivo de datos y transfiera los parámetros


  • Descargue este archivo CSV de película.
  • Ahora necesita equipar el programa con soporte para la posibilidad de transferir dinámicamente la ruta al archivo movie_data y el valor max_genres a la max_genres . El mecanismo de los argumentos externos nos ayudará en esto. Metaflow le permite pasar argumentos al programa utilizando indicadores adicionales en el comando de inicio del flujo de trabajo. Por ejemplo, podría verse así: python3 tutorial_flow.py run --movie_data=path/to/movies.csv --max_genres=5 .
  • Metaflow le da al desarrollador objetos IncludeFile y Parameter que le permiten leer la entrada en el código del flujo de trabajo. Nos referimos a los argumentos pasados ​​asignando objetos IncludeFile y Parameter a las variables de clase. Depende de qué es exactamente lo que queremos leer: el archivo o el valor habitual.

Así es como se ve el código al leer los parámetros pasados ​​al programa cuando se lanzó desde la línea de comandos:

     movie_data = IncludeFile("movie_data",                             help="The path to a movie metadata file.",                             default = 'movies.csv')                               max_genres = Parameter('max_genres',                help="The max number of genres to return statistics for",                default=5) 

▍Inclusión de conda en la cadena de tareas


  • Si aún no ha instalado conda, consulte la sección sobre instalación y configuración de conda en este artículo.
  • Agregue el decorador GenreStatsFlow proporcionado por Metaflow a la clase GenreStatsFlow. Este decorador espera recibir la versión de Python. Se puede establecer en el código u obtener mediante una función auxiliar. A continuación se muestra el código que demuestra el uso del decorador y muestra una función auxiliar.

     def get_python_version():    """     ,    python,       .         conda        python.    """    import platform    versions = {'2' : '2.7.15',                '3' : '3.7.4'}    return versions[platform.python_version_tuple()[0]] #       python. @conda_base(python=get_python_version()) class GenreStatsFlow(FlowSpec): 
  • Ahora puede agregar el decorador @conda a cualquier paso en la cadena de tareas. Espera un objeto con dependencias, que se le pasa a través del parámetro de libraries . Metaflow, antes de comenzar el paso, se encargará de preparar el contenedor con las dependencias especificadas. Si es necesario, puede usar de manera segura diferentes versiones de paquetes en diferentes pasos, ya que Metaflow inicia cada paso en un contenedor separado.

         @conda(libraries={'pandas' : '0.24.2'})    @step    def start(self): 
  • Ahora ejecute el siguiente comando: python3 tutorial_flow.py --environment=conda run .

▍Inicio de implementación de pasos


 @conda(libraries={'pandas' : '0.24.2'})    @step    def start(self):    """         :        1)      Pandas.        2)    .        3)        .    """        import pandas        from io import StringIO        #      Pandas.        self.dataframe = pandas.read_csv(StringIO(self.movie_data))        #   'genres'      .         #   .        self.genres = {genre for genres \                       in self.dataframe['genres'] \                       for genre in genres.split('|')}        self.genres = list(self.genres)        #        .        #  'foreach'             #          self.next(self.compute_statistics, foreach='genres') 

Considere algunas de las características de este código:

  • Tenga en cuenta que la expresión de importación de pandas está dentro de la función que describe el paso. El hecho es que esta dependencia es introducida por conda solo en el alcance de este paso.
  • Pero las variables declaradas aquí ( dataframe y genres ) están disponibles incluso en el código de los pasos realizados después de este paso. El punto es que Metaflow funciona sobre la base de los principios de separación de entornos de ejecución de código, pero permite que los datos se muevan naturalmente entre los pasos de la cadena de tareas.

▍ Implementación del paso compute_statistics


 @catch(var='compute_failed')    @retry    @conda(libraries={'pandas' : '0.25.3'})    @step    def compute_statistics(self):        """            .        """        #             # 'input'.        self.genre = self.input        print("Computing statistics for %s" % self.genre)        #         ,         #        .        selector = self.dataframe['genres'].\                   apply(lambda row: self.genre in row)        self.dataframe = self.dataframe[selector]        self.dataframe = self.dataframe[['movie_title', 'genres', 'gross']]        #     gross   .        points = [.25, .5, .75]        self.quartiles = self.dataframe['gross'].quantile(points).values        #  ,    .        self.next(self.join) 

Tenga en cuenta que en este paso nos referimos a la variable del dataframe que se declaró en el paso de start anterior. Estamos modificando esta variable. Al pasar a los siguientes pasos, este enfoque, que implica el uso de un nuevo objeto de dataframe modificado, le permite organizar un trabajo eficiente con datos.

▍ Implemente el paso de unión


 @conda(libraries={'pandas' : '0.25.3'})    @step    def join(self, inputs):        """               .        """        inputs = inputs[0:self.max_genres]        #   ,    .        self.genre_stats = {inp.genre.lower(): \                            {'quartiles': inp.quartiles,                             'dataframe': inp.dataframe} \                            for inp in inputs}        self.next(self.end) 

Aquí vale la pena destacar un par de puntos:

  • En este paso, usamos una versión completamente diferente de la biblioteca de pandas.
  • Cada elemento en la matriz de inputs es una copia de las compute_statistics ejecutadas compute_statistics . Contiene el estado de la ejecución de la función correspondiente, es decir, los valores de varias variables. Entonces, la input[0].quartiles puede contener cuartiles para el género de comedy , y la input[1].quartiles input[0].quartiles puede contener cuartiles para el género de sci-fi .

▍Proyecto listo


El código completo del proyecto que acabamos de revisar se puede encontrar aquí .

Para ver cómo funciona el flujo de trabajo descrito en el archivo tutorial_flow.py , debe ejecutar el siguiente comando:

 python3 tutorial_flow.py --environment=conda show 

Use el siguiente comando para iniciar el flujo de trabajo:

 python3 tutorial_flow.py --environment=conda run --movie_data=path/to/movies.csv --max_genres=7 

Examinar los resultados de ejecutar un flujo de trabajo utilizando la API del cliente


Para examinar las instantáneas de datos y el estado de los lanzamientos anteriores del flujo de trabajo, puede usar la API del cliente proporcionada por Metaflow. Esta API es ideal para explorar los detalles de los experimentos realizados en el entorno de Jupyter Notebook.

Aquí hay un ejemplo simple de la salida de la variable genre_stats , tomada de los datos del último lanzamiento exitoso de GenreStatsFlow .

 from metaflow import Flow, get_metadata #      print("Using metadata provider: %s" % get_metadata()) #     MovieStatsFlow. run = Flow('GenreStatsFlow').latest_successful_run print("Using analysis from '%s'" % str(run)) genre_stats = run.data.genre_stats print(genre_stats) 

Ejecución de flujos de trabajo en la nube


Después de haber creado y probado el flujo de trabajo en una computadora normal, es muy probable que desee ejecutar el código en la nube para acelerar el trabajo.

Actualmente, Metaflow solo admite la integración con AWS. En la siguiente imagen, puede ver una asignación de los recursos locales y en la nube utilizados por Metaflow.


Integración de Metaflow y AWS

Para conectar Metaflow a AWS, debe completar los siguientes pasos:

  • Primero, debe realizar una configuración de AWS única creando recursos con los que Metaflow pueda trabajar. Los mismos recursos pueden ser utilizados, por ejemplo, por miembros de un equipo de trabajo que se demuestran entre sí los resultados de los flujos de trabajo. Puede encontrar instrucciones relevantes aquí. La configuración es lo suficientemente rápida, porque Metaflow tiene una plantilla de configuración de CloudFormation.
  • A continuación, en la computadora local, debe ejecutar el metaflow configure aws e ingresar las respuestas a las preguntas del sistema. Con estos datos, Metaflow podrá usar almacenes de datos basados ​​en la nube.
  • Ahora, para comenzar los flujos de trabajo locales en la nube, simplemente agregue la tecla --with batch al --with batch inicio del flujo de trabajo. Por ejemplo, podría verse así: python3 sample_flow.py run --with batch .
  • Para realizar un lanzamiento híbrido del flujo de trabajo, es decir, para realizar algunos pasos localmente y algunos en la nube, debe agregar el decorador @batch a esos pasos que deben realizarse en la nube. Por ejemplo, así: @batch(cpu=1, memory=500) .

Resumen


Aquí, me gustaría señalar un par de características de Metaflow que pueden considerarse tanto las ventajas como las desventajas de este marco:

  • Metaflow está estrechamente integrado con AWS. Pero en los planes de desarrollo del marco existe soporte para un mayor número de proveedores de la nube.
  • Metaflow es una herramienta que solo admite la interfaz de línea de comandos. No tiene una interfaz gráfica (a diferencia de otros marcos universales para organizar procesos de trabajo, como Airflow).

Estimados lectores! ¿Planeas usar Metaflow?

Source: https://habr.com/ru/post/482462/


All Articles