Filas de mensagens do PostgreSQL usando PgQ



As filas de mensagens são usadas para executar: operações pendentes, interação entre serviços, "processamento em lote", etc. Existem soluções especializadas para organizar essas filas, como: RabbitMQ, ActiveMQ, ZeroMQ, etc., mas muitas vezes acontece que elas não são de grande necessidade, e sua instalação e suporte causarão mais dor e sofrimento do que benefícios. Suponha que você tenha um serviço ao se registrar, no qual um email é enviado ao usuário para confirmação e, se você usa o Postgres, está com sorte - no Postgres, quase pronto, existe uma extensão PgQ que fará todo o trabalho sujo para você.

Neste artigo, falarei sobre enfileiramento de mensagens (tarefas) no PostgreSQL usando a extensão PgQ. Este artigo será útil se você não tiver usado o PgQ ou estiver usando filas auto-escritas no Postgres.

Por que você precisa do PgQ, se você pode apenas criar um tablet e escrever tarefas lá? Parece possível, mas você precisará levar em consideração o acesso paralelo às tarefas, possíveis erros (o que acontecerá se o processo que processa a tarefa cair?), Além do desempenho (o PgQ é muito rápido e as soluções auto-escritas geralmente não são, especialmente se a transação estiver em andamento). o banco de dados não fecha durante toda a execução da tarefa), mas a razão mais importante pela qual, na minha opinião, é necessário usar o PgQ é que o PgQ já está escrito e funciona, e a solução auto-escrita ainda precisa ser escrita (UPD: sobre por que não vale a pena usar filas auto-escritas) , você pode ler, por exemplo, aqui ).
(UPD: como o PgQ é executado no Postgres, todas as delícias das transações também podem ser usadas)

Mas o PgQ tem um grande ponto negativo - a falta de documentação, tento compensar essa falha com este artigo.

Dispositivo


O PgQ consiste em partes (pelo menos 2): 1 - a extensão pgq para o postgres, 2 - o daemon pgqd (sobre como instalá-los um pouco mais tarde).

Toda interação com a fila é realizada usando funções dentro do Postgres.

Por exemplo, para criar uma fila, você deve executar

select * from pgq.create_queue({ } text); 

Após a criação da fila, você pode adicionar mensagens a ela.

 select * from pgq.insert_event({ } text, { } text, {  } text); 

Agora precisamos aprender a receber mensagens gravadas. Para isso, existe uma entidade como "consumidor" (escreverei um consumidor), que recebe não mensagens (eventos), mas "lote" (lote). Um bach é um grupo de mensagens consecutivas, os baches são criados usando o pgqd. Periodicamente (o parâmetro “ticker_period” no arquivo de configuração), o pgqd pega todas as mensagens acumuladas e grava em um novo bach. É importante que o pgqd não funcione e, em seguida, novos bachs não sejam criados, o que significa que os consumidores não têm nada para ler; também, se o pgqd não funcionar por um longo período de tempo e depois ligado, ele criará um grande bach a partir das mensagens acumuladas durante esse período; portanto, o pgqd não deve ser apenas desligue-o.

Registro de um consumidor ( Importante! Um consumidor receberá eventos registrados somente após seu registro, portanto, você deve primeiro criar um consumidor e, em seguida, gravar eventos):

 select * from pgq.register_consumer({ } text, { } text); 

(semelhante a pgq.unregister_consumer)
Cada consumidor receberá absolutamente todos os eventos que ocorreram após sua criação (mesmo processados ​​por outro consumidor), o que significa que provavelmente você precisará de apenas um consumidor por um turno. Em seguida, mostrarei como dividir a carga em vários servidores.

Para obter um bach, primeiro você precisa descobrir seu ID:

 select * from pgq.next_batch({ } text, { } text); 

A função pode retornar NULL se o consumidor tiver processado todos os bachs. Nesse caso, você só precisa esperar até o pgqd criar um novo bach.

Nesse caso, enquanto o bach não for processado, essa função retornará o mesmo valor.
Você pode obter todos os eventos do lote usando:

 select * from pgq.get_batch_events({id } bigint); 

(O bach pode estar vazio.)

Se ocorreu um erro ao processar um deles, você pode tentar processar este evento posteriormente:

 select * from pgq.event_retry({id } bigint, {id } bigint, {   } integer); 

Para informar sobre o final do bach e ter a oportunidade de iniciar um novo, ele é usado

 select * from pgq.finish_batch({id } bigint); 

Obviamente, essas não são todas as funções da extensão, recomendo a leitura de pgq.imtqy.com/extension/pgq/files/external-sql.html e github.com/pgq/pgq/pgq/tree/master/functions (cada arquivo contém uma definição e descrição função correspondente).

Compartilhamento de carga


Para lidar com eventos simultaneamente por vários manipuladores, existe a extensão pgq_coop, que funciona como pgq e adiciona uma nova entidade chamada "sub consumidor", que receberá todos os eventos a partir do momento do registro do consumidor pai, é claro, exceto os que já foram processados.

 select * from pgq_coop.register_subconsumer({ } text, { } text, { } text); 

 select * from pgq_coop.next_batch({ } text, { } text, { } text); 

 select * from pgq_coop.next_batch({ } text, { } text, { } text, {        ,      } interval); 

 select * from pgq_coop.finish_batch({id } bigint); 

Leia sobre todos os recursos aqui .

Instalação


A extensão pgq e o daemon pgqd estão incluídos nos repositórios PGDG e são instalados de maneira muito simples na maioria das distribuições, por exemplo, no Debian:

sudo apt install postgresql-XX-pgq3 pgqd (XX é o número da versão).

O pgqd é um pequeno programa que pode ser descoberto sobre o uso do pgqd --help , não se esqueça de adicioná-lo à execução automática ( sudo systemctl enable pgqd.service , e a configuração padrão é /etc/pgqd.ini ).

Para começar a usar o PgQ, no banco de dados você só precisa conectar a extensão:

 create extension if not exists pgq; 


Com o pgq_coop, tudo é um pouco mais complicado, não está no repositório, mas não é difícil compilá-lo a partir dos fontes (exemplo para o Debian):

 sudo apt install postgresql-server-dev-XX git clone https://github.com/pgq/pgq-coop.git cd pgq-coop sudo make install 

e conecte a extensão usando

 create extension if not exists pgq_coop; 

Links úteis


Documentação pgq
Funções pgq
Funções Pgq_coop
Código fonte do Pgqd
conta do github com todos os projetos relacionados
Postgres da Wiki

Source: https://habr.com/ru/post/pt483014/


All Articles