O PubSub é quase gratuito: NOTIFY apresenta no PostgreSQL

Se seus microsserviços já usam um banco de dados PostgreSQL comum para armazenar dados, ou se várias instâncias do mesmo serviço os usam em servidores diferentes, é possível receber mensagens " PubSub " entre eles de maneira relativamente barata, sem integração à arquitetura Redis, cluster RabbitMQ ou incorporação no código aplicativos de outro sistema MQ .

Para fazer isso, não escreveremos mensagens nas tabelas do banco de dados , pois isso causa muita sobrecarga, primeiro ao gravar o transmitido e depois também à limpeza do que já foi lido .

Transmitiremos e receberemos dados usando o mecanismo NOTIFY / LISTEN e coletaremos uma implementação de modelo para o Node.js.



Mas, dessa maneira, há um ancinho que deve ser cuidadosamente contornado.

Recursos de protocolo


Ouça


LISTEN  
Um aplicativo que usa a biblioteca libpq executa o comando LISTEN como um comando SQL regular e, em seguida, deve chamar periodicamente a função PQnotifies para verificar novas notificações.
Se você não está escrevendo uma biblioteca para trabalhar com o PG, mas já é um aplicativo específico, na maioria dos casos, não terá acesso à chamada para esta função.

Mas se essa biblioteca já tiver sido gravada para você de acordo com as recomendações para o processamento de solicitações e notificações assíncronas , você receberá automaticamente uma mensagem no código do aplicativo. Caso contrário, você pode simplesmente executar periodicamente o SELECT 1 na conexão e, em seguida, uma notificação será exibida com o resultado da consulta:
Em versões muito antigas da libpq, havia apenas uma maneira de garantir o recebimento oportuno de mensagens do comando NOTIFY - enviar constantemente comandos, mesmo vazios, e depois verificar PQnotifies após cada chamada PQexec. Embora esse método ainda funcione, é considerado obsoleto devido ao uso ineficiente do processador.
Em termos de, por exemplo, psql, fica assim:

 _tmp=# LISTEN test; LISTEN _tmp=# SELECT 1; ?column? ---------- 1 (1 row) Asynchronous notification "test" with payload "abc123" received from server process with PID 63991. 

Se para uma tarefa aplicada, podemos concordar com um atraso máximo na entrega de uma mensagem dentro de 1 segundo, com esse intervalo, executamos a solicitação. Ao mesmo tempo, esse método ajuda a monitorar a "vitalidade" da conexão , garantindo que ninguém a corte acidentalmente do lado do servidor via pg_terminate_backend , ou se não houve uma "falha" repentina do PG sem nenhuma notificação aos clientes.

NOTIFICAR


 NOTIFY  [ ,  ] 

O comando NOTIFY envia um evento de notificação junto com uma linha de "mensagem" adicional para todos os aplicativos clientes que executaram anteriormente um canal com o nome de canal especificado no banco de dados LISTEN atual.
...
A linha de "mensagem" que será transmitida junto com a notificação ... deve ser uma constante de texto simples . Em uma configuração padrão, seu comprimento deve ser menor que 8000 bytes .
Ou seja, se nossa “mensagem” repentinamente contiver algo muito diferente do ASCII, teremos que examiná-lo e, se exceder 8.000 bytes (não caracteres!), Corte-o em blocos e cole-o . Ao mesmo tempo, devemos economizar a largura de banda do canal e os recursos do servidor para processar a transferência de tais blocos - ou seja, adicionar o mínimo possível de "ligação" ao conteúdo útil possível, mas também não "estrangular" o aplicativo cliente, forçando-o a empacotar com gzip -9 .

Das vantagens adicionais do mecanismo, também é possível observar a ligação à "fonte" da mensagem ...
... trabalho adicional pode ser evitado verificando se o PID do processo de sinalização (indicado nos dados do evento) corresponde ao PID da própria sessão (você pode encontrá-lo entrando em contato com libpq). Se eles corresponderem, a sessão receberá uma notificação de suas próprias ações, para que possa ser ignorada.
... e ordem de entrega garantida:
Além de filtrar instâncias subsequentes de notificações duplicadas, o NOTIFY garante que as notificações de uma única transação sempre cheguem na mesma ordem em que foram enviadas. Também é garantido que as mensagens de transações diferentes cheguem na ordem em que essas transações são confirmadas .
Não combinaremos nada especificamente; portanto, cada uma de nossas solicitações corresponderá apenas a uma transação separada.

Mas lembre-se de que, se houver também atividade de aplicativo na conexão usada para a troca, nosso NOTIFY pode não estar dentro da transação por vontade própria, portanto , podem ocorrer efeitos colaterais :
As transações têm um impacto significativo no NOTIFY. Primeiro, se o NOTIFY for executado dentro de uma transação, as notificações serão entregues aos destinatários após a transação ser confirmada e somente neste caso. Isso é razoável, pois, no caso de uma transação ser interrompida, a ação de todos os comandos nela é cancelada, incluindo NOTIFY .
Portanto, é melhor usar uma conexão onde obviamente não há transações ou consultas longas.

