在10分钟内学习元流

Metaflow是在Netflix中创建的Python框架,专注于数据科学领域。 即,它旨在创建旨在处理数据的项目并管理此类项目。 最近,该公司将其转移到开源类别。 在过去的两年中,Metaflow框架已在Netflix内部广泛使用。 特别是,他允许大大减少完成生产项目所需的时间。



我们今天翻译的材料是Metaflow的快速指南。

什么是Metaflow?


下图说明了Netflix中Metaflow框架的实现。


Netflix中的Metaflow实施

2018年11月,该框架在公司的134个项目中使用。

元流是用于创建和执行数据科学工作流的框架。 它具有以下功能:

  • 计算资源管理。
  • 容器化任务启动。
  • 管理外部依赖项。
  • 版本控制,重新执行任务,继续执行挂起的任务。
  • 客户端API,用于检查可在Jupyter Notebook环境中使用的任务的结果。
  • 支持本地(例如,在笔记本电脑上)和远程(在云中)任务执行。 在这些模式之间切换的能力。

vtuulos用户在Ycombinator上写道 ,Metaflow可以自动创建代码,数据和依赖项的快照(快照)。 所有这些都放置在按内容编址的存储库中,该内容通常基于S3,尽管也支持本地文件系统。 这样,您可以继续执行已停止的任务,重现以前获得的结果,并浏览与任务相关的所有内容,例如在Jupyter Notebook中。

一般而言,我们可以说Metaflow旨在提高数据科学家的生产力。 之所以如此,是因为该框架允许他们专门从事数据工作,而不会因解决相关任务而分心。 此外,Metaflow加速了基于生产的项目撤出。


数据科学家的需求与其直接职责有关,并且与执行计算的基础结构有关的辅助任务的解决方案

Metaflow的工作流程方案


您可以使用Metaflow组织以下工作流程方案:

  • 协同合作 一位数据科学家想要帮助另一位科学家找到错误的根源。 同时,助手希望将崩溃的任务在其中正常工作的整个环境下载到他的计算机上。
  • 从停止的位置继续停止的任务。 某些任务因错误而停止(或有意停止)。 错误已得到纠正(或代码已编辑)。 有必要重新启动任务,以便从失败(或停止)的地方继续工作。
  • 混合任务执行。 有必要在本地执行工作流程的某个步骤(也许这是从存储在计算机文件夹中的文件中下载数据的步骤),并且需要大量计算资源的另一步骤(也许这是在训练模型)应该在云中执行。
  • 检查完成任务后获得的元数据。 三名数据科学家致力于同一模型的超参数的选择,试图提高该模型的准确性。 之后,您需要分析用于训练模型的任务的结果,并选择表现出最好的超参数集。
  • 使用同一软件包的多个版本。 在项目中,您需要使用不同的版本,例如sklearn库。 在预处理期间,需要版本0.20,在建模期间,需要版本0.22。

典型的元流工作流


从概念和编程角度考虑典型的Metaflow工作流程。

Meta从概念上了解Metaflow工作流程


从概念的角度来看,Metaflow工作流(任务链)由有向无环图 (DAG)表示。 下面的插图将帮助您更好地理解这个想法。


线性无环图


具有“平行”路径的非循环图

图的每个节点代表工作流程中的数据处理步骤。

在任务链的每个步骤中,Metaflow都会执行常规的Python代码,而无需进行任何特殊更改。 代码在单独的容器中执行,代码及其相关性打包到这些容器中。

Metaflow体系结构的关键方面在于它使您无需使用插件即可将conda生态系统中的几乎所有外部库实现到基于该库的项目中。 这将Metaflow与其他类似的通用解决方案区分开来。 例如-从气流。

▍在编程方面的Metaflow工作流程


如果每个任务链(流)满足以下最低要求,则可以表示为标准Python类(此类的名称通常带有单词Flow )。

  • 该类是Metaflow FlowSpec类的后代。
  • 代表任务链中步骤的每个函数都应用了@step装饰器。
  • 在每个@step函数的末尾,必须有一个@step@step的类似函数的指示。 这可以使用以下类型的构造来完成: self.next(self.function_name_here)
  • 该类实现startend函数。

