Fila de tarefas do PostgreSQL

Fila de elefantes - pixabay.com


As filas são usadas para organizar o processamento do fluxo de tarefas. Eles são necessários para o acúmulo e a distribuição de tarefas entre os artistas. As filas também podem fornecer requisitos adicionais para tarefas de processamento: garantia de entrega, garantia única, priorização etc.


Como regra, são utilizados sistemas de fila de mensagens prontas (MQ - fila de mensagens), mas às vezes você precisa organizar uma fila ad hoc ou outra especializada (por exemplo, uma fila prioritária e o atraso na reinicialização de tarefas que não foram processadas devido a exceções). A criação de tais filas será discutida abaixo.


Limitações de aplicabilidade


As soluções propostas são projetadas para lidar com o fluxo de tarefas semelhantes. Eles não são adequados para organizar pub / sub ou mensagens entre sistemas e componentes de acoplamento fraco.


Uma fila no topo de um banco de dados relacional funciona bem para cargas pequenas e médias (centenas de milhares de tarefas por dia, dezenas a centenas de executores), mas para threads grandes, é melhor usar uma solução especializada.


A essência do método em cinco palavras


select ... for update skip locked 

Linha de base


Para simplificar, a seguir, apenas os identificadores de tarefas exclusivos serão armazenados na tabela. Adicionar algum tipo de carga útil não deve ser difícil.


A tabela para a fila mais simples contém a própria tarefa e seu status:


 create table task ( id bigint not null primary key, status integer not null default 0 -- 0 - , 1 -  , 2 -  ); create index task__status__idx on task (status); 

Adicionando uma tarefa:


 insert into task (id) values ($1) on conflict (id) do nothing; 

Obtendo a seguinte tarefa:


 with next_task as ( select id from task where status = 0 limit 1 for update skip locked ) update task set status = 1 from next_task where task.id = next_task.id returning task.id; 

Conclusão da tarefa:


 update task set status = 2 where id = $1; 

Fila prioritária


No caso simples, o id da tarefa é sua prioridade. Somente a solicitação para a próxima tarefa é alterada - a order by id condição de classificação order by id com a ordem necessária das tarefas de processamento é adicionada. Você também precisa criar um índice composto por (status, id) .


Ou, por prioridade, é adicionada uma coluna separada:


 create table task ( id bigint not null primary key, priority integer not null, status integer not null default 0 -- 0 - , 1 -  , 2 -  ); create index task__status__priority__idx on task (status, priority); 

Adicionando uma tarefa:
 insert into task (id, priority) values ($1, $2) on conflict (id) do nothing; 

Obtendo a seguinte tarefa:
 with next_task as ( select id from task where status = 0 order by priority limit 1 for update skip locked ) update task set status = 1 from next_task where task.id = next_task.id returning task.id; 

A coluna destacada permite alterar a prioridade da tarefa rapidamente.


Fila com uma repetição das tarefas "caídas"


Um erro ou uma exceção pode ocorrer durante a execução da tarefa. Nesses casos, a tarefa deve ser colocada na fila novamente. Às vezes, ainda é necessário adiar o tempo de sua execução repetida por algum tempo, por exemplo, se a exceção for devido à indisponibilidade temporária de um serviço de terceiros.


 create table task ( id bigint not null primary key, status integer not null default 0, -- 0 - , 1 -  , 2 - , 3 - , 4 -   (  ) attempt integer not null default 0, delayed_to timestamp null, error_text text null ); create index task__status__delayed_to__idx on task (status, delayed_to); 

Como você pode ver, a lista de status foi expandida e novas colunas foram adicionadas:


  • attempt - número de tentativas; necessário tomar uma decisão sobre a necessidade de tentar novamente (limitar o número de tentativas) e selecionar um atraso antes de tentar novamente (por exemplo, cada tentativa subsequente é adiada por 10 * attempt minutos de 10 * attempt );
  • delayed_to - hora da próxima tentativa de concluir a tarefa;
  • error_text - texto do erro.

O texto do erro é necessário para agrupar por tipo de erro.


