Lernen Sie Metaflow in 10 Minuten

Metaflow ist ein Python-Framework, das in Netflix erstellt wurde und sich auf den Bereich Data Science konzentriert. Es soll nämlich Projekte erstellen, die auf die Arbeit mit Daten abzielen, und solche Projekte verwalten. Das Unternehmen hat es kürzlich in die Kategorie Open Source überführt. Das Metaflow-Framework wurde in den letzten 2 Jahren in Netflix häufig verwendet. Insbesondere konnte er den Zeitaufwand für den Abschluss von Projekten in der Produktion deutlich reduzieren.



Das Material, das wir heute übersetzen, ist eine Kurzanleitung für Metaflow.

Was ist Metaflow?


Nachfolgend sehen Sie ein Diagramm, das die Implementierung des Metaflow-Frameworks in Netflix veranschaulicht.


Metaflow-Implementierung in Netflix

Im November 2018 wurde dieses Framework in 134 Projekten des Unternehmens eingesetzt.

Metaflow ist ein Framework zum Erstellen und Ausführen von Data Science-Workflows. Es bietet die folgenden Funktionen:

  • Computing-Ressourcenverwaltung.
  • Start der containerisierten Aufgabe.
  • Externe Abhängigkeiten verwalten.
  • Versionierung, erneutes Ausführen von Aufgaben, Fortsetzung der Ausführung angehaltener Aufgaben.
  • Client-API zum Untersuchen der Ergebnisse von Aufgaben, die in der Jupyter Notebook-Umgebung verwendet werden können.
  • Unterstützung für die Ausführung lokaler (z. B. auf einem Laptop) und entfernter (in der Cloud) Aufgaben. Möglichkeit, zwischen diesen Modi zu wechseln.

Benutzer vtuulos schrieb auf Ycombinator, dass Metaflow automatisch Snapshots (Snapshots) von Code, Daten und Abhängigkeiten erstellen kann. All dies wird in einem Repository mit inhaltlicher Adressierung abgelegt, das in der Regel auf S3 basiert, obwohl auch das lokale Dateisystem unterstützt wird. Auf diese Weise können Sie angehaltene Aufgaben fortsetzen, zuvor erhaltene Ergebnisse reproduzieren und alle mit Aufgaben zusammenhängenden Aufgaben untersuchen, z. B. im Jupyter-Notizbuch.

Generell können wir sagen, dass Metaflow darauf abzielt, die Produktivität von Datenwissenschaftlern zu steigern. Dies geschieht aufgrund der Tatsache, dass das Framework es ihnen ermöglicht, ausschließlich mit Daten zu arbeiten, ohne durch die Lösung verwandter Aufgaben abgelenkt zu werden. Darüber hinaus beschleunigt Metaflow die Rücknahme von darauf basierenden Projekten in der Produktion.


Die Bedürfnisse eines Datenwissenschaftlers bezogen sich auf seine direkten Verantwortlichkeiten und die Lösung von Hilfsaufgaben im Zusammenhang mit der Infrastruktur, auf der die Berechnungen durchgeführt werden

Workflow-Szenarien mit Metaflow


