كتاب كافكا تيارات في العمل. التطبيقات في الوقت الحقيقي والخدمات الصغيرة »

صورة مرحبا ، habrozhiteli! هذا الكتاب مناسب لأي مطور يريد فهم معالجة البث. إن فهم البرمجة الموزعة سيساعدك على فهم تيارات كافكا وكافكا بشكل أفضل. سيكون من الجيد معرفة إطار كافكا نفسه ، لكن هذا ليس ضروريًا: سأخبرك بكل ما تحتاجه. بفضل هذا الكتاب ، سيتعلم مطورو Kafka ذوي الخبرة ، مثل المبتدئين ، كيفية إنشاء تطبيقات تدفق مثيرة باستخدام مكتبة Kafka Streams. سوف يتعلم مطورو Java من المستوى المتوسط ​​والعالي ممن هم على دراية بمفاهيم مثل التسلسل ، كيفية تطبيق مهاراتهم لإنشاء تطبيقات Kafka Streams. تتم كتابة التعليمات البرمجية المصدر للكتاب في Java 8 ويستخدم أساسًا بناء جملة تعبيرات lambda في Java 8 ، وبالتالي فإن القدرة على العمل مع وظائف lambda (حتى في لغة برمجة أخرى) مفيدة لك.

مقتطفات. 5.3. عمليات التجميع والنافذة


في هذا القسم ، ننتقل إلى الأجزاء الواعدة من تيارات كافكا. حتى الآن قمنا بتغطية الجوانب التالية من تيارات كافكا:

  • إنشاء طوبولوجيا المعالجة ؛
  • استخدام الحالة في تطبيقات التدفق ؛
  • إجراء اتصالات دفق البيانات ؛
  • الاختلافات بين تدفقات الحدث (KStream) وتحديث التدفقات (KTable).

في الأمثلة التالية ، سنجمع كل هذه العناصر معًا. بالإضافة إلى ذلك ، سيتم تعريفك بعمليات النوافذ - ميزة أخرى رائعة لتطبيقات التدفق. مثالنا الأول سيكون تجميع بسيط.

5.3.1. تجميع مبيعات الأسهم حسب الصناعة


التجميع والتجميع أدوات حيوية للعمل مع تدفق البيانات. غالباً ما لا يكون فحص السجلات الفردية على أساس القبول. لاستخراج معلومات إضافية من البيانات ، يعد التجميع والجمع ضروريين.

في هذا المثال ، يتعين عليك تجربة متداول يومي يحتاج إلى تتبع حجم مبيعات الأسهم في العديد من الصناعات. على وجه الخصوص ، أنت مهتم بالشركات الخمس التي حققت أكبر مبيعات في كل صناعة.

لمثل هذا التجميع ، ستحتاج إلى العديد من الخطوات التالية لترجمة البيانات إلى النموذج المطلوب (بعبارات عامة).

  1. إنشاء مصدر يستند إلى الموضوع الذي ينشر معلومات تداول الأسهم الخام. سيتعين علينا تعيين كائن من النوع StockTransaction إلى كائن من النوع ShareVolume. الحقيقة هي أن كائن StockTransaction يحتوي على بيانات تعريف المبيعات ، ونحن بحاجة فقط إلى بيانات حول عدد الأسهم المباعة.
  2. تجميع بيانات ShareVolume بواسطة رموز الأسهم. بعد التجميع حسب الرموز ، يمكنك طي هذه البيانات إلى مجاميع مبيعات الأسهم. تجدر الإشارة إلى أن الأسلوب KStream.groupBy يُرجع مثيلاً من النوع KGroupedStream. ويمكنك الحصول على مثيل KTable عن طريق استدعاء الأسلوب KGroupedStream.reduce لاحقًا.

ما هي واجهة KGroupedStream

أساليب KStream.groupBy و KStream.groupByKey بإرجاع مثيل KGroupedStream. KGroupedStream هو تمثيل متوسط ​​لدفق الحدث بعد التجميع حسب المفتاح. ليس المقصود على الإطلاق للعمل مباشرة معها. بدلاً من ذلك ، يتم استخدام KGroupedStream لعمليات التجميع ، والنتيجة هي دائمًا KTable. ونظرًا لأن نتيجة عمليات التجميع هي KTable وأنها تستخدم سعة التخزين الحكومية ، فمن المحتمل ألا يتم إرسال جميع التحديثات كنتيجة أخرى إلى أسفل خط الأنابيب.

تقوم طريقة KTable.groupBy بإرجاع KGroupedTable مشابه - وهو تمثيل وسيط لدفق التحديثات المعاد تجميعها حسب المفتاح.

لنأخذ استراحة قصيرة وننظر إلى التين. 5.9 ، مما يدل على ما حققناه. يجب أن تكون هذه الهيكلية مألوفة لك بالفعل.

صورة

