Hangfire Queue-Unterstützung

Hangfire ist eine Bibliothek für .net (Kern), die die asynchrone Ausführung von Code nach dem Prinzip "Feuer und Vergessen" ermöglicht. Ein Beispiel für einen solchen Code kann das Senden von E-Mails, die Videoverarbeitung, die Synchronisierung mit einem anderen System usw. sein. Neben "Feuer und Vergessen" werden auch verzögerte Aufgaben sowie geplante Aufgaben im Cron-Format unterstützt.


Derzeit gibt es viele solcher Bibliotheken. Einige der Vorteile von Hangfire sind:


  • Einfache Konfiguration, bequeme API
  • Zuverlässigkeit Hangfire garantiert, dass die erstellte Aufgabe mindestens einmal ausgeführt wird
  • Fähigkeit, Aufgaben parallel auszuführen und hervorragende Leistung
  • Erweiterbarkeit (hier werden wir es unten verwenden)
  • Ziemlich vollständige und verständliche Dokumentation
  • Dashboard, in dem Sie alle Statistiken zu den Aufgaben anzeigen können

Ich werde nicht zu sehr ins Detail gehen, da es viele gute Artikel über Hangfire und dessen Verwendung gibt. In diesem Artikel werde ich erläutern, wie die Unterstützung mehrerer Warteschlangen (oder Aufgabenpools) verwendet wird, wie die Standard-Wiederholungsfunktionalität behoben wird und jede Warteschlange eine individuelle Konfiguration erhält.


Bestehende Unterstützung für (Pseudo-) Warteschlangen


Wichtiger Hinweis: Im Titel habe ich den Begriff Pseudo-Warteschlange verwendet, da Hangfire nicht garantiert, dass Aufgaben in einer bestimmten Reihenfolge ausgeführt werden. Das heißt, Das Prinzip "First In First Out" gilt nicht und wir werden uns nicht darauf verlassen. Darüber hinaus empfiehlt der Autor der Bibliothek, Aufgaben idempotent zu machen, d. H. stetig gegen unvorhergesehene Mehrfachausführung. Weiter werde ich nur das Wort "Warteschlange" verwenden, weil Hangfire verwendet den Begriff "Warteschlange".


Hangfire bietet einfache Unterstützung für Warteschlangen. Obwohl es nicht die Flexibilität von Message Queue-Systemen wie rabbitMQ oder Azure Service Bus bietet, reicht es häufig aus, um eine Vielzahl von Aufgaben zu lösen.


Jede Aufgabe hat die Eigenschaft "Warteschlange", dh den Namen der Warteschlange, in der sie ausgeführt werden soll. Standardmäßig wird die Aufgabe mit dem Namen "Standard" an die Warteschlange gesendet, sofern nicht anders angegeben. Die Unterstützung mehrerer Warteschlangen ist erforderlich, um die Ausführung von Aufgaben unterschiedlicher Art separat zu verwalten. Beispielsweise möchten wir möglicherweise, dass die Videoverarbeitungsaufgaben in die Warteschlange "video_queue" fallen und E-Mails an die Warteschlange "email_queue" senden. Auf diese Weise können wir diese beiden Arten von Aufgaben unabhängig voneinander ausführen. Wenn Sie die Videoverarbeitung auf einen dedizierten Server verschieben möchten, können Sie dies einfach tun, indem Sie einen separaten Hangfire-Server als Konsolenanwendung ausführen, die die Warteschlange "video_queue" verarbeitet.


Lass uns weiter üben


Das Einrichten des Hangfire-Servers im asp.net-Kern ist wie folgt:


public void Configure(IApplicationBuilder app) { app.UseHangfireServer(new BackgroundJobServerOptions { WorkerCount = 2, Queues = new[] { "email_queue", "video_queue" } }); } 

Problem 1 - Wiederholungsaufgaben fallen in die Standardwarteschlange