考虑一个由三个节点组成的最小任务链的示例。

她的方案如下所示:

 start → process_message → end 

这是她的代码:

 from metaflow import FlowSpec, step class LinearFlow(FlowSpec):         """     ,      Metaflow.    """       #         @step    def start(self):        self.message = 'Thanks for reading.'        self.next(self.process_message)    @step    def process_message(self):        print('the message is: %s' % self.message)        self.next(self.end)    @step    def end(self):        print('the message is still: %s' % self.message) if __name__ == '__main__':    LinearFlow() 

Metaflow安装说明


and安装与试运行


这是安装和首次启动Metaflow所需执行的步骤序列:

  • 安装Metaflow(建议使用Python 3): pip3 install metaflow
  • 将上面的代码片段(在GitHub上)放在linear_flow.py文件中。
  • 要查看此代码实现的任务链的体系结构,请使用python3 linear_flow.py show命令。
  • 要启动流,请运行python3 linear_flow.py run

您应该获得类似于以下所示的内容。


成功的Metaflow健康检查

这里值得注意一些事情。 Metaflow框架创建一个本地.metaflow数据.metaflow 。 在那里,它存储与任务执行相关的所有元数据以及与任务执行会话相关的快照。 如果您配置了与云数据存储相关的Metaflow设置,则快照将存储在AWS S3存储桶中,而与任务启动相关的元数据将转到基于RDS(关系数据存储,关系数据存储)的元数据服务。 稍后,我们将讨论如何使用客户端API探索该元数据。 另一个琐事虽然很重要,但值得注意,但它是附加到不同步骤的进程标识符(pid,进程ID)不同。 记住-我们上面说过,Metaflow独立地将任务链的每个步骤容器化,并在其自己的环境中执行每个步骤(仅在步骤之间传递数据)。

conconda的安装和配置(如果您计划实现依赖关系)


请按照以下步骤安装conda:

  • 下载并安装Miniconda。
  • 使用命令conda config --add channels conda-forge添加- 一个conda通道 conda config --add channels conda-forge

现在您可以在任务链中嵌入conda依赖项了。 该过程的细节将在下面讨论。

现实的工作流程示例


上面,我们讨论了如何安装Metaflow,以及如何确保系统正常运行。 此外,我们讨论了工作流体系结构的基础,并看了一个简单的示例。 在这里,我们看一个更复杂的示例,同时揭示了Metaflow的一些概念。

▍工作


使用Metaflow创建实现以下功能的工作流:

  • 将CSV电影数据加载到Pandas数据框中。
  • 流派四分位数的并行计算。
  • 保存带有计算结果的字典。

▍任务链


GenreStatsFlow类的框架如下GenreStatsFlow 。 经过分析后,您将了解此处解决问题的方法的实质。

 from metaflow import FlowSpec, step, catch, retry, IncludeFile, Parameter class GenreStatsFlow(FlowSpec):  """    ,  ,   .         :    1)  CSV-   Pandas.    2)     .    3)     .  """   @step  def start(self):    """         :        1)      Pandas.        2)    .        3)        .    """       # TODO:  CSV         self.genres = []    self.next(self.compute_statistics, foreach='genres') #  1     @catch(var='compute_failed') #  2  @retry(times=1) #  3  @step  def compute_statistics(self):    """    .   ."""    self.genre = self.input #  4    # TODO:        self.next(self.join)     @step  def join(self, inputs):    """       ."""    # TODO:      self.next(self.end)     @step  def end(self):      """End the flow."""      pass   if __name__ == '__main__':  GenreStatsFlow() 

考虑该示例的一些重要部分。 该代码包含以下形式的注释: # n

  • 1start步骤中,请注意foreach参数。 多亏了它, for each genres列表中for each条目的for each循环, compute_statisticscompute_statistics步骤的副本。
  • 2 @catch(var='compute_failed')装饰器@catch(var='compute_failed')捕获compute_statistics步骤中compute_statistics任何异常,并将其写入compute_failed变量(可在下一步中读取)。
  • 3 @retry(times=1)装饰器@retry(times=1)确实执行其名称所暗示的功能。 即,当发生错误时,他重复该步骤。
  • 4compute_statistics self.input ? 问题是, input是Metaflow提供的类变量。 它包含适用于compute_statistics特定实例的compute_statistics (当有多个并行执行的函数副本时)。 仅当节点由多个并行进程表示或合并多个节点时,此变量才由Metaflow添加。
  • 这是在parallel- compute_statistics中运行相同功能的compute_statistics 。 但是,如有必要,您可以同时运行彼此不相关的完全不同的功能。 为此,将 1显示的内容更改为self.next(self.func1, self.function2, self.function3) 。 当然,使用这种方法,也将有必要重写join步骤,从而有可能在其上处理各种功能的结果。

