هناك العديد من الطرق لمعالجة الرسائل من أنظمة Pub-Sub: استخدام خدمة منفصلة ، عزل عملية معزولة ، تنظيم تجمع معالجة / سلسلة رسائل ، IPC معقد ، Poll-over-Http والعديد من الخدمات الأخرى. اليوم أريد أن أتحدث عن كيفية استخدام Pub-Sub عبر HTTP وحول خدمتي المكتوبة خصيصًا لهذا الغرض.
يعد استخدام الواجهة الخلفية لخدمة HTTP الجاهزة في بعض الحالات حلاً مثاليًا لمعالجة قائمة انتظار الرسائل:
- تحقيق التوازن من خارج منطقة الجزاء. عادةً ما تكون الواجهة الخلفية بالفعل وراء الموازن ولديها بنية تحتية جاهزة للتحميل ، مما يبسط العمل مع الرسائل بشكل كبير.
- باستخدام وحدة تحكم REST العادية (أي مورد HTTP). يقلل استهلاك رسائل HTTP من تكلفة تنفيذ برامج compumers للغات مختلفة إذا كانت الخلفية مختلطة.
- تبسيط استخدام خطافات الويب للخدمات الأخرى. الآن تقريبًا كل خدمة (Jira و Gitlab و Mattermost و Slack ...) تدعم بطريقة أو بأخرى روابط الويب للتفاعل مع العالم الخارجي. يمكنك جعل الحياة أسهل إذا علمت قائمة الانتظار لأداء وظائف مرسل HTTP.
هذا النهج له أيضا عيوب:
- يمكنك نسيان خفة الحل. HTTP بروتوكول كبير ، واستخدام الأطر على جانب المستهلك سيزيد على الفور من الكمون والتحميل.
- نفقد نقاط القوة في نهج الاستطلاع ، ونحصل على نقاط الضعف في Push.
- يمكن أن تؤثر معالجة الرسائل بواسطة مثيلات الخدمة نفسها التي تعالج العملاء على الاستجابة. هذه ليست مهمة ، حيث يتم التعامل معها بالتوازن والعزلة.
لقد طبقت الفكرة كخدمة قائمة انتظار على بروتوكول انتقال النص المتشعب ، والتي سيتم مناقشتها لاحقًا. تمت كتابة المشروع في Kotlin باستخدام Spring Boot 2.1. كوسيط ، يتوفر Apache Kafka فقط حاليًا.
علاوة على ذلك ، من المفترض أن يكون القارئ معتادًا على كافكا ويعرف الإلتزامات (الالتزام) والإزاحة (الإزاحة) للرسائل ومبادئ المجموعات (المجموعة) والمستهلكون (المستهلك) ، ويفهم أيضًا مدى اختلاف التقسيم (القسم) عن الموضوع (الموضوع) . إذا كانت هناك فجوات ، فإنني أنصحك بقراءة هذا القسم من وثائق كافكا قبل المتابعة.المحتويات
مراجعة
Queue-Over-Http هي خدمة تعمل كوسيط بين وسيط الرسائل ومستهلك HTTP النهائي (الخدمة تجعل من السهل تنفيذ الدعم لإرسال الرسائل إلى المستهلكين بأي طريقة أخرى ، على سبيل المثال ، * RPC). في الوقت الحالي ، لا يتوفر سوى الاشتراك وإلغاء الاشتراك وعرض قائمة المستهلكين ، ولم يتم بعد إرسال الرسائل إلى الوسيط (إنتاج) عبر HTTP بسبب عدم القدرة على ضمان ترتيب الرسائل دون دعم خاص من المنتج.
الرقم الرئيسي للخدمة هو المستهلك ، الذي يمكنه الاشتراك في أقسام محددة أو فقط في الموضوعات (يتم دعم نمط الموضوع). في الحالة الأولى ، يتم إيقاف تشغيل التوازن التلقائي للأقسام. بعد الاشتراك ، يبدأ مورد HTTP المحدد في تلقي الرسائل من أقسام كافكا المعينة. معماريا ، يرتبط كل مشترك بعميل Kafka Java الأصلي.
قصة مسلية عن KafkaConsumerلدى كافكا عميل جافا رائع يمكنه القيام بالكثير. يمكنني استخدامه في محول قائمة الانتظار لتلقي الرسائل من الوسيط ثم إرساله إلى قوائم انتظار الخدمة المحلية. تجدر الإشارة إلى أن العميل يعمل بشكل حصري في سياق موضوع واحد.
فكرة المحول بسيطة. نبدأ في موضوع واحد ، نكتب أبسط جدولة العملاء الأصليين ، مع التركيز على تقليل زمن الوصول. وهذا هو ، نكتب شيئا مشابها:
while (!Thread.interrupted()) { var hasWork = false for (consumer in kafkaConsumers) { val queueGroup = consumers[consumer] ?: continue invalidateSubscription(consumer, queueGroup) val records = consumer.poll(Duration.ZERO) if (!records.isEmpty) { hasWork = true } } val committed = doCommit() if (!hasWork && committed == 0) {
يبدو أن كل شيء رائع ، الكمون هو الحد الأدنى حتى مع عشرات المستهلكين. في الممارسة العملية ، اتضح أن
KafkaConsumer
لهذا الوضع من التشغيل ويعطي معدل تخصيص يبلغ حوالي 1.5 ميجابايت / ثانية في وقت الخمول. مع 100 ناقل ، يصل معدل التخصيص إلى 150 ميجابايت / ثانية ويجعل GC تفكر كثيرًا في التطبيق. بالطبع ، كل هذه القمامة موجودة في المنطقة الصغيرة ، GC قادرة على التعامل مع هذا ، لكن الحل ليس مثاليًا.
من الواضح أنك بحاجة إلى المضي قدماً بالطريقة المعتادة لـ
KafkaConsumer
، والآن أضع كل مشترك في ساحة المشاركات الخاصة بي. هذا يعطي مقدار حمل للذاكرة والجدولة ، ولكن لا توجد طريقة أخرى.
أعد كتابة الرمز من أعلى ، وأزل الحلقة الداخلية وتغيير
Duration.ZERO
إلى
Duration.ofMillis(100)
. اتضح بشكل جيد ، ينخفض معدل التخصيص إلى 80-150 كيلو بايت في الثانية لكل مستهلك. ومع ذلك ، يؤخر Poll ذو مهلة قدرها 100 مللي ثانية قائمة الانتظار بأكملها من الالتزامات إلى نفس 100 مللي ثانية ، وهذا غير مقبول كثيرًا.
في عملية إيجاد حلول للمشكلة ، أتذكر
KafkaConsumer::wakeup
، التي تلقي
WakeupException
أي عملية حظر على المستهلك. من خلال هذه الطريقة ، يكون الطريق إلى زمن الانتقال البسيط بسيطًا: عندما يصل طلب جديد للالتزام ، نضعه في قائمة الانتظار ، وعلى المستهلك الأصلي نسميه
wakeup
. في دورة العمل ،
WakeupException
إلى ارتكاب ما تراكم. لنقل السيطرة بمساعدة الاستثناءات ، يجب عليك أن تضعها بين يديك فورًا ، ولكن نظرًا لعدم وجود أي شيء آخر ...
اتضح أن هذا الخيار بعيد عن أن يكون مثالياً ، لأن أي عملية على المستهلك الأصلي
WakeupException
الآن
WakeupException
، بما في ذلك الالتزام نفسه. ستؤدي معالجة هذا الموقف إلى تشويش الرمز بعلامة تسمح
wakeup
.
لقد توصلت إلى استنتاج مفاده أنه سيكون من الجيد تعديل طريقة
KafkaConsumer::poll
بحيث يمكن مقاطعتها بشكل طبيعي ، وفقًا
KafkaConsumer::poll
إضافية. نتيجةً لذلك ، وُلد
فرانكشتاين من الانعكاس ، الذي ينسخ بالضبط طريقة الاستطلاع الأصلية ، مضيفًا خروجًا من الحلقة بالعلم. يتم تعيين هذه العلامة بواسطة طريقة interruptPoll منفصلة ، والتي ، علاوة على ذلك ، تستيقظ على أداة تحديد العميل لتحرير قفل مؤشر الترابط في عمليات الإدخال / الإخراج.
بعد تنفيذ العميل بهذه الطريقة ، أحصل على سرعة رد الفعل من اللحظة التي يصل فيها طلب الالتزام حتى معالجته حتى 100 ميكروثانية ، وتأخر ممتاز لجلب الرسائل من وسيط ، وهو أمر جيد جدًا.
يتم تمثيل كل قسم بواسطة قائمة انتظار محلية منفصلة ، حيث يكتب المحول رسائل من الوسيط. يأخذ العامل الرسائل منه ويرسلها للتنفيذ ، أي لإرسالها عبر HTTP.
تدعم الخدمة معالجة رسائل الدُفعات لزيادة الإنتاجية. عند الاشتراك ، يمكنك تحديد
concurrencyFactor
كل موضوع (ينطبق على كل قسم معين بشكل مستقل). على سبيل المثال ، يعني
concurrencyFactor=1000
أنه يمكن إرسال 1000 رسالة في شكل طلبات HTTP إلى المستهلك في نفس الوقت. بمجرد أن يتم حل جميع الرسائل الواردة من العبوة بشكل لا لبس فيه من قبل المستهلك ، تقرر الخدمة الالتزام التالي لتعويض آخر رسالة في كافكا. وبالتالي ، فإن القيمة الثانية لـ
concurrencyFactor
هي الحد الأقصى لعدد الرسائل التي يعالجها المستهلك في حالة تعطل Kafka أو Queue-Over-Http.
لتقليل التأخيرات ، تحتوي قائمة الانتظار على
loadFactor = concurrencyFactor * 2
، والذي يسمح لك بقراءة ضعف عدد الرسائل من الوسيط التي يمكن إرسالها. نظرًا لتعطيل autocommit على العميل الأصلي ، فإن هذا المخطط لا ينتهك ضمانات At-Least-Once.
تؤدي قيمة
concurrencyFactor
العالية إلى زيادة إنتاجية قائمة الانتظار عن طريق تقليل عدد الإلتزامات التي تصل إلى 10 مللي ثانية في أسوأ الحالات. في الوقت نفسه ، يزداد الحمل على المستهلك.
ترتيب إرسال الرسائل داخل الحزمة غير مضمون ، ولكن يمكن تحقيق ذلك عن طريق ضبط
concurrencyFactor=1
.
يرتكب
ارتكاب هي جزء مهم من الخدمة. عندما تكون الحزمة التالية من البيانات جاهزة ، فإن إزاحة آخر رسالة من الحزمة ملتزم على الفور بـ Kafka ، وفقط بعد الالتزام الناجح تصبح الحزمة التالية متاحة للمعالجة. في كثير من الأحيان هذا لا يكفي والتزام السيارات هو مطلوب. للقيام بذلك ، هناك المعلمة
autoCommitPeriodMs
، والتي تشترك في القليل مع فترة
autoCommitPeriodMs
الكلاسيكي للعملاء الأصليين الذين يرتكبون الرسالة الأخيرة التي تمت قراءتها من القسم. تخيل
concurrencyFactor=10
. أرسلت الخدمة جميع الرسائل العشرة وتنتظر أن يكون كل منها جاهزًا. اكتمال معالجة الرسالة 3 أولاً ، ثم الرسالة 1 ، ثم الرسالة 10. في هذه المرحلة ، حان وقت الالتزام التلقائي. من المهم عدم انتهاك دلالات At-Least-Once. لذلك ، يمكنك فقط الالتزام بالرسالة الأولى ، أي الإزاحة 2 ، حيث تم معالجتها بنجاح فقط في تلك اللحظة. علاوة على ذلك ، حتى تتم معالجة الرسائل التلقائية التالية ، تتم معالجة الرسائل 2 و 5 و 6 و 4 و 8. والآن يلزمك الالتزام فقط بالإزاحة 7 وهكذا. لا يؤثر Autocommit تقريبًا على الإنتاجية.
خطأ في التعامل
في الوضع العادي للتشغيل ، ترسل الخدمة رسالة إلى المشرف مرة واحدة. إذا تسببت ، لسبب ما ، في حدوث خطأ 4xx أو 5xx ، فستعيد الخدمة إرسال الرسالة ، في انتظار المعالجة الناجحة. يمكن تكوين الوقت بين المحاولات كمعلمة منفصلة.
من الممكن أيضًا تعيين عدد المحاولات التي سيتم بعدها وضع علامة على الرسالة كمعالجة ، مما يؤدي إلى إيقاف عمليات إعادة الإرسال بغض النظر عن حالة الاستجابة. لا أنصح باستخدام هذا للبيانات الحساسة ، يجب دائمًا ضبط حالات فشل المستهلكين يدويًا. يمكن مراقبة الرسائل الملصقة بواسطة سجلات الخدمة ومراقبة حالة استجابة المستهلك.
حول الشائكةعادةً ما يرسل خادم HTTP ، الذي يعطي 4xx أو 5xx حالة الاستجابة ، أيضًا Connection: close
header. يظل اتصال TCP المغلق بهذه الطريقة في TIME_WAITED
حتى يتم مسحه بواسطة نظام التشغيل بعد مرور بعض الوقت. المشكلة هي أن مثل هذه الاتصالات تشغل منفذًا كاملًا لا يمكن إعادة استخدامه حتى يتم إصداره. قد ينتج عن ذلك عدم وجود منافذ مجانية على الجهاز لتأسيس اتصال TCP وسيتم طرح الخدمة مع استثناءات في سجلات كل إرسال. في الممارسة العملية ، في Windows 10 ، تنتهي المنافذ بعد إرسال 10-20 ألف رسالة خاطئة في غضون 1-2 دقائق. في الوضع القياسي ، هذه ليست مشكلة.
الرسائل
يتم إرسال كل رسالة مستخرجة من الوسيط إلى المستشار عبر HTTP إلى المورد المحدد أثناء الاشتراك. بشكل افتراضي ، يتم إرسال رسالة بواسطة طلب POST في الجسم. يمكن تغيير هذا السلوك عن طريق تحديد أي طريقة أخرى. إذا كانت الطريقة لا تدعم إرسال البيانات في النص ، يمكنك تحديد اسم معلمة السلسلة التي سيتم إرسال الرسالة فيها. بالإضافة إلى ذلك ، عند الاشتراك ، يمكنك تحديد رؤوس إضافية ستتم إضافتها إلى كل رسالة ، وهي مناسبة للترخيص الأساسي باستخدام الرموز. تتم إضافة الرؤوس إلى كل رسالة بمعرف المستهلك والموضوع والقسم ، حيث تمت قراءة الرسالة ، ورقم الرسالة ، ومفتاح القسم ، إن أمكن ، وكذلك اسم الوسيط.
الأداء
لتقييم الأداء ، استخدمت جهاز كمبيوتر (Windows 10 ، OpenJDK-11 (G1 بدون توليف) ، i7-6700K ، 16GB) ، الذي يقوم بتشغيل الخدمة وجهاز كمبيوتر محمول (Windows 10 ، i5-8250U ، 8GB) ، حيث كان منتج الرسائل ، HTTP يدور مورد المستهلك و Kafka مع الإعدادات الافتراضية. الكمبيوتر متصل بالموجه عبر اتصال سلكي 1 جيجابت / ثانية ، والكمبيوتر المحمول عبر 802.11ac. يكتب المنتج كل 110 مللي ثانية كل 100 مللي ثانية عن 110 بايت من الرسائل إلى الموضوعات المحددة التي يتم اشتراك المتابعين فيها (
concurrencyFactor=500
، يتم إيقاف الالتزام التلقائي) من مجموعات مختلفة. الحامل بعيد عن المثالية ، لكن يمكنك الحصول على بعض الصور.
معلمة القياس الرئيسية هي تأثير الخدمة على الكمون.
دع:
- t
q - الطابع الزمني لخدمة استقبال الرسائل من العميل الأصلي
- d
t0 هو الوقت بين t
q والوقت الذي تم فيه إرسال الرسالة من قائمة الانتظار المحلية إلى مجموعة المسؤولين التنفيذيين
- d
t هو الوقت بين t
q والوقت الذي تم فيه إرسال طلب HTTP. أن d
t هو تأثير الخدمة على زمن الوصول للرسالة.
خلال القياسات ، تم الحصول على النتائج التالية (C - مستهلكون ، مواضيع T ، رسائل M):

في وضع التشغيل القياسي ، لا تؤثر الخدمة نفسها تقريبًا على زمن الوصول ، كما أن استهلاك الذاكرة ضئيل. لا يتم تحديد القيم القصوى لـ d
t (حوالي 60 مللي ثانية) على وجه التحديد ، لأنها تعتمد على تشغيل GC ، وليس على الخدمة نفسها. يمكن أن يساعد الضبط الخاص لـ GC أو استبدال G1 مع Shenandoah في تخفيف انتشار القيم القصوى.
يتغير كل شيء بشكل كبير عندما لا يتعامل المستهلك مع تدفق الرسائل من قائمة الانتظار وتقوم الخدمة بتشغيل وضع الاختناق. في هذا الوضع ، يزداد استهلاك الذاكرة ، حيث يزداد وقت الاستجابة للطلبات بشكل كبير ، مما يمنع تنظيف الموارد في الوقت المناسب. يبقى التأثير على الكمون هنا على مستوى النتائج السابقة ، وتنتج قيم dt العالية عن التحميل المسبق للرسائل في قائمة الانتظار المحلية.
لسوء الحظ ، لا يمكن الاختبار عند تحميل أعلى ، نظرًا لأن الكمبيوتر المحمول ينحني بالفعل عند 1300 RPS. إذا تمكن أي شخص من المساعدة في تنظيم القياسات بالأحمال الكبيرة ، فسوف أكون سعيدًا بتقديم مجموعة للاختبارات.
مظاهرة
الآن دعنا ننتقل إلى المظاهرة. لهذا نحتاج:
- كافكا وسيط ، على استعداد للذهاب. سآخذ المثيل الذي أثير في 192.168.99.100:9092 من Bitnami.
- مورد HTTP الذي سيتلقى الرسائل. من أجل الوضوح ، أخذت خطافات الويب من سلاك.
بادئ ذي بدء ، تحتاج إلى رفع خدمة Queue-Over-Http نفسها. للقيام بذلك ، قم بإنشاء المحتويات التالية في دليل
application.yml
فارغ:
spring: profiles: default logging: level: com: viirrtus: queueOverHttp: DEBUG app: persistence: file: storageDirectory: "persist" brokers: - name: "Kafka" origin: "kafka" config: bootstrap.servers: "192.168.99.100:9092"
نوضح هنا للخدمة معلمات اتصال وسيط معين ، وكذلك أماكن تخزين المشتركين بحيث لا تضيع بين عمليات البدء. في `app.brokers []. Config '، يمكنك تحديد أي معلمات اتصال يدعمها عميل Kafka الأصلي ؛ يمكن العثور على قائمة كاملة
هنا .
نظرًا لأن ملف التكوين تتم معالجته بواسطة Spring ، يمكنك كتابة الكثير من الأشياء المثيرة للاهتمام هناك. بما في ذلك ، تكوين التسجيل.
الآن قم بتشغيل الخدمة نفسها. نحن نستخدم أسهل طريقة -
docker-compose.yml
:
version: "2" services: app: image: viirrtus/queue-over-http:0.1.3 restart: unless-stopped command: --debug ports: - "8080:8080" volumes: - ./application.yml:/application.yml - ./persist:/persist
إذا كان هذا الخيار لا يناسبك ، فيمكنك ترجمة الخدمة من المصدر. إرشادات التجميع في مشروع المستند التمهيدي ، وهو رابط يتم تقديمه في نهاية المقالة.والخطوة التالية هي تسجيل المشترك الأول. للقيام بذلك ، تحتاج إلى تنفيذ طلب HTTP للخدمة مع وصف للمستهلك:
POST localhost:8080/broker/subscription Content-Type: application/json { "id": "my-first-consumer", "group": { "id": "consumers" }, "broker": "Kafka", "topics": [ { "name": "slack.test", "config": { "concurrencyFactor": 10, "autoCommitPeriodMs": 100 } } ], "subscriptionMethod": { "type": "http", "delayOnErrorMs": 1000, "retryBeforeCommit": 10, "uri": "<slack-wh-uri>", "additionalHeaders": { "Content-Type": "application/json" } } }
إذا سارت الأمور على ما يرام ، فستكون الاستجابة هي نفس المحتوى المرسل.
دعنا نذهب من خلال كل معلمة:
Consumer.id
- معرف المشترك لديناConsumer.group.id
- معرف المجموعةConsumer.broker
- حدد أي من وسطاء الخدمات الذين تحتاج إلى الاشتراك فيهConsumer.topics[0].name
- اسم الموضوع الذي نريد استلام الرسائل منهConsumer.topics[0].config. concurrencyFactor
Consumer.topics[0].config. concurrencyFactor
- الحد الأقصى لعدد الرسائل المرسلة في وقت واحدConsumer.topics[0].config. autoCommitPeriodMs
Consumer.topics[0].config. autoCommitPeriodMs
- فترة الالتزام القسري للرسائل الجاهزةConsumer.subscriptionMethod.type
- نوع الاشتراك. HTTP فقط هو متاح حاليا.Consumer.subscriptionMethod.delayOnErrorMs
- الوقت قبل إعادة إرسال رسالة انتهت بخطأConsumer.subscriptionMethod.retryBeforeCommit
- عدد محاولات إعادة إرسال رسالة الخطأ. إذا 0 - سوف تدور الرسالة حتى معالجة ناجحة. في حالتنا ، لا يضمن ضمان التسليم الكامل أهمية ثبات التدفق.Consumer.subscriptionMethod.uri
- المورد الذي سيتم إرسال الرسائل إليهConsumer.subscriptionMethod.additionalHeader
- رؤوس إضافية سيتم إرسالها مع كل رسالة. لاحظ أنه سيكون هناك JSON في نص كل رسالة بحيث يمكن لـ Slack تفسير الطلب بشكل صحيح.
في هذا الطلب ، تم حذف طريقة HTTP ، نظرًا لأن الإعداد الافتراضي ، POST ، Slack جيد جدًا.من هذه اللحظة ، تراقب الخدمة الأقسام المعينة لموضوع slack.test للرسائل الجديدة.
لكتابة رسائل إلى الموضوع ،
/opt/bitnami/kafka/bin
الأدوات المساعدة المضمنة في كافكا الموجودة في
/opt/bitnami/kafka/bin
لصورة Kafka التي تم إطلاقها (قد يختلف موقع الأدوات المساعدة في مثيلات Kafka الأخرى):
kafka-console-producer.sh --broker-list localhost:9092 --topic slack.test > {“text”: “Hello!”}
في نفس الوقت ، سوف يعلمك Slack برسالة جديدة:
لإلغاء اشتراك أحد المستهلكين ، يكفي تقديم طلب POST لـ "وسيط / إلغاء الاشتراك" بنفس المحتوى الذي كان خلال الاشتراك.الخاتمة
في الوقت الحالي ، يتم تنفيذ الوظيفة الأساسية فقط. علاوة على ذلك ، من المزمع تحسين الدُفعات ، ومحاولة تنفيذ دلالات الألفاظ تمامًا ، وإضافة القدرة على إرسال رسائل إلى الوسيط عبر HTTP ، والأهم من ذلك ، إضافة دعم لنشر Pub-Sub الشهير.
خدمة قائمة الانتظار عبر بروتوكول انتقال النص المتشعب هي حاليا قيد التطوير النشط. الإصدار 0.1.3 مستقر بما فيه الكفاية للاختبار على المدرجات مرحلة التطوير. تم اختبار الأداء على أنظمة التشغيل Windows 10 و Debian 9 و Ubuntu 18.04. يمكنك استخدام همز على مسؤوليتك الخاصة. إذا كنت ترغب في المساعدة في التطوير أو تقديم أي ملاحظات حول الخدمة - مرحبًا بكم في مشروع
جيثب .