PostgreSQL-Task-Warteschlange

Elefantenschlange - pixabay.com


Warteschlangen werden verwendet, um die Verarbeitung des Aufgabenflusses zu organisieren. Sie werden für die Anhäufung und Verteilung von Aufgaben unter den ausübenden Künstlern benötigt. Warteschlangen können auch zusätzliche Anforderungen für die Bearbeitung von Aufgaben enthalten: Zustellgarantie, einmalige Garantie, Priorisierung usw.


In der Regel werden vorgefertigte Message Queue-Systeme verwendet (MQ - Message Queue). Manchmal müssen Sie jedoch eine Ad-hoc-Warteschlange oder eine spezialisierte Warteschlange organisieren (z. B. eine Prioritätswarteschlange und ein verzögerter Neustart von Aufgaben, die aufgrund von Ausnahmen nicht verarbeitet wurden). Die Erstellung solcher Warteschlangen wird nachstehend erörtert.


Anwendbarkeitsbeschränkungen


Die vorgeschlagenen Lösungen sind für den Fluss ähnlicher Aufgaben ausgelegt. Sie eignen sich nicht zum Organisieren von Pub / Sub oder Messaging zwischen lose gekoppelten Systemen und Komponenten.


Eine Warteschlange über einer relationalen Datenbank eignet sich gut für kleine und mittlere Lasten (Hunderttausende von Aufgaben pro Tag, Zehn- bis Hunderttausende von Performern). Für große Threads ist es jedoch besser, eine spezielle Lösung zu verwenden.


Das Wesen der Methode in fünf Worten


select ... for update skip locked 

Basislinie


Der Einfachheit halber werden nachfolgend nur eindeutige Aufgabenidentifizierer in der Tabelle gespeichert. Das Hinzufügen einer Art von Nutzlast sollte nicht schwierig sein.


Die Tabelle für die einfachste Warteschlange enthält die Aufgabe selbst und ihren Status:


 create table task ( id bigint not null primary key, status integer not null default 0 -- 0 - , 1 -  , 2 -  ); create index task__status__idx on task (status); 

Aufgabe hinzufügen:


 insert into task (id) values ($1) on conflict (id) do nothing; 

Die folgende Aufgabe erhalten:


 with next_task as ( select id from task where status = 0 limit 1 for update skip locked ) update task set status = 1 from next_task where task.id = next_task.id returning task.id; 

Aufgabenerfüllung:


 update task set status = 2 where id = $1; 

Prioritätswarteschlange


Im einfachen Fall hat die Task- id Priorität. Nur die Anforderung für die nächste Aufgabe wird geändert - die Sortierbedingungsreihenfolge order by id mit der erforderlichen Reihenfolge der Verarbeitungsaufgaben wird hinzugefügt. Sie müssen auch einen zusammengesetzten Index nach (status, id) erstellen.


Oder es wird vorrangig eine separate Spalte hinzugefügt:


 create table task ( id bigint not null primary key, priority integer not null, status integer not null default 0 -- 0 - , 1 -  , 2 -  ); create index task__status__priority__idx on task (status, priority); 

Aufgabe hinzufügen:
 insert into task (id, priority) values ($1, $2) on conflict (id) do nothing; 

Die folgende Aufgabe erhalten:
 with next_task as ( select id from task where status = 0 order by priority limit 1 for update skip locked ) update task set status = 1 from next_task where task.id = next_task.id returning task.id; 

In der hervorgehobenen Spalte können Sie die Priorität der Aufgabe im laufenden Betrieb ändern.


Warteschlange mit einer Wiederholung der "gefallenen" Aufgaben


Während der Ausführung der Aufgabe kann ein Fehler oder eine Ausnahme auftreten. In solchen Fällen muss die Aufgabe erneut in die Warteschlange gestellt werden. Manchmal ist es dennoch erforderlich, den Zeitpunkt der wiederholten Ausführung um einige Zeit zu verschieben, z. B. wenn die Ausnahme auf die vorübergehende Nichtverfügbarkeit eines Dienstes eines Drittanbieters zurückzuführen ist.


 create table task ( id bigint not null primary key, status integer not null default 0, -- 0 - , 1 -  , 2 - , 3 - , 4 -   (  ) attempt integer not null default 0, delayed_to timestamp null, error_text text null ); create index task__status__delayed_to__idx on task (status, delayed_to); 

Wie Sie sehen, wurde die Liste der Status erweitert und neue Spalten hinzugefügt:


  • attempt - Anzahl der Versuche; Erforderlich, um eine Entscheidung über die Notwendigkeit eines erneuten Versuchs zu treffen (die Anzahl der Versuche zu begrenzen) und eine Verzögerung vor dem erneuten Versuch auszuwählen (z. B. wird jeder nachfolgende Versuch um 10 * attempt Versuchminuten verzögert).
  • delayed_to - Zeit des nächsten Versuchs, die Aufgabe abzuschließen;
  • error_text - Fehlertext.