这是想象上面的骨架类的方法。


GenreStatsFlow类的可视表示

▍读取数据文件并传输参数


  • 下载电影CSV文件。
  • 现在,您需要为该程序提供支持,以实现将路径动态传输到movie_data文件并将max_genres值动态传输到max_genres的可能性。 外部争论的机制将在这方面帮助我们。 Metaflow允许您使用工作流启动命令中的其他标志将参数传递给程序。 例如,它可能看起来像这样: python3 tutorial_flow.py run --movie_data=path/to/movies.csv --max_genres=5
  • Metaflow为开发人员提供了IncludeFileParameter对象,您可以通过它们读取工作流代码中的输入。 我们通过将IncludeFileParameter对象分配给类变量来引用传递的Parameter 。 这取决于我们究竟要读取什么-文件或通常的值。

这是从命令行启动时读取传递给程序的参数的代码:

     movie_data = IncludeFile("movie_data",                             help="The path to a movie metadata file.",                             default = 'movies.csv')                               max_genres = Parameter('max_genres',                help="The max number of genres to return statistics for",                default=5) 

con将conda包含在任务链中


  • 如果尚未安装conda,请参阅本文中有关安装和配置conda的部分。
  • GenreStatsFlow提供的GenreStatsFlow装饰器添加到GenreStatsFlow类中。 该装饰器期望获得python版本。 它既可以在代码中设置,也可以使用辅助功能获得。 下面的代码演示了装饰器的用法,并显示了一个辅助函数。

     def get_python_version():    """     ,    python,       .         conda        python.    """    import platform    versions = {'2' : '2.7.15',                '3' : '3.7.4'}    return versions[platform.python_version_tuple()[0]] #       python. @conda_base(python=get_python_version()) class GenreStatsFlow(FlowSpec): 
  • 现在,您可以将@conda装饰器添加到任务链中的任何步骤。 它期望一个具有依赖关系的对象,该对象通过libraries参数传递给它。 在开始该步骤之前,Metaflow将承担准备具有指定依赖项的容器的任务。 如果需要,您可以在不同的步骤中安全地使用不同版本的软件包,因为Metaflow在单独的容器中启动每个步骤。

         @conda(libraries={'pandas' : '0.24.2'})    @step    def start(self): 
  • 现在执行以下命令: python3 tutorial_flow.py --environment=conda run

▍步骤实施开始


 @conda(libraries={'pandas' : '0.24.2'})    @step    def start(self):    """         :        1)      Pandas.        2)    .        3)        .    """        import pandas        from io import StringIO        #      Pandas.        self.dataframe = pandas.read_csv(StringIO(self.movie_data))        #   'genres'      .         #   .        self.genres = {genre for genres \                       in self.dataframe['genres'] \                       for genre in genres.split('|')}        self.genres = list(self.genres)        #        .        #  'foreach'             #          self.next(self.compute_statistics, foreach='genres') 

考虑一下此代码的一些功能:

  • 请注意,pandas import表达式位于描述该步骤的函数内。 事实是,conda仅在此步骤的范围内引入了这种依赖性。
  • 但是,即使在此步骤之后执行的步骤代码中,此处声明的变量( dataframegenres )仍然可用。 关键是,Metaflow在分离代码执行环境的原理的基础上工作,但允许数据自然地在任务链的各个步骤之间移动。

