PostgreSQL-Nachrichtenwarteschlangen mit PgQ



Nachrichtenwarteschlangen werden für folgende Aufgaben verwendet: ausstehende Vorgänge, Interaktion zwischen Diensten, Stapelverarbeitung usw. Es gibt spezielle Lösungen für das Organisieren solcher Warteschlangen, wie z. B. RabbitMQ, ActiveMQ, ZeroMQ usw., aber es kommt häufig vor, dass sie keinen großen Bedarf haben und ihre Installation und Unterstützung mehr Schmerzen und Leiden verursachen als Vorteile bringen. Angenommen, Sie haben bei der Registrierung einen Dienst, bei dem eine E-Mail zur Bestätigung an den Benutzer gesendet wird, und wenn Sie Postgres verwenden, haben Sie Glück - in Postgres gibt es fast sofort eine PgQ-Erweiterung, die die ganze Drecksarbeit für Sie erledigt.

In diesem Artikel werde ich über Message Queuing (Aufgaben) in PostgreSQL unter Verwendung der PgQ-Erweiterung sprechen. Dieser Artikel ist nützlich, wenn Sie PgQ nicht verwendet haben oder selbst geschriebene Warteschlangen über Postgres verwenden.

Warum brauchen Sie überhaupt PgQ, wenn Sie nur ein Tablet erstellen und dort Aufgaben schreiben können? Es scheint möglich, aber Sie müssen den parallelen Zugriff auf Aufgaben, mögliche Fehler (was passiert, wenn der Prozess, der die Aufgabe bearbeitet, abstürzt?) Sowie die Leistung berücksichtigen (PgQ ist sehr schnell und selbstgeschriebene Lösungen normalerweise nicht, insbesondere wenn die Transaktion ausgeführt wird) Die Datenbank wird nicht während der gesamten Ausführung der Aufgabe geschlossen. Der wichtigste Grund für die meiner Meinung nach notwendige Verwendung von PgQ ist jedoch, dass PgQ bereits geschrieben ist und funktioniert und die selbstgeschriebene Lösung noch geschrieben werden muss (UPD: Warum lohnt es sich nicht, selbstgeschriebene Warteschlangen zu verwenden? können Sie hier lesen).
(UPD: Da PgQ auf Postgres läuft, können auch alle Vorzüge von Transaktionen darin verwendet werden.)

Aber PgQ hat ein großes Minus - die fehlende Dokumentation versuche ich mit diesem Artikel zu kompensieren.

Gerät


PgQ besteht aus Teilen (mindestens 2): 1 - die pgq-Erweiterung für postgres, 2 - der pgqd-Daemon (etwa, um sie etwas später zu installieren).

Alle Interaktionen mit der Warteschlange werden mithilfe von Funktionen in Postgres ausgeführt.

Um beispielsweise eine Warteschlange zu erstellen, müssen Sie ausführen

select * from pgq.create_queue({ } text); 

Nachdem die Warteschlange erstellt wurde, können Sie ihr Nachrichten hinzufügen.

 select * from pgq.insert_event({ } text, { } text, {  } text); 

Jetzt müssen wir lernen, wie man aufgezeichnete Nachrichten empfängt. Hierfür gibt es eine Entität wie "Consumer" (ich werde einen Consumer schreiben), die keine Nachrichten (Ereignisse), sondern "Batch" (Batch) empfängt. Ein Cache ist eine Gruppe aufeinanderfolgender Nachrichten. Cache-Speicher werden mit pgqd erstellt. In regelmäßigen Abständen (der Parameter „ticker_period“ in der Konfigurationsdatei) nimmt pgqd alle angesammelten Nachrichten entgegen und schreibt sie in einen neuen Cache. Es ist wichtig, dass, wenn pgqd nicht funktioniert, keine neuen Bachs erstellt werden, was bedeutet, dass die Verbraucher nichts zu lesen haben. Wenn pgqd lange nicht funktioniert und dann eingeschaltet wird, wird aus den in dieser Zeit gesammelten Nachrichten ein großer Bach erstellt. Daher sollte pgqd nicht erstellt werden schalte es einfach aus.

Registrierung eines Verbrauchers ( Wichtig! Ein Verbraucher erhält Ereignisse, die erst nach seiner Registrierung aufgezeichnet wurden. Daher sollten Sie zuerst einen Verbraucher erstellen und erst dann Ereignisse schreiben):

 select * from pgq.register_consumer({ } text, { } text); 

(ähnlich wie pgq.unregister_consumer)
Jeder Verbraucher erhält absolut alle Ereignisse , die nach seiner Erstellung aufgetreten sind (auch von einem anderen Verbraucher verarbeitet), was bedeutet, dass Sie höchstwahrscheinlich nur einen Verbraucher für eine Runde benötigen. Als nächstes erkläre ich Ihnen, wie Sie die Last auf mehrere Server aufteilen.

Um einen Bach zu bekommen, müssen Sie zuerst seine ID herausfinden:

 select * from pgq.next_batch({ } text, { } text); 

Die Funktion kann NULL zurückgeben, wenn der Consumer alle Bachs verarbeitet hat. In diesem Fall müssen Sie nur warten, bis pgqd einen neuen Bach erstellt.

In diesem Fall gibt diese Funktion denselben Wert zurück, solange der Cache nicht verarbeitet wird.
Sie können alle Ereignisse im Stapel abrufen, indem Sie Folgendes verwenden:

 select * from pgq.get_batch_events({id } bigint); 

(Der Bach ist möglicherweise leer.)

Wenn bei der Verarbeitung eines dieser Ereignisse ein Fehler aufgetreten ist, können Sie versuchen, dieses Ereignis später zu verarbeiten:

 select * from pgq.event_retry({id } bigint, {id } bigint, {   } integer); 

Um über das Ende des Bachs zu informieren und die Möglichkeit zu bekommen, einen neuen zu starten, wird dieser genutzt

 select * from pgq.finish_batch({id } bigint); 

Natürlich sind dies nicht alle Funktionen in der Erweiterung. Ich empfehle pgq.imtqy.com/extension/pgq/files/external-sql.html und github.com/pgq/pgq/tree/master/functions (jede Datei enthält eine Definition und eine Beschreibung) entsprechende Funktion).

Lastverteilung


Damit Ereignisse von mehreren Handlern gleichzeitig verarbeitet werden können, gibt es die Erweiterung pgq_coop, die wie pgq funktioniert und eine neue Entität namens "sub consumer" hinzufügt, die natürlich alle Ereignisse ab dem Zeitpunkt der Registrierung des übergeordneten Verbrauchers empfängt, mit Ausnahme derjenigen, die bereits verarbeitet wurden.

 select * from pgq_coop.register_subconsumer({ } text, { } text, { } text); 

 select * from pgq_coop.next_batch({ } text, { } text, { } text); 

 select * from pgq_coop.next_batch({ } text, { } text, { } text, {        ,      } interval); 

 select * from pgq_coop.finish_batch({id } bigint); 

Lesen Sie hier alles über die Funktionen.

Installation


Die pgq-Erweiterung und der pgqd-Daemon sind in den PGDG-Repositorys enthalten und werden in den meisten Distributionen sehr einfach installiert, zum Beispiel in Debian:

sudo apt install postgresql-XX-pgq3 pgqd (XX ist die Versionsnummer).

pgqd ist ein kleines Programm, das über die Verwendung von pgqd --help Sie nicht, es zu sudo systemctl enable pgqd.service hinzuzufügen ( sudo systemctl enable pgqd.service und die Standardkonfiguration ist /etc/pgqd.ini ).

Um PgQ zu verwenden, müssen Sie in der Datenbank nur die Erweiterung verbinden:

 create extension if not exists pgq; 


Mit pgq_coop ist alles etwas komplizierter, es befindet sich nicht im Repository, aber es ist nicht schwierig, es aus den Quellen zu kompilieren (Beispiel für Debian):

 sudo apt install postgresql-server-dev-XX git clone https://github.com/pgq/pgq-coop.git cd pgq-coop sudo make install 

und verbinden Sie die Nebenstelle mit

 create extension if not exists pgq_coop; 

Nützliche Links


Pgq-Dokumentation
Pgq-Funktionen
Pgq_coop funktioniert
Pgqd-Quellcode
github account mit allen verwandten projekten
Wiki Postgres

Source: https://habr.com/ru/post/de483014/


All Articles