تسجيل الأحداث مع كافكا

مرحبا يا هبر!

اكتشفنا آخر احتياطيات من كتاب " Apache Kafka. Stream Streaming and Data Analysis " وأرسلناه إلى المعهد. علاوة على ذلك ، تلقينا عقدًا لكتاب " Streamka in Action " ونبدأ في ترجمته حرفياً الأسبوع المقبل.



لإظهار الحالة المثيرة للاهتمام لاستخدام مكتبة Kafka Streams ، قررنا ترجمة المقالة حول نموذج مصدر الأحداث في Kafka من Adam Worski الذي نُشر مقاله عن لغة Scala قبل أسبوعين. والأكثر إثارة للاهتمام هو أن رأي آدم وورسكي لا يمكن إنكاره: هنا ، على سبيل المثال ، يقال أن هذا النموذج غير مناسب بالتأكيد لكافكا. وكلما كانت لا تنسى ، نأمل أن نحصل على انطباع المقالة.

تمت ترجمة مصطلح "تحديد مصادر الأحداث" على أنه "تسجيل الأحداث" في منشوراتنا عن الهندسة المعمارية النظيفة بواسطة روبرت مارتن وفي هذه المقالة. إذا أعجب شخص ما بترجمة "أحداث الضخ" - فيرجى إبلاغي بذلك.

إنشاء نظام يوفر تسجيل الأحداث (مصادر الأحداث) ، نواجه عاجلاً أم آجلاً مشكلة المثابرة (المثابرة) - وهنا لدينا خياران. أولاً ، هناك EventStore ، وهو تطبيق ناضج مقوى في المعركة. بدلاً من ذلك ، يمكنك استخدام مثابرة akka للاستفادة الكاملة من قابلية كاساندرا للتوسع ، بالإضافة إلى الاعتماد على أداء نموذج الممثل. خيار آخر هو قاعدة البيانات العلائقية القديمة الجيدة ، حيث يتم دمج نهج CRUD مع استخدام الأحداث ويتم تقليل الفائدة القصوى من المعاملات.

بالإضافة إلى هذه الفرص (وربما العديد من الفرص الأخرى) التي نشأت بفضل العديد من الأشياء التي تم تنفيذها مؤخرًا ، أصبح اليوم من السهل جدًا تنظيم تسجيل الأحداث على قمة كافكا . دعونا نلقي نظرة على كيفية ذلك.

ما هو تسجيل الأحداث؟

هناك عدد من المقالات التمهيدية الممتازة حول هذا الموضوع ، لذا سأقتصر على المقدمة الأكثر إيجازًا. عند تسجيل الأحداث ، لا نقوم بحفظ الحالة "الحالية" للكيانات المستخدمة في نظامنا ، ولكن لا يتم حفظ دفق الأحداث المتعلقة بهذه الكيانات. كل حدث هو حقيقة تصف تغيرًا في الحالة (بالفعل!) حدث مع الكائن. كما تعلمون ، الحقائق لا تناقش ولا تتغير .

عندما يكون لدينا تيار من هذه الأحداث ، يمكن توضيح الحالة الحالية للكيان من خلال تقليل جميع الأحداث المتعلقة به ؛ ومع ذلك ، ضع في اعتبارك أن العكس غير ممكن - مع الحفاظ على الحالة "الحالية" فقط ، فإننا نتخلص من الكثير من المعلومات الزمنية القيمة.

يمكن أن يتعايش تسجيل الأحداث بشكل سلمي مع الطرق التقليدية لتخزين الدولة. كقاعدة ، يعالج النظام عددًا من أنواع الكيانات (على سبيل المثال: المستخدمون ، الطلبات ، السلع ، ...) ومن المحتمل جدًا أن يكون تسجيل الأحداث مفيدًا فقط لبعض هذه الفئات. من المهم أن نلاحظ أننا هنا لا نواجه خيار "الكل أو لا شيء" ؛ الأمر يتعلق فقط بميزة إدارة الحالة الإضافية في تطبيقنا.

تخزين الحدث في كافكا

أول مشكلة يجب حلها: كيف يتم تخزين الأحداث في كافكا؟ هناك ثلاث استراتيجيات محتملة:

  • تخزين جميع الأحداث لجميع أنواع الكيانات في موضوع واحد (مع العديد من الأجزاء)
  • بحسب نوع كل موضوع على حدة ، أي أننا نأخذ جميع الأحداث المتعلقة بالمستخدم في موضوع منفصل ، وفي موضوع منفصل - وكلها تتعلق بالمنتج ، وما إلى ذلك.
  • حسب كل موضوع على حدة ، أي حسب موضوع منفصل لكل مستخدم محدد وكل اسم منتج

الاستراتيجية الثالثة (موضوع بموضوع) غير قابلة للتطبيق عمليا. إذا ظهر كل مستخدم جديد في النظام ، فسيتعين عليه بدء موضوع منفصل ، وسرعان ما سيصبح عدد المواضيع غير محدود. سيكون من الصعب للغاية أي تجميع في هذه الحالة ، على سبيل المثال ، سيكون من الصعب فهرسة جميع المستخدمين في محرك البحث ؛ لن تضطر فقط إلى استهلاك عدد كبير من الموضوعات - ولكن لم تكن جميعها معروفة مسبقًا.

لذلك ، يبقى الاختيار بين 1 و 2. كلا الخيارين لهما مزايا وعيوب. إن وجود موضوع واحد يجعل من السهل الحصول على نظرة شاملة لجميع الأحداث. من ناحية أخرى ، من خلال تمييز الموضوع لكل نوع من الكيانات ، يمكنك قياس تدفق كل كيان وتقسيمه على حدة. يعتمد اختيار إحدى الإستراتيجيتين على حالة الاستخدام المحددة.

بالإضافة إلى ذلك ، يمكنك تنفيذ كلا الإستراتيجيتين دفعة واحدة ، إذا كان لديك مساحة تخزين إضافية: أنتج الموضوعات حسب نوع الكيان من موضوع واحد شامل.



في بقية المقالة ، سنعمل مع نوع واحد فقط من الكيان ومع موضوع واحد ، على الرغم من أنه يمكن بسهولة استقراء المادة المقدمة وتطبيقها للعمل مع العديد من المواضيع أو أنواع الكيانات.

(تحرير: كما لاحظ كريس هانت ، هناك مقال ممتاز بقلم مارتن كليبمان ، الذي درس بالتفصيل كيفية توزيع الأحداث حسب الموضوع والقطاع).

أبسط عمليات التخزين في نموذج تسجيل الأحداث

إن أبسط عملية ، والتي من المنطقي توقعها من متجر يدعم تسجيل الأحداث ، هي قراءة الحالة "الحالية" (المصغرة) لكيان معين. كقاعدة ، يكون لكل كيان id واحد أو آخر. وفقًا لذلك ، عند معرفة هذا id ، يجب أن يعرض نظام التخزين الحالة الحالية للكائن.

ستكون الحقيقة في الملاذ الأخير هي سجل الأحداث: يمكن دائمًا استنتاج الحالة الحالية من تيار الأحداث المرتبطة بكيان معين. لهذا ، سيحتاج محرك قاعدة البيانات إلى وظيفة خالصة (بدون آثار جانبية) يقبل الحدث والحالة الأولية ، ويعيد الحالة المتغيرة: Event = > State => State . في وجود مثل هذه الوظيفة وقيمة الحالة الأولية ، فإن الحالة الحالية هي تلاقي لتدفق الأحداث (يجب أن تكون وظيفة تغيير الحالة نظيفة بحيث يمكن تطبيقها بحرية وبشكل متكرر على نفس الأحداث.)

