
Redis Stream - نوع بيانات مجردة جديد تم تقديمه في Redis مع إصدار الإصدار 5.0
من الناحية النظرية ، Redis Stream هي قائمة يمكنك من خلالها إضافة إدخالات. كل دخول له معرف فريد. افتراضيًا ، يتم إنشاء معرف تلقائيًا ويتضمن طابعًا زمنيًا. لذلك ، يمكنك طلب تسجيل النطاقات حسب الوقت أو تلقي بيانات جديدة فور وصولها إلى الدفق ، حيث يقرأ الأمر Unix tail -f ملف السجل ويتجمد تحسبا لبيانات جديدة. يرجى ملاحظة أن العديد من العملاء يمكنهم الاستماع إلى البث في نفس الوقت ، حيث يمكن للعديد من عمليات "tail -f" قراءة ملف في نفس الوقت دون تعارض مع بعضهم البعض.
لفهم جميع مزايا نوع البيانات الجديد ، دعنا نتذكر بإيجاز هياكل Redis القائمة منذ فترة طويلة والتي تكرر جزئيًا وظيفة Redis Stream.
رحلة تاريخية
Redis حانة / فرعية
Redis Pub / Sub هو نظام مراسلة بسيط مدمج بالفعل في التخزين ذي القيمة الأساسية. ومع ذلك ، للبساطة عليك أن تدفع:
- إذا فشل الناشر لأي سبب من الأسباب ، فإنه يفقد كل مشتركيه
- يحتاج الناشر إلى معرفة العنوان الدقيق لجميع المشتركين فيه.
- يمكن للناشر أن يفرط في تحميل المشتركين إذا تم نشر البيانات بشكل أسرع من معالجتها
- يتم حذف الرسالة من المخزن المؤقت للناشر مباشرة بعد النشر ، بغض النظر عن عدد المشتركين الذين تم تسليمهم ومدى سرعة تمكنهم من معالجة هذه الرسالة.
- سيتلقى جميع المشتركين الرسالة في نفس الوقت. يجب على المشتركين أنفسهم الاتفاق بطريقة أو بأخرى فيما بينهم على كيفية معالجة نفس الرسالة.
- لا توجد آلية مضمنة لتأكيد نجاح معالجة الرسالة من قبل المشترك. إذا تلقى المشترك رسالة وسقطت أثناء المعالجة ، فلن يعرف الناشر عنها.
قائمة Redis
Redis List هي بنية بيانات تدعم أوامر قراءة التأمين. يمكنك إضافة وقراءة الرسائل من بداية أو نهاية القائمة. على أساس هذا الهيكل ، يمكنك إنشاء مكدس أو قائمة انتظار جيدة لنظامك الموزع ، وسيكون هذا في معظم الحالات كافيًا. الاختلافات الرئيسية من Redis Pub / Sub:
- يتم تسليم الرسالة إلى عميل واحد. سيحصل العميل الأول المحظور بالقراءة على البيانات أولاً.
- يجب على Clint بدء عملية قراءة لكل رسالة. قائمة لا يعرف شيئا عن العملاء.
- يتم تخزين الرسائل حتى يحسبها شخص ما أو يحذفها صراحة. إذا قمت بإعداد خادم Redis لنقل البيانات إلى القرص ، فإن موثوقية النظام تزداد بشكل كبير.
مقدمة لتيار
إضافة سجل إلى دفق
يضيف أمر
XADD سجلًا جديدًا إلى الدفق. السجل ليس مجرد سلسلة ، فهو يتكون من واحد أو أكثر من أزواج قيمة المفتاح. وبالتالي ، فإن كل سجل منظم بالفعل ويشبه بنية ملف CSV.
> XADD mystream * sensor-id 1234 temperature 19.8 1518951480106-0
في المثال أعلاه ، نضيف حقلين إلى الدفق مع الاسم (المفتاح) "mystream": "معرف المستشعر" و "درجة الحرارة" مع القيم "1234" و "19.8" ، على التوالي. كالوسيطة الثانية ، يقبل الأمر المعرف الذي سيتم تعيينه للسجل - يحدد هذا المعرف بشكل فريد كل سجل في الدفق. ومع ذلك ، في هذه الحالة ، تجاوزنا * لأننا نريد من Redis إنشاء معرف جديد لنا. كل معرف جديد سيزيد. لذلك ، سيكون لكل سجل جديد معرف أكبر بالنسبة للسجلات السابقة.
شكل معرف
يتكون معرف السجل الذي تم إرجاعه بواسطة أمر
XADD من جزأين:
{millisecondsTime}-{sequenceNumber}
millisecondsTime - وقت يونكس بالمللي ثانية (وقت خادم Redis). ومع ذلك ، إذا كان الوقت الحالي هو نفسه أو أقل من وقت السجل السابق ، فسيتم استخدام الطابع الزمني للسجل السابق. لذلك ، إذا تم إرجاع وقت الخادم إلى الماضي ، فسيظل المعرف الجديد يحتفظ بخاصية الزيادة.
يتم استخدام
sequenceNumber للسجلات التي تم إنشاؤها في نفس المللي ثانية. ستتم زيادة رقم
التسلسل بمقدار 1 نسبة إلى السجل السابق. نظرًا لأن
sequenceNumber هو 64 بت في الحجم ، في الممارسة يجب ألا تعمل على الحد من عدد السجلات التي يمكن إنشاؤها في غضون مللي ثانية واحدة.
قد يبدو تنسيق هذه المعرفات للوهلة الأولى غريبًا. قد يتساءل القارئ المذهل عن سبب كون الوقت جزءًا من المعرف. السبب هو أن تدفقات Redis تدعم طلبات النطاق بواسطة المعرفات. بما أن المعرف مرتبط بالوقت الذي تم فيه إنشاء السجل ، فإن هذا يجعل من الممكن طلب نطاقات زمنية. سننظر إلى مثال ملموس عندما ننتقل إلى دراسة
أمر XRANGE .
إذا احتاج المستخدم لأي سبب من الأسباب إلى تحديد معرفه الخاص ، والذي يرتبط ، على سبيل المثال ، ببعض النظام الخارجي ، فيمكننا تمريره إلى
أمر XADD بدلاً من علامة * كما هو موضح أدناه:
> XADD somestream 0-1 field value 0-1 > XADD somestream 0-2 foo bar 0-2
يرجى ملاحظة أنه في هذه الحالة ، يجب عليك مراقبة الزيادة في المعرف. في مثالنا ، يكون الحد الأدنى للمعرف هو "0-1" ، لذلك لن يقبل الفريق معرفًا آخر يساوي أو "0-1".
> XADD somestream 0-1 foo bar (error) ERR The ID specified in XADD is equal or smaller than the target stream top item
عدد السجلات في الدفق
يمكنك الحصول على عدد السجلات في دفق ببساطة عن طريق استخدام الأمر
XLEN . على سبيل المثال ، سيعيد هذا الأمر القيمة التالية:
> XLEN somestream (integer) 2
طلبات النطاق - XRANGE و XREVRANGE
لطلب بيانات عن نطاق ما ، نحتاج إلى تحديد معرفات اثنين - بداية ونهاية النطاق. يشمل النطاق المرتجع جميع العناصر ، بما في ذلك الحدود. هناك أيضًا معرّفان خاصان "-" و "+" ، على التوالي ، وهذا يعني أصغر (السجل الأول) وأكبر معرّف (السجل الأخير) في الدفق. المثال أدناه سيعرض جميع إدخالات الدفق.
> XRANGE mystream - + 1) 1) 1518951480106-0 2) 1) "sensor-id" 2) "1234" 3) "temperature" 4) "19.8" 2) 1) 1518951482479-0 2) 1) "sensor-id" 2) "9999" 3) "temperature" 4) "18.2"
كل سجل تم إرجاعه عبارة عن صفيف مكون من عنصرين: معرف وقائمة بأزواج قيمة المفتاح. قلنا بالفعل أن معرفات السجلات مرتبطة بالوقت. لذلك ، يمكننا أن نطلب مجموعة من فترة زمنية محددة. ومع ذلك ، لا يمكننا تحديد المعرف الكامل في الطلب ، ولكن فقط وقت يونكس ، مع حذف الجزء المتعلق بـ
sequenceNumber . يساوي الجزء المحذوف من المعرف تلقائيًا الصفر في بداية النطاق وإلى أقصى قيمة ممكنة في نهاية النطاق. التالي مثال عن كيفية طلب نطاق من ميلي ثانية.
> XRANGE mystream 1518951480106 1518951480107 1) 1) 1518951480106-0 2) 1) "sensor-id" 2) "1234" 3) "temperature" 4) "19.8"
لدينا سجل واحد فقط في هذا النطاق ، ولكن في مجموعات البيانات الحقيقية يمكن أن تكون النتيجة التي تم إرجاعها ضخمة. لهذا السبب ، يدعم
XRANGE خيار COUNT. من خلال تحديد الكمية ، يمكننا ببساطة الحصول على سجلات N الأولى. إذا احتجنا إلى الحصول على إدخالات N التالية (ترقيم الصفحات) ، فيمكننا استخدام المعرف الأخير المستلم ، وزيادة
تسلسله رقمًا واحدًا وطلب مرة أخرى. دعنا ننظر إلى هذا في المثال التالي. لقد بدأنا في إضافة 10 عناصر باستخدام
XADD (لنفترض أن تدفق mystream قد شغل بالفعل بـ 10 عناصر). لبدء التكرار ، والحصول على عنصرين لكل أمر ، نبدأ بالنطاق الكامل ، ولكن مع COUNT يساوي 2.
> XRANGE mystream - + COUNT 2 1) 1) 1519073278252-0 2) 1) "foo" 2) "value_1" 2) 1) 1519073279157-0 2) 1) "foo" 2) "value_2"
لمتابعة التكرار باستخدام العنصرين التاليين ، نحتاج إلى تحديد آخر معرف تم استلامه ، وهو 1519073279157-0 ، وإضافة 1 إلى
sequenceNumber .
يمكن الآن استخدام المعرف الناتج ، في هذه الحالة 1519073279157-1 ، كوسيطة جديدة لبداية النطاق لاستدعاء
XRANGE التالي:
> XRANGE mystream 1519073279157-1 + COUNT 2 1) 1) 1519073280281-0 2) 1) "foo" 2) "value_3" 2) 1) 1519073281432-0 2) 1) "foo" 2) "value_4"
و هكذا. نظرًا لأن تعقيد
XRANGE هو O (log (N)) للبحث ، ثم O (M) لإرجاع عناصر M ، فكل خطوة تكرارية سريعة. وبالتالي ، باستخدام
XRANGE ، من الممكن
تكرار التدفقات بكفاءة.
أمر
XREVRANGE هو المكافئ لـ
XRANGE ، لكنه يعرض العناصر بالترتيب العكسي:
> XREVRANGE mystream + - COUNT 1 1) 1) 1519073287312-0 2) 1) "foo" 2) "value_10"
لاحظ أن الأمر
XREVRANGE يأخذ وسيطات نطاق البدء
والإيقاف بترتيب عكسي.
قراءة سجلات جديدة مع XREAD
غالبًا ما تكون هناك مهمة للاشتراك في البث واستقبال الرسائل الجديدة فقط. قد يبدو هذا المفهوم وكأنه حانة Redis Pub / Sub أو حجب قائمة Redis ، ولكن هناك اختلافات جوهرية في كيفية استخدام Redis Stream:
- يتم تسليم كل رسالة جديدة لكل مشترك بشكل افتراضي. يختلف هذا السلوك عن حظر قائمة Redis ، حيث سيتم قراءة رسالة جديدة بواسطة مشترك واحد فقط.
- بينما في Redis Pub / Sub يتم نسيان جميع الرسائل ولا يتم حفظها أبدًا ، في Stream يتم تخزين جميع الرسائل إلى أجل غير مسمى (ما لم يتسبب العميل في الحذف بشكل صريح).
- يتيح لك Redis Stream إمكانية التمييز بين الوصول إلى الرسائل ضمن دفق واحد. يمكن لمشترك معين رؤية سجل رسالته الشخصية فقط.
يمكنك الاشتراك في الدفق وتلقي رسائل جديدة باستخدام أمر
XREAD . هذا أكثر تعقيدًا من
XRANGE ، لذلك سنبدأ بأمثلة أبسط أولاً.
> XREAD COUNT 2 STREAMS mystream 0 1) 1) "mystream" 2) 1) 1) 1519073278252-0 2) 1) "foo" 2) "value_1" 2) 1) 1519073279157-0 2) 1) "foo" 2) "value_2"
في المثال أعلاه ، يتم
تحديد نموذج
XREAD غير
محظور . يرجى ملاحظة أن خيار COUNT اختياري. في الواقع ، فإن خيار الأمر الوحيد المطلوب هو خيار STREAMS ، الذي يقوم بتعيين قائمة التدفقات إلى جانب المعرف الأقصى المقابل. لقد كتبنا "STREAMS mystream 0" - نريد الحصول على جميع سجلات تيار mystream مع معرف أكبر من "0-0". كما ترى من المثال ، يُرجع الأمر اسم الدفق ، لأنه يمكننا الاشتراك في عدة تدفقات في نفس الوقت. يمكننا أن نكتب ، على سبيل المثال ، "STREAMS mystream otherstream 0 0". يرجى ملاحظة أنه بعد خيار STREAMS ، نحتاج أولاً إلى تقديم أسماء جميع التدفقات اللازمة وبعد ذلك فقط قائمة من المعرفات.
في هذا النموذج البسيط ، لا يقوم الأمر بأي شيء خاص مقارنةً بـ
XRANGE . ومع ذلك ، فإن الشيء المثير للاهتمام هو أنه يمكننا بسهولة تحويل
XREAD إلى أمر حظر عن طريق تحديد وسيطة BLOCK:
> XREAD BLOCK 0 STREAMS mystream $
في المثال أعلاه ، يتم تحديد خيار BLOCK جديد مع مهلة 0 مللي ثانية (وهذا يعني الانتظار بلا نهاية). علاوة على ذلك ، بدلاً من تمرير المعرف المعتاد لتيار mystream ، تم تمرير المعرف الخاص $. هذا المعرف الخاص يعني أن
XREAD يجب أن يستخدم المعرف الأقصى في قطار mystream كمعرف. لذلك سوف نتلقى رسائل جديدة فقط ، بدءًا من لحظة بدء الاستماع. يشبه هذا الأمر Unix tail -f.
يرجى ملاحظة أنه عند استخدام خيار BLOCK ، لا نحتاج إلى استخدام المعرف الخاص $. يمكننا استخدام أي معرف موجود في التدفق. إذا تمكن الفريق من تقديم طلبنا على الفور ، دون حظر ، فسوف يقوم بذلك ، وإلا فسيتم حظره.
يمكن
لحظر XREAD أيضًا الاستماع إلى عدة تدفقات مرة واحدة ، ما عليك سوى تحديد أسمائها. في هذه الحالة ، سيعود الأمر بسجل للدفق الأول الذي وصلت إليه البيانات. سيحصل المشترك الأول المحظور على هذا التدفق على البيانات أولاً.
مجموعات المستهلكين
في بعض المهام ، نريد التمييز بين وصول المشتركين إلى الرسائل داخل نفس سلسلة الرسائل. مثال حيث يمكن أن يكون ذلك مفيدًا ، قائمة انتظار الرسائل مع العمال الذين سيتلقون رسائل مختلفة من الدفق ، مما يسمح لك بتوسيع نطاق معالجة الرسائل.
إذا تخيلنا أن لدينا ثلاثة مشتركين C1 و C2 و C3 ودفق يحتوي على رسائل 1 و 2 و 3 و 4 و 5 و 6 و 7 ، فستحدث خدمة الرسائل كما في الرسم البياني أدناه:
1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1
للحصول على هذا التأثير ، يستخدم Redis Stream مفهومًا يسمى مجموعة المستهلكين. يشبه هذا المفهوم مشتركًا زائفًا يتلقى بيانات من دفق ، ولكن يخدمه بالفعل العديد من المشتركين داخل المجموعة ، مما يوفر ضمانات معينة:
- يتم تسليم كل رسالة إلى مشتركين مختلفين داخل المجموعة.
- داخل المجموعة ، يتم تحديد المشتركين بالاسم ، وهي سلسلة حساسة لحالة الأحرف. إذا انسحب بعض المشتركين مؤقتًا من المجموعة ، فيمكن إعادته إلى المجموعة باسمه الفريد.
- تتبع كل مجموعة مستهلكين مفهوم "أول رسالة غير مقروءة". عندما يطلب أحد المشتركين رسائل جديدة ، يمكنه فقط استقبال الرسائل التي لم يتم تسليمها إلى أي مشترك داخل المجموعة.
- يوجد أمر يؤكد صراحة نجاح معالجة الرسالة من قبل المشترك. حتى يتم استدعاء هذا الأمر ، ستبقى الرسالة المطلوبة في حالة "معلقة".
- ضمن مجموعة المستهلكين ، يمكن لكل مشترك طلب سجل من الرسائل التي تم تسليمها إليه ، ولكن لم تتم معالجتها بعد (في حالة "المعلقة")
بمعنى ما ، يمكن تمثيل حالة المجموعة على النحو التالي:
+----------------------------------------+ | consumer_group_name: mygroup | consumer_group_stream: somekey | last_delivered_id: 1292309234234-92 | | consumers: | "consumer-1" with pending messages | 1292309234234-4 | 1292309234232-8 | "consumer-42" with pending messages | ... (and so forth) +----------------------------------------+
الآن حان الوقت للتعرف على الفرق الرئيسية لمجموعة المستهلك ، وهي:
- يستخدم XGROUP لإنشاء وتدمير وإدارة المجموعات.
- يستخدم XREADGROUP لقراءة دفق من خلال مجموعة.
- XACK - يسمح هذا الأمر للمشترك بوضع علامة على الرسالة بنجاح
إنشاء مجموعة المستهلك
لنفترض وجود تدفق غامض بالفعل. بعد ذلك ، سيبدو أمر إنشاء المجموعة:
> XGROUP CREATE mystream mygroup $
OK
عند إنشاء مجموعة ، يجب أن نمرر معرفًا تبدأ به المجموعة ستتلقى الرسائل. إذا كنا نريد فقط تلقي جميع الرسائل الجديدة ، فيمكننا استخدام المعرف الخاص $ (كما في المثال أعلاه). إذا حددت 0 بدلاً من معرف خاص ، فستكون جميع رسائل البث متاحة للمجموعة.
الآن بعد إنشاء المجموعة ، يمكننا على الفور البدء في قراءة الرسائل باستخدام
أمر XREADGROUP . يشبه هذا الأمر إلى حد كبير
XREAD ويدعم خيار BLOCK الاختياري. ومع ذلك ، هناك خيار GROUP إلزامي ، والذي يجب تحديده دائمًا بوسيطتين: اسم المجموعة واسم المشترك. يتم دعم خيار COUNT أيضًا.
قبل قراءة البث ، دعنا نضع بعض الرسائل هناك:
> XADD mystream * message apple 1526569495631-0 > XADD mystream * message orange 1526569498055-0 > XADD mystream * message strawberry 1526569506935-0 > XADD mystream * message apricot 1526569535168-0 > XADD mystream * message banana 1526569544280-0
الآن دعونا نحاول قراءة هذا الدفق من خلال المجموعة:
> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream > 1) 1) "mystream" 2) 1) 1) 1526569495631-0 2) 1) "message" 2) "apple"
يقرأ الأمر أعلاه حرفيًا كما يلي:
"أنا ، مشترك أليس ، عضو في mygroup ، أرغب في قراءة رسالة واحدة من الغموض الذي لم يتم تسليمه إلى أي شخص من قبل."
في كل مرة يقوم فيها أحد المشتركين بإجراء عملية مع مجموعة ، يجب عليه الإشارة إلى اسمه ، وتحديد هويته داخل المجموعة بشكل فريد. هناك تفصيل مهم آخر في الأمر أعلاه - المعرف الخاص ">". يقوم هذا المعرف الخاص بتصفية الرسائل ، مما يترك فقط تلك التي لم يتم تسليمها حتى الآن.
أيضًا ، في حالات خاصة ، يمكنك تحديد معرف حقيقي ، مثل 0 أو أي معرف صالح آخر. في هذه الحالة ،
سيعود الأمر
XREADGROUP إليك بتاريخ الرسائل ذات الحالة "المعلقة" ، والتي تم تسليمها إلى المشترك المحدد (Alice) ، لكن لم يتم تأكيدها بعد باستخدام
الأمر XACK .
يمكننا التحقق من هذا السلوك عن طريق تحديد المعرف 0 على الفور ، بدون خيار
COUNT . نرى فقط الرسالة المعلقة ، أي الرسالة التي تحتوي على التفاحة:
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0 1) 1) "mystream" 2) 1) 1) 1526569495631-0 2) 1) "message" 2) "apple"
ومع ذلك ، إذا أكدنا أن الرسالة قد تمت معالجتها بنجاح ، فلن يتم عرضها بعد الآن:
> XACK mystream mygroup 1526569495631-0 (integer) 1 > XREADGROUP GROUP mygroup Alice STREAMS mystream 0 1) 1) "mystream" 2) (empty list or set)
الآن حان دور بوب لقراءة شيء ما:
> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream > 1) 1) "mystream" 2) 1) 1) 1526569498055-0 2) 1) "message" 2) "orange" 2) 1) 1526569506935-0 2) 1) "message" 2) "strawberry"
طلب بوب ، وهو عضو في mygroup ، أكثر من رسالتين. يقوم الأمر بالإبلاغ عن الرسائل غير المسلمة فقط بسبب المعرف الخاص ">". كما ترى ، لا يتم عرض رسالة "apple" ، حيث تم تسليمها بالفعل إلى Alice ، لذلك يتلقى Bob "برتقال" و "فراولة".
وبالتالي ، يمكن لـ Alice و Bob وأي مشترك في مجموعة أخرى قراءة رسائل مختلفة من نفس الدفق. يمكنهم أيضًا قراءة محفوظات الرسائل الأولية أو وضع علامة على الرسائل كما تمت معالجتها.
هناك بعض الأشياء التي يجب وضعها في الاعتبار:
- بمجرد أن يرى المشترك أن الرسالة هي أمر XREADGROUP ، تنتقل هذه الرسالة إلى الحالة "معلقة" ويتم تعيينها لهذا المشترك المحدد. لن يتمكن مشتركو المجموعة الآخرون من قراءة هذه الرسالة.
- يتم إنشاء المشتركين تلقائيا في أول ذكر ، ليست هناك حاجة لإنشاء الخاصة بهم صريحة.
- مع XREADGROUP ، يمكنك قراءة الرسائل من عدة تدفقات مختلفة في نفس الوقت ، ولكن لكي يعمل هذا ، تحتاج أولاً إلى إنشاء مجموعات بنفس الاسم لكل قطار باستخدام XGROUP
تحطم الانتعاش
يمكن للمشترك استرداد من الفشل وإعادة قراءة قائمة الرسائل الخاصة به مع حالة "معلقة". ومع ذلك ، في العالم الحقيقي ، يمكن للمشتركين في نهاية المطاف أن تفشل. ماذا يحدث لرسالة المشترك المتدلي إذا لم يستطع التعافي بعد الفشل؟
يوفر Consumer Group ميزة يتم استخدامها خصيصًا لمثل هذه الحالات - عندما تحتاج إلى تغيير مالك الرسائل.
بادئ ذي
بدء ، تحتاج إلى استدعاء الأمر
XPENDING ، والذي يعرض جميع رسائل المجموعة بحالة "معلقة". في أبسط أشكاله ، يتم استدعاء الأمر باستخدام وسيطين فقط: اسم الدفق واسم المجموعة:
> XPENDING mystream mygroup 1) (integer) 2 2) 1526569498055-0 3) 1526569506935-0 4) 1) 1) "Bob" 2) "2"
طبع الفريق عدد الرسائل غير المجهزة للمجموعة بأكملها ولكل مشترك. ليس لدينا سوى بوب
برسالتين غير معالجتين ، لأن الرسالة الوحيدة التي طلبتها أليس تم تأكيدها مع
XACK .
يمكننا طلب معلومات إضافية باستخدام المزيد من الوسائط:
XPENDING {key} {groupname} [{start-id} {end-id} {count} [{consumer-name}]]
{start-id} {end-id} - نطاق المعرفات (يمكنك استخدام "-" و "+")
{count} - عدد محاولات التسليم
{اسم المستهلك} - اسم المجموعة
> XPENDING mystream mygroup - + 10 1) 1) 1526569498055-0 2) "Bob" 3) (integer) 74170458 4) (integer) 1 2) 1) 1526569506935-0 2) "Bob" 3) (integer) 74170458 4) (integer) 1
الآن لدينا تفاصيل كل رسالة: المعرف ، واسم المشترك ، والتوقف عن العمل بالميللي ثانية ، وأخيراً ، عدد محاولات التسليم. لدينا رسالتان من بوب ، وهما في وضع الخمول لـ 74170458 ميلي ثانية ، حوالي 20 ساعة.
يرجى ملاحظة أن لا أحد يمنعنا من التحقق من محتوى الرسالة فقط باستخدام
XRANGE .
> XRANGE mystream 1526569498055-0 1526569498055-0 1) 1) 1526569498055-0 2) 1) "message" 2) "orange"
علينا فقط تكرار المعرف نفسه مرتين في الوسيطات. الآن وقد أصبح لدينا بعض الأفكار ، يمكن أن تقرر أليس أن بوب لن يتعافى بعد 20 ساعة من عدم النشاط ، وقد حان الوقت لطلب هذه الرسائل واستئناف معالجتها بدلاً من بوب. للقيام بذلك ، نستخدم الأمر
XCLAIM :
XCLAIM {key} {group} {consumer} {min-idle-time} {ID-1} {ID-2} ... {ID-N}
باستخدام هذا الأمر ، يمكننا الحصول على رسالة "أجنبية" لم تتم معالجتها بعد عن طريق تغيير المالك إلى {مستهلك}. ومع ذلك ، يمكننا أيضًا توفير حد أدنى للتوقف عن العمل {min-idle-time}. يساعد هذا في تجنب موقف حيث يحاول عميلان تغيير مالك نفس الرسائل في وقت واحد:
Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0
سيقوم العميل الأول بإعادة ضبط وقت التوقف عن العمل وزيادة عدد مرات التسليم. لذلك لن يتمكن العميل الثاني من طلب ذلك.
> XCLAIM mystream mygroup Alice 3600000 1526569498055-0 1) 1) 1526569498055-0 2) 1) "message" 2) "orange"
تمت المطالبة بالرسالة بنجاح بواسطة Alice ، التي يمكنها الآن معالجة الرسالة والاعتراف بها.
من المثال أعلاه ، من الواضح أن التنفيذ الناجح للطلب يُرجع محتويات الرسالة نفسها. ومع ذلك ، هذا ليس ضروريا. يمكن استخدام خيار JUSTID لإرجاع معرفات الرسائل فقط. يعد هذا مفيدًا إذا لم تكن مهتمًا بتفاصيل الرسالة وترغب في زيادة أداء النظام.
عداد التسليم
العداد الذي
تلاحظه في إخراج
XPENDING هو عدد عمليات تسليم كل رسالة. يتم زيادة عداد مثل بطريقتين: عندما يتم طلب الرسالة بنجاح من خلال
XCLAIM أو عند
استخدام استدعاء
XREADGROUP .
من الطبيعي أن يتم تسليم بعض الرسائل عدة مرات. الشيء الرئيسي هو أنه نتيجة لذلك ، تتم معالجة جميع الرسائل. في بعض الأحيان ، عند معالجة رسالة ، توجد مشاكل بسبب تلف الرسالة نفسها أو تسبب معالجة الرسالة خطأ في رمز المعالج.
في هذه الحالة ، قد يتضح أنه لن يتمكن أي شخص من معالجة هذه الرسالة. نظرًا لأن لدينا عداد محاولات التسليم ، يمكننا استخدام هذا العداد للكشف عن مثل هذه الحالات. لذلك ، بمجرد وصول عداد التسليم إلى رقم كبير تحدده أنت ، من المحتمل أن يكون من المعقول وضع هذه الرسالة في قطار آخر وإرسال إشعار إلى مسؤول النظام.حالة الموضوع
يتم استخدام الأمر XINFO لطلب معلومات متنوعة حول دفق ومجموعاته. على سبيل المثال ، يكون الشكل الأساسي للأمر كما يلي: > XINFO STREAM mystream 1) length 2) (integer) 13 3) radix-tree-keys 4) (integer) 1 5) radix-tree-nodes 6) (integer) 2 7) groups 8) (integer) 2 9) first-entry 10) 1) 1524494395530-0 2) 1) "a" 2) "1" 3) "b" 4) "2" 11) last-entry 12) 1) 1526569544280-0 2) 1) "message" 2) "banana"
يعرض الأمر أعلاه معلومات عامة على الدفق المحدد. الآن مثال أكثر تعقيدًا: > XINFO GROUPS mystream 1) 1) name 2) "mygroup" 3) consumers 4) (integer) 2 5) pending 6) (integer) 2 2) 1) name 2) "some-other-group" 3) consumers 4) (integer) 1 5) pending 6) (integer) 0
يعرض الأمر أعلاه معلومات عامة لجميع مجموعات الدفق المحدد > XINFO CONSUMERS mystream mygroup 1) 1) name 2) "Alice" 3) pending 4) (integer) 1 5) idle 6) (integer) 9104628 2) 1) name 2) "Bob" 3) pending 4) (integer) 1 5) idle 6) (integer) 83841983
يعرض الأمر أعلاه معلومات عن جميع المشتركين في مجموعة البث والمجموعة المحددة.إذا نسيت بناء جملة الأمر ، فما عليك سوى الاتصال بالأمر للحصول على المساعدة: > XINFO HELP 1) XINFO {subcommand} arg arg ... arg. Subcommands are: 2) CONSUMERS {key} {groupname} -- Show consumer groups of group {groupname}. 3) GROUPS {key} -- Show the stream consumer groups. 4) STREAM {key} -- Show information about the stream. 5) HELP -- Print this help.
تيار حجم الحد
لا تريد العديد من التطبيقات جمع البيانات في الدفق إلى الأبد. من المفيد غالبًا الحصول على الحد الأقصى لعدد الرسائل في الدفق. في حالات أخرى ، من المفيد نقل جميع الرسائل من الدفق إلى وحدة تخزين ثابتة أخرى عند الوصول إلى حجم الدفق المحدد. يمكنك تحديد حجم الدفق باستخدام المعلمة MAXLEN في الأمر XADD : > XADD mystream MAXLEN 2 * value 1 1526654998691-0 > XADD mystream MAXLEN 2 * value 2 1526654999635-0 > XADD mystream MAXLEN 2 * value 3 1526655000369-0 > XLEN mystream (integer) 2 > XRANGE mystream - + 1) 1) 1526654999635-0 2) 1) "value" 2) "2" 2) 1) 1526655000369-0 2) 1) "value" 2) "3"
عند استخدام MAXLEN ، يتم حذف السجلات القديمة تلقائيًا عند الوصول إلى الطول المحدد ، لذلك يكون للتيار حجم ثابت. ومع ذلك ، لا يحدث التشذيب في هذه الحالة بالطريقة الأكثر إنتاجية في ذاكرة Redis. يمكن تحسين الموقف على النحو التالي: تعني الوسيطة ~ في المثال أعلاه أننا لسنا بحاجة إلى قصر طول الدفق على قيمة محددة. في مثالنا ، يمكن أن يكون هذا العدد أكبر من أو يساوي 1000 (على سبيل المثال ، 1000 أو 1010 أو 1030). لقد أشرنا للتو صراحة إلى أننا نريد أن يخزن التدفق 1000 سجل على الأقل. هذا يجعل العمل مع الذاكرة أكثر فعالية داخل Redis. يوجد أيضًا أمر XTRIM منفصل يقوم بنفس الشيء:XADD mystream MAXLEN ~ 1000 * ... entry fields here ...
> XTRIM mystream MAXLEN 10
> XTRIM mystream MAXLEN ~ 10
التخزين المستمر والنسخ المتماثل
يتم نسخ Redis Stream بشكل غير متزامن إلى العقد الرقيق وحفظه في ملفات مثل AOF (لقطة لجميع البيانات) و RDB (سجل جميع عمليات الكتابة). يتم أيضًا دعم النسخ المتماثل لحالة مجموعات المستهلكين. لذلك ، إذا كانت الرسالة في حالة "معلقة" على العقدة الرئيسية ، فستكون هذه الرسالة في حالة العقد الرقيق بنفس الحالة.إزالة العناصر الفردية من دفق
لحذف الرسائل ، يوجد أمر XDEL خاص . يحصل الأمر على اسم الدفق ، متبوعًا بمعرفات الرسالة التي يجب حذفها: > XRANGE mystream - + COUNT 2 1) 1) 1526654999635-0 2) 1) "value" 2) "2" 2) 1) 1526655000369-0 2) 1) "value" 2) "3" > XDEL mystream 1526654999635-0 (integer) 1 > XRANGE mystream - + COUNT 2 1) 1) 1526655000369-0 2) 1) "value" 2) "3"
عند استخدام هذا الأمر ، يتعين عليك مراعاة أنه في الواقع لن يتم تحرير الذاكرة على الفور.تيارات ذات طول صفري
الفرق بين التدفقات وهياكل بيانات Redis الأخرى هو أنه عندما لم تعد هياكل البيانات الأخرى تحتوي على عناصر بداخلها ، كتأثير جانبي ، سيتم حذف بنية البيانات نفسها من الذاكرة. لذلك ، على سبيل المثال ، سيتم حذف المجموعة المصنفة بالكامل عندما تزيل استدعاء ZREM العنصر الأخير. بدلاً من ذلك ، يُسمح للبقاء في مؤشرات الترابط دون حتى وجود عنصر واحد بالداخل.استنتاج
يعتبر Redis Stream مثاليًا لإنشاء وسطاء الرسائل وقوائم الرسائل والسجلات الموحدة وأنظمة الدردشة التي تخزن السجل.كما قال Nicklaus Wirth ذات مرة ، فإن البرامج عبارة عن خوارزميات بالإضافة إلى هياكل البيانات ، و Redis يمنحك بالفعل كليهما.