PostgreSQL任务队列

大象队列-FreeImages.com


队列用于组织任务流的处理。 在表演者之间积累和分配任务时需要它们。 队列还可以为处理任务提供其他要求:交付保证,一次性保证,优先级等。


通常,使用现成的消息队列系统(MQ-消息队列),但是有时您需要组织一个临时队列或一些专门的队列(例如,优先级队列和由于异常而未处理的任务的延迟重启)。 这种队列的创建将在下面讨论。


适用范围


提出的解决方案旨在处理类似任务的流程。 它们不适合在松散耦合的系统和组件之间组织发布/订阅或消息传递。


关系数据库之上的队列对于中小型负载(每天成千上万的任务,数十到数百个执行者)工作得很好,但是对于大型线程,最好使用专门的解决方案。


五句话方法的精髓


select ... for update skip locked 

基准线


为了简单起见,此后仅将唯一的任务标识符存储在表中。 添加某种有效负载并不难。


最简单队列的表包含任务本身及其状态:


 create table task ( id bigint not null primary key, status integer not null default 0 -- 0 - , 1 -  , 2 -  ); create index task__status__idx on task (status); 

添加任务:


 insert into task (id) values ($1) on conflict (id) do nothing; 

获得以下任务:


 with next_task as ( select id from task where status = 0 limit 1 for update skip locked ) update task set status = 1 from next_task where task.id = next_task.id returning task.id; 

任务完成:


 update task set status = 2 where id = $1; 

优先队列


在简单的情况下,任务id是其优先级。 仅更改对下一个任务的请求-添加order by id的排序条件order by id以及所需的处理任务顺序。 您还需要通过(status, id)创建一个复合索引。


或者,为了优先考虑,添加了一个单独的列:


 create table task ( id bigint not null primary key, priority integer not null, status integer not null default 0 -- 0 - , 1 -  , 2 -  ); create index task__status__priority__idx on task (status, priority); 

添加任务:
 insert into task (id, priority) values ($1, $2) on conflict (id) do nothing; 

获得以下任务:
 with next_task as ( select id from task where status = 0 order by priority limit 1 for update skip locked ) update task set status = 1 from next_task where task.id = next_task.id returning task.id; 

高亮显示的列允许您即时更改任务的优先级。


重复执行“下降”任务


执行任务期间可能会发生错误或异常。 在这种情况下,该任务必须再次排队。 有时仍然有必要将其重复执行的时间推迟一段时间,例如,如果异常是由于第三方服务暂时不可用而导致的。


 create table task ( id bigint not null primary key, status integer not null default 0, -- 0 - , 1 -  , 2 - , 3 - , 4 -   (  ) attempt integer not null default 0, delayed_to timestamp null, error_text text null ); create index task__status__delayed_to__idx on task (status, delayed_to); 

如您所见,状态列表已扩展,并添加了新列:


  • attempt - attempt次数; 需要决定是否需要重试(限制尝试次数)并选择重试之前的延迟时间(例如,每次后续尝试都会延迟10 * attempt分钟);
  • delayed_to下次尝试完成任务的时间;
  • error_text错误文本。

需要按错误类型分组错误文本。


一个例子。 监视系统报告队列中已累积了数千个状态为“错误”的任务。 我们满足要求:


 select error_text, count(*) from task where status = 3 group by 1 order by 2 desc; 

有关详细信息,请转到表演者的日志。 更正导致错误的情况(如果可能)。 如有必要,我们可以通过将状态设置为0或更改下一次尝试的时间来加快任务的重启速度。


获得以下新任务:
 with next_task as ( select id from task where status = 0 limit 1 for update skip locked ) update task set status = 1, attempt = attempt + 1, delayed_to = null, error_text = null from next_task where task.id = next_task.id returning task.id; 

由于错误而使以下任务挂起:
 with next_task as ( select id from task where status = 3 and delayed_to < localtimestamp limit 1 for update skip locked ) update task set status = 1, attempt = attempt + 1, delayed_to = null, error_text = null from next_task where task.id = next_task.id returning task.id; 

成功完成任务:
 update task set status = 2, delayed_to = null, error_text = null where id = $1; 

任务失败,将在(5 *尝试次数)分钟内重复一次:
 update task set status = 3, delayed_to = localtimestamp + make_interval(mins => 5 * attempt), error_text = $2 where id = $1; 

任务完成并出现致命错误,将不会重试:
 update task set status = 4, delayed_to = null, error_text = $2 where id = $1; 

下一个任务的请求分为两部分,以便DBMS可以为优先级队列建立有效的查询计划。 带or的选择条件对的排序order by可能会出错。


指标收集


添加以下属性:


  • 任务创建时间;
  • 任务变更时间;
  • 任务的开始和结束时间。

 create table task ( id bigint not null primary key, status integer not null default 0, -- 0 - , 1 -  , 2 - , 3 - , 4 -   (  ) attempt integer not null default 0, begin_time timestamp null, end_time timestamp null, delayed_to timestamp null, error_text text null, created timestamp not null default localtimestamp, updated timestamp not null default localtimestamp ); create index task__status__delayed_to__idx on task (status, delayed_to); create index task__updated__idx on task (updated); 

