File d'attente des tâches PostgreSQL

Éléphant File D'attente - pixabay.com


Les files d'attente sont utilisées pour organiser le traitement du flux de tâches. Ils sont nécessaires pour l'accumulation et la répartition des tâches entre les interprètes. Les files d'attente peuvent également fournir des exigences supplémentaires pour les tâches de traitement: garantie de livraison, garantie unique, priorisation, etc.


En règle générale, des systèmes de file d'attente de messages prêts à l'emploi sont utilisés (MQ - file d'attente de messages), mais parfois vous devez organiser une file d'attente ad hoc ou une file d'attente spécialisée (par exemple, une file d'attente prioritaire et un redémarrage retardé des tâches qui n'ont pas été traitées en raison d'exceptions). La création de telles files d'attente sera discutée ci-dessous.


Limitations d'applicabilité


Les solutions proposées sont conçues pour gérer le flux de tâches similaires. Ils ne conviennent pas à l'organisation de pub / sub ou de messagerie entre des systèmes et des composants à couplage lâche.


Une file d'attente au-dessus d'une base de données relationnelle fonctionne bien pour les petites et moyennes charges (des centaines de milliers de tâches par jour, des dizaines à des centaines d'artistes), mais pour les gros threads, il est préférable d'utiliser une solution spécialisée.


L'essence de la méthode en cinq mots


select ... for update skip locked 

Ligne de base


Par souci de simplicité, ci-après, seuls les identificateurs de tâche uniques seront stockés dans le tableau. L'ajout d'une sorte de charge utile ne devrait pas être difficile.


Le tableau de la file d'attente la plus simple contient la tâche elle-même et son état:


 create table task ( id bigint not null primary key, status integer not null default 0 -- 0 - , 1 -  , 2 -  ); create index task__status__idx on task (status); 

Ajout d'une tâche:


 insert into task (id) values ($1) on conflict (id) do nothing; 

Obtenir la tâche suivante:


 with next_task as ( select id from task where status = 0 limit 1 for update skip locked ) update task set status = 1 from next_task where task.id = next_task.id returning task.id; 

Achèvement de la tâche:


 update task set status = 2 where id = $1; 

File d'attente prioritaire


Dans le cas simple, l' id tâche est sa priorité. Seule la demande pour la tâche suivante est modifiée - l' order by id conditions de tri order by id avec l'ordre requis des tâches de traitement est ajouté. Vous devez également créer un index composite par (status, id) .


Ou, pour la priorité, une colonne distincte est ajoutée:


 create table task ( id bigint not null primary key, priority integer not null, status integer not null default 0 -- 0 - , 1 -  , 2 -  ); create index task__status__priority__idx on task (status, priority); 

Ajout d'une tâche:
 insert into task (id, priority) values ($1, $2) on conflict (id) do nothing; 

Obtenir la tâche suivante:
 with next_task as ( select id from task where status = 0 order by priority limit 1 for update skip locked ) update task set status = 1 from next_task where task.id = next_task.id returning task.id; 

La colonne en surbrillance vous permet de modifier la priorité de la tâche à la volée.


File d'attente avec une répétition des tâches "tombées"


Une erreur ou une exception peut se produire lors de l'exécution de la tâche. Dans de tels cas, la tâche doit être à nouveau mise en file d'attente. Parfois, il est encore nécessaire de reporter le temps de son exécution répétée pendant un certain temps, par exemple, si l'exception est due à l'indisponibilité temporaire d'un service tiers.


 create table task ( id bigint not null primary key, status integer not null default 0, -- 0 - , 1 -  , 2 - , 3 - , 4 -   (  ) attempt integer not null default 0, delayed_to timestamp null, error_text text null ); create index task__status__delayed_to__idx on task (status, delayed_to); 

Comme vous pouvez le voir, la liste des statuts s'est étendue et de nouvelles colonnes ont été ajoutées:


  • attempt - nombre de tentatives; nécessaire pour prendre une décision sur la nécessité de réessayer (limiter le nombre de tentatives) et pour sélectionner un délai avant de réessayer (par exemple, chaque tentative suivante est retardée de 10 * attempt minutes de 10 * attempt );
  • delayed_to - heure de la prochaine tentative de terminer la tâche;
  • error_text - texte d'erreur.

Le texte d'erreur est nécessaire pour regrouper par type d'erreur.


Un exemple. Le système de surveillance signale que des milliers de tâches avec le statut "erreur" se sont accumulées dans la file d'attente. Nous répondons à la demande:


 select error_text, count(*) from task where status = 3 group by 1 order by 2 desc; 

Pour plus de détails, accédez aux journaux des interprètes. Corrigez la situation à l'origine de l'erreur (si possible). Si nécessaire, nous accélérons le redémarrage des tâches en mettant l'état à 0 ou en décalant l'heure de la prochaine tentative.


Obtention de la nouvelle tâche suivante:
 with next_task as ( select id from task where status = 0 limit 1 for update skip locked ) update task set status = 1, attempt = attempt + 1, delayed_to = null, error_text = null from next_task where task.id = next_task.id returning task.id; 

Obtention de la tâche suivante en attente en raison d'une erreur:
 with next_task as ( select id from task where status = 3 and delayed_to < localtimestamp limit 1 for update skip locked ) update task set status = 1, attempt = attempt + 1, delayed_to = null, error_text = null from next_task where task.id = next_task.id returning task.id; 

Achèvement réussi de la tâche:
 update task set status = 2, delayed_to = null, error_text = null where id = $1; 

La tâche a échoué, il y aura une répétition dans (5 * nombre de tentatives) minutes:
 update task set status = 3, delayed_to = localtimestamp + make_interval(mins => 5 * attempt), error_text = $2 where id = $1; 

La tâche s'est terminée avec une erreur fatale, il n'y aura pas de nouvelle tentative:
 update task set status = 4, delayed_to = null, error_text = $2 where id = $1; 

La demande pour la tâche suivante est divisée en deux afin que le SGBD puisse créer un plan de requête efficace pour la file d'attente prioritaire. Une condition de sélection avec or peut aller très mal avec l' order by tri order by .


Collection de métriques


Ajoutez les attributs suivants:


  • temps de création de tâche;
  • temps de changement de tâche;
  • heure de début et de fin de la tâche.

 create table task ( id bigint not null primary key, status integer not null default 0, -- 0 - , 1 -  , 2 - , 3 - , 4 -   (  ) attempt integer not null default 0, begin_time timestamp null, end_time timestamp null, delayed_to timestamp null, error_text text null, created timestamp not null default localtimestamp, updated timestamp not null default localtimestamp ); create index task__status__delayed_to__idx on task (status, delayed_to); create index task__updated__idx on task (updated); 

Nous considérons les colonnes ajoutées dans toutes les requêtes.


Obtention de la nouvelle tâche suivante:
 with next_task as ( select id from task where status = 0 limit 1 for update skip locked ) update task set status = 1, attempt = attempt + 1, begin_time = localtimestamp, end_time = null, delayed_to = null, error_text = null, updated = localtimestamp from next_task where task.id = next_task.id returning task.id; 

Obtention de la tâche suivante en attente en raison d'une erreur:
 with next_task as ( select id from task where status = 3 and delayed_to < localtimestamp limit 1 for update skip locked ) update task set status = 1, attempt = attempt + 1, begin_time = localtimestamp, end_time = null, delayed_to = null, error_text = null, updated = localtimestamp from next_task where task.id = next_task.id returning task.id; 

Achèvement réussi de la tâche:
 update task set status = 2, end_time = localtimestamp, delayed_to = null, error_text = null, updated = localtimestamp where id = $1; 

La tâche a échoué, il y aura une répétition dans (5 * nombre de tentatives) minutes:
 update task set status = 3, end_time = localtimestamp, delayed_to = localtimestamp + make_interval(mins => 5 * attempt), error_text = $2, updated = localtimestamp where id = $1; 

La tâche s'est terminée avec une erreur fatale, il n'y aura pas de nouvelle tentative:
 update task set status = 4, end_time = localtimestamp, delayed_to = null, error_text = $2, updated = localtimestamp where id = $1; 

Exemples de pourquoi cela pourrait être nécessaire


Recherchez et redémarrez les tâches pendantes:


 update task set status = 3, end_time = localtimestamp, delayed_to = localtimestamp, error_text = 'hanged', updated = localtimestamp where status = 1 and updated < localtimestamp - interval '1 hour'; 

Suppression des anciennes tâches:


 delete from task where updated < localtimestamp - interval '30 days'; 

Statistiques pour l'exécution des tâches:


 select date_trunc('hour', end_time), count(*), sum(end_time - begin_time), avg(end_time - begin_time) from task where status = 2 and end_time >= '2019-12-16' group by 1 order by 1; 

Redémarrez les tâches précédemment terminées


Par exemple, un document est mis à jour, vous devez le réindexer pour une recherche en texte intégral.


 create table task ( id bigint not null primary key, task_updated_at timestamp not null default localtimstamp, status integer not null default 0, -- 0 - , 1 -  , 2 - , 3 - , 4 -   (  ) begin_time timestamp null, end_time timestamp null, delayed_to timestamp null, error_text text null, created timestamp not null default localtimestamp, updated timestamp not null default localtimestamp ); 

Ici, la colonne task_updated_at est ajoutée pour l'heure de mise à jour de la tâche, mais le champ created peut être utilisé.


Ajouter ou mettre à jour (redémarrer) une tâche:


 insert into task (id, task_updated_at) values ($1, $2) on conflict (id) do update set task_updated_at = excluded.task_updated_at, status = case when status = 1 then 1 else 0 end, delayed_to = null, error_text = null, updated = localtimestamp where task_updated_at < excluded.task_updated_at; 

Que se passe-t-il ici. Une tâche devient «nouvelle» si elle n'est pas terminée maintenant.


La demande de terminer la tâche vérifiera également si elle a été modifiée pendant l'exécution.


Les demandes pour la tâche suivante sont les mêmes que dans la file d'attente de collecte des métriques.


Achèvement réussi de la tâche:


 update task set status = case when begin_time >= updated then 2 else 0 end, end_time = localtimestamp, delayed_to = null, error_text = null, updated = localtimestamp where id = $1; 

L'achèvement de la tâche avec une erreur: selon la tâche. Vous pouvez faire un délai inconditionnel dans le redémarrage, vous pouvez définir le statut sur "nouveau" lors de la mise à jour.


Pipeline


La tâche passe par plusieurs étapes. Vous pouvez créer une file d'attente distincte pour chaque étape. Ou vous pouvez ajouter la colonne correspondante au tableau.


Un exemple basé sur la file d'attente de base pour ne pas encombrer le code. Toutes les modifications décrites précédemment peuvent être appliquées à cette file d'attente sans aucun problème.


 create table task ( id bigint not null primary key, stage integer not null default 0, status integer not null default 0 ); create index task__stage__status__idx on task (stage, status); 

Obtenir la tâche suivante à un stade donné:


 with next_task as ( select id from task where stage = $1 and status = 0 limit 1 for update skip locked ) update task set status = 1 from next_task where task.id = next_task.id returning task.id; 

Achèvement de la tâche avec le passage à l'étape indiquée:


 update task set stage = $2, status = 2 where id = $1; 

Ou passez à l'étape suivante dans l'ordre:


 update task set stage = stage + 1, status = 2 where id = $1; 

Tâches planifiées


Il s'agit d'une variante de la file d'attente de répétition.


Chaque tâche peut avoir son propre planning (dans la version la plus simple, la fréquence de lancement).


 create table task ( id bigint not null primary key, period integer not null, --     status integer not null default 0, -- 0 - , 1 -   next_run_time timestamp not null default localtimestamp ); create index task__status__next_run_time__idx on task (status, next_run_time); 

Ajout d'une tâche:


 insert into task (id, period, next_run_time) values ($1, $2, $3); 

Obtenir la tâche suivante:


 with next_task as ( select id from task where status = 0 and next_run_time <= localtimestamp limit 1 for update skip locked ) update task set status = 1 from next_task where task.id = next_task.id returning task.id; 

Terminer la tâche et planifier la prochaine exécution:


 update task set status = 0, next_run_time = next_run_time + make_interval(secs => period) where id = $1 

Au lieu d'une conclusion


Il n'y a rien de compliqué à créer une file d'attente de tâches spécialisée à l'aide des outils SGBDR.


La file d'attente "self-made" répondra même le plus sauvage pratiquement toutes les exigences commerciales / de domaine.


Eh bien, nous ne devons pas oublier que, comme toute autre base de données, la file d'attente nécessite un réglage réfléchi du serveur, des requêtes et des index.

Source: https://habr.com/ru/post/fr481556/


All Articles