Prise en charge de la file d'attente Hangfire

Hangfire est une bibliothèque pour .net (core), qui permet l'exécution asynchrone de certains codes sur le principe du "feu et oublie". Un exemple d'un tel code peut être l'envoi d'e-mails, le traitement vidéo, la synchronisation avec un autre système, etc. En plus de "tirer et oublier", il existe un support pour les tâches différées, ainsi que les tâches planifiées au format Cron.


Actuellement, il existe de nombreuses bibliothèques de ce type. Voici quelques-uns des avantages de Hangfire:


  • Configuration simple, API pratique
  • Fiabilité Hangfire garantit que la tâche créée sera exécutée au moins une fois
  • Capacité à effectuer des tâches en parallèle et d'excellentes performances
  • Extensibilité (ici, nous allons l'utiliser ci-dessous)
  • Documentation assez complète et compréhensible
  • Tableau de bord sur lequel vous pouvez voir toutes les statistiques sur les tâches

Je n'entrerai pas dans trop de détails, car il existe de nombreux bons articles sur Hangfire et comment l'utiliser. Dans cet article, je vais expliquer comment utiliser la prise en charge de plusieurs files d'attente (ou pools de tâches), comment corriger la fonctionnalité de nouvelle tentative standard et faire en sorte que chaque file d'attente ait une configuration individuelle.


Prise en charge existante des (pseudo) files d'attente


Remarque importante: dans le titre, j'ai utilisé le terme pseudo-file d'attente car Hangfire ne garantit pas que les tâches seront exécutées dans un ordre spécifique. C'est-à-dire le principe du «premier entré, premier sorti» ne s'applique pas et nous ne nous y fierons pas. De plus, l'auteur de la bibliothèque recommande de rendre les tâches idempotentes, c'est-à-dire stable contre l'exécution multiple imprévue. De plus, je n'utiliserai que le mot "queue", car Hangfire utilise le terme "file d'attente".


Hangfire a un support simple de file d'attente. Bien qu'il n'offre pas la flexibilité des systèmes Message Queue tels que rabbitMQ ou Azure Service Bus, il est souvent suffisant pour résoudre un large éventail de tâches.


Chaque tâche a la propriété "Queue", c'est-à-dire le nom de la file d'attente dans laquelle elle doit être exécutée. Par défaut, la tâche est envoyée à la file d'attente avec le nom "default", sauf indication contraire. La prise en charge de plusieurs files d'attente est nécessaire afin de gérer séparément l'exécution de tâches de différents types. Par exemple, nous pouvons souhaiter que les tâches de traitement vidéo tombent dans la file d'attente "video_queue" et envoient des e-mails à la file d'attente "email_queue". Ainsi, nous sommes en mesure d'effectuer indépendamment ces deux types de tâches. Si nous voulons déplacer le traitement vidéo vers un serveur dédié, nous pouvons facilement le faire en exécutant un serveur Hangfire séparé en tant qu'application console qui traitera la file d'attente "video_queue".


Passons à la pratique


La configuration du serveur Hangfire dans le noyau asp.net est la suivante:


public void Configure(IApplicationBuilder app) { app.UseHangfireServer(new BackgroundJobServerOptions { WorkerCount = 2, Queues = new[] { "email_queue", "video_queue" } }); } 

Problème 1 - Les tâches de relecture tombent dans la file d'attente par défaut


Comme je l'ai mentionné ci-dessus, il y a une file d'attente par défaut dans Hangfire appelée "par défaut". Si une tâche placée dans la file d'attente, par exemple, "video_queue", a échoué et doit être réessayée, elle sera à nouveau envoyée dans la file d'attente "par défaut" et non "video_queue" et, par conséquent, notre tâche ne sera pas exécutée du tout l'instance du serveur Hangfire que nous aimerions, le cas échéant. Ce comportement a été établi par moi expérimentalement et est probablement un bogue dans Hangfire lui-même.


Filtres de travail


Hangfire nous offre la possibilité d'étendre la fonctionnalité à l'aide de filtres dits ( filtres de travaux ), qui sont similaires en principe aux filtres d'actions dans ASP.NET MVC. Le fait est que la logique interne de Hangfire est implémentée comme une machine d'état. Il s'agit d'un moteur qui transfère séquentiellement les tâches du pool d'un état à un autre (par exemple, créé -> mis en file d'attente -> traitement -> réussi), et les filtres nous permettent "d'intercepter" la tâche qui est exécutée à chaque fois que son état change et de la manipuler. Un filtre est implémenté en tant qu'attribut qui peut être appliqué à une seule méthode, classe ou globalement.


Paramètres du travail


