Recettes PostgreSQL: Planificateur de tâches asynchrones

Pour préparer le planificateur de tâches asynchrone, nous avons besoin de postgres lui-même et de son extension pg_task . (J'ai donné des liens vers ma fourchette de postgres, car j'ai apporté des modifications qui n'ont pas encore été entassées dans le référentiel d'origine. Vous pouvez également utiliser l' image prête à l' emploi .)

(Il y a une erreur dans PL / pgSQL dans PostgreSQL d'origine, à cause de laquelle mon ordonnanceur ne fonctionne pas correctement lorsqu'une exception non interceptée se produit dans une tâche écrite en PL / pgSQL. J'ai décrit cette erreur ici et l'ai corrigée dans ma fourchette ici .)

Pour installer le planificateur, vous n'avez pas besoin de créer une extension dans (chaque) base de données. Au lieu de cela, ajoutez-le simplement au fichier de configuration

shared_preload_libraries = 'pg_task' 

Après avoir redémarré PostgreSQL, le planificateur créera des tables de tâches dans toutes les bases de données au nom des utilisateurs de base de données et dans les schémas par défaut pour ces utilisateurs.

Si vous souhaitez que le planificateur s'exécute uniquement pour des bases de données spécifiques, vous pouvez les spécifier dans le fichier de configuration

 pg_task.database = 'database1,database2' 

Si vous ne souhaitez pas exécuter le planificateur à partir des utilisateurs de la base de données, cela peut également être défini comme

 pg_task.database = 'database1:user1,database2:user2' 

Si vous devez créer des tables de planificateur qui ne sont pas dans le schéma par défaut pour les utilisateurs, vous pouvez le définir comme ceci

 pg_task_schema.database1 = schema3 

Si vous devez également nommer la table du planificateur différemment, vous pouvez le faire comme ceci

 pg_task_table.database1 = table3 

Par défaut, le planificateur vérifie les tâches toutes les 1000 ms, mais cela peut être modifié comme suit

 pg_task_period.database1 = 100 pg_task_period.database2 = 10 

Ainsi, le planificateur crée (s'il n'est pas déjà créé) (un schéma, si nécessaire, et) une table de tâches avec de telles colonnes

 id BIGSERIAL NOT NULL PRIMARY KEY, -- ,   dt TIMESTAMP NOT NULL DEFAULT NOW(), --     (- -   ) start TIMESTAMP, --     stop TIMESTAMP, --      queue TEXT NOT NULL DEFAULT 'default', --    (      ) max INT, --        (,      ) pid INT, --  ,    request TEXT NOT NULL, --  SQL   response TEXT, --    state TEXT NOT NULL DEFAULT 'QUEUE', --   (- - ,    , , ...) timeout INTERVAL, --      delete BOOLEAN NOT NULL DEFAULT false, --     ,    repeat INTERVAL, --    drift BOOLEAN NOT NULL DEFAULT true --        

En fait, le planificateur démarre plusieurs workflows d'arrière-plan: l'un pour suivre les modifications dans le fichier de configuration et démarrer / arrêter, si nécessaire, les autres processus d'arrière-plan du planificateur. Et un flux de travail en arrière-plan pour chaque base de données pour vérifier les tâches planifiées dans chaque base de données et lancer des tâches si nécessaire.

Par exemple, si nous voulons terminer la tâche le plus rapidement possible, nous exécutons la commande SQL

 INSERT INTO task (request) VALUES ('SELECT now()') 

Le planificateur écrit le résultat de la tâche dans la colonne de résultat sous forme de texte. Si, à la suite de l'exécution de la tâche, il y a plusieurs colonnes, le planificateur les ajoute à l'en-tête avec les types de colonne. De plus, à la suite de la tâche, il peut y avoir plusieurs lignes, toutes seront ajoutées à la colonne de résultat.

Si nous voulons terminer une tâche, par exemple, après 5 minutes, alors nous écrivons l'heure prévue dans la colonne correspondante

 INSERT INTO task (dt, request) VALUES (now() + '5 min':INTERVAL, 'SELECT now()') 

et si nous voulons que la tâche soit terminée à un moment précis, nous l'écrirons

 INSERT INTO task (dt, request) VALUES ('2019-07-01 00:00:00', 'SELECT now()') 

Si nous avons besoin que la tâche soit effectuée toutes les 5 minutes, alors nous écrivons comme ceci

 INSERT INTO task (repeat, request) VALUES ('5 min', 'SELECT now()') 

si c'est le cas, écrivez

 INSERT INTO task (repeat, request, drift) VALUES ('5 min', 'SELECT now()', false) 

puis la tâche sera répétée 5 minutes après la fin de la tâche (et non après l'heure programmée, comme par défaut).

Si une exception se produit pendant l'exécution d'une tâche, elle est interceptée et écrite sous forme de texte dans la colonne de résultat, et l'état correspondant est affecté à la tâche.

Par exemple

 INSERT INTO task (request) VALUES ('SELECT 1/0') 

S'il est nécessaire que pour une file d'attente de tâches pas plus de 2 tâches soient exécutées simultanément, alors nous créons des tâches par la commande

 INSERT INTO task (queue, max, request) VALUES ('queue', 2, 'SELECT now()') 

Supposons que dans cette file d'attente, nous avons accumulé beaucoup de tâches et qu'elles soient exécutées simultanément par 2. Si vous créez une tâche par

 INSERT INTO task (queue, max, request) VALUES ('queue', 3, 'SELECT now()') 

puis il sera exécuté dès que possible de toutes les autres tâches de cette file d'attente, c'est-à-dire c'est toujours quelque chose comme la priorité

Source: https://habr.com/ru/post/fr456722/


All Articles