▍执行compute_statistics步骤


 @catch(var='compute_failed')    @retry    @conda(libraries={'pandas' : '0.25.3'})    @step    def compute_statistics(self):        """            .        """        #             # 'input'.        self.genre = self.input        print("Computing statistics for %s" % self.genre)        #         ,         #        .        selector = self.dataframe['genres'].\                   apply(lambda row: self.genre in row)        self.dataframe = self.dataframe[selector]        self.dataframe = self.dataframe[['movie_title', 'genres', 'gross']]        #     gross   .        points = [.25, .5, .75]        self.quartiles = self.dataframe['gross'].quantile(points).values        #  ,    .        self.next(self.join) 

请注意,在此步骤中,我们引用在上一个start步骤中声明的dataframe变量。 我们正在修改此变量。 在继续进行下一步时,此方法(意味着使用新的修改后的dataframe对象)使您可以组织有效的数据工作。

▍实施加入步骤


 @conda(libraries={'pandas' : '0.25.3'})    @step    def join(self, inputs):        """               .        """        inputs = inputs[0:self.max_genres]        #   ,    .        self.genre_stats = {inp.genre.lower(): \                            {'quartiles': inp.quartiles,                             'dataframe': inp.dataframe} \                            for inp in inputs}        self.next(self.end) 

这里值得强调以下几点:

  • 在此步骤中,我们使用完全不同版本的pandas库。
  • inputs数组中的每个元素都是先前执行的compute_statistics的副本。 它包含相应功能运行的状态,即各种变量的值。 因此, input[0].quartiles可以包含comedy类型的四分位数,而input[1].quartiles input[0].quartiles可以包含sci-fi类型的四分位数。

▍准备项目


我们刚刚审查过的完整项目代码可以在这里找到。

为了查看tutorial_flow.py文件中描述的工作流程如何工作,您需要运行以下命令:

 python3 tutorial_flow.py --environment=conda show 

使用以下命令启动工作流程:

 python3 tutorial_flow.py --environment=conda run --movie_data=path/to/movies.csv --max_genres=7 

检查使用客户端API运行工作流的结果


为了检查数据快照和工作流先前启动的状态,您可以使用Metaflow提供的客户端API 。 该API非常适合探索在Jupyter Notebook环境中执行的实验的详细信息。

这是从最后一次成功启动GenreStatsFlow的数据中获取的genre_stats变量输出的简单示例。

 from metaflow import Flow, get_metadata #      print("Using metadata provider: %s" % get_metadata()) #     MovieStatsFlow. run = Flow('GenreStatsFlow').latest_successful_run print("Using analysis from '%s'" % str(run)) genre_stats = run.data.genre_stats print(genre_stats) 

在云中运行工作流程


在常规计算机上创建并测试工作流之后,很可能希望在云中运行代码以加快工作速度。

当前,Metaflow仅支持与AWS集成。 在下图中,您可以看到Metaflow使用的内部部署和云资源的映射。


元流和AWS集成

要将Metaflow连接到AWS,您必须完成以下步骤:

  • 首先,您需要通过创建Metaflow可以使用的资源来进行一次AWS设置。 例如,工作团队的成员可以使用相同的资源,他们可以互相演示工作流程的结果。 可以在此处找到相关说明。 设置速度足够快,因为Metaflow具有CloudFormation设置模板。
  • 接下来,在本地计算机上,您需要运行metaflow configure aws并输入系统问题的答案。 有了这些数据,Metaflow将能够使用基于云的数据仓库。
  • 现在,要在云中启动本地工作流,只需在工作流开始--with batch添加--with batch键即可。 例如,它可能看起来像这样: python3 sample_flow.py run --with batch
  • 为了执行工作流的混合启动,也就是说,要在本地执行某些步骤,并在云中执行某些步骤,您需要将@batch装饰器添加到需要在云中执行的那些步骤。 例如,像这样: @batch(cpu=1, memory=500)

总结


在这里,我想指出一些Metaflow功能,这些功能既可以考虑该框架的优点,也可以考虑其缺点:

  • Metaflow与AWS紧密集成。 但是在框架开发计划中,有对大量云提供商的支持。
  • Metaflow是仅支持命令行界面的工具。 它没有图形界面(不同于其他用于组织工作流程的通用框架,例如Airflow)。

亲爱的读者们! 您打算使用Metaflow吗?

Source: https://habr.com/ru/post/zh-CN482462/


All Articles