MassTransit, Saga e RabbitMQ para implementar um gerenciador de processos

Uma vez, enfrentamos a tarefa de automatizar vários fluxos de trabalho em uma grande empresa. Para nós, isso significava reunir cerca de 10 sistemas no momento do lançamento. Além disso, tudo tinha que ser conectado de forma assíncrona, escalável e confiável.


O processo simplificado pode ser descrito como uma sequência de ações em diferentes sistemas, que não podem ser totalmente automatizados, pois requer participação humana. Por exemplo, para selecionar determinadas ações ou coordenação elementar, necessárias para passar para a próxima etapa do processo.


Para resolver esse problema, decidimos usar a arquitetura de mensagens através do barramento de dados, e o MassTransit com sua Saga em conjunto com o RabbitMQ nos convinha perfeitamente.


imagem

Como é a Saga?


Saga é uma implementação do modelo do Process Manager do livro Enterprise Application Integration Templates , que permite descrever um processo como uma máquina de estado. Um evento chega na entrada, Saga realiza uma sequência de ações. Ao mesmo tempo, em qualquer estágio da Saga, a decisão de uma pessoa pode ser necessária. Em seguida, ela cria uma tarefa no rastreador e "adormece" por tempo indeterminado, aguardando novos eventos.

Saga é baseado em Automatonymous . É declarativamente descrito em uma classe herdada de MassTransitStateMachine <>. Para o Saga, você precisa descrever todos os estados, eventos executados e ações executadas quando determinados eventos ocorrem. O estado atual é armazenado no banco de dados.


Primeiro, descrevemos todos os estados e eventos da Saga e damos nomes compreensíveis. É assim:


public sealed partial class StateMachine { public State AwaitingTaskCreated { get; set; } public State AwaitingTaskTakedToWork { get; set; } public State AwaitingDecisionAboutTask { get; set; } public State Rejected { get; set; } public Event<IStartWorkflowCommand> StartWorkflowCommandReceived { get; set; } public Event<TaskCreatedNotification> TaskCreated { get; set; } public Event<TaskTakedToWorkNotification> TaskTakedToWork { get; set; } public Event<TaskDeclinedNotification> TaskDeclined { get; set; } public Event<TaskApprovedNotification> TaskApproved { get; set; } private void BuildStateMachine() { InstanceState(x => x.CurrentState); Event(() => StartWorkflowCommandReceived, x => x.CorrelateById(ctx => ctx.Message.CorrelationId) .SelectId(context => context.Message.CorrelationId)); Event(() => TaskCreated, x => x.CorrelateById(ctx => ctx.Message.CorrelationId)); Event(() => TaskTakedToWork, x => x.CorrelateById(ctx => ctx.Message.CorrelationId)); Event(() => TaskDeclined, x => x.CorrelateById(ctx => ctx.Message.CorrelationId)); Event(() => TaskApproved, x => x.CorrelateById(ctx => ctx.Message.CorrelationId)); } } 

Lançamos uma classe parcial, onde declaramos uma lista de todos os estados e eventos, e o método BuildStateMachine, que descreve a correlação de eventos com o Saga. Para fazer isso, um parâmetro especial CorrelationId é passado em cada evento - este é o Guid, que é executado entre todos os sistemas conectados e nos sistemas de monitoramento.


Portanto, se surgirem problemas, podemos restaurar toda a imagem do que está acontecendo pelos logs de todos os sistemas conectados. Enviamos CorrelationId em mensagens da Saga, os sistemas enviam de volta em notificações para que possamos correlacionar a mensagem com uma Saga específica.


Aqui está um exemplo da própria classe de máquina de estado:


 public sealed partial class StateMachine : MassTransitStateMachine<WorkflowSaga> { public StateMachine() { BuildStateMachine(); Initially(WhenStartWorkflowCommandReceived()); During(AwaitingTaskCreatedInPlanner, WhenTaskCreated()); During(AwaitingTaskTakedToWork, WhenTaskTakedToWork()); During(AwaitingDecisionAboutTask, WhenTaskApproved(), WhenTaskDeclined()); } private EventActivityBinder<WorkflowSaga, IStartWorkflowCommand> WhenStartWorkflowCommandReceived() { return When(StartWorkflowCommandReceived) .Then(ctx => ctx.Instance.SaveConfigurationRequestInfo(ctx.Data)) .Send(TaskManagerQueue, ctx => new CreateTaskCommand(ctx.Instance)) .TransitionTo(AwaitingTaskCreated); } private EventActivityBinder<WorkflowSaga, TaskCreatedNotification> WhenTaskCreated() { return When(DPORPApproveTaskCreatedInPlanner) .Then(ctx => ctx.Instance.SaveCreatedTaskInfo(ctx.Data)) .Send(MailServiceQueue, ctx => new NotifyRequestAuthorThatWorkflowStarted(ctx.Instance)) .TransitionTo(AwaitingTaskTakedToWork); } private EventActivityBinder<WorkflowSaga, TaskTakedToWorkNotification> WhenTaskTakedToWork() { return When(TaskTakedToWork) .Then(ctx => ctx.Instance.MarkTaskAsTakedToWork(ctx.Data)) .TransitionTo(AwaitingDecisionAboutTask); } private EventActivityBinder<WorkflowSaga, TaskApprovedNotification> WhenTaskApproved() { return When(TaskApproved) .Then(ctx => ctx.Instance.MarkTaskAsApproved(ctx.Data)) .Finalize(); } private EventActivityBinder<WorkflowSaga, TaskDeclinedNotification> WhenTaskDeclined() { return When(TaskDeclined) .Then(ctx => ctx.Instance.MarkTaskAsDeclined(ctx.Data)) .TransitionTo(Rejected); } } 

O construtor descreve os estados. A recepção de cada evento é feita em um método separado, a fim de manter a legibilidade. Toda a lógica de construção de mensagens é realizada nas próprias mensagens; caso contrário, com a crescente complexidade do sistema, Saga aumenta rapidamente.


Deve-se tomar cuidado no desenvolvimento de convenções e na manutenção da legibilidade. Devido à imperatividade do C #, é muito difícil declarar uma descrição dos estados e ações nele. Mesmo para máquinas de estado simples, o verdadeiro inferno começa.


Agora, algumas palavras sobre o SagaInstance. SagaInstance é uma classe herdada de SagaStateMachineInstance. Consiste em objetos e campos que caracterizam a máquina de estado. Grosso modo, essa é a memória de Saga. Armazenamos todos os dados da Saga que ela precisará ao longo de sua vida. Também nesta classe é descrita a lógica das mudanças nesses dados no decorrer do trabalho.


Aqui está um exemplo:


 public class WorkflowSaga : SagaStateMachineInstance , ISagaWithState , ICreatedOnOffset , IModifiedOnOffset , ICreatedBy<string> , IModifiedBy<string> { public Guid CorrelationId { get; set; } public string CurrentState { get; set; } public string InitialRequestViewUrl { get; set; } public string RequestNumber { get; set; } public string RequestAuthor { get; set; } public string RequestText { get; set; } public byte[] RowVersion { get; set; } public string CreatedBy { get; set; } public string ModifiedBy { get; set; } public DateTimeOffset CreatedOn { get; set; } public DateTimeOffset ModifiedOn { get; set; } public DateTimeOffset CompletedOn { get; set; } public virtual ICollection<RelatedTask> RelatedTasks { get; set; } public void SaveGabrielConfigurationRequestInfo( ICreateGabrielConfigurationRequestCommand command) { CorrelationId = command.CorrelationId; RequestNumber = command.RequestNumber; RequestAuthor = command.Author; RequestText = command.RequestText; InitialRequestViewUrl = command.InitialRequestViewUrl; CreatedOn = RuntimeContext.Current.DateTimeOffset.Now; } public void SaveCreatedTaskInfo(ITaskCreationInfo taskCreationInfo) { RelatedPlannerTasks.Add(new RelatedPlannerTask(taskCreationInfo)); } public void MarkTaskAsTakedToWork(ITaskUpdatedInfo taskInfo) { UpdateTaskInfo(taskInfo, TaskStatus.TakedToWork); } public void MarkTaskAsApproved(TaskApprovedNotification taskInfo) { UpdateTaskInfo(taskInfo, TaskStatus.Completed, taskInfo.Comment); CompletedOn = RuntimeContext.Current.DateTimeOffset.Now; } public void MarkTaskAsDeclined(TaskDeclinedNotification taskInfo) { UpdateTaskInfo(taskInfo, TaskStatus.Declined, taskInfo.Comment); CompletedOn = RuntimeContext.Current.DateTimeOffset.Now; } private void UpdateTaskInfo(ITaskUpdatedInfo taskInfo, TaskStatus taskStatus, string comment = null) { var task = RelatedTasks.Single(t => t.Number == taskInfo.Number); task.ModifiedBy = taskInfo.TaskModifiedBy; task.Comment = comment; task.Status = taskStatus; } } 

Um exemplo mostra que em SagaInstance CorrelationId é armazenado para correlação de eventos com Saga e CurrentState para armazenar o estado atual de Saga.


Tratamento de erros


O que acontece com o Saga se ocorrer um erro durante o processamento da mensagem? Esta é uma questão importante, porque todo mundo quer que a máquina de estado permaneça sempre consistente, mesmo que algo dê errado. E com o MassTransit, Saga está indo bem.


Como você já notou, nos exemplos acima, não há um único bloco catch de tentativa para lidar com exceções. O motivo é simples: eles não são necessários lá. Se ocorrer uma exceção durante o processamento da mensagem, a mensagem será retornada à fila e todas as alterações serão revertidas. Como estamos fazendo todas as manipulações de dados na mesma transação que o Saga, a transação não será fechada.


Em geral, a manipulação de algo diferente de Saga na própria Saga é uma má prática. De acordo com o livro “Modelos de integração para aplicativos corporativos”, o gerente de processos deve permanecer o mais fino e burro possível: basta dar comandos aos sistemas e monitorar o status, mas ele próprio não deve fazer nada.


Obviamente, há cenários mais complexos quando você precisa executar algumas ações de compensação para lidar com exceções. Em seguida, o manipulador de máquina de estado “.Catch” é usado para capturar uma exceção de um determinado tipo e, em seguida, executar a lógica de compensação.


E se você apenas precisa prometer uma exceção, é melhor usar um observador (Observer).


Agora imagine a situação em que já executamos o comando Enviar durante o processamento da mensagem, após o qual ocorreu uma exceção. O que acontecerá com o comando enviado nesta etapa? Afinal, tudo o que voou para longe não pode ser devolvido? Mas aqui tudo está pensado.


Ao configurar o barramento, você pode ativar a opção UseInMemoryOutbox. Esta opção permite que você não envie mensagens até que a etapa atual seja concluída. Se ocorrer uma exceção, as mensagens não serão enviadas. Aqui está um trecho da documentação:


 /// <summary> /// Includes an outbox in the consume filter path, which delays outgoing messages until the return path /// of the pipeline returns to the outbox filter. At this point, the message execution pipeline should be /// nearly complete with only the ack remaining. If an exception is thrown, the messages are not sent/published. /// </summary> /// <param name="configurator">The pipe configurator</param> public static void UseInMemoryOutbox(this IConsumePipeConfigurator configurator) 

Testes


À primeira vista, testar uma máquina de estado assíncrona ainda é um prazer. Mas aqui está tudo bem. O MassTransit fornece uma boa estrutura de escrita de teste que atende totalmente a todas as nossas necessidades para testar uma máquina de estado.


A estrutura fornece ao InMemory uma implementação do barramento de dados (InMemoryTestHarness), que permite enviar e receber mensagens ignorando o RabbitMQ ou outra fila.