يعمل التطبيق المبسط لعملية "قراءة الحالة الحالية" في كافكا على جمع دفق من جميع الأحداث من الموضوع ، وتصفيتها ، مع ترك الأحداث ذات id المحدد والانهيارات باستخدام الوظيفة المحددة فقط. إذا كان هناك العديد من الأحداث (ومع مرور الوقت ينمو عدد الأحداث فقط) ، يمكن أن تصبح هذه العملية بطيئة وتستهلك الكثير من الموارد. حتى إذا كانت نتيجتها سيتم تخزينها مؤقتًا في الذاكرة وتخزينها في عقدة الخدمة ، فلا يزال يتعين إعادة إنشاء هذه المعلومات بشكل دوري ، على سبيل المثال ، بسبب فشل العقدة أو بسبب الازدحام خارج بيانات ذاكرة التخزين المؤقت.



لذلك ، هناك حاجة إلى طريقة أكثر عقلانية. هذا هو المكان الذي تكون فيه تيارات الكافكا ومستودعات الدولة في متناول اليد. تعمل تطبيقات دفق كافكا على مجموعة كاملة من العقد التي تستهلك موضوعات معينة معًا. يتم تعيين سلسلة من مقاطع الموضوعات المستهلكة لكل عقدة ، تمامًا كما هو الحال مع مستهلك كافكا العادي. ومع ذلك ، توفر تيارات كافكا عمليات بيانات عالية المستوى تجعل من الأسهل إنشاء تيارات مشتقة.

إحدى هذه العمليات في تيارات الكافكا هي تلاقي تيار في التخزين المحلي. تحتوي كل وحدة تخزين محلية على بيانات من تلك الأجزاء فقط التي تستهلكها عقدة معينة. خارج الصندوق ، يتوفر تطبيقان محليان للتخزين: في ذاكرة الوصول العشوائي وعلى أساس RocksDB .

بالعودة إلى موضوع تسجيل الأحداث ، نلاحظ أنه من الممكن انهيار دفق الأحداث في مخزن الدولة من خلال الاحتفاظ بالعقدة المحلية "الحالة الحالية" لكل كيان من الأجزاء المخصصة للعقدة. إذا استخدمنا تنفيذ مخزن الدولة استنادًا إلى RocksDB ، فعندئذٍ يعتمد عدد الكيانات التي يمكننا تتبعها على عقدة واحدة فقط على مقدار مساحة القرص.

في ما يلي الشكل الذي يبدو عليه دمج الأحداث في التخزين المحلي عند استخدام Java API (تعني serde "serializer / deserializer"):

 KStreamBuilder builder = new KStreamBuilder(); builder.stream(keySerde, valueSerde, "my_entity_events") .groupByKey(keySerde, valueSerde) //  :     .reduce((currentState, event) -> ..., "my_entity_store"); .toStream(); //     return builder; 

يتوفر مثال كامل لمعالجة الطلبات بناءً على الخدمات الدقيقة على موقع Confluent على الويب.

(تحرير: كما لاحظ سيرجي إيجوروف ونيكيتا سالنيكوف على تويتر ، لنظام مع تسجيل الأحداث ، ربما تحتاج إلى تغيير إعدادات تخزين البيانات الافتراضية في كافكا بحيث لا يعمل الوقت أو الحجم ، وكذلك اختياريًا ، تمكين ضغط البيانات.)

عرض الوضع الحالي

لقد أنشأنا مستودعا للولاية حيث توجد الحالات الحالية لجميع الكيانات القادمة من المقاطع المخصصة للعقدة ، ولكن كيف تطلب هذا المستودع الآن؟ إذا كان الطلب محليًا (أي أنه يأتي من نفس العقدة التي يوجد بها المستودع) ، فإن كل شيء بسيط جدًا:

 streams .store("my_entity_store", QueryableStoreTypes.keyValueStore()); .get(entityId); 