L'objet ElectStateContext est transmis en tant qu'argument à la méthode de filtrage. Cet objet contient des informations complètes sur la tâche en cours. Entre autres choses, il a les méthodes GetJobParameter <> (...) et SettJobParameter <> (...). Les paramètres de travail vous permettent d'enregistrer des informations relatives à une tâche dans une base de données. C'est dans les paramètres du travail que le nom de la file d'attente à laquelle la tâche a été envoyée à l'origine est stocké, mais pour une raison quelconque, ces informations sont ignorées lors d'une répétition ultérieure.


Solution


Nous avons donc une tâche qui s'est terminée par erreur et doit être envoyée pour être réexécutée dans la file d'attente de droite (dans la même qui lui a été affectée au moment de la création initiale). La répétition d'une tâche qui s'est terminée avec une erreur est une transition de l'état "échoué" à l'état "mis en file d'attente". Pour résoudre le problème, créez un filtre qui, lorsque la tâche entrera dans l'état "en file d'attente", vérifiera dans quelle file d'attente la tâche a été envoyée initialement et mettra le paramètre "QueueName" dans la valeur souhaitée:


 public class HangfireUseCorrectQueueFilter : JobFilterAttribute, IElectStateFilter { public void OnStateElection(ElectStateContext context) { if (context.CandidateState is EnqueuedState enqueuedState) { var queueName = context.GetJobParameter<string>("QueueName"); if (string.IsNullOrWhiteSpace(queueName)) { context.SetJobParameter("QueueName", enqueuedState.Queue); } else { enqueuedState.Queue = queueName; } } } } 

Afin d'appliquer le filtre par défaut à toutes les tâches (c'est-à-dire globalement), ajoutez le code suivant à notre configuration:


 GlobalJobFilters.Filters.Add(new HangfireUseCorrectQueueFilter { Order = 1 }); 

Un autre petit inconvénient est que la collection GlobalJobFilters contient par défaut une instance de la classe AutomaticRetryAttribute. Il s'agit d'un filtre standard chargé de réexécuter les tâches ayant échoué. Il envoie également la tâche à la file d'attente "par défaut", ignorant la file d'attente d'origine. Pour que notre vélo roule, vous devez retirer ce filtre de la collection et laisser notre filtre prendre la responsabilité des tâches répétées. Par conséquent, le code de configuration ressemblera à ceci:


 var defaultRetryFilter = GlobalJobFilters.Filters .FirstOrDefault(f => f.Instance is AutomaticRetryAttribute); if (defaultRetryFilter != null && defaultRetryFilter.Instance != null) { GlobalJobFilters.Filters.Remove(defaultRetryFilter.Instance); } GlobalJobFilters.Filters.Add(new HangfireUseCorrectQueueFilter { Order = 1 }); 

