Perdre du quartz à Masstransit

Cet article vous expliquera comment utiliser Masstransit et Quartz pour configurer un sheduler pour envoyer des messages au bus. Un bus est l'abstraction d'une connexion faible entre ceux qui envoient des messages et ceux qui s'y abonnent via le bus. L'expéditeur et le destinataire des messages ne connaissent que le type de message (c'est l'interface), mais ils ne se connaissent pas.

Le contrôle Quartz dans Masstransit s'effectue via une file d'attente, nous allouons une file d'attente pour cela:

Startup.cs

cfg.UseInMemoryScheduler(sheduleConfig.QuartzQueue); 

Voir tout le code

Au point d'accès au sheduler (dans mon exemple, c'est la méthode du contrôleur), nous obtenons l'objet ISendEndpoint pour la file d'attente donnée:

 ISendEndpoint _sendEndpoint = await _sendEndpointProvider.GetSendEndpoint(new Uri($"rabbitmq://{configs.Host}/{configs.QuartzQueue}")); 

Et nous envoyons un message avec des paramètres de délestage périodique des messages à une certaine file d'attente:

 _sheduler.ScheduledRecurringMessage = await _sendEndpoint.ScheduleRecurringSend<IRepeatingCommand>( new Uri($"rabbitmq://{configs.Host}/{configs.DestinationQueue}"), new PollExternalSystemSchedule(), new { Message = $"1 sec Quartz {User.Identity.Name}" }); 

Paramètres de fréquence PollExternalSystemSchedule

  public class PollExternalSystemSchedule : DefaultRecurringSchedule { public PollExternalSystemSchedule() { StartTime = DateTime.UtcNow; CronExpression = "* * * * * ? *";//1 sec } } 

La ligne de réglage de la fréquence peut être générée ici .

Vous pouvez déjà lier les consommateurs au bus auquel le sheduler enverra des messages (ceux qui reçoivent le message):

 cfg.ReceiveEndpoint(host, sheduleConfig.DestinationQueue, e => { e.ConfigureConsumer<ShedulerCommandConsumer>(provider); }); 

Le consommateur est lié à l'adresse (adresse de file d'attente ci-dessus) et à l'interface:

  public class ShedulerCommandConsumer : IConsumer<IRepeatingCommand> { private ILogger<ShedulerCommandConsumer> _logger; public ShedulerCommandConsumer(ILogger<ShedulerCommandConsumer> loger) { _logger = loger; } public Task Consume(ConsumeContext<IRepeatingCommand> context) { _logger.LogInformation($"Call consumer: {typeof(ShedulerCommandConsumer)} {context.Message.Message} {DateTime.UtcNow} "); return Task.CompletedTask; } } 

Pour arrêter le sheduler, appelez:

 await _sendEndpoint.CancelScheduledRecurringSend(_sheduler.ScheduledRecurringMessage); 

Code source

Je peux aussi vous parler du travail de la machine à états Saga (dans un vrai projet, les messages peuvent aussi le traverser).

Source: https://habr.com/ru/post/fr469905/


All Articles