ذات مرة ، واجهنا مهمة أتمتة عمليات سير العمل المختلفة في شركة كبيرة. بالنسبة لنا ، كان هذا يعني تجميع حوالي 10 أنظمة في وقت الإطلاق. علاوة على ذلك ، كان يجب توصيل كل شيء بشكل غير متزامن ، وقابل للتوسع ، وموثوق.
يمكن وصف العملية المبسطة بأنها سلسلة من الإجراءات في أنظمة مختلفة ، والتي لا يمكن أن تكون مؤتمتة بالكامل ، لأنها تتطلب مشاركة بشرية. على سبيل المثال ، لتحديد إجراءات معينة أو التنسيق الأولي ، وهو أمر ضروري للانتقال إلى المرحلة التالية من العملية.
لحل هذه المشكلة ، قررنا استخدام بنية المراسلة عبر ناقل البيانات ، و MassTransit مع Saga بالتزامن مع RabbitMQ يناسبنا تمامًا.

كيف تبدو ساجا؟
Saga عبارة عن تطبيق لقالب إدارة العمليات من كتاب
قوالب تكامل تطبيق Enterprise ، والذي يسمح لك بوصف العملية كآلة حالة. يصل حدث عند المدخل ، وتقوم Saga بسلسلة من الإجراءات. في نفس الوقت ، في أي مرحلة من مراحل Saga ، قد يكون قرار الشخص مطلوبًا. ثم تقوم بإنشاء مهمة في أداة التتبع و "تغفو" لفترة غير محددة ، في انتظار أحداث جديدة.
تستند Saga على Automatonymous . يوصف بشكل صريح في فئة موروثة من MassTransitStateMachine <>. بالنسبة إلى Saga ، تحتاج إلى وصف جميع الحالات والأحداث المتخذة والإجراءات المتخذة عند حدوث أحداث معينة. يتم تخزين الحالة الحالية في قاعدة البيانات.
أولاً ، نصف جميع حالات وأحداث ساجا ونمنحهم أسماء مفهومة. يبدو هذا:
public sealed partial class StateMachine { public State AwaitingTaskCreated { get; set; } public State AwaitingTaskTakedToWork { get; set; } public State AwaitingDecisionAboutTask { get; set; } public State Rejected { get; set; } public Event<IStartWorkflowCommand> StartWorkflowCommandReceived { get; set; } public Event<TaskCreatedNotification> TaskCreated { get; set; } public Event<TaskTakedToWorkNotification> TaskTakedToWork { get; set; } public Event<TaskDeclinedNotification> TaskDeclined { get; set; } public Event<TaskApprovedNotification> TaskApproved { get; set; } private void BuildStateMachine() { InstanceState(x => x.CurrentState); Event(() => StartWorkflowCommandReceived, x => x.CorrelateById(ctx => ctx.Message.CorrelationId) .SelectId(context => context.Message.CorrelationId)); Event(() => TaskCreated, x => x.CorrelateById(ctx => ctx.Message.CorrelationId)); Event(() => TaskTakedToWork, x => x.CorrelateById(ctx => ctx.Message.CorrelationId)); Event(() => TaskDeclined, x => x.CorrelateById(ctx => ctx.Message.CorrelationId)); Event(() => TaskApproved, x => x.CorrelateById(ctx => ctx.Message.CorrelationId)); } }
لقد أطلقنا فئة جزئية ، حيث نعلن قائمة بجميع الحالات والأحداث ، وأسلوب BuildStateMachine ، الذي يصف علاقة الأحداث مع Saga. للقيام بذلك ، يتم تمرير معلمة خاصة CorrelationId في كل حدث - وهذا هو التوجيه ، الذي يعمل بين جميع الأنظمة المتصلة وأنظمة المراقبة.
وبالتالي ، إذا ظهرت أي مشاكل ، فيمكننا استعادة الصورة الكاملة لما يحدث بالسجلات من جميع الأنظمة المتصلة. نرسل CorrelationId في رسائل من Saga ، وترسلها الأنظمة مرة أخرى في الإشعارات حتى نتمكن من ربط الرسالة بساجا محددة.
فيما يلي مثال لفئة آلة الدولة نفسها:
public sealed partial class StateMachine : MassTransitStateMachine<WorkflowSaga> { public StateMachine() { BuildStateMachine(); Initially(WhenStartWorkflowCommandReceived()); During(AwaitingTaskCreatedInPlanner, WhenTaskCreated()); During(AwaitingTaskTakedToWork, WhenTaskTakedToWork()); During(AwaitingDecisionAboutTask, WhenTaskApproved(), WhenTaskDeclined()); } private EventActivityBinder<WorkflowSaga, IStartWorkflowCommand> WhenStartWorkflowCommandReceived() { return When(StartWorkflowCommandReceived) .Then(ctx => ctx.Instance.SaveConfigurationRequestInfo(ctx.Data)) .Send(TaskManagerQueue, ctx => new CreateTaskCommand(ctx.Instance)) .TransitionTo(AwaitingTaskCreated); } private EventActivityBinder<WorkflowSaga, TaskCreatedNotification> WhenTaskCreated() { return When(DPORPApproveTaskCreatedInPlanner) .Then(ctx => ctx.Instance.SaveCreatedTaskInfo(ctx.Data)) .Send(MailServiceQueue, ctx => new NotifyRequestAuthorThatWorkflowStarted(ctx.Instance)) .TransitionTo(AwaitingTaskTakedToWork); } private EventActivityBinder<WorkflowSaga, TaskTakedToWorkNotification> WhenTaskTakedToWork() { return When(TaskTakedToWork) .Then(ctx => ctx.Instance.MarkTaskAsTakedToWork(ctx.Data)) .TransitionTo(AwaitingDecisionAboutTask); } private EventActivityBinder<WorkflowSaga, TaskApprovedNotification> WhenTaskApproved() { return When(TaskApproved) .Then(ctx => ctx.Instance.MarkTaskAsApproved(ctx.Data)) .Finalize(); } private EventActivityBinder<WorkflowSaga, TaskDeclinedNotification> WhenTaskDeclined() { return When(TaskDeclined) .Then(ctx => ctx.Instance.MarkTaskAsDeclined(ctx.Data)) .TransitionTo(Rejected); } }
يصف المنشئ الولايات. يتم استقبال كل حدث بطريقة منفصلة من أجل الحفاظ على سهولة القراءة. يتم تنفيذ المنطق الكامل لبناء الرسائل في الرسائل نفسها ، وإلا ، مع التعقيد المتزايد للنظام ، تتضخم ساجا بسرعة كبيرة.
يجب توخي الحذر عند وضع الاتفاقيات والحفاظ على سهولة القراءة. بسبب حتمية C # ، من الصعب جدًا الإعلان عن وصف للحالات والأفعال فيه. حتى بالنسبة لآلات الحالة البسيطة ، يبدأ الجحيم الحقيقي.
الآن بضع كلمات حول SagaInstance. SagaInstance هي فئة موروثة من SagaStateMachineInstance. يتكون من كائنات وحقول تميز جهاز الحالة. تقريبا ، هذه هي ذكرى ساجا. نقوم بتخزين جميع بيانات Saga التي ستحتاجها طوال حياتها. أيضا في هذا الفصل يتم وصف منطق التغييرات في هذه البيانات في سياق العمل.
هنا مثال:
public class WorkflowSaga : SagaStateMachineInstance , ISagaWithState , ICreatedOnOffset , IModifiedOnOffset , ICreatedBy<string> , IModifiedBy<string> { public Guid CorrelationId { get; set; } public string CurrentState { get; set; } public string InitialRequestViewUrl { get; set; } public string RequestNumber { get; set; } public string RequestAuthor { get; set; } public string RequestText { get; set; } public byte[] RowVersion { get; set; } public string CreatedBy { get; set; } public string ModifiedBy { get; set; } public DateTimeOffset CreatedOn { get; set; } public DateTimeOffset ModifiedOn { get; set; } public DateTimeOffset CompletedOn { get; set; } public virtual ICollection<RelatedTask> RelatedTasks { get; set; } public void SaveGabrielConfigurationRequestInfo( ICreateGabrielConfigurationRequestCommand command) { CorrelationId = command.CorrelationId; RequestNumber = command.RequestNumber; RequestAuthor = command.Author; RequestText = command.RequestText; InitialRequestViewUrl = command.InitialRequestViewUrl; CreatedOn = RuntimeContext.Current.DateTimeOffset.Now; } public void SaveCreatedTaskInfo(ITaskCreationInfo taskCreationInfo) { RelatedPlannerTasks.Add(new RelatedPlannerTask(taskCreationInfo)); } public void MarkTaskAsTakedToWork(ITaskUpdatedInfo taskInfo) { UpdateTaskInfo(taskInfo, TaskStatus.TakedToWork); } public void MarkTaskAsApproved(TaskApprovedNotification taskInfo) { UpdateTaskInfo(taskInfo, TaskStatus.Completed, taskInfo.Comment); CompletedOn = RuntimeContext.Current.DateTimeOffset.Now; } public void MarkTaskAsDeclined(TaskDeclinedNotification taskInfo) { UpdateTaskInfo(taskInfo, TaskStatus.Declined, taskInfo.Comment); CompletedOn = RuntimeContext.Current.DateTimeOffset.Now; } private void UpdateTaskInfo(ITaskUpdatedInfo taskInfo, TaskStatus taskStatus, string comment = null) { var task = RelatedTasks.Single(t => t.Number == taskInfo.Number); task.ModifiedBy = taskInfo.TaskModifiedBy; task.Comment = comment; task.Status = taskStatus; } }
يوضح أحد الأمثلة أنه في SagaInstance CorrelationId يتم تخزينه لربط الأحداث مع Saga و CurrentState لتخزين الحالة الحالية لـ Saga.
معالجة الخطأ
ماذا يحدث ل Saga إذا حدث خطأ أثناء معالجة الرسالة؟ هذه قضية مهمة ، لأن الجميع يريد أن تظل آلة الدولة ثابتة ، حتى لو حدث خطأ ما. ومع MassTransit ، ساجا على ما يرام.
كما لاحظت بالفعل ، في الأمثلة أعلاه ، لا توجد كتلة واحدة لتجربة الصيد للتعامل مع الاستثناءات. والسبب بسيط: ليس هناك حاجة إليها. في حالة حدوث استثناء أثناء معالجة الرسالة ، يتم إرجاع الرسالة إلى قائمة الانتظار ويتم إرجاع جميع التغييرات. نظرًا لأننا نقوم بجميع عمليات التلاعب بالبيانات في نفس معاملة Saga ، فلن يتم إغلاق المعاملة.
بشكل عام ، التلاعب بشيء غير Saga في Saga نفسها هو ممارسة سيئة. وفقًا لكتاب "قوالب التكامل لتطبيقات المؤسسة" ، يجب أن يظل مدير العمليات نحيفًا وغبيًا قدر الإمكان: ما عليك سوى إعطاء الأوامر للأنظمة ومراقبة الحالة ، ولكن لا يجب عليه فعل أي شيء.
بالطبع ، هناك سيناريوهات أكثر تعقيدًا عندما تحتاج إلى تنفيذ بعض إجراءات التعويض للتعامل مع الاستثناءات. ثم يتم استخدام معالج جهاز الحالة ".Catch" لالتقاط استثناء من نوع معين ثم تنفيذ منطق التعويض.
وإذا كنت بحاجة فقط إلى التعهد باستثناء ، فمن الأفضل استخدام مراقب (مراقب).
تخيل الآن الموقف الذي قمنا فيه بالفعل بتنفيذ الأمر إرسال أثناء معالجة الرسالة ، وبعد ذلك حدث استثناء. ماذا سيحدث للأمر المرسل في هذه الخطوة؟ بعد كل شيء ، لا يمكن إرجاع كل ما طار؟ ولكن هنا يعتقد أن كل شيء.
عند تكوين الناقل ، يمكنك تمكين خيار UseInMemoryOutbox. يسمح لك هذا الخيار بعدم إرسال رسائل حتى تكتمل الخطوة الحالية. في حالة حدوث استثناء ، لن يتم إرسال الرسائل على الإطلاق. هنا مقتطف من الوثائق:
/// <summary> /// Includes an outbox in the consume filter path, which delays outgoing messages until the return path /// of the pipeline returns to the outbox filter. At this point, the message execution pipeline should be /// nearly complete with only the ack remaining. If an exception is thrown, the messages are not sent/published. /// </summary> /// <param name="configurator">The pipe configurator</param> public static void UseInMemoryOutbox(this IConsumePipeConfigurator configurator)
الاختبارات
للوهلة الأولى ، لا يزال اختبار آلة الحالة غير المتزامنة متعة. ولكن هنا كل شيء على ما يرام. يوفر MassTransit إطارًا جيدًا لكتابة اختبار يلبي تمامًا جميع احتياجاتنا لاختبار جهاز الدولة.
يوفر الإطار لـ InMemory تنفيذ ناقل البيانات (InMemoryTestHarness) ، والذي يسمح لك بإرسال واستقبال الرسائل التي تتجاوز RabbitMQ أو قائمة انتظار أخرى.
حسنًا ، كمثال:
[TestFixture] public class SagaTests : TestFixtureBase { protected const string HostName = "HostName"; protected InMemoryTestHarness Harness; protected StateMachine StateMachine; protected StateMachineSagaTestHarness<GabrielConfigurationRequestSaga, StateMachine> Saga; [SetUp] public void SetUp() { StateMachine = (StateMachine)Kernel. Get<MassTransitStateMachine<WorkflowSaga>>(); Harness = new InMemoryTestHarness(HostName); Saga = Harness .StateMachineSaga<WorkflowSaga, StateMachine>(StateMachine); } [TearDown] public async Task TearDown() { await Harness.Stop(); } protected async Task<WorkflowSaga> InitializeSaga() { await Harness.Start(); var command = new TestStartWorkflowCommand { CorrelationId = SagaId, Author = RequestAuthor, InitialRequestViewUrl = InitialRequestViewUrl, RequestText = RequestText, RequestNumber = RequestNumber, }; await Harness.InputQueueSendEndpoint .Send<IStartWorkflowCommand>(command); // , consume , // , Saga , Assert.IsTrue(Harness.Consumed .Select<IStartWorkflowCommand>().Any()); var currentSaga = Saga.Created.Contains(SagaId); currentSaga.RelatedPlannerTasks = new List<RelatedPlannerTask>(); return currentSaga; } [Test] public async Task CheckCurrntStateWhenStartWorkflowCommand() { var saga = await InitializeSaga(); Assert.IsNotNull(saga); Assert.AreEqual(StateMachine .AwaitingORDTApproveTaskCreatedInPlanner.Name, saga.CurrentState); } } public class WhenTaskCreated : SagaTestsBase { private async Task<WorkflowSaga> InitializeState() { var saga = await InitializeSaga(true); saga.CurrentState = StateMachine.AwaitingTaskCreated.Name; InitializeRelatedTask(saga); await SendTaskCreatedNotification(); Assert.IsTrue(Harness.Consumed .Select<TaskCreatedNotification>().Any()); return saga; } [Test] public async Task SaveWorkflowDataWhenTaskCreated() { var saga = await InitializeState(); var taskInfo = saga.RelatedPlannerTasks .First(task => task.PlannerTaskType == PlannerTaskType.DPORPApprove); Assert.AreEqual(TaskNumber, taskInfo.Number); Assert.AreEqual(TaskUrl, taskInfo.TaskUrl); Assert.AreEqual(SagaId, taskInfo.SagaCorrelationId); Assert.AreEqual(TaskStatus.Created, taskInfo.Status); Assert.AreEqual(User, taskInfo.ModifiedBy); Assert.AreEqual(saga.CurrentState, StateMachine.AwaitingTaskTakedToWork.Name); } [Test] public async Task SendsMailWhenTaskCreated() { var mailConsumer = Harness .Consumer<MockConsumer<ISendEmailMessageWithTemplateCommand>> (RabbitMqRouting.QueueNames .SendEmailsQueueName); await InitializeState(); Assert.IsTrue(mailConsumer.Consumed .Select<ISendEmailMessageWithTemplateCommand>().Any()); } private async Task SendTaskCreatedNotification() { await Harness.InputQueueSendEndpoint .Send(new TaskCreatedNotification { TaskUrl = TaskUrl, Number = TaskNumber, TaskModifiedBy = User, CorrelationId = SagaId }); } }
تعمل الاختبارات بسرعة كبيرة. على سبيل المثال ، على كمبيوتر مطور واحد ، يستغرق 850 اختبارًا حوالي 21 ثانية.
نصائح مفيدة
في الختام ، نقدم قائمة من النصائح المفيدة بناءً على تجربتنا.
من الأفضل وضع العقود ومخططات اتصالات الحافلات في كتلة خاصة. لذلك لن يكون لديك اختلافات في الأسماء على جانبي الإرسال والاستقبال. يمكنك أيضًا وضع ثوابت مع قوائم الانتظار والمضيفين في الكتلة. Nuget قابل للتخصيص في اليوم الواحد. كما تدعم بعض عناصر التحكم بالمصادر nuget ، هناك قنوات خاصة مدفوعة الأجر.
افهم الاختلافات بين الإرسال والنشر. استخدم إرسال إذا كان لديك مشترك واحد وكنت تعرف بالضبط اسم قائمة الانتظار التي ترسل إليها الأمر. تم تصميم النشر لإرسال تنبيهات البث. التفاصيل على الرابط .
إذا كنت بحاجة إلى إنشاء رسالة طلب / استجابة ، فمن الأفضل إضافة اسم قائمة الانتظار للاستجابة للعقد بدلاً من استخدام مخطط الطلب / الاستجابة من MassTransit ، والذي يقترح MassTransit نفسه تجنبه. لأن هذا يقلل بشكل كبير من الموثوقية. أنت تخسر كل مزايا التزامن. ولكن إذا كنت لا تزال بحاجة إلى الحصول على إجابة في وقت محدود ، فمن الأفضل استخدام مكالمة مباشرة. من الأفضل كتابة هذا في نفس الكتاب ، "قوالب تكامل تطبيق Enterprise".
يجب أن تكون الملحمة رقيقة. حاول حمل كل المنطق الثقيل إلى أنظمة أخرى. ويجب أن تقفز ساجا عبر الولايات وتبعثر الرسائل إلى اليسار واليمين.
إضافة إلى كافة الرسائل CorrelationId ، والتي سيتم تشغيلها بين الأنظمة. لذلك من الأسهل بكثير تحليل السجلات وربط جميع الرسائل في صورة واحدة. يفعل Masstransit نفس الشيء. تتم إضافة CorrelationId إلى الرسائل عند الوراثة من واجهة CorrelatedBy.
قم بإعداد السجلات والمراقبة في أنظمتك ، فلن يضر ذلك أبدًا. تجربتنا في هذه المقالة .