
As filas de mensagens são usadas para executar: operações pendentes, interação entre serviços, "processamento em lote", etc. Existem soluções especializadas para organizar essas filas, como: RabbitMQ, ActiveMQ, ZeroMQ, etc., mas muitas vezes acontece que elas não são de grande necessidade, e sua instalação e suporte causarão mais dor e sofrimento do que benefícios. Suponha que você tenha um serviço ao se registrar, no qual um email é enviado ao usuário para confirmação e, se você usa o Postgres, está com sorte - no Postgres, quase pronto, existe uma extensão PgQ que fará todo o trabalho sujo para você.
Neste artigo, falarei sobre enfileiramento de mensagens (tarefas) no PostgreSQL usando a extensão PgQ. Este artigo será útil se você não tiver usado o PgQ ou estiver usando filas auto-escritas no Postgres.
Por que você precisa do PgQ, se você pode apenas criar um tablet e escrever tarefas lá? Parece possível, mas você precisará levar em consideração o acesso paralelo às tarefas, possíveis erros (o que acontecerá se o processo que processa a tarefa cair?), Além do desempenho (o PgQ é muito rápido e as soluções auto-escritas geralmente não são, especialmente se a transação estiver em andamento). o banco de dados não fecha durante toda a execução da tarefa), mas a razão mais importante pela qual, na minha opinião, é necessário usar o PgQ é que o PgQ já está escrito e funciona, e a solução auto-escrita ainda precisa ser escrita (UPD: sobre por que não vale a pena usar filas auto-escritas) , você pode ler, por exemplo,
aqui ).
(UPD: como o PgQ é executado no Postgres, todas as delícias das transações também podem ser usadas)
Mas o PgQ tem um grande ponto negativo - a falta de documentação, tento compensar essa falha com este artigo.
Dispositivo
O PgQ consiste em partes (pelo menos 2): 1 - a extensão pgq para o postgres, 2 - o daemon pgqd (sobre como instalá-los um pouco mais tarde).
Toda interação com a fila é realizada usando funções dentro do Postgres.
Por exemplo, para criar uma fila, você deve executar
select * from pgq.create_queue({ } text);
Após a criação da fila, você pode adicionar mensagens a ela.
select * from pgq.insert_event({ } text, { } text, { } text);
Agora precisamos aprender a receber mensagens gravadas. Para isso, existe uma entidade como "consumidor" (escreverei um consumidor), que recebe não mensagens (eventos), mas "lote" (lote). Um bach é um grupo de mensagens consecutivas, os baches são criados usando o pgqd. Periodicamente (o parâmetro “ticker_period” no arquivo de configuração), o pgqd pega todas as mensagens acumuladas e grava em um novo bach.
É importante que o pgqd não funcione e, em seguida, novos bachs não sejam criados, o que significa que os consumidores não têm nada para ler; também, se o pgqd não funcionar por um longo período de tempo e depois ligado, ele criará um grande bach a partir das mensagens acumuladas durante esse período; portanto, o pgqd não deve ser apenas desligue-o.
Registro de um consumidor (
Importante! Um consumidor receberá eventos registrados somente
após seu registro, portanto, você deve primeiro criar um consumidor e, em seguida, gravar eventos):
select * from pgq.register_consumer({ } text, { } text);
(semelhante a pgq.unregister_consumer)Cada consumidor receberá
absolutamente todos os eventos que ocorreram após sua criação (mesmo processados por outro consumidor), o que significa que provavelmente você precisará de apenas um consumidor por um turno. Em seguida, mostrarei como dividir a carga em vários servidores.
Para obter um bach, primeiro você precisa descobrir seu ID:
select * from pgq.next_batch({ } text, { } text);
A função pode retornar NULL se o consumidor tiver processado todos os bachs. Nesse caso, você só precisa esperar até o pgqd criar um novo bach.
Nesse caso, enquanto o bach não for processado, essa função retornará o mesmo valor.
Você pode obter todos os eventos do lote usando:
select * from pgq.get_batch_events({id } bigint);
(O bach pode estar vazio.)
Se ocorreu um erro ao processar um deles, você pode tentar processar este evento posteriormente:
select * from pgq.event_retry({id } bigint, {id } bigint, { } integer);
Para informar sobre o final do bach e ter a oportunidade de iniciar um novo, ele é usado
select * from pgq.finish_batch({id } bigint);
Obviamente, essas não são todas as funções da extensão, recomendo a leitura de
pgq.imtqy.com/extension/pgq/files/external-sql.html e
github.com/pgq/pgq/pgq/tree/master/functions (cada arquivo contém uma definição e descrição função correspondente).
Compartilhamento de carga
Para lidar com eventos simultaneamente por vários manipuladores, existe a extensão pgq_coop, que funciona como pgq e adiciona uma nova entidade chamada "sub consumidor", que receberá todos os eventos a partir do momento do registro do consumidor pai, é claro, exceto os que já foram processados.
select * from pgq_coop.register_subconsumer({ } text, { } text, { } text);
select * from pgq_coop.next_batch({ } text, { } text, { } text);
select * from pgq_coop.next_batch({ } text, { } text, { } text, { , } interval);
select * from pgq_coop.finish_batch({id } bigint);
Leia sobre todos os recursos
aqui .
Instalação
A extensão pgq e o daemon pgqd estão incluídos nos repositórios PGDG e são instalados de maneira muito simples na maioria das distribuições, por exemplo, no Debian:
sudo apt install postgresql-XX-pgq3 pgqd
(XX é o número da versão).
O pgqd é um pequeno programa que pode ser descoberto sobre o uso do
pgqd --help
, não se esqueça de adicioná-lo à execução automática (
sudo systemctl enable pgqd.service
, e a configuração padrão é
/etc/pgqd.ini
).
Para começar a usar o PgQ, no banco de dados você só precisa conectar a extensão:
create extension if not exists pgq;
Com o pgq_coop, tudo é um pouco mais complicado, não está no repositório, mas não é difícil compilá-lo a partir dos fontes (exemplo para o Debian):
sudo apt install postgresql-server-dev-XX git clone https://github.com/pgq/pgq-coop.git cd pgq-coop sudo make install
e conecte a extensão usando
create extension if not exists pgq_coop;
Links úteis
Documentação pgqFunções pgqFunções Pgq_coopCódigo fonte do Pgqdconta do github com todos os projetos relacionadosPostgres da Wiki