أباتشي نيفي أتمتة تسليم التدفق

مرحبا بالجميع!



المهمة على النحو التالي - هناك تدفق ، يتم تقديمه في الصورة أعلاه ، والتي يجب طرحها على خوادم N باستخدام Apache NiFi . اختبار التدفق - يتم إنشاء ملف وإرساله إلى مثيل NiFi آخر. يتم نقل البيانات باستخدام بروتوكول NiFi Site to Site.


يعد NiFi Site to Site (S2S) طريقة آمنة وقابلة للتخصيص بسهولة لنقل البيانات بين مثيلات NiFi. تعرف على كيفية عمل S2S في الوثائق ومن المهم ألا تنسَ تكوين مثيل NiFi لتمكين S2S من رؤية هنا .

في تلك الحالات عندما يتعلق الأمر بنقل البيانات باستخدام S2S - تسمى مثيل واحد العميل ، الخادم الثاني. يرسل العميل البيانات ، يرسل الخادم. طريقتان لتكوين نقل البيانات بينهما:

  1. دفع. من مثيل عميل ، يتم إرسال البيانات باستخدام مجموعة العمليات عن بُعد (RPG). على مثيل الخادم ، يتم استلام البيانات باستخدام منفذ الإدخال.
  2. سحب. يستقبل الخادم البيانات باستخدام RPG ، يرسل العميل باستخدام منفذ الإخراج.


نحن تخزين تدفق المتداول في سجل اباتشي.


يعد Apache NiFi Registry أحد المشاريع الفرعية لـ Apache NiFi والذي يوفر أداة لتخزين التدفق والتحكم في الإصدار. وهناك نوع من بوابة. يمكن العثور على معلومات حول تثبيت السجل وتكوينه والعمل معه في الوثائق الرسمية . يتم دمج تدفق التخزين في مجموعة عمليات ويتم تخزينه على هذا النحو في السجل. كذلك في المقالة سوف نعود إلى هذا.


في البداية ، عندما يكون N عددًا صغيرًا ، يتم تسليم التدفق وتحديثه باليد في وقت مقبول.

ولكن مع نمو N ، هناك المزيد من المشاكل:

  1. تحديث التدفق يستغرق المزيد من الوقت. من الضروري أن تذهب إلى جميع الخوادم
  2. هناك أخطاء في تحديث القوالب. هنا أنها حدثت ، ولكن هنا نسوا
  3. أخطاء بشرية عند تنفيذ عدد كبير من العمليات من نفس النوع

كل هذا يقودنا إلى حقيقة أننا بحاجة إلى أتمتة العملية. جربت الطرق التالية لحل هذه المشكلة:

  1. استخدم MiNiFi بدلاً من NiFi
  2. نيفي CLI
  3. NiPyAPI

باستخدام MiNiFi


Apache MiNiFy هو مشروع فرعي لـ Apache NiFi. MiNiFy هو عامل مضغوط يستخدم نفس المعالجات مثل NiFi ، مما يتيح لك إنشاء نفس التدفق كما في NiFi. يتم تحقيق خفة العامل ، من بين أشياء أخرى ، نظرًا لحقيقة أن MiNiFy لا يحتوي على واجهة رسومية لتكوين التدفق. عدم وجود واجهة رسومية في MiNiFy يعني أنه من الضروري حل مشكلة تسليم التدفق في ميني. نظرًا لأن MiNiFy يُستخدم بنشاط في IOT ، فهناك العديد من المكونات ، ويجب أتمتة عملية توصيل التدفق إلى الحالات المصغرة النهائية. مهمة مألوفة ، أليس كذلك؟

سيساعد المشروع الفرعي الآخر في حل هذه المشكلة - خادم MiNiFi C2. يهدف هذا المنتج إلى أن يكون نقطة مركزية في هندسة التكوينات الدائرية. كيفية تكوين البيئة - الموصوفة في هذه المقالة على حبري ومعلومات كافية لحل هذه المهمة. يقوم MiNiFi بالتزامن مع خادم C2 بتحديث التكوين تلقائيًا في المنزل. العيب الوحيد في هذا النهج هو أنه يجب عليك إنشاء قوالب على خادم C2 ، فالتزام التسجيل البسيط لا يكفي.

الخيار الموضح في المقالة أعلاه يعمل وليس من الصعب تنفيذه ، ولكن لا تنس ما يلي:

  1. في minifi لا توجد جميع المعالجات من nifi
  2. تتخلف إصدارات المعالج في Minifi عن إصدارات المعالج في NiFi.

في وقت كتابة هذا التقرير ، كان أحدث إصدار من NiFi هو 1.9.2. إصدار المعالج من أحدث إصدار من MiNiFi هو 1.7.0. يمكن إضافة المعالجات إلى MiNiFi ، ولكن نظرًا لاختلافات الإصدار بين معالجات NiFi و MiNiFi قد لا يعمل هذا.

نيفي CLI


استنادا إلى وصف الأداة على الموقع الرسمي ، فهي أداة لأتمتة التفاعل بين NiFI و NiFi Registry في مجال تسليم التدفق أو التحكم في العملية. للبدء ، يجب تنزيل هذه الأداة من هنا .

تشغيل الأداة المساعدة

./bin/cli.sh _ ___ _ Apache (_) .' ..](_) , _ .--. __ _| |_ __ )\ [ `.-. | [ |'-| |-'[ | / \ | | | | | | | | | | ' ' [___||__][___][___] [___]', ,' `' CLI v1.9.2 Type 'help' to see a list of available commands, use tab to auto-complete. 

لكي نتمكن من تحميل التدفق الضروري من السجل ، نحتاج إلى معرفة معرفات السلة (معرف الجرافة) والتدفق نفسه (معرف التدفق). يمكن الحصول على هذه البيانات إما من خلال CLI ، أو من خلال واجهة الويب الخاصة بسجل NiFi. تبدو واجهة الويب كما يلي:



باستخدام CLI يفعل هذا:

 #> registry list-buckets -u http://nifi-registry:18080 # Name Id Description - -------------- ------------------------------------ ----------- 1 test_bucket 709d387a-9ce9-4535-8546-3621efe38e96 (empty) #> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080 # Name Id Description - ------------ ------------------------------------ ----------- 1 test_flow d27af00a-5b47-4910-89cd-9c664cd91e85 

نبدأ في استيراد مجموعة العملية من السجل:

 #> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080 7f522a13-016e-1000-e504-d5b15587f2f3 

النقطة المهمة هي أنه يمكن تحديد أي مثيل nifi كمضيف الذي نقوم بتشغيل مجموعة العملية عليه.

مجموعة العملية المضافة مع المعالجات توقف ، يجب أن تبدأ

 #> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080 

عظيم ، بدأت المعالجات. ومع ذلك ، وفقًا لظروف المشكلة ، نحتاج إلى مثيلات NiFi لإرسال البيانات إلى مثيلات أخرى. افترض أنك حددت طريقة الدفع لنقل البيانات إلى الخادم. من أجل تنظيم نقل البيانات ، تحتاج إلى تمكين نقل البيانات (تمكين الإرسال) على مجموعة العمليات عن بعد المضافة (RPG) ، والتي تم تضمينها بالفعل في تدفقنا.



في الوثائق في CLI والمصادر الأخرى ، لم أجد طريقة لتمكين نقل البيانات. إذا كنت تعرف كيفية القيام بذلك ، يرجى الكتابة في التعليقات.

نظرًا لأن لدينا bash ونحن على استعداد للذهاب إلى النهاية - سنجد طريقة للخروج! يمكنك استخدام واجهة برمجة تطبيقات NiFi لحل هذه المشكلة. نستخدم الطريقة التالية ، نأخذ المعرف من الأمثلة أعلاه (في حالتنا ، هو 7f522a13-016e-1000-e504-d5b15587f2f3). وصف أساليب API NiFi هنا .


في الجسم ، تحتاج إلى تمرير JSON ، من النموذج التالي:

 { "revision": { "clientId": "value", "version": 0, "lastModifier": "value" }, "state": "value", "disconnectedNodeAcknowledged": true } 

المعلمات التي يجب ملؤها من أجل "العمل":
حالة نقل البيانات الحالة. TRANSMITTING المتاحة لتمكين نقل البيانات ، توقف لإيقاف
نسخة - نسخة المعالج

سيكون الإصدار افتراضيًا عند 0 ، ولكن يمكن الحصول على هذه المعلمات باستخدام الطريقة



بالنسبة لمحبي البرامج النصية للباش ، قد تبدو هذه الطريقة مناسبة ، لكنها صعبة بالنسبة لي - البرامج النصية للباش ليست المفضلة لدي. الطريقة التالية هي أكثر إثارة للاهتمام ومريحة في رأيي.

NiPyAPI


NiPyAPI هي مكتبة بيثون للتفاعل مع مثيلات NiFi. تحتوي صفحة الوثائق على المعلومات اللازمة للعمل مع المكتبة. يوصف البدء السريع في مشروع جيثب.

برنامجنا النصي لبدء التهيئة هو برنامج بيثون. نمرر الترميز.
نحن تكوين التكوين لمزيد من العمل. سنحتاج المعلمات التالية:

 nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #  nifi-api ,    process group nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #  nifi-registry-api registry nipyapi.config.registry_name = 'MyBeutifulRegistry' # registry,      nifi nipyapi.config.bucket_name = 'BucketName' # bucket,    flow nipyapi.config.flow_name = 'FlowName' # flow,   

علاوة على ذلك ، سوف أدرج أسماء أساليب هذه المكتبة ، الموضحة هنا .

