PubSub ist fast kostenlos: NOTIFY-Funktionen in PostgreSQL

Wenn Ihre Microservices bereits eine gemeinsame PostgreSQL-Datenbank zum Speichern von Daten verwenden oder wenn mehrere Instanzen desselben Dienstes diese auf verschiedenen Servern verwenden, können Sie " PubSub " -Nachrichten relativ kostengünstig abrufen, ohne sie in die Redis-Architektur, den RabbitMQ-Cluster oder in Code einzubetten Anwendungen eines anderen MQ-Systems .

Dazu schreiben wir keine Nachrichten in die Datenbanktabellen , da dies zu viel Overhead verursacht, zuerst beim Schreiben der übertragenen und dann auch beim Löschen der bereits gelesenen .

Wir senden und empfangen Daten mithilfe des NOTIFY / LISTEN- Mechanismus und sammeln eine Modellimplementierung für Node.js.



Aber auf diesem Weg gibt es einen Rechen, der sorgfältig umgangen werden muss.

Protokollfunktionen


Hör zu


LISTEN  
Eine Anwendung, die die libpq-Bibliothek verwendet, führt den Befehl LISTEN als regulären SQL-Befehl aus und muss dann regelmäßig die Funktion PQnotifications aufrufen, um nach neuen Benachrichtigungen zu suchen.
Wenn Sie keine Bibliothek für die Arbeit mit PG schreiben, sondern bereits eine bestimmte Anwendung, haben Sie in den meisten Fällen keinen Zugriff auf den Aufruf dieser Funktion.

Wenn eine solche Bibliothek jedoch bereits für Sie gemäß den Empfehlungen für die Verarbeitung asynchroner Anforderungen und Benachrichtigungen geschrieben wurde , erhalten Sie automatisch eine Nachricht im Anwendungscode. Wenn nicht, können Sie SELECT 1 einfach in regelmäßigen Abständen für die Verbindung ausführen. Dann wird eine Benachrichtigung zusammen mit dem Abfrageergebnis gesendet:
In sehr alten Versionen von libpq gab es nur eine Möglichkeit, den rechtzeitigen Empfang von Nachrichten aus dem NOTIFY-Befehl sicherzustellen: Sie mussten ständig Befehle senden, auch leere, und nach jedem PQexec-Aufruf PQnotifications überprüfen. Obwohl diese Methode immer noch funktioniert, wird sie aufgrund ineffizienter Prozessorauslastung als veraltet angesehen.
In Bezug auf zum Beispiel psql sieht es so aus:

 _tmp=# LISTEN test; LISTEN _tmp=# SELECT 1; ?column? ---------- 1 (1 row) Asynchronous notification "test" with payload "abc123" received from server process with PID 63991. 

Wenn wir für eine angewendete Aufgabe eine maximale Verzögerung für die Zustellung einer Nachricht innerhalb von 1 Sekunde vereinbaren können, führen wir mit einem solchen Intervall die Anforderung aus. Gleichzeitig hilft diese Methode dabei, die "Liveness" der Verbindung zu überwachen , indem sichergestellt wird, dass niemand sie versehentlich von der Serverseite über pg_terminate_backend oder dass es nicht zu einem plötzlichen "Absturz" des pg_terminate_backend , ohne dass dies den Clients pg_terminate_backend wird.

NOTIFY


 NOTIFY  [ ,  ] 

Der NOTIFY-Befehl sendet ein Benachrichtigungsereignis zusammen mit einer zusätzlichen Nachrichtenzeile an alle Clientanwendungen, die zuvor einen Kanal mit dem angegebenen Kanalnamen in der aktuellen LISTEN-Datenbank ausgeführt haben.
...
Die „Nachrichten“ -Zeile, die zusammen mit der Benachrichtigung gesendet wird, muss eine einfache Textkonstante sein . In einer Standardkonfiguration sollte die Länge weniger als 8000 Byte betragen .
Das heißt, wenn unsere „Nachricht“ plötzlich etwas ganz anderes als ASCII enthält, müssen wir es überprüfen und wenn es 8.000 Bytes überschreitet (keine Zeichen!), Schneiden Sie es in Blöcke und kleben Sie es dann . Gleichzeitig sollten wir sowohl Kanalbandbreite als auch Serverressourcen für die Verarbeitung der Übertragung solcher Blöcke einsparen - das heißt, den nützlichen Inhalten so wenig Dienst wie möglich "binden", aber die Client-Anwendung nicht "erwürgen" und zum Packen zwingen gzip -9 .

