PubSub مجاني تقريبًا: ميزات NOTIFY في PostgreSQL

إذا كانت microservices الخاصة بك تستخدم بالفعل قاعدة بيانات PostgreSQL شائعة لتخزين البيانات ، أو إذا كانت هناك عدة مثيلات لنفس الخدمة تستخدمها على خوادم مختلفة ، فمن الأرخص نسبياً أن تحصل على فرصة لتبادل الرسائل (PubSub) بينها دون الاندماج في بنية Redis ، أو مجموعة RabbitMQ ، أو تضمينها في الكود تطبيقات نظام MQ آخر .

لهذا ، لن نكتب رسائل إلى جداول قاعدة البيانات ، لأن هذا يسبب الكثير من الحمل ، أولاً لكتابة ما يتم إرساله ، ومن ثم أيضًا مسح ما تمت قراءته بالفعل .

سنقوم بإرسال واستقبال البيانات باستخدام آلية NOTIFY / LISTEN ، وسنجمع تطبيقًا نموذجيًا لـ Node.js.



ولكن على هذا النحو ، هناك شعلة يجب التحايل عليها بعناية.

ميزات البروتوكول


الاستماع


LISTEN  
يقوم أحد التطبيقات التي تستخدم مكتبة libpq بتنفيذ الأمر LISTEN كأمر SQL عادي ، ثم يجب عليه استدعاء دالة PQnotifies بشكل دوري للتحقق من وجود إعلامات جديدة.
إذا كنت لا تكتب مكتبة للعمل مع PG ، ولكنك بالفعل تطبيقًا معينًا ، في معظم الحالات ، لن يكون بإمكانك الوصول إلى استدعاء هذه الوظيفة.

ولكن إذا كانت هذه المكتبة قد تمت كتابتها بالفعل وفقًا لتوصيات معالجة الطلبات والإشعارات غير المتزامنة ، فستتلقى رسالة في رمز التطبيق تلقائيًا. إذا لم يكن الأمر كذلك ، يمكنك ببساطة تنفيذ SELECT 1 بشكل دوري على الاتصال ، ثم سيأتي إشعار مع نتيجة الاستعلام:
في الإصدارات القديمة جدًا من libpq ، كان هناك طريقة واحدة فقط لضمان استلام الرسائل في الوقت المناسب من أمر NOTIFY - لإرسال الأوامر باستمرار ، حتى تلك الفارغة ، ثم التحقق من PQnotifies بعد كل مكالمة PQexec. على الرغم من أن هذه الطريقة لا تزال تعمل ، إلا أنها تعتبر قديمة بسبب الاستخدام غير الفعال للمعالج.
فيما يتعلق ، على سبيل المثال ، psql ، يبدو كما يلي:

 _tmp=# LISTEN test; LISTEN _tmp=# SELECT 1; ?column? ---------- 1 (1 row) Asynchronous notification "test" with payload "abc123" received from server process with PID 63991. 

إذا كان من الممكن بالنسبة للمهمة المطبقة أن نتفق على الحد الأقصى للتأخير في تسليم رسالة في غضون ثانية واحدة ، مع هذا الفاصل الزمني ، نقوم بتنفيذ الطلب. في الوقت نفسه ، تساعد هذه الطريقة في مراقبة "حيوية" الاتصال ، والتأكد من عدم قيام أي شخص بقطعها عن طريق الخطأ من جانب الخادم عبر pg_terminate_backend ، أو لم يكن هناك "تعطل" مفاجئ لـ PG دون أي إخطار للعملاء.

إعلام


 NOTIFY  [ ,  ] 

يرسل أمر NOTIFY حدث إعلام مع سطر "رسالة" إضافي إلى جميع تطبيقات العميل التي سبق أن نفذت قناة باسم القناة المحدد في قاعدة بيانات LISTEN الحالية.
...
يجب أن يكون سطر "الرسالة" الذي سيتم إرساله مع الإشعار ... ثابتًا في نص بسيط . في التكوين القياسي ، يجب أن يكون طوله أقل من 8000 بايت .
وهذا يعني أنه إذا كانت "رسالتنا" تحتوي فجأة على شيء مختلف تمامًا عن ASCII ، فسوف يتعين علينا فحصه ، وإذا تجاوز حجمه 8000 بايت (وليس أحرف!) ، ثم قصه إلى كتل ثم قم بلصقه . في الوقت نفسه ، ينبغي لنا أن نوفر موارد النطاق الترددي للقناة وموارد الخادم لمعالجة نقل هذه الكتل - أي إضافة "خدمة" بأقل قدر ممكن من الخدمة إلى محتوى مفيد قدر الإمكان ، ولكن ليس كذلك "خنق" تطبيق العميل ، مما يفرض عليه أن يحزم مع gzip -9 .

