Se seus microsserviços
já usam um banco de dados PostgreSQL comum para armazenar dados, ou se várias instâncias do mesmo serviço os usam em servidores diferentes, é possível
receber mensagens " PubSub
" entre eles de maneira relativamente barata, sem integração à arquitetura Redis, cluster RabbitMQ ou incorporação no código aplicativos de
outro sistema MQ .
Para fazer isso,
não escreveremos mensagens nas tabelas do banco de dados , pois isso causa muita sobrecarga, primeiro ao
gravar o transmitido e depois também à
limpeza do que já foi lido .
Transmitiremos e receberemos dados usando o mecanismo
NOTIFY /
LISTEN e coletaremos uma implementação de modelo para o Node.js.

Mas, dessa maneira, há um ancinho que deve ser cuidadosamente contornado.
Recursos de protocolo
Ouça
LISTEN
Um aplicativo que usa a biblioteca libpq executa o comando LISTEN como um comando SQL regular e, em seguida, deve chamar periodicamente a função PQnotifies para verificar novas notificações.
Se você não está escrevendo uma biblioteca para trabalhar com o PG, mas já é um aplicativo específico, na maioria dos casos, não terá acesso à chamada para esta função.
Mas se essa biblioteca já tiver sido gravada para você de acordo com as recomendações para o processamento de
solicitações e
notificações assíncronas , você receberá automaticamente uma mensagem no código do aplicativo. Caso contrário, você pode simplesmente
executar periodicamente o SELECT 1
na conexão e, em seguida, uma notificação será exibida com o resultado da consulta:
Em versões muito antigas da libpq, havia apenas uma maneira de garantir o recebimento oportuno de mensagens do comando NOTIFY - enviar constantemente comandos, mesmo vazios, e depois verificar PQnotifies após cada chamada PQexec. Embora esse método ainda funcione, é considerado obsoleto devido ao uso ineficiente do processador.
Em termos de, por exemplo,
psql, fica assim:
_tmp=# LISTEN test; LISTEN _tmp=# SELECT 1; ?column? ---------- 1 (1 row) Asynchronous notification "test" with payload "abc123" received from server process with PID 63991.
Se para uma tarefa aplicada, podemos concordar com um atraso máximo na entrega de uma mensagem dentro de 1 segundo, com esse intervalo, executamos a solicitação. Ao mesmo tempo, esse método ajuda a
monitorar a "vitalidade" da conexão , garantindo que ninguém a corte acidentalmente do lado do servidor via
pg_terminate_backend
, ou se não houve uma "falha" repentina do PG sem nenhuma notificação aos clientes.
NOTIFICAR
NOTIFY [ , ]
O comando NOTIFY envia um evento de notificação junto com uma linha de "mensagem" adicional para todos os aplicativos clientes que executaram anteriormente um canal com o nome de canal especificado no banco de dados LISTEN atual.
...
A linha de "mensagem" que será transmitida junto com a notificação ... deve ser uma constante de texto simples . Em uma configuração padrão, seu comprimento deve ser menor que 8000 bytes .
Ou seja, se nossa “mensagem” repentinamente contiver algo muito diferente do ASCII, teremos
que examiná-lo e, se exceder 8.000 bytes (não caracteres!),
Corte-o em blocos e cole-o . Ao mesmo tempo, devemos economizar a largura de banda do canal e os recursos do servidor para processar a transferência de tais blocos - ou seja, adicionar o mínimo possível de "ligação" ao conteúdo útil possível, mas também não "estrangular" o aplicativo cliente, forçando-o a empacotar com
gzip -9
.
Das vantagens adicionais do mecanismo, também é possível observar a ligação à "fonte" da mensagem ...
... trabalho adicional pode ser evitado verificando se o PID do processo de sinalização (indicado nos dados do evento) corresponde ao PID da própria sessão (você pode encontrá-lo entrando em contato com libpq). Se eles corresponderem, a sessão receberá uma notificação de suas próprias ações, para que possa ser ignorada.
... e ordem de entrega garantida:
Além de filtrar instâncias subsequentes de notificações duplicadas, o NOTIFY garante que as notificações de uma única transação sempre cheguem na mesma ordem em que foram enviadas. Também é garantido que as mensagens de transações diferentes cheguem na ordem em que essas transações são confirmadas .
Não combinaremos nada especificamente; portanto, cada uma de nossas solicitações corresponderá apenas a uma transação separada.
Mas lembre-se de que, se houver também atividade de aplicativo na conexão usada para a troca, nosso NOTIFY pode não estar dentro da transação por vontade própria, portanto
, podem ocorrer efeitos colaterais :
As transações têm um impacto significativo no NOTIFY. Primeiro, se o NOTIFY for executado dentro de uma transação, as notificações serão entregues aos destinatários após a transação ser confirmada e somente neste caso. Isso é razoável, pois, no caso de uma transação ser interrompida, a ação de todos os comandos nela é cancelada, incluindo NOTIFY .
Portanto, é melhor usar uma conexão onde obviamente não há transações ou consultas longas.
AccessExclusiveLock no objeto 0 da classe 1262 do banco de dados 0
Se, de repente, o seu NOTIFY começou a diminuir e registrar a expectativa de uma fechadura assim, você ainda "ficou sem calças" e é hora de pensar no MQ "adulto".
Afinal, a fila de notificações, embora bastante grande (8 GB em compilações padrão), ainda é finita. De acordo com
a resposta de Tom Lane :
Esse bloqueio é mantido ao inserir as mensagens de notificação da transação, após as quais a transação confirma e libera o bloqueio.
Ou seja, não há muitas opções para contornar:
- envie com menos frequência
Ou seja, para agregar os indicadores enviados, se houver alguns contadores, por um intervalo maior. - envie menos
Por exemplo, para remover "padrão" do ponto de vista dos valores da chave do aplicativo do JSON transmitido. - envie apenas sinal , nenhum conteúdo
Como opção - para iniciar vários canais, o nome de cada um já terá em si algum sentido aplicado. - ainda faça uma remessa do banco de dados
Enviando mensagens complexas
Codificação do corpo
No caso geral, podemos querer transmitir não apenas caracteres permitidos na mensagem, mas também letras russas e “quaisquer binários” - portanto, seria conveniente usar a
conversão em representação hexadecimal para formar a sequência transmitida. E sim, este método funciona muito bem:
NOTIFY test, E'\x20\x21'
Asynchronous notification "test" with payload " !" received from server process with PID 63991.
Mas vamos voltar à documentação novamente:
Você deve garantir que as seqüências de bytes criadas dessa maneira, especialmente nas notações octal e hexadecimal, formem caracteres codificados no servidor válidos . Quando o servidor opera com codificação UTF-8, em vez de uma gravação de byte, use sequências especiais Unicode ou a sintaxe Unicode alternativa descrita na Seção 4.1.2.3. (Caso contrário, você precisará codificar caracteres UTF-8 manualmente e gravá-los por bytes, o que é muito inconveniente.)
Portanto, mesmo com o símbolo comum da pata de aspas de win1251, tomamos o pão de luto:
NOTIFY test, E'\x98'
Como não queremos "
codificar caracteres UTF-8 manualmente e escrevê-los por bytes ", concordamos imediatamente em enviar o corpo da mensagem
compactado em base64 se ele contiver caracteres fora do intervalo
\x20-\x7E
ou, se necessário, segmentação. Por um lado, esse método de empacotamento não aumenta muito a redundância (coeficiente 4: 3); por outro lado, é implementado no nível das bibliotecas do sistema em qualquer idioma e fornecerá carga adicional mínima.
Mas mesmo se não tivermos caracteres "estranhos" e a mensagem se encaixar em um segmento, ainda há um recurso:
escapar do apóstrofo :
Para incluir um apóstrofo em uma linha, escreva dois apóstrofos próximos a ela , por exemplo: 'Joana d'Arc'. Observe que isso não é o mesmo que aspas duplas (").
Identificação do segmento
A próxima tarefa é "cortar" corretamente a mensagem em
blocos permitidos para transmissão
de 7999 bytes , se seu tamanho exceder esse valor repentinamente. E para que o destinatário possa coletá-lo sem quebrar a ordem ou cair na cadeia de segmentos "alienígenas". Para isso, cada um deles precisa ser identificado de alguma forma.
Na verdade, já conhecemos as duas "coordenadas" - este é o
PID do processo de envio e o
nome do canal que aparece em cada notificação. E a ordem de chegada dos segmentos nos é garantida pelo próprio protocolo.
Escrever vizinhosNão consideraremos o caso em que vários gravadores no mesmo canal estão ativos ao mesmo tempo em que se conectam ao banco de dados (ou seja, obviamente dentro do mesmo processo de aplicação). Tecnicamente, isso pode ser suportado passando um identificador adicional no cabeçalho do segmento - mas é melhor “compartilhar” um único objeto PubSub dentro do seu aplicativo.
Limite do contêiner
Para montar um contêiner integral de vários segmentos, precisamos saber o momento de sua conclusão. Existem duas maneiras típicas para isso:
- transferência do tamanho do destino (em bytes ou segmentos) no primeiro deles
- transmissão do sinal [não] do último segmento em cada um deles
Como escrevemos o PubSub, afinal, a maioria das nossas mensagens será curta e não é rentável reservar muitos bytes para a transferência de tamanho. Portanto, usaremos o segundo método, tendo reservado o
primeiro caractere dos dados do segmento como sinalizador de continuação / final do contêiner.
Transferência de Objetos
Para transmitir as strings de texto sem formatação e os objetos JSON como uma "mensagem", adicionamos
mais um sinal de símbolo para a transformação inversa no lado do destinatário.
Como decidimos codificar "não-formato" em base64, para sinalizadores, podemos usar qualquer caractere permitido que não esteja nesse conjunto.
No total, temos as seguintes opções para os segmentos transmitidos:
-- "" !simple string -- " " @{"a":1} -- base64 #<segment> -- base64 $<segment>
Como você pode ver, basta analisar ao receber um segmento apenas o primeiro caractere para entender o que precisa ser feito com ele.
Escrevendo uma implementação do PubSub
Nossa aplicação estará no Node.js, portanto, usaremos o
módulo node-postgres para trabalhar com o PostgreSQL.
Nós escrevemos o quadro inicialPara começar, vamos criar o PubSub como herdeiro do
EventEmitter , para poder gerar eventos para aqueles que se inscreveram em canais específicos:
const util = require('util'); const EventEmitter = require('events').EventEmitter; const PubSub = function(connection, interval, skipSelf) {
Trabalhamos com canaisComo LISTEN / UNLISTEN não jura de forma alguma ao reinscrever-se em um canal ou cancelar a inscrição em que não fomos inscritos, não complicaremos nada.
Enviando e recebendo mensagens const PAYLOAD_LIMIT = 8000 - 1; const PAYLOAD_FL_STR = '!'; const PAYLOAD_FL_OBJ = '@'; const PAYLOAD_FL_SEQ = '#'; const PAYLOAD_FL_FIN = '$'; const PAYLOAD_SZ_HEAD = 1; const PAYLOAD_SZ_DATA = PAYLOAD_LIMIT - PAYLOAD_SZ_HEAD;
Alguns testes const pg = require('pg'); const pgsql = new pg.Client({ host : 'example-db' , port : 5432 , user : 'postgres' , password : 'postgres' , database : '_tmp' }); pgsql.connect(err => { let psA = new PubSub(pgsql, 1000); let psB = new PubSub(pgsql, 1000); let chA = 'channel:A'; let chB = 'channel:B'; psA.subscribe(chA); psB.subscribe(chB); psA.on(chA, (msg) => { console.log('A:rcv', msg); }); psB.on(chB, (msg) => { console.log('B:rcv', msg); }); psB.publish(chA); psB.publish(chA, 'simple string'); psB.publish(chA, ' '); psB.publish(chA, {a : 1}); psA.publish(chB, ' 100 '.repeat(100)); });
Tudo é bastante simples, então você pode implementá-lo facilmente em qualquer outro PL usado em seu projeto, tomando como exemplo a base para trabalhar com notificações assíncronas: