Hangfire是.net(核心)的库,它允许根据“即发即弃”的原则异步执行某些代码。 此类代码的示例可以是发送电子邮件,视频处理,与另一个系统的同步等。 除了“一劳永逸”外,还支持延迟任务以及Cron格式的计划任务。
当前,有许多这样的库。 Hangfire的一些好处是:
- 配置简单,API方便
- 可靠度 Hangfire保证创建的任务至少执行一次
- 能够并行执行任务并具有出色的性能
- 可扩展性(我们将在下面使用它)
- 相当完整且易于理解的文档
- 您可以在仪表板上查看有关任务的所有统计信息
我不会赘述太多,因为有很多关于Hangfire及其使用方法的好文章。 在本文中,我将讨论如何使用多个队列(或任务池)的支持,如何修复标准的重试功能以及如何使每个队列具有单独的配置。
对(伪)队列的现有支持
重要说明:在标题中,我使用了伪队列一词,因为Hangfire不能保证将以特定顺序执行任务。 即 “先进先出”的原则不适用,我们也不会依靠它。 此外,该库的作者建议使任务成为幂等,即 稳定地应对不可预见的多重执行。 此外,我将仅使用“队列”一词,因为 Hangfire使用术语“队列”。
Hangfire具有简单的队列支持。 尽管它不提供诸如RabbitMQ或Azure Service Bus之类的消息队列系统的灵活性,但它通常足以解决各种各样的任务。
每个任务都有“队列”属性,即应在其中执行的队列的名称。 默认情况下,除非另有说明,否则任务以名称“ default”发送到队列。 为了单独管理不同类型任务的执行,需要支持多个队列。 例如,我们可能希望视频处理任务落入“ video_queue”队列,然后将电子邮件发送到“ email_queue”队列。 因此,我们能够独立执行这两种类型的任务。 如果我们想将视频处理移至专用服务器,则可以通过将单独的Hangfire服务器作为控制台应用程序运行来轻松处理,以处理“ video_queue”队列。
让我们继续练习
在asp.net核心中设置Hangfire服务器的步骤如下:
public void Configure(IApplicationBuilder app) { app.UseHangfireServer(new BackgroundJobServerOptions { WorkerCount = 2, Queues = new[] { "email_queue", "video_queue" } }); }
问题1-重播任务属于默认队列
如上所述,Hangfire中有一个默认队列,称为“默认”。 如果某个任务排队,例如“ video_queue”失败,需要重试,那么它将再次发送到“默认”队列,而不是“ video_queue”,因此,我们的任务将根本不会执行我们想要的Hangfire服务器实例。 这种行为是我通过实验建立的,可能是Hangfire本身的错误。
职位筛选
Hangfire使我们能够借助所谓的过滤器( 作业过滤器 )扩展功能,这些过滤器在原理上类似于ASP.NET MVC中的“动作”过滤器。 事实是,Hangfire的内部逻辑是作为状态机实现的。 这是一种引擎,它顺序地将池中的任务从一种状态转移到另一种状态(例如,创建->排队->处理->成功),并且过滤器使我们可以在状态每次发生变化时“拦截”正在执行的任务并对其进行操作。 筛选器实现为可应用于单个方法,类或全局的属性。
工作参数
ElectStateContext对象作为参数传递给filter方法。 该对象包含有关当前任务的完整信息。 除其他外,它具有GetJobParameter <>(...)和SettJobParameter <>(...)方法。 作业参数使您可以将与任务相关的信息保存在数据库中。 在“作业参数”中存储了最初向其发送任务的队列的名称,仅出于某种原因,在下次重试期间将忽略此信息。
解决方案
因此,我们有一个错误结束的任务,应该在正确的队列中发送该任务以重新执行(与初次创建时分配给它的任务相同)。 重复执行完成但有错误的任务是从失败状态到排队状态的过渡。 要解决该问题,请创建一个过滤器,当任务进入“入队”状态时,它将检查任务最初发送到哪个队列中,并将“ QueueName”参数放入所需的值:
public class HangfireUseCorrectQueueFilter : JobFilterAttribute, IElectStateFilter { public void OnStateElection(ElectStateContext context) { if (context.CandidateState is EnqueuedState enqueuedState) { var queueName = context.GetJobParameter<string>("QueueName"); if (string.IsNullOrWhiteSpace(queueName)) { context.SetJobParameter("QueueName", enqueuedState.Queue); } else { enqueuedState.Queue = queueName; } } } }
为了将默认过滤器应用于所有任务(即全局),请将以下代码添加到我们的配置中:
GlobalJobFilters.Filters.Add(new HangfireUseCorrectQueueFilter { Order = 1 });
另一个小问题是,默认情况下,GlobalJobFilters集合包含AutomaticRetryAttribute类的实例。 这是一个标准过滤器,负责重新执行失败的任务。 他还将任务发送到“默认”队列,而忽略了原始队列。 为了让我们的自行车骑行,您需要从集合中删除此过滤器,并让我们的过滤器负责重复的任务。 结果,配置代码将如下所示:
var defaultRetryFilter = GlobalJobFilters.Filters .FirstOrDefault(f => f.Instance is AutomaticRetryAttribute); if (defaultRetryFilter != null && defaultRetryFilter.Instance != null) { GlobalJobFilters.Filters.Remove(defaultRetryFilter.Instance); } GlobalJobFilters.Filters.Add(new HangfireUseCorrectQueueFilter { Order = 1 });
应该注意的是,AutomaticRetryAttribute实现了以下逻辑:自动增加尝试之间的间隔(间隔随着每次后续尝试而增加),并从GlobalJobFilters集合中删除AutomaticRetryAttribute,我们放弃了此功能(请参见ScheduleAgainLater方法的实现)。
因此,我们已经实现了可以在不同队列中执行任务的方法,这使我们能够独立管理其执行,包括在不同的计算机上处理不同的队列。 直到现在,我们才知道发生错误的情况下将重复执行任务多少次和间隔多少,因为我们从过滤器集合中删除了AutomaticRetryAttribute。
问题2-每个队列的单独设置
我们希望能够为每个队列分别配置间隔和重复次数,并且,如果对于某些队列,我们未明确指定值,则希望应用默认值。 为此,我们实现了另一个过滤器,并将其HangfireRetryJobFilter
。
理想情况下,配置代码应如下所示:
GlobalJobFilters.Filters.Add(new HangfireRetryJobFilter { Order = 2, ["email_queue"] = new HangfireQueueSettings { DelayInSeconds = 120, RetryAttempts = 3 }, ["video_queue"] = new HangfireQueueSettings { DelayInSeconds = 60, RetryAttempts = 5 } });
解决方案
为此,请首先添加HangfireQueueSettings
类,该类将用作我们设置的容器。
public sealed class HangfireQueueSettings { public int RetryAttempts { get; set; } public int DelayInSeconds { get; set; } }
然后,我们添加过滤器本身的实现,当错误后重复执行任务时,该实现将根据队列配置应用设置并监视重试次数:
public class HangfireRetryJobFilter : JobFilterAttribute, IElectStateFilter, IApplyStateFilter { private readonly HangfireQueueSettings _defaultQueueSettings = new HangfireQueueSettings { RetryAttempts = 3, DelayInSeconds = 10 }; private readonly IDictionary<string, HangfireQueueSettings> _settings = new Dictionary<string, HangfireQueueSettings>(); public HangfireQueueSettings this[string queueName] { get { return _settings.TryGetValue(queueName, out HangfireQueueSettings queueSettings) ? queueSettings : _defaultQueueSettings; } set { _settings[queueName] = value; } } public void OnStateElection(ElectStateContext context) { if (!(context.CandidateState is FailedState failedState)) {
代码注释:在实现HangfireRetryJobFilter
类时,以HangfireRetryJobFilter
的AutomaticRetryAttribute
类为基础,因此某些方法的实现与该类的相应方法部分重合。
问题3-如何将任务发送到特定队列?
我设法找到了两种将任务分配给队列的方法:已记录的和-否。
第一种方法 -将相应的属性挂在方法上
[Queue("video_queue")] public void SomeMethod() { } BackgroundJob.Enqueue(() => SomeMethod());
http://docs.hangfire.io/en/latest/background-processing/configuring-queues.html
第二种方法 (未记录)-使用BackgroundJobClient
类
var client = new BackgroundJobClient(); client.Create(() => MyMethod(), new EnqueuedState("video_queue"));
第二种方法的优点是,它不会在Hangfire中创建不必要的依赖关系,并允许您决定任务应在哪个过程中进行。 不幸的是,在官方文档中,我没有提到BackgroundJobClient
类以及如何应用它。 我在解决方案中使用了第二种方法,因此已在实践中对其进行了测试。
结论
在本文中,我们在Hangfire中使用了多个队列的支持来分离不同类型任务的处理。 我们实现了重复执行失败任务的机制,并可能为每个队列进行单独配置,使用作业过滤器扩展了Hangfire的功能,并且还学习了如何将任务发送到所需队列以执行。
我希望本文对某人有用。 我很乐意发表评论。
有用的链接
Hangfire文档
Hangfire源代码
Scott Hanselman-如何在ASP.NET中运行后台任务