Colas de mensajes PostgreSQL usando PgQ



Las colas de mensajes se utilizan para realizar: operaciones pendientes, interacción de servicios entre sí, "procesamiento por lotes", etc. Existen soluciones especializadas para organizar tales colas, tales como: RabbitMQ, ActiveMQ, ZeroMQ, etc., pero a menudo sucede que no son de gran necesidad, y su instalación y soporte causarán más dolor y sufrimiento que bien. Suponga que tiene un servicio al registrarse en el que se envía un correo electrónico al usuario para su confirmación, y si usa Postgres, entonces tiene suerte: en Postgres, casi listo para usar, hay una extensión PgQ que hará todo el trabajo sucio por usted.

En este artículo hablaré sobre la cola de mensajes (tareas) en PostgreSQL usando la extensión PgQ. Este artículo será útil si no ha utilizado PgQ o está utilizando colas autoescritas encima de Postgres.

¿Por qué necesita PgQ en absoluto, si solo puede crear una tableta y escribir tareas allí? Parecería posible, pero tendrá que tener en cuenta el acceso paralelo a las tareas, los posibles errores (¿qué sucederá si el proceso que procesa la tarea cae?), Así como el rendimiento (PgQ es muy rápido, y las soluciones autoescritas generalmente no lo son, especialmente si la transacción está en la base de datos no se cierra durante la ejecución completa de la tarea), pero la razón más importante por la que, en mi opinión, es necesario usar PgQ es que PgQ ya está escrito y funciona, y la solución autoescrita aún necesita ser escrita (UPD: sobre por qué no vale la pena usar colas autoescritas) , puedes leer, por ejemplo, aquí ).
(UPD: dado que PgQ se ejecuta sobre Postgres, también se pueden usar todas las delicias de las transacciones)

Pero PgQ tiene un gran inconveniente: la falta de documentación, trato de compensar esta deficiencia con este artículo.

Dispositivo


PgQ consta de partes (al menos 2): 1 - la extensión pgq para postgres, 2 - el demonio pgqd (sobre cómo instalarlas un poco más tarde).

Toda interacción con la cola se lleva a cabo utilizando funciones dentro de Postgres.

Por ejemplo, para crear una cola, debe ejecutar

select * from pgq.create_queue({ } text); 

Después de crear la cola, puede agregarle mensajes.

 select * from pgq.insert_event({ } text, { } text, {  } text); 

Ahora necesitamos aprender a recibir mensajes grabados. Para esto, existe una entidad como "consumidor" (escribiré un consumidor), que no recibe mensajes (eventos), sino "lote" (lote). Un bach es un grupo de mensajes consecutivos, los baches se crean usando pgqd. Periódicamente (el parámetro "ticker_period" en el archivo de configuración) pgqd toma todos los mensajes acumulados y los escribe en un nuevo bach. Es importante si pgqd no funciona, entonces no se crean nuevos bachs, lo que significa que los consumidores no tienen nada que leer, también si pgqd no funcionó durante mucho tiempo y luego se encendió, creará un gran bach de los mensajes acumulados durante este tiempo, por lo tanto, pgqd no debe ser solo apágalo.

Registro de un consumidor ( ¡Importante! Un consumidor recibirá eventos grabados solo después de su registro, por lo tanto, primero debe crear un consumidor, y solo luego escribir eventos):

 select * from pgq.register_consumer({ } text, { } text); 

(similar a pgq.unregister_consumer)
Cada consumidor recibirá absolutamente todos los eventos que ocurrieron después de su creación (incluso procesados ​​por otro consumidor), lo que significa que lo más probable es que solo necesite un consumidor por turno. A continuación, le diré cómo dividir la carga en varios servidores.

Para obtener un bach, primero debe encontrar su ID:

 select * from pgq.next_batch({ } text, { } text); 

La función puede devolver NULL si el consumidor ha procesado todos los bachs. En este caso, solo tiene que esperar hasta que pgqd cree un nuevo bach.

En este caso, mientras el bach no se procesa, esta función devolverá el mismo valor.
Puede obtener todos los eventos en el lote usando:

 select * from pgq.get_batch_events({id } bigint); 

(El bach puede estar vacío).

Si se produjo un error al procesar uno de ellos, puede intentar procesar este evento más tarde:

 select * from pgq.event_retry({id } bigint, {id } bigint, {   } integer); 

Para informar sobre el final del bach y tener la oportunidad de comenzar uno nuevo, se utiliza

 select * from pgq.finish_batch({id } bigint); 

Por supuesto, estas no son todas las funciones en la extensión, recomiendo leer pgq.imtqy.com/extension/pgq/files/external-sql.html y github.com/pgq/pgq/tree/master/functions (cada archivo contiene una definición y descripción función correspondiente).

Compartir la carga


Para procesar eventos simultáneamente por varios manejadores, hay una extensión pgq_coop que funciona como pgq y agrega una nueva entidad "subconsumidor", que recibirá todos los eventos desde el momento del registro del consumidor principal, por supuesto, excepto aquellos ya procesados.

 select * from pgq_coop.register_subconsumer({ } text, { } text, { } text); 

 select * from pgq_coop.next_batch({ } text, { } text, { } text); 

 select * from pgq_coop.next_batch({ } text, { } text, { } text, {        ,      } interval); 

 select * from pgq_coop.finish_batch({id } bigint); 

Lea sobre todas las características aquí .

Instalación


La extensión pgq y el demonio pgqd se incluyen en los repositorios PGDG y se instalan en la mayoría de las distribuciones de manera muy simple, por ejemplo, en Debian:

sudo apt install postgresql-XX-pgq3 pgqd (XX es el número de versión).

pgqd es un pequeño programa que se puede encontrar sobre el uso de pgqd --help , no olvide agregarlo a la ejecución automática ( sudo systemctl enable pgqd.service , y la configuración predeterminada es /etc/pgqd.ini ).

Para comenzar a usar PgQ, en la base de datos solo necesita conectar la extensión:

 create extension if not exists pgq; 


Con pgq_coop, todo es un poco más complicado, no está en el repositorio, pero no es difícil compilarlo desde las fuentes (ejemplo para Debian):

 sudo apt install postgresql-server-dev-XX git clone https://github.com/pgq/pgq-coop.git cd pgq-coop sudo make install 

y conecte la extensión usando

 create extension if not exists pgq_coop; 

Enlaces utiles


Documentación Pgq
Funciones Pgq
Funciones Pgq_coop
Código fuente Pgqd
cuenta github con todos los proyectos relacionados
Wiki postgres

Source: https://habr.com/ru/post/483014/


All Articles