مرحبا بالجميع! اسمي Anton ، في Rostelecom أقوم بتطوير مستودع بيانات مركزي. يتكون التخزين لدينا من وحدات ، والتي يستخدم أوركسترا العديد من مثيلات Informatica ، التي نريد نقل بعضها إلى Airflow كجزء من الانتقال إلى حلول مفتوحة المصدر. نظرًا لأن Informatica و Airflow هما أداتان مختلفتان تمامًا ، فإن تنفيذ وتكرار تنفيذ موجود ليس بهذه السهولة. أردنا الحصول على سير عمل ، من ناحية ، أقرب ما يمكن من التنفيذ الحالي ، ومن ناحية أخرى ، استخدم مبدأ Airflow الأول الأكثر إثارة للاهتمام - الديناميكية ، والذي يوفر المرونة.
في هذا المقال القصير ، أريد أن أتحدث عن الجيل الديناميكي حقًا من DAGs في Airflow. في هذا الموضوع ، يحتوي الإنترنت بشكل أساسي على العديد من المقالات من مطورين من الهند ، وهي مواد من النموذج "يمكنك إنشاء علامات بشكل ديناميكي في Airflow ، فيما يلي مثال: <مثال لإنشاء 10 مهام / علامات HelloWorld>" . لكننا كنا مهتمين بتوليد الخفافيش ، والتي سوف تتغير في الوقت المناسب مع عدد متغير وأسماء المهام.

حاليًا ، يتم تطبيق Airflow لإطلاق وحدة نمطية تنشئ حزم بيانات على خوادم المصدر البعيد لمزيد من التحميل إلى المستودع. يتم تشغيله وفقًا لجدول زمني بسيط ؛ ليس من المثير للاهتمام أن ندرسه بالتفصيل. وأيضًا ، سيتم تقديم تزامن قريبًا من خلال وحدة Airflow ، التي توفر حزم البيانات لمزيد من التحميل بواسطة الطبقات في المرحلة المتوسطة. نحن هنا في انتظار سلسلة من أشعل النار ، لم أجد وصفًا لها في أي مكان وأريد أن أشارك تجربتي.
على Airflow on Habré ، يوجد مقالتان من المطورين من Mail.ru حيث يتم وصف الأشياء الأساسية جيدًا:
وصف عام لتدفق الهواء
المتفرعة ، المعلمات عبر jinja ، والتواصل داخل DAG من خلال Xcom
مسرد صغير:
DAG / DAG هو رسم بياني موجه. في هذه الحالة ، نعني سلسلة من الإجراءات التي تعتمد على بعضها البعض ولا تشكل دورات.
SubDAG / Sabdag - هو نفسه DAG ، ولكنه يقع داخل DAG آخر ، تم إطلاقه كجزء من DAG الأصل (أي أنه مهمة) وليس له جدول منفصل.
المشغل / المشغل - خطوة محددة في dag ، تنفيذ إجراء محدد. على سبيل المثال ، PythonOperator.
المهمة / المهمة - يتم عرض مثيل محدد للمشغل عند بدء DAG ، كمربع صغير في واجهة الويب. على سبيل المثال ، PythonOperator ، والذي يسمى run_task ويتم تشغيله في DAG check_dag .
فكرة توليد المهام الديناميكية في الخلل والمشاكل والعيوب
إدخال البيانات:
يوجد جدول في مستودع الأوركسترا ، دعنا نسميه PKG_TABLE.
هناك آلية تضيف إدخالات إلى جدول PKG_TABLE التي تكون حزمة البيانات جاهزة للتنزيل.
ما أردنا:
DAG ، الذي سيتم إنشاؤه للحزم الجاهزة للتنزيل وبدء تنزيلها (المفسد: في النهاية ، تم تشغيل كل شيء).
باستخدام الكود أدناه ، نقوم بإنشاء مجموعة بيانات مؤلفة من مهمة LatestOnlyOperator ومهمتها التابعة ، والتي يتم إنشاؤها عند تشغيل وظيفة pkg_subdag_factory ، والتي تتلقى قائمة من الحزم من جدول PKG_TABLE وتُنشئ العديد من PythonOperators. في حالة عدم وجود حزم للتنزيل ، يتم إنشاء DummyOperator.
قرروا إنشاء الإصدار الأول باستخدام PythonOperator ، وإعادة توزيعه في سير عمل تفصيلي باستخدام Airflow.
توضح لقطات الشاشة التالية كيف يبدو هذا كنتيجة.
ظهور DAG:

ظهور subdag في حالة عدم وجود حزم للتسليم:

ظهور subdag في وجود حزم للتسليم:

مشاكل والفروق الدقيقة
- لم ينجح Catchup كما توقعنا: بعد تشغيل dag مغلقًا ، حدثت عمليات إطلاق متعددة (ليس لكامل فترة الجدول ، ولكن 2-3 في نفس الوقت). لهذا السبب ، اضطررت إلى إضافة LatestOnlyOperator ، بحيث تصبح جميع عمليات الإطلاق باستثناء الإصدار الأخير في وضع الخمول.
- إذا أنشأت نموذجًا فرعيًا ، فأنت بحاجة إلى تمكينه بشكل صريح عبر سطر الأوامر باستخدام الأمر "تدفق الهواء غير مؤقت <subdag_name>" ، وإلا فلن يبدأ تشغيله ، وستحتاج إلى القيام بذلك عند إنشاء كل فئة فرعية جديدة (نموذج فرعي باسم جديد) ، مما سيجعله غير مريح للغاية لإنشاء ديناميكي . إذا قمت بتعيين المعلمة "dags_are_paused_at_creation" = خطأ في تكوين تدفق الهواء ($ airflow_home / airflow.cfg) ، فلن يكون ذلك ضروريًا ، لكن يمكن أن يؤدي ذلك إلى عواقب غير سارة مع الإطلاق التلقائي التلقائي ل dagas جديدة - يبدو لي أنك بحاجة إلى بدء dagas جديدة يدويًا.
كما تقول الوثائق ، "تتمثل إحدى الإمكانيات الرئيسية لـ Airflow في أن عمليات DAG Runs هذه عبارة عن عناصر ذرية ودموية ، <...>" ، مما يعني: "من المفهوم أن يتم إنشاء DAG بدون تغيير." نظرًا لأننا انتهكنا هذه "القدرة الرئيسية" ، فقد تعلمنا بعض الأشياء:
- يبدأ dag فارغة (بدون مهام) ولا يمكن أن ينتهي ، مما يؤدي إلى انسداد جميع أوجه التشابه الممكنة. حدث هذا إذا لم تكن هناك حزم للتنزيل في وقت إطلاق dag. لحل هذا، يتم إنشاء DummyOperator.
- في حالة إعادة إنشاء dag للمهام أثناء العمل ولم تعد هذه المهمة في dag المحدّثة - فستتوقف مع مقاطعة العملية قيد التشغيل. ويحدث هذا مع كل خطوة من خطوات sheduler ، ولكن ليس أكثر مما هو موضح في المعلمة min_file_process_interval في تكوين تدفق الهواء ($ airflow_home / airflow.cfg). للتحايل على ذلك ، قمنا بإنشاء حزم المهام ليس فقط من خلال حالة "جاهز للتنزيل" ، ولكن أيضًا من خلال حالة "التحميل أثناء العملية" بحيث يستمر إنشاءه أثناء عملية التنزيل.
- إذا لم يكن للإصدار الحالي من dag أي مهمة كانت موجودة من قبل - على سبيل المثال ، كانت هناك مهمة تحمل الاسم "pkg_123" تم تحميلها مسبقًا ولم يتم إنشاؤها في الإصدار الحالي من dag ، لا يمكنك مشاهدة إحصائيات عن هذه المهمة في واجهة الويب. على الرغم من أن جميع المعلومات يتم تخزينها في قاعدة بيانات تدفق الهواء وعلى أساسها ، فمن الممكن بناء لوحة تحكم جميلة للإطلاقات القديمة بالوسائل الخارجية. عندما ينشأ سؤال حول وتيرة تحديث DAGs والقدرة على تعطيله ، يمكنك أن تقرأ عنه هنا .
- نظرًا للإنشاء الديناميكي لـ task_id ، من الضروري إلقاء قاموس يحتوي على بيانات لجميع الحزم الحالية في كل مهمة من هذه المهام ، بالإضافة إلى معرف الحزمة الحالية ، بحيث عندما تعمل الوظيفة نفسها ، حدد البيانات اللازمة من نفس القاموس بواسطة معرف الحزمة. خلاف ذلك ، بدأت جميع المهام لنفس الحزمة.
Execution_date في السجلات ووقت البدء الفعلي
سأنتهي بفارق بسيط آخر من Airflow ، والذي يربك في البداية ولا يوصف بكلمات بسيطة في مقالات أخرى - تاريخ التنفيذ (الذي يتم عرضه في جميع السجلات ، في الواجهة ، وما إلى ذلك) ووقت البدء الفعلي. من حيث المبدأ ، الوصف في توثيق تدفق الهواء والأسئلة الشائعة ، ولكن النتيجة ليست واضحة ، لذلك يبدو لي أن التوضيح مطلوب.
الوثائق : "جدولة تطلق وظيفتك في نهاية الفترة"
النتيجة : إذا قمت بإنشاء مؤشر مع جدول ، على سبيل المثال ،daily ، فسيتم تشغيل المدى الذي يحمل "implementation_date" 2018-01-01 00:00:00 "فعليًا" 2018-02-01 00:00:00 ".
روابط مفيدة:
وثائق الاسترداد
وثائق LastOnlyOperator
مزيد من الوثائق على LatestOnlyOperator
مثال على استخدام LastOnlyOperator
بعض الفروق الدقيقة
سؤال حول التبعيات على الإطلاق السابق
مثال صغير عن الجيل الديناميكي
سؤال حول الجيل الديناميكي مع وصف صغير