本文将告诉您如何使用Masstransit和Quartz配置Sheduler以将消息发送到总线。 总线是发送消息的人和通过总线订阅消息的人之间的弱连接的抽象。 消息的发送者和接收者仅知道消息的类型(这是接口),但彼此之间一无所知。
Masstransit中的Quartz控制通过队列进行,为此我们分配一个队列:
启动文件cfg.UseInMemoryScheduler(sheduleConfig.QuartzQueue);
查看所有代码在访问sheduler时(在我的示例中,这是控制器方法),我们获得了给定队列的
ISendEndpoint对象:
ISendEndpoint _sendEndpoint = await _sendEndpointProvider.GetSendEndpoint(new Uri($"rabbitmq://{configs.Host}/{configs.QuartzQueue}"));
然后,我们将带有设置的定期邮件发送到特定队列的邮件发送出去:
_sheduler.ScheduledRecurringMessage = await _sendEndpoint.ScheduleRecurringSend<IRepeatingCommand>( new Uri($"rabbitmq://{configs.Host}/{configs.DestinationQueue}"), new PollExternalSystemSchedule(), new { Message = $"1 sec Quartz {User.Identity.Name}" });
PollExternalSystemSchedule频率设置
public class PollExternalSystemSchedule : DefaultRecurringSchedule { public PollExternalSystemSchedule() { StartTime = DateTime.UtcNow; CronExpression = "* * * * * ? *";
可以在
此处生成用于设置频率的行。
您已经可以将使用者绑定到Sheduler将向其发送消息的总线(接收消息的人):
cfg.ReceiveEndpoint(host, sheduleConfig.DestinationQueue, e => { e.ConfigureConsumer<ShedulerCommandConsumer>(provider); });
使用者绑定到地址(上面的队列地址)和接口:
public class ShedulerCommandConsumer : IConsumer<IRepeatingCommand> { private ILogger<ShedulerCommandConsumer> _logger; public ShedulerCommandConsumer(ILogger<ShedulerCommandConsumer> loger) { _logger = loger; } public Task Consume(ConsumeContext<IRepeatingCommand> context) { _logger.LogInformation($"Call consumer: {typeof(ShedulerCommandConsumer)} {context.Message.Message} {DateTime.UtcNow} "); return Task.CompletedTask; } }
要停止Sheduler,请致电:
await _sendEndpoint.CancelScheduledRecurringSend(_sheduler.ScheduledRecurringMessage);
源代码我还可以向您介绍Saga状态机的工作(在真实项目中,消息也可以通过它)。