MassTransit, Saga und RabbitMQ zur Implementierung eines Prozessmanagers

Einmal standen wir vor der Aufgabe, verschiedene Workflows in einem großen Unternehmen zu automatisieren. Für uns bedeutete dies, zum Zeitpunkt des Starts etwa 10 Systeme zusammenzustellen. Darüber hinaus musste alles asynchron, skalierbar und zuverlässig verbunden werden.


Vereinfachte Prozesse können als eine Abfolge von Aktionen in verschiedenen Systemen beschrieben werden, die nicht vollständig automatisiert werden können, da sie die Teilnahme von Menschen erfordern. Zum Beispiel, um bestimmte Aktionen oder elementare Koordination auszuwählen, die notwendig sind, um zur nächsten Stufe des Prozesses überzugehen.


Um dieses Problem zu lösen, haben wir uns entschieden, die Messaging-Architektur über den Datenbus zu verwenden, und MassTransit mit seiner Saga in Verbindung mit RabbitMQ war perfekt für uns geeignet.


Bild

Wie ist Saga?


Saga ist eine Implementierung der Process Manager-Vorlage aus dem Buch Enterprise Application Integration Templates , mit der Sie einen Prozess als Zustandsmaschine beschreiben können. Ein Ereignis kommt am Eingang an, Saga führt eine Abfolge von Aktionen aus. Gleichzeitig kann in jeder Phase von Saga die Entscheidung einer Person erforderlich sein. Dann erstellt sie eine Aufgabe im Tracker und „schläft“ auf unbestimmte Zeit ein und wartet auf neue Ereignisse.

Saga basiert auf Automatonymous . Es wird deklarativ in einer von MassTransitStateMachine <> geerbten Klasse beschrieben. Für Saga müssen Sie alle Zustände, Ereignisse und Aktionen beschreiben, die ausgeführt werden, wenn bestimmte Ereignisse auftreten. Der aktuelle Status wird in der Datenbank gespeichert.


Zuerst beschreiben wir alle Zustände und Ereignisse von Saga und geben ihnen verständliche Namen. Es sieht so aus:


public sealed partial class StateMachine { public State AwaitingTaskCreated { get; set; } public State AwaitingTaskTakedToWork { get; set; } public State AwaitingDecisionAboutTask { get; set; } public State Rejected { get; set; } public Event<IStartWorkflowCommand> StartWorkflowCommandReceived { get; set; } public Event<TaskCreatedNotification> TaskCreated { get; set; } public Event<TaskTakedToWorkNotification> TaskTakedToWork { get; set; } public Event<TaskDeclinedNotification> TaskDeclined { get; set; } public Event<TaskApprovedNotification> TaskApproved { get; set; } private void BuildStateMachine() { InstanceState(x => x.CurrentState); Event(() => StartWorkflowCommandReceived, x => x.CorrelateById(ctx => ctx.Message.CorrelationId) .SelectId(context => context.Message.CorrelationId)); Event(() => TaskCreated, x => x.CorrelateById(ctx => ctx.Message.CorrelationId)); Event(() => TaskTakedToWork, x => x.CorrelateById(ctx => ctx.Message.CorrelationId)); Event(() => TaskDeclined, x => x.CorrelateById(ctx => ctx.Message.CorrelationId)); Event(() => TaskApproved, x => x.CorrelateById(ctx => ctx.Message.CorrelationId)); } } 

Wir haben eine Teilklasse gestartet, in der wir eine Liste aller Zustände und Ereignisse deklarieren, sowie die BuildStateMachine-Methode, die die Korrelation von Ereignissen mit Saga beschreibt. Zu diesem Zweck wird in jedem Ereignis ein spezieller Parameter CorrelationId übergeben - dies ist Guid, der zwischen allen verbundenen Systemen und in Überwachungssystemen ausgeführt wird.


