تعلم Metaflow في 10 دقائق

Metaflow هو إطار عمل Python تم إنشاؤه في Netflix ويركز على مجال علوم البيانات. وهي مصممة لإنشاء مشاريع تهدف إلى العمل مع البيانات ، وإدارة هذه المشاريع. في الآونة الأخيرة ، نقلتها الشركة إلى فئة المصادر المفتوحة. تم استخدام إطار Metaflow على نطاق واسع داخل Netflix في العامين الماضيين. على وجه الخصوص ، سمح لخفض كبير في الوقت اللازم لإنجاز المشاريع في الإنتاج.



المواد التي نترجمها اليوم هي دليل سريع إلى Metaflow.

ما هو Metaflow؟


يوجد أدناه رسم بياني يوضح تنفيذ إطار عمل Metaflow في Netflix.


تنفيذ Metaflow في Netflix

في نوفمبر 2018 ، تم استخدام هذا الإطار في 134 مشروعًا للشركة.

Metaflow هو إطار لإنشاء وتنفيذ مهام سير عمل علم البيانات. ويتميز الميزات التالية:

  • إدارة الموارد الحاسوبية.
  • مهمة إطلاق الحاويات.
  • إدارة التبعيات الخارجية.
  • الإصدار ، إعادة تنفيذ المهام ، استمرار تنفيذ المهام المعلقة.
  • واجهة برمجة تطبيقات العميل لفحص نتائج المهام التي يمكن استخدامها في بيئة Jupyter Notebook.
  • دعم محلي (على سبيل المثال ، على كمبيوتر محمول) وتنفيذ المهمة عن بُعد (في السحابة). القدرة على التبديل بين هذه الأوضاع.

كتب المستخدم vtuulos على Ycombinator أن Metaflow يمكنه تلقائيًا إنشاء لقطات (لقطات) من الكود والبيانات والتبعيات. يتم وضع كل هذا في مستودع مع عنونة حسب المحتوى ، والتي تعتمد عادةً على S3 ، على الرغم من أن نظام الملفات المحلي مدعوم أيضًا. يتيح لك ذلك مواصلة أداء المهام التي تم إيقافها وإعادة إنتاج النتائج التي تم الحصول عليها مسبقًا واستكشاف كل ما يتعلق بالمهام ، على سبيل المثال ، في Jupyter Notebook.

بشكل عام ، يمكننا القول أن Metaflow يهدف إلى زيادة إنتاجية علماء البيانات. يتم ذلك نظرًا لحقيقة أن الإطار يسمح لهم بالمشاركة حصريًا في العمل مع البيانات ، دون أن يصرف انتباههم عن طريق حل المهام ذات الصلة. بالإضافة إلى ذلك ، تسرع Metaflow من سحب المشاريع بناءً عليها في الإنتاج.


احتياجات عالم البيانات فيما يتعلق بمسؤولياته المباشرة وحل المهام المساعدة المتعلقة بالبنية التحتية التي يتم إجراء العمليات الحسابية عليها

سيناريوهات سير العمل مع Metaflow


فيما يلي بعض سيناريوهات سير العمل التي يمكنك تنظيمها باستخدام Metaflow:

  • التعاون. يريد أحد علماء البيانات مساعدة آخر في العثور على مصدر الخطأ. في نفس الوقت ، يرغب المساعد في تنزيل البيئة بالكامل التي تعمل بها المهمة التي تحطمت على جهاز الكمبيوتر الخاص به.
  • استمرار المهام المتوقفة من المكان الذي توقفت فيه. تم إيقاف بعض المهام بسبب وجود خطأ (أو تم إيقافها عن قصد). تم تصحيح الخطأ (أو تم تحرير الكود). من الضروري إعادة تشغيل المهمة حتى يستمر عملها من المكان الذي فشلت فيه (أو تم إيقافه).
  • تنفيذ المهمة المختلطة. تحتاج إلى تنفيذ خطوة معينة من سير العمل محليًا (ربما هذه هي خطوة تنزيل البيانات من ملف مخزّن في مجلد على الكمبيوتر) ، ويجب تنفيذ خطوة أخرى تتطلب موارد حسابية كبيرة (ربما هذا هو تدريب النموذج) في السحابة.
  • فحص البيانات الوصفية التي تم الحصول عليها بعد الانتهاء من المهمة ثلاثة علماء يشاركون في اختيار المعلمات الفائقة للنموذج نفسه ، في محاولة لتحسين دقة هذا النموذج. بعد ذلك ، تحتاج إلى تحليل نتائج مهام تدريب النموذج واختيار مجموعة من المعلمات الفائقة التي أظهرت أنها الأفضل.
  • باستخدام إصدارات متعددة من نفس الحزمة. تحتاج في المشروع إلى استخدام إصدارات مختلفة ، على سبيل المثال ، مكتبات sklearn. أثناء المعالجة المسبقة ، يكون إصدارها 0.20 مطلوبًا ، وخلال النسخ ، يكون الإصدار 0.22 مطلوبًا.

