PostgreSQL食谱:异步任务计划程序

要准备异步任务调度程序,我们需要postgres本身及其扩展名pg_task 。 (我提供了到postgres分支的链接,因为我做了一些尚未塞入原始存储库的更改。您还可以使用现成的图像 。)

(原始PostgreSQL中的PL / pgSQL中存在一个错误,由于在用PL / pgSQL编写的任务中发生未捕获的异常时,由于我的调度程序无法正常工作,因此我的调度程序无法正常工作。我在这里描述了此错误并在此处用叉子修复了该错误。)

要安装调度程序,无需在(每个)数据库中创建扩展。 相反,只需将其添加到配置文件

shared_preload_libraries = 'pg_task' 

重新启动PostgreSQL后,调度程序将代表数据库用户在所有数据库中以及这些用户的默认模式中创建任务表。

如果希望调度程序仅针对特定数据库运行,则可以在配置文件中指定它们

 pg_task.database = 'database1,database2' 

如果您不想从数据库用户运行调度程序,则也可以将其设置为

 pg_task.database = 'database1:user1,database2:user2' 

如果需要为用户创建不在默认方案中的调度程序表,则可以这样设置

 pg_task_schema.database1 = schema3 

如果您还需要使用其他名称来命名调度程序表,则可以这样进行操作

 pg_task_table.database1 = table3 

默认情况下,调度程序每1000毫秒检查一次任务,但是可以如下更改

 pg_task_period.database1 = 100 pg_task_period.database2 = 10 

因此,调度程序将创建(如果尚未创建)(方案,如果需要的话)和带有此类列的任务表

 id BIGSERIAL NOT NULL PRIMARY KEY, -- ,   dt TIMESTAMP NOT NULL DEFAULT NOW(), --     (- -   ) start TIMESTAMP, --     stop TIMESTAMP, --      queue TEXT NOT NULL DEFAULT 'default', --    (      ) max INT, --        (,      ) pid INT, --  ,    request TEXT NOT NULL, --  SQL   response TEXT, --    state TEXT NOT NULL DEFAULT 'QUEUE', --   (- - ,    , , ...) timeout INTERVAL, --      delete BOOLEAN NOT NULL DEFAULT false, --     ,    repeat INTERVAL, --    drift BOOLEAN NOT NULL DEFAULT true --        

实际上,调度程序启动了多个后台工作流程:一个用于跟踪配置文件中的更改并在必要时启动/停止调度程序的其他后台进程。 每个数据库都有一个后台工作流程,用于检查每个数据库中的计划任务并在必要时启动任务。

例如,如果我们想尽快完成任务,那么我们执行SQL命令

 INSERT INTO task (request) VALUES ('SELECT now()') 

调度程序将任务的结果以文本形式写入结果列。 如果作为任务执行的结果,有几列,那么调度程序会将它们与列类型一起添加到标题中。 同样,作为任务的结果,可能会有几行,所有这些行都将添加到结果列。

例如,如果要完成一项任务,例如5分钟后,则将计划时间写在相应的列中

 INSERT INTO task (dt, request) VALUES (now() + '5 min':INTERVAL, 'SELECT now()') 

如果我们希望任务在特定时间完成,则将其写下来

 INSERT INTO task (dt, request) VALUES ('2019-07-01 00:00:00', 'SELECT now()') 

如果我们需要每5分钟执行一次任务,则我们这样写

 INSERT INTO task (repeat, request) VALUES ('5 min', 'SELECT now()') 

如果是这样写

 INSERT INTO task (repeat, request, drift) VALUES ('5 min', 'SELECT now()', false) 

那么任务将在任务完成后的5分钟内重复执行(默认情况下不会在计划的时间之后)。

如果在执行任务期间发生异常,则将其拦截并以文本形式写入结果列,并为任务分配相应的状态。

举个例子

 INSERT INTO task (request) VALUES ('SELECT 1/0') 

如果对于一个任务队列有必要同时执行不超过2个任务,那么我们可以通过以下命令创建任务

 INSERT INTO task (queue, max, request) VALUES ('queue', 2, 'SELECT now()') 

假设在此队列中,我们已经积累了很多任务,并且它们由2同时执行。如果使用以下命令创建任务

 INSERT INTO task (queue, max, request) VALUES ('queue', 3, 'SELECT now()') 

那么它将尽快执行此队列中所有其他任务,即 还是有点像优先

Source: https://habr.com/ru/post/zh-CN456722/


All Articles