Wenn also Probleme auftreten, können wir das gesamte Bild des Geschehens durch Protokolle aller verbundenen Systeme wiederherstellen. Wir senden CorrelationId in Nachrichten von Saga, die Systeme senden sie in Benachrichtigungen zurück, damit wir die Nachricht mit einer bestimmten Saga korrelieren können.


Hier ist ein Beispiel für die Zustandsmaschinenklasse selbst:


 public sealed partial class StateMachine : MassTransitStateMachine<WorkflowSaga> { public StateMachine() { BuildStateMachine(); Initially(WhenStartWorkflowCommandReceived()); During(AwaitingTaskCreatedInPlanner, WhenTaskCreated()); During(AwaitingTaskTakedToWork, WhenTaskTakedToWork()); During(AwaitingDecisionAboutTask, WhenTaskApproved(), WhenTaskDeclined()); } private EventActivityBinder<WorkflowSaga, IStartWorkflowCommand> WhenStartWorkflowCommandReceived() { return When(StartWorkflowCommandReceived) .Then(ctx => ctx.Instance.SaveConfigurationRequestInfo(ctx.Data)) .Send(TaskManagerQueue, ctx => new CreateTaskCommand(ctx.Instance)) .TransitionTo(AwaitingTaskCreated); } private EventActivityBinder<WorkflowSaga, TaskCreatedNotification> WhenTaskCreated() { return When(DPORPApproveTaskCreatedInPlanner) .Then(ctx => ctx.Instance.SaveCreatedTaskInfo(ctx.Data)) .Send(MailServiceQueue, ctx => new NotifyRequestAuthorThatWorkflowStarted(ctx.Instance)) .TransitionTo(AwaitingTaskTakedToWork); } private EventActivityBinder<WorkflowSaga, TaskTakedToWorkNotification> WhenTaskTakedToWork() { return When(TaskTakedToWork) .Then(ctx => ctx.Instance.MarkTaskAsTakedToWork(ctx.Data)) .TransitionTo(AwaitingDecisionAboutTask); } private EventActivityBinder<WorkflowSaga, TaskApprovedNotification> WhenTaskApproved() { return When(TaskApproved) .Then(ctx => ctx.Instance.MarkTaskAsApproved(ctx.Data)) .Finalize(); } private EventActivityBinder<WorkflowSaga, TaskDeclinedNotification> WhenTaskDeclined() { return When(TaskDeclined) .Then(ctx => ctx.Instance.MarkTaskAsDeclined(ctx.Data)) .TransitionTo(Rejected); } } 

Der Konstruktor beschreibt die Zustände. Der Empfang jedes Ereignisses erfolgt in einer separaten Methode, um die Lesbarkeit zu gewährleisten. Die gesamte Logik der Nachrichtenerstellung wird in den Nachrichten selbst ausgeführt, andernfalls schwillt Saga mit zunehmender Komplexität des Systems ziemlich schnell an.


Bei der Entwicklung von Konventionen und der Wahrung der Lesbarkeit ist Vorsicht geboten. Aufgrund der Notwendigkeit von C # ist es sehr schwierig, eine Beschreibung der darin enthaltenen Zustände und Aktionen zu deklarieren. Selbst für einfache Zustandsmaschinen beginnt die wahre Hölle.


Nun ein paar Worte zu SagaInstance. SagaInstance ist eine Klasse, die von SagaStateMachineInstance geerbt wurde. Es besteht aus Objekten und Feldern, die die Zustandsmaschine charakterisieren. Grob gesagt ist dies die Erinnerung an Saga. Wir speichern alle Saga-Daten, die sie während ihres gesamten Lebens benötigt. Auch in dieser Klasse wird die Logik der Änderungen dieser Daten im Laufe der Arbeit beschrieben.