سير عمل Metaflow النموذجي


النظر في سير عمل Metaflow نموذجي من منظور المفاهيمي والبرمجة.

on النظرة النظرية في سير عمل Metaflow


من وجهة نظر مفاهيمية ، يتم تمثيل سير عمل Metaflow (سلاسل المهام) برسوم بيانية حادة موجهة (DAGs). ستساعدك الرسوم التوضيحية أدناه على فهم هذه الفكرة بشكل أفضل.


الخطية الحلقية الرسم البياني


الرسم البياني الحلقية مع مسارات "متوازية"

تمثل كل عقدة من الرسم البياني خطوة معالجة البيانات في سير العمل.

في كل خطوة من سلسلة المهام ، ينفذ Metaflow كود Python العادي دون أي تغييرات خاصة. يتم تنفيذ الكود في حاويات منفصلة يتم فيها تعبئة الكود مع تبعياته.

يتمثل أحد الجوانب الرئيسية في هندسة Metaflow في حقيقة أنه يتيح لك تنفيذ أي مكتبات خارجية تقريبًا من نظام conda البيئي إلى مشاريع تعتمد عليها دون استخدام المكونات الإضافية. هذا يميز Metaflow عن حلول أخرى للأغراض العامة مماثلة. على سبيل المثال - من Airflow.

work سير عمل Metaflow من حيث البرمجة


يمكن تمثيل كل سلسلة مهام (دفق) كفئة Python قياسية (عادةً ما تحتوي أسماء هذه الفئات على كلمة Flow ) إذا كانت تفي بالمتطلبات الدنيا التالية:

  • الفئة هي سليل فئة FlowSpec .
  • كل وظيفة تمثل خطوة في سلسلة المهام لها @step الديكور @step .
  • في نهاية كل وظيفة @step ، يجب أن يكون هناك إشارة إلى وظيفة مماثلة تتبعها. يمكن القيام بذلك باستخدام بنية من هذا النوع: self.next(self.function_name_here) .
  • ينفذ الفصل وظائف start end .

النظر في مثال على سلسلة من المهام الحد الأدنى تتكون من ثلاث نقاط.

مخططها يشبه هذا:

 start → process_message → end 

هنا كودها:

 from metaflow import FlowSpec, step class LinearFlow(FlowSpec):         """     ,      Metaflow.    """       #         @step    def start(self):        self.message = 'Thanks for reading.'        self.next(self.process_message)    @step    def process_message(self):        print('the message is: %s' % self.message)        self.next(self.end)    @step    def end(self):        print('the message is still: %s' % self.message) if __name__ == '__main__':    LinearFlow() 

تعليمات تركيب Metaflow


and التثبيت والتشغيل التجريبي


فيما يلي تسلسل الخطوات التي تحتاج إلى إكمالها لتثبيت Metaflow وتشغيلها لأول مرة:

  • تثبيت Metaflow (بيثون 3 مستحسن): pip3 install metaflow .
  • ضع جزء الشفرة أعلاه ( هنا على GitHub) في ملف linear_flow.py .
  • لإلقاء نظرة على بنية سلسلة المهام التي تنفذها هذه الشفرة ، استخدم أمر python3 linear_flow.py show .
  • لبدء الدفق ، python3 linear_flow.py run .