Hier sind einige Workflow-Szenarien, die Sie mit Metaflow organisieren können:

  • Zusammenarbeit Ein Datenwissenschaftler möchte einem anderen helfen, die Fehlerquelle zu finden. Gleichzeitig möchte der Assistent die gesamte Umgebung, in der die abgestürzte Aufgabe funktioniert hat, auf seinen Computer herunterladen.
  • Fortsetzung der gestoppten Aufgaben von der Stelle, an der sie gestoppt wurden. Einige Aufgaben wurden mit einem Fehler abgebrochen (oder absichtlich abgebrochen). Der Fehler wurde behoben (oder der Code wurde bearbeitet). Der Task muss neu gestartet werden, damit seine Arbeit an der Stelle fortgesetzt werden kann, an der er fehlgeschlagen ist (oder gestoppt wurde).
  • Hybrid-Task-Ausführung. Sie müssen einen bestimmten Schritt des Workflows lokal ausführen (möglicherweise ist dies der Schritt des Herunterladens von Daten aus einer Datei, die in einem Ordner auf dem Computer gespeichert ist), und ein weiterer Schritt, der große Rechenressourcen erfordert (möglicherweise wird das Modell trainiert), sollte in der Cloud ausgeführt werden.
  • Prüfung der nach Erledigung einer Aufgabe erhaltenen Metadaten. Drei Wissenschaftler beschäftigen sich mit der Auswahl von Hyperparametern desselben Modells, um die Genauigkeit dieses Modells zu verbessern. Danach müssen Sie die Ergebnisse der Ausführung der Aufgaben zum Trainieren des Modells analysieren und den Satz von Hyperparametern auswählen, der sich als der beste erwiesen hat.
  • Verwenden mehrerer Versionen desselben Pakets. Im Projekt müssen Sie verschiedene Versionen verwenden, z. B. sklearn-Bibliotheken. Während der Vorverarbeitung ist die Version 0.20 und während der Modellierung die Version 0.22 erforderlich.

Typischer Metaflow-Workflow


Betrachten Sie einen typischen Metaflow-Workflow aus konzeptioneller und programmtechnischer Sicht.

▍Konzeptioneller Blick auf den Metaflow-Workflow


Aus konzeptioneller Sicht werden Metaflow-Workflows (Taskketten) durch gerichtete azyklische Diagramme (DAGs) dargestellt. Die folgenden Abbildungen helfen Ihnen, diese Idee besser zu verstehen.


Linearer azyklischer Graph


Azyklischer Graph mit "parallelen" Pfaden

Jeder Knoten des Diagramms repräsentiert einen Datenverarbeitungsschritt im Workflow.

Bei jedem Schritt der Task-Kette führt Metaflow regulären Python-Code ohne besondere Änderungen aus. Der Code wird in separaten Containern ausgeführt, in die der Code zusammen mit seinen Abhängigkeiten gepackt wird.

Ein wesentlicher Aspekt der Metaflow-Architektur ist die Tatsache, dass Sie nahezu alle externen Bibliotheken aus dem Conda-Ökosystem in darauf basierende Projekte implementieren können, ohne Plugins zu verwenden. Dies unterscheidet Metaflow von anderen ähnlichen Allzwecklösungen. Zum Beispiel - von Airflow.

▍ Metaflow-Workflow in Bezug auf die Programmierung


Jede Task-Kette (Stream) kann als Standard-Python-Klasse dargestellt werden (in den Namen solcher Klassen steht normalerweise das Wort Flow ), wenn sie die folgenden Mindestanforderungen erfüllt:

  • Die Klasse ist der Nachfolger der Metaflow FlowSpec Klasse.
  • Auf jede Funktion, die einen Schritt in der @step Dekorator @step .
  • Am Ende jeder @step Funktion muss ein Hinweis auf eine ähnliche Funktion stehen, die darauf folgt. Dies kann mit einem Konstrukt dieser Art self.next(self.function_name_here) : self.next(self.function_name_here) .
  • Die Klasse implementiert die start und Endfunktionen.

Betrachten Sie ein Beispiel für eine minimale Aufgabenkette, die aus drei Knoten besteht.

Ihr Schema sieht so aus:

 start → process_message → end 

Hier ist ihr Code:

 from metaflow import FlowSpec, step class LinearFlow(FlowSpec):         """     ,      Metaflow.    """       #         @step    def start(self):        self.message = 'Thanks for reading.'        self.next(self.process_message)    @step    def process_message(self):        print('the message is: %s' % self.message)        self.next(self.end)    @step    def end(self):        print('the message is still: %s' % self.message) if __name__ == '__main__':    LinearFlow() 

Installationsanleitung für Metaflow


▍Installation und Probelauf


Die folgenden Schritte müssen ausgeführt werden, um Metaflow zum ersten Mal zu installieren und zu starten:

  • Installieren Sie Metaflow (Python 3 wird empfohlen): pip3 install metaflow .
  • linear_flow.py Sie das obige Codefragment ( hier auf GitHub) in die Datei linear_flow.py .
  • Verwenden Sie den Befehl python3 linear_flow.py show , um die Architektur der von diesem Code implementierten python3 linear_flow.py show .
  • python3 linear_flow.py run zum Starten des Streams den python3 linear_flow.py run .