الآن دعنا نلقي نظرة على الكود الخاص بهذا الهيكل (يمكن العثور عليه في ملف src / main / java / bbejeck / Chapter_5 / AggregationsAndReducingExample.java) (سرد 5.2).

صورة

يختلف الرمز المحدد في الإيجاز وحجم كبير من الإجراءات المنفذة في عدة سطور. في المعلمة الأولى للأسلوب builder.stream ، يمكنك أن تلاحظ شيئًا جديدًا لنفسك: قيمة النوع التعداد AutoOffsetReset.EARLIEST (يوجد أيضًا LATEST) ، تم تعيينه باستخدام الأسلوب Consumed.withOffsetResetPolicy. باستخدام هذا النوع المرقم ، يمكنك تحديد استراتيجية لإعادة تعيين الإزاحة لكل من KStream أو KTable ؛ لها الأولوية على المعلمة لإعادة تعيين الإزاحة من التكوين.

GroupByKey و GroupBy

تحتوي واجهة KStream على طريقتين لتجميع السجلات: GroupByKey و GroupBy. كلاهما يعرض KGroupedTable ، لذا قد يكون لديك سؤال مشروع: ما هو الفرق بينهما ومتى تستخدم أيًا منها؟

يتم استخدام طريقة GroupByKey عندما تكون المفاتيح في KStream غير فارغة بالفعل. والأهم من ذلك أن العلم "يتطلب إعادة التقسيم" لم يتم تعيينه.

تفترض طريقة GroupBy أنك غيّرت مفاتيح التجميع ، لذلك تم تعيين علامة إعادة التقسيم إلى true. يؤدي إجراء التوصيلات والتجمعات وما إلى ذلك بعد طريقة GroupBy إلى إعادة التقسيم التلقائي.
ملخص: يجب عليك استخدام GroupByKey بدلاً من GroupBy كلما كان ذلك ممكنًا.

ما يمكن فهمه عن طريق mapValues ​​و groupBy ، لذلك يمكنك إلقاء نظرة على طريقة sum () (يمكن العثور عليها في ملف src / main / java / bbejeck / model / ShareVolume.java) (سرد 5.3).

صورة

تقوم طريقة ShareVolume.sum بإرجاع المجموع الفرعي لحجم مبيعات الأسهم ، وتكون نتيجة سلسلة الحساب بأكملها عبارة عن كائن KTable <String ، ShareVolume>. أنت الآن تفهم الدور الذي يلعبه KTable. عند وصول كائنات ShareVolume ، يتم حفظ آخر تحديث حالي في KTable المقابل. من المهم ألا ننسى أن جميع التحديثات تنعكس في shareVolumeKTable السابقة ، ولكن لا يتم إرسالها جميعًا.

علاوة على ذلك ، بمساعدة KTable ، نقوم بإجراء التجميع (حسب عدد الأسهم المباعة) من أجل الحصول على الشركات الخمس التي حققت أعلى مبيعات في كل صناعة. ستكون إجراءاتنا في هذه الحالة مماثلة لإجراءات التجميع الأولى.

  1. قم بإجراء عملية groupBy أخرى لتجميع كائنات ShareVolume الفردية حسب الصناعة.
  2. تابع تلخيص كائنات ShareVolume. هذه المرة ، كائن التجميع قائمة انتظار أولوية ذات حجم ثابت. يتم الاحتفاظ فقط خمس شركات مع أكبر عدد من الأسهم المباعة في قائمة انتظار ذات حجم ثابت.
  3. عرض الأسطر من الفقرة السابقة في قيمة السلسلة وإرجاع الخمسة الأكثر مبيعًا بعدد الأسهم حسب الصناعة.
  4. اكتب النتائج في شكل سلسلة للموضوع.

في التين. 5.10 يوضح الرسم البياني لطوبولوجيا حركة البيانات. كما ترون ، الجولة الثانية من المعالجة بسيطة للغاية.

صورة

الآن ، بعد أن فهمت بوضوح بنية هذه الجولة الثانية من المعالجة ، يمكنك الرجوع إلى الكود المصدر الخاص بها (ستجده في الملف src / main / java / bbejeck / Chapter_5 / AggregationsAndReducingExample.java) (سرد 5.4).

يوجد متغير fixQueue في هذا المُهيئ. هذا هو كائن مخصص - محول java.util.TreeSet ، والذي يستخدم لتتبع N أعلى النتائج في تناقص ترتيب عدد المشاركات المباعة.

صورة

لقد واجهت بالفعل مكالمات إلى groupBy و mapValues ​​، لذلك لن نتوقف عن هذه المكالمات (نسميها KTable.toStream ، لأن أسلوب KTable.print قد تم إهماله). لكنك لم تشاهد بعد إصدار KTable للأسلوب التجميعي () ، لذلك سنقضي بعض الوقت في مناقشته.

