PostgreSQL-Rezepte: Asynchroner Taskplaner

Um den asynchronen Taskplaner vorzubereiten, benötigen wir postgres selbst und seine Erweiterung pg_task . (Ich habe Links zu meinen Fork-Postgres angegeben, da ich einige Änderungen vorgenommen habe, die noch nicht in das ursprüngliche Repository verschoben werden konnten. Sie können auch das vorgefertigte Image verwenden .)

(Es gibt einen Fehler in PL / pgSQL im ursprünglichen PostgreSQL, aufgrund dessen mein Scheduler nicht richtig funktioniert, wenn eine nicht erfasste Ausnahme in einer in PL / pgSQL geschriebenen Aufgabe auftritt. Ich habe diesen Fehler hier beschrieben und in meiner Verzweigung hier behoben.)

Um den Scheduler zu installieren, müssen Sie keine Erweiterung in (jeder) Datenbank erstellen. Fügen Sie es stattdessen einfach der Konfigurationsdatei hinzu

shared_preload_libraries = 'pg_task' 

Nach dem Neustart von PostgreSQL erstellt der Scheduler im Auftrag von Datenbankbenutzern und in den Standardschemata für diese Benutzer Aufgabentabellen in allen Datenbanken.

Wenn der Scheduler nur für bestimmte Datenbanken ausgeführt werden soll, können Sie diese in der Konfigurationsdatei angeben

 pg_task.database = 'database1,database2' 

Wenn Sie den Scheduler nicht von Datenbankbenutzern ausführen möchten, kann dies auch als festgelegt werden

 pg_task.database = 'database1:user1,database2:user2' 

Wenn Sie Schedulertabellen erstellen müssen, die nicht im Standardschema für Benutzer enthalten sind, können Sie dies folgendermaßen festlegen

 pg_task_schema.database1 = schema3 

Wenn Sie die Schedulertabelle auch anders benennen müssen, können Sie dies folgendermaßen tun

 pg_task_table.database1 = table3 

Standardmäßig überprüft der Scheduler alle 1000 ms Aufgaben. Dies kann jedoch wie folgt geändert werden

 pg_task_period.database1 = 100 pg_task_period.database2 = 10 

Der Scheduler erstellt also (falls nicht bereits erstellt) (ggf. ein Schema und) eine Aufgabentabelle mit solchen Spalten

 id BIGSERIAL NOT NULL PRIMARY KEY, -- ,   dt TIMESTAMP NOT NULL DEFAULT NOW(), --     (- -   ) start TIMESTAMP, --     stop TIMESTAMP, --      queue TEXT NOT NULL DEFAULT 'default', --    (      ) max INT, --        (,      ) pid INT, --  ,    request TEXT NOT NULL, --  SQL   response TEXT, --    state TEXT NOT NULL DEFAULT 'QUEUE', --   (- - ,    , , ...) timeout INTERVAL, --      delete BOOLEAN NOT NULL DEFAULT false, --     ,    repeat INTERVAL, --    drift BOOLEAN NOT NULL DEFAULT true --        

Tatsächlich startet der Scheduler mehrere Hintergrund-Workflows: einen zum Verfolgen von Änderungen in der Konfigurationsdatei und zum Starten / Stoppen der anderen Hintergrundprozesse des Schedulers, falls erforderlich. Ein Hintergrundworkflow für jede Datenbank zum Überprüfen geplanter Aufgaben in jeder Datenbank und zum Starten von Aufgaben, falls erforderlich.

Wenn wir beispielsweise die Aufgabe so schnell wie möglich erledigen möchten, führen wir den SQL-Befehl aus

 INSERT INTO task (request) VALUES ('SELECT now()') 

Der Scheduler schreibt das Ergebnis der Aufgabe in Textform in die Ergebnisspalte. Wenn infolge der Aufgabenausführung mehrere Spalten vorhanden sind, fügt der Scheduler diese zusammen mit den Spaltentypen zum Header hinzu. Aufgrund der Aufgabe können auch mehrere Zeilen vorhanden sein, die alle zur Ergebnisspalte hinzugefügt werden.

Wenn wir eine Aufgabe beispielsweise nach 5 Minuten erledigen möchten, schreiben wir die geplante Zeit in die entsprechende Spalte

 INSERT INTO task (dt, request) VALUES (now() + '5 min':INTERVAL, 'SELECT now()') 

und wenn wir möchten, dass die Aufgabe zu einem bestimmten Zeitpunkt abgeschlossen wird, schreiben wir sie auf

 INSERT INTO task (dt, request) VALUES ('2019-07-01 00:00:00', 'SELECT now()') 

Wenn die Aufgabe alle 5 Minuten ausgeführt werden muss, schreiben wir so

 INSERT INTO task (repeat, request) VALUES ('5 min', 'SELECT now()') 

wenn ja schreiben

 INSERT INTO task (repeat, request, drift) VALUES ('5 min', 'SELECT now()', false) 

Dann wird die Aufgabe 5 Minuten nach Abschluss der Aufgabe wiederholt (und nicht standardmäßig nach der geplanten Zeit).

Wenn während der Ausführung einer Aufgabe eine Ausnahme auftritt, wird diese abgefangen und in Textform in die Ergebnisspalte geschrieben, und der Aufgabe wird der entsprechende Status zugewiesen.

Zum Beispiel

 INSERT INTO task (request) VALUES ('SELECT 1/0') 

Wenn für eine Aufgabenwarteschlange nicht mehr als 2 Aufgaben gleichzeitig ausgeführt werden müssen, erstellen wir Aufgaben mit dem Befehl

 INSERT INTO task (queue, max, request) VALUES ('queue', 2, 'SELECT now()') 

Angenommen, wir haben in dieser Warteschlange viele Aufgaben gesammelt und sie werden gleichzeitig von 2 ausgeführt. Wenn Sie mit dem Befehl eine Aufgabe erstellen

 INSERT INTO task (queue, max, request) VALUES ('queue', 3, 'SELECT now()') 

dann wird es so bald wie möglich von allen anderen Aufgaben in dieser Warteschlange ausgeführt, d.h. Es ist immer noch so etwas wie Priorität

Source: https://habr.com/ru/post/de456722/


All Articles