Sie sollten etwas Ähnliches wie das unten gezeigte erhalten.


Erfolgreicher Metaflow Health Check

Hier lohnt es sich, auf einige Dinge zu achten. Das Metaflow-Framework erstellt ein lokales .metaflow Data .metaflow . Dort werden alle Metadaten im Zusammenhang mit der Ausführung von Aufgaben sowie Momentaufnahmen gespeichert, die mit Sitzungen zur Ausführung von Aufgaben verknüpft sind. Wenn Sie Metaflow-Einstellungen für den Cloud-Speicher konfiguriert haben, werden Snapshots in AWS S3 Bucket gespeichert, und Metadaten für Aufgabenstarts werden basierend auf RDS (Relational Data Store, relationaler Datenspeicher) an den Metadatendienst gesendet. Später werden wir darüber sprechen, wie diese Metadaten mithilfe der Client-API untersucht werden. Eine andere Kleinigkeit, obwohl wichtig, die es wert ist, beachtet zu werden, ist, dass sich die Prozesskennungen (pid, Prozess-IDs), die mit verschiedenen Schritten verknüpft sind, unterscheiden. Denken Sie daran - wir haben oben gesagt, dass Metaflow jeden Schritt der Taskkette unabhängig voneinander containerisiert und jeden Schritt in seiner eigenen Umgebung ausführt (wobei nur Daten zwischen den Schritten übertragen werden).

▍Installation und Konfiguration von conda (wenn Sie Abhängigkeiten implementieren möchten)


Befolgen Sie diese Schritte, um conda zu installieren:


Sie können nun Conda-Abhängigkeiten in Ihre Task-Ketten einbetten. Details dieses Prozesses werden unten diskutiert.

Realistisches Workflow-Beispiel


Oben haben wir darüber gesprochen, wie Sie Metaflow installieren und sicherstellen, dass das System betriebsbereit ist. Darüber hinaus haben wir die Grundlagen der Workflow-Architektur besprochen und ein einfaches Beispiel betrachtet. Hier sehen wir uns ein komplexeres Beispiel an und zeigen einige der Konzepte von Metaflow.

▍Job


Erstellen Sie einen Workflow mit Metaflow, der die folgenden Funktionen implementiert:

  • Laden von CSV-Filmdaten in einen Pandas-Datenrahmen.
  • Parallele Berechnung von Quartilen für Genres.
  • Speichern eines Wörterbuchs mit den Ergebnissen von Berechnungen.

▍ Auftragskette


Das Grundgerüst der GenreStatsFlow Klasse ist GenreStatsFlow . Nachdem Sie es analysiert haben, werden Sie das Wesentliche des hier implementierten Ansatzes zur Lösung unseres Problems verstehen.

 from metaflow import FlowSpec, step, catch, retry, IncludeFile, Parameter class GenreStatsFlow(FlowSpec):  """    ,  ,   .         :    1)  CSV-   Pandas.    2)     .    3)     .  """   @step  def start(self):    """         :        1)      Pandas.        2)    .        3)        .    """       # TODO:  CSV         self.genres = []    self.next(self.compute_statistics, foreach='genres') #  1     @catch(var='compute_failed') #  2  @retry(times=1) #  3  @step  def compute_statistics(self):    """    .   ."""    self.genre = self.input #  4    # TODO:        self.next(self.join)     @step  def join(self, inputs):    """       ."""    # TODO:      self.next(self.end)     @step  def end(self):      """End the flow."""      pass   if __name__ == '__main__':  GenreStatsFlow() 

