PubSub es casi gratis: NOTIFICAR características en PostgreSQL

Si sus microservicios ya usan una base de datos PostgreSQL común para almacenar datos, o si varias instancias del mismo servicio lo usan en diferentes servidores, puede obtener mensajes " PubSub " entre ellos de manera relativamente económica sin integración en la arquitectura Redis, el clúster RabbitMQ o incrustarlos en el código aplicaciones de otro sistema MQ .

Para esto, no escribiremos mensajes en las tablas de la base de datos , ya que esto causa demasiada sobrecarga, primero para escribir lo que se transmite y luego también para borrar lo que ya se ha leído .

Transmitiremos y recibiremos datos utilizando el mecanismo NOTIFICAR / ESCUCHAR , y recopilaremos una implementación modelo para Node.js.



Pero de esta manera hay un rastrillo que deberá ser evitado cuidadosamente.

Características del protocolo


Escucha


LISTEN  
Una aplicación que usa la biblioteca libpq ejecuta el comando LISTEN como un comando SQL normal, y luego debe llamar periódicamente a la función PQnotifies para verificar si hay nuevas notificaciones.
Si no está escribiendo una biblioteca para trabajar con PG, pero ya es una aplicación específica, en la mayoría de los casos, no tendrá acceso a la llamada a esta función.

Pero si dicha biblioteca ya se ha escrito para usted de acuerdo con las recomendaciones para procesar solicitudes y notificaciones asincrónicas , recibirá automáticamente un mensaje en el código de la aplicación. Si no, simplemente puede ejecutar periódicamente SELECT 1 en la conexión, luego aparecerá una notificación con el resultado de la consulta:
En versiones muy antiguas de libpq, solo había una forma de garantizar la recepción oportuna de los mensajes del comando NOTIFICAR: enviar constantemente comandos, incluso los vacíos, y luego verificar PQnotificaciones después de cada llamada PQexec. Aunque este método todavía funciona, se considera obsoleto debido al uso ineficiente del procesador.
En términos de, por ejemplo, psql, se ve así:

 _tmp=# LISTEN test; LISTEN _tmp=# SELECT 1; ?column? ---------- 1 (1 row) Asynchronous notification "test" with payload "abc123" received from server process with PID 63991. 

Si para una tarea aplicada podemos acordar un retraso máximo en la entrega de un mensaje dentro de 1 segundo, con dicho intervalo, ejecutamos la solicitud. Al mismo tiempo, este método ayuda a monitorear la "vida" de la conexión , asegurándose de que nadie la corte accidentalmente desde el lado del servidor a través de pg_terminate_backend , o que no haya un "bloqueo" repentino de la PG sin ninguna notificación a los clientes.

NOTIFICAR


 NOTIFY  [ ,  ] 

El comando NOTIFICAR envía un evento de notificación junto con una línea de "mensaje" adicional a todas las aplicaciones cliente que hayan ejecutado previamente un canal con el nombre del canal especificado en la base de datos LISTEN actual.
...
La línea de "mensaje" que se transmitirá junto con la notificación ... debe ser una constante de texto simple . En una configuración estándar, su longitud debe ser inferior a 8000 bytes .
Es decir, si nuestro "mensaje" de repente contiene algo muy diferente de ASCII, entonces tendremos que filtrarlo , y si supera los 8,000 bytes (¡no caracteres!), Luego córtelo en bloques y luego péguelo . Al mismo tiempo, debemos guardar tanto el ancho de banda del canal como los recursos del servidor para procesar la transferencia de dichos bloques, es decir, agregar la menor "vinculación" de servicios al contenido útil posible, pero no "estrangular" la aplicación cliente, forzándola a empacar gzip -9 .