Bem, como um exemplo:


 [TestFixture] public class SagaTests : TestFixtureBase { protected const string HostName = "HostName"; protected InMemoryTestHarness Harness; protected StateMachine StateMachine; protected StateMachineSagaTestHarness<GabrielConfigurationRequestSaga, StateMachine> Saga; [SetUp] public void SetUp() { StateMachine = (StateMachine)Kernel. Get<MassTransitStateMachine<WorkflowSaga>>(); Harness = new InMemoryTestHarness(HostName); Saga = Harness .StateMachineSaga<WorkflowSaga, StateMachine>(StateMachine); } [TearDown] public async Task TearDown() { await Harness.Stop(); } protected async Task<WorkflowSaga> InitializeSaga() { await Harness.Start(); var command = new TestStartWorkflowCommand { CorrelationId = SagaId, Author = RequestAuthor, InitialRequestViewUrl = InitialRequestViewUrl, RequestText = RequestText, RequestNumber = RequestNumber, }; await Harness.InputQueueSendEndpoint .Send<IStartWorkflowCommand>(command); //    ,  consume    , // ,  Saga  ,    Assert.IsTrue(Harness.Consumed .Select<IStartWorkflowCommand>().Any()); var currentSaga = Saga.Created.Contains(SagaId); currentSaga.RelatedPlannerTasks = new List<RelatedPlannerTask>(); return currentSaga; } [Test] public async Task CheckCurrntStateWhenStartWorkflowCommand() { var saga = await InitializeSaga(); Assert.IsNotNull(saga); Assert.AreEqual(StateMachine .AwaitingORDTApproveTaskCreatedInPlanner.Name, saga.CurrentState); } } public class WhenTaskCreated : SagaTestsBase { private async Task<WorkflowSaga> InitializeState() { var saga = await InitializeSaga(true); saga.CurrentState = StateMachine.AwaitingTaskCreated.Name; InitializeRelatedTask(saga); await SendTaskCreatedNotification(); Assert.IsTrue(Harness.Consumed .Select<TaskCreatedNotification>().Any()); return saga; } [Test] public async Task SaveWorkflowDataWhenTaskCreated() { var saga = await InitializeState(); var taskInfo = saga.RelatedPlannerTasks .First(task => task.PlannerTaskType == PlannerTaskType.DPORPApprove); Assert.AreEqual(TaskNumber, taskInfo.Number); Assert.AreEqual(TaskUrl, taskInfo.TaskUrl); Assert.AreEqual(SagaId, taskInfo.SagaCorrelationId); Assert.AreEqual(TaskStatus.Created, taskInfo.Status); Assert.AreEqual(User, taskInfo.ModifiedBy); Assert.AreEqual(saga.CurrentState, StateMachine.AwaitingTaskTakedToWork.Name); } [Test] public async Task SendsMailWhenTaskCreated() { var mailConsumer = Harness .Consumer<MockConsumer<ISendEmailMessageWithTemplateCommand>> (RabbitMqRouting.QueueNames .SendEmailsQueueName); await InitializeState(); Assert.IsTrue(mailConsumer.Consumed .Select<ISendEmailMessageWithTemplateCommand>().Any()); } private async Task SendTaskCreatedNotification() { await Harness.InputQueueSendEndpoint .Send(new TaskCreatedNotification { TaskUrl = TaskUrl, Number = TaskNumber, TaskModifiedBy = User, CorrelationId = SagaId }); } } 

Os testes são bem rápidos. Por exemplo, no computador de um único desenvolvedor, 850 testes são executados em cerca de 21 segundos.


Dicas úteis


Em conclusão, damos uma lista de dicas úteis com base em nossa experiência.


  1. Contratos e esquemas de comunicação de barramento são melhor colocados em um nuget privado. Portanto, você não terá diferenças nos nomes nos lados de envio e recebimento. Você também pode colocar constantes com filas de nomes e hosts no nuget. Nuget é personalizável por dia. E também alguns controles de origem suportam nuget, existem feeds privados pagos.


  2. Entenda as diferenças entre Enviar e Publicar. Use Enviar se você tiver um assinante e souber exatamente o nome da fila para a qual você enviou o comando. A publicação foi projetada para enviar alertas de transmissão. Detalhes no link .


  3. Se você precisar criar uma mensagem de Solicitação / Resposta, é melhor adicionar o nome da fila da resposta ao contrato do que usar o esquema de Solicitação / Resposta do MassTransit, o que o próprio MassTransit sugere evitar. Uma vez que isso reduz muito a confiabilidade. Você está perdendo todos os benefícios da assincronia. Mas se você ainda precisar obter uma resposta em um tempo limitado, é melhor usar uma chamada direta. É melhor escrever isso no mesmo livro, "Modelos de integração de aplicativos corporativos".


  4. Saga deve ser magra. Tente levar toda a lógica pesada para outros sistemas. E Saga deve passar pelos estados e espalhar as mensagens para a esquerda e para a direita.


  5. Adicione a todas as mensagens CorrelationId, que serão executadas entre os sistemas. Portanto, é muito mais fácil analisar os logs e vincular todas as mensagens em uma única imagem. Masstransit também faz o mesmo. CorrelationId é adicionado às mensagens ao herdar da interface CorrelatedBy.


    Configure logs e monitoramento em seus sistemas, isso nunca será prejudicial. Nossa experiência neste artigo .

Source: https://habr.com/ru/post/pt412793/


All Articles