Pour préparer le planificateur de tâches asynchrone, nous avons besoin de
postgres lui-même et de son extension
pg_task . (J'ai donné des liens vers ma fourchette de postgres, car j'ai apporté des modifications qui n'ont pas encore été entassées dans le référentiel d'origine. Vous pouvez également utiliser l'
image prête à l'
emploi .)
(Il y a une erreur dans PL / pgSQL dans PostgreSQL d'origine, à cause de laquelle mon ordonnanceur ne fonctionne pas correctement lorsqu'une exception non interceptée se produit dans une tâche écrite en PL / pgSQL. J'ai décrit cette erreur
ici et l'ai corrigée dans ma fourchette
ici .)
Pour installer le planificateur, vous n'avez pas besoin de créer une extension dans (chaque) base de données. Au lieu de cela, ajoutez-le simplement au fichier de configuration
shared_preload_libraries = 'pg_task'
Après avoir redémarré PostgreSQL, le planificateur créera des tables de tâches dans toutes les bases de données au nom des utilisateurs de base de données et dans les schémas par défaut pour ces utilisateurs.
Si vous souhaitez que le planificateur s'exécute uniquement pour des bases de données spécifiques, vous pouvez les spécifier dans le fichier de configuration
pg_task.database = 'database1,database2'
Si vous ne souhaitez pas exécuter le planificateur à partir des utilisateurs de la base de données, cela peut également être défini comme
pg_task.database = 'database1:user1,database2:user2'
Si vous devez créer des tables de planificateur qui ne sont pas dans le schéma par défaut pour les utilisateurs, vous pouvez le définir comme ceci
pg_task_schema.database1 = schema3
Si vous devez également nommer la table du planificateur différemment, vous pouvez le faire comme ceci
pg_task_table.database1 = table3
Par défaut, le planificateur vérifie les tâches toutes les 1000 ms, mais cela peut être modifié comme suit
pg_task_period.database1 = 100 pg_task_period.database2 = 10
Ainsi, le planificateur crée (s'il n'est pas déjà créé) (un schéma, si nécessaire, et) une table de tâches avec de telles colonnes
id BIGSERIAL NOT NULL PRIMARY KEY,
En fait, le planificateur démarre plusieurs workflows d'arrière-plan: l'un pour suivre les modifications dans le fichier de configuration et démarrer / arrêter, si nécessaire, les autres processus d'arrière-plan du planificateur. Et un flux de travail en arrière-plan pour chaque base de données pour vérifier les tâches planifiées dans chaque base de données et lancer des tâches si nécessaire.
Par exemple, si nous voulons terminer la tâche le plus rapidement possible, nous exécutons la commande SQL
INSERT INTO task (request) VALUES ('SELECT now()')
Le planificateur écrit le résultat de la tâche dans la colonne de résultat sous forme de texte. Si, à la suite de l'exécution de la tâche, il y a plusieurs colonnes, le planificateur les ajoute à l'en-tête avec les types de colonne. De plus, à la suite de la tâche, il peut y avoir plusieurs lignes, toutes seront ajoutées à la colonne de résultat.
Si nous voulons terminer une tâche, par exemple, après 5 minutes, alors nous écrivons l'heure prévue dans la colonne correspondante
INSERT INTO task (dt, request) VALUES (now() + '5 min':INTERVAL, 'SELECT now()')
et si nous voulons que la tâche soit terminée à un moment précis, nous l'écrirons
INSERT INTO task (dt, request) VALUES ('2019-07-01 00:00:00', 'SELECT now()')
Si nous avons besoin que la tâche soit effectuée toutes les 5 minutes, alors nous écrivons comme ceci
INSERT INTO task (repeat, request) VALUES ('5 min', 'SELECT now()')
si c'est le cas, écrivez
INSERT INTO task (repeat, request, drift) VALUES ('5 min', 'SELECT now()', false)
puis la tâche sera répétée 5 minutes après la fin de la tâche (et non après l'heure programmée, comme par défaut).
Si une exception se produit pendant l'exécution d'une tâche, elle est interceptée et écrite sous forme de texte dans la colonne de résultat, et l'état correspondant est affecté à la tâche.
Par exemple
INSERT INTO task (request) VALUES ('SELECT 1/0')
S'il est nécessaire que pour une file d'attente de tâches pas plus de 2 tâches soient exécutées simultanément, alors nous créons des tâches par la commande
INSERT INTO task (queue, max, request) VALUES ('queue', 2, 'SELECT now()')
Supposons que dans cette file d'attente, nous avons accumulé beaucoup de tâches et qu'elles soient exécutées simultanément par 2. Si vous créez une tâche par
INSERT INTO task (queue, max, request) VALUES ('queue', 3, 'SELECT now()')
puis il sera exécuté dès que possible de toutes les autres tâches de cette file d'attente, c'est-à-dire c'est toujours quelque chose comme la priorité