Hier ist ein Beispiel:


 public class WorkflowSaga : SagaStateMachineInstance , ISagaWithState , ICreatedOnOffset , IModifiedOnOffset , ICreatedBy<string> , IModifiedBy<string> { public Guid CorrelationId { get; set; } public string CurrentState { get; set; } public string InitialRequestViewUrl { get; set; } public string RequestNumber { get; set; } public string RequestAuthor { get; set; } public string RequestText { get; set; } public byte[] RowVersion { get; set; } public string CreatedBy { get; set; } public string ModifiedBy { get; set; } public DateTimeOffset CreatedOn { get; set; } public DateTimeOffset ModifiedOn { get; set; } public DateTimeOffset CompletedOn { get; set; } public virtual ICollection<RelatedTask> RelatedTasks { get; set; } public void SaveGabrielConfigurationRequestInfo( ICreateGabrielConfigurationRequestCommand command) { CorrelationId = command.CorrelationId; RequestNumber = command.RequestNumber; RequestAuthor = command.Author; RequestText = command.RequestText; InitialRequestViewUrl = command.InitialRequestViewUrl; CreatedOn = RuntimeContext.Current.DateTimeOffset.Now; } public void SaveCreatedTaskInfo(ITaskCreationInfo taskCreationInfo) { RelatedPlannerTasks.Add(new RelatedPlannerTask(taskCreationInfo)); } public void MarkTaskAsTakedToWork(ITaskUpdatedInfo taskInfo) { UpdateTaskInfo(taskInfo, TaskStatus.TakedToWork); } public void MarkTaskAsApproved(TaskApprovedNotification taskInfo) { UpdateTaskInfo(taskInfo, TaskStatus.Completed, taskInfo.Comment); CompletedOn = RuntimeContext.Current.DateTimeOffset.Now; } public void MarkTaskAsDeclined(TaskDeclinedNotification taskInfo) { UpdateTaskInfo(taskInfo, TaskStatus.Declined, taskInfo.Comment); CompletedOn = RuntimeContext.Current.DateTimeOffset.Now; } private void UpdateTaskInfo(ITaskUpdatedInfo taskInfo, TaskStatus taskStatus, string comment = null) { var task = RelatedTasks.Single(t => t.Number == taskInfo.Number); task.ModifiedBy = taskInfo.TaskModifiedBy; task.Comment = comment; task.Status = taskStatus; } } 

Ein Beispiel zeigt, dass in SagaInstance CorrelationId für die Korrelation von Ereignissen mit Saga und CurrentState zum Speichern des aktuellen Status von Saga gespeichert ist.


Fehlerbehandlung


Was passiert mit Saga, wenn während der Nachrichtenverarbeitung ein Fehler auftritt? Dies ist ein wichtiges Thema, da jeder möchte, dass die Zustandsmaschine immer konsistent bleibt, auch wenn etwas schief gelaufen ist. Und mit MassTransit geht es Saga gut.


Wie Sie bereits bemerkt haben, gibt es in den obigen Beispielen keinen einzigen try catch-Block, um Ausnahmen zu behandeln. Der Grund ist einfach: Sie werden dort nicht benötigt. Wenn während der Nachrichtenverarbeitung eine Ausnahme auftritt, wird die Nachricht an die Warteschlange zurückgegeben und alle Änderungen werden zurückgesetzt. Da wir alle Datenmanipulationen in derselben Transaktion wie Saga durchführen, wird die Transaktion nicht abgeschlossen.


Im Allgemeinen ist die Manipulation von etwas anderem als Saga in Saga selbst eine schlechte Praxis. Laut dem Buch „Integrationsvorlagen für Unternehmensanwendungen“ sollte der Prozessmanager so dünn und dumm wie möglich bleiben: Geben Sie den Systemen nur Befehle und überwachen Sie den Status, aber er selbst sollte nichts tun.


Natürlich gibt es komplexere Szenarien, in denen Sie einige Ausgleichsaktionen ausführen müssen, um Ausnahmen zu behandeln. Dann wird der Zustandsmaschinenhandler ".Catch" verwendet, um eine Ausnahme eines bestimmten Typs abzufangen und dann eine Kompensationslogik auszuführen.