我们考虑所有查询中添加的列。


获得以下新任务:
 with next_task as ( select id from task where status = 0 limit 1 for update skip locked ) update task set status = 1, attempt = attempt + 1, begin_time = localtimestamp, end_time = null, delayed_to = null, error_text = null, updated = localtimestamp from next_task where task.id = next_task.id returning task.id; 

由于错误而使以下任务挂起:
 with next_task as ( select id from task where status = 3 and delayed_to < localtimestamp limit 1 for update skip locked ) update task set status = 1, attempt = attempt + 1, begin_time = localtimestamp, end_time = null, delayed_to = null, error_text = null, updated = localtimestamp from next_task where task.id = next_task.id returning task.id; 

成功完成任务:
 update task set status = 2, end_time = localtimestamp, delayed_to = null, error_text = null, updated = localtimestamp where id = $1; 

任务失败,将在(5 *尝试次数)分钟内重复一次:
 update task set status = 3, end_time = localtimestamp, delayed_to = localtimestamp + make_interval(mins => 5 * attempt), error_text = $2, updated = localtimestamp where id = $1; 

任务完成并出现致命错误,将不会重试:
 update task set status = 4, end_time = localtimestamp, delayed_to = null, error_text = $2, updated = localtimestamp where id = $1; 

为什么可能需要这样做的示例


搜索并重新启动悬空任务:


 update task set status = 3, end_time = localtimestamp, delayed_to = localtimestamp, error_text = 'hanged', updated = localtimestamp where status = 1 and updated < localtimestamp - interval '1 hour'; 

删除旧任务:


 delete from task where updated < localtimestamp - interval '30 days'; 

完成任务的统计信息:


 select date_trunc('hour', end_time), count(*), sum(end_time - begin_time), avg(end_time - begin_time) from task where status = 2 and end_time >= '2019-12-16' group by 1 order by 1; 

重新启动先前完成的任务


例如,文档已更新,您需要为全文搜索重新索引它。


 create table task ( id bigint not null primary key, task_updated_at timestamp not null default localtimstamp, status integer not null default 0, -- 0 - , 1 -  , 2 - , 3 - , 4 -   (  ) begin_time timestamp null, end_time timestamp null, delayed_to timestamp null, error_text text null, created timestamp not null default localtimestamp, updated timestamp not null default localtimestamp ); 

在此处,为任务更新时间添加了task_updated_at列,但是可以使用created字段。


添加或更新(重新启动)任务:


 insert into task (id, task_updated_at) values ($1, $2) on conflict (id) do update set task_updated_at = excluded.task_updated_at, status = case when status = 1 then 1 else 0 end, delayed_to = null, error_text = null, updated = localtimestamp where task_updated_at < excluded.task_updated_at; 

这是怎么回事。 如果现在尚未完成任务,则该任务将变为“新”任务。


完成任务的请求还将检查其在执行过程中是否已更改。


下一个任务的请求与队列中收集指标的请求相同。


成功完成任务:


 update task set status = case when begin_time >= updated then 2 else 0 end, end_time = localtimestamp, delayed_to = null, error_text = null, updated = localtimestamp where id = $1; 

错误完成任务:取决于任务。 您可以无条件延迟重新启动,可以在更新时将其状态设置为“新”。


流水线


该任务经历了几个阶段。 您可以为每个阶段创建一个单独的队列。 或者,您可以将相应的列添加到表中。


一个基于基本队列的示例,以免使代码混乱。 所有先前描述的修改都可以毫无问题地应用于此队列。


 create table task ( id bigint not null primary key, stage integer not null default 0, status integer not null default 0 ); create index task__stage__status__idx on task (stage, status); 

在给定阶段获取以下任务:


 with next_task as ( select id from task where stage = $1 and status = 0 limit 1 for update skip locked ) update task set status = 1 from next_task where task.id = next_task.id returning task.id; 

过渡到指定阶段的任务完成:


 update task set stage = $2, status = 2 where id = $1; 

或按顺序过渡到下一阶段:


 update task set stage = stage + 1, status = 2 where id = $1; 

计划任务


这是重复队列的变体。


每个任务可以有自己的时间表(最简单的版本是启动频率)。


 create table task ( id bigint not null primary key, period integer not null, --     status integer not null default 0, -- 0 - , 1 -   next_run_time timestamp not null default localtimestamp ); create index task__status__next_run_time__idx on task (status, next_run_time); 

添加任务:


 insert into task (id, period, next_run_time) values ($1, $2, $3); 

获得以下任务:


 with next_task as ( select id from task where status = 0 and next_run_time <= localtimestamp limit 1 for update skip locked ) update task set status = 1 from next_task where task.id = next_task.id returning task.id; 

完成任务并计划下一次运行:


 update task set status = 0, next_run_time = next_run_time + make_interval(secs => period) where id = $1 

而不是结论


使用RDBMS工具创建专门的任务队列并不复杂。


“自制”队列将响应 即使是最狂野的 几乎任何业务/域要求。


好吧,我们不要忘记,与任何其他数据库一样,队列需要对服务器,查询和索引进行周到的调整。

Source: https://habr.com/ru/post/zh-CN481556/


All Articles