Este artículo le dirá cómo usar Masstransit y Quartz para configurar un programador para enviar mensajes al bus. Un bus es una abstracción de una conexión débil entre quienes envían mensajes y quienes se suscriben a ellos a través del bus. El remitente y el receptor de los mensajes solo conocen el tipo de mensaje (esta es la interfaz), pero no saben nada el uno del otro.
El control de Quartz en Masstransit se realiza a través de una cola, asignamos una cola para esto:
Startup.cscfg.UseInMemoryScheduler(sheduleConfig.QuartzQueue);
Ver todo el códigoEn el punto de acceso al sheduler (en mi ejemplo, este es el método del controlador), obtenemos el objeto
ISendEndpoint para la cola dada:
ISendEndpoint _sendEndpoint = await _sendEndpointProvider.GetSendEndpoint(new Uri($"rabbitmq://{configs.Host}/{configs.QuartzQueue}"));
Y enviamos un mensaje con configuraciones para la eliminación periódica de mensajes a una determinada cola:
_sheduler.ScheduledRecurringMessage = await _sendEndpoint.ScheduleRecurringSend<IRepeatingCommand>( new Uri($"rabbitmq://{configs.Host}/{configs.DestinationQueue}"), new PollExternalSystemSchedule(), new { Message = $"1 sec Quartz {User.Identity.Name}" });
PollExternalSystemSchedule configuración de
frecuencia public class PollExternalSystemSchedule : DefaultRecurringSchedule { public PollExternalSystemSchedule() { StartTime = DateTime.UtcNow; CronExpression = "* * * * * ? *";
La línea para establecer la frecuencia se puede generar
aquí .
Ya puede vincular a los consumidores con el autobús al que el programador enviará mensajes (los que reciben el mensaje):
cfg.ReceiveEndpoint(host, sheduleConfig.DestinationQueue, e => { e.ConfigureConsumer<ShedulerCommandConsumer>(provider); });
El consumidor está vinculado a la dirección (dirección de la cola arriba) y la interfaz:
public class ShedulerCommandConsumer : IConsumer<IRepeatingCommand> { private ILogger<ShedulerCommandConsumer> _logger; public ShedulerCommandConsumer(ILogger<ShedulerCommandConsumer> loger) { _logger = loger; } public Task Consume(ConsumeContext<IRepeatingCommand> context) { _logger.LogInformation($"Call consumer: {typeof(ShedulerCommandConsumer)} {context.Message.Message} {DateTime.UtcNow} "); return Task.CompletedTask; } }
Para detener el programador, llame al:
await _sendEndpoint.CancelScheduledRecurringSend(_sheduler.ScheduledRecurringMessage);
Código fuenteTambién puedo hablar sobre el trabajo de la máquina de estado Saga (en un proyecto real, los mensajes también pueden pasar por él).