Von den zusätzlichen Vorteilen des Mechanismus kann man auch die Bindung an die "Quelle" der Nachricht bemerken ...
... zusätzliche Arbeit kann vermieden werden, indem überprüft wird, ob die PID des Signalisierungsprozesses (in den Ereignisdaten angegeben) mit der PID der Sitzung übereinstimmt (Sie finden sie, indem Sie sich an libpq wenden). Wenn sie übereinstimmen, hat die Sitzung eine Benachrichtigung über ihre eigenen Aktionen erhalten, damit sie ignoriert werden kann.
... und garantierter Lieferschein:
Neben der Filterung nachfolgender Instanzen doppelter Benachrichtigungen stellt NOTIFY sicher, dass Benachrichtigungen von einer einzelnen Transaktion immer in der Reihenfolge eingehen, in der sie gesendet wurden. Es wird auch garantiert, dass Nachrichten von verschiedenen Transaktionen in der Reihenfolge eintreffen, in der diese Transaktionen festgeschrieben sind .
Wir werden nichts spezifisch kombinieren, so dass jede unserer Anfragen nur einer separaten Transaktion entspricht.

Beachten Sie jedoch, dass, wenn auf der für den Austausch verwendeten Verbindung auch Anwendungsaktivität vorhanden ist, unser NOTIFY möglicherweise nicht freiwillig in der Transaktion enthalten ist, sodass Nebenwirkungen auftreten können :
Transaktionen haben erhebliche Auswirkungen auf NOTIFY. Erstens, wenn NOTIFY innerhalb einer Transaktion ausgeführt wird, werden Benachrichtigungen an die Empfänger zugestellt, nachdem die Transaktion festgeschrieben wurde, und zwar nur in diesem Fall. Dies ist sinnvoll, da im Falle einer Unterbrechung einer Transaktion die Aktion aller darin enthaltenen Befehle, einschließlich NOTIFY, abgebrochen wird .
Daher ist es besser, eine Verbindung zu verwenden, bei der offensichtlich keine Transaktionen oder langen Abfragen vorliegen.

AccessExclusiveLock für Objekt 0 der Klasse 1262 der Datenbank 0


Wenn Ihr NOTIFY plötzlich anfängt, die Erwartung eines solchen Schlosses zu stumpfen und zu protokollieren, sind Sie immer noch "aus kurzen Hosen herausgewachsen" und es ist Zeit, über das "erwachsene" MQ nachzudenken.

Immerhin ist die Benachrichtigungswarteschlange, obwohl sie ziemlich groß ist (8 GB in Standard-Builds), immer noch endlich. Nach der Antwort von Tom Lane :
Diese Sperre wird gehalten, während die Benachrichtigungsnachricht (en) der Transaktion eingefügt werden. Danach wird die Sperre von der Transaktion festgeschrieben und freigegeben.
Das heißt, es gibt nicht zu viele Möglichkeiten, um dies zu umgehen:

  • senden aber seltener
    Dies bedeutet, dass die gesendeten Indikatoren, sofern es sich um einige Zähler handelt, über einen längeren Zeitraum zusammengefasst werden.
  • sende weniger
    Zum Beispiel, um "default" aus der Sicht der Anwendungsschlüsselwerte aus dem übertragenen JSON zu entfernen.
  • Sende nur ein Signal , überhaupt keinen Inhalt
    Optional - um mehrere Kanäle zu starten, hat der Name jedes Kanals bereits einen gewissen Sinn.
  • noch eine Sendung aus der Datenbank machen

Komplexe Nachrichten senden


Body-Codierung


Im allgemeinen Fall möchten wir in der Nachricht möglicherweise nicht nur zulässige Zeichen, sondern auch russische Buchstaben und „beliebige Binärdateien“ übertragen. Daher ist es zweckmäßig, die übertragene Zeichenfolge in hexadezimale Darstellung umzuwandeln. Und ja, diese Methode funktioniert ganz gut:

 NOTIFY test, E'\x20\x21' 

 Asynchronous notification "test" with payload " !" received from server process with PID 63991. 

Aber wenden wir uns noch einmal der Dokumentation zu:
Sie müssen sicherstellen, dass die auf diese Weise erstellten Bytefolgen, insbesondere in oktaler und hexadezimaler Schreibweise, gültige servercodierte Zeichen bilden . Wenn der Server mit UTF-8-Codierung arbeitet, verwenden Sie anstelle einer solchen Byte-Aufzeichnung spezielle Unicode-Sequenzen oder die in Abschnitt 4.1.2.3 beschriebene alternative Unicode-Syntax. (Andernfalls müssen Sie UTF-8-Zeichen manuell codieren und byteweise schreiben, was sehr unpraktisch ist.)
Deshalb nehmen wir auch mit dem alltäglichen Symbol des Anführungszeichens-Pfote von win1251 Brot in Trauer:

 NOTIFY test, E'\x98' -- ERROR: invalid byte sequence for encoding "UTF8": 0x98 

