PubSub est presque gratuit: NOTIFY fonctionnalités de PostgreSQL

Si vos microservices utilisent déjà une base de données PostgreSQL commune pour stocker des données, ou si plusieurs instances du même service l'utilisent sur des serveurs différents, il est relativement bon marché d'avoir la possibilité d'échanger des messages (PubSub) entre eux sans intégration dans l'architecture Redis, le cluster RabbitMQ ou l'incorporation dans le code applications d' un autre système MQ .

Pour cela, nous n'écrirons pas de messages dans les tables de la base de données , car cela entraîne trop de surcharge, d'abord pour écrire ce qui est transmis , puis aussi pour effacer de ce qui a déjà été lu .

Nous transmettrons et recevrons des données à l'aide du mécanisme NOTIFY / LISTEN , et nous collecterons une implémentation de modèle pour Node.js.



Mais sur ce chemin, il y a un râteau qui devra être soigneusement contourné.

Caractéristiques du protocole


Écoutez


LISTEN  
Une application utilisant la bibliothèque libpq exécute la commande LISTEN en tant que commande SQL standard, puis elle doit périodiquement appeler la fonction PQnotifies pour vérifier les nouvelles notifications.
Si vous n'écrivez pas une bibliothèque pour travailler avec PG, mais déjà une application spécifique, dans la plupart des cas, vous n'aurez pas accès à l'appel à cette fonction.

Mais si une telle bibliothèque a déjà été écrite pour vous conformément aux recommandations de traitement des demandes et notifications asynchrones , vous recevrez automatiquement un message dans le code de l'application. Sinon, vous pouvez simplement exécuter périodiquement SELECT 1 sur la connexion, puis une notification accompagnera le résultat de la requête:
Dans les versions très anciennes de libpq, il n'y avait qu'une seule façon de garantir la réception en temps opportun des messages de la commande NOTIFY - d'envoyer constamment des commandes, même vides, puis de vérifier PQnotifies après chaque appel PQexec. Bien que cette méthode fonctionne toujours, elle est considérée comme obsolète en raison d'une utilisation inefficace du processeur.
En termes de psql , par exemple , cela ressemble à ceci:

 _tmp=# LISTEN test; LISTEN _tmp=# SELECT 1; ?column? ---------- 1 (1 row) Asynchronous notification "test" with payload "abc123" received from server process with PID 63991. 

Si pour une tâche appliquée, nous pouvons convenir d'un délai maximum pour la livraison d'un message en 1 seconde, avec un tel intervalle, nous exécutons la demande. Dans le même temps, cette méthode permet de surveiller la «vivacité» de la connexion , en veillant à ce que personne ne la coupe accidentellement du côté serveur via pg_terminate_backend , ou qu'il n'y ait pas eu de «plantage» soudain du PG sans aucune notification aux clients.

NOTIFIER


 NOTIFY  [ ,  ] 

La commande NOTIFY envoie un événement de notification ainsi qu'une ligne de «message» supplémentaire à toutes les applications clientes qui ont précédemment exécuté un canal avec le nom de canal spécifié dans la base de données LISTEN actuelle.
...
La ligne "message" qui sera transmise avec la notification ... doit être une simple constante de texte . Dans une configuration standard, sa longueur doit être inférieure à 8 000 octets .
Autrement dit, si notre «message» contient soudainement quelque chose de très différent de l'ASCII, alors nous devrons le filtrer , et s'il dépasse la taille de 8000 octets (pas de caractères!), Puis le couper en blocs et le coller . Dans le même temps, nous devons économiser à la fois la bande passante du canal et les ressources du serveur pour traiter le transfert de ces blocs - c'est-à-dire, ajouter le moins de «liaison» de service possible au contenu utile, mais également ne pas «étrangler» l'application cliente, la forçant à emballer avec gzip -9 .

