MassTransit, Saga, dan RabbitMQ untuk menerapkan manajer proses

Suatu kali, kami menghadapi tugas mengotomatisasi berbagai alur kerja di sebuah perusahaan besar. Bagi kami, ini berarti mengumpulkan sekitar 10 sistem pada saat peluncuran. Selain itu, semuanya harus terhubung secara tidak sinkron, terukur, andal.


Proses yang disederhanakan dapat digambarkan sebagai urutan tindakan dalam sistem yang berbeda, yang tidak dapat sepenuhnya otomatis, karena membutuhkan partisipasi manusia. Misalnya, untuk memilih tindakan tertentu atau koordinasi elementer, yang diperlukan untuk pindah ke tahap proses selanjutnya.


Untuk mengatasi masalah ini, kami memutuskan untuk menggunakan arsitektur perpesanan melalui bus data, dan MassTransit dengan Saga-nya bersamaan dengan RabbitMQ sangat cocok untuk kami.


gambar

Seperti apa Saga?


Saga adalah implementasi templat Manajer Proses dari buku Templat Integrasi Aplikasi Perusahaan , yang memungkinkan Anda untuk menggambarkan proses sebagai mesin keadaan. Suatu peristiwa tiba di pintu masuk, Saga melakukan serangkaian tindakan. Pada saat yang sama, pada setiap tahap Saga, keputusan seseorang mungkin diperlukan. Kemudian dia membuat tugas di pelacak dan "tertidur" untuk waktu yang tidak terbatas, menunggu acara baru.

Saga didasarkan pada Automatonymous . Ini secara deklaratif dijelaskan dalam kelas yang diwarisi dari MassTransitStateMachine <>. Untuk Saga, Anda perlu menjelaskan semua status, peristiwa yang diambil, dan tindakan yang diambil saat peristiwa tertentu terjadi. Keadaan saat ini disimpan dalam database.


Pertama, kami menggambarkan semua negara bagian dan acara Saga dan memberi mereka nama yang bisa dimengerti. Ini terlihat seperti ini:


public sealed partial class StateMachine { public State AwaitingTaskCreated { get; set; } public State AwaitingTaskTakedToWork { get; set; } public State AwaitingDecisionAboutTask { get; set; } public State Rejected { get; set; } public Event<IStartWorkflowCommand> StartWorkflowCommandReceived { get; set; } public Event<TaskCreatedNotification> TaskCreated { get; set; } public Event<TaskTakedToWorkNotification> TaskTakedToWork { get; set; } public Event<TaskDeclinedNotification> TaskDeclined { get; set; } public Event<TaskApprovedNotification> TaskApproved { get; set; } private void BuildStateMachine() { InstanceState(x => x.CurrentState); Event(() => StartWorkflowCommandReceived, x => x.CorrelateById(ctx => ctx.Message.CorrelationId) .SelectId(context => context.Message.CorrelationId)); Event(() => TaskCreated, x => x.CorrelateById(ctx => ctx.Message.CorrelationId)); Event(() => TaskTakedToWork, x => x.CorrelateById(ctx => ctx.Message.CorrelationId)); Event(() => TaskDeclined, x => x.CorrelateById(ctx => ctx.Message.CorrelationId)); Event(() => TaskApproved, x => x.CorrelateById(ctx => ctx.Message.CorrelationId)); } } 

Kami telah meluncurkan kelas parsial, tempat kami mendeklarasikan daftar semua negara bagian dan acara, dan metode BuildStateMachine, yang menggambarkan korelasi peristiwa dengan Saga. Untuk melakukan ini, parameter khusus CorrelationId dilewatkan di setiap peristiwa - ini adalah Guid, yang berjalan antara semua sistem yang terhubung dan dalam sistem pemantauan.


Jadi, jika ada masalah yang muncul, kita dapat mengembalikan seluruh gambaran tentang apa yang terjadi dengan log dari semua sistem yang terhubung. Kami mengirim CorrelationId dalam pesan dari Saga, sistem mengirimkannya kembali dalam pemberitahuan sehingga kami dapat menghubungkan pesan tersebut dengan Saga tertentu.


Berikut adalah contoh dari kelas mesin negara itu sendiri:


 public sealed partial class StateMachine : MassTransitStateMachine<WorkflowSaga> { public StateMachine() { BuildStateMachine(); Initially(WhenStartWorkflowCommandReceived()); During(AwaitingTaskCreatedInPlanner, WhenTaskCreated()); During(AwaitingTaskTakedToWork, WhenTaskTakedToWork()); During(AwaitingDecisionAboutTask, WhenTaskApproved(), WhenTaskDeclined()); } private EventActivityBinder<WorkflowSaga, IStartWorkflowCommand> WhenStartWorkflowCommandReceived() { return When(StartWorkflowCommandReceived) .Then(ctx => ctx.Instance.SaveConfigurationRequestInfo(ctx.Data)) .Send(TaskManagerQueue, ctx => new CreateTaskCommand(ctx.Instance)) .TransitionTo(AwaitingTaskCreated); } private EventActivityBinder<WorkflowSaga, TaskCreatedNotification> WhenTaskCreated() { return When(DPORPApproveTaskCreatedInPlanner) .Then(ctx => ctx.Instance.SaveCreatedTaskInfo(ctx.Data)) .Send(MailServiceQueue, ctx => new NotifyRequestAuthorThatWorkflowStarted(ctx.Instance)) .TransitionTo(AwaitingTaskTakedToWork); } private EventActivityBinder<WorkflowSaga, TaskTakedToWorkNotification> WhenTaskTakedToWork() { return When(TaskTakedToWork) .Then(ctx => ctx.Instance.MarkTaskAsTakedToWork(ctx.Data)) .TransitionTo(AwaitingDecisionAboutTask); } private EventActivityBinder<WorkflowSaga, TaskApprovedNotification> WhenTaskApproved() { return When(TaskApproved) .Then(ctx => ctx.Instance.MarkTaskAsApproved(ctx.Data)) .Finalize(); } private EventActivityBinder<WorkflowSaga, TaskDeclinedNotification> WhenTaskDeclined() { return When(TaskDeclined) .Then(ctx => ctx.Instance.MarkTaskAsDeclined(ctx.Data)) .TransitionTo(Rejected); } } 

Konstruktor menjelaskan negara. Penerimaan setiap acara dilakukan dengan metode terpisah untuk menjaga keterbacaan. Seluruh logika membangun pesan dilakukan dalam pesan itu sendiri, jika tidak, dengan meningkatnya kompleksitas sistem, Saga membengkak cukup cepat.


Perhatian harus diberikan dalam mengembangkan konvensi dan menjaga keterbacaan. Karena imperatifitas C #, sangat sulit untuk mendeklarasikan deskripsi negara dan tindakan di dalamnya. Bahkan untuk mesin keadaan sederhana, neraka sungguhan dimulai.


Sekarang beberapa kata tentang SagaInstance. SagaInstance adalah kelas yang diwarisi dari SagaStateMachineInstance. Ini terdiri dari objek dan bidang yang menjadi ciri mesin negara. Secara kasar, ini adalah memori Saga. Kami menyimpan semua data Saga yang dia butuhkan sepanjang hidupnya. Juga di kelas ini dijelaskan logika perubahan dalam data ini dalam perjalanan kerja.


Berikut ini sebuah contoh:


 public class WorkflowSaga : SagaStateMachineInstance , ISagaWithState , ICreatedOnOffset , IModifiedOnOffset , ICreatedBy<string> , IModifiedBy<string> { public Guid CorrelationId { get; set; } public string CurrentState { get; set; } public string InitialRequestViewUrl { get; set; } public string RequestNumber { get; set; } public string RequestAuthor { get; set; } public string RequestText { get; set; } public byte[] RowVersion { get; set; } public string CreatedBy { get; set; } public string ModifiedBy { get; set; } public DateTimeOffset CreatedOn { get; set; } public DateTimeOffset ModifiedOn { get; set; } public DateTimeOffset CompletedOn { get; set; } public virtual ICollection<RelatedTask> RelatedTasks { get; set; } public void SaveGabrielConfigurationRequestInfo( ICreateGabrielConfigurationRequestCommand command) { CorrelationId = command.CorrelationId; RequestNumber = command.RequestNumber; RequestAuthor = command.Author; RequestText = command.RequestText; InitialRequestViewUrl = command.InitialRequestViewUrl; CreatedOn = RuntimeContext.Current.DateTimeOffset.Now; } public void SaveCreatedTaskInfo(ITaskCreationInfo taskCreationInfo) { RelatedPlannerTasks.Add(new RelatedPlannerTask(taskCreationInfo)); } public void MarkTaskAsTakedToWork(ITaskUpdatedInfo taskInfo) { UpdateTaskInfo(taskInfo, TaskStatus.TakedToWork); } public void MarkTaskAsApproved(TaskApprovedNotification taskInfo) { UpdateTaskInfo(taskInfo, TaskStatus.Completed, taskInfo.Comment); CompletedOn = RuntimeContext.Current.DateTimeOffset.Now; } public void MarkTaskAsDeclined(TaskDeclinedNotification taskInfo) { UpdateTaskInfo(taskInfo, TaskStatus.Declined, taskInfo.Comment); CompletedOn = RuntimeContext.Current.DateTimeOffset.Now; } private void UpdateTaskInfo(ITaskUpdatedInfo taskInfo, TaskStatus taskStatus, string comment = null) { var task = RelatedTasks.Single(t => t.Number == taskInfo.Number); task.ModifiedBy = taskInfo.TaskModifiedBy; task.Comment = comment; task.Status = taskStatus; } } 

Contoh menunjukkan bahwa dalam SagaInstance CorrelationId disimpan untuk korelasi peristiwa dengan Saga dan CurrentState untuk menyimpan keadaan Saga saat ini.


Menangani kesalahan


Apa yang terjadi pada Saga jika terjadi kesalahan selama pemrosesan pesan? Ini adalah masalah penting, karena semua orang ingin mesin negara untuk selalu tetap konsisten, bahkan jika ada yang salah. Dan dengan MassTransit, Saga baik-baik saja.


Seperti yang sudah Anda perhatikan, pada contoh di atas tidak ada satu pun blok coba tangkap untuk menangani pengecualian. Alasannya sederhana: mereka tidak diperlukan di sana. Jika pengecualian terjadi selama pemrosesan pesan, pesan dikembalikan ke antrian dan semua perubahan dibatalkan. Karena kami melakukan semua manipulasi data dalam transaksi yang sama dengan Saga, transaksi tidak akan ditutup.


Secara umum, manipulasi sesuatu selain Saga dalam Saga itu sendiri adalah praktik yang buruk. Menurut buku "Template Integrasi untuk Aplikasi Perusahaan", manajer proses harus tetap setipis dan sebodoh mungkin: berikan saja perintah ke sistem dan pantau statusnya, tetapi dia sendiri tidak boleh melakukan apa pun.


Tentu saja, ada skenario yang lebih kompleks ketika Anda perlu melakukan beberapa tindakan kompensasi untuk menangani pengecualian. Kemudian pengendali mesin status ".Catch" digunakan untuk menangkap pengecualian dari tipe tertentu dan kemudian menjalankan logika kompensasi.


Dan jika Anda hanya perlu berjanji pengecualian, maka lebih baik menggunakan pengamat (Observer).


Sekarang bayangkan situasi bahwa kita telah menjalankan perintah Kirim selama pemrosesan pesan, setelah pengecualian terjadi. Apa yang akan terjadi pada perintah yang dikirim pada langkah ini? Lagi pula, semua yang telah terbang tidak dapat dikembalikan? Tapi di sini semuanya dipikirkan.


Saat mengkonfigurasi bus, Anda dapat mengaktifkan opsi UseInMemoryOutbox. Opsi ini memungkinkan Anda untuk tidak mengirim pesan sampai langkah saat ini selesai. Jika pengecualian terjadi, pesan tidak akan dikirim sama sekali. Berikut adalah kutipan dari dokumentasi:


 /// <summary> /// Includes an outbox in the consume filter path, which delays outgoing messages until the return path /// of the pipeline returns to the outbox filter. At this point, the message execution pipeline should be /// nearly complete with only the ack remaining. If an exception is thrown, the messages are not sent/published. /// </summary> /// <param name="configurator">The pipe configurator</param> public static void UseInMemoryOutbox(this IConsumePipeConfigurator configurator) 

Tes


Sepintas, pengujian mesin keadaan asinkron masih menyenangkan. Tapi di sini semuanya baik-baik saja. MassTransit menyediakan kerangka kerja penulisan tes yang baik yang sepenuhnya memenuhi semua kebutuhan kita untuk menguji mesin negara.


Kerangka kerja ini menyediakan InMemory dengan implementasi bus data (InMemoryTestHarness), yang memungkinkan Anda untuk mengirim dan menerima pesan yang melewati RabbitMQ atau antrian lainnya.


Nah, sebagai contoh:


 [TestFixture] public class SagaTests : TestFixtureBase { protected const string HostName = "HostName"; protected InMemoryTestHarness Harness; protected StateMachine StateMachine; protected StateMachineSagaTestHarness<GabrielConfigurationRequestSaga, StateMachine> Saga; [SetUp] public void SetUp() { StateMachine = (StateMachine)Kernel. Get<MassTransitStateMachine<WorkflowSaga>>(); Harness = new InMemoryTestHarness(HostName); Saga = Harness .StateMachineSaga<WorkflowSaga, StateMachine>(StateMachine); } [TearDown] public async Task TearDown() { await Harness.Stop(); } protected async Task<WorkflowSaga> InitializeSaga() { await Harness.Start(); var command = new TestStartWorkflowCommand { CorrelationId = SagaId, Author = RequestAuthor, InitialRequestViewUrl = InitialRequestViewUrl, RequestText = RequestText, RequestNumber = RequestNumber, }; await Harness.InputQueueSendEndpoint .Send<IStartWorkflowCommand>(command); //    ,  consume    , // ,  Saga  ,    Assert.IsTrue(Harness.Consumed .Select<IStartWorkflowCommand>().Any()); var currentSaga = Saga.Created.Contains(SagaId); currentSaga.RelatedPlannerTasks = new List<RelatedPlannerTask>(); return currentSaga; } [Test] public async Task CheckCurrntStateWhenStartWorkflowCommand() { var saga = await InitializeSaga(); Assert.IsNotNull(saga); Assert.AreEqual(StateMachine .AwaitingORDTApproveTaskCreatedInPlanner.Name, saga.CurrentState); } } public class WhenTaskCreated : SagaTestsBase { private async Task<WorkflowSaga> InitializeState() { var saga = await InitializeSaga(true); saga.CurrentState = StateMachine.AwaitingTaskCreated.Name; InitializeRelatedTask(saga); await SendTaskCreatedNotification(); Assert.IsTrue(Harness.Consumed .Select<TaskCreatedNotification>().Any()); return saga; } [Test] public async Task SaveWorkflowDataWhenTaskCreated() { var saga = await InitializeState(); var taskInfo = saga.RelatedPlannerTasks .First(task => task.PlannerTaskType == PlannerTaskType.DPORPApprove); Assert.AreEqual(TaskNumber, taskInfo.Number); Assert.AreEqual(TaskUrl, taskInfo.TaskUrl); Assert.AreEqual(SagaId, taskInfo.SagaCorrelationId); Assert.AreEqual(TaskStatus.Created, taskInfo.Status); Assert.AreEqual(User, taskInfo.ModifiedBy); Assert.AreEqual(saga.CurrentState, StateMachine.AwaitingTaskTakedToWork.Name); } [Test] public async Task SendsMailWhenTaskCreated() { var mailConsumer = Harness .Consumer<MockConsumer<ISendEmailMessageWithTemplateCommand>> (RabbitMqRouting.QueueNames .SendEmailsQueueName); await InitializeState(); Assert.IsTrue(mailConsumer.Consumed .Select<ISendEmailMessageWithTemplateCommand>().Any()); } private async Task SendTaskCreatedNotification() { await Harness.InputQueueSendEndpoint .Send(new TaskCreatedNotification { TaskUrl = TaskUrl, Number = TaskNumber, TaskModifiedBy = User, CorrelationId = SagaId }); } } 

Tes berjalan cukup cepat. Misalnya, pada komputer pengembang tunggal, 850 tes berjalan dalam waktu sekitar 21 detik.


Tips Berguna


Sebagai kesimpulan, kami memberikan daftar tips yang berguna berdasarkan pengalaman kami.


  1. Kontrak dan skema komunikasi bus paling baik ditempatkan di nuget pribadi. Jadi Anda tidak akan memiliki perbedaan dalam nama di sisi pengirim dan penerima. Anda juga dapat meletakkan konstanta dengan penamaan antrian dan host di nuget. Nuget dapat disesuaikan setiap hari. Dan juga beberapa kontrol sumber mendukung nuget, ada feed pribadi berbayar.


  2. Memahami perbedaan antara Kirim dan Publikasikan. Gunakan Kirim jika Anda memiliki satu pelanggan dan Anda tahu persis nama antrian yang Anda kirimi perintah. Terbit dirancang untuk mengirim peringatan siaran. Detail pada tautan .


  3. Jika Anda perlu membuat pesan Permintaan / Respons, lebih baik menambahkan nama antrian untuk respons ke kontrak daripada menggunakan skema Permintaan / Respons dari MassTransit, yang disarankan oleh MassTransit sendiri untuk dihindari. Karena ini sangat mengurangi keandalan. Anda kehilangan semua manfaat asinkron. Tetapi jika Anda masih perlu mendapatkan jawaban dalam waktu terbatas, lebih baik menggunakan panggilan langsung. Ini paling baik ditulis dalam buku yang sama, "Templat Integrasi Aplikasi Perusahaan".


  4. Saga harus tipis. Cobalah untuk membawa semua logika berat ke sistem lain. Dan Saga harus melompati negara dan menyebarkan pesan ke kiri dan kanan.


  5. Tambahkan ke semua pesan CorrelationId, yang akan berjalan di antara sistem. Jadi jauh lebih mudah untuk menganalisis log dan menautkan semua pesan menjadi satu gambar. Masstransit juga melakukan hal yang sama. CorrelationId ditambahkan ke pesan saat mewarisi dari antarmuka CorrelatedBy.


    Mengatur log dan memonitor di sistem Anda, tidak akan ada salahnya. Pengalaman kami dalam artikel ini .

Source: https://habr.com/ru/post/id412793/


All Articles