ولكن ماذا لو أردنا طلب البيانات الموجودة على عقدة أخرى؟ وكيف تعرف ما هي هذه العقدة؟ هنا ، هناك ميزة أخرى تم تقديمها مؤخرًا في Kafka وهي مفيدة: الاستعلامات التفاعلية . بمساعدتهم ، يمكنك الوصول إلى بيانات تعريف كافكا ومعرفة العقدة التي تعالج مقطع الموضوع id المحدد (في هذه الحالة ، يتم استخدام أداة تقسيم الموضوع بشكل ضمني):

 metadataService .streamsMetadataForStoreAndKey("my_entity_store", entityId, keySerde) 

بعد ذلك ، تحتاج إلى إعادة توجيه الطلب بطريقة ما إلى العقدة الصحيحة. يرجى ملاحظة: أن الطريقة المحددة التي يتم بها تنفيذ الاتصال عبر الموقع ومعالجته - سواء كان REST أو akka-remote أو أي دولة أخرى - لا تنتمي إلى منطقة مسؤولية تيارات كافكا. يوفر Kafka ببساطة الوصول إلى متجر الدولة ويوفر معلومات حول العقدة التي يقع فيها متجر الولاية id المحدد.

التعافي من الكوارث

متاجر الدولة تبدو جميلة ، ولكن ماذا يحدث عندما تفشل العقدة؟ يمكن أن تكون إعادة بناء متجر محلي محلي لقطاع معين عملية مكلفة. يمكن أن يؤدي إلى زيادة التأخير أو فقدان الطلبات لفترة طويلة ، حيث ستحتاج تيارات كافكا إلى إعادة التوازن (بعد إضافة عقدة أو إزالتها).

هذا هو السبب في تسجيل مخازن الحالة طويلة المدى افتراضيًا: أي أن جميع التغييرات التي يتم إجراؤها على المتجر تتم كتابتها بالإضافة إلى موضوع التغيير. يتم ضغط هذا الموضوع (لأنه لكل id نحن مهتمون فقط بالسجل الأخير ، بدون سجل التغييرات ، حيث يتم تخزين السجل في الأحداث نفسها) - لذلك ، فهو صغير قدر الإمكان. هذا هو السبب في أن إعادة إنشاء التخزين على عقدة أخرى يمكن أن يحدث بشكل أسرع.

ومع ذلك ، مع إعادة التوازن في هذه الحالة ، لا تزال التأخيرات ممكنة. num.standby.replicas أكثر ، توفر تيارات num.standby.replicas القدرة على الاحتفاظ num.standby.replicas احتياطية متعددة ( num.standby.replicas ) لكل مستودع. تطبق هذه النسخ المتماثلة جميع التحديثات التي تم استردادها من الموضوعات ذات سجلات التغيير عندما تصبح متاحة ، وتكون جاهزة للتبديل إلى وضع مخزن الحالة الرئيسي لقطاع معين بمجرد فشل المتجر الرئيسي الحالي.

التماسك

باستخدام الإعدادات الافتراضية ، توفر Kafka خدمة التوصيل لمرة واحدة على الأقل. أي أنه في حالة فشل العقدة ، يمكن تسليم بعض الرسائل عدة مرات. على سبيل المثال ، من الممكن أن يتم تطبيق حدث معين مرتين على مخزن الولاية إذا تعطل النظام بعد تغيير مخزن الحالة إلى السجل ، ولكن قبل تنفيذ الإزاحة لهذا الحدث المعين. ربما لن يتسبب هذا في أي صعوبات: يمكن لوظيفة التحديث الخاصة بنا ( Event = > State => State ) أن تتعامل بشكل طبيعي مع مثل هذه المواقف. ومع ذلك ، قد لا يكون قادرًا على التكيف: في مثل هذه الحالة ، يمكن استخدام ضمانات التسليم لمرة واحدة فقط التي تقدمها كافكا. تنطبق هذه الضمانات فقط عند قراءة وكتابة موضوعات كافكا ، ولكن هذا ما نقوم به هنا: في الخلفية ، يتم تقليل جميع الإدخالات في موضوعات كافكا لتحديث سجل التغيير لمتجر الدولة وإجراء عمليات الإزاحة. كل هذا يمكن القيام به في شكل معاملات .

