Derramando con Cuarzo en Masstransit

Este artículo le dirá cómo usar Masstransit y Quartz para configurar un programador para enviar mensajes al bus. Un bus es una abstracción de una conexión débil entre quienes envían mensajes y quienes se suscriben a ellos a través del bus. El remitente y el receptor de los mensajes solo conocen el tipo de mensaje (esta es la interfaz), pero no saben nada el uno del otro.

El control de Quartz en Masstransit se realiza a través de una cola, asignamos una cola para esto:

Startup.cs

cfg.UseInMemoryScheduler(sheduleConfig.QuartzQueue); 

Ver todo el código

En el punto de acceso al sheduler (en mi ejemplo, este es el método del controlador), obtenemos el objeto ISendEndpoint para la cola dada:

 ISendEndpoint _sendEndpoint = await _sendEndpointProvider.GetSendEndpoint(new Uri($"rabbitmq://{configs.Host}/{configs.QuartzQueue}")); 

Y enviamos un mensaje con configuraciones para la eliminación periódica de mensajes a una determinada cola:

 _sheduler.ScheduledRecurringMessage = await _sendEndpoint.ScheduleRecurringSend<IRepeatingCommand>( new Uri($"rabbitmq://{configs.Host}/{configs.DestinationQueue}"), new PollExternalSystemSchedule(), new { Message = $"1 sec Quartz {User.Identity.Name}" }); 

PollExternalSystemSchedule configuración de frecuencia

  public class PollExternalSystemSchedule : DefaultRecurringSchedule { public PollExternalSystemSchedule() { StartTime = DateTime.UtcNow; CronExpression = "* * * * * ? *";//1 sec } } 

La línea para establecer la frecuencia se puede generar aquí .

Ya puede vincular a los consumidores con el autobús al que el programador enviará mensajes (los que reciben el mensaje):

 cfg.ReceiveEndpoint(host, sheduleConfig.DestinationQueue, e => { e.ConfigureConsumer<ShedulerCommandConsumer>(provider); }); 

El consumidor está vinculado a la dirección (dirección de la cola arriba) y la interfaz:

  public class ShedulerCommandConsumer : IConsumer<IRepeatingCommand> { private ILogger<ShedulerCommandConsumer> _logger; public ShedulerCommandConsumer(ILogger<ShedulerCommandConsumer> loger) { _logger = loger; } public Task Consume(ConsumeContext<IRepeatingCommand> context) { _logger.LogInformation($"Call consumer: {typeof(ShedulerCommandConsumer)} {context.Message.Message} {DateTime.UtcNow} "); return Task.CompletedTask; } } 

Para detener el programador, llame al:

 await _sendEndpoint.CancelScheduledRecurringSend(_sheduler.ScheduledRecurringMessage); 

Código fuente

También puedo hablar sobre el trabajo de la máquina de estado Saga (en un proyecto real, los mensajes también pueden pasar por él).

Source: https://habr.com/ru/post/469905/


All Articles