Da wir UTF-8-Zeichen nicht "manuell codieren und \x20-\x7E schreiben " möchten, stimmen wir sofort zu, den in base64 gepackten Nachrichtentext zu senden, wenn er Zeichen außerhalb des Bereichs \x20-\x7E oder erforderlichenfalls Segmentierung enthält. Einerseits erhöht diese Packmethode die Redundanz nicht zu sehr (4: 3-Koeffizient), andererseits wird sie auf der Ebene der Systembibliotheken in einer beliebigen Sprache implementiert und bietet eine minimale zusätzliche Last.

Aber auch wenn wir keine "seltsamen" Zeichen haben und die Nachricht in ein Segment passt, gibt es immer noch eine Funktion - dem Apostroph entgehen :
Um einen Apostroph in eine Zeile einzufügen, schreiben Sie zwei Apostrophe daneben , zum Beispiel: 'Jeanne d'Arc'. Beachten Sie, dass dies nicht mit dem doppelten Anführungszeichen (") identisch ist.

Segmentidentifikation


Die nächste Aufgabe besteht darin, die Nachricht korrekt in Blöcke zu "schneiden", die für die Übertragung von 7999 Bytes zugelassen sind , wenn ihre Größe diesen Wert plötzlich überschreitet. Und damit der Empfänger es abholen kann, ohne die Reihenfolge zu brechen oder in die Kette der „fremden“ Segmente zu fallen. Dafür muss jeder von ihnen irgendwie identifiziert werden.

Tatsächlich kennen wir bereits die beiden „Koordinaten“ - dies ist die PID des Sendevorgangs und der Name des Kanals , der in jeder Benachrichtigung enthalten ist. Die Reihenfolge des Eintreffens der Segmente wird uns durch das Protokoll selbst garantiert.

Nachbarn schreiben
Wir werden den Fall nicht berücksichtigen, wenn mehrere Autoren desselben Kanals gleichzeitig mit der Verbindung zur Datenbank aktiv sind (dh offensichtlich innerhalb desselben Anwendungsprozesses). Technisch kann dies durch die Übergabe eines zusätzlichen Bezeichners im Segmentheader unterstützt werden. Es ist jedoch besser, ein einzelnes PubSub-Objekt in Ihrer Anwendung freizugeben.

Behälterlimit


Um einen integralen Container aus mehreren Segmenten zusammenzusetzen, müssen wir den Zeitpunkt seiner Fertigstellung kennen. Hierfür gibt es zwei typische Möglichkeiten:

  • Übertragung der Zielgröße (in Bytes oder Segmenten) in das erste von ihnen
  • Übertragung des Vorzeichens des letzten Abschnitts in jedem von ihnen

Da wir doch PubSub schreiben, sind die meisten unserer Nachrichten kurz und es lohnt sich nicht, viele Bytes für die Größenübertragung zu reservieren. Daher verwenden wir die zweite Methode, bei der das erste Zeichen der Segmentdaten als Fortsetzungs- / End-Flag des Containers reserviert wurde.

Objektübertragung


Um sowohl reine Textzeichenfolgen als auch JSON-Objekte als „Nachricht“ zu übertragen, fügen wir auf der Empfängerseite ein weiteres Symbolzeichen für die inverse Transformation hinzu.

Da wir beschlossen haben, "non-format" in base64 zu codieren, können wir für Flags alle zulässigen Zeichen verwenden, die nicht in dieser Menge enthalten sind.

Insgesamt haben wir für die übertragenen Segmente folgende Möglichkeiten:

 -- ""   !simple string -- "  "  @{"a":1} --    base64 #<segment> --    base64 $<segment> 

Wie Sie sehen, ist es ausreichend, beim Empfang eines Segments nur das erste Zeichen zu analysieren, um zu verstehen, was weiter damit zu tun ist.

Schreiben einer PubSub-Implementierung


Unsere Anwendung wird sich auf Node.js befinden, daher werden wir das Node-Postgres-Modul verwenden , um mit PostgreSQL zu arbeiten.