De las ventajas adicionales del mecanismo, también se puede observar la vinculación a la "fuente" del mensaje ...
... se puede evitar el trabajo adicional al verificar si el PID del proceso de señalización (indicado en los datos del evento) coincide con el propio PID de la sesión (puede encontrarlo contactando a libpq). Si coinciden, la sesión recibió una notificación de sus propias acciones, para que pueda ignorarse.
... y orden de entrega garantizada:
Además de filtrar las instancias posteriores de notificaciones duplicadas, NOTIFY garantiza que las notificaciones de una sola transacción siempre lleguen en el mismo orden en que fueron enviadas. También se garantiza que los mensajes de diferentes transacciones lleguen en el orden en que se confirman estas transacciones .
No combinaremos nada específicamente, por lo que cada una de nuestras solicitudes solo corresponderá a una transacción por separado.

Pero recuerde que si también hay actividad de aplicación en la conexión utilizada para el intercambio, nuestro AVISO puede no estar dentro de la transacción de nuestro propio libre albedrío, por lo que pueden ocurrir efectos secundarios :
Las transacciones tienen un impacto significativo en NOTIFICAR. Primero, si NOTIFY se ejecuta dentro de una transacción, las notificaciones se entregan a los destinatarios después de que se confirma la transacción, y solo en este caso. Esto es razonable, ya que en caso de que se interrumpa una transacción, se cancela la acción de todos los comandos, incluida NOTIFY .
Por lo tanto, es mejor usar una conexión donde obviamente no hay transacciones o consultas largas.

AccessExclusiveLock en el objeto 0 de la clase 1262 de la base de datos 0


Si de repente sus NOTIFICACIONES comenzaron a disminuir y registrar la expectativa de un bloqueo de este tipo, entonces todavía se ha "quedado sin pantalones" y es hora de pensar en el MQ "adulto".

Después de todo, la cola de notificaciones, aunque bastante grande (8 GB en compilaciones estándar), todavía es finita. Según la respuesta de Tom Lane :
Este bloqueo se mantiene mientras se insertan los mensajes de notificación de la transacción, después de lo cual la transacción confirma y libera el bloqueo.
Es decir, no hay muchas opciones para evitar:

  • enviar pero con menos frecuencia
    Es decir, para agregar los indicadores enviados, si se trata de algunos contadores, durante un intervalo más largo.
  • enviar menos
    Por ejemplo, para eliminar "predeterminado" desde el punto de vista de los valores clave de la aplicación del JSON transmitido.
  • enviar solo señal , sin contenido
    Como opción, para iniciar varios canales, el nombre de cada uno ya tendrá un sentido aplicado.
  • todavía hacer un envío desde la base de datos

Enviar mensajes complejos


Codificación del cuerpo


En el caso general, es posible que deseemos transmitir no solo los caracteres permitidos en el mensaje, sino también las letras rusas y "cualquier binario"; por lo tanto, sería conveniente utilizar la conversión a representación hexadecimal para formar la cadena transmitida. Y sí, este método funciona bastante bien:

 NOTIFY test, E'\x20\x21' 

 Asynchronous notification "test" with payload " !" received from server process with PID 63991. 

Pero volvamos a la documentación nuevamente:
Debe asegurarse de que las secuencias de bytes que cree de esta manera, especialmente en notación octal y hexadecimal, formen caracteres válidos codificados por el servidor . Cuando el servidor funciona con codificación UTF-8, en lugar de dicha grabación de bytes, utilice secuencias especiales de Unicode o la sintaxis alternativa de Unicode descrita en la Sección 4.1.2.3. (De lo contrario, tendrá que codificar los caracteres UTF-8 manualmente y escribirlos por bytes, lo cual es muy inconveniente).
Por lo tanto, incluso con el símbolo común de la cita marca-pata de win1251, tomamos el pan con pena:

 NOTIFY test, E'\x98' -- ERROR: invalid byte sequence for encoding "UTF8": 0x98 