Betrachten Sie einige wichtige Teile dieses Beispiels. Der Code enthält Kommentare der Form # n , auf die wir uns weiter unten beziehen werden.

  • 1 in 1 im start auf den Parameter foreach . Dank dessen werden Kopien der compute_statistics Schritte for each Eintrag in der genres Liste in einer Schleife compute_statistics .
  • In 2 @catch(var='compute_failed') 2 @catch(var='compute_failed') Dekorator @catch(var='compute_failed') alle Ausnahmen ab, compute_statistics Schritt compute_statistics , und schreibt sie in die Variable compute_failed (sie kann im nächsten Schritt gelesen werden).
  • In 3 @retry(times=1) Dekorator @retry(times=1) genau das, worauf sein Name hinweist. Wenn nämlich Fehler auftreten, wiederholt er den Schritt.
  • Woher kommt in 4 in compute_statistics self.input ? Die Sache ist, dass die input eine von Metaflow bereitgestellte Klassenvariable ist. Es enthält Daten, die für eine bestimmte Instanz von compute_statistics (wenn mehrere Kopien einer Funktion gleichzeitig ausgeführt werden). Diese Variable wird von Metaflow nur hinzugefügt, wenn Knoten durch mehrere parallele Prozesse dargestellt werden oder wenn mehrere Knoten kombiniert werden.
  • Hier ist ein Beispiel für die Ausführung derselben Funktion in parallel - compute_statistics . Bei Bedarf können Sie aber auch völlig unterschiedliche Funktionen gleichzeitig ausführen, die nicht miteinander zusammenhängen. Ändern Sie dazu die Darstellung in 1 in self.next(self.func1, self.function2, self.function3) . Natürlich muss bei diesem Ansatz auch der join , damit die Ergebnisse verschiedener Funktionen darauf verarbeitet werden können.

So stellen Sie sich die oben genannte Skelettklasse vor.


Visuelle Darstellung der GenreStatsFlow-Klasse

▍Datendatei lesen und Parameter übertragen


  • Laden Sie diese CSV-Filmdatei herunter.
  • Jetzt müssen Sie das Programm mit Unterstützung für die Möglichkeit ausrüsten, den Pfad zur Datei movie_data und den Wert max_genres dynamisch an die movie_data zu max_genres . Der Mechanismus externer Argumente wird uns dabei helfen. Mit Metaflow können Sie Argumente an das Programm übergeben, indem Sie zusätzliche Flags im Workflow-Startbefehl verwenden. Es könnte beispielsweise so aussehen: python3 tutorial_flow.py run --movie_data=path/to/movies.csv --max_genres=5 .
  • Metaflow stellt dem Entwickler IncludeFile und Parameter Objekte zur Verfügung, mit denen Sie die Eingabe im Workflow-Code lesen können. Wir verweisen auf die Argumente, die beim Zuweisen von IncludeFile und Parameter Objekten zu Klassenvariablen übergeben werden. Es kommt darauf an, was genau wir lesen wollen - die Datei oder den üblichen Wert.

Der Code sieht so aus, als würde er die Parameter lesen, die an das Programm übergeben wurden, als es über die Befehlszeile gestartet wurde:

     movie_data = IncludeFile("movie_data",                             help="The path to a movie metadata file.",                             default = 'movies.csv')                               max_genres = Parameter('max_genres',                help="The max number of genres to return statistics for",                default=5) 

▍Einbeziehung von Conda in die Auftragskette


  • Wenn Sie conda noch nicht installiert haben, lesen Sie den Abschnitt zum Installieren und Konfigurieren von conda in diesem Artikel.
  • Fügen GenreStatsFlow der GenreStatsFlow-Klasse den von GenreStatsFlow bereitgestellten Dekorator @conda_base hinzu. Dieser Dekorateur erwartet die Python-Version. Sie kann entweder im Code eingestellt oder über eine Hilfsfunktion abgerufen werden. Unten ist der Code, der die Verwendung des Dekorators demonstriert und eine Hilfsfunktion zeigt.

     def get_python_version():    """     ,    python,       .         conda        python.    """    import platform    versions = {'2' : '2.7.15',                '3' : '3.7.4'}    return versions[platform.python_version_tuple()[0]] #       python. @conda_base(python=get_python_version()) class GenreStatsFlow(FlowSpec): 
  • Sie können jetzt den @conda Dekorator zu jedem Schritt in der Task-Kette hinzufügen. Es erwartet ein Objekt mit Abhängigkeiten, das über den Parameter libraries an das Objekt übergeben wird. Bevor der Schritt gestartet wird, übernimmt Metaflow die Aufgabe, den Container mit den angegebenen Abhängigkeiten vorzubereiten. Bei Bedarf können Sie verschiedene Versionen von Paketen sicher in verschiedenen Schritten verwenden, da Metaflow jeden Schritt in einem separaten Container startet.

         @conda(libraries={'pandas' : '0.24.2'})    @step    def start(self): 
  • python3 tutorial_flow.py --environment=conda run nun den folgenden Befehl aus: python3 tutorial_flow.py --environment=conda run .