Parmi les avantages supplémentaires du mécanisme, on peut également noter la liaison à la "source" du message ...
... un travail supplémentaire peut être évité en vérifiant si le PID du processus de signalisation (indiqué dans les données d'événement) correspond au PID de la session (vous pouvez le trouver en contactant libpq). S'ils correspondent, la session a reçu une notification de ses propres actions, afin qu'elle puisse être ignorée.
... et bon de livraison garanti:
Outre le filtrage des instances ultérieures de notifications en double, NOTIFY garantit que les notifications d'une transaction unique arrivent toujours dans le même ordre dans lequel elles ont été envoyées. Il est également garanti que les messages de différentes transactions arrivent dans l'ordre dans lequel ces transactions sont validées .
Nous ne combinerons rien spécifiquement, donc chacune de nos demandes correspondra simplement à une transaction distincte.

Mais rappelez-vous que s'il y a également une activité d'application sur la connexion utilisée pour l'échange, notre NOTIFY peut ne pas être dans la transaction de notre propre gré, donc des effets secondaires peuvent se produire :
Les transactions ont un impact significatif sur NOTIFY. Premièrement, si NOTIFY est exécuté à l'intérieur d'une transaction, les notifications sont envoyées aux destinataires une fois la transaction validée, et uniquement dans ce cas. Ceci est raisonnable, car en cas d'interruption d'une transaction, l'action de toutes les commandes qu'elle contient est annulée, y compris NOTIFY .
Par conséquent, il est préférable d'utiliser une connexion où il n'y a évidemment pas de transactions ou de longues requêtes.

AccessExclusiveLock sur l'objet 0 de la classe 1262 de la base de données 0


Si tout à coup vos NOTIFY ont commencé à émousser et à enregistrer l'attente d'une telle serrure, alors vous avez toujours "grandi en pantalon court", et il est temps de penser au MQ "adulte".

Après tout, la file d'attente de notification, bien qu'assez grande (8 Go dans les versions standard), est toujours limitée. Selon la réponse de Tom Lane :
Ce verrou est maintenu lors de l'insertion du ou des messages de notification de la transaction, après quoi la transaction valide et libère le verrou.
Autrement dit, il n'y a pas trop d'options pour contourner:

  • envoyer mais moins souvent
    Autrement dit, pour agréger les indicateurs envoyés, s'il s'agit de certains compteurs, sur un intervalle plus long.
  • envoyer moins
    Par exemple, pour supprimer «par défaut» du point de vue des valeurs de clé d'application du JSON transmis.
  • envoyer uniquement du signal , pas de contenu du tout
    En option - pour démarrer plusieurs chaînes, le nom de chacune portera déjà en soi un certain sens appliqué.
  • toujours faire un envoi à partir de la base de données

Envoi de messages complexes


Encodage du corps


Dans le cas général, nous pouvons vouloir transmettre dans le message non seulement des caractères autorisés, mais aussi des lettres russes et «tous les binaires» - par conséquent, il serait pratique d'utiliser la conversion en représentation hexadécimale pour former la chaîne transmise. Et oui, cette méthode fonctionne plutôt bien:

 NOTIFY test, E'\x20\x21' 

 Asynchronous notification "test" with payload " !" received from server process with PID 63991. 

Mais revenons à la documentation:
Vous devez vous assurer que les séquences d'octets que vous créez de cette manière, en particulier en notation octale et hexadécimale, forment des caractères codés par le serveur valides . Lorsque le serveur fonctionne avec le codage UTF-8, au lieu d'un tel enregistrement d'octets, utilisez des séquences spéciales Unicode ou la syntaxe alternative Unicode décrite à la section 4.1.2.3. (Sinon, vous devrez encoder manuellement les caractères UTF-8 et les écrire par octets, ce qui est très gênant.)
Par conséquent, même avec le symbole banal de la patte de guillemet de win1251, nous prenons du pain dans la douleur:

 NOTIFY test, E'\x98' -- ERROR: invalid byte sequence for encoding "UTF8": 0x98 

Comme nous ne voulons pas " coder manuellement les caractères UTF-8 et les écrire par octets ", nous accepterons immédiatement d'envoyer le corps du message emballé en base64 s'il y a des caractères en dehors de la plage \x20-\x7E ou, si nécessaire, une segmentation. D'une part, cette méthode de packaging n'augmente pas trop la redondance (coefficient 4: 3), d'autre part, elle est implémentée au niveau des bibliothèques système dans n'importe quelle langue et fournira une charge supplémentaire minimale.

Mais même si nous n'avons pas de caractères «étranges» et que le message tient dans un segment, il y a encore une fonctionnalité - échapper à l'apostrophe :
Pour inclure une apostrophe dans une ligne, écrivez deux apostrophes à côté d'elle , par exemple: «Jeanne d'Arc». Notez que ce n'est pas la même chose que les guillemets doubles (").

Identification de segment


La tâche suivante consiste à "couper" correctement le message en blocs autorisés pour la transmission de 7999 octets , si sa taille dépasse soudainement cette valeur. Et pour que le destinataire puisse le récupérer sans casser l'ordre ou tomber dans la chaîne des segments «étrangers». Pour cela, chacun d'eux doit être identifié d'une manière ou d'une autre.

En fait, nous connaissons déjà les deux «coordonnées» - c'est le PID du processus d'envoi et le nom du canal qui vient dans chaque notification. Et l'ordre d'arrivée des segments nous est garanti par le protocole lui-même.

Écriture de voisins
Nous ne considérerons pas le cas lorsque plusieurs écrivains sur le même canal sont actifs en même temps que la connexion à la base de données (c'est-à-dire, évidemment dans le même processus de demande). Techniquement, cela peut être pris en charge en passant un identifiant supplémentaire dans l'en-tête du segment - mais il est préférable de «partager» un seul objet PubSub à l'intérieur de votre application.

Limite de conteneur


Pour assembler un conteneur intégral à partir de plusieurs segments, nous devons connaître le moment de sa réalisation. Il y a deux manières typiques de procéder:

  • transfert de la taille cible (en octets ou segments) dans le premier d'entre eux
  • transmission du signe [pas] du dernier segment dans chacun d'eux

Puisque nous écrivons PubSub après tout, la plupart de nos messages seront courts et il n'est pas rentable de réserver beaucoup d'octets pour le transfert de taille. Par conséquent, nous utiliserons la deuxième méthode, ayant réservé le premier caractère des données de segment comme indicateur de continuation / fin du conteneur.

Transfert d'objets


Afin de transmettre à la fois des chaînes de texte brut et des objets JSON en tant que «message», nous ajoutons un signe de symbole supplémentaire pour la transformation inverse du côté destinataire.

Puisque nous avons décidé d'encoder "non-format" en base64, pour les drapeaux, nous pouvons prendre tous les caractères autorisés qui ne sont pas dans cet ensemble.

Au total, nous avons obtenu les options suivantes pour les segments transmis:

 -- ""   !simple string -- "  "  @{"a":1} --    base64 #<segment> --    base64 $<segment> 

Comme vous pouvez le voir, il suffit d'analyser lors de la réception d'un segment uniquement le premier caractère pour comprendre ce que vous devez en faire.

Écrire une implémentation PubSub


Notre application sera sur Node.js, nous allons donc utiliser le module node-postgres pour travailler avec PostgreSQL.

On écrit la trame de départ
Pour commencer, créons PubSub en tant qu'héritier de l' EventEmitter , afin de pouvoir générer des événements pour ceux qui se sont abonnés à des chaînes spécifiques:

 const util = require('util'); const EventEmitter = require('events').EventEmitter; const PubSub = function(connection, interval, skipSelf) { //     this.connection = connection; //        this.connection.on('notification', p._onmessage.bind(this)); //         this.skipSelf = skipSelf; //  "" setInterval(() => { this.connection.query('SELECT 1'); }, interval); //     ""  this.slices = {}; }; util.inherits(PubSub, EventEmitter); const p = PubSub.prototype; 

Nous travaillons avec des canaux
Étant donné que LISTEN / UNLISTEN ne jure en aucune façon lors de la réinscription à une chaîne ou de la désinscription de ce à quoi nous n'étions pas abonnés, nous ne compliquerons rien.

 //     - "",      //     -        const quot = str => /^[_a-z][0-9a-z_\$]*$/.test(str) ? str : `"${str}"`; p.subscribe = function(channel) { this.connection.query(`LISTEN ${quot(channel)}`); return this; }; p.unsubscribe = function(channel) { this.connection.query(`UNLISTEN ${quot(channel)}`); return this; }; 

Envoi et réception de messages
 const PAYLOAD_LIMIT = 8000 - 1; const PAYLOAD_FL_STR = '!'; const PAYLOAD_FL_OBJ = '@'; const PAYLOAD_FL_SEQ = '#'; const PAYLOAD_FL_FIN = '$'; const PAYLOAD_SZ_HEAD = 1; const PAYLOAD_SZ_DATA = PAYLOAD_LIMIT - PAYLOAD_SZ_HEAD; //  ""  const reASCII = /^[\x20-\x7E]*$/; //  p.publish = function(channel, payload) { let query = `NOTIFY ${quot(channel)}`; if (payload !== null && payload !== undefined) { //    -    let str = typeof payload == 'string' ? PAYLOAD_FL_STR + payload : PAYLOAD_FL_OBJ + JSON.stringify(payload); if (str.length > PAYLOAD_LIMIT || !reASCII.test(str)) { //   base64- const b64 = Buffer.from(str).toString('base64'); for (let pos = 0, len = b64.length; pos < len; pos += PAYLOAD_SZ_DATA) { let fin = pos + PAYLOAD_SZ_DATA; let seg = fin >= len ? PAYLOAD_FL_FIN + b64.slice(pos) : PAYLOAD_FL_SEQ + b64.slice(pos, fin); this.connection.query(`${query}, '${seg}'`); } } else { //        ? //     str = str.replace(/'/g, "''"); this.connection.query(`${query}, '${str}'`); } } else { //       this.connection.query(query); } return this; }; //    p._onmessage = function(msg) { const {processId, channel, payload} = msg; //  "" if (processId == this.connection.processID && this.skipSelf) { return; } // ""  const id = `${processId}:${channel}`; let rv; //   let fl = payload.charAt(0); if (fl == PAYLOAD_FL_SEQ || fl == PAYLOAD_FL_FIN) { // base64 const str = payload.slice(PAYLOAD_SZ_HEAD); const slices = this.slices; let b64; if (fl == PAYLOAD_FL_FIN) { //   if (slices[id]) { slices[id].push(str); b64 = slices[id].join(''); delete slices[id]; } else { b64 = str; } } else { //     if (slices[id]) { slices[id].push(str); } else { slices[id] = [str]; } } if (b64) { rv = Buffer.from(b64, 'base64').toString(); fl = rv.charAt(0); } } else { //  / rv = payload; } if (rv !== undefined) { //   '' let res = { processId , channel }; if (rv) { //       let data = rv.slice(1); res.payload = fl == PAYLOAD_FL_OBJ ? JSON.parse(data) : data; } this.emit(channel, res); } }; 

Quelques tests
 const pg = require('pg'); const pgsql = new pg.Client({ host : 'example-db' , port : 5432 , user : 'postgres' , password : 'postgres' , database : '_tmp' }); pgsql.connect(err => { let psA = new PubSub(pgsql, 1000); let psB = new PubSub(pgsql, 1000); let chA = 'channel:A'; let chB = 'channel:B'; psA.subscribe(chA); psB.subscribe(chB); psA.on(chA, (msg) => { console.log('A:rcv', msg); }); psB.on(chB, (msg) => { console.log('B:rcv', msg); }); psB.publish(chA); psB.publish(chA, 'simple string'); psB.publish(chA, '  '); psB.publish(chA, {a : 1}); psA.publish(chB, '   100  '.repeat(100)); }); 

Tout est assez simple, vous pouvez donc l'implémenter facilement sur n'importe quel autre PL utilisé dans votre projet, en prenant comme exemple la base pour travailler avec des notifications asynchrones:

Source: https://habr.com/ru/post/fr484978/


All Articles