
As filas são usadas para organizar o processamento do fluxo de tarefas. Eles são necessários para o acúmulo e a distribuição de tarefas entre os artistas. As filas também podem fornecer requisitos adicionais para tarefas de processamento: garantia de entrega, garantia única, priorização etc.
Como regra, são utilizados sistemas de fila de mensagens prontas (MQ - fila de mensagens), mas às vezes você precisa organizar uma fila ad hoc ou outra especializada (por exemplo, uma fila prioritária e o atraso na reinicialização de tarefas que não foram processadas devido a exceções). A criação de tais filas será discutida abaixo.
Limitações de aplicabilidade
As soluções propostas são projetadas para lidar com o fluxo de tarefas semelhantes. Eles não são adequados para organizar pub / sub ou mensagens entre sistemas e componentes de acoplamento fraco.
Uma fila no topo de um banco de dados relacional funciona bem para cargas pequenas e médias (centenas de milhares de tarefas por dia, dezenas a centenas de executores), mas para threads grandes, é melhor usar uma solução especializada.
A essência do método em cinco palavras
select ... for update skip locked
Linha de base
Para simplificar, a seguir, apenas os identificadores de tarefas exclusivos serão armazenados na tabela. Adicionar algum tipo de carga útil não deve ser difícil.
A tabela para a fila mais simples contém a própria tarefa e seu status:
create table task ( id bigint not null primary key, status integer not null default 0
Adicionando uma tarefa:
insert into task (id) values ($1) on conflict (id) do nothing;
Obtendo a seguinte tarefa:
with next_task as ( select id from task where status = 0 limit 1 for update skip locked ) update task set status = 1 from next_task where task.id = next_task.id returning task.id;
Conclusão da tarefa:
update task set status = 2 where id = $1;
Fila prioritária
No caso simples, o id
da tarefa é sua prioridade. Somente a solicitação para a próxima tarefa é alterada - a order by id
condição de classificação order by id
com a ordem necessária das tarefas de processamento é adicionada. Você também precisa criar um índice composto por (status, id)
.
Ou, por prioridade, é adicionada uma coluna separada:
create table task ( id bigint not null primary key, priority integer not null, status integer not null default 0
Adicionando uma tarefa: insert into task (id, priority) values ($1, $2) on conflict (id) do nothing;
Obtendo a seguinte tarefa: with next_task as ( select id from task where status = 0 order by priority limit 1 for update skip locked ) update task set status = 1 from next_task where task.id = next_task.id returning task.id;
A coluna destacada permite alterar a prioridade da tarefa rapidamente.
Fila com uma repetição das tarefas "caídas"
Um erro ou uma exceção pode ocorrer durante a execução da tarefa. Nesses casos, a tarefa deve ser colocada na fila novamente. Às vezes, ainda é necessário adiar o tempo de sua execução repetida por algum tempo, por exemplo, se a exceção for devido à indisponibilidade temporária de um serviço de terceiros.
create table task ( id bigint not null primary key, status integer not null default 0,
Como você pode ver, a lista de status foi expandida e novas colunas foram adicionadas:
attempt
- número de tentativas; necessário tomar uma decisão sobre a necessidade de tentar novamente (limitar o número de tentativas) e selecionar um atraso antes de tentar novamente (por exemplo, cada tentativa subsequente é adiada por 10 * attempt
minutos de 10 * attempt
);delayed_to
- hora da próxima tentativa de concluir a tarefa;error_text
- texto do erro.
O texto do erro é necessário para agrupar por tipo de erro.
Um exemplo O sistema de monitoramento relata que milhares de tarefas com o status "erro" foram acumuladas na fila. Atendemos à solicitação:
select error_text, count(*) from task where status = 3 group by 1 order by 2 desc;
Para detalhes, acesse os registros dos artistas. Corrija a situação que causou o erro (se possível). Se necessário, aceleramos o reinício das tarefas definindo o status como 0 ou alterando o tempo da próxima tentativa.
Obtendo a seguinte nova tarefa: with next_task as ( select id from task where status = 0 limit 1 for update skip locked ) update task set status = 1, attempt = attempt + 1, delayed_to = null, error_text = null from next_task where task.id = next_task.id returning task.id;
Pendente da seguinte tarefa devido a um erro: with next_task as ( select id from task where status = 3 and delayed_to < localtimestamp limit 1 for update skip locked ) update task set status = 1, attempt = attempt + 1, delayed_to = null, error_text = null from next_task where task.id = next_task.id returning task.id;
Conclusão bem sucedida da tarefa: update task set status = 2, delayed_to = null, error_text = null where id = $1;
A tarefa falhou, haverá uma repetição em (5 * número de tentativas) minutos: update task set status = 3, delayed_to = localtimestamp + make_interval(mins => 5 * attempt), error_text = $2 where id = $1;
A tarefa foi concluída com um erro fatal, não haverá nova tentativa: update task set status = 4, delayed_to = null, error_text = $2 where id = $1;
A solicitação para a próxima tarefa é dividida em duas, para que o DBMS possa criar um plano de consulta eficaz para a fila de prioridade. Uma condição de seleção com or
pode dar muito errado com a order by
classificação order by
.
Coleção de métricas
Adicione os seguintes atributos:
- hora de criação da tarefa;
- tempo de mudança de tarefa;
- hora de início e término da tarefa.
create table task ( id bigint not null primary key, status integer not null default 0,
Consideramos as colunas adicionadas em todas as consultas.
Obtendo a seguinte nova tarefa: with next_task as ( select id from task where status = 0 limit 1 for update skip locked ) update task set status = 1, attempt = attempt + 1, begin_time = localtimestamp, end_time = null, delayed_to = null, error_text = null, updated = localtimestamp from next_task where task.id = next_task.id returning task.id;
Pendente da seguinte tarefa devido a um erro: with next_task as ( select id from task where status = 3 and delayed_to < localtimestamp limit 1 for update skip locked ) update task set status = 1, attempt = attempt + 1, begin_time = localtimestamp, end_time = null, delayed_to = null, error_text = null, updated = localtimestamp from next_task where task.id = next_task.id returning task.id;
Conclusão bem sucedida da tarefa: update task set status = 2, end_time = localtimestamp, delayed_to = null, error_text = null, updated = localtimestamp where id = $1;
A tarefa falhou, haverá uma repetição em (5 * número de tentativas) minutos: update task set status = 3, end_time = localtimestamp, delayed_to = localtimestamp + make_interval(mins => 5 * attempt), error_text = $2, updated = localtimestamp where id = $1;
A tarefa foi concluída com um erro fatal, não haverá nova tentativa: update task set status = 4, end_time = localtimestamp, delayed_to = null, error_text = $2, updated = localtimestamp where id = $1;
Exemplos de por que isso pode ser necessário
Pesquise e reinicie tarefas pendentes:
update task set status = 3, end_time = localtimestamp, delayed_to = localtimestamp, error_text = 'hanged', updated = localtimestamp where status = 1 and updated < localtimestamp - interval '1 hour';
Removendo tarefas antigas:
delete from task where updated < localtimestamp - interval '30 days';
Estatísticas para concluir tarefas:
select date_trunc('hour', end_time), count(*), sum(end_time - begin_time), avg(end_time - begin_time) from task where status = 2 and end_time >= '2019-12-16' group by 1 order by 1;
Reinicie as tarefas concluídas anteriormente
Por exemplo, um documento é atualizado, é necessário reindexá-lo para a pesquisa de texto completo.
create table task ( id bigint not null primary key, task_updated_at timestamp not null default localtimstamp, status integer not null default 0,
Aqui, a coluna task_updated_at
é adicionada para o tempo de atualização da tarefa, mas o campo created
pode ser usado.
Incluindo ou atualizando (reiniciando) uma tarefa:
insert into task (id, task_updated_at) values ($1, $2) on conflict (id) do update set task_updated_at = excluded.task_updated_at, status = case when status = 1 then 1 else 0 end, delayed_to = null, error_text = null, updated = localtimestamp where task_updated_at < excluded.task_updated_at;
O que está acontecendo aqui. Uma tarefa se torna "nova" se não estiver sendo concluída agora.
A solicitação para concluir a tarefa também verificará se foi alterada durante a execução.
As solicitações para a próxima tarefa são as mesmas da fila para coletar métricas.
Conclusão bem sucedida da tarefa:
update task set status = case when begin_time >= updated then 2 else 0 end, end_time = localtimestamp, delayed_to = null, error_text = null, updated = localtimestamp where id = $1;
A conclusão da tarefa com um erro: dependendo da tarefa. Você pode fazer um atraso incondicional na reinicialização. Você pode definir o status como "novo" ao atualizar.
Pipeline
A tarefa passa por várias etapas. Você pode criar uma fila separada para cada estágio. Ou você pode adicionar a coluna correspondente à tabela.
Um exemplo baseado na fila básica para não bagunçar o código. Todas as modificações descritas anteriormente podem ser aplicadas a essa fila sem problemas.
create table task ( id bigint not null primary key, stage integer not null default 0, status integer not null default 0 ); create index task__stage__status__idx on task (stage, status);
Obtendo a seguinte tarefa em um determinado estágio:
with next_task as ( select id from task where stage = $1 and status = 0 limit 1 for update skip locked ) update task set status = 1 from next_task where task.id = next_task.id returning task.id;
Conclusão da tarefa com a transição para o estágio indicado:
update task set stage = $2, status = 2 where id = $1;
Ou faça a transição para a próxima etapa em ordem:
update task set stage = stage + 1, status = 2 where id = $1;
Tarefas agendadas
Essa é uma variação da fila de repetição.
Cada tarefa pode ter seu próprio agendamento (na versão mais simples, a frequência do lançamento).
create table task ( id bigint not null primary key, period integer not null,
Adicionando uma tarefa:
insert into task (id, period, next_run_time) values ($1, $2, $3);
Obtendo a seguinte tarefa:
with next_task as ( select id from task where status = 0 and next_run_time <= localtimestamp limit 1 for update skip locked ) update task set status = 1 from next_task where task.id = next_task.id returning task.id;
Concluindo a tarefa e planejando a próxima execução:
update task set status = 0, next_run_time = next_run_time + make_interval(secs => period) where id = $1
Em vez de uma conclusão
Não há nada complicado na criação de uma fila de tarefas especializada usando ferramentas RDBMS.
A fila "criada por si" responderá até o mais selvagem praticamente qualquer requisito de negócios / domínio.
Bem, não devemos esquecer que, como qualquer outro banco de dados, a fila requer um ajuste cuidadoso do servidor, consultas e índices.