كما تتذكر ، تتميز KTable بحقيقة أن السجلات ذات المفاتيح نفسها تعتبر تحديثات. يستبدل KTable السجل القديم بالسجل الجديد. يحدث التجميع بنفس الطريقة: يتم تجميع السجلات الأخيرة بمفتاح واحد. عند وصول سجل ، تتم إضافته إلى مثيل للفئة FixedSizePriorityQueue باستخدام adder (المعلمة الثانية في استدعاء الأسلوب التجميعي) ، ولكن في حالة وجود سجل آخر بنفس المفتاح بالفعل ، فسيتم حذف السجل القديم باستخدام المفتاح الفرعي (المعلمة الثالثة في استدعاء الأسلوب التجميعي).

كل هذا يعني أن أداة التجميع الخاصة بنا ، FixedSizePriorityQueue ، لا تجمع كل القيم بمفتاح واحد ، ولكنها تخزن المبلغ المتحرك للكميات N من أنواع الأسهم الأكثر مبيعًا. يحتوي كل إدخال على إجمالي عدد الأسهم المباعة حتى الآن. ستقدم لك KTable معلومات حول الأسهم التي يتم بيعها في الوقت الحالي على الأكثر ؛ ولا يلزم التجميع المتداول لكل تحديث.

لقد تعلمنا أن نفعل شيئين مهمين:

  • قيم المجموعة في KTable بواسطة مفتاح مشترك لها ؛
  • إجراء عمليات مفيدة مثل الالتفاف والتجميع على هذه القيم المجمعة.

تعد القدرة على تنفيذ هذه العمليات مهمة لفهم معنى البيانات التي تنتقل عبر تطبيق Kafka Streams ومعرفة المعلومات التي تحملها.

لقد جمعنا أيضًا بعض المفاهيم الأساسية التي نوقشت سابقًا في هذا الكتاب. في الفصل 4 ، تحدثنا عن مدى أهمية وجود حالة محلية آمنة من الفشل للتطبيق المتدفق. يوضح المثال الأول في هذا الفصل سبب أهمية الولاية المحلية - فهو يجعل من الممكن تتبع المعلومات التي رأيتها بالفعل. يتجنب الوصول المحلي تأخيرات الشبكة ، مما يجعل التطبيق أكثر إنتاجية ومقاومًا للخطأ.

عند إجراء أي عملية تحويل أو تجميع ، يجب عليك تحديد اسم متجر الحالة. تقوم عمليات الالتفاف والتجميع بإرجاع مثيل KTable ، ويستخدم KTable متجرا حكوميا لاستبدال النتائج القديمة بنتائج جديدة. كما رأيت ، لا يتم إرسال جميع التحديثات إلى أسفل خط الأنابيب ، وهذا أمر مهم ، لأن عمليات التجميع مصممة للحصول على المعلومات النهائية. إذا لم يتم تطبيق الحالة المحلية ، فسوف يقوم KTable بإرسال المزيد من نتائج التجميع واللف.

بعد ذلك ، ننظر إلى تنفيذ عمليات مثل التجميع ، خلال فترة زمنية محددة - ما يسمى بعمليات النوافذ.

5.3.2. نافذة العمليات


في القسم السابق ، قدمنا ​​الإلتفاف والتجميع "المتداول". أجرى التطبيق الإلتفاف المستمر لمبيعات الأسهم مع التجميع اللاحق للأسهم الخمسة الأكثر مبيعًا.

في بعض الأحيان مثل هذا التجميع المستمر والالتفاف من النتائج أمر ضروري. وأحيانًا تحتاج إلى إجراء عمليات فقط في فترة زمنية محددة. على سبيل المثال ، احسب عدد معاملات البورصة التي تمت مع أسهم شركة معينة في آخر 10 دقائق. أو كم عدد المستخدمين الذين نقروا على إعلان بانر جديد في آخر 15 دقيقة. يمكن للتطبيق إجراء مثل هذه العمليات عدة مرات ، ولكن مع النتائج المتعلقة فقط بفواصل زمنية محددة (إطارات زمنية).

عد المعاملات الصرف من قبل المشتري


في المثال التالي ، سوف نشارك في تتبع معاملات التبادل للعديد من المتداولين - سواء المؤسسات الكبيرة أو الممولين الأذكياء.

هناك سببان محتملان لهذا التتبع. واحد منهم هو الحاجة إلى معرفة ما هي الرائدة في السوق شراء / بيع. إذا رأى هؤلاء اللاعبون الكبار والمستثمرون المتطورون الفرص لأنفسهم ، فمن المنطقي اتباع استراتيجيتهم. السبب الثاني هو الرغبة في ملاحظة أي علامات محتملة للمعاملات غير القانونية باستخدام المعلومات الداخلية. للقيام بذلك ، سوف تحتاج إلى تحليل العلاقة بين الطفرات الكبيرة في المبيعات والنشرات الصحفية الهامة.