يجب أن تحصل على شيء مشابه لما هو موضح أدناه.


الناجحة Metaflow الصحة تحقق

هنا يجدر الانتباه إلى بعض الأشياء. ينشئ إطار عمل .metaflow بيانات محلي .metaflow . هناك ، يقوم بتخزين جميع البيانات الوصفية المتعلقة بتنفيذ المهام واللقطات المرتبطة بجلسات تنفيذ المهمة. إذا قمت بتكوين إعدادات Metaflow المتعلقة بالتخزين السحابي ، فسيتم تخزين اللقطات في AWS S3 Bucket ، وستذهب البيانات الوصفية المتعلقة بإطلاق المهام إلى خدمة Metadata ، استنادًا إلى RDS (مخزن البيانات Relational ، ومخزن البيانات الارتباطي). سنتحدث لاحقًا عن كيفية استكشاف هذه البيانات التعريفية باستخدام واجهة برمجة تطبيقات العميل. تافه آخر ، على الرغم من أهميته ، والذي يستحق الاهتمام به ، هو أن معرفات العملية (معرف المنتج ، معرفات العملية) المرتبطة بالخطوات المختلفة تختلف. تذكر - قلنا أعلاه أن Metaflow تقوم بشكل مستقل بتعبئة كل خطوة في سلسلة المهام وتنفذ كل خطوة في بيئتها الخاصة (تمرير البيانات فقط بين الخطوات).

and تثبيت وتكوين conda (إذا كنت تخطط لتنفيذ التبعيات)


اتبع هذه الخطوات لتثبيت conda:

  • قم بتنزيل وتثبيت Miniconda.
  • إضافة قناة conda config --add channels conda-forge مع الأمر conda config --add channels conda-forge .

أنت الآن جاهز لتضمين تبعيات كوندا في سلاسل المهام الخاصة بك. ستتم مناقشة تفاصيل هذه العملية أدناه.

مثال لسير العمل الواقعي


أعلاه ، تحدثنا عن كيفية تثبيت Metaflow ، وكيفية التأكد من تشغيل النظام. بالإضافة إلى ذلك ، ناقشنا أساسيات هندسة سير العمل ونظرنا إلى مثال بسيط. هنا نلقي نظرة على مثال أكثر تعقيدًا ، بينما نكشف عن بعض مفاهيم Metaflow.

▍Zadanie


قم بإنشاء سير عمل باستخدام Metaflow الذي يقوم بتنفيذ الوظائف التالية:

  • تحميل بيانات فيلم CSV في إطار بيانات Pandas.
  • حساب مواز للرباعيات للأنواع.
  • حفظ القاموس مع نتائج العمليات الحسابية.

▍ سلسلة المهام


يظهر الهيكل العظمي للفئة GenreStatsFlow . بعد تحليلها ، ستفهم جوهر النهج المطبق هنا لحل مشكلتنا.

 from metaflow import FlowSpec, step, catch, retry, IncludeFile, Parameter class GenreStatsFlow(FlowSpec):  """    ,  ,   .         :    1)  CSV-   Pandas.    2)     .    3)     .  """   @step  def start(self):    """         :        1)      Pandas.        2)    .        3)        .    """       # TODO:  CSV         self.genres = []    self.next(self.compute_statistics, foreach='genres') #  1     @catch(var='compute_failed') #  2  @retry(times=1) #  3  @step  def compute_statistics(self):    """    .   ."""    self.genre = self.input #  4    # TODO:        self.next(self.join)     @step  def join(self, inputs):    """       ."""    # TODO:      self.next(self.end)     @step  def end(self):      """End the flow."""      pass   if __name__ == '__main__':  GenreStatsFlow() 