▍Starten Sie die Implementierung


 @conda(libraries={'pandas' : '0.24.2'})    @step    def start(self):    """         :        1)      Pandas.        2)    .        3)        .    """        import pandas        from io import StringIO        #      Pandas.        self.dataframe = pandas.read_csv(StringIO(self.movie_data))        #   'genres'      .         #   .        self.genres = {genre for genres \                       in self.dataframe['genres'] \                       for genre in genres.split('|')}        self.genres = list(self.genres)        #        .        #  'foreach'             #          self.next(self.compute_statistics, foreach='genres') 

Betrachten Sie einige der Funktionen dieses Codes:

  • Beachten Sie, dass sich der Pandas-Import-Ausdruck in der Funktion befindet, die den Schritt beschreibt. Tatsache ist, dass diese Abhängigkeit von conda nur im Rahmen dieses Schritts eingeführt wird.
  • Die hier deklarierten Variablen ( dataframe und genres ) sind jedoch auch im Code der nach diesem Schritt ausgeführten Schritte verfügbar. Der Punkt ist, dass Metaflow auf der Grundlage der Prinzipien der Trennung von Code-Ausführungsumgebungen arbeitet, aber es Daten ermöglicht, sich auf natürliche Weise zwischen den Schritten der Task-Kette zu bewegen.

▍ Implementierung des Schritts compute_statistics


 @catch(var='compute_failed')    @retry    @conda(libraries={'pandas' : '0.25.3'})    @step    def compute_statistics(self):        """            .        """        #             # 'input'.        self.genre = self.input        print("Computing statistics for %s" % self.genre)        #         ,         #        .        selector = self.dataframe['genres'].\                   apply(lambda row: self.genre in row)        self.dataframe = self.dataframe[selector]        self.dataframe = self.dataframe[['movie_title', 'genres', 'gross']]        #     gross   .        points = [.25, .5, .75]        self.quartiles = self.dataframe['gross'].quantile(points).values        #  ,    .        self.next(self.join) 

Bitte beachten Sie, dass wir uns in diesem Schritt auf die dataframe beziehen, die im vorherigen start deklariert wurde. Wir ändern diese Variable. Wenn Sie mit den nächsten Schritten dataframe , können Sie mit diesem Ansatz, der die Verwendung eines neuen modifizierten dataframe impliziert, die effiziente Arbeit mit Daten organisieren.

▍ Implementieren Sie den Join-Schritt


 @conda(libraries={'pandas' : '0.25.3'})    @step    def join(self, inputs):        """               .        """        inputs = inputs[0:self.max_genres]        #   ,    .        self.genre_stats = {inp.genre.lower(): \                            {'quartiles': inp.quartiles,                             'dataframe': inp.dataframe} \                            for inp in inputs}        self.next(self.end) 

Hier sind einige Punkte hervorzuheben:

  • In diesem Schritt verwenden wir eine völlig andere Version der Pandas-Bibliothek.
  • Jedes Element im compute_statistics ist eine Kopie der zuvor ausgeführten compute_statistics . Es enthält den Zustand des entsprechenden Funktionslaufs, also die Werte verschiedener Variablen. input[0].quartiles können also Quartile für das comedy Genre enthalten, und input[1].quartiles input[0].quartiles können Quartile für das sci-fi Genre enthalten.