يتكون هذا التتبع من خطوات مثل:

  • إنشاء دفق للقراءة من موضوع معاملات الأسهم ؛
  • تجميع السجلات الواردة بواسطة معرف العميل ورمز السهم للسهم. إرجاع استدعاء الأسلوب groupBy مثيل لفئة KGroupedStream؛
  • يُرجع KGroupedStream.windowedBy دفق بيانات مُحدَّد بإطار مؤقت ، مما يسمح بتجميع الإطارات. اعتمادًا على نوع الإطار ، يتم إرجاع TimeWindowedKStream أو SessionWindowedKStream ؛
  • حساب المعاملات لعملية التجميع. يحدد دفق بيانات النافذة ما إذا كان سجل معين يؤخذ في الاعتبار في هذا الحساب ؛
  • كتابة النتائج لموضوع أو إخراجها إلى وحدة التحكم أثناء التطوير.

طوبولوجيا هذا التطبيق بسيطة ، ولكن صورته البصرية لا تؤذي. نلقي نظرة على الموافقة المسبقة عن علم. 5.11.

علاوة على ذلك ، سننظر في وظائف عمليات النافذة والرمز المقابل.

صورة

أنواع النوافذ


هناك ثلاثة أنواع من النوافذ في Kafka Streams:

  • الدورة؛
  • تراجع (تراجع) ؛
  • انزلاق / "القفز" (انزلاق / التنقل).

أي واحد للاختيار يعتمد على متطلبات العمل. تكون إطارات "السقوط" و "القفز" محدودة في الوقت المناسب ، في حين ترتبط قيود الجلسة بإجراءات المستخدم - يتم تحديد مدة الجلسة (جلسات) فقط من خلال كيفية سلوك المستخدم بفعالية. الشيء الرئيسي هو عدم نسيان أن جميع أنواع النوافذ تعتمد على طوابع التاريخ / الوقت للسجلات وليس على وقت النظام.

بعد ذلك ، نطبق طوبولوجيا لدينا مع كل نوع من أنواع النوافذ. سيتم إعطاء الرمز الكامل فقط في المثال الأول ، ولن يتغير أي شيء بالنسبة للأنواع الأخرى من النوافذ ، باستثناء نوع تشغيل النافذة.

نوافذ الجلسة


تختلف نوافذ الجلسة تمامًا عن جميع أنواع النوافذ الأخرى. فهي محدودة ليس بالوقت بقدر نشاطها من قبل نشاط المستخدم (أو نشاط الكيان الذي ترغب في تتبعه). يتم تحديد نوافذ الجلسة بفترات من عدم النشاط.

يوضح الشكل 5.12 مفهوم نوافذ الجلسة. سيتم دمج جلسة أصغر مع جلسة إلى اليسار. وستكون الجلسة على اليمين منفصلة ، لأنها تتبع فترة طويلة من عدم النشاط. تعتمد نوافذ الجلسة على تصرفات المستخدم ، ولكن تطبق طوابع التاريخ / الوقت من السجلات لتحديد الجلسة التي ينتمي إليها السجل.

صورة


استخدام جلسة Windows لتتبع معاملات التبادل


سنستخدم نوافذ الجلسة لالتقاط معلومات حول معاملات التبادل. يظهر تنفيذ نوافذ الجلسة في قائمة 5.5 (التي يمكن العثور عليها في src / main / java / bbejeck / Chapter_5 / CountingWindowingAndKTableJoinExample.java).

صورة

لقد قابلت بالفعل معظم عمليات هذا الهيكل ، لذلك ليست هناك حاجة للنظر فيها مرة أخرى. ولكن هناك العديد من العناصر الجديدة التي سنناقشها الآن.

في أي عملية من مجموعات GroupBy ، عادة ما يتم إجراء نوع من عمليات التجميع (التجميع أو الإلتفاف أو العد). يمكنك تنفيذ التجميع التراكمي مع المجموع التراكمي ، أو تجميع النافذة ، والتي يتم فيها أخذ السجلات في الاعتبار داخل نافذة زمنية محددة.

يحسب الكود في قائمة 5.5 عدد المعاملات داخل نوافذ الجلسة. في التين. 5.13 يتم تحليل هذه الإجراءات خطوة بخطوة.

عن طريق استدعاء windowedBy (SessionWindows.with (twentySeconds) .until (fifteenMinutes)) نقوم بإنشاء إطار جلسة مع فاصل خمول من 20 ثانية وفترة استبقاء من 15 دقيقة. يعني الفاصل الزمني الخمول 20 ثانية أن التطبيق سيشمل أي سجل يصل خلال 20 ثانية من نهاية أو بداية الجلسة الحالية في الجلسة الحالية (النشطة).

صورة