لذلك ، إذا كانت وظيفتنا في تحديث الحالة تتطلب ذلك ، فيمكننا تمكين دلالات معالجة التدفقات "التسليم لمرة واحدة فقط" باستخدام خيار تكوين واحد: processing.guarantee . وبسبب هذا ، ينخفض ​​الأداء ، ولكن لا شيء يأتي هباء.

الاستماع الحدث

الآن بعد أن قمنا بتغطية الأساسيات - الاستعلام عن "الحالة الحالية" وتحديثها لكل كيان - ماذا عن إثارة الآثار الجانبية ؟ في مرحلة ما ، سيصبح هذا ضروريًا ، على سبيل المثال ، من أجل:

  • إرسال رسائل البريد الإلكتروني الإخطار
  • فهرسة كيانات محرك البحث
  • الاتصال بالخدمات الخارجية عبر REST (أو SOAP ، CORBA ، إلخ.)

جميع هذه المهام ، إلى حد ما أو آخر ، يتم حظرها وتتعلق بعمليات الإدخال / الإخراج (وهذا أمر طبيعي بالنسبة للآثار الجانبية) ، لذلك قد لا يكون من الجيد تنفيذها في إطار منطق تحديث الحالة: ونتيجة لذلك ، قد يزيد تكرار الفشل في الحلقة الرئيسية الأحداث ، ومن حيث الأداء سيكون هناك اختناق.

علاوة على ذلك ، يمكن تشغيل وظيفة ذات منطق تحديث الحالة (E Event = > State => State ) عدة مرات (في حالة حدوث أعطال أو إعادة تشغيل) ، وغالبًا ما نريد تقليل عدد الحالات التي يتم فيها تشغيل الآثار الجانبية لحدث معين عدة مرات.

لحسن الحظ ، بما أننا نعمل مع مواضيع كافكا ، لدينا قدر لا بأس به من المرونة. في مرحلة التدفقات ، حيث يتم تحديث متجر الولاية ، يمكن إطلاق الأحداث دون تغيير (أو ، إذا لزم الأمر ، أيضًا في شكل معدل) ، ويمكن استهلاك التيار / الموضوع الناتج (في كافكا هذه المفاهيم متكافئة) كما تريد. علاوة على ذلك ، يمكن استهلاكه إما قبل أو بعد مرحلة تحديث الحالة. أخيرًا ، يمكننا التحكم في كيفية إطلاق الآثار الجانبية: مرة واحدة على الأقل أو مرة واحدة كحد أقصى. يتم توفير الخيار الأول إذا قمت بإجراء الإزاحة لحدث الموضوع المستهلك فقط بعد اكتمال جميع الآثار الجانبية بنجاح. على العكس من ذلك ، بحد أقصى مرة واحدة ، نقوم بإجراء نوبات حتى تبدأ الآثار الجانبية.

هناك العديد من الخيارات لإثارة الآثار الجانبية ، فهي تعتمد على الموقف العملي المحدد. بادئ ذي بدء ، يمكنك تحديد مرحلة تيارات كافكا حيث يتم تشغيل الآثار الجانبية لكل حدث كجزء من وظيفة معالجة التدفق.
يعد إعداد مثل هذه الآلية أمرًا بسيطًا للغاية ، ولكن هذا الحل ليس مرنًا عندما يتعين عليك التعامل مع عمليات إعادة المحاولة والتحكم في الإزاحات والتنافس على الإزاحة للعديد من الأحداث في وقت واحد. في مثل هذه الحالات الأكثر تعقيدًا ، قد يكون من الأنسب تحديد المعالجة باستخدام ، على سبيل المثال ، رد الفعل الكافكا أو آلية أخرى تستهلك مواضيع كافكا "مباشرة".

من الممكن أيضًا أن يؤدي أحد الأحداث إلى تشغيل أحداث أخرى - على سبيل المثال ، يمكن أن يؤدي حدث "الطلب" إلى إطلاق حدثين "التحضير للإرسال" و "إشعار العميل". ويمكن تنفيذ ذلك أيضًا في مرحلة تيارات الكافكا.

أخيرًا ، إذا أردنا تخزين الأحداث أو بعض البيانات المستخرجة من الأحداث في قاعدة بيانات أو محرك بحث ، على سبيل المثال ، في ElasticSearch أو PostgreSQL ، فيمكننا استخدام موصل Kafka Connect ، الذي سيعالج لنا جميع التفاصيل المتعلقة باستهلاك الموضوعات.

إنشاء طرق العرض والإسقاطات

عادة ، لا تقتصر متطلبات النظام على الاستعلام ومعالجة تدفقات كيان واحد فقط. يجب أيضًا دعم التجميع ، مزيج من تيارات الأحداث المتعددة. غالبًا ما يشار إلى هذه التدفقات المجمعة على أنها إسقاطات ، وعندما تنهار ، يمكن استخدامها لإنشاء تمثيلات للبيانات . هل يمكن تنفيذها بالكفكة؟



مرة أخرى ، نعم! تذكر أننا من حيث المبدأ نتعامل ببساطة مع موضوع كافكا ، حيث يتم تخزين أحداثنا. لذلك ، لدينا كل قوة مستهلكي / منتجي كافكا الخام ، وجمع تيارات الكافكا ، وحتى KSQL - كل هذا سيكون مفيدًا بالنسبة لنا لتحديد التوقعات. على سبيل المثال ، باستخدام kafka-streams ، يمكنك تصفية الدفق ، أو العرض ، أو التجميع حسب المفتاح ، أو التجميع في النوافذ المؤقتة أو جلسات الجلسة ، إلخ. سواء على مستوى الكود ، أو باستخدام KSQL شبيه بـ SQL.

يمكن تخزين هذه التدفقات وتوفيرها لطلبات البحث لفترة طويلة باستخدام مخازن الدولة والاستعلامات التفاعلية ، تمامًا كما فعلنا مع تدفقات الكيانات الفردية.

ما هي الخطوة التالية

لمنع التدفق اللانهائي للأحداث مع تطور النظام ، قد يكون خيار الضغط مثل حفظ لقطات من "الحالة الحالية" مفيدًا. وبالتالي ، يمكننا قصر أنفسنا على تخزين بعض اللقطات الأخيرة وتلك الأحداث التي حدثت بعد إنشائها.

على الرغم من أن Kafka ليس لديه دعم مباشر للقطات (وفي بعض الأنظمة الأخرى التي تعمل على مبدأ تسجيل الأحداث ، فهو كذلك) ، يمكنك بالتأكيد إضافة هذا النوع من الوظائف بنفسك ، باستخدام بعض الآليات المذكورة أعلاه ، مثل التدفقات والمستهلكين ومتاجر الدولة وما إلى ذلك. د.

الملخص

على الرغم من أنه في البداية ، لم يتم تصميم Kafka مع التركيز على نموذج تسجيل الحدث ، إلا أنه في الواقع هو محرك تدفق البيانات مع دعم تكرار الموضوع ، والتجزئة ، ومستودعات الدولة ، وواجهات API المتدفقة ، وهو مرن للغاية في نفس الوقت. لذلك ، فوق كافكا ، يمكنك بسهولة تنفيذ نظام تسجيل الأحداث. علاوة على ذلك ، نظرًا لأنه على خلفية كل ما يحدث ، سيكون لدينا دائمًا موضوع كافكا ، سنكتسب مرونة إضافية ، حيث يمكننا العمل إما مع واجهات برمجة التطبيقات المتدفقة عالية المستوى أو المستهلكين ذوي المستوى المنخفض.

Source: https://habr.com/ru/post/ar424739/


All Articles