Shedding dengan Quartz di Masstransit

Artikel ini akan memberi tahu Anda cara menggunakan Masstransit dan Quartz untuk mengonfigurasi sheduler untuk mengirim pesan ke bus. Bus adalah abstraksi dari koneksi yang lemah antara mereka yang mengirim pesan dan mereka yang berlangganan melalui bus. Pengirim dan penerima pesan hanya tahu jenis pesan (ini adalah antarmuka), tetapi mereka tidak tahu apa-apa tentang satu sama lain.

Kontrol kuarsa di Masstransit berlangsung melalui antrian, kami mengalokasikan antrian untuk ini:

Startup.cs

cfg.UseInMemoryScheduler(sheduleConfig.QuartzQueue); 

Lihat semua kode

Pada titik akses ke sheduler (dalam contoh saya, ini adalah metode controller), kita mendapatkan objek ISendEndpoint untuk antrian yang diberikan:

 ISendEndpoint _sendEndpoint = await _sendEndpointProvider.GetSendEndpoint(new Uri($"rabbitmq://{configs.Host}/{configs.QuartzQueue}")); 

Dan kami mengirim pesan dengan pengaturan untuk sheduling pesan berkala ke antrian tertentu:

 _sheduler.ScheduledRecurringMessage = await _sendEndpoint.ScheduleRecurringSend<IRepeatingCommand>( new Uri($"rabbitmq://{configs.Host}/{configs.DestinationQueue}"), new PollExternalSystemSchedule(), new { Message = $"1 sec Quartz {User.Identity.Name}" }); 

Pengaturan frekuensi PollExternalSystemSchedule

  public class PollExternalSystemSchedule : DefaultRecurringSchedule { public PollExternalSystemSchedule() { StartTime = DateTime.UtcNow; CronExpression = "* * * * * ? *";//1 sec } } 

Garis untuk mengatur frekuensi dapat dihasilkan di sini .

Anda sudah dapat mengikat konsumen ke bus tempat sheduler akan mengirim pesan (mereka yang menerima pesan):

 cfg.ReceiveEndpoint(host, sheduleConfig.DestinationQueue, e => { e.ConfigureConsumer<ShedulerCommandConsumer>(provider); }); 

Konsumen terikat ke alamat (alamat antrian di atas) dan antarmuka:

  public class ShedulerCommandConsumer : IConsumer<IRepeatingCommand> { private ILogger<ShedulerCommandConsumer> _logger; public ShedulerCommandConsumer(ILogger<ShedulerCommandConsumer> loger) { _logger = loger; } public Task Consume(ConsumeContext<IRepeatingCommand> context) { _logger.LogInformation($"Call consumer: {typeof(ShedulerCommandConsumer)} {context.Message.Message} {DateTime.UtcNow} "); return Task.CompletedTask; } } 

Untuk menghentikan sheduler, hubungi:

 await _sendEndpoint.CancelScheduledRecurringSend(_sheduler.ScheduledRecurringMessage); 

Kode sumber

Saya juga dapat memberi tahu Anda tentang karya mesin negara Saga (dalam proyek nyata, pesan juga dapat melewatinya).

Source: https://habr.com/ru/post/id469905/


All Articles