Wie oben erwähnt, gibt es in Hangfire eine Standardwarteschlange namens "Standard". Wenn eine in der Warteschlange platzierte Aufgabe, z. B. "video_queue", fehlgeschlagen ist und erneut versucht werden muss, wird sie erneut an die "Standard" -Warteschlange und nicht an "video_queue" gesendet, sodass unsere Aufgabe überhaupt nicht ausgeführt wird die Instanz des Hangfire-Servers, die wir, wenn überhaupt, möchten. Dieses Verhalten wurde von mir experimentell festgestellt und ist wahrscheinlich ein Fehler in Hangfire.


Jobfilter


Hangfire bietet uns die Möglichkeit, die Funktionalität mithilfe von sogenannten Filtern ( Jobfiltern ) zu erweitern, die im Prinzip den Aktionsfiltern in ASP.NET MVC ähnlich sind. Tatsache ist, dass die interne Logik von Hangfire als State Machine implementiert ist. Dies ist eine Engine, die die Aufgaben im Pool nacheinander von einem Status in einen anderen überträgt (z. B. erstellt -> in die Warteschlange gestellt -> verarbeitet -> erfolgreich). Mithilfe von Filtern können wir die Task, die bei jeder Änderung ihres Status ausgeführt wird, "abfangen" und bearbeiten. Ein Filter wird als Attribut implementiert, das auf eine einzelne Methode, Klasse oder global angewendet werden kann.


Auftragsparameter


Das ElectStateContext-Objekt wird als Argument an die Filtermethode übergeben. Dieses Objekt enthält vollständige Informationen zur aktuellen Aufgabe. Es verfügt unter anderem über die Methoden GetJobParameter <> (...) und SettJobParameter <> (...). Mit Jobparametern können Sie Informationen zu einer Aufgabe in einer Datenbank speichern. In den Auftragsparametern wird der Name der Warteschlange gespeichert, an die die Aufgabe ursprünglich gesendet wurde. Nur aus irgendeinem Grund werden diese Informationen bei der nächsten Wiederholung ignoriert.


Lösung


Wir haben also eine Aufgabe, die fehlerhaft endete und zur erneuten Ausführung in der richtigen Warteschlange gesendet werden sollte (in derselben, die ihr zum Zeitpunkt der ersten Erstellung zugewiesen wurde). Die Wiederholung einer Aufgabe, die mit einem Fehler abgeschlossen wurde, ist ein Übergang vom Status "Fehlgeschlagen" in den Status "In Warteschlange". Um das Problem zu lösen, erstellen Sie einen Filter, der beim Eintritt der Aufgabe in den Status "Enqueued" prüft, in welcher Warteschlange die Aufgabe ursprünglich gesendet wurde, und den Parameter "QueueName" auf den gewünschten Wert setzt:


 public class HangfireUseCorrectQueueFilter : JobFilterAttribute, IElectStateFilter { public void OnStateElection(ElectStateContext context) { if (context.CandidateState is EnqueuedState enqueuedState) { var queueName = context.GetJobParameter<string>("QueueName"); if (string.IsNullOrWhiteSpace(queueName)) { context.SetJobParameter("QueueName", enqueuedState.Queue); } else { enqueuedState.Queue = queueName; } } } } 

Fügen Sie unserer Konfiguration den folgenden Code hinzu, um den Standardfilter auf alle Aufgaben (d. H. Global) anzuwenden:


 GlobalJobFilters.Filters.Add(new HangfireUseCorrectQueueFilter { Order = 1 }); 

Ein weiterer kleiner Haken ist, dass die GlobalJobFilters-Auflistung standardmäßig eine Instanz der AutomaticRetryAttribute-Klasse enthält. Dies ist ein Standardfilter, der für die erneute Ausführung fehlgeschlagener Aufgaben verantwortlich ist. Er sendet die Aufgabe auch an die "Standard" -Warteschlange und ignoriert die ursprüngliche Warteschlange. Damit unser Fahrrad fahren kann, müssen Sie diesen Filter aus der Sammlung entfernen und unseren Filter die Verantwortung für die wiederholten Aufgaben übernehmen lassen. Infolgedessen sieht der Konfigurationscode folgendermaßen aus:


 var defaultRetryFilter = GlobalJobFilters.Filters .FirstOrDefault(f => f.Instance is AutomaticRetryAttribute); if (defaultRetryFilter != null && defaultRetryFilter.Instance != null) { GlobalJobFilters.Filters.Remove(defaultRetryFilter.Instance); } GlobalJobFilters.Filters.Add(new HangfireUseCorrectQueueFilter { Order = 1 }); 

