Schuppen mit Quarz bei Masstransit

In diesem Artikel erfahren Sie, wie Sie mit Masstransit und Quartz einen Sheduler zum Senden von Nachrichten an den Bus konfigurieren. Ein Bus ist eine Abstraktion einer schwachen Verbindung zwischen denen, die Nachrichten senden, und denen, die sie über den Bus abonnieren. Der Absender und der Empfänger von Nachrichten kennen nur den Nachrichtentyp (dies ist die Schnittstelle), aber sie wissen nichts voneinander.

Das Quartz-Steuerelement in Masstransit erfolgt über eine Warteschlange. Dazu weisen wir eine Warteschlange zu:

Startup.cs

cfg.UseInMemoryScheduler(sheduleConfig.QuartzQueue); 

Alle Codes anzeigen

Beim Zugriff auf den Sheduler (in meinem Beispiel ist dies die Controller-Methode) erhalten wir das ISendEndpoint- Objekt für die angegebene Warteschlange:

 ISendEndpoint _sendEndpoint = await _sendEndpointProvider.GetSendEndpoint(new Uri($"rabbitmq://{configs.Host}/{configs.QuartzQueue}")); 

Und wir senden eine Nachricht mit Einstellungen für die regelmäßige Nachrichtenverschiebung an eine bestimmte Warteschlange:

 _sheduler.ScheduledRecurringMessage = await _sendEndpoint.ScheduleRecurringSend<IRepeatingCommand>( new Uri($"rabbitmq://{configs.Host}/{configs.DestinationQueue}"), new PollExternalSystemSchedule(), new { Message = $"1 sec Quartz {User.Identity.Name}" }); 

PollExternalSystemSchedule Frequenzeinstellungen

  public class PollExternalSystemSchedule : DefaultRecurringSchedule { public PollExternalSystemSchedule() { StartTime = DateTime.UtcNow; CronExpression = "* * * * * ? *";//1 sec } } 

Hier kann die Zeile zum Einstellen der Frequenz erzeugt werden .

Sie können die Verbraucher bereits an den Bus binden, an den der Sheduler Nachrichten sendet (diejenigen, die die Nachricht empfangen):

 cfg.ReceiveEndpoint(host, sheduleConfig.DestinationQueue, e => { e.ConfigureConsumer<ShedulerCommandConsumer>(provider); }); 

Der Verbraucher ist an die Adresse (Warteschlangenadresse oben) und die Schnittstelle gebunden:

  public class ShedulerCommandConsumer : IConsumer<IRepeatingCommand> { private ILogger<ShedulerCommandConsumer> _logger; public ShedulerCommandConsumer(ILogger<ShedulerCommandConsumer> loger) { _logger = loger; } public Task Consume(ConsumeContext<IRepeatingCommand> context) { _logger.LogInformation($"Call consumer: {typeof(ShedulerCommandConsumer)} {context.Message.Message} {DateTime.UtcNow} "); return Task.CompletedTask; } } 

Um den Schuppen zu stoppen, rufen Sie an:

 await _sendEndpoint.CancelScheduledRecurringSend(_sheduler.ScheduledRecurringMessage); 

Quellcode

Ich kann auch über die Arbeit der Saga-Zustandsmaschine sprechen (in einem realen Projekt können auch Nachrichten durchlaufen werden).

Source: https://habr.com/ru/post/de469905/


All Articles