من المزايا الإضافية للآلية ، يمكن للمرء أيضًا ملاحظة الارتباط بـ "مصدر" الرسالة ...
... يمكن تجنب العمل الإضافي عن طريق التحقق مما إذا كان معرّف العملية الخاص بعملية الإشارة (المشار إليه في بيانات الحدث) يطابق معرّف العملية الخاص بالجلسة (يمكنك العثور عليه عن طريق الاتصال بـ libpq). في حالة تطابقها ، تلقت الجلسة إشعارًا بالإجراءات الخاصة بها ، بحيث يمكن تجاهله.
... وأمر التسليم مضمون:
بصرف النظر عن تصفية الحالات اللاحقة للإشعارات المكررة ، يضمن NOTIFY وصول الإخطارات من معاملة واحدة دائمًا بنفس الترتيب الذي تم إرسالها به. ويضمن أيضًا وصول الرسائل من معاملات مختلفة بالترتيب الذي يتم به تنفيذ هذه المعاملات .
لن نجمع بين أي شيء على وجه التحديد ، وبالتالي فإن كل طلب من طلباتنا سيتوافق فقط مع معاملة منفصلة.

لكن تذكر أنه إذا كان هناك أيضًا نشاط تطبيق على الاتصال المستخدم للتبادل ، فقد لا يكون NOTIFY داخل المعاملة الخاصة بنا ، لذلك قد تحدث آثار جانبية :
المعاملات لها تأثير كبير على NOTIFY. أولاً ، إذا تم تنفيذ NOTIFY داخل معاملة ، فسيتم تسليم الإخطارات إلى المستلمين بعد الالتزام بالمعاملة ، وفي هذه الحالة فقط. هذا معقول ، لأنه في حالة مقاطعة أي معاملة ، يتم إلغاء إجراء جميع الأوامر فيه ، بما في ذلك الإخطار .
لذلك ، من الأفضل استخدام اتصال حيث من الواضح أنه لا توجد معاملات أو استعلامات طويلة.

AccessExclusiveLock على الكائن 0 من الفئة 1262 من قاعدة البيانات 0


إذا بدأت NOTIFYs فجأة في تسجيل وتوقع توقع هذا القفل ، فلا يزال لديك "سروال قصير" ، وقد حان الوقت للتفكير في MQ "للبالغين".

بعد كل شيء ، قائمة انتظار الإعلام ، رغم أنها كبيرة جدًا (8 غيغابايت في الإنشاءات القياسية) ، لا تزال محدودة. وفقًا لإجابة توم لين :
يتم تعليق هذا القفل أثناء إدراج رسالة (رسائل) الإخطار بالمعاملة ، وبعدها تلتزم المعاملة وتصدر القفل.
أي أنه لا يوجد الكثير من الخيارات لحلها:

  • إرسال ولكن في كثير من الأحيان أقل
    وهذا هو ، لتجميع المؤشرات المرسلة ، إذا كانت هذه بعض العدادات ، على مدى فترة أطول.
  • ارسل اقل
    على سبيل المثال ، لإزالة "الافتراضي" من وجهة نظر قيم مفتاح التطبيق من JSON المنقولة.
  • إرسال إشارة فقط ، أي محتوى على الإطلاق
    كخيار - لبدء عدة قنوات ، يحمل اسم كل منها بالفعل بعض المعنى المطبق.
  • لا تزال تجعل شحنة من قاعدة البيانات

إرسال رسائل معقدة


ترميز الجسم


في الحالة العامة ، قد نرغب في نقل ليس فقط الأحرف المسموح بها في الرسالة ، ولكن أيضًا الأحرف الروسية و "أي ثنائيات" - لذلك ، سيكون من المناسب استخدام التحويل إلى التمثيل السداسي لتشكيل السلسلة المنقولة. ونعم ، هذه الطريقة تعمل بشكل جيد:

 NOTIFY test, E'\x20\x21' 

 Asynchronous notification "test" with payload " !" received from server process with PID 63991. 

ولكن دعنا ننتقل إلى الوثائق مرة أخرى:
يجب أن تتأكد من أن تسلسلات البايت التي تنشئها بهذه الطريقة ، خاصةً فيما يتعلق بتدوين ثماني وعشري ، تشكل أحرفًا صالحة مشفرة بالخادم . عندما يعمل الخادم بترميز UTF-8 ، بدلاً من تسجيل البايت هذا ، استخدم تسلسلات Unicode الخاصة أو بناء جملة Unicode البديل الموضح في القسم 4.1.2.3. (وإلا ، فسيتعين عليك ترميز أحرف UTF-8 يدويًا وكتابتها بالبايت ، وهو أمر غير مريح للغاية.)
لذلك ، حتى مع وجود رمز مألوف في علامة اقتباس مخلب من win1251 ، فإننا نأخذ الخبز في حزن:

 NOTIFY test, E'\x98' -- ERROR: invalid byte sequence for encoding "UTF8": 0x98 

