Suporte da fila do Hangfire

O Hangfire é uma biblioteca para .net (core), que permite a execução assíncrona de algum código no princípio de "acionar e esquecer". Um exemplo desse código pode ser enviar email, processamento de vídeo, sincronização com outro sistema etc. Além de "disparar e esquecer", há suporte para tarefas adiadas, bem como tarefas agendadas no formato Cron.


Atualmente, existem muitas dessas bibliotecas. Alguns dos benefícios do Hangfire são:


  • Configuração simples, API conveniente
  • Confiabilidade O Hangfire garante que a tarefa criada será executada pelo menos uma vez
  • Capacidade de executar tarefas em paralelo e excelente desempenho
  • Extensibilidade (aqui usaremos abaixo)
  • Documentação razoavelmente completa e compreensível
  • Painel no qual você pode ver todas as estatísticas sobre as tarefas

Não vou entrar em muitos detalhes, pois existem muitos artigos bons sobre o Hangfire e como usá-lo. Neste artigo, discutirei como usar o suporte de várias filas (ou conjuntos de tarefas), como corrigir a funcionalidade de repetição padrão e fazer com que cada fila tenha uma configuração individual.


Suporte existente para filas (pseudo)


Nota importante: no título, usei o termo pseudo-fila porque o Hangfire não garante que as tarefas sejam executadas em uma ordem específica. I.e. o princípio de "primeiro a entrar, primeiro a sair" não se aplica e não confiaremos nele. Além disso, o autor da biblioteca recomenda tornar as tarefas idempotentes, ou seja, constante contra a execução múltipla imprevista. Além disso, usarei apenas a palavra "fila", porque O Hangfire usa o termo "Fila".


O Hangfire possui suporte a filas simples. Embora não ofereça a flexibilidade de sistemas de fila de mensagens, como rabbitMQ ou Barramento de Serviço do Azure, geralmente é suficiente resolver uma ampla variedade de tarefas.


Cada tarefa possui a propriedade "Fila", ou seja, o nome da fila na qual deve ser executada. Por padrão, a tarefa é enviada para a fila com o nome "padrão", a menos que especificado de outra forma. É necessário suporte para várias filas para gerenciar separadamente a execução de tarefas de diferentes tipos. Por exemplo, podemos querer que as tarefas de processamento de vídeo caiam na fila "fila de vídeo" e enviem E-mails para a fila "fila de email". Assim, somos capazes de executar independentemente esses dois tipos de tarefas. Se queremos mover o processamento de vídeo para um servidor dedicado, podemos fazer isso facilmente executando um servidor Hangfire separado como um aplicativo de console que processará a fila "video_queue".


Vamos seguir praticando


A configuração do servidor Hangfire no núcleo do asp.net é a seguinte:


public void Configure(IApplicationBuilder app) { app.UseHangfireServer(new BackgroundJobServerOptions { WorkerCount = 2, Queues = new[] { "email_queue", "video_queue" } }); } 

Problema 1 - As tarefas de reprodução caem na fila padrão


Como mencionei acima, há uma fila padrão no Hangfire chamada "padrão". Se uma tarefa colocada na fila, por exemplo, "video_queue", falhar e precisar ser tentada novamente, será enviada para a fila "padrão" novamente e não para "video_queue" e, como resultado, nossa tarefa não será executada. a instância do servidor Hangfire que gostaríamos, se houver. Esse comportamento foi estabelecido por mim experimentalmente e provavelmente é um bug no próprio Hangfire.


Filtros de trabalho


O Hangfire nos oferece a possibilidade de expandir a funcionalidade com a ajuda dos chamados filtros (filtros de tarefas), que são similares em princípio aos filtros de ações no ASP.NET MVC. O fato é que a lógica interna do Hangfire é implementada como uma máquina de estado. Este é um mecanismo que transfere sequencialmente as tarefas no pool de um estado para outro (por exemplo, criado -> enfileirado -> processamento -> bem-sucedido) e os filtros permitem "interceptar" a tarefa que é executada cada vez que seu estado é alterado e manipulá-la. Um filtro é implementado como um atributo que pode ser aplicado a um único método, classe ou globalmente.


Parâmetros do trabalho