Como no queremos " codificar los caracteres UTF-8 manualmente y escribirlos por bytes ", inmediatamente aceptamos enviar el cuerpo del mensaje empaquetado en base64 si contiene caracteres fuera del rango \x20-\x7E o, si es necesario, segmentación. Por un lado, este método de empaque no aumenta demasiado la redundancia (coeficiente 4: 3), por otro lado, se implementa a nivel de bibliotecas del sistema en cualquier idioma y proporcionará una carga adicional mínima.

Pero incluso si no tenemos caracteres "extraños", y el mensaje cabe en un segmento, todavía hay una característica: escapar del apóstrofe :
Para incluir un apóstrofe en una línea, escriba dos apóstrofos al lado , por ejemplo: 'Juana de Arco'. Tenga en cuenta que esto no es lo mismo que la comilla doble (").

Identificación de segmento


La siguiente tarea es "cortar" correctamente el mensaje en bloques permitidos para la transmisión de 7999 bytes , si su tamaño excede repentinamente este valor. Y para que el destinatario pueda recogerlo sin romper el orden o caer en la cadena de segmentos "alienígenas". Para esto, cada uno de ellos necesita ser identificado de alguna manera.

En realidad, ya conocemos las dos "coordenadas": este es el PID del proceso de envío y el nombre del canal que viene en cada notificación. Y el protocolo en sí mismo nos garantiza el orden de llegada de los segmentos.

Escribiendo vecinos
No consideraremos el caso cuando varios escritores en el mismo canal están activos al mismo tiempo que se conectan a la base de datos (es decir, obviamente dentro del mismo proceso de solicitud). Técnicamente, esto puede ser soportado al pasar un identificador adicional en el encabezado del segmento, pero es mejor "compartir" un solo objeto PubSub dentro de su aplicación.

Límite de contenedor


Para ensamblar un contenedor integral de varios segmentos, necesitamos saber el momento de su finalización. Hay dos formas típicas para esto:

  • transferencia del tamaño de destino (en bytes o segmentos) en el primero de ellos
  • transmisión del signo de [no] el último segmento en cada uno de ellos

Como escribimos PubSub después de todo, la mayoría de nuestros mensajes serán cortos y no es rentable reservar muchos bytes para la transferencia de tamaño. Por lo tanto, utilizaremos el segundo método, habiendo reservado el primer carácter de los datos del segmento como el indicador de continuación / final del contenedor.

Transferencia de objetos


Para transmitir cadenas de texto sin formato y objetos JSON como un "mensaje", agregamos un signo de símbolo más para la transformación inversa en el lado del destinatario.

Como decidimos codificar "sin formato" en base64, para las banderas podemos tomar cualquier carácter permitido que no esté en este conjunto.

Total, tenemos las siguientes opciones para los segmentos transmitidos:

 -- ""   !simple string -- "  "  @{"a":1} --    base64 #<segment> --    base64 $<segment> 

Como puede ver, es suficiente analizar cuando recibe un segmento solo el primer carácter para comprender lo que necesita hacer con él.

Escribir una implementación de PubSub


Nuestra aplicación estará en Node.js, por lo que utilizaremos el módulo node-postgres para trabajar con PostgreSQL.

Escribimos el marco inicial
Para comenzar, creemos PubSub como el heredero del EventEmitter , para poder generar eventos para aquellos que se suscribieron a canales específicos:

 const util = require('util'); const EventEmitter = require('events').EventEmitter; const PubSub = function(connection, interval, skipSelf) { //     this.connection = connection; //        this.connection.on('notification', p._onmessage.bind(this)); //         this.skipSelf = skipSelf; //  "" setInterval(() => { this.connection.query('SELECT 1'); }, interval); //     ""  this.slices = {}; }; util.inherits(PubSub, EventEmitter); const p = PubSub.prototype; 

Trabajamos con canales
Como ESCUCHAR / NO ESCUCHAR no jura de ninguna manera al volver a suscribirse a un canal o darse de baja de lo que no estábamos suscritos, no complicaremos nada.

 //     - "",      //     -        const quot = str => /^[_a-z][0-9a-z_\$]*$/.test(str) ? str : `"${str}"`; p.subscribe = function(channel) { this.connection.query(`LISTEN ${quot(channel)}`); return this; }; p.unsubscribe = function(channel) { this.connection.query(`UNLISTEN ${quot(channel)}`); return this; }; 

Enviar y recibir mensajes
 const PAYLOAD_LIMIT = 8000 - 1; const PAYLOAD_FL_STR = '!'; const PAYLOAD_FL_OBJ = '@'; const PAYLOAD_FL_SEQ = '#'; const PAYLOAD_FL_FIN = '$'; const PAYLOAD_SZ_HEAD = 1; const PAYLOAD_SZ_DATA = PAYLOAD_LIMIT - PAYLOAD_SZ_HEAD; //  ""  const reASCII = /^[\x20-\x7E]*$/; //  p.publish = function(channel, payload) { let query = `NOTIFY ${quot(channel)}`; if (payload !== null && payload !== undefined) { //    -    let str = typeof payload == 'string' ? PAYLOAD_FL_STR + payload : PAYLOAD_FL_OBJ + JSON.stringify(payload); if (str.length > PAYLOAD_LIMIT || !reASCII.test(str)) { //   base64- const b64 = Buffer.from(str).toString('base64'); for (let pos = 0, len = b64.length; pos < len; pos += PAYLOAD_SZ_DATA) { let fin = pos + PAYLOAD_SZ_DATA; let seg = fin >= len ? PAYLOAD_FL_FIN + b64.slice(pos) : PAYLOAD_FL_SEQ + b64.slice(pos, fin); this.connection.query(`${query}, '${seg}'`); } } else { //        ? //     str = str.replace(/'/g, "''"); this.connection.query(`${query}, '${str}'`); } } else { //       this.connection.query(query); } return this; }; //    p._onmessage = function(msg) { const {processId, channel, payload} = msg; //  "" if (processId == this.connection.processID && this.skipSelf) { return; } // ""  const id = `${processId}:${channel}`; let rv; //   let fl = payload.charAt(0); if (fl == PAYLOAD_FL_SEQ || fl == PAYLOAD_FL_FIN) { // base64 const str = payload.slice(PAYLOAD_SZ_HEAD); const slices = this.slices; let b64; if (fl == PAYLOAD_FL_FIN) { //   if (slices[id]) { slices[id].push(str); b64 = slices[id].join(''); delete slices[id]; } else { b64 = str; } } else { //     if (slices[id]) { slices[id].push(str); } else { slices[id] = [str]; } } if (b64) { rv = Buffer.from(b64, 'base64').toString(); fl = rv.charAt(0); } } else { //  / rv = payload; } if (rv !== undefined) { //   '' let res = { processId , channel }; if (rv) { //       let data = rv.slice(1); res.payload = fl == PAYLOAD_FL_OBJ ? JSON.parse(data) : data; } this.emit(channel, res); } }; 

Algunas pruebas
 const pg = require('pg'); const pgsql = new pg.Client({ host : 'example-db' , port : 5432 , user : 'postgres' , password : 'postgres' , database : '_tmp' }); pgsql.connect(err => { let psA = new PubSub(pgsql, 1000); let psB = new PubSub(pgsql, 1000); let chA = 'channel:A'; let chB = 'channel:B'; psA.subscribe(chA); psB.subscribe(chB); psA.on(chA, (msg) => { console.log('A:rcv', msg); }); psB.on(chB, (msg) => { console.log('B:rcv', msg); }); psB.publish(chA); psB.publish(chA, 'simple string'); psB.publish(chA, '  '); psB.publish(chA, {a : 1}); psA.publish(chB, '   100  '.repeat(100)); }); 

Todo es bastante simple, por lo que puede implementarlo fácilmente en cualquier otro PL utilizado en su proyecto, tomando como ejemplo la base para trabajar con notificaciones asincrónicas:

Source: https://habr.com/ru/post/484978/


All Articles