Wir schreiben den Startrahmen
Zunächst erstellen wir PubSub als Erben des EventEmitter , um Ereignisse für diejenigen zu generieren, die bestimmte Channels abonniert haben:

 const util = require('util'); const EventEmitter = require('events').EventEmitter; const PubSub = function(connection, interval, skipSelf) { //     this.connection = connection; //        this.connection.on('notification', p._onmessage.bind(this)); //         this.skipSelf = skipSelf; //  "" setInterval(() => { this.connection.query('SELECT 1'); }, interval); //     ""  this.slices = {}; }; util.inherits(PubSub, EventEmitter); const p = PubSub.prototype; 

Wir arbeiten mit Kanälen
Da LISTEN / UNLISTEN beim erneuten Abonnieren eines Kanals oder beim Abbestellen dessen, was wir nicht abonniert haben, in keiner Weise schwören, werden wir nichts komplizieren.

 //     - "",      //     -        const quot = str => /^[_a-z][0-9a-z_\$]*$/.test(str) ? str : `"${str}"`; p.subscribe = function(channel) { this.connection.query(`LISTEN ${quot(channel)}`); return this; }; p.unsubscribe = function(channel) { this.connection.query(`UNLISTEN ${quot(channel)}`); return this; }; 

Nachrichten senden und empfangen
 const PAYLOAD_LIMIT = 8000 - 1; const PAYLOAD_FL_STR = '!'; const PAYLOAD_FL_OBJ = '@'; const PAYLOAD_FL_SEQ = '#'; const PAYLOAD_FL_FIN = '$'; const PAYLOAD_SZ_HEAD = 1; const PAYLOAD_SZ_DATA = PAYLOAD_LIMIT - PAYLOAD_SZ_HEAD; //  ""  const reASCII = /^[\x20-\x7E]*$/; //  p.publish = function(channel, payload) { let query = `NOTIFY ${quot(channel)}`; if (payload !== null && payload !== undefined) { //    -    let str = typeof payload == 'string' ? PAYLOAD_FL_STR + payload : PAYLOAD_FL_OBJ + JSON.stringify(payload); if (str.length > PAYLOAD_LIMIT || !reASCII.test(str)) { //   base64- const b64 = Buffer.from(str).toString('base64'); for (let pos = 0, len = b64.length; pos < len; pos += PAYLOAD_SZ_DATA) { let fin = pos + PAYLOAD_SZ_DATA; let seg = fin >= len ? PAYLOAD_FL_FIN + b64.slice(pos) : PAYLOAD_FL_SEQ + b64.slice(pos, fin); this.connection.query(`${query}, '${seg}'`); } } else { //        ? //     str = str.replace(/'/g, "''"); this.connection.query(`${query}, '${str}'`); } } else { //       this.connection.query(query); } return this; }; //    p._onmessage = function(msg) { const {processId, channel, payload} = msg; //  "" if (processId == this.connection.processID && this.skipSelf) { return; } // ""  const id = `${processId}:${channel}`; let rv; //   let fl = payload.charAt(0); if (fl == PAYLOAD_FL_SEQ || fl == PAYLOAD_FL_FIN) { // base64 const str = payload.slice(PAYLOAD_SZ_HEAD); const slices = this.slices; let b64; if (fl == PAYLOAD_FL_FIN) { //   if (slices[id]) { slices[id].push(str); b64 = slices[id].join(''); delete slices[id]; } else { b64 = str; } } else { //     if (slices[id]) { slices[id].push(str); } else { slices[id] = [str]; } } if (b64) { rv = Buffer.from(b64, 'base64').toString(); fl = rv.charAt(0); } } else { //  / rv = payload; } if (rv !== undefined) { //   '' let res = { processId , channel }; if (rv) { //       let data = rv.slice(1); res.payload = fl == PAYLOAD_FL_OBJ ? JSON.parse(data) : data; } this.emit(channel, res); } }; 

Einige Tests
 const pg = require('pg'); const pgsql = new pg.Client({ host : 'example-db' , port : 5432 , user : 'postgres' , password : 'postgres' , database : '_tmp' }); pgsql.connect(err => { let psA = new PubSub(pgsql, 1000); let psB = new PubSub(pgsql, 1000); let chA = 'channel:A'; let chB = 'channel:B'; psA.subscribe(chA); psB.subscribe(chB); psA.on(chA, (msg) => { console.log('A:rcv', msg); }); psB.on(chB, (msg) => { console.log('B:rcv', msg); }); psB.publish(chA); psB.publish(chA, 'simple string'); psB.publish(chA, '  '); psB.publish(chA, {a : 1}); psA.publish(chB, '   100  '.repeat(100)); }); 

Alles ist recht einfach, so dass Sie es leicht auf jedem anderen in Ihrem Projekt verwendeten PL implementieren können, wobei Sie als Beispiele die Grundlage für die Arbeit mit asynchronen Benachrichtigungen heranziehen:

Source: https://habr.com/ru/post/de484978/


All Articles