Une fois, nous avons été confrontés à la tâche d'automatiser divers flux de travail dans une grande entreprise. Pour nous, cela signifiait constituer une dizaine de systèmes au moment du lancement. De plus, tout devait être connecté de manière asynchrone, évolutive et fiable.
Un processus simplifié peut être décrit comme une séquence d'actions dans différents systèmes, qui ne peut pas être entièrement automatisé, car il nécessite la participation humaine. Par exemple, pour sélectionner certaines actions ou coordination élémentaire, ce qui est nécessaire pour passer à l'étape suivante du processus.
Pour résoudre ce problème, nous avons décidé d'utiliser l'architecture de messagerie via le bus de données, et MassTransit avec sa Saga en collaboration avec RabbitMQ nous convenait parfaitement.

À quoi ressemble Saga?
Saga est une implémentation du modèle Process Manager du livre
Enterprise Application Integration Templates , qui vous permet de décrire un processus comme une machine à états. Un événement arrive à l'entrée, Saga effectue une séquence d'actions. Dans le même temps, à n'importe quel stade de la saga, la décision d'une personne peut être requise. Elle crée ensuite une tâche dans le tracker et «s'endort» pendant une durée indéterminée, en attendant de nouveaux événements.
Saga est basée sur Automatonymous . Il est décrit de manière déclarative dans une classe héritée de MassTransitStateMachine <>. Pour Saga, vous devez décrire tous les états, événements pris et actions prises lorsque certains événements se produisent. L'état actuel est stocké dans la base de données.
Tout d'abord, nous décrivons tous les états et événements de Saga et leur donnons des noms compréhensibles. Cela ressemble à ceci:
public sealed partial class StateMachine { public State AwaitingTaskCreated { get; set; } public State AwaitingTaskTakedToWork { get; set; } public State AwaitingDecisionAboutTask { get; set; } public State Rejected { get; set; } public Event<IStartWorkflowCommand> StartWorkflowCommandReceived { get; set; } public Event<TaskCreatedNotification> TaskCreated { get; set; } public Event<TaskTakedToWorkNotification> TaskTakedToWork { get; set; } public Event<TaskDeclinedNotification> TaskDeclined { get; set; } public Event<TaskApprovedNotification> TaskApproved { get; set; } private void BuildStateMachine() { InstanceState(x => x.CurrentState); Event(() => StartWorkflowCommandReceived, x => x.CorrelateById(ctx => ctx.Message.CorrelationId) .SelectId(context => context.Message.CorrelationId)); Event(() => TaskCreated, x => x.CorrelateById(ctx => ctx.Message.CorrelationId)); Event(() => TaskTakedToWork, x => x.CorrelateById(ctx => ctx.Message.CorrelationId)); Event(() => TaskDeclined, x => x.CorrelateById(ctx => ctx.Message.CorrelationId)); Event(() => TaskApproved, x => x.CorrelateById(ctx => ctx.Message.CorrelationId)); } }
Nous avons lancé une classe partielle, où nous déclarons une liste de tous les états et événements, et la méthode BuildStateMachine, qui décrit la corrélation des événements avec Saga. Pour ce faire, un paramètre spécial CorrelationId est transmis dans chaque événement - il s'agit de Guid, qui s'exécute entre tous les systèmes connectés et dans les systèmes de surveillance.
Ainsi, si des problèmes surviennent, nous pouvons restaurer l'image complète de ce qui se passe avec les journaux de tous les systèmes connectés. Nous envoyons CorrelationId dans les messages de Saga, les systèmes le renvoient dans les notifications afin que nous puissions corréler le message avec une Saga spécifique.
Voici un exemple de la classe de machine d'état elle-même:
public sealed partial class StateMachine : MassTransitStateMachine<WorkflowSaga> { public StateMachine() { BuildStateMachine(); Initially(WhenStartWorkflowCommandReceived()); During(AwaitingTaskCreatedInPlanner, WhenTaskCreated()); During(AwaitingTaskTakedToWork, WhenTaskTakedToWork()); During(AwaitingDecisionAboutTask, WhenTaskApproved(), WhenTaskDeclined()); } private EventActivityBinder<WorkflowSaga, IStartWorkflowCommand> WhenStartWorkflowCommandReceived() { return When(StartWorkflowCommandReceived) .Then(ctx => ctx.Instance.SaveConfigurationRequestInfo(ctx.Data)) .Send(TaskManagerQueue, ctx => new CreateTaskCommand(ctx.Instance)) .TransitionTo(AwaitingTaskCreated); } private EventActivityBinder<WorkflowSaga, TaskCreatedNotification> WhenTaskCreated() { return When(DPORPApproveTaskCreatedInPlanner) .Then(ctx => ctx.Instance.SaveCreatedTaskInfo(ctx.Data)) .Send(MailServiceQueue, ctx => new NotifyRequestAuthorThatWorkflowStarted(ctx.Instance)) .TransitionTo(AwaitingTaskTakedToWork); } private EventActivityBinder<WorkflowSaga, TaskTakedToWorkNotification> WhenTaskTakedToWork() { return When(TaskTakedToWork) .Then(ctx => ctx.Instance.MarkTaskAsTakedToWork(ctx.Data)) .TransitionTo(AwaitingDecisionAboutTask); } private EventActivityBinder<WorkflowSaga, TaskApprovedNotification> WhenTaskApproved() { return When(TaskApproved) .Then(ctx => ctx.Instance.MarkTaskAsApproved(ctx.Data)) .Finalize(); } private EventActivityBinder<WorkflowSaga, TaskDeclinedNotification> WhenTaskDeclined() { return When(TaskDeclined) .Then(ctx => ctx.Instance.MarkTaskAsDeclined(ctx.Data)) .TransitionTo(Rejected); } }
Le constructeur décrit les états. La réception de chaque événement se fait selon une méthode distincte afin de maintenir la lisibilité. Toute la logique de construction des messages s'effectue dans les messages eux-mêmes, sinon, avec la complexité croissante du système, Saga se gonfle assez rapidement.
Il faut veiller à élaborer des conventions et à maintenir la lisibilité. En raison de l'impérativité de C #, il est très difficile de déclarer une description des états et des actions qu'il contient. Même pour les machines à états simples, le véritable enfer commence.
Maintenant, quelques mots sur SagaInstance. SagaInstance est une classe héritée de SagaStateMachineInstance. Il se compose d'objets et de champs qui caractérisent la machine à états. En gros, c'est le souvenir de Saga. Nous stockons toutes les données Saga dont elle aura besoin tout au long de sa vie. Dans cette classe est également décrite la logique des modifications de ces données au cours du travail.
Voici un exemple:
public class WorkflowSaga : SagaStateMachineInstance , ISagaWithState , ICreatedOnOffset , IModifiedOnOffset , ICreatedBy<string> , IModifiedBy<string> { public Guid CorrelationId { get; set; } public string CurrentState { get; set; } public string InitialRequestViewUrl { get; set; } public string RequestNumber { get; set; } public string RequestAuthor { get; set; } public string RequestText { get; set; } public byte[] RowVersion { get; set; } public string CreatedBy { get; set; } public string ModifiedBy { get; set; } public DateTimeOffset CreatedOn { get; set; } public DateTimeOffset ModifiedOn { get; set; } public DateTimeOffset CompletedOn { get; set; } public virtual ICollection<RelatedTask> RelatedTasks { get; set; } public void SaveGabrielConfigurationRequestInfo( ICreateGabrielConfigurationRequestCommand command) { CorrelationId = command.CorrelationId; RequestNumber = command.RequestNumber; RequestAuthor = command.Author; RequestText = command.RequestText; InitialRequestViewUrl = command.InitialRequestViewUrl; CreatedOn = RuntimeContext.Current.DateTimeOffset.Now; } public void SaveCreatedTaskInfo(ITaskCreationInfo taskCreationInfo) { RelatedPlannerTasks.Add(new RelatedPlannerTask(taskCreationInfo)); } public void MarkTaskAsTakedToWork(ITaskUpdatedInfo taskInfo) { UpdateTaskInfo(taskInfo, TaskStatus.TakedToWork); } public void MarkTaskAsApproved(TaskApprovedNotification taskInfo) { UpdateTaskInfo(taskInfo, TaskStatus.Completed, taskInfo.Comment); CompletedOn = RuntimeContext.Current.DateTimeOffset.Now; } public void MarkTaskAsDeclined(TaskDeclinedNotification taskInfo) { UpdateTaskInfo(taskInfo, TaskStatus.Declined, taskInfo.Comment); CompletedOn = RuntimeContext.Current.DateTimeOffset.Now; } private void UpdateTaskInfo(ITaskUpdatedInfo taskInfo, TaskStatus taskStatus, string comment = null) { var task = RelatedTasks.Single(t => t.Number == taskInfo.Number); task.ModifiedBy = taskInfo.TaskModifiedBy; task.Comment = comment; task.Status = taskStatus; } }
Un exemple montre que dans SagaInstance CorrelationId est stocké pour la corrélation des événements avec Saga et CurrentState pour stocker l'état actuel de Saga.
Gestion des erreurs
Qu'arrive-t-il à Saga si une erreur se produit pendant le traitement des messages? Il s'agit d'un problème important, car tout le monde veut que la machine d'état reste toujours cohérente, même en cas de problème. Et avec MassTransit, Saga se porte bien.
Comme vous l'avez déjà remarqué, dans les exemples ci-dessus, il n'y a pas un seul bloc try catch pour gérer les exceptions. La raison est simple: ils n'y sont pas nécessaires. Si une exception se produit pendant le traitement du message, le message est renvoyé dans la file d'attente et toutes les modifications sont annulées. Étant donné que nous effectuons toutes les manipulations de données dans la même transaction que Saga, la transaction ne sera pas clôturée.
En général, la manipulation de quelque chose d'autre que Saga dans Saga elle-même est une mauvaise pratique. Selon le livre «Modèles d'intégration pour les applications d'entreprise», le gestionnaire de processus devrait rester aussi mince et stupide que possible: il suffit de donner des commandes aux systèmes et de surveiller l'état, mais lui-même ne devrait rien faire.
Bien sûr, il existe des scénarios plus complexes lorsque vous devez effectuer certaines actions de compensation pour gérer les exceptions. Ensuite, le gestionnaire de machine d'état «.Catch» est utilisé pour intercepter une exception d'un certain type, puis exécuter la logique de compensation.
Et si vous avez juste besoin d'annoncer une exception, il est préférable d'utiliser un observateur (Observer).
Imaginez maintenant la situation dans laquelle nous avons déjà exécuté la commande Envoyer pendant le traitement des messages, après quoi une exception s'est produite. Qu'adviendra-t-il de la commande envoyée à cette étape? Après tout, tout ce qui s'est envolé ne peut pas être retourné? Mais ici tout est pensé.
Lors de la configuration du bus, vous pouvez activer l'option UseInMemoryOutbox. Cette option vous permet de ne pas envoyer de messages tant que l'étape en cours n'est pas terminée. Si une exception se produit, les messages ne seront pas envoyés du tout. Voici un extrait de la documentation:
/// <summary> /// Includes an outbox in the consume filter path, which delays outgoing messages until the return path /// of the pipeline returns to the outbox filter. At this point, the message execution pipeline should be /// nearly complete with only the ack remaining. If an exception is thrown, the messages are not sent/published. /// </summary> /// <param name="configurator">The pipe configurator</param> public static void UseInMemoryOutbox(this IConsumePipeConfigurator configurator)
Les tests
À première vue, tester une machine à états asynchrone est toujours un plaisir. Mais ici, tout va bien. MassTransit fournit un bon cadre d'écriture de test qui répond pleinement à tous nos besoins pour tester une machine d'état.
Le cadre fournit à InMemory une implémentation du bus de données (InMemoryTestHarness), qui vous permet d'envoyer et de recevoir des messages en contournant RabbitMQ ou une autre file d'attente.
Eh bien, à titre d'exemple:
[TestFixture] public class SagaTests : TestFixtureBase { protected const string HostName = "HostName"; protected InMemoryTestHarness Harness; protected StateMachine StateMachine; protected StateMachineSagaTestHarness<GabrielConfigurationRequestSaga, StateMachine> Saga; [SetUp] public void SetUp() { StateMachine = (StateMachine)Kernel. Get<MassTransitStateMachine<WorkflowSaga>>(); Harness = new InMemoryTestHarness(HostName); Saga = Harness .StateMachineSaga<WorkflowSaga, StateMachine>(StateMachine); } [TearDown] public async Task TearDown() { await Harness.Stop(); } protected async Task<WorkflowSaga> InitializeSaga() { await Harness.Start(); var command = new TestStartWorkflowCommand { CorrelationId = SagaId, Author = RequestAuthor, InitialRequestViewUrl = InitialRequestViewUrl, RequestText = RequestText, RequestNumber = RequestNumber, }; await Harness.InputQueueSendEndpoint .Send<IStartWorkflowCommand>(command); // , consume , // , Saga , Assert.IsTrue(Harness.Consumed .Select<IStartWorkflowCommand>().Any()); var currentSaga = Saga.Created.Contains(SagaId); currentSaga.RelatedPlannerTasks = new List<RelatedPlannerTask>(); return currentSaga; } [Test] public async Task CheckCurrntStateWhenStartWorkflowCommand() { var saga = await InitializeSaga(); Assert.IsNotNull(saga); Assert.AreEqual(StateMachine .AwaitingORDTApproveTaskCreatedInPlanner.Name, saga.CurrentState); } } public class WhenTaskCreated : SagaTestsBase { private async Task<WorkflowSaga> InitializeState() { var saga = await InitializeSaga(true); saga.CurrentState = StateMachine.AwaitingTaskCreated.Name; InitializeRelatedTask(saga); await SendTaskCreatedNotification(); Assert.IsTrue(Harness.Consumed .Select<TaskCreatedNotification>().Any()); return saga; } [Test] public async Task SaveWorkflowDataWhenTaskCreated() { var saga = await InitializeState(); var taskInfo = saga.RelatedPlannerTasks .First(task => task.PlannerTaskType == PlannerTaskType.DPORPApprove); Assert.AreEqual(TaskNumber, taskInfo.Number); Assert.AreEqual(TaskUrl, taskInfo.TaskUrl); Assert.AreEqual(SagaId, taskInfo.SagaCorrelationId); Assert.AreEqual(TaskStatus.Created, taskInfo.Status); Assert.AreEqual(User, taskInfo.ModifiedBy); Assert.AreEqual(saga.CurrentState, StateMachine.AwaitingTaskTakedToWork.Name); } [Test] public async Task SendsMailWhenTaskCreated() { var mailConsumer = Harness .Consumer<MockConsumer<ISendEmailMessageWithTemplateCommand>> (RabbitMqRouting.QueueNames .SendEmailsQueueName); await InitializeState(); Assert.IsTrue(mailConsumer.Consumed .Select<ISendEmailMessageWithTemplateCommand>().Any()); } private async Task SendTaskCreatedNotification() { await Harness.InputQueueSendEndpoint .Send(new TaskCreatedNotification { TaskUrl = TaskUrl, Number = TaskNumber, TaskModifiedBy = User, CorrelationId = SagaId }); } }
Les tests se déroulent assez rapidement. Par exemple, sur l'ordinateur d'un seul développeur, 850 tests s'exécutent en 21 secondes environ.
Conseils utiles
En conclusion, nous donnons une liste de conseils utiles basés sur notre expérience.
Les contrats et les systèmes de communication par bus sont mieux placés dans une pépite privée. Ainsi, vous n'aurez pas de différences de noms du côté de l'envoi et du côté de réception. Vous pouvez également mettre des constantes avec des files d'attente de noms et des hôtes dans nuget. Nuget est personnalisable par jour. Et aussi certains contrôles de source prennent en charge nuget, il existe des flux privés payants.
Comprendre les différences entre envoyer et publier. Utilisez Envoyer si vous avez un abonné et que vous connaissez exactement le nom de la file d'attente à laquelle vous envoyez la commande. Publier est conçu pour envoyer des alertes de diffusion. Détails sur le lien .
Si vous devez créer un message de demande / réponse, il est préférable d'ajouter le nom de file d'attente pour la réponse au contrat plutôt que d'utiliser le schéma de demande / réponse de MassTransit, que MassTransit lui-même suggère d'éviter. Étant donné que cela réduit considérablement la fiabilité. Vous perdez tous les avantages de l'asynchronie. Mais si vous avez toujours besoin d'obtenir une réponse dans un temps limité, il est préférable d'utiliser un appel direct. Ceci est mieux écrit dans le même livre, «Modèles d'intégration d'applications d'entreprise».
La saga devrait être mince. Essayez de transporter toute la logique lourde vers d'autres systèmes. Et Saga doit sauter à travers les états et disperser les messages de gauche à droite.
Ajoutez à tous les messages CorrelationId, qui s'exécutera entre les systèmes. Il est donc beaucoup plus facile d'analyser les journaux et de lier tous les messages en une seule image. Masstransit fait de même. CorrelationId est ajouté aux messages lors de l'héritage de l'interface CorrelatedBy.
Configurez les journaux et la surveillance dans vos systèmes, cela ne fera jamais de mal. Notre expérience dans cet article .