▍Fertiger Entwurf


Den vollständigen Projektcode, den wir gerade überprüft haben, finden Sie hier .

Um zu sehen, wie der in der Datei tutorial_flow.py beschriebene Workflow tutorial_flow.py , müssen Sie den folgenden Befehl ausführen:

 python3 tutorial_flow.py --environment=conda show 

Verwenden Sie den folgenden Befehl, um den Workflow zu starten:

 python3 tutorial_flow.py --environment=conda run --movie_data=path/to/movies.csv --max_genres=7 

Untersuchen der Ergebnisse der Ausführung eines Workflows mithilfe der Client-API


Sie können die von Metaflow bereitgestellte Client- API verwenden, um Momentaufnahmen von Daten und den Status früherer Starts des Workflows zu überprüfen. Diese API ist ideal, um die Details der in der Jupyter Notebook-Umgebung durchgeführten Experimente zu untersuchen.

Hier ist ein einfaches Beispiel für die Ausgabe der Variablen genre_stats aus den Daten des letzten erfolgreichen Starts von GenreStatsFlow .

 from metaflow import Flow, get_metadata #      print("Using metadata provider: %s" % get_metadata()) #     MovieStatsFlow. run = Flow('GenreStatsFlow').latest_successful_run print("Using analysis from '%s'" % str(run)) genre_stats = run.data.genre_stats print(genre_stats) 

Ausführen von Workflows in der Cloud


Nachdem Sie den Workflow auf einem normalen Computer erstellt und getestet haben, ist es sehr wahrscheinlich, dass Sie den Code in der Cloud ausführen möchten, um die Arbeit zu beschleunigen.

Derzeit unterstützt Metaflow nur die Integration mit AWS. In der folgenden Abbildung sehen Sie eine Zuordnung der von Metaflow verwendeten lokalen und Cloud-Ressourcen.


Metaflow- und AWS-Integration

Um Metaflow mit AWS zu verbinden, müssen Sie die folgenden Schritte ausführen:

  • Zunächst müssen Sie ein einmaliges AWS-Setup durchführen, indem Sie Ressourcen erstellen, mit denen Metaflow arbeiten kann. Dieselben Ressourcen können beispielsweise von Mitgliedern eines Arbeitsteams verwendet werden, die sich gegenseitig die Ergebnisse von Arbeitsabläufen demonstrieren. Entsprechende Anleitungen finden Sie hier. Die Einstellungen sind schnell genug, da Metaflow über eine CloudFormation-Einstellungsvorlage verfügt.
  • Als Nächstes müssen Sie auf dem lokalen Computer den metaflow configure aws und Antworten auf metaflow configure aws eingeben. Mit diesen Daten kann Metaflow cloudbasierte Data Warehouses verwenden.
  • --with batch lokale Workflows in der Cloud zu starten, fügen --with batch dem Workflow- --with batch einfach den --with batch key hinzu. Zum Beispiel könnte es so aussehen: python3 sample_flow.py run --with batch .
  • Um einen hybriden Start des Workflows durchzuführen, @batch einige Schritte lokal und einige in der Cloud auszuführen, müssen Sie den @batch Dekorator zu den Schritten hinzufügen, die in der Cloud ausgeführt werden müssen. Beispiel: @batch(cpu=1, memory=500) .

Zusammenfassung


An dieser Stelle möchte ich einige Metaflow-Funktionen erwähnen, die sowohl die Vor- als auch die Nachteile dieses Frameworks berücksichtigen können:

  • Metaflow ist eng in AWS integriert. In den Framework-Entwicklungsplänen wird jedoch eine größere Anzahl von Cloud-Anbietern unterstützt.
  • Metaflow ist ein Tool, das nur die Befehlszeilenschnittstelle unterstützt. Es hat keine grafische Oberfläche (im Gegensatz zu anderen universellen Frameworks zum Organisieren von Arbeitsprozessen wie Airflow).

Sehr geehrte Leser! Planen Sie die Verwendung von Metaflow?

Source: https://habr.com/ru/post/de482462/


All Articles