Es ist zu beachten, dass AutomaticRetryAttribute die Logik implementiert, das Intervall zwischen Versuchen automatisch zu vergrößern (das Intervall erhöht sich mit jedem nachfolgenden Versuch), und AutomaticRetryAttribute aus der GlobalJobFilters-Auflistung zu entfernen. Diese Funktionalität wird aufgegeben (siehe Implementierung der ScheduleAgainLater- Methode).


Wir haben also erreicht, dass unsere Aufgaben in verschiedenen Warteschlangen ausgeführt werden können. Auf diese Weise können wir ihre Ausführung unabhängig verwalten, einschließlich der Verarbeitung verschiedener Warteschlangen auf verschiedenen Computern. Erst jetzt wissen wir nicht, wie oft und in welchem ​​Intervall unsere Aufgaben im Fehlerfall wiederholt werden, da wir AutomaticRetryAttribute aus der Filtersammlung entfernt haben.


Problem 2 - Individuelle Einstellungen für jede Warteschlange


Wir möchten in der Lage sein, das Intervall und die Anzahl der Wiederholungen für jede Warteschlange separat zu konfigurieren. Wenn wir für eine Warteschlange keine expliziten Werte angegeben haben, möchten wir, dass die Standardwerte angewendet werden. Dazu implementieren wir einen anderen Filter und nennen ihn HangfireRetryJobFilter .


Im Idealfall sollte der Konfigurationscode ungefähr so ​​aussehen:


 GlobalJobFilters.Filters.Add(new HangfireRetryJobFilter { Order = 2, ["email_queue"] = new HangfireQueueSettings { DelayInSeconds = 120, RetryAttempts = 3 }, ["video_queue"] = new HangfireQueueSettings { DelayInSeconds = 60, RetryAttempts = 5 } }); 

Lösung


HangfireQueueSettings Sie dazu zuerst die HangfireQueueSettings Klasse hinzu, die als Container für unsere Einstellungen dient.


 public sealed class HangfireQueueSettings { public int RetryAttempts { get; set; } public int DelayInSeconds { get; set; } } 