النظر في بعض أجزاء مهمة من هذا المثال. تحتوي الكود على تعليقات على النموذج # n ، والذي سنشير إليه أدناه.

  • في 1 ، في خطوة start ، انتبه إلى المعلمة foreach . بفضله ، يتم compute_statistics نسخ من خطوات compute_statistics for each حلقة for each إدخال في قائمة genres .
  • في 2 @catch(var='compute_failed') مصمم @catch(var='compute_failed') بالقبض على أي استثناء compute_statistics خطوة compute_statistics إلى المتغير compute_failed (يمكن قراءته في الخطوة التالية).
  • في 3 مصمم الديكور @retry(times=1) بما يلمح اسمه بالضبط. وهي عندما تحدث الأخطاء ، يكرر هذه الخطوة.
  • من أين جاء 4 ، في compute_statistics ، من self.input ؟ الشيء ، هو input متغير فئة المقدمة من Metaflow. أنه يحتوي على بيانات قابلة للتطبيق على مثيل معين من compute_statistics (عندما يكون هناك نسخ متعددة من وظيفة تنفذ بالتوازي). تتم إضافة هذا المتغير بواسطة Metaflow فقط عندما يتم تمثيل العقد بعدة عمليات متوازية ، أو عندما يتم دمج عدة عقد.
  • فيما يلي مثال على تشغيل نفس الوظيفة بالتوازي - compute_statistics . لكن ، إذا لزم الأمر ، يمكنك تشغيل وظائف مختلفة تمامًا لا ترتبط ببعضها البعض. للقيام بذلك ، قم بتغيير ما يظهر في 1 إلى شيء مثل self.next(self.func1, self.function2, self.function3) . بالطبع ، مع هذا النهج ، سيكون من الضروري إعادة كتابة خطوة join أيضًا ، مما يتيح معالجة نتائج الوظائف المختلفة عليه.

إليك كيفية تخيل فئة الهيكل العظمي أعلاه.


التمثيل المرئي للفئة GenreStatsFlow

file قراءة ملف البيانات ونقل المعلمات


  • قم بتنزيل ملف الفيلم CSV هذا .
  • أنت الآن بحاجة إلى تزويد البرنامج بدعم لإمكانية نقل المسار إلى ملف max_genres وقيمة max_genres إلى max_genres . آلية الحجج الخارجية ستساعدنا في هذا. يتيح لك Metaflow تمرير الوسائط إلى البرنامج باستخدام إشارات إضافية في أمر بدء سير العمل. على سبيل المثال ، قد يبدو كالتالي: python3 tutorial_flow.py run --movie_data=path/to/movies.csv --max_genres=5 .
  • يمنح Metaflow مطور IncludeFile و Parameter للكائنات التي تتيح لك قراءة الإدخال في رمز سير العمل. نشير إلى الوسائط التي تم تمريرها عن طريق تعيين كائنات IncludeFile و Parameter لمتغيرات الفئة. يعتمد الأمر على ما نريد أن نقرأه بالضبط - الملف ، أو القيمة المعتادة.

إليك كيف يبدو الرمز في قراءة المعلمات التي تم تمريرها إلى البرنامج عندما تم إطلاقه من سطر الأوامر:

     movie_data = IncludeFile("movie_data",                             help="The path to a movie metadata file.",                             default = 'movies.csv')                               max_genres = Parameter('max_genres',                help="The max number of genres to return statistics for",                default=5) 

cond إدراج كوندا في سلسلة المهام


  • إذا لم تكن قد قمت بتثبيت conda بعد ، فراجع القسم الخاص بتثبيت وتكوين conda في هذه المقالة.
  • أضف GenreStatsFlow decorator المقدمة من Metaflow إلى فئة GenreStatsFlow. يتوقع أن يحصل هذا الديكور على إصدار الثعبان. يمكن ضبطه إما في الكود ، أو الحصول عليه باستخدام وظيفة مساعدة. أدناه هو الرمز الذي يوضح استخدام الديكور ويظهر وظيفة المساعد.

     def get_python_version():    """     ,    python,       .         conda        python.    """    import platform    versions = {'2' : '2.7.15',                '3' : '3.7.4'}    return versions[platform.python_version_tuple()[0]] #       python. @conda_base(python=get_python_version()) class GenreStatsFlow(FlowSpec): 
  • يمكنك الآن إضافة @conda إلى أي خطوة في سلسلة المهام. تتوقع كائنًا بتبعيات يتم تمريره إليه من خلال معلمة libraries . سيتولى Metaflow ، قبل بدء الخطوة ، مهمة إعداد الحاوية باستخدام التبعيات المحددة. إذا لزم الأمر ، يمكنك استخدام إصدارات مختلفة من الحزم بأمان في خطوات مختلفة ، حيث يقوم Metaflow بتشغيل كل خطوة في حاوية منفصلة.

         @conda(libraries={'pandas' : '0.24.2'})    @step    def start(self): 
  • الآن قم بتنفيذ الأمر التالي: python3 tutorial_flow.py --environment=conda run .

