Metaflow ist ein Python-Framework, das in Netflix erstellt wurde und sich auf den Bereich Data Science konzentriert. Es soll nämlich Projekte erstellen, die auf die Arbeit mit Daten abzielen, und solche Projekte verwalten. Das Unternehmen hat es kürzlich in die Kategorie Open Source überführt. Das Metaflow-Framework wurde in den letzten 2 Jahren in Netflix häufig verwendet. Insbesondere konnte er den Zeitaufwand für den Abschluss von Projekten in der Produktion deutlich reduzieren.

Das Material, das wir heute übersetzen, ist eine Kurzanleitung für Metaflow.
Was ist Metaflow?
Nachfolgend sehen Sie ein Diagramm, das die Implementierung des Metaflow-Frameworks in Netflix veranschaulicht.
Metaflow-Implementierung in NetflixIm November 2018 wurde dieses Framework in 134 Projekten des Unternehmens eingesetzt.
Metaflow ist ein Framework zum Erstellen und Ausführen von Data Science-Workflows. Es bietet die folgenden Funktionen:
- Computing-Ressourcenverwaltung.
- Start der containerisierten Aufgabe.
- Externe Abhängigkeiten verwalten.
- Versionierung, erneutes Ausführen von Aufgaben, Fortsetzung der Ausführung angehaltener Aufgaben.
- Client-API zum Untersuchen der Ergebnisse von Aufgaben, die in der Jupyter Notebook-Umgebung verwendet werden können.
- Unterstützung für die Ausführung lokaler (z. B. auf einem Laptop) und entfernter (in der Cloud) Aufgaben. Möglichkeit, zwischen diesen Modi zu wechseln.
Benutzer vtuulos
schrieb auf Ycombinator, dass Metaflow automatisch Snapshots (Snapshots) von Code, Daten und Abhängigkeiten erstellen kann. All dies wird in einem Repository mit inhaltlicher Adressierung abgelegt, das in der Regel auf S3 basiert, obwohl auch das lokale Dateisystem unterstützt wird. Auf diese Weise können Sie angehaltene Aufgaben fortsetzen, zuvor erhaltene Ergebnisse reproduzieren und alle mit Aufgaben zusammenhängenden Aufgaben untersuchen, z. B. im Jupyter-Notizbuch.
Generell können wir sagen, dass Metaflow darauf abzielt, die Produktivität von Datenwissenschaftlern zu steigern. Dies geschieht aufgrund der Tatsache, dass das Framework es ihnen ermöglicht, ausschließlich mit Daten zu arbeiten, ohne durch die Lösung verwandter Aufgaben abgelenkt zu werden. Darüber hinaus beschleunigt Metaflow die Rücknahme von darauf basierenden Projekten in der Produktion.
Die Bedürfnisse eines Datenwissenschaftlers bezogen sich auf seine direkten Verantwortlichkeiten und die Lösung von Hilfsaufgaben im Zusammenhang mit der Infrastruktur, auf der die Berechnungen durchgeführt werdenWorkflow-Szenarien mit Metaflow
Hier sind einige Workflow-Szenarien, die Sie mit Metaflow organisieren können:
- Zusammenarbeit Ein Datenwissenschaftler möchte einem anderen helfen, die Fehlerquelle zu finden. Gleichzeitig möchte der Assistent die gesamte Umgebung, in der die abgestürzte Aufgabe funktioniert hat, auf seinen Computer herunterladen.
- Fortsetzung der gestoppten Aufgaben von der Stelle, an der sie gestoppt wurden. Einige Aufgaben wurden mit einem Fehler abgebrochen (oder absichtlich abgebrochen). Der Fehler wurde behoben (oder der Code wurde bearbeitet). Der Task muss neu gestartet werden, damit seine Arbeit an der Stelle fortgesetzt werden kann, an der er fehlgeschlagen ist (oder gestoppt wurde).
- Hybrid-Task-Ausführung. Sie müssen einen bestimmten Schritt des Workflows lokal ausführen (möglicherweise ist dies der Schritt des Herunterladens von Daten aus einer Datei, die in einem Ordner auf dem Computer gespeichert ist), und ein weiterer Schritt, der große Rechenressourcen erfordert (möglicherweise wird das Modell trainiert), sollte in der Cloud ausgeführt werden.
- Prüfung der nach Erledigung einer Aufgabe erhaltenen Metadaten. Drei Wissenschaftler beschäftigen sich mit der Auswahl von Hyperparametern desselben Modells, um die Genauigkeit dieses Modells zu verbessern. Danach müssen Sie die Ergebnisse der Ausführung der Aufgaben zum Trainieren des Modells analysieren und den Satz von Hyperparametern auswählen, der sich als der beste erwiesen hat.
- Verwenden mehrerer Versionen desselben Pakets. Im Projekt müssen Sie verschiedene Versionen verwenden, z. B. sklearn-Bibliotheken. Während der Vorverarbeitung ist die Version 0.20 und während der Modellierung die Version 0.22 erforderlich.
Typischer Metaflow-Workflow
Betrachten Sie einen typischen Metaflow-Workflow aus konzeptioneller und programmtechnischer Sicht.
▍Konzeptioneller Blick auf den Metaflow-Workflow
Aus konzeptioneller Sicht werden Metaflow-Workflows (Taskketten) durch
gerichtete azyklische Diagramme (DAGs) dargestellt. Die folgenden Abbildungen helfen Ihnen, diese Idee besser zu verstehen.
Linearer azyklischer GraphAzyklischer Graph mit "parallelen" PfadenJeder Knoten des Diagramms repräsentiert einen Datenverarbeitungsschritt im Workflow.
Bei jedem Schritt der Task-Kette führt Metaflow regulären Python-Code ohne besondere Änderungen aus. Der Code wird in separaten Containern ausgeführt, in die der Code zusammen mit seinen Abhängigkeiten gepackt wird.
Ein wesentlicher Aspekt der Metaflow-Architektur ist die Tatsache, dass Sie nahezu alle externen Bibliotheken aus dem Conda-Ökosystem in darauf basierende Projekte implementieren können, ohne Plugins zu verwenden. Dies unterscheidet Metaflow von anderen ähnlichen Allzwecklösungen. Zum Beispiel - von Airflow.
▍ Metaflow-Workflow in Bezug auf die Programmierung
Jede Task-Kette (Stream) kann als Standard-Python-Klasse dargestellt werden (in den Namen solcher Klassen steht normalerweise das Wort
Flow
), wenn sie die folgenden Mindestanforderungen erfüllt:
- Die Klasse ist der Nachfolger der Metaflow
FlowSpec
Klasse. - Auf jede Funktion, die einen Schritt in der
@step
Dekorator @step
. - Am Ende jeder
@step
Funktion muss ein Hinweis auf eine ähnliche Funktion stehen, die darauf folgt. Dies kann mit einem Konstrukt dieser Art self.next(self.function_name_here)
: self.next(self.function_name_here)
. - Die Klasse implementiert die
start
und Endfunktionen.
Betrachten Sie ein Beispiel für eine minimale Aufgabenkette, die aus drei Knoten besteht.
Ihr Schema sieht so aus:
start → process_message → end
Hier ist ihr Code:
from metaflow import FlowSpec, step class LinearFlow(FlowSpec): """ , Metaflow. """
Installationsanleitung für Metaflow
▍Installation und Probelauf
Die folgenden Schritte müssen ausgeführt werden, um Metaflow zum ersten Mal zu installieren und zu starten:
- Installieren Sie Metaflow (Python 3 wird empfohlen):
pip3 install metaflow
. linear_flow.py
Sie das obige Codefragment ( hier auf GitHub) in die Datei linear_flow.py
.- Verwenden Sie den Befehl
python3 linear_flow.py show
, um die Architektur der von diesem Code implementierten python3 linear_flow.py show
. python3 linear_flow.py run
zum Starten des Streams den python3 linear_flow.py run
.
Sie sollten etwas Ähnliches wie das unten gezeigte erhalten.
Erfolgreicher Metaflow Health CheckHier lohnt es sich, auf einige Dinge zu achten. Das Metaflow-Framework erstellt ein lokales
.metaflow
Data
.metaflow
. Dort werden alle Metadaten im Zusammenhang mit der Ausführung von Aufgaben sowie Momentaufnahmen gespeichert, die mit Sitzungen zur Ausführung von Aufgaben verknüpft sind. Wenn Sie Metaflow-Einstellungen für den Cloud-Speicher konfiguriert haben, werden Snapshots in AWS S3 Bucket gespeichert, und Metadaten für Aufgabenstarts werden basierend auf RDS (Relational Data Store, relationaler Datenspeicher) an den Metadatendienst gesendet. Später werden wir darüber sprechen, wie diese Metadaten mithilfe der Client-API untersucht werden. Eine andere Kleinigkeit, obwohl wichtig, die es wert ist, beachtet zu werden, ist, dass sich die Prozesskennungen (pid, Prozess-IDs), die mit verschiedenen Schritten verknüpft sind, unterscheiden. Denken Sie daran - wir haben oben gesagt, dass Metaflow jeden Schritt der Taskkette unabhängig voneinander containerisiert und jeden Schritt in seiner eigenen Umgebung ausführt (wobei nur Daten zwischen den Schritten übertragen werden).
▍Installation und Konfiguration von conda (wenn Sie Abhängigkeiten implementieren möchten)
Befolgen Sie diese Schritte, um conda zu installieren:
Sie können nun Conda-Abhängigkeiten in Ihre Task-Ketten einbetten. Details dieses Prozesses werden unten diskutiert.
Realistisches Workflow-Beispiel
Oben haben wir darüber gesprochen, wie Sie Metaflow installieren und sicherstellen, dass das System betriebsbereit ist. Darüber hinaus haben wir die Grundlagen der Workflow-Architektur besprochen und ein einfaches Beispiel betrachtet. Hier sehen wir uns ein komplexeres Beispiel an und zeigen einige der Konzepte von Metaflow.
▍Job
Erstellen Sie einen Workflow mit Metaflow, der die folgenden Funktionen implementiert:
- Laden von CSV-Filmdaten in einen Pandas-Datenrahmen.
- Parallele Berechnung von Quartilen für Genres.
- Speichern eines Wörterbuchs mit den Ergebnissen von Berechnungen.
▍ Auftragskette
Das Grundgerüst der
GenreStatsFlow
Klasse ist
GenreStatsFlow
. Nachdem Sie es analysiert haben, werden Sie das Wesentliche des hier implementierten Ansatzes zur Lösung unseres Problems verstehen.
from metaflow import FlowSpec, step, catch, retry, IncludeFile, Parameter class GenreStatsFlow(FlowSpec): """ , , . : 1) CSV- Pandas. 2) . 3) . """ @step def start(self): """ : 1) Pandas. 2) . 3) . """
Betrachten Sie einige wichtige Teile dieses Beispiels. Der Code enthält Kommentare der Form
# n
, auf die wir uns weiter unten beziehen werden.
1
in 1
im start
auf den Parameter foreach
. Dank dessen werden Kopien der compute_statistics
Schritte for each
Eintrag in der genres
Liste in einer Schleife compute_statistics
.- In
2
@catch(var='compute_failed')
2
@catch(var='compute_failed')
Dekorator @catch(var='compute_failed')
alle Ausnahmen ab, compute_statistics
Schritt compute_statistics
, und schreibt sie in die Variable compute_failed
(sie kann im nächsten Schritt gelesen werden). - In
3
@retry(times=1)
Dekorator @retry(times=1)
genau das, worauf sein Name hinweist. Wenn nämlich Fehler auftreten, wiederholt er den Schritt. - Woher kommt in
4
in compute_statistics
self.input
? Die Sache ist, dass die input
eine von Metaflow bereitgestellte Klassenvariable ist. Es enthält Daten, die für eine bestimmte Instanz von compute_statistics
(wenn mehrere Kopien einer Funktion gleichzeitig ausgeführt werden). Diese Variable wird von Metaflow nur hinzugefügt, wenn Knoten durch mehrere parallele Prozesse dargestellt werden oder wenn mehrere Knoten kombiniert werden. - Hier ist ein Beispiel für die Ausführung derselben Funktion in parallel -
compute_statistics
. Bei Bedarf können Sie aber auch völlig unterschiedliche Funktionen gleichzeitig ausführen, die nicht miteinander zusammenhängen. Ändern Sie dazu die Darstellung in 1
in self.next(self.func1, self.function2, self.function3)
. Natürlich muss bei diesem Ansatz auch der join
, damit die Ergebnisse verschiedener Funktionen darauf verarbeitet werden können.
So stellen Sie sich die oben genannte Skelettklasse vor.
Visuelle Darstellung der GenreStatsFlow-Klasse▍Datendatei lesen und Parameter übertragen
- Laden Sie diese CSV-Filmdatei herunter.
- Jetzt müssen Sie das Programm mit Unterstützung für die Möglichkeit ausrüsten, den Pfad zur Datei
movie_data
und den Wert max_genres
dynamisch an die movie_data
zu max_genres
. Der Mechanismus externer Argumente wird uns dabei helfen. Mit Metaflow können Sie Argumente an das Programm übergeben, indem Sie zusätzliche Flags im Workflow-Startbefehl verwenden. Es könnte beispielsweise so aussehen: python3 tutorial_flow.py run --movie_data=path/to/movies.csv --max_genres=5
. - Metaflow stellt dem Entwickler
IncludeFile
und Parameter
Objekte zur Verfügung, mit denen Sie die Eingabe im Workflow-Code lesen können. Wir verweisen auf die Argumente, die beim Zuweisen von IncludeFile
und Parameter
Objekten zu Klassenvariablen übergeben werden. Es kommt darauf an, was genau wir lesen wollen - die Datei oder den üblichen Wert.
Der Code sieht so aus, als würde er die Parameter lesen, die an das Programm übergeben wurden, als es über die Befehlszeile gestartet wurde:
movie_data = IncludeFile("movie_data", help="The path to a movie metadata file.", default = 'movies.csv') max_genres = Parameter('max_genres', help="The max number of genres to return statistics for", default=5)
▍Einbeziehung von Conda in die Auftragskette
- Wenn Sie conda noch nicht installiert haben, lesen Sie den Abschnitt zum Installieren und Konfigurieren von conda in diesem Artikel.
- Fügen
GenreStatsFlow
der GenreStatsFlow-Klasse den von GenreStatsFlow
bereitgestellten Dekorator @conda_base hinzu. Dieser Dekorateur erwartet die Python-Version. Sie kann entweder im Code eingestellt oder über eine Hilfsfunktion abgerufen werden. Unten ist der Code, der die Verwendung des Dekorators demonstriert und eine Hilfsfunktion zeigt.
def get_python_version(): """ , python, . conda python. """ import platform versions = {'2' : '2.7.15', '3' : '3.7.4'} return versions[platform.python_version_tuple()[0]] # python. @conda_base(python=get_python_version()) class GenreStatsFlow(FlowSpec):
- Sie können jetzt den
@conda
Dekorator zu jedem Schritt in der Task-Kette hinzufügen. Es erwartet ein Objekt mit Abhängigkeiten, das über den Parameter libraries
an das Objekt übergeben wird. Bevor der Schritt gestartet wird, übernimmt Metaflow die Aufgabe, den Container mit den angegebenen Abhängigkeiten vorzubereiten. Bei Bedarf können Sie verschiedene Versionen von Paketen sicher in verschiedenen Schritten verwenden, da Metaflow jeden Schritt in einem separaten Container startet.
@conda(libraries={'pandas' : '0.24.2'}) @step def start(self):
python3 tutorial_flow.py --environment=conda run
nun den folgenden Befehl aus: python3 tutorial_flow.py --environment=conda run
.
▍Starten Sie die Implementierung
@conda(libraries={'pandas' : '0.24.2'}) @step def start(self): """ : 1) Pandas. 2) . 3) . """ import pandas from io import StringIO # Pandas. self.dataframe = pandas.read_csv(StringIO(self.movie_data)) # 'genres' . # . self.genres = {genre for genres \ in self.dataframe['genres'] \ for genre in genres.split('|')} self.genres = list(self.genres) # . # 'foreach' # self.next(self.compute_statistics, foreach='genres')
Betrachten Sie einige der Funktionen dieses Codes:
- Beachten Sie, dass sich der Pandas-Import-Ausdruck in der Funktion befindet, die den Schritt beschreibt. Tatsache ist, dass diese Abhängigkeit von conda nur im Rahmen dieses Schritts eingeführt wird.
- Die hier deklarierten Variablen (
dataframe
und genres
) sind jedoch auch im Code der nach diesem Schritt ausgeführten Schritte verfügbar. Der Punkt ist, dass Metaflow auf der Grundlage der Prinzipien der Trennung von Code-Ausführungsumgebungen arbeitet, aber es Daten ermöglicht, sich auf natürliche Weise zwischen den Schritten der Task-Kette zu bewegen.
▍ Implementierung des Schritts compute_statistics
@catch(var='compute_failed') @retry @conda(libraries={'pandas' : '0.25.3'}) @step def compute_statistics(self): """ . """ # # 'input'. self.genre = self.input print("Computing statistics for %s" % self.genre) # , # . selector = self.dataframe['genres'].\ apply(lambda row: self.genre in row) self.dataframe = self.dataframe[selector] self.dataframe = self.dataframe[['movie_title', 'genres', 'gross']] # gross . points = [.25, .5, .75] self.quartiles = self.dataframe['gross'].quantile(points).values # , . self.next(self.join)
Bitte beachten Sie, dass wir uns in diesem Schritt auf die
dataframe
beziehen, die im vorherigen
start
deklariert wurde. Wir ändern diese Variable. Wenn Sie mit den nächsten Schritten
dataframe
, können Sie mit diesem Ansatz, der die Verwendung eines neuen modifizierten
dataframe
impliziert, die effiziente Arbeit mit Daten organisieren.
▍ Implementieren Sie den Join-Schritt
@conda(libraries={'pandas' : '0.25.3'}) @step def join(self, inputs): """ . """ inputs = inputs[0:self.max_genres] # , . self.genre_stats = {inp.genre.lower(): \ {'quartiles': inp.quartiles, 'dataframe': inp.dataframe} \ for inp in inputs} self.next(self.end)
Hier sind einige Punkte hervorzuheben:
- In diesem Schritt verwenden wir eine völlig andere Version der Pandas-Bibliothek.
- Jedes Element im
compute_statistics
ist eine Kopie der zuvor ausgeführten compute_statistics
. Es enthält den Zustand des entsprechenden Funktionslaufs, also die Werte verschiedener Variablen. input[0].quartiles
können also Quartile für das comedy
Genre enthalten, und input[1].quartiles
input[0].quartiles
können Quartile für das sci-fi
Genre enthalten.
▍Fertiger Entwurf
Den vollständigen Projektcode, den wir gerade überprüft haben, finden Sie
hier .
Um zu sehen, wie der in der Datei
tutorial_flow.py
beschriebene Workflow
tutorial_flow.py
, müssen Sie den folgenden Befehl ausführen:
python3 tutorial_flow.py --environment=conda show
Verwenden Sie den folgenden Befehl, um den Workflow zu starten:
python3 tutorial_flow.py --environment=conda run --movie_data=path/to/movies.csv --max_genres=7
Untersuchen der Ergebnisse der Ausführung eines Workflows mithilfe der Client-API
Sie können die von Metaflow bereitgestellte Client-
API verwenden, um Momentaufnahmen von Daten und den Status früherer Starts des Workflows zu überprüfen. Diese API ist ideal, um die Details der in der Jupyter Notebook-Umgebung durchgeführten Experimente zu untersuchen.
Hier ist ein einfaches Beispiel für die Ausgabe der Variablen
genre_stats
aus den Daten des letzten erfolgreichen Starts von
GenreStatsFlow
.
from metaflow import Flow, get_metadata
Ausführen von Workflows in der Cloud
Nachdem Sie den Workflow auf einem normalen Computer erstellt und getestet haben, ist es sehr wahrscheinlich, dass Sie den Code in der Cloud ausführen möchten, um die Arbeit zu beschleunigen.
Derzeit unterstützt Metaflow nur die Integration mit AWS. In der folgenden Abbildung sehen Sie eine Zuordnung der von Metaflow verwendeten lokalen und Cloud-Ressourcen.
Metaflow- und AWS-IntegrationUm Metaflow mit AWS zu verbinden, müssen Sie die folgenden Schritte ausführen:
- Zunächst müssen Sie ein einmaliges AWS-Setup durchführen, indem Sie Ressourcen erstellen, mit denen Metaflow arbeiten kann. Dieselben Ressourcen können beispielsweise von Mitgliedern eines Arbeitsteams verwendet werden, die sich gegenseitig die Ergebnisse von Arbeitsabläufen demonstrieren. Entsprechende Anleitungen finden Sie hier. Die Einstellungen sind schnell genug, da Metaflow über eine CloudFormation-Einstellungsvorlage verfügt.
- Als Nächstes müssen Sie auf dem lokalen Computer den
metaflow configure aws
und Antworten auf metaflow configure aws
eingeben. Mit diesen Daten kann Metaflow cloudbasierte Data Warehouses verwenden. --with batch
lokale Workflows in der Cloud zu starten, fügen --with batch
dem Workflow- --with batch
einfach den --with batch
key hinzu. Zum Beispiel könnte es so aussehen: python3 sample_flow.py run --with batch
.- Um einen hybriden Start des Workflows durchzuführen,
@batch
einige Schritte lokal und einige in der Cloud auszuführen, müssen Sie den @batch
Dekorator zu den Schritten hinzufügen, die in der Cloud ausgeführt werden müssen. Beispiel: @batch(cpu=1, memory=500)
.
Zusammenfassung
An dieser Stelle möchte ich einige Metaflow-Funktionen erwähnen, die sowohl die Vor- als auch die Nachteile dieses Frameworks berücksichtigen können:
- Metaflow ist eng in AWS integriert. In den Framework-Entwicklungsplänen wird jedoch eine größere Anzahl von Cloud-Anbietern unterstützt.
- Metaflow ist ein Tool, das nur die Befehlszeilenschnittstelle unterstützt. Es hat keine grafische Oberfläche (im Gegensatz zu anderen universellen Frameworks zum Organisieren von Arbeitsprozessen wie Airflow).
Sehr geehrte Leser! Planen Sie die Verwendung von Metaflow?