AccessExclusiveLock no objeto 0 da classe 1262 do banco de dados 0


Se, de repente, o seu NOTIFY começou a diminuir e registrar a expectativa de uma fechadura assim, você ainda "ficou sem calças" e é hora de pensar no MQ "adulto".

Afinal, a fila de notificações, embora bastante grande (8 GB em compilações padrão), ainda é finita. De acordo com a resposta de Tom Lane :
Esse bloqueio é mantido ao inserir as mensagens de notificação da transação, após as quais a transação confirma e libera o bloqueio.
Ou seja, não há muitas opções para contornar:

  • envie com menos frequência
    Ou seja, para agregar os indicadores enviados, se houver alguns contadores, por um intervalo maior.
  • envie menos
    Por exemplo, para remover "padrão" do ponto de vista dos valores da chave do aplicativo do JSON transmitido.
  • envie apenas sinal , nenhum conteúdo
    Como opção - para iniciar vários canais, o nome de cada um já terá em si algum sentido aplicado.
  • ainda faça uma remessa do banco de dados

Enviando mensagens complexas


Codificação do corpo


No caso geral, podemos querer transmitir não apenas caracteres permitidos na mensagem, mas também letras russas e “quaisquer binários” - portanto, seria conveniente usar a conversão em representação hexadecimal para formar a sequência transmitida. E sim, este método funciona muito bem:

 NOTIFY test, E'\x20\x21' 

 Asynchronous notification "test" with payload " !" received from server process with PID 63991. 

Mas vamos voltar à documentação novamente:
Você deve garantir que as seqüências de bytes criadas dessa maneira, especialmente nas notações octal e hexadecimal, formem caracteres codificados no servidor válidos . Quando o servidor opera com codificação UTF-8, em vez de uma gravação de byte, use sequências especiais Unicode ou a sintaxe Unicode alternativa descrita na Seção 4.1.2.3. (Caso contrário, você precisará codificar caracteres UTF-8 manualmente e gravá-los por bytes, o que é muito inconveniente.)
Portanto, mesmo com o símbolo comum da pata de aspas de win1251, tomamos o pão de luto:

 NOTIFY test, E'\x98' -- ERROR: invalid byte sequence for encoding "UTF8": 0x98 

Como não queremos " codificar caracteres UTF-8 manualmente e escrevê-los por bytes ", concordamos imediatamente em enviar o corpo da mensagem compactado em base64 se ele contiver caracteres fora do intervalo \x20-\x7E ou, se necessário, segmentação. Por um lado, esse método de empacotamento não aumenta muito a redundância (coeficiente 4: 3); por outro lado, é implementado no nível das bibliotecas do sistema em qualquer idioma e fornecerá carga adicional mínima.

Mas mesmo se não tivermos caracteres "estranhos" e a mensagem se encaixar em um segmento, ainda há um recurso: escapar do apóstrofo :
Para incluir um apóstrofo em uma linha, escreva dois apóstrofos próximos a ela , por exemplo: 'Joana d'Arc'. Observe que isso não é o mesmo que aspas duplas (").

Identificação do segmento


A próxima tarefa é "cortar" corretamente a mensagem em blocos permitidos para transmissão de 7999 bytes , se seu tamanho exceder esse valor repentinamente. E para que o destinatário possa coletá-lo sem quebrar a ordem ou cair na cadeia de segmentos "alienígenas". Para isso, cada um deles precisa ser identificado de alguma forma.

Na verdade, já conhecemos as duas "coordenadas" - este é o PID do processo de envio e o nome do canal que aparece em cada notificação. E a ordem de chegada dos segmentos nos é garantida pelo próprio protocolo.

Escrever vizinhos
Não consideraremos o caso em que vários gravadores no mesmo canal estão ativos ao mesmo tempo em que se conectam ao banco de dados (ou seja, obviamente dentro do mesmo processo de aplicação). Tecnicamente, isso pode ser suportado passando um identificador adicional no cabeçalho do segmento - mas é melhor “compartilhar” um único objeto PubSub dentro do seu aplicativo.

Limite do contêiner


Para montar um contêiner integral de vários segmentos, precisamos saber o momento de sua conclusão. Existem duas maneiras típicas para isso:

  • transferência do tamanho do destino (em bytes ou segmentos) no primeiro deles
  • transmissão do sinal [não] do último segmento em cada um deles

Como escrevemos o PubSub, afinal, a maioria das nossas mensagens será curta e não é rentável reservar muitos bytes para a transferência de tamanho. Portanto, usaremos o segundo método, tendo reservado o primeiro caractere dos dados do segmento como sinalizador de continuação / final do contêiner.

Transferência de Objetos


Para transmitir as strings de texto sem formatação e os objetos JSON como uma "mensagem", adicionamos mais um sinal de símbolo para a transformação inversa no lado do destinatário.

Como decidimos codificar "não-formato" em base64, para sinalizadores, podemos usar qualquer caractere permitido que não esteja nesse conjunto.

No total, temos as seguintes opções para os segmentos transmitidos:

 -- ""   !simple string -- "  "  @{"a":1} --    base64 #<segment> --    base64 $<segment> 