Und wenn Sie nur eine Ausnahme versprechen müssen, ist es besser, einen Beobachter (Observer) zu verwenden.


Stellen Sie sich nun die Situation vor, dass wir den Befehl Senden bereits während der Nachrichtenverarbeitung ausgeführt haben, wonach eine Ausnahme aufgetreten ist. Was passiert mit dem Befehl, der in diesem Schritt gesendet wird? Immerhin kann nicht alles, was weggeflogen ist, zurückgegeben werden? Aber hier ist alles durchdacht.


Bei der Konfiguration des Busses können Sie die Option UseInMemoryOutbox aktivieren. Mit dieser Option können Sie keine Nachrichten senden, bis der aktuelle Schritt abgeschlossen ist. Wenn eine Ausnahme auftritt, werden überhaupt keine Nachrichten gesendet. Hier ist ein Auszug aus der Dokumentation:


 /// <summary> /// Includes an outbox in the consume filter path, which delays outgoing messages until the return path /// of the pipeline returns to the outbox filter. At this point, the message execution pipeline should be /// nearly complete with only the ack remaining. If an exception is thrown, the messages are not sent/published. /// </summary> /// <param name="configurator">The pipe configurator</param> public static void UseInMemoryOutbox(this IConsumePipeConfigurator configurator) 

Tests


Auf den ersten Blick ist das Testen einer asynchronen Zustandsmaschine immer noch ein Vergnügen. Aber hier ist alles in Ordnung. MassTransit bietet ein gutes Framework für das Schreiben von Tests, das alle unsere Anforderungen zum Testen einer Zustandsmaschine vollständig erfüllt.


Das Framework bietet InMemory eine Implementierung des Datenbusses (InMemoryTestHarness), mit der Sie Nachrichten unter Umgehung von RabbitMQ oder einer anderen Warteschlange senden und empfangen können.