O objeto ElectStateContext é passado como um argumento para o método de filtro. Este objeto contém informações completas sobre a tarefa atual. Entre outras coisas, ele possui os métodos GetJobParameter <> (...) e SettJobParameter <> (...). Os Parâmetros do trabalho permitem salvar informações relacionadas a uma tarefa em um banco de dados. É nos Parâmetros da tarefa que o nome da fila para a qual a tarefa foi originalmente enviada é armazenado, apenas por algum motivo essas informações são ignoradas durante a próxima nova tentativa.


Solução


Portanto, temos uma tarefa que terminou com erro e deve ser enviada para reexecução na fila certa (na mesma que foi atribuída a ela no momento da criação inicial). A repetição de uma tarefa que foi concluída com um erro é uma transição do estado "falhou" para o estado "enfileirado". Para resolver o problema, crie um filtro que, quando a tarefa entrar no estado "enfileirado", verifique em qual fila a tarefa foi enviada inicialmente e coloque o parâmetro "QueueName" no valor desejado:


 public class HangfireUseCorrectQueueFilter : JobFilterAttribute, IElectStateFilter { public void OnStateElection(ElectStateContext context) { if (context.CandidateState is EnqueuedState enqueuedState) { var queueName = context.GetJobParameter<string>("QueueName"); if (string.IsNullOrWhiteSpace(queueName)) { context.SetJobParameter("QueueName", enqueuedState.Queue); } else { enqueuedState.Queue = queueName; } } } } 

Para aplicar o filtro padrão a todas as tarefas (ou seja, globalmente), adicione o seguinte código à nossa configuração:


 GlobalJobFilters.Filters.Add(new HangfireUseCorrectQueueFilter { Order = 1 }); 

Outro pequeno problema é que a coleção GlobalJobFilters, por padrão, contém uma instância da classe AutomaticRetryAttribute. Este é um filtro padrão responsável pela reexecução de tarefas com falha. Ele também envia a tarefa para a fila "padrão", ignorando a fila original. Para que nossa bicicleta ande, é necessário remover esse filtro da coleção e deixar que o filtro assuma a responsabilidade pelas tarefas repetidas. Como resultado, o código de configuração ficará assim:


 var defaultRetryFilter = GlobalJobFilters.Filters .FirstOrDefault(f => f.Instance is AutomaticRetryAttribute); if (defaultRetryFilter != null && defaultRetryFilter.Instance != null) { GlobalJobFilters.Filters.Remove(defaultRetryFilter.Instance); } GlobalJobFilters.Filters.Add(new HangfireUseCorrectQueueFilter { Order = 1 }); 

Deve-se observar que o AutomaticRetryAttribute implementa a lógica de aumentar automaticamente o intervalo entre as tentativas (o intervalo aumenta a cada tentativa subseqüente) e, removendo o AutomaticRetryAttribute da coleção GlobalJobFilters, abandonamos essa funcionalidade (consulte a implementação do método ScheduleAgainLater )


Portanto, concluímos que nossas tarefas podem ser executadas em filas diferentes, e isso nos permite gerenciar independentemente sua execução, incluindo o processamento de filas diferentes em máquinas diferentes. Somente agora não sabemos quantas vezes e em que intervalo nossas tarefas serão repetidas em caso de erro, pois removemos o AutomaticRetryAttribute da coleção de filtros.


Problema 2 - Configurações individuais para cada fila


Queremos poder configurar o intervalo e o número de repetições separadamente para cada fila e, também, se para alguma fila não especificamos valores explicitamente, queremos que os valores padrão sejam aplicados. Para fazer isso, implementamos outro filtro e o chamamos de HangfireRetryJobFilter .


Idealmente, o código de configuração deve se parecer com isso:


 GlobalJobFilters.Filters.Add(new HangfireRetryJobFilter { Order = 2, ["email_queue"] = new HangfireQueueSettings { DelayInSeconds = 120, RetryAttempts = 3 }, ["video_queue"] = new HangfireQueueSettings { DelayInSeconds = 60, RetryAttempts = 5 } }); 

Solução


Para fazer isso, primeiro adicione a classe HangfireQueueSettings , que servirá como um contêiner para nossas configurações.


 public sealed class HangfireQueueSettings { public int RetryAttempts { get; set; } public int DelayInSeconds { get; set; } } 

