在Masstransit用石英脱落

本文将告诉您如何使用Masstransit和Quartz配置Sheduler以将消息发送到总线。 总线是发送消息的人和通过总线订阅消息的人之间的弱连接的抽象。 消息的发送者和接收者仅知道消息的类型(这是接口),但彼此之间一无所知。

Masstransit中的Quartz控制通过队列进行,为此我们分配一个队列:

启动文件

cfg.UseInMemoryScheduler(sheduleConfig.QuartzQueue); 

查看所有代码

在访问sheduler时(在我的示例中,这是控制器方法),我们获得了给定队列的ISendEndpoint对象:

 ISendEndpoint _sendEndpoint = await _sendEndpointProvider.GetSendEndpoint(new Uri($"rabbitmq://{configs.Host}/{configs.QuartzQueue}")); 

然后,我们将带有设置的定期邮件发送到特定队列的邮件发送出去:

 _sheduler.ScheduledRecurringMessage = await _sendEndpoint.ScheduleRecurringSend<IRepeatingCommand>( new Uri($"rabbitmq://{configs.Host}/{configs.DestinationQueue}"), new PollExternalSystemSchedule(), new { Message = $"1 sec Quartz {User.Identity.Name}" }); 

PollExternalSystemSchedule频率设置

  public class PollExternalSystemSchedule : DefaultRecurringSchedule { public PollExternalSystemSchedule() { StartTime = DateTime.UtcNow; CronExpression = "* * * * * ? *";//1 sec } } 

可以在此处生成用于设置频率的行。

您已经可以将使用者绑定到Sheduler将向其发送消息的总线(接收消息的人):

 cfg.ReceiveEndpoint(host, sheduleConfig.DestinationQueue, e => { e.ConfigureConsumer<ShedulerCommandConsumer>(provider); }); 

使用者绑定到地址(上面的队列地址)和接口:

  public class ShedulerCommandConsumer : IConsumer<IRepeatingCommand> { private ILogger<ShedulerCommandConsumer> _logger; public ShedulerCommandConsumer(ILogger<ShedulerCommandConsumer> loger) { _logger = loger; } public Task Consume(ConsumeContext<IRepeatingCommand> context) { _logger.LogInformation($"Call consumer: {typeof(ShedulerCommandConsumer)} {context.Message.Message} {DateTime.UtcNow} "); return Task.CompletedTask; } } 

要停止Sheduler,请致电:

 await _sendEndpoint.CancelScheduledRecurringSend(_sheduler.ScheduledRecurringMessage); 

源代码

我还可以向您介绍Saga状态机的工作(在真实项目中,消息也可以通过它)。

Source: https://habr.com/ru/post/zh-CN469905/


All Articles