Um exemplo O sistema de monitoramento relata que milhares de tarefas com o status "erro" foram acumuladas na fila. Atendemos à solicitação:


 select error_text, count(*) from task where status = 3 group by 1 order by 2 desc; 

Para detalhes, acesse os registros dos artistas. Corrija a situação que causou o erro (se possível). Se necessário, aceleramos o reinício das tarefas definindo o status como 0 ou alterando o tempo da próxima tentativa.


Obtendo a seguinte nova tarefa:
 with next_task as ( select id from task where status = 0 limit 1 for update skip locked ) update task set status = 1, attempt = attempt + 1, delayed_to = null, error_text = null from next_task where task.id = next_task.id returning task.id; 

Pendente da seguinte tarefa devido a um erro:
 with next_task as ( select id from task where status = 3 and delayed_to < localtimestamp limit 1 for update skip locked ) update task set status = 1, attempt = attempt + 1, delayed_to = null, error_text = null from next_task where task.id = next_task.id returning task.id; 

Conclusão bem sucedida da tarefa:
 update task set status = 2, delayed_to = null, error_text = null where id = $1; 

A tarefa falhou, haverá uma repetição em (5 * número de tentativas) minutos:
 update task set status = 3, delayed_to = localtimestamp + make_interval(mins => 5 * attempt), error_text = $2 where id = $1; 

A tarefa foi concluída com um erro fatal, não haverá nova tentativa:
 update task set status = 4, delayed_to = null, error_text = $2 where id = $1; 

A solicitação para a próxima tarefa é dividida em duas, para que o DBMS possa criar um plano de consulta eficaz para a fila de prioridade. Uma condição de seleção com or pode dar muito errado com a order by classificação order by .


Coleção de métricas


Adicione os seguintes atributos:


  • hora de criação da tarefa;
  • tempo de mudança de tarefa;
  • hora de início e término da tarefa.

 create table task ( id bigint not null primary key, status integer not null default 0, -- 0 - , 1 -  , 2 - , 3 - , 4 -   (  ) attempt integer not null default 0, begin_time timestamp null, end_time timestamp null, delayed_to timestamp null, error_text text null, created timestamp not null default localtimestamp, updated timestamp not null default localtimestamp ); create index task__status__delayed_to__idx on task (status, delayed_to); create index task__updated__idx on task (updated); 

Consideramos as colunas adicionadas em todas as consultas.


Obtendo a seguinte nova tarefa:
 with next_task as ( select id from task where status = 0 limit 1 for update skip locked ) update task set status = 1, attempt = attempt + 1, begin_time = localtimestamp, end_time = null, delayed_to = null, error_text = null, updated = localtimestamp from next_task where task.id = next_task.id returning task.id; 

Pendente da seguinte tarefa devido a um erro:
 with next_task as ( select id from task where status = 3 and delayed_to < localtimestamp limit 1 for update skip locked ) update task set status = 1, attempt = attempt + 1, begin_time = localtimestamp, end_time = null, delayed_to = null, error_text = null, updated = localtimestamp from next_task where task.id = next_task.id returning task.id; 

Conclusão bem sucedida da tarefa:
 update task set status = 2, end_time = localtimestamp, delayed_to = null, error_text = null, updated = localtimestamp where id = $1; 

A tarefa falhou, haverá uma repetição em (5 * número de tentativas) minutos:
 update task set status = 3, end_time = localtimestamp, delayed_to = localtimestamp + make_interval(mins => 5 * attempt), error_text = $2, updated = localtimestamp where id = $1; 

A tarefa foi concluída com um erro fatal, não haverá nova tentativa:
 update task set status = 4, end_time = localtimestamp, delayed_to = null, error_text = $2, updated = localtimestamp where id = $1; 

Exemplos de por que isso pode ser necessário


Pesquise e reinicie tarefas pendentes:


 update task set status = 3, end_time = localtimestamp, delayed_to = localtimestamp, error_text = 'hanged', updated = localtimestamp where status = 1 and updated < localtimestamp - interval '1 hour'; 

Removendo tarefas antigas:


 delete from task where updated < localtimestamp - interval '30 days'; 