Der Fehlertext wird zum Gruppieren nach Fehlertyp benötigt.


Ein Beispiel. Das Überwachungssystem meldet, dass sich Tausende von Aufgaben mit dem Status "Fehler" in der Warteschlange angesammelt haben. Wir erfüllen den Wunsch:


 select error_text, count(*) from task where status = 3 group by 1 order by 2 desc; 

Einzelheiten finden Sie in den Protokollen der ausübenden Künstler. Korrigieren Sie die Situation, die den Fehler verursacht hat (falls möglich). Bei Bedarf beschleunigen wir den Neustart von Tasks, indem wir den Status auf 0 setzen oder den Zeitpunkt des nächsten Versuchs verschieben.


Die folgende neue Aufgabe bekommen:
 with next_task as ( select id from task where status = 0 limit 1 for update skip locked ) update task set status = 1, attempt = attempt + 1, delayed_to = null, error_text = null from next_task where task.id = next_task.id returning task.id; 

Die folgende Aufgabe steht aufgrund eines Fehlers aus:
 with next_task as ( select id from task where status = 3 and delayed_to < localtimestamp limit 1 for update skip locked ) update task set status = 1, attempt = attempt + 1, delayed_to = null, error_text = null from next_task where task.id = next_task.id returning task.id; 

Erfolgreicher Abschluss der Aufgabe:
 update task set status = 2, delayed_to = null, error_text = null where id = $1; 

Die Aufgabe ist fehlgeschlagen, es wird eine Wiederholung in (5 * Anzahl Versuche) Minuten durchgeführt:
 update task set status = 3, delayed_to = localtimestamp + make_interval(mins => 5 * attempt), error_text = $2 where id = $1; 

Die Aufgabe wurde mit einem schwerwiegenden Fehler abgeschlossen. Es wird nicht wiederholt:
 update task set status = 4, delayed_to = null, error_text = $2 where id = $1; 

Die Anforderung für die nächste Task ist zweigeteilt, damit das DBMS einen effektiven Abfrageplan für die Prioritätswarteschlange erstellen kann. Eine Auswahlbedingung mit or kann bei der Sortierung nach sehr schief gehen.


Metrics-Auflistung


Fügen Sie die folgenden Attribute hinzu:


  • Erstellungszeit der Aufgabe;
  • Aufgabenwechselzeit;
  • Start- und Endzeit der Aufgabe.

 create table task ( id bigint not null primary key, status integer not null default 0, -- 0 - , 1 -  , 2 - , 3 - , 4 -   (  ) attempt integer not null default 0, begin_time timestamp null, end_time timestamp null, delayed_to timestamp null, error_text text null, created timestamp not null default localtimestamp, updated timestamp not null default localtimestamp ); create index task__status__delayed_to__idx on task (status, delayed_to); create index task__updated__idx on task (updated); 

Wir berücksichtigen die hinzugefügten Spalten in allen Abfragen.


Die folgende neue Aufgabe bekommen:
 with next_task as ( select id from task where status = 0 limit 1 for update skip locked ) update task set status = 1, attempt = attempt + 1, begin_time = localtimestamp, end_time = null, delayed_to = null, error_text = null, updated = localtimestamp from next_task where task.id = next_task.id returning task.id; 

Die folgende Aufgabe steht aufgrund eines Fehlers aus:
 with next_task as ( select id from task where status = 3 and delayed_to < localtimestamp limit 1 for update skip locked ) update task set status = 1, attempt = attempt + 1, begin_time = localtimestamp, end_time = null, delayed_to = null, error_text = null, updated = localtimestamp from next_task where task.id = next_task.id returning task.id; 

Erfolgreicher Abschluss der Aufgabe:
 update task set status = 2, end_time = localtimestamp, delayed_to = null, error_text = null, updated = localtimestamp where id = $1; 

Die Aufgabe ist fehlgeschlagen, es wird eine Wiederholung in (5 * Anzahl Versuche) Minuten durchgeführt:
 update task set status = 3, end_time = localtimestamp, delayed_to = localtimestamp + make_interval(mins => 5 * attempt), error_text = $2, updated = localtimestamp where id = $1; 

Die Aufgabe wurde mit einem schwerwiegenden Fehler abgeschlossen. Es wird nicht wiederholt:
 update task set status = 4, end_time = localtimestamp, delayed_to = null, error_text = $2, updated = localtimestamp where id = $1; 

Beispiele, warum dies erforderlich sein könnte


Suchen und starten Sie baumelnde Aufgaben neu:


 update task set status = 3, end_time = localtimestamp, delayed_to = localtimestamp, error_text = 'hanged', updated = localtimestamp where status = 1 and updated < localtimestamp - interval '1 hour'; 