implementationStep بداية التنفيذ


 @conda(libraries={'pandas' : '0.24.2'})    @step    def start(self):    """         :        1)      Pandas.        2)    .        3)        .    """        import pandas        from io import StringIO        #      Pandas.        self.dataframe = pandas.read_csv(StringIO(self.movie_data))        #   'genres'      .         #   .        self.genres = {genre for genres \                       in self.dataframe['genres'] \                       for genre in genres.split('|')}        self.genres = list(self.genres)        #        .        #  'foreach'             #          self.next(self.compute_statistics, foreach='genres') 

النظر في بعض الميزات من هذا الرمز:

  • لاحظ أن تعبير استيراد الباندا موجود داخل الوظيفة التي تصف الخطوة. والحقيقة هي أن هذا الاعتماد يتم تقديمه بواسطة conda فقط في نطاق هذه الخطوة.
  • لكن المتغيرات المعلنة هنا ( dataframe genres ) متوفرة حتى في رمز الخطوات المنجزة بعد هذه الخطوة. النقطة المهمة هي أن Metaflow تعمل على أساس مبادئ فصل بيئات تنفيذ التعليمات البرمجية ، ولكنها تتيح نقل البيانات بشكل طبيعي بين خطوات سلسلة المهام.

▍ تنفيذ خطوة الحوسبة الإحصائية


 @catch(var='compute_failed')    @retry    @conda(libraries={'pandas' : '0.25.3'})    @step    def compute_statistics(self):        """            .        """        #             # 'input'.        self.genre = self.input        print("Computing statistics for %s" % self.genre)        #         ,         #        .        selector = self.dataframe['genres'].\                   apply(lambda row: self.genre in row)        self.dataframe = self.dataframe[selector]        self.dataframe = self.dataframe[['movie_title', 'genres', 'gross']]        #     gross   .        points = [.25, .5, .75]        self.quartiles = self.dataframe['gross'].quantile(points).values        #  ,    .        self.next(self.join) 

يرجى ملاحظة أننا في هذه الخطوة نشير إلى متغير dataframe الذي تم إعلانه في خطوة start السابقة. نحن نقوم بتعديل هذا المتغير. عند الانتقال إلى الخطوات التالية ، يتيح لك هذا النهج ، الذي يتضمن استخدام كائن dataframe جديد معدّل ، تنظيم العمل الفعال مع البيانات.

▍ تطبيق خطوة الربط


 @conda(libraries={'pandas' : '0.25.3'})    @step    def join(self, inputs):        """               .        """        inputs = inputs[0:self.max_genres]        #   ,    .        self.genre_stats = {inp.genre.lower(): \                            {'quartiles': inp.quartiles,                             'dataframe': inp.dataframe} \                            for inp in inputs}        self.next(self.end) 

هنا يستحق تسليط الضوء على بضع نقاط:

  • في هذه الخطوة ، نستخدم إصدارًا مختلفًا تمامًا من مكتبة الباندا.
  • كل عنصر في صفيف inputs هو نسخة من compute_statistics المنفذة سابقًا. أنه يحتوي على حالة تشغيل الدالة المقابلة ، أي قيم المتغيرات المختلفة. لذلك ، يمكن أن تحتوي input[0].quartiles على input[0].quartiles لنوع comedy ، ويمكن أن تحتوي input[1].quartiles input[0].quartiles على رباعيات لنوع sci-fi .

draft مسودة جاهزة


يمكن العثور على رمز المشروع الكامل الذي استعرضناه للتو هنا .

من أجل إلقاء نظرة على كيفية سير العمل الموضح في ملف tutorial_flow.py ، تحتاج إلى تشغيل الأمر التالي:

 python3 tutorial_flow.py --environment=conda show 

استخدم الأمر التالي لبدء سير العمل:

 python3 tutorial_flow.py --environment=conda run --movie_data=path/to/movies.csv --max_genres=7 

فحص نتائج تشغيل سير العمل باستخدام API العميل


من أجل فحص لقطات البيانات وحالة عمليات الإطلاق السابقة لسير العمل ، يمكنك استخدام واجهة برمجة تطبيقات العميل المقدمة من Metaflow. تعد واجهة برمجة التطبيقات هذه مثالية لاستكشاف تفاصيل التجارب التي أجريت في بيئة Jupyter Notebook.

فيما يلي مثال بسيط لإخراج المتغير genre_stats ، مأخوذ من بيانات آخر إطلاق ناجح لـ GenreStatsFlow .

 from metaflow import Flow, get_metadata #      print("Using metadata provider: %s" % get_metadata()) #     MovieStatsFlow. run = Flow('GenreStatsFlow').latest_successful_run print("Using analysis from '%s'" % str(run)) genre_stats = run.data.genre_stats print(genre_stats) 

تشغيل سير العمل في السحابة


بعد قيامك بإنشاء سير العمل واختباره على جهاز كمبيوتر عادي ، من المحتمل أنك تريد تشغيل التعليمات البرمجية في السحابة لتسريع العمل.

حاليًا ، يدعم Metaflow التكامل مع AWS فقط. في الصورة التالية ، يمكنك رؤية مناظرة للموارد السحابية المحلية والمستخدمة بواسطة Metaflow.


Metaflow والتكامل AWS

لتوصيل Metaflow بـ AWS ، يجب عليك إكمال الخطوات التالية:

  • تحتاج أولاً إلى القيام بإعداد AWS لمرة واحدة عن طريق إنشاء موارد يمكن أن يعمل معها Metaflow. يمكن استخدام نفس الموارد ، على سبيل المثال ، من قبل أعضاء فريق العمل الذين يوضحون لبعضهم البعض نتائج سير العمل. يمكنك العثور على التعليمات ذات الصلة هنا. الإعدادات سريعة بما يكفي ، لأن Metaflow يحتوي على قالب إعدادات CloudFormation.
  • بعد ذلك ، على الكمبيوتر المحلي ، تحتاج إلى تشغيل metaflow configure aws وإدخال إجابات على أسئلة النظام. باستخدام هذه البيانات ، سيتمكن Metaflow من استخدام مستودعات البيانات المستندة إلى مجموعة النظراء.
  • الآن ، لبدء سير العمل المحلي في السحابة ، ما عليك سوى إضافة مفتاح --with batch إلى --with batch بدء سير العمل. على سبيل المثال ، قد يبدو كالتالي: python3 sample_flow.py run --with batch .
  • من أجل القيام بإطلاق مختلط لسير العمل ، أي القيام ببعض الخطوات محليًا ، وبعضها في السحابة ، تحتاج إلى إضافة @batch decorator إلى تلك الخطوات التي يلزم تنفيذها في السحابة. على سبيل المثال ، مثل هذا: @batch(cpu=1, memory=500) .

النتائج


هنا ، أود أن أشير إلى بعض ميزات Metaflow التي يمكن اعتبارها مزايا وعيوب هذا الإطار:

  • تم دمج Metaflow بإحكام مع AWS. ولكن في خطط تطوير الإطار ، يوجد دعم لعدد أكبر من موفري الخدمات السحابية.
  • Metaflow هي أداة تدعم واجهة سطر الأوامر فقط. لا يحتوي على واجهة رسومية (على عكس الأطر العالمية الأخرى لتنظيم عمليات العمل ، مثل Airflow).

أعزائي القراء! هل تخطط لاستخدام Metaflow؟

Source: https://habr.com/ru/post/ar482462/


All Articles