Il convient de noter que AutomaticRetryAttribute implémente la logique d'augmenter automatiquement l'intervalle entre les tentatives (l'intervalle augmente à chaque tentative suivante), et en supprimant AutomaticRetryAttribute de la collection GlobalJobFilters, nous abandonnons cette fonctionnalité (voir la mise en œuvre de la méthode ScheduleAgainLater )


Ainsi, nous avons réalisé que nos tâches peuvent être effectuées dans différentes files d'attente, ce qui nous permet de gérer indépendamment leur exécution, y compris le traitement de différentes files d'attente sur différentes machines. Seulement maintenant, nous ne savons pas combien de fois et à quel intervalle nos tâches seront répétées en cas d'erreur, car nous avons supprimé AutomaticRetryAttribute de la collection de filtres.


Problème 2 - Paramètres individuels pour chaque file d'attente


Nous voulons pouvoir configurer l'intervalle et le nombre de répétitions séparément pour chaque file d'attente, et aussi, si pour une file d'attente nous n'avons pas spécifié de valeurs explicitement, nous voulons que les valeurs par défaut soient appliquées. Pour ce faire, nous implémentons un autre filtre et l'appelons HangfireRetryJobFilter .


Idéalement, le code de configuration devrait ressembler à ceci:


 GlobalJobFilters.Filters.Add(new HangfireRetryJobFilter { Order = 2, ["email_queue"] = new HangfireQueueSettings { DelayInSeconds = 120, RetryAttempts = 3 }, ["video_queue"] = new HangfireQueueSettings { DelayInSeconds = 60, RetryAttempts = 5 } }); 

Solution


Pour ce faire, ajoutez d'abord la classe HangfireQueueSettings , qui servira de conteneur pour nos paramètres.


 public sealed class HangfireQueueSettings { public int RetryAttempts { get; set; } public int DelayInSeconds { get; set; } } 

Ensuite, nous ajoutons l'implémentation du filtre lui-même qui, lorsque les tâches sont répétées après une erreur, appliquera les paramètres en fonction de la configuration de la file d'attente et surveillera le nombre de tentatives:


 public class HangfireRetryJobFilter : JobFilterAttribute, IElectStateFilter, IApplyStateFilter { private readonly HangfireQueueSettings _defaultQueueSettings = new HangfireQueueSettings { RetryAttempts = 3, DelayInSeconds = 10 }; private readonly IDictionary<string, HangfireQueueSettings> _settings = new Dictionary<string, HangfireQueueSettings>(); public HangfireQueueSettings this[string queueName] { get { return _settings.TryGetValue(queueName, out HangfireQueueSettings queueSettings) ? queueSettings : _defaultQueueSettings; } set { _settings[queueName] = value; } } public void OnStateElection(ElectStateContext context) { if (!(context.CandidateState is FailedState failedState)) { // This filter accepts only failed job state. return; } var retryAttempt = context.GetJobParameter<int>("RetryCount") + 1; var queueName = context.GetJobParameter<string>("QueueName"); if (retryAttempt <= this[queueName].RetryAttempts) { ScheduleAgainLater(context, retryAttempt, failedState, queueName); } else { TransitionToDeleted(context, failedState, queueName); } } public void OnStateApplied( ApplyStateContext context, IWriteOnlyTransaction transaction) { if (context.NewState is ScheduledState && context.NewState.Reason != null && context.NewState.Reason.StartsWith("Retry attempt")) { transaction.AddToSet("retries", context.BackgroundJob.Id); } } public void OnStateUnapplied( ApplyStateContext context, IWriteOnlyTransaction transaction) { if (context.OldStateName == ScheduledState.StateName) { transaction.RemoveFromSet("retries", context.BackgroundJob.Id); } } private void ScheduleAgainLater( ElectStateContext context, int retryAttempt, FailedState failedState, string queueName) { context.SetJobParameter("RetryCount", retryAttempt); var delay = TimeSpan.FromSeconds(this[queueName].DelayInSeconds); const int maxMessageLength = 50; var exceptionMessage = failedState.Exception.Message.Length > maxMessageLength ? failedState.Exception.Message.Substring(0, maxMessageLength - 1) + "…" : failedState.Exception.Message; // If attempt number is less than max attempts, we should // schedule the job to run again later. var reason = $"Retry attempt {retryAttempt} of {this[queueName].RetryAttempts}: {exceptionMessage}"; context.CandidateState = delay == TimeSpan.Zero ? (IState)new EnqueuedState { Reason = reason } : new ScheduledState(delay) { Reason = reason }; } private void TransitionToDeleted( ElectStateContext context, FailedState failedState, string queueName) { context.CandidateState = new DeletedState { Reason = this[queueName].RetryAttempts > 0 ? "Exceeded the maximum number of retry attempts." : "Retries were disabled for this job." }; } } 

Remarque sur le code: lors de l'implémentation de la classe HangfireRetryJobFilter , la classe AutomaticRetryAttribute de HangfireRetryJobFilter été prise comme base, par conséquent, l'implémentation de certaines méthodes coïncide partiellement avec les méthodes correspondantes de cette classe.

Problème 3 - Comment envoyer une tâche à une file d'attente spécifique?


J'ai réussi à trouver deux façons d'affecter la tâche à la file d'attente: documentée et - non.


1ère méthode - accrochez l'attribut correspondant à la méthode


 [Queue("video_queue")] public void SomeMethod() { } BackgroundJob.Enqueue(() => SomeMethod()); 

http://docs.hangfire.io/en/latest/background-processing/configuring-queues.html


2ème méthode (non documentée) - utilisez la classe BackgroundJobClient


 var client = new BackgroundJobClient(); client.Create(() => MyMethod(), new EnqueuedState("video_queue")); 

L'avantage de la deuxième méthode est qu'elle ne crée pas de dépendances inutiles sur Hangfire et vous permet de décider pendant quel processus la tâche doit aller. Malheureusement, dans la documentation officielle, je n'ai trouvé aucune mention de la classe BackgroundJobClient et comment l'appliquer. J'ai utilisé la deuxième méthode dans ma solution, elle est donc testée en pratique.


Conclusion


Dans cet article, nous avons utilisé la prise en charge de plusieurs files d'attente dans Hangfire pour séparer le traitement de différents types de tâches. Nous avons implémenté notre mécanisme pour répéter les tâches terminées sans succès avec la possibilité d'une configuration individuelle pour chaque file d'attente, élargissant les fonctionnalités de Hangfire à l'aide des filtres de travaux, et avons également appris comment envoyer des tâches pour exécution à la file d'attente souhaitée.


J'espère que cet article sera utile à quelqu'un. Je me ferai un plaisir de commenter.


Liens utiles


Documentation Hangfire
Code source de Hangfire
Scott Hanselman - Comment exécuter des tâches d'arrière-plan dans ASP.NET

Source: https://habr.com/ru/post/fr434364/


All Articles