Em seguida, adicionamos a implementação do próprio filtro, que, quando as tarefas são repetidas após um erro, aplicará as configurações dependendo da configuração da fila e monitorará o número de tentativas:


 public class HangfireRetryJobFilter : JobFilterAttribute, IElectStateFilter, IApplyStateFilter { private readonly HangfireQueueSettings _defaultQueueSettings = new HangfireQueueSettings { RetryAttempts = 3, DelayInSeconds = 10 }; private readonly IDictionary<string, HangfireQueueSettings> _settings = new Dictionary<string, HangfireQueueSettings>(); public HangfireQueueSettings this[string queueName] { get { return _settings.TryGetValue(queueName, out HangfireQueueSettings queueSettings) ? queueSettings : _defaultQueueSettings; } set { _settings[queueName] = value; } } public void OnStateElection(ElectStateContext context) { if (!(context.CandidateState is FailedState failedState)) { // This filter accepts only failed job state. return; } var retryAttempt = context.GetJobParameter<int>("RetryCount") + 1; var queueName = context.GetJobParameter<string>("QueueName"); if (retryAttempt <= this[queueName].RetryAttempts) { ScheduleAgainLater(context, retryAttempt, failedState, queueName); } else { TransitionToDeleted(context, failedState, queueName); } } public void OnStateApplied( ApplyStateContext context, IWriteOnlyTransaction transaction) { if (context.NewState is ScheduledState && context.NewState.Reason != null && context.NewState.Reason.StartsWith("Retry attempt")) { transaction.AddToSet("retries", context.BackgroundJob.Id); } } public void OnStateUnapplied( ApplyStateContext context, IWriteOnlyTransaction transaction) { if (context.OldStateName == ScheduledState.StateName) { transaction.RemoveFromSet("retries", context.BackgroundJob.Id); } } private void ScheduleAgainLater( ElectStateContext context, int retryAttempt, FailedState failedState, string queueName) { context.SetJobParameter("RetryCount", retryAttempt); var delay = TimeSpan.FromSeconds(this[queueName].DelayInSeconds); const int maxMessageLength = 50; var exceptionMessage = failedState.Exception.Message.Length > maxMessageLength ? failedState.Exception.Message.Substring(0, maxMessageLength - 1) + "…" : failedState.Exception.Message; // If attempt number is less than max attempts, we should // schedule the job to run again later. var reason = $"Retry attempt {retryAttempt} of {this[queueName].RetryAttempts}: {exceptionMessage}"; context.CandidateState = delay == TimeSpan.Zero ? (IState)new EnqueuedState { Reason = reason } : new ScheduledState(delay) { Reason = reason }; } private void TransitionToDeleted( ElectStateContext context, FailedState failedState, string queueName) { context.CandidateState = new DeletedState { Reason = this[queueName].RetryAttempts > 0 ? "Exceeded the maximum number of retry attempts." : "Retries were disabled for this job." }; } } 

Nota para o código: ao implementar a classe HangfireRetryJobFilter , a classe AutomaticRetryAttribute do HangfireRetryJobFilter foi tomada como base; portanto, a implementação de alguns métodos coincide parcialmente com os métodos correspondentes dessa classe.

Problema 3 - Como enviar uma tarefa para uma fila específica?


Consegui encontrar duas maneiras de atribuir a tarefa à fila: documentado e - não.


1º método - pendure o atributo correspondente no método


 [Queue("video_queue")] public void SomeMethod() { } BackgroundJob.Enqueue(() => SomeMethod()); 

http://docs.hangfire.io/en/latest/background-processing/configuring-queues.html


Segundo método (não documentado) - use a classe BackgroundJobClient


 var client = new BackgroundJobClient(); client.Create(() => MyMethod(), new EnqueuedState("video_queue")); 

A vantagem do segundo método é que ele não cria dependências desnecessárias no Hangfire e permite que você decida durante qual processo a tarefa deve ser executada. Infelizmente, na documentação oficial, não encontrei menção da classe BackgroundJobClient e como aplicá-la. Eu usei o segundo método na minha solução, portanto ele é testado na prática.


Conclusão


Neste artigo, usamos o suporte de várias filas no Hangfire para separar o processamento de diferentes tipos de tarefas. Implementamos nosso mecanismo para repetir tarefas concluídas sem êxito com a possibilidade de configuração individual para cada fila, expandindo a funcionalidade do Hangfire usando filtros de tarefas e também aprendemos como enviar tarefas para a fila desejada para execução.


Espero que este artigo seja útil para alguém. Ficarei feliz em comentar.


Links úteis


Documentação do Hangfire
Código-fonte do Hangfire
Scott Hanselman - Como executar tarefas em segundo plano no ASP.NET

Source: https://habr.com/ru/post/pt434364/


All Articles