Alte Aufgaben entfernen:


 delete from task where updated < localtimestamp - interval '30 days'; 

Statistiken zum Abschließen von Aufgaben:


 select date_trunc('hour', end_time), count(*), sum(end_time - begin_time), avg(end_time - begin_time) from task where status = 2 and end_time >= '2019-12-16' group by 1 order by 1; 

Starten Sie zuvor erledigte Aufgaben neu


Wenn ein Dokument beispielsweise aktualisiert wird, müssen Sie es für die Volltextsuche neu indizieren.


 create table task ( id bigint not null primary key, task_updated_at timestamp not null default localtimstamp, status integer not null default 0, -- 0 - , 1 -  , 2 - , 3 - , 4 -   (  ) begin_time timestamp null, end_time timestamp null, delayed_to timestamp null, error_text text null, created timestamp not null default localtimestamp, updated timestamp not null default localtimestamp ); 

Hier wird die Spalte task_updated_at für die task_updated_at hinzugefügt, das created Feld kann jedoch verwendet werden.


Hinzufügen oder Aktualisieren (Neustarten) einer Aufgabe:


 insert into task (id, task_updated_at) values ($1, $2) on conflict (id) do update set task_updated_at = excluded.task_updated_at, status = case when status = 1 then 1 else 0 end, delayed_to = null, error_text = null, updated = localtimestamp where task_updated_at < excluded.task_updated_at; 

Was ist hier los? Eine Aufgabe wird "neu", wenn sie gerade nicht erledigt wird.


Die Anforderung zum Abschließen der Aufgabe prüft auch, ob sie während der Ausführung geändert wurde.


Anforderungen für die nächste Aufgabe sind dieselben wie in der Warteschlange zum Sammeln von Metriken.


Erfolgreicher Abschluss der Aufgabe:


 update task set status = case when begin_time >= updated then 2 else 0 end, end_time = localtimestamp, delayed_to = null, error_text = null, updated = localtimestamp where id = $1; 

Die Beendigung der Aufgabe mit einem Fehler: abhängig von der Aufgabe. Sie können den Neustart bedingungslos verzögern und den Status beim Aktualisieren auf "neu" setzen.


Pipeline


Die Aufgabe durchläuft mehrere Stufen. Sie können für jede Stufe eine eigene Warteschlange erstellen. Oder Sie fügen der Tabelle die entsprechende Spalte hinzu.


Ein Beispiel, das auf der Basiswarteschlange basiert, um den Code nicht zu überladen. Alle zuvor beschriebenen Änderungen können problemlos auf diese Warteschlange angewendet werden.


 create table task ( id bigint not null primary key, stage integer not null default 0, status integer not null default 0 ); create index task__stage__status__idx on task (stage, status); 

Die folgende Aufgabe zu einem bestimmten Zeitpunkt ausführen:


 with next_task as ( select id from task where stage = $1 and status = 0 limit 1 for update skip locked ) update task set status = 1 from next_task where task.id = next_task.id returning task.id; 

Abschluss der Aufgabe mit dem Übergang in die angegebene Phase:


 update task set stage = $2, status = 2 where id = $1; 

Oder Übergang zur nächsten Stufe in der Reihenfolge:


 update task set stage = stage + 1, status = 2 where id = $1; 

Geplante Aufgaben


Dies ist eine Variation der Wiederholungswarteschlange.


Jede Aufgabe kann einen eigenen Zeitplan haben (in der einfachsten Version die Häufigkeit des Starts).


 create table task ( id bigint not null primary key, period integer not null, --     status integer not null default 0, -- 0 - , 1 -   next_run_time timestamp not null default localtimestamp ); create index task__status__next_run_time__idx on task (status, next_run_time); 

Aufgabe hinzufügen:


 insert into task (id, period, next_run_time) values ($1, $2, $3); 

Die folgende Aufgabe erhalten:


 with next_task as ( select id from task where status = 0 and next_run_time <= localtimestamp limit 1 for update skip locked ) update task set status = 1 from next_task where task.id = next_task.id returning task.id; 

Abschluss der Aufgabe und Planung des nächsten Laufs:


 update task set status = 0, next_run_time = next_run_time + make_interval(secs => period) where id = $1 

Anstelle einer Schlussfolgerung


Das Erstellen einer speziellen Task-Warteschlange mit RDBMS-Tools ist nicht kompliziert.


Die "selbstgemachte" Warteschlange wird antworten sogar die wildesten praktisch jede geschäftliche / Domain-Anforderung.


Nun, wir sollten nicht vergessen, dass die Warteschlange wie jede andere Datenbank eine sorgfältige Optimierung des Servers, der Abfragen und der Indizes erfordert.

Source: https://habr.com/ru/post/de481556/


All Articles