
لقد حدث أنه أثناء عملية العمل في MegaFon ، يتعين على المرء أن يواجه نفس المهام عند العمل مع RabbitMQ. السؤال الذي يطرح نفسه بشكل طبيعي: "كيفية تبسيط وأتمتة تنفيذ هذه المهام؟"
الحل الأول الذي يتبادر إلى الذهن هو استخدام واجهة HTTP ، وبطبيعة الحال ، من خارج منطقة الجزاء ، RabbitMQ لديه واجهة ويب جيدة و HTTP API. ومع ذلك ، فإن استخدام HTTP API ليس ملائمًا دائمًا ، بل وأحيانًا مستحيل (لنفترض أنه ليس لديك حقوق وصول كافية ، لكنني أريد حقًا نشر رسالة) في مثل هذه الأوقات ، يصبح من الضروري العمل باستخدام بروتوكول AMQP
لعدم العثور على حلول جاهزة مناسبة لي في المساحات المفتوحة للشبكة ، فقد تقرر كتابة تطبيق صغير للعمل مع RabbitMQ باستخدام بروتوكول AMQP مع القدرة على نقل معلمات بدء التشغيل من خلال سطر الأوامر وتوفير الحد الأدنى الضروري من الميزات ، وهي:
- نشر
- تدقيق الرسائل
- إنشاء وتحرير عناصر المسار الأساسية
تم اختيار Python كأبسط (وفي رأيي جميلة) أداة لتنفيذ هذه المهمة. (يمكن للمرء أن يجادل هنا ، ولكن ما الذي سيتغير؟)
يتم تقديم ترجمات الأدلة الرسمية ( واحد أو اثنين ) بواسطة RabbitMQ على المحور ، ولكن في بعض الأحيان يكون مثال بسيط من الممارسة مفيدًا. في المقالة ، سأحاول توضيح المشكلات الرئيسية التي تنشأ عند العمل مع الأرانب باستخدام قناة AMQP من Python باستخدام مثال لتطبيق صغير. التطبيق نفسه متاح على جيثب .
باختصار حول بروتوكول AMQP ووسيط الرسائل RabbitMQ
AMQP هو أحد بروتوكولات المراسلة الأكثر شيوعًا بين مكونات النظام الموزع. السمة المميزة الرئيسية لهذا البروتوكول هو مفهوم بناء مسار الرسالة ، والذي يحتوي على عنصرين هيكليين رئيسيين: قائمة الانتظار ونقطة التبادل . قائمة الانتظار بتجميع الرسائل حتى يتم تلقيها. نقطة التبادل هي موزع رسائل يوجهها إما إلى قائمة الانتظار المطلوبة أو إلى نقطة تبادل أخرى. تعتمد قواعد التوزيع (الارتباطات) التي تحدد من خلالها نقطة التبادل مكان توجيه الرسالة ، على التحقق من مفتاح توجيه الرسالة للتأكد من توافقه مع القناع المحدد. يمكنك قراءة المزيد حول كيفية عمل AMQP هنا .
RabbitMQ هو تطبيق مفتوح المصدر يدعم AMQP بالكامل ويقدم عددًا من الميزات الإضافية. للعمل مع RabbitMQ ، تمت كتابة عدد كبير من المكتبات في مجموعة متنوعة من لغات البرمجة ، بما في ذلك Python.
تنفيذ بايثون
يمكنك دائمًا رمي نصين للاستخدام الشخصي وعدم معرفة المشكلات التي تحدث معهم. عندما يتعلق الأمر بنشرها بين الزملاء ، يصبح كل شيء أكثر تعقيدًا. يحتاج الجميع إلى إظهار ومعرفة كيفية وما الذي يجب إطلاقه ، وماذا وأين يتغير ، وأين يمكن الحصول على أحدث إصدار ، وما الذي تغير فيه ... لقد توصلت إلى استنتاج مفاده أنه من السهل العمل على واجهة بسيطة مرة واحدة ، حتى لا تضيع الوقت في المستقبل. لسهولة الاستخدام ، تقرر تقسيم التطبيق إلى 4 وحدات:
- الوحدة المسؤولة عن النشر
- الوحدة النمطية المسؤولة عن طرح الرسائل من قائمة الانتظار
- وحدة نمطية مصممة لإجراء تغييرات على تكوين وسيط RabbitMQ
- وحدة نمطية تحتوي على معلمات وطرق شائعة في الوحدات السابقة
يبسط هذا النهج مجموعة معلمات بدء التشغيل. لقد اخترنا الوحدة النمطية المطلوبة ، واخترنا أحد أوضاع التشغيل الخاصة بها واجتزنا المعلمات الضرورية (لمزيد من المعلومات حول أوضاع التشغيل والمعلمات في - المساعدة - المساعدة).
نظرًا لأن بنية "الأرانب" في MegaFon تتكون من عدد كبير بما فيه الكفاية من العقد ، لتسهيل الاستخدام ، يتم نقل البيانات للاتصال العقد إلى وحدة نمطية مع المعلمات العامة والأساليب rmq_common_tools.py
للعمل على AMQP في بيثون ، سوف نستخدم مكتبة Pika .
import pika
باستخدام هذه المكتبة ، سيتألف العمل مع RabbitMQ من ثلاث مراحل رئيسية:
- تأسيس اتصال
- أداء العمليات المطلوبة
- اتصال وثيق
المرحلة الأولى والأخيرة هي نفسها لجميع الوحدات ويتم تنفيذها في rmq_common_tools.py
لتأسيس اتصال:
rmq_parameters = pika.URLParameters(rmq_url_connection_str) rmq_connection = pika.BlockingConnection(rmq_parameters) rmq_channel = rmq_connection.channel()
تتيح لك مكتبة Pika استخدام خيارات التصميم المختلفة للاتصال بـ RabbitMQ. في هذه الحالة ، كان الخيار الأكثر ملاءمة هو تمرير المعلمات في شكل سلسلة URL بالتنسيق التالي:
'amqp://rabbit_user:rabbit_password@host:port/vhost'
لإغلاق اتصال:
rmq_connection.close()
نشر
من المحتمل أن يكون نشر رسالة هو الأسهل ، ولكن في نفس الوقت تكون العملية الأكثر شيوعًا عند العمل مع الأرانب.
نشر أدوات النشر المترجمة في rmq_publish.py
لنشر رسالة ، استخدم الطريقة
rmq_channel.basic_publish(exchange = params.exch, routing_key = params.r_key, body = text)
حيث:
التبادل - اسم نقطة التبادل التي سيتم نشر الرسالة
routing_key - مفتاح التوجيه الذي سيتم نشر الرسالة به
النص الأساسي للرسالة
يدعم rmq_publish.py وضعي إدخال الرسائل للنشر:
- يتم إدخال الرسالة كمعلمة من خلال سطر الأوامر (from_console)
- تتم قراءة الرسالة من الملف (from_file)
الوضع الثاني ، في رأيي ، أكثر ملاءمة عند العمل مع الرسائل الكبيرة أو صفائف الرسائل. الأول ، بدوره ، يسمح لك بإرسال رسالة بدون ملفات إضافية ، وهو مناسب عند دمج الوحدة النمطية في سيناريوهات أخرى.
تلقي الرسائل
مسألة تلقي الرسائل لم تعد تافهة مثل النشر. عندما يتعلق الأمر بقراءة الرسائل ، فأنت بحاجة إلى فهم:
- بعد تأكيد استلام الرسالة ، ستتم إزالتها من قائمة الانتظار. لذلك ، عند قراءة الرسائل من خط "المعركة" ، فإننا "نختارها" من المستهلك الرئيسي. إذا كنا لا نريد أن نفقد تدفق الرسائل ، ولكننا نريد فقط أن نفهم ما هي الرسائل التي تتحرك في "الأرنب" ، فإن الخيار الأكثر منطقية هو إنشاء قائمة انتظار "تسجيل" منفصلة ، أو كما يطلق عليها أيضًا ، "قائمة انتظار الاعتراض".
- تتطلب قراءة الرسائل ، كقاعدة عامة ، مزيدًا من المعالجة أو التحليل ، مما يعني أنه يجب حفظها في مكان ما إذا كانت المعالجة في الوقت الفعلي مستحيلة أو غير مطلوبة.
قارئ الرسائل المطبق في ملف rmq_consume.py
يتم توفير وضعين التشغيل:
- قراءة الرسائل من قائمة انتظار موجودة
- إنشاء قائمة انتظار وقت ومسار لقراءة الرسائل من قائمة الانتظار هذه
سيتم النظر في مسألة إنشاء قائمة انتظار وطرق أدناه.
يتم تطبيق التدقيق اللغوي المباشر على النحو التالي:
channel.basic_consume(on_message, queue=params.queue) try: channel.start_consuming() except KeyboardInterrupt: channel.stop_consuming() except Exception: channel.stop_consuming() rmq_tools.console_log(":\n", traceback.format_exc())
اين
on_message - إجراء معالج الرسالة
params.queue - اسم قائمة الانتظار التي سيتم إجراء الطرح منها
يجب أن يقوم معالج الرسالة بإجراء بعض العملية على رسالة القراءة وتأكيد (أو عدم تأكيد ، إذا لزم الأمر) تسليم الرسالة.
def on_message(channel, method_frame, header_frame, body): global all_cnt, lim if all_cnt >= lim: rmq_tools.console_log(' .') raise KeyboardInterrupt body_str = body.decode("utf-8")[:4000] rk = method_frame.routing_key rmq_params.file.write(rk + '\n') rmq_params.file.write(body_str + '\n\n') all_cnt = all_cnt + 1 if (lim != 0) and (rmq_params.file == sys.stdout): sys.stdout.write(f'[{rmq_tools.time_now()}] - {all_cnt} of {lim} messages consumed.\r') channel.basic_ack(delivery_tag=method_frame.delivery_tag)
اين
all_cnt - عداد عالمي
ليم - عدد الرسائل المراد قراءتها
في مثل هذا التنفيذ للمعالج ، يتم طرح عدد معين من الرسائل وإخراج المعلومات حول تقدم الطرح إلى وحدة التحكم في حالة الكتابة إلى ملف.
من الممكن أيضًا كتابة الرسائل المقروءة في قاعدة البيانات. في التنفيذ الحالي ، لم يتم تقديم مثل هذه الفرصة ، لكن ليس من الصعب إضافتها.
سجل في قاعدة بياناتسننظر في مثال على كتابة الرسائل إلى قاعدة البيانات لقاعدة بيانات Oracle ومكتبة cx_oracle .
الاتصال بقاعدة البيانات
ora_adress = 'host:port/dbSID' ora_creds = 'user/pass' connection_ora = cx_Oracle.connect(ora_creds + '@' + ora_address) ora_cursor = connection_ora.cursor()
أضف معالج on_message
global cnt, commit_int insert_rec = 'insert into ' + tab_name + '(routing_key, text) values (:rkey, :text)' ora_cursor.execute(insert_rec, text = body_str, rkey = rk) if cnt > commit_int : ora_cursor.execute('commit') cnt = 1 cnt = cnt + 1
اين
cnt هو عداد آخر
الالتزام - عدد الإدخالات في قاعدة البيانات ، وبعد ذلك من الضروري القيام بـ "الالتزام". يرجع وجود مثل هذه المعلمة إلى الرغبة في تقليل الحمل على قاعدة البيانات. ومع ذلك ، تثبيته ليست كبيرة بشكل خاص ، ل في حالة حدوث فشل ، هناك فرصة لفقدان الرسائل المقروءة بعد آخر التزام ناجح.
وكما هو متوقع ، في نهاية العمل ، نتعهد بالالتزام النهائي ونغلق الاتصال
ora_cursor.execute('commit') connection_ora.close()
شيء من هذا القبيل هو قراءة الرسائل. إذا قمت بإزالة القيود المفروضة على عدد الرسائل المقروءة ، فيمكنك إجراء عملية خلفية للقراءة المستمرة للرسائل من "الأرنب".
التكوين
على الرغم من أن بروتوكول AMQP مخصص في المقام الأول لنشر الرسائل وقراءتها ، فإنه يسمح لك أيضًا بإجراء عمليات بسيطة من خلال تكوين المسارات (لا نتحدث عن تكوين اتصالات الشبكة وإعدادات RabbitMQ الأخرى كتطبيق).
عمليات التكوين الرئيسية هي:
- إنشاء قائمة انتظار أو نقطة تبادل
- إنشاء قاعدة توجيه (ملزمة)
- حذف قائمة انتظار أو نقطة تبادل
- إزالة قاعدة توجيه (ملزمة)
- قائمة انتظار المقاصة
نظرًا لوجود إجراء جاهز لكل منهم في مكتبة pika ، لتسهيل التشغيل ، يتم تجميعها ببساطة في ملف rmq_setup.py . بعد ذلك ، ندرج الإجراءات من مكتبة pika مع بعض التعليقات حول المعلمات.
إنشاء قائمة انتظار
rmq_channel.queue_declare(queue=params.queue, durable = params.durable)
كل شيء بسيط هنا
قائمة الانتظار - اسم قائمة الانتظار لإنشاء
دائم - معلمة منطقية ، تعني قيمة True أنه عند إعادة تشغيل الأرنب ، ستظل قائمة الانتظار موجودة. إذا خطأ ، سيتم حذف قائمة الانتظار عند إعادة التشغيل. عادةً ما يتم استخدام الخيار الثاني لقوائم الانتظار المؤقتة التي يتم ضمان عدم الحاجة إليها في المستقبل.
إنشاء نقطة تبادل (تبادل)
rmq_channel.exchange_declare(exchange=params.exch, exchange_type = params.type, durable = params.durable)
هنا تنشأ معلمة جديدة exchange_type - نوع نقطة التبادل. حول أنواع نقاط التبادل التي تتم قراءتها هنا .
التبادل - اسم نقطة التبادل المنشأة
حذف قائمة انتظار أو نقطة تبادل
rmq_channel.queue_delete(queue=params.queue) rmq_channel.exchange_delete(exchange=params.exch)
إنشاء قاعدة توجيه (ملزمة)
rmq_channel.queue_bind(exchange=params.exch, queue=params.queue, routing_key=params.r_key)
التبادل - اسم نقطة التبادل التي سيتم إجراء النقل منها
قائمة الانتظار - اسم قائمة الانتظار المطلوب توجيهها إلى
routing_key - قناع مفتاح التوجيه ، والذي سيتم استخدامه لإعادة التوجيه.
الإدخالات التالية صالحة:
- rk.my_key. * - في هذا القناع ، تعني العلامة النجمية مجموعة أحرف غير فارغة. بمعنى آخر ، سيتخطى هذا القناع أي مفتاح من نوع rk.my_key. + شيء آخر ، لكن لن يفوتك المفتاح rk.my_key
- rk.my_key. # - سيتخطى هذا القناع كل شيء مثل مفتاح rk.my_key السابق
إزالة قاعدة توجيه (ملزمة)
rmq_channel.queue_unbind(exchange=params.exch, queue=params.queue, routing_key=params.r_key)
كل شيء مشابه لإنشاء قاعدة إعادة توجيه.
قائمة انتظار المقاصة
rmq_channel.queue_purge(queue=params.queue)
قائمة الانتظار - اسم قائمة الانتظار المراد مسحها
حول استخدام واجهة سطر الأوامر في تطبيقات Pythonخيارات بدء التشغيل تجعل الحياة أسهل كثيرًا. من أجل عدم تعديل الكود قبل كل عملية إطلاق ، من المنطقي توفير آلية لتمرير المعلمات عند بدء التشغيل. تم اختيار مكتبة argparse لهذا الغرض . لن أخوض في تفاصيل تعقيدات استخدامه ؛ فهناك أدلة كافية حول هذا الموضوع ( واحد ، اثنان ، ثلاثة ). لاحظت فقط أن هذه الأداة ساعدتني في تبسيط عملية استخدام التطبيق بشكل كبير (إذا كان يمكنك تسميته). حتى بعد إلقاء تسلسل بسيط من الأوامر ولفها في واجهة مشابهة ، يمكنك الحصول على أداة كاملة وسهلة الاستخدام.
التطبيق في الحياة اليومية. ما جاء في متناول يدي أكثر.
حسنًا ، الانطباع قليلاً عن استخدام AMQP في الحياة اليومية.
الميزة الأكثر طلبًا هي نشر الرسالة. لا تسمح حقوق الوصول لمستخدم معين دائمًا باستخدام واجهة ويب ، رغم أنه من الضروري في بعض الأحيان اختبار خدمة معينة. هنا AMQP والتفويض نيابة عن الخدمة باستخدام هذه القناة تمر للمساعدة.
وكان الثاني الأكثر شعبية القدرة على قراءة الرسائل من قائمة انتظار الوقت. هذه الميزة مفيدة في تكوين مسارات جديدة وتدفقات الرسائل ، وكذلك في منع الحوادث.
الاحتمالات الأخرى وجدت أيضا التطبيق في مختلف المهام.