نظرًا لأننا لا نريد " تشفير أحرف UTF-8 يدويًا وكتابتها بالبايت " ، فسوف نوافق على الفور على إرسال نص الرسالة المعبأ في base64 إذا كان هناك أي أحرف خارج النطاق \x20-\x7E أو \x20-\x7E ، إذا لزم الأمر. من ناحية ، لا تؤدي طريقة التغليف هذه إلى زيادة التكرار أكثر من اللازم (معامل 3: 4) ، من ناحية أخرى ، يتم تنفيذها على مستوى مكتبات النظام بأي لغة وتوفر الحد الأدنى من التحميل الإضافي.

ولكن حتى لو لم يكن لدينا أحرف "غريبة" ، وكانت الرسالة مناسبة في مقطع واحد ، فلا تزال هناك ميزة واحدة - الهروب من الفاصلة العليا :
لتضمين علامة اقتباس أحادية في سطر ، اكتب علامة اقتباس أحادية في جوارها ، على سبيل المثال: "Joan of Arc". لاحظ أن هذا ليس هو نفسه الاقتباس المزدوج (").

تحديد القطاع


المهمة التالية هي "قص" الرسالة بشكل صحيح إلى كتل مسموح بها لنقل 7999 بايت ، إذا كان حجمها يتجاوز هذه القيمة فجأة. وبإمكان المستلم جمعها دون كسر الترتيب أو الوقوع في سلسلة الأجزاء "الغريبة". لهذا ، كل منهم يحتاج إلى تحديد بطريقة أو بأخرى.

في الواقع ، نحن نعرف بالفعل "الإحداثيين" - وهذا هو PID لعملية الإرسال واسم القناة التي تأتي في كل إخطار. ويضمن البروتوكول نفسه الترتيب الذي تصل به القطاعات.

كتابة الجيران
لن نأخذ في الاعتبار الحالة التي يكون فيها العديد من الكتاب لنفس القناة نشطين في نفس الوقت الذي يتم فيه الاتصال بقاعدة البيانات (أي ، من الواضح أنه في نفس عملية التطبيق). من الناحية الفنية ، يمكن دعم ذلك بتمرير معرف إضافي في رأس المقطع - ولكن من الأفضل "مشاركة" كائن PubSub واحد داخل التطبيق الخاص بك.

حاوية الحد


لتجميع حاوية متكاملة من عدة قطاعات ، نحتاج إلى معرفة لحظة اكتمالها. هناك طريقتان نموذجيتان لهذا:

  • نقل الحجم المستهدف (بالبايت أو المقاطع) في الأول منها
  • إرسال علامة [لا] المقطع الأخير في كل منها

نظرًا لأننا نكتب PubSub بعد كل شيء ، فستكون معظم رسائلنا قصيرة ولن يكون من المجدي حجز الكثير من وحدات البايت لنقل الحجم. لذلك ، سوف نستخدم الطريقة الثانية ، بعد حجز الحرف الأول لبيانات القطعة كعلامة استمرار / نهاية للحاوية.

نقل الكائن


لنقل كل من سلاسل النص العادي وكائنات JSON كـ "رسالة" ، نضيف علامة رمز أخرى للتحول العكسي على جانب المستلم.

نظرًا لأننا قررنا ترميز "non-format" في base64 ، بالنسبة إلى العلامات ، يمكننا أخذ أي أحرف مسموح بها غير موجودة في هذه المجموعة.

إجمالي ، حصلنا على الخيارات التالية للقطاعات المرسلة:

 -- ""   !simple string -- "  "  @{"a":1} --    base64 #<segment> --    base64 $<segment> 

كما ترون ، يكفي تحليل عند تلقي شريحة فقط الحرف الأول لفهم ما عليك القيام به أكثر.

كتابة تطبيق PubSub


سيكون تطبيقنا على Node.js ، لذلك سوف نستخدم وحدة العقدة postgres للعمل مع PostgreSQL.

نكتب إطار البداية
للبدء ، دعنا ننشئ PubSub وريث EventEmitter ، حتى نتمكن من إنشاء أحداث لأولئك الذين اشتركوا في قنوات محددة:

 const util = require('util'); const EventEmitter = require('events').EventEmitter; const PubSub = function(connection, interval, skipSelf) { //     this.connection = connection; //        this.connection.on('notification', p._onmessage.bind(this)); //         this.skipSelf = skipSelf; //  "" setInterval(() => { this.connection.query('SELECT 1'); }, interval); //     ""  this.slices = {}; }; util.inherits(PubSub, EventEmitter); const p = PubSub.prototype; 

نحن نعمل مع القنوات
نظرًا لأن LISTEN / UNLISTEN لا تقسم بأي شكل من الأشكال عند إعادة الاشتراك في قناة أو إلغاء الاشتراك فيما لم نكن مشتركين فيه ، فلن نقوم بتعقيد أي شيء.

 //     - "",      //     -        const quot = str => /^[_a-z][0-9a-z_\$]*$/.test(str) ? str : `"${str}"`; p.subscribe = function(channel) { this.connection.query(`LISTEN ${quot(channel)}`); return this; }; p.unsubscribe = function(channel) { this.connection.query(`UNLISTEN ${quot(channel)}`); return this; }; 

إرسال واستقبال الرسائل
 const PAYLOAD_LIMIT = 8000 - 1; const PAYLOAD_FL_STR = '!'; const PAYLOAD_FL_OBJ = '@'; const PAYLOAD_FL_SEQ = '#'; const PAYLOAD_FL_FIN = '$'; const PAYLOAD_SZ_HEAD = 1; const PAYLOAD_SZ_DATA = PAYLOAD_LIMIT - PAYLOAD_SZ_HEAD; //  ""  const reASCII = /^[\x20-\x7E]*$/; //  p.publish = function(channel, payload) { let query = `NOTIFY ${quot(channel)}`; if (payload !== null && payload !== undefined) { //    -    let str = typeof payload == 'string' ? PAYLOAD_FL_STR + payload : PAYLOAD_FL_OBJ + JSON.stringify(payload); if (str.length > PAYLOAD_LIMIT || !reASCII.test(str)) { //   base64- const b64 = Buffer.from(str).toString('base64'); for (let pos = 0, len = b64.length; pos < len; pos += PAYLOAD_SZ_DATA) { let fin = pos + PAYLOAD_SZ_DATA; let seg = fin >= len ? PAYLOAD_FL_FIN + b64.slice(pos) : PAYLOAD_FL_SEQ + b64.slice(pos, fin); this.connection.query(`${query}, '${seg}'`); } } else { //        ? //     str = str.replace(/'/g, "''"); this.connection.query(`${query}, '${str}'`); } } else { //       this.connection.query(query); } return this; }; //    p._onmessage = function(msg) { const {processId, channel, payload} = msg; //  "" if (processId == this.connection.processID && this.skipSelf) { return; } // ""  const id = `${processId}:${channel}`; let rv; //   let fl = payload.charAt(0); if (fl == PAYLOAD_FL_SEQ || fl == PAYLOAD_FL_FIN) { // base64 const str = payload.slice(PAYLOAD_SZ_HEAD); const slices = this.slices; let b64; if (fl == PAYLOAD_FL_FIN) { //   if (slices[id]) { slices[id].push(str); b64 = slices[id].join(''); delete slices[id]; } else { b64 = str; } } else { //     if (slices[id]) { slices[id].push(str); } else { slices[id] = [str]; } } if (b64) { rv = Buffer.from(b64, 'base64').toString(); fl = rv.charAt(0); } } else { //  / rv = payload; } if (rv !== undefined) { //   '' let res = { processId , channel }; if (rv) { //       let data = rv.slice(1); res.payload = fl == PAYLOAD_FL_OBJ ? JSON.parse(data) : data; } this.emit(channel, res); } }; 

بعض الاختبارات
 const pg = require('pg'); const pgsql = new pg.Client({ host : 'example-db' , port : 5432 , user : 'postgres' , password : 'postgres' , database : '_tmp' }); pgsql.connect(err => { let psA = new PubSub(pgsql, 1000); let psB = new PubSub(pgsql, 1000); let chA = 'channel:A'; let chB = 'channel:B'; psA.subscribe(chA); psB.subscribe(chB); psA.on(chA, (msg) => { console.log('A:rcv', msg); }); psB.on(chB, (msg) => { console.log('B:rcv', msg); }); psB.publish(chA); psB.publish(chA, 'simple string'); psB.publish(chA, '  '); psB.publish(chA, {a : 1}); psA.publish(chB, '   100  '.repeat(100)); }); 

كل شيء بسيط للغاية ، بحيث يمكنك تنفيذه بسهولة على أي رر آخر يستخدم في مشروعك ، مع الأخذ على سبيل المثال الأساس للعمل مع الإشعارات غير المتزامنة:

Source: https://habr.com/ru/post/ar484978/


All Articles