Nun, als Beispiel:


 [TestFixture] public class SagaTests : TestFixtureBase { protected const string HostName = "HostName"; protected InMemoryTestHarness Harness; protected StateMachine StateMachine; protected StateMachineSagaTestHarness<GabrielConfigurationRequestSaga, StateMachine> Saga; [SetUp] public void SetUp() { StateMachine = (StateMachine)Kernel. Get<MassTransitStateMachine<WorkflowSaga>>(); Harness = new InMemoryTestHarness(HostName); Saga = Harness .StateMachineSaga<WorkflowSaga, StateMachine>(StateMachine); } [TearDown] public async Task TearDown() { await Harness.Stop(); } protected async Task<WorkflowSaga> InitializeSaga() { await Harness.Start(); var command = new TestStartWorkflowCommand { CorrelationId = SagaId, Author = RequestAuthor, InitialRequestViewUrl = InitialRequestViewUrl, RequestText = RequestText, RequestNumber = RequestNumber, }; await Harness.InputQueueSendEndpoint .Send<IStartWorkflowCommand>(command); //    ,  consume    , // ,  Saga  ,    Assert.IsTrue(Harness.Consumed .Select<IStartWorkflowCommand>().Any()); var currentSaga = Saga.Created.Contains(SagaId); currentSaga.RelatedPlannerTasks = new List<RelatedPlannerTask>(); return currentSaga; } [Test] public async Task CheckCurrntStateWhenStartWorkflowCommand() { var saga = await InitializeSaga(); Assert.IsNotNull(saga); Assert.AreEqual(StateMachine .AwaitingORDTApproveTaskCreatedInPlanner.Name, saga.CurrentState); } } public class WhenTaskCreated : SagaTestsBase { private async Task<WorkflowSaga> InitializeState() { var saga = await InitializeSaga(true); saga.CurrentState = StateMachine.AwaitingTaskCreated.Name; InitializeRelatedTask(saga); await SendTaskCreatedNotification(); Assert.IsTrue(Harness.Consumed .Select<TaskCreatedNotification>().Any()); return saga; } [Test] public async Task SaveWorkflowDataWhenTaskCreated() { var saga = await InitializeState(); var taskInfo = saga.RelatedPlannerTasks .First(task => task.PlannerTaskType == PlannerTaskType.DPORPApprove); Assert.AreEqual(TaskNumber, taskInfo.Number); Assert.AreEqual(TaskUrl, taskInfo.TaskUrl); Assert.AreEqual(SagaId, taskInfo.SagaCorrelationId); Assert.AreEqual(TaskStatus.Created, taskInfo.Status); Assert.AreEqual(User, taskInfo.ModifiedBy); Assert.AreEqual(saga.CurrentState, StateMachine.AwaitingTaskTakedToWork.Name); } [Test] public async Task SendsMailWhenTaskCreated() { var mailConsumer = Harness .Consumer<MockConsumer<ISendEmailMessageWithTemplateCommand>> (RabbitMqRouting.QueueNames .SendEmailsQueueName); await InitializeState(); Assert.IsTrue(mailConsumer.Consumed .Select<ISendEmailMessageWithTemplateCommand>().Any()); } private async Task SendTaskCreatedNotification() { await Harness.InputQueueSendEndpoint .Send(new TaskCreatedNotification { TaskUrl = TaskUrl, Number = TaskNumber, TaskModifiedBy = User, CorrelationId = SagaId }); } } 

Tests laufen ziemlich schnell. Auf dem Computer eines einzelnen Entwicklers werden beispielsweise 850 Tests in etwa 21 Sekunden ausgeführt.


Hilfreiche Ratschläge


Abschließend geben wir eine Liste nützlicher Tipps, die auf unseren Erfahrungen basieren.


  1. Verträge und Buskommunikationsschemata werden am besten in einem privaten Nuget platziert. Sie werden also keine Unterschiede in den Namen auf der sendenden und der empfangenden Seite haben. Sie können auch Konstanten mit Namenswarteschlangen und Hosts in Nuget einfügen. Nuget kann pro Tag angepasst werden. Und auch einige Quellcodeverwaltungen unterstützen Nuget, es gibt bezahlte private Feeds.


  2. Verstehen Sie die Unterschiede zwischen Senden und Veröffentlichen. Verwenden Sie Senden, wenn Sie einen Abonnenten haben und den Namen der Warteschlange, an die Sie den Befehl senden, genau kennen. Publish dient zum Senden von Broadcast-Warnungen. Details zum Link .


  3. Wenn Sie eine Anforderungs- / Antwortnachricht erstellen müssen, ist es besser, den Warteschlangennamen für die Antwort zum Vertrag hinzuzufügen, als das Anforderungs- / Antwortschema von MassTransit zu verwenden, das MassTransit selbst vermeiden möchte. Da dies die Zuverlässigkeit stark reduziert. Sie verlieren alle Vorteile der Asynchronität. Wenn Sie jedoch in einer begrenzten Zeit immer noch eine Antwort benötigen, ist es besser, einen direkten Anruf zu verwenden. Dies wird am besten im selben Buch „Enterprise Application Integration Templates“ geschrieben.


  4. Saga sollte dünn sein. Versuchen Sie, die gesamte schwere Logik auf andere Systeme zu übertragen. Und Saga muss durch die Zustände springen und Nachrichten nach links und rechts streuen.


  5. Zu allen Nachrichten hinzufügen CorrelationId, die zwischen Systemen ausgeführt wird. Dann ist es viel einfacher, die Protokolle zu analysieren und alle Nachrichten zu einem einzigen Bild zu verknüpfen. Masstransit macht das auch. CorrelationId wird Nachrichten beim Erben von der CorrelatedBy-Schnittstelle hinzugefügt.


    Richten Sie Protokolle und Überwachung in Ihren Systemen ein, es wird nie schaden. Unsere Erfahrung in diesem Artikel .

Source: https://habr.com/ru/post/de412793/


All Articles