كيف أصبح كافكا حقيقة


مرحبا يا هبر!


أنا أعمل في فريق Tinkoff ، الذي يعمل على تطوير مركز الإخطار الخاص به. بالنسبة للجزء الأكبر ، أنا أطور في Java باستخدام Spring boot وحل المشكلات التقنية المختلفة التي تنشأ في المشروع.


تتفاعل معظم خدماتنا الميكروية بشكل غير متزامن مع بعضها البعض من خلال وسيط الرسائل. في السابق ، استخدمنا IBM MQ كوسيط ، والذي توقف عن التعامل مع الحمل ، ولكن في نفس الوقت كان لديه ضمانات تسليم عالية.


كبديل ، لقد عرضنا على Apache Kafka ، والذي يتميز بقابلية عالية للتطوير ، ولكن لسوء الحظ ، يتطلب منهج تكوين فردي تقريبًا لسيناريوهات مختلفة. بالإضافة إلى ذلك ، فإن آلية التسليم مرة واحدة على الأقل ، والتي تعمل في كافكا بشكل افتراضي ، لم تسمح بالحفاظ على المستوى المطلوب من الاتساق خارج الصندوق. بعد ذلك ، سوف أشارك تجربتنا في تكوين كافكا ، على وجه الخصوص ، وسأخبرك عن كيفية التهيئة والعيش مع التسليم مرة واحدة بالضبط.


تسليم مضمون وأكثر من ذلك


ستساعد المعلمات التي سيتم مناقشتها لاحقًا في منع عدد من المشكلات في إعدادات الاتصال الافتراضية. لكن أولاً ، أريد أن أنبه إلى معلمة واحدة من شأنها أن تسهل تصحيح ممكن.


سوف Client.id للمنتج والمستهلك مساعدة في هذا. للوهلة الأولى ، يمكنك استخدام اسم التطبيق كقيمة ، وسيعمل هذا في معظم الحالات. على الرغم من أن الموقف عند استخدام عدة مستهلكين في التطبيق وإعطائهم نفس client.id يؤدي إلى التحذير التالي:


org.apache.kafka.common.utils.AppInfoParser — Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka.test-0 

إذا كنت ترغب في استخدام JMX في تطبيق مع Kafka ، فقد يكون ذلك مشكلة. في هذه الحالة ، من الأفضل استخدام مزيج من اسم التطبيق ، على سبيل المثال ، اسم الموضوع ، كقيمة client.id. يمكن رؤية نتيجة التكوين الخاصة بنا في إخراج أمر kafka-Consumer-groups من الأدوات المساعدة من Confluent:



سنقوم الآن بتحليل سيناريو تسليم الرسالة المضمون. يحتوي Kafka Producer على معلمة acks تسمح لك بالتكوين بعد كم من الإقرار بأن زعيم الكتلة يحتاج إلى مراعاة الرسالة المسجلة بنجاح. يمكن أن تأخذ هذه المعلمة القيم التالية:


  • 0 - الاعتراف لن يتم النظر فيها.
  • 1 - المعلمة الافتراضية ، الإقرار مطلوب من نسخة متماثلة واحدة فقط.
  • ledge1 - الإقرار مطلوب من كافة النسخ المتماثلة المتزامنة ( تكوين الكتلة min.insync.replicas ).

يمكن أن نرى من القيم أعلاه أن acks تساوي −1 تعطي أقوى ضمانات بأن الرسالة لن تضيع.


كما نعلم جميعا ، النظم الموزعة لا يمكن الاعتماد عليها. للحماية من الأعطال المؤقتة ، يوفر Kafka Producer معلمة لإعادة المحاولة تسمح لك بتعيين عدد محاولات إعادة المحاولة أثناء delivery.timeout.ms . نظرًا لأن المعلمة إعادة المحاولة الافتراضية إلى Integer.MAX_VALUE (2147483647) ، يمكن التحكم في عدد عمليات إعادة إرسال الرسالة عن طريق تغيير delivery.timeout.ms فقط.


تتحرك نحو بالضبط بمجرد التسليم


تسمح هذه الإعدادات للمنتج لدينا بتسليم الرسائل مع ضمان عالي. دعونا نتحدث الآن عن كيفية ضمان تسجيل نسخة واحدة فقط من رسالة في موضوع كافكا؟ في أبسط الحالات ، للقيام بذلك على Producer ، قم بتعيين المعلمة enable.idempotence على true. يضمن Idempotency تسجيل رسالة واحدة فقط في قسم معين من موضوع واحد. الشرط المسبق لتمكين idempotency هو acks = الكل ، إعادة المحاولة> 0 ، max.in.flight.requests.per.connection ≤ 5 . إذا لم يتم تعيين هذه المعلمات بواسطة المطور ، فسيتم تلقائيًا تعيين القيم أعلاه.