Anschließend fügen wir die Implementierung des Filters selbst hinzu. Wenn die Aufgaben nach einem Fehler wiederholt werden, werden die Einstellungen abhängig von der Konfiguration der Warteschlange angewendet und die Anzahl der Wiederholungsversuche überwacht:


 public class HangfireRetryJobFilter : JobFilterAttribute, IElectStateFilter, IApplyStateFilter { private readonly HangfireQueueSettings _defaultQueueSettings = new HangfireQueueSettings { RetryAttempts = 3, DelayInSeconds = 10 }; private readonly IDictionary<string, HangfireQueueSettings> _settings = new Dictionary<string, HangfireQueueSettings>(); public HangfireQueueSettings this[string queueName] { get { return _settings.TryGetValue(queueName, out HangfireQueueSettings queueSettings) ? queueSettings : _defaultQueueSettings; } set { _settings[queueName] = value; } } public void OnStateElection(ElectStateContext context) { if (!(context.CandidateState is FailedState failedState)) { // This filter accepts only failed job state. return; } var retryAttempt = context.GetJobParameter<int>("RetryCount") + 1; var queueName = context.GetJobParameter<string>("QueueName"); if (retryAttempt <= this[queueName].RetryAttempts) { ScheduleAgainLater(context, retryAttempt, failedState, queueName); } else { TransitionToDeleted(context, failedState, queueName); } } public void OnStateApplied( ApplyStateContext context, IWriteOnlyTransaction transaction) { if (context.NewState is ScheduledState && context.NewState.Reason != null && context.NewState.Reason.StartsWith("Retry attempt")) { transaction.AddToSet("retries", context.BackgroundJob.Id); } } public void OnStateUnapplied( ApplyStateContext context, IWriteOnlyTransaction transaction) { if (context.OldStateName == ScheduledState.StateName) { transaction.RemoveFromSet("retries", context.BackgroundJob.Id); } } private void ScheduleAgainLater( ElectStateContext context, int retryAttempt, FailedState failedState, string queueName) { context.SetJobParameter("RetryCount", retryAttempt); var delay = TimeSpan.FromSeconds(this[queueName].DelayInSeconds); const int maxMessageLength = 50; var exceptionMessage = failedState.Exception.Message.Length > maxMessageLength ? failedState.Exception.Message.Substring(0, maxMessageLength - 1) + "…" : failedState.Exception.Message; // If attempt number is less than max attempts, we should // schedule the job to run again later. var reason = $"Retry attempt {retryAttempt} of {this[queueName].RetryAttempts}: {exceptionMessage}"; context.CandidateState = delay == TimeSpan.Zero ? (IState)new EnqueuedState { Reason = reason } : new ScheduledState(delay) { Reason = reason }; } private void TransitionToDeleted( ElectStateContext context, FailedState failedState, string queueName) { context.CandidateState = new DeletedState { Reason = this[queueName].RetryAttempts > 0 ? "Exceeded the maximum number of retry attempts." : "Retries were disabled for this job." }; } } 

Hinweis zum Code: Bei der Implementierung der HangfireRetryJobFilter Klasse wurde die AutomaticRetryAttribute Klasse von HangfireRetryJobFilter als Grundlage verwendet, daher stimmt die Implementierung einiger Methoden teilweise mit den entsprechenden Methoden dieser Klasse überein.

Problem 3 - Wie sende ich eine Aufgabe an eine bestimmte Warteschlange?


Es gelang mir, zwei Möglichkeiten zu finden, um die Aufgabe der Warteschlange zuzuweisen: dokumentiert und - nein.


1. Methode - Hängen Sie das entsprechende Attribut an die Methode


 [Queue("video_queue")] public void SomeMethod() { } BackgroundJob.Enqueue(() => SomeMethod()); 

http://docs.hangfire.io/en/latest/background-processing/configuring-queues.html


2. Methode (undokumentiert) - Verwenden Sie die BackgroundJobClient Klasse


 var client = new BackgroundJobClient(); client.Create(() => MyMethod(), new EnqueuedState("video_queue")); 

Der Vorteil der zweiten Methode besteht darin, dass keine unnötigen Abhängigkeiten von Hangfire entstehen und Sie entscheiden können, in welchem ​​Prozess die Aufgabe ausgeführt werden soll. Leider wurde in der offiziellen Dokumentation die BackgroundJobClient Klasse und deren Anwendung nicht erwähnt. Ich habe die zweite Methode in meiner Lösung verwendet, daher wird sie in der Praxis getestet.


Fazit


In diesem Artikel haben wir die Unterstützung mehrerer Warteschlangen in Hangfire verwendet, um die Verarbeitung verschiedener Aufgabentypen zu trennen. Wir haben unseren Mechanismus zum Wiederholen nicht erfolgreich abgeschlossener Aufgaben mit der Möglichkeit der individuellen Konfiguration für jede Warteschlange implementiert, die Funktionalität von Hangfire mithilfe von Jobfiltern erweitert und gelernt, wie Aufgaben zur Ausführung an die gewünschte Warteschlange gesendet werden.


Ich hoffe, dieser Artikel ist für jemanden nützlich. Ich werde gerne einen Kommentar abgeben.


Nützliche Links


Hangfire-Dokumentation
Hangfire-Quellcode
Scott Hanselman - Ausführen von Hintergrundaufgaben in ASP.NET

Source: https://habr.com/ru/post/de434364/


All Articles