Estatísticas para concluir tarefas:


 select date_trunc('hour', end_time), count(*), sum(end_time - begin_time), avg(end_time - begin_time) from task where status = 2 and end_time >= '2019-12-16' group by 1 order by 1; 

Reinicie as tarefas concluídas anteriormente


Por exemplo, um documento é atualizado, é necessário reindexá-lo para a pesquisa de texto completo.


 create table task ( id bigint not null primary key, task_updated_at timestamp not null default localtimstamp, status integer not null default 0, -- 0 - , 1 -  , 2 - , 3 - , 4 -   (  ) begin_time timestamp null, end_time timestamp null, delayed_to timestamp null, error_text text null, created timestamp not null default localtimestamp, updated timestamp not null default localtimestamp ); 

Aqui, a coluna task_updated_at é adicionada para o tempo de atualização da tarefa, mas o campo created pode ser usado.


Incluindo ou atualizando (reiniciando) uma tarefa:


 insert into task (id, task_updated_at) values ($1, $2) on conflict (id) do update set task_updated_at = excluded.task_updated_at, status = case when status = 1 then 1 else 0 end, delayed_to = null, error_text = null, updated = localtimestamp where task_updated_at < excluded.task_updated_at; 

O que está acontecendo aqui. Uma tarefa se torna "nova" se não estiver sendo concluída agora.


A solicitação para concluir a tarefa também verificará se foi alterada durante a execução.


As solicitações para a próxima tarefa são as mesmas da fila para coletar métricas.


Conclusão bem sucedida da tarefa:


 update task set status = case when begin_time >= updated then 2 else 0 end, end_time = localtimestamp, delayed_to = null, error_text = null, updated = localtimestamp where id = $1; 

A conclusão da tarefa com um erro: dependendo da tarefa. Você pode fazer um atraso incondicional na reinicialização. Você pode definir o status como "novo" ao atualizar.


Pipeline


A tarefa passa por várias etapas. Você pode criar uma fila separada para cada estágio. Ou você pode adicionar a coluna correspondente à tabela.


Um exemplo baseado na fila básica para não bagunçar o código. Todas as modificações descritas anteriormente podem ser aplicadas a essa fila sem problemas.


 create table task ( id bigint not null primary key, stage integer not null default 0, status integer not null default 0 ); create index task__stage__status__idx on task (stage, status); 

Obtendo a seguinte tarefa em um determinado estágio:


 with next_task as ( select id from task where stage = $1 and status = 0 limit 1 for update skip locked ) update task set status = 1 from next_task where task.id = next_task.id returning task.id; 

Conclusão da tarefa com a transição para o estágio indicado:


 update task set stage = $2, status = 2 where id = $1; 

Ou faça a transição para a próxima etapa em ordem:


 update task set stage = stage + 1, status = 2 where id = $1; 

Tarefas agendadas


Essa é uma variação da fila de repetição.


Cada tarefa pode ter seu próprio agendamento (na versão mais simples, a frequência do lançamento).


 create table task ( id bigint not null primary key, period integer not null, --     status integer not null default 0, -- 0 - , 1 -   next_run_time timestamp not null default localtimestamp ); create index task__status__next_run_time__idx on task (status, next_run_time); 

Adicionando uma tarefa:


 insert into task (id, period, next_run_time) values ($1, $2, $3); 

Obtendo a seguinte tarefa:


 with next_task as ( select id from task where status = 0 and next_run_time <= localtimestamp limit 1 for update skip locked ) update task set status = 1 from next_task where task.id = next_task.id returning task.id; 

Concluindo a tarefa e planejando a próxima execução:


 update task set status = 0, next_run_time = next_run_time + make_interval(secs => period) where id = $1 

Em vez de uma conclusão


Não há nada complicado na criação de uma fila de tarefas especializada usando ferramentas RDBMS.


A fila "criada por si" responderá até o mais selvagem praticamente qualquer requisito de negócios / domínio.


Bem, não devemos esquecer que, como qualquer outro banco de dados, a fila requer um ajuste cuidadoso do servidor, consultas e índices.

Source: https://habr.com/ru/post/pt481556/


All Articles