عند إعداد idempotency ، من الضروري التأكد من أن الرسائل نفسها تقع في نفس الأقسام في كل مرة. يمكن القيام بذلك عن طريق تكوين المفتاح والمعلمة partitioner.class على Producer. لنبدأ بالمفتاح. لكل شحنة ، يجب أن تكون هي نفسها. يتم تحقيق ذلك بسهولة باستخدام أي معرف أعمال من الرسالة الأصلية. تحتوي المعلمة partitioner.class على قيمة افتراضية من DefaultPartitioner . باستخدام استراتيجية التقسيم هذه ، يكون السلوك الافتراضي كما يلي:


  • إذا تم تحديد القسم بشكل صريح عند إرسال الرسالة ، فإننا نستخدمها.
  • إذا لم يتم تحديد القسم ، ولكن تم تحديد المفتاح ، فحدد القسم بواسطة التجزئة من المفتاح.
  • إذا لم يتم تحديد القسم والمفتاح ، فحدد الأقسام بدورها (round-robin).

بالإضافة إلى ذلك ، يمنحك استخدام المفتاح والمعطيات المرسلة مع المعلمة max.in.flight.requests.per.connection = 1 معالجة منظمة للرسائل على المستهلك. بشكل منفصل ، تجدر الإشارة إلى أنه إذا تم تكوين التحكم في الوصول على نظام المجموعة لديك ، فستحتاج إلى حقوق الكتابة العاطفية للموضوع.


إذا لم يكن لديك فجأة إمكانيات كافية للإرسال العاطفي حسب المفتاح ، أو كان المنطق في جانب المنتج يتطلب الحفاظ على تناسق البيانات بين الأقسام المختلفة ، فستتوقف المعاملات. بالإضافة إلى ذلك ، باستخدام معاملة سلسلة ، يمكنك مزامنة سجل في Kafka بشكل مشروط ، على سبيل المثال ، مع سجل في قاعدة البيانات. لتمكين إرسال المعاملات إلى المنتج ، من الضروري أن يكون لديه ضعف ، وتعيينًا معاملاتًا. إذا تم تكوين التحكم في الوصول على مجموعة Kafka الخاصة بك ، ثم لتسجيل المعاملات ، وكذلك idempotent ، ستحتاج إلى أذونات الكتابة ، والتي يمكن منحها عن طريق قناع باستخدام القيمة المخزنة في المعاملات.


بشكل رسمي ، يمكنك استخدام أي سلسلة ، على سبيل المثال ، اسم التطبيق ، كمعرف معاملة. ولكن إذا قمت بتشغيل العديد من مثيلات التطبيق نفسه بنفس المعاملات ، فسيتم إيقاف المثيل الذي تم إطلاقه لأول مرة مع وجود خطأ ، حيث إن Kafka ستعتبره عملية غيبوبة.


 org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker. 

لحل هذه المشكلة ، نضيف لاحقة إلى اسم التطبيق في شكل اسم المضيف ، والتي يتم الحصول عليها من متغيرات البيئة.


تم تكوين المنتج ، لكن المعاملات على كافكا تتحكم فقط في نطاق الرسائل. بغض النظر عن حالة المعاملة ، تندرج الرسالة على الفور في الموضوع ، لكن لها سمات نظام إضافية.


لمنع مثل هذه الرسائل من قراءتها من قبل المستهلك مسبقًا ، يجب عليه تعيين المعلمة isolation.level على read_committed. سيكون هذا المستهلك قادرًا على قراءة الرسائل غير المتعلقة بالمعاملات كما كان من قبل ، والرسائل التي تتم فقط بعد الالتزام.
إذا قمت بتثبيت جميع الإعدادات المذكورة أعلاه ، فحينئذٍ قمت بتكوينها بمجرد التوصيل. تهانينا!


ولكن هناك فارق بسيط آخر. Transactional.id ، الذي قمنا بتكوينه أعلاه ، هو في الواقع بادئة للمعاملات. على مدير المعاملات ، يتم إضافة رقم تسلسلي إليه. يتم إصدار المعرف المستلم على Transactionional.id.expiration.ms ، والذي تم تكوينه على نظام كافكا وله قيمة افتراضية "7 أيام". إذا لم يتلق التطبيق خلال هذه الفترة أي رسائل ، فعندما تحاول إرسال المعاملة التالية ، ستتلقى InvalidPidMappingException . بعد ذلك ، سيصدر منسق المعاملات رقم تسلسل جديد للمعاملة التالية. ومع ذلك ، قد يتم فقد الرسالة إذا لم يتم معالجة InvalidPidMappingException بشكل صحيح.


بدلا من المجاميع


كما ترى ، لا يكفي مجرد إرسال رسائل إلى كافكا. تحتاج إلى اختيار مجموعة من المعلمات وتكون مستعدًا لإجراء تغييرات سريعة. في هذه المقالة ، حاولت أن أعرض بالضبط مرة واحدة إعدادات التسليم بالتفصيل ووصفت العديد من مشكلات تكوين client.id و Transactionional.id التي واجهناها. ملخص إعدادات المنتج والمستهلك ملخصة أدناه.


منتج:


  1. aks = الكل
  2. إعادة المحاولة> 0
  3. enable.idempotence = صحيح
  4. max.in.flight.requests.per.connection ≤ 5 (1 - للإرسال المنظم)
  5. Transactionional.id = $ {application-name} - $ {hostname}

المستهلك:


  1. isolation.level = read_committed

لتقليل الأخطاء في التطبيقات المستقبلية ، قمنا بإعداد برنامجنا على التكوين الربيعي ، حيث تم بالفعل تعيين قيم لبعض المعلمات المدرجة.


وإليك بعض المواد للدراسة المستقلة:


Source: https://habr.com/ru/post/ar481784/


All Articles