Como você pode ver, basta analisar ao receber um segmento apenas o primeiro caractere para entender o que precisa ser feito com ele.

Escrevendo uma implementação do PubSub


Nossa aplicação estará no Node.js, portanto, usaremos o módulo node-postgres para trabalhar com o PostgreSQL.

Nós escrevemos o quadro inicial
Para começar, vamos criar o PubSub como herdeiro do EventEmitter , para poder gerar eventos para aqueles que se inscreveram em canais específicos:

 const util = require('util'); const EventEmitter = require('events').EventEmitter; const PubSub = function(connection, interval, skipSelf) { //     this.connection = connection; //        this.connection.on('notification', p._onmessage.bind(this)); //         this.skipSelf = skipSelf; //  "" setInterval(() => { this.connection.query('SELECT 1'); }, interval); //     ""  this.slices = {}; }; util.inherits(PubSub, EventEmitter); const p = PubSub.prototype; 

Trabalhamos com canais
Como LISTEN / UNLISTEN não jura de forma alguma ao reinscrever-se em um canal ou cancelar a inscrição em que não fomos inscritos, não complicaremos nada.

 //     - "",      //     -        const quot = str => /^[_a-z][0-9a-z_\$]*$/.test(str) ? str : `"${str}"`; p.subscribe = function(channel) { this.connection.query(`LISTEN ${quot(channel)}`); return this; }; p.unsubscribe = function(channel) { this.connection.query(`UNLISTEN ${quot(channel)}`); return this; }; 

Enviando e recebendo mensagens
 const PAYLOAD_LIMIT = 8000 - 1; const PAYLOAD_FL_STR = '!'; const PAYLOAD_FL_OBJ = '@'; const PAYLOAD_FL_SEQ = '#'; const PAYLOAD_FL_FIN = '$'; const PAYLOAD_SZ_HEAD = 1; const PAYLOAD_SZ_DATA = PAYLOAD_LIMIT - PAYLOAD_SZ_HEAD; //  ""  const reASCII = /^[\x20-\x7E]*$/; //  p.publish = function(channel, payload) { let query = `NOTIFY ${quot(channel)}`; if (payload !== null && payload !== undefined) { //    -    let str = typeof payload == 'string' ? PAYLOAD_FL_STR + payload : PAYLOAD_FL_OBJ + JSON.stringify(payload); if (str.length > PAYLOAD_LIMIT || !reASCII.test(str)) { //   base64- const b64 = Buffer.from(str).toString('base64'); for (let pos = 0, len = b64.length; pos < len; pos += PAYLOAD_SZ_DATA) { let fin = pos + PAYLOAD_SZ_DATA; let seg = fin >= len ? PAYLOAD_FL_FIN + b64.slice(pos) : PAYLOAD_FL_SEQ + b64.slice(pos, fin); this.connection.query(`${query}, '${seg}'`); } } else { //        ? //     str = str.replace(/'/g, "''"); this.connection.query(`${query}, '${str}'`); } } else { //       this.connection.query(query); } return this; }; //    p._onmessage = function(msg) { const {processId, channel, payload} = msg; //  "" if (processId == this.connection.processID && this.skipSelf) { return; } // ""  const id = `${processId}:${channel}`; let rv; //   let fl = payload.charAt(0); if (fl == PAYLOAD_FL_SEQ || fl == PAYLOAD_FL_FIN) { // base64 const str = payload.slice(PAYLOAD_SZ_HEAD); const slices = this.slices; let b64; if (fl == PAYLOAD_FL_FIN) { //   if (slices[id]) { slices[id].push(str); b64 = slices[id].join(''); delete slices[id]; } else { b64 = str; } } else { //     if (slices[id]) { slices[id].push(str); } else { slices[id] = [str]; } } if (b64) { rv = Buffer.from(b64, 'base64').toString(); fl = rv.charAt(0); } } else { //  / rv = payload; } if (rv !== undefined) { //   '' let res = { processId , channel }; if (rv) { //       let data = rv.slice(1); res.payload = fl == PAYLOAD_FL_OBJ ? JSON.parse(data) : data; } this.emit(channel, res); } }; 

Alguns testes
 const pg = require('pg'); const pgsql = new pg.Client({ host : 'example-db' , port : 5432 , user : 'postgres' , password : 'postgres' , database : '_tmp' }); pgsql.connect(err => { let psA = new PubSub(pgsql, 1000); let psB = new PubSub(pgsql, 1000); let chA = 'channel:A'; let chB = 'channel:B'; psA.subscribe(chA); psB.subscribe(chB); psA.on(chA, (msg) => { console.log('A:rcv', msg); }); psB.on(chB, (msg) => { console.log('B:rcv', msg); }); psB.publish(chA); psB.publish(chA, 'simple string'); psB.publish(chA, '  '); psB.publish(chA, {a : 1}); psA.publish(chB, '   100  '.repeat(100)); }); 

Tudo é bastante simples, então você pode implementá-lo facilmente em qualquer outro PL usado em seu projeto, tomando como exemplo a base para trabalhar com notificações assíncronas:

Source: https://habr.com/ru/post/pt484978/


All Articles