ربط التسجيل ل nifi مثيل مع

 nipyapi.versioning.create_registry_client 

في هذه الخطوة ، يمكنك أيضًا إضافة تحقق من أن السجل قد تمت إضافته بالفعل إلى المثيل ؛ لذلك ، يمكنك استخدام الطريقة

 nipyapi.versioning.list_registry_clients 

ابحث عن دلو لمزيد من البحث عن التدفق في السلة.

 nipyapi.versioning.get_registry_bucket 

بحث دلو للتدفق

 nipyapi.versioning.get_flow_in_bucket 

علاوة على ذلك ، من المهم أن نفهم ما إذا كانت مجموعة العمليات هذه قد تمت إضافتها بالفعل. يتم وضع مجموعة العملية في الإحداثيات وقد ينشأ موقف عندما يتم تراكب مجموعة ثانية أعلى مكون واحد. راجعت ، يمكن أن يكون هذا :) للحصول على كل مجموعة العملية المضافة ، نستخدم هذه الطريقة

 nipyapi.canvas.list_all_process_groups 

وبعد ذلك يمكننا البحث ، على سبيل المثال بالاسم.

لن أصف عملية تحديث القالب ، سأقول فقط أنه إذا تمت إضافة المعالجات في الإصدار الجديد من القالب ، فلن تكون هناك مشاكل في وجود رسائل في قوائم الانتظار. ولكن إذا تم حذف المعالجات ، فقد تنشأ مشاكل (لا يسمح nifi بالمعالج إذا تم تجميع قائمة انتظار الرسائل أمامه). إذا كنت مهتمًا بكيفية حل هذه المشكلة - اكتب لي ، من فضلك ، سنناقش هذه النقطة. الاتصال في نهاية المقال. دعنا ننتقل إلى خطوة إضافة مجموعة عملية.

عند تصحيح أخطاء البرنامج النصي ، صادفت ميزة لا يتم دائمًا سحب أحدث إصدار من التدفق ، لذلك أوصي بتوضيح هذا الإصدار أولاً:

 nipyapi.versioning.get_latest_flow_ver 

مجموعة عملية النشر:

 nipyapi.versioning.deploy_flow_version 

نبدأ المعالجات:

 nipyapi.canvas.schedule_process_group 

في كتلة CLI ، تمت كتابة أن نقل البيانات لا يتم تشغيله تلقائيًا في مجموعة العمليات عن بُعد؟ عند تنفيذ البرنامج النصي ، واجهت هذه المشكلة أيضًا. في ذلك الوقت ، لم تنجح في بدء نقل البيانات باستخدام واجهة برمجة التطبيقات (API) وقررت الكتابة إلى مطور مكتبة NiPyAPI وطلب المشورة / المساعدة. أجابني المطور ، وناقشنا المشكلة ، وقال إنه يحتاج إلى وقت "لفحص شيء ما". والآن ، بعد يومين ، تصل رسالة مكتوبة فيها وظيفة بيثون تحل مشكلة الإطلاق! في ذلك الوقت ، كان إصدار NiPyAPI هو 0.13.3 ، وبالطبع ، لم يكن هناك شيء من هذا القبيل فيه. ولكن في الإصدار 0.14.0 ، الذي تم إصداره مؤخرًا ، تم بالفعل تضمين هذه الوظيفة في المكتبة. الوفاء،

 nipyapi.canvas.set_remote_process_group_transmission 

لذلك ، باستخدام مكتبة NiPyAPI ، قمنا بتوصيل السجل والتدفق المدلفن وحتى بدأنا في المعالجات ونقل البيانات. بعد ذلك ، يمكنك مشط الكود وإضافة جميع أنواع الشيكات والتسجيل وكل ذلك. لكن هذه قصة مختلفة تماما.

من خيارات الأتمتة التي فكرت فيها ، بدا لي الأخير الأكثر كفاءة. أولاً ، لا يزال هذا رمز python ، حيث يمكنك تضمين رمز البرنامج المساعد والاستفادة الكاملة من لغة البرمجة. ثانياً ، يتطور مشروع NiPyAPI بنشاط وفي حالة حدوث مشكلات ، يمكنك الكتابة إلى المطور. ثالثًا ، لا يزال NiPyAPI أداة أكثر مرونة للتفاعل مع NiFi في حل المشكلات المعقدة. على سبيل المثال ، في تحديد ما إذا كانت قوائم انتظار الرسائل فارغة الآن في التدفق وما إذا كان يمكن تحديث مجموعة العملية.

هذا كل شيء. لقد وصفت 3 طرق لأتمتة تسليم التدفق في NiFi ، والمخاطر التي قد يواجهها المطور وأعطت رمزًا تشغيليًا لأتمتة التسليم. إذا كنت مهتمًا بهذا الموضوع - اكتب!

Source: https://habr.com/ru/post/ar476722/


All Articles