بعد ذلك ، نشير إلى عملية التجميع التي يجب تنفيذها في نافذة الجلسة - في هذه الحالة ، يتم حسابها. إذا كان السجل الوارد يقع خارج فاصل الخمول (على جانبي طابع التاريخ / الوقت) ، فسيقوم التطبيق بإنشاء جلسة جديدة. فاصل الحفظ يعني الاحتفاظ بجلسة لفترة معينة ويسمح بالبيانات المتأخرة التي تتجاوز فترة عدم نشاط الجلسة ولكن لا يزال من الممكن إرفاقها. بالإضافة إلى ذلك ، تتوافق بداية ونهاية الجلسة الجديدة الناتجة عن الدمج مع طابع التاريخ / الوقت الأقدم والأحدث.

دعونا نلقي نظرة على بعض الإدخالات من طريقة العد لنرى كيف تعمل الجلسات (الجدول 5.1).

صورة

عند استلام السجلات ، نبحث عن جلسات موجودة بالفعل باستخدام نفس المفتاح ، ووقت الانتهاء أقل من طابع التاريخ / الوقت الحالي - فاصل الخمول ووقت البدء أكثر من الفاصل الزمني للتاريخ / الوقت الحالي + فاصل الخمول الحالي. مع وضع هذا في الاعتبار ، أربعة سجلات من الجدول. 5.1 دمج في جلسة واحدة على النحو التالي.

1. السجل الأول يأتي أولاً ، وبالتالي فإن وقت البدء يساوي وقت النهاية وهو 00:00:00.

2. التالي يأتي الرقم القياسي 2 ، ونحن نبحث عن الجلسات التي تنتهي في موعد لا يتجاوز 23:59:55 وتبدأ في موعد لا يتجاوز 00:00:35. ابحث عن السجل 1 ودمج الجلستين 1 و 2. خذ وقت بدء الجلسة 1 (الأقدم) ووقت الانتهاء من الجلسة 2 (الأحدث) ، لذلك تبدأ جلستنا الجديدة في 00:00:00 وتنتهي في 00:00:15.

3. وصل السجل 3 ، ونحن نبحث عن الجلسات بين 00:00:30 و 00:01:10 ولا نجد أي. أضف جلسة ثانية للمفتاح 123-345-654 ، FFBE ، تبدأ وتنتهي في 00:00:50.

4. يصل سجل 4 ، ونحن نبحث عن جلسات بين 23:59:45 و 00:00:25. هذه المرة يوجد كلتا الدورتين - 1 و 2. يتم دمج جميع الجلسات الثلاث في واحدة ، مع وقت البدء من 00:00:00 ووقت الانتهاء من 00:00:15.

مما يقال في هذا القسم ، يجدر تذكر الفروق المهمة التالية:

  • الجلسات ليست نوافذ ذات حجم ثابت. يتم تحديد مدة الجلسة حسب النشاط خلال فترة زمنية محددة ؛
  • تحدد طوابع التاريخ / الوقت في البيانات ما إذا كان الحدث يقع في جلسة حالية أو في فترة من عدم النشاط.

علاوة على ذلك ، سنناقش نوع النوافذ التالي - windows "شقلبة".

هبوط النوافذ


تلتقط النوافذ "السقطة" الأحداث التي تقع في فترة زمنية معينة. تخيل أنك بحاجة إلى التقاط جميع معاملات التبادل الخاصة بشركة ما كل 20 ثانية ، بحيث تجمع كل الأحداث لهذه الفترة الزمنية. في نهاية الفاصل الزمني لمدة 20 ثانية ، "تتراجع" النافذة وتتحول إلى فاصل مراقبة جديد مدته 20 ثانية. يوضح الشكل 5.14 هذا الموقف.

صورة

كما ترون ، يتم تضمين جميع الأحداث المستلمة خلال العشرين ثانية الماضية في النافذة. في نهاية هذه الفترة الزمنية ، يتم إنشاء نافذة جديدة.

تُظهر القائمة 5.6 الكود الذي يوضح استخدام النوافذ المتدفقة لالتقاط معاملات التبادل كل 20 ثانية (يمكنك العثور عليها في src / main / java / bbejeck / Chapter_5 / CountingWindowingAndKtableJoinExample.java).

صورة

بفضل هذا التغيير الطفيف في استدعاء الأسلوب TimeWindows.of ، يمكنك استخدام الإطار المتراجع. في هذا المثال ، لا يوجد استدعاء للطريقة () ، ونتيجة لذلك سيتم استخدام فاصل الحفظ الافتراضي لمدة 24 ساعة.

أخيرًا ، حان الوقت للانتقال إلى آخر خيارات النافذة - التنقل بين النوافذ.

انزلاق ("القفز") النوافذ


تشبه النوافذ المنزلق / "التنقل" النوافذ "المتدفقة" ، ولكن مع اختلاف بسيط. لا تنتظر النوافذ المنزلقة نهاية الفاصل الزمني قبل إنشاء نافذة جديدة للتعامل مع الأحداث الأخيرة. يبدأون حسابات جديدة بعد فترة انتظار أقصر من مدة النافذة.

لتوضيح الاختلافات بين windows "الشقلبة" و "القفز" ، دعنا نعود إلى المثال مع حساب معاملات التبادل. هدفنا ، كما كان من قبل ، هو حساب عدد المعاملات ، لكننا لا نريد الانتظار طوال الوقت قبل تحديث العداد. بدلاً من ذلك ، سنقوم بتحديث العداد على فترات زمنية أقصر. على سبيل المثال ، سنستمر في حساب عدد المعاملات كل 20 ثانية ، ولكن لتحديث العداد كل 5 ثوان ، كما هو مبين في الشكل. 5.15. في الوقت نفسه ، لدينا ثلاث نوافذ للنتائج تحتوي على بيانات متداخلة.

صورة

تُظهر القائمة 5.7 رمز تحديد النوافذ المنزلقة (يمكن العثور عليه في src / main / java / bbejeck / Chapter_5 / CountingWindowingAndKtableJoinExample.java).

صورة

يمكن تحويل النافذة المتداعية إلى نافذة متداعية بإضافة استدعاء إلى الأسلوب advanceBy (). في المثال أعلاه ، الفاصل الزمني الحفظ هو 15 دقيقة.

لقد رأيت في هذا القسم كيفية قصر نتائج التجميع على الإطارات الزمنية. على وجه الخصوص ، أود منك أن تتذكر الأشياء الثلاثة التالية من هذا القسم:

  • لا يقتصر حجم إطارات جلسة العمل على الفاصل الزمني ، ولكن حسب نشاط المستخدم ؛
  • تعطي النوافذ "المتداعية" فكرة عن الأحداث خلال فترة زمنية محددة ؛
  • يتم إصلاح مدة الإطارات "القفز" ، لكنها غالباً ما يتم تحديثها وقد تحتوي على إدخالات متداخلة في جميع النوافذ.

بعد ذلك ، سوف نتعلم كيفية تحويل KTable مرة أخرى إلى KStream للاتصال.

5.3.3. انضم إلى كائنات KStream و KTable


في الفصل 4 ، ناقشنا ربط كائنين KStream. الآن علينا أن نتعلم كيفية توصيل KTable و KStream. قد تكون هناك حاجة لهذا السبب البسيط التالي. KStream هو دفق من السجلات ، و KTable هو دفق من تحديثات السجل ، ولكن في بعض الأحيان قد يكون من الضروري إضافة سياق إضافي إلى دفق السجلات باستخدام تحديثات من KTable.

نحن نأخذ البيانات المتعلقة بعدد المعاملات التبادلية ونجمعها بتبادل الأخبار حول الصناعات ذات الصلة. إليك ما عليك القيام به لتحقيق ذلك ، بالنظر إلى الكود الموجود.

  1. قم بتحويل كائن KTable ببيانات حول عدد معاملات التبادل إلى KStream ، متبوعًا باستبدال المفتاح بمفتاح يحدد الصناعة المطابقة لرمز المشاركات هذا.
  2. إنشاء كائن KTable يقرأ البيانات من موضوع مع تبادل الأخبار. سيتم تصنيف KTable الجديد حسب الصناعة.
  3. ادمج تحديثات الأخبار مع معلومات عن عدد المعاملات التبادلية حسب الصناعة.

الآن دعونا نرى كيفية تنفيذ خطة العمل هذه.

تحويل KTable إلى KStream


لتحويل KTable إلى KStream ، قم بما يلي.

  1. استدعاء الأسلوب KTable.toStream ().
  2. باستخدام استدعاء الأسلوب KStream.map ، استبدل المفتاح باسم الصناعة ، ثم قم باستخراج كائن TransactionSummary من مثيل Windowed.

نقوم بسلسلة هذه العمليات بالطريقة التالية (يمكن العثور على الكود في src / main / java / bbejeck / Chapter_5 / CountingWindowingAndKtableJoinExample.java) (سرد 5.8).

صورة

KStream.map, KStream .

, KTable .

KTable


, KTable ( src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) ( 5.9).

صورة

, Serde , Serde. EARLIEST .

— .


. , ( src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) ( 5.10).

صورة

بيان leftJoin بسيط للغاية. بخلاف الاتصالات في الفصل 4 ، لا يتم استخدام الأسلوب JoinWindow ، لأنه عند إنشاء KStream-KTable ، يوجد سجل واحد فقط لكل مفتاح في KTable. لا يقتصر مثل هذا الاتصال في الوقت: السجل إما في KTable أو غائب. الاستنتاج الرئيسي: بمساعدة كائنات KTable ، يمكنك إثراء KStream ببيانات مرجعية أقل تحديثًا بشكل متكرر.

سننظر الآن في طريقة أكثر فعالية لإثراء الأحداث من KStream.

5.3.4. كائنات GlobalKTable


, . 4 KStream, — KStream KTable. . , Kafka Streams . , , ( 4, « » 4.2.4).


— , ; . , , .


في بعض الحالات ، تكون كمية البيانات المرجعية التي تم التخطيط للاتصال بها صغيرة نسبيًا ، بحيث يمكن نسخ كاملة منها محليًا على كل عقد من العقد. لمثل هذه الحالات ، يوفر Kafka Streams فئة GlobalKTable.

مثيلات GlobalKTable فريدة من نوعها لأن التطبيق ينسخ كافة البيانات إلى كل عقدة. ونظرًا لوجود جميع البيانات الموجودة على كل عقدة ، ليست هناك حاجة لتقسيم دفق الأحداث بواسطة مفتاح البيانات المرجعي بحيث يكون في متناول جميع الأقسام. يمكن أن تؤدي الكائنات GlobalKTable أيضًا اتصالات بدون مفتاح. دعنا نعود إلى أحد الأمثلة السابقة لإظهار هذه الميزة.

توصيل كائنات KStream بكائنات GlobalKTable


في القسم الفرعي 5.3.2 ، قمنا بتجميع النوافذ لمعاملات الصرف من قبل العملاء. تبدو نتائج هذا التجميع مثل هذا:

{customerId='074-09-3705', stockTicker='GUTM'}, 17 {customerId='037-34-5184', stockTicker='CORK'}, 16 

على الرغم من أن هذه النتائج كانت متوافقة مع الهدف ، إلا أنه سيكون أكثر ملاءمة إذا تم عرض اسم العميل والاسم الكامل للشركة. لإضافة اسم عميل واسم شركة ، يمكنك إجراء اتصالات عادية ، ولكن ستحتاج إلى إجراء تعيينين رئيسيين وإعادة التقسيم. مع GlobalKTable يمكنك تجنب تكلفة هذه العمليات.

للقيام بذلك ، سوف نستخدم كائن countStream من القائمة 5.11 (يمكن العثور على الكود المقابل في ملف src / main / java / bbejeck / Chapter_5 / GlobalKTableExample.java) ، وربطه بكائنين GlobalKTable.

صورة

لقد ناقشنا هذا من قبل ، لذا لن أكرره. لكنني لاحظت أن الكود الموجود في الدالة toStream (). Map يتم استخلاصه في كائن الوظيفة من أجل سهولة القراءة بدلاً من تعبير lambda المضمن.

الخطوة التالية هي إعلان حالتين من GlobalKTable (يمكن العثور على الكود الظاهر في src / main / java / bbejeck / Chapter_5 / GlobalKTableExample.java) (سرد 5.12).

صورة


لاحظ أن أسماء الموضوعات موصوفة باستخدام أنواع قائمة.

الآن وبعد أن أعددنا جميع المكونات ، يبقى كتابة التعليمات البرمجية للاتصال (التي يمكن العثور عليها في الملف src / main / java / bbejeck / Chapter_5 / GlobalKTableExample.java) (سرد 5.13).

صورة

على الرغم من وجود مركبين في هذا الكود ، إلا أنهما منظمان في سلسلة ، حيث لا يتم استخدام أي من نتائجهما بشكل منفصل. يتم عرض النتائج في نهاية العملية بأكملها.

عند بدء عملية الاتصال أعلاه ، ستحصل على النتائج التالية:

 {customer='Barney, Smith' company="Exxon", transactions= 17} 

لم يتغير الجوهر ، لكن هذه النتائج تبدو أكثر وضوحًا.

عد الفصل 4 ، لقد رأيت بالفعل عدة أنواع من الاتصالات في العمل. يتم سردها في الجدول. 5.2. يعكس هذا الجدول التوصيلية المتعلقة بالإصدار 1.0.0 من Kafka Streams ؛ سوف يتغير شيء في الإصدارات المستقبلية.

صورة

في الختام ، سوف أذكركم بالشيء الرئيسي: يمكنك توصيل تدفقات الأحداث (KStream) وتحديث التدفقات (KTable) باستخدام الحالة المحلية. بالإضافة إلى ذلك ، إذا لم يكن حجم البيانات المرجعية كبيرًا جدًا ، فيمكنك استخدام كائن GlobalKTable. يقوم GlobalKTable بتكرار جميع الأقسام لكل عقد من تطبيق Kafka Streams ، وبالتالي ضمان توفر جميع البيانات بغض النظر عن القسم الذي يتوافق مع المفتاح.

بعد ذلك ، سنرى إمكانية تطبيق Kafka Streams ، حيث يمكنك مراقبة التغييرات التي طرأت على الحالة دون استهلاك البيانات من موضوع Kafka.

5.3.5. حالة الطلب


لقد أجرينا بالفعل العديد من العمليات التي تنطوي على الحالة ونخرج دائمًا النتائج إلى وحدة التحكم (لأغراض التطوير) أو نكتبها إلى الموضوع (للتشغيل الصناعي). عند كتابة النتائج لموضوع ما ، عليك استخدام مستهلك كافكا لمشاهدته.

يمكن اعتبار قراءة البيانات من هذه المواضيع نوعًا من وجهات النظر الملموسة. لمهامنا ، يمكننا استخدام تعريف طريقة العرض الملموسة من ويكيبيديا: "... كائن قاعدة بيانات فعلية يحتوي على نتائج استعلام. على سبيل المثال ، يمكن أن تكون نسخة محلية من البيانات المحذوفة ، أو مجموعة فرعية من الصفوف و / أو أعمدة الجدول أو الانضمام إلى النتائج ، أو جدول محوري تم الحصول عليه باستخدام التجميع "(https://en.wikipedia.org/wiki/Materialized_view).

يتيح لك Kafka Streams أيضًا إجراء استعلامات تفاعلية في المتاجر الحكومية ، مما يسمح لك بقراءة هذه المشاهدات المادية مباشرةً. من المهم أن نلاحظ أن الطلب إلى متجر الحالة في طبيعة عملية للقراءة فقط. بفضل هذا ، لا يمكن أن تخاف من جعل الحالة بطريق الخطأ تطبيقًا غير متسق أثناء معالجة البيانات.

القدرة على الاستعلام مباشرة مخازن الدولة أمر مهم. يعني ذلك أنه يمكنك إنشاء تطبيقات - لوحات معلومات دون الحاجة إلى استلام البيانات أولاً من مستهلك كافكا. يزيد من كفاءة التطبيق ، نظرًا لحقيقة أنه غير مطلوب لتسجيل البيانات مرة أخرى:

  • بسبب موقع البيانات ، يمكن الوصول إليها بسرعة ؛
  • يتم استبعاد ازدواجية البيانات ، لأنها ليست مكتوبة على وحدة التخزين الخارجية.

الشيء الرئيسي الذي أود منك أن تتذكره: يمكنك تنفيذ طلبات الحالة مباشرةً من التطبيق. لا يمكنك المبالغة في تقدير الفرص التي يوفرها لك هذا. بدلاً من استهلاك البيانات من Kafka وتخزين السجلات في قاعدة البيانات للتطبيق ، يمكنك الاستعلام عن متاجر الحالة بالنتيجة نفسها. الطلبات المباشرة إلى المتاجر الحكومية تعني رمزًا أقل (لا يوجد مستهلك) وبرنامج أقل (لا حاجة لجدول قاعدة بيانات لتخزين النتائج).

لقد قمنا بتغطية قدر كبير من المعلومات في هذا الفصل ، لذلك سنوقف مؤقتًا مناقشتنا للاستعلامات التفاعلية إلى المتاجر الحكومية. ولكن لا تقلق: في الفصل 9 ، سننشئ تطبيقًا بسيطًا - لوحة معلومات تتضمن استعلامات تفاعلية. لإظهار الاستعلامات التفاعلية وإمكانيات إضافتها إلى تطبيقات Kafka Streams ، ستستخدم بعض الأمثلة من هذا الفصل والفصول السابقة.

ملخص


  • تمثل كائنات KStream تدفقات الأحداث المماثلة لإدراج قاعدة البيانات. تمثل كائنات KTable تدفقات التحديث ، فهي أكثر تشابهًا للتحديثات في قاعدة البيانات. لا يزداد حجم كائن KTable ؛ يتم استبدال السجلات القديمة بسجلات جديدة.
  • كائنات KTable مطلوبة لعمليات التجميع.
  • باستخدام عمليات النافذة ، يمكنك تقسيم البيانات المجمعة إلى سلال زمنية.
  • بفضل كائنات GlobalKTable ، يمكنك الوصول إلى البيانات المرجعية في أي مكان في التطبيق ، بغض النظر عن التقسيم.
  • الاتصالات بين الكائنات KStream و KTable و GlobalKTable ممكنة.

لقد ركزنا حتى الآن على إنشاء تطبيقات Kafka Streams باستخدام KStream DSL عالي المستوى. على الرغم من أن النهج رفيع المستوى يسمح لك بإنشاء برامج أنيقة وموجزة ، إلا أن استخدامه يمثل حلاً وسطًا واضحًا. العمل مع DSL KStream يعني زيادة دقة الكود عن طريق تقليل درجة التحكم. في الفصل التالي ، سننظر في واجهة برمجة التطبيقات ذات المستوى المنخفض لعقد المعالج ونحاول إجراء عمليات تبادل أخرى. ستصبح البرامج أطول مما كانت عليه حتى الآن ، ولكن ستكون لدينا الفرصة لإنشاء أي عقدة معالجة قد نحتاجها تقريبًا.

→ يمكن العثور على مزيد من التفاصيل حول الكتاب على موقع الناشر

→ للحصول على خصم Khabrozhiteley 25 ٪ على القسيمة - تيارات كافكا

→ عند دفع النسخة الورقية من الكتاب ، يتم إرسال كتاب إلكتروني عن طريق البريد الإلكتروني.

Source: https://habr.com/ru/post/ar457756/


All Articles