Mass transit, Saga y RabbitMQ para implementar un administrador de procesos

Una vez, nos enfrentamos a la tarea de automatizar varios flujos de trabajo en una gran empresa. Para nosotros, esto significaba reunir unos 10 sistemas en el momento del lanzamiento. Además, todo tenía que estar conectado de forma asíncrona, escalable y confiable.


El proceso simplificado puede describirse como una secuencia de acciones en diferentes sistemas, que no pueden automatizarse completamente, ya que requiere la participación humana. Por ejemplo, para seleccionar ciertas acciones o coordinación elemental, que es necesaria para pasar a la siguiente etapa del proceso.


Para resolver este problema, decidimos utilizar la arquitectura de mensajería a través del bus de datos, y Mass Transit con su Saga junto con RabbitMQ nos queda perfectamente.


imagen

¿Cómo es Saga?


Saga es una implementación de la plantilla Process Manager del libro Enterprise Application Integration Templates , que le permite describir un proceso como una máquina de estado. Un evento llega a la entrada, Saga realiza una secuencia de acciones. Al mismo tiempo, en cualquier etapa de Saga, la decisión de una persona puede ser requerida. Luego crea una tarea en el rastreador y "se queda dormida" por un tiempo indefinido, esperando nuevos eventos.

Saga se basa en Automatonymous . Se describe declarativamente en una clase heredada de Mass transitStateMachine <>. Para Saga, debe describir todos los estados, eventos tomados y acciones tomadas cuando ocurren ciertos eventos. El estado actual se almacena en la base de datos.


Primero, describimos todos los estados y eventos de Saga y les damos nombres comprensibles. Se ve así:


public sealed partial class StateMachine { public State AwaitingTaskCreated { get; set; } public State AwaitingTaskTakedToWork { get; set; } public State AwaitingDecisionAboutTask { get; set; } public State Rejected { get; set; } public Event<IStartWorkflowCommand> StartWorkflowCommandReceived { get; set; } public Event<TaskCreatedNotification> TaskCreated { get; set; } public Event<TaskTakedToWorkNotification> TaskTakedToWork { get; set; } public Event<TaskDeclinedNotification> TaskDeclined { get; set; } public Event<TaskApprovedNotification> TaskApproved { get; set; } private void BuildStateMachine() { InstanceState(x => x.CurrentState); Event(() => StartWorkflowCommandReceived, x => x.CorrelateById(ctx => ctx.Message.CorrelationId) .SelectId(context => context.Message.CorrelationId)); Event(() => TaskCreated, x => x.CorrelateById(ctx => ctx.Message.CorrelationId)); Event(() => TaskTakedToWork, x => x.CorrelateById(ctx => ctx.Message.CorrelationId)); Event(() => TaskDeclined, x => x.CorrelateById(ctx => ctx.Message.CorrelationId)); Event(() => TaskApproved, x => x.CorrelateById(ctx => ctx.Message.CorrelationId)); } } 

Hemos lanzado una clase parcial, donde declaramos una lista de todos los estados y eventos, y el método BuildStateMachine, que describe la correlación de eventos con Saga. Para hacer esto, se pasa un parámetro especial CorrelationId en cada evento; este es Guid, que se ejecuta entre todos los sistemas conectados y en los sistemas de monitoreo.


Por lo tanto, si surge algún problema, podemos restaurar la imagen completa de lo que está sucediendo mediante registros de todos los sistemas conectados. Enviamos CorrelationId en mensajes de Saga, los sistemas lo envían de vuelta en notificaciones para que podamos correlacionar el mensaje con una Saga específica.


Aquí hay un ejemplo de la clase de máquina de estado en sí:


 public sealed partial class StateMachine : MassTransitStateMachine<WorkflowSaga> { public StateMachine() { BuildStateMachine(); Initially(WhenStartWorkflowCommandReceived()); During(AwaitingTaskCreatedInPlanner, WhenTaskCreated()); During(AwaitingTaskTakedToWork, WhenTaskTakedToWork()); During(AwaitingDecisionAboutTask, WhenTaskApproved(), WhenTaskDeclined()); } private EventActivityBinder<WorkflowSaga, IStartWorkflowCommand> WhenStartWorkflowCommandReceived() { return When(StartWorkflowCommandReceived) .Then(ctx => ctx.Instance.SaveConfigurationRequestInfo(ctx.Data)) .Send(TaskManagerQueue, ctx => new CreateTaskCommand(ctx.Instance)) .TransitionTo(AwaitingTaskCreated); } private EventActivityBinder<WorkflowSaga, TaskCreatedNotification> WhenTaskCreated() { return When(DPORPApproveTaskCreatedInPlanner) .Then(ctx => ctx.Instance.SaveCreatedTaskInfo(ctx.Data)) .Send(MailServiceQueue, ctx => new NotifyRequestAuthorThatWorkflowStarted(ctx.Instance)) .TransitionTo(AwaitingTaskTakedToWork); } private EventActivityBinder<WorkflowSaga, TaskTakedToWorkNotification> WhenTaskTakedToWork() { return When(TaskTakedToWork) .Then(ctx => ctx.Instance.MarkTaskAsTakedToWork(ctx.Data)) .TransitionTo(AwaitingDecisionAboutTask); } private EventActivityBinder<WorkflowSaga, TaskApprovedNotification> WhenTaskApproved() { return When(TaskApproved) .Then(ctx => ctx.Instance.MarkTaskAsApproved(ctx.Data)) .Finalize(); } private EventActivityBinder<WorkflowSaga, TaskDeclinedNotification> WhenTaskDeclined() { return When(TaskDeclined) .Then(ctx => ctx.Instance.MarkTaskAsDeclined(ctx.Data)) .TransitionTo(Rejected); } } 

El constructor describe los estados. La recepción de cada evento se realiza en un método separado para mantener la legibilidad. Toda la lógica de construir mensajes se lleva a cabo en los mensajes mismos, de lo contrario, con la creciente complejidad del sistema, Saga se hincha bastante rápido.


Se debe tener cuidado al desarrollar convenciones y mantener la legibilidad. Debido al imperativo de C #, es muy difícil declarar una descripción de estados y acciones en él. Incluso para máquinas de estado simples, comienza el verdadero infierno.


Ahora unas pocas palabras sobre SagaInstance. SagaInstance es una clase heredada de SagaStateMachineInstance. Se compone de objetos y campos que caracterizan la máquina de estados. En términos generales, este es el recuerdo de Saga. Almacenamos todos los datos de Saga que necesitará durante toda su vida. También en esta clase se describe la lógica de los cambios en estos datos a medida que trabaja.


Aquí hay un ejemplo:


 public class WorkflowSaga : SagaStateMachineInstance , ISagaWithState , ICreatedOnOffset , IModifiedOnOffset , ICreatedBy<string> , IModifiedBy<string> { public Guid CorrelationId { get; set; } public string CurrentState { get; set; } public string InitialRequestViewUrl { get; set; } public string RequestNumber { get; set; } public string RequestAuthor { get; set; } public string RequestText { get; set; } public byte[] RowVersion { get; set; } public string CreatedBy { get; set; } public string ModifiedBy { get; set; } public DateTimeOffset CreatedOn { get; set; } public DateTimeOffset ModifiedOn { get; set; } public DateTimeOffset CompletedOn { get; set; } public virtual ICollection<RelatedTask> RelatedTasks { get; set; } public void SaveGabrielConfigurationRequestInfo( ICreateGabrielConfigurationRequestCommand command) { CorrelationId = command.CorrelationId; RequestNumber = command.RequestNumber; RequestAuthor = command.Author; RequestText = command.RequestText; InitialRequestViewUrl = command.InitialRequestViewUrl; CreatedOn = RuntimeContext.Current.DateTimeOffset.Now; } public void SaveCreatedTaskInfo(ITaskCreationInfo taskCreationInfo) { RelatedPlannerTasks.Add(new RelatedPlannerTask(taskCreationInfo)); } public void MarkTaskAsTakedToWork(ITaskUpdatedInfo taskInfo) { UpdateTaskInfo(taskInfo, TaskStatus.TakedToWork); } public void MarkTaskAsApproved(TaskApprovedNotification taskInfo) { UpdateTaskInfo(taskInfo, TaskStatus.Completed, taskInfo.Comment); CompletedOn = RuntimeContext.Current.DateTimeOffset.Now; } public void MarkTaskAsDeclined(TaskDeclinedNotification taskInfo) { UpdateTaskInfo(taskInfo, TaskStatus.Declined, taskInfo.Comment); CompletedOn = RuntimeContext.Current.DateTimeOffset.Now; } private void UpdateTaskInfo(ITaskUpdatedInfo taskInfo, TaskStatus taskStatus, string comment = null) { var task = RelatedTasks.Single(t => t.Number == taskInfo.Number); task.ModifiedBy = taskInfo.TaskModifiedBy; task.Comment = comment; task.Status = taskStatus; } } 

Un ejemplo muestra que en SagaInstance CorrelationId se almacena para la correlación de eventos con Saga y CurrentState para almacenar el estado actual de Saga.


Manejo de errores


¿Qué le sucede a Saga si se produce un error durante el procesamiento del mensaje? Este es un tema importante, porque todos quieren que la máquina de estado permanezca siempre consistente, incluso si algo salió mal. Y con Mass Transit, Saga está bien.


Como ya notó, en los ejemplos anteriores no hay un solo bloque catch para manejar excepciones. La razón es simple: no se necesitan allí. Si se produce una excepción durante el procesamiento del mensaje, el mensaje se devuelve a la cola y todos los cambios se revierten. Como estamos haciendo todas las manipulaciones de datos en la misma transacción que Saga, la transacción no se cerrará.


En general, la manipulación de algo distinto de Saga en la propia Saga es una mala práctica. De acuerdo con el libro "Plantillas de integración para aplicaciones empresariales", el administrador de procesos debe permanecer lo más delgado y tonto posible: simplemente dé comandos a los sistemas y monitoree el estado, pero él mismo no debe hacer nada.


Por supuesto, hay escenarios más complejos cuando necesita realizar algunas acciones compensatorias para manejar las excepciones. Luego, el controlador de máquina de estado ".Catch" se utiliza para detectar una excepción de cierto tipo y luego ejecutar la lógica de compensación.


Y si solo necesita prometer una excepción, entonces es mejor usar un observador (Observador).


Ahora imagine la situación en la que ya hemos ejecutado el comando Enviar durante el procesamiento del mensaje, después de lo cual se ha producido una excepción. ¿Qué pasará con el comando enviado en este paso? Después de todo, ¿todo lo que ha volado no puede ser devuelto? Pero aquí todo está pensado.


Al configurar el bus, puede habilitar la opción UseInMemoryOutbox. Esta opción le permite no enviar mensajes hasta que se complete el paso actual. Si se produce una excepción, los mensajes no se enviarán en absoluto. Aquí hay un extracto de la documentación:


 /// <summary> /// Includes an outbox in the consume filter path, which delays outgoing messages until the return path /// of the pipeline returns to the outbox filter. At this point, the message execution pipeline should be /// nearly complete with only the ack remaining. If an exception is thrown, the messages are not sent/published. /// </summary> /// <param name="configurator">The pipe configurator</param> public static void UseInMemoryOutbox(this IConsumePipeConfigurator configurator) 

Pruebas


A primera vista, probar una máquina de estado asíncrona sigue siendo un placer. Pero aquí todo está bien. Mass transit proporciona un buen marco de escritura de prueba que satisface plenamente todas nuestras necesidades para probar una máquina de estados.


El marco proporciona a InMemory una implementación del bus de datos (InMemoryTestHarness), que le permite enviar y recibir mensajes sin pasar RabbitMQ u otra cola.


Bueno, como ejemplo:


 [TestFixture] public class SagaTests : TestFixtureBase { protected const string HostName = "HostName"; protected InMemoryTestHarness Harness; protected StateMachine StateMachine; protected StateMachineSagaTestHarness<GabrielConfigurationRequestSaga, StateMachine> Saga; [SetUp] public void SetUp() { StateMachine = (StateMachine)Kernel. Get<MassTransitStateMachine<WorkflowSaga>>(); Harness = new InMemoryTestHarness(HostName); Saga = Harness .StateMachineSaga<WorkflowSaga, StateMachine>(StateMachine); } [TearDown] public async Task TearDown() { await Harness.Stop(); } protected async Task<WorkflowSaga> InitializeSaga() { await Harness.Start(); var command = new TestStartWorkflowCommand { CorrelationId = SagaId, Author = RequestAuthor, InitialRequestViewUrl = InitialRequestViewUrl, RequestText = RequestText, RequestNumber = RequestNumber, }; await Harness.InputQueueSendEndpoint .Send<IStartWorkflowCommand>(command); //    ,  consume    , // ,  Saga  ,    Assert.IsTrue(Harness.Consumed .Select<IStartWorkflowCommand>().Any()); var currentSaga = Saga.Created.Contains(SagaId); currentSaga.RelatedPlannerTasks = new List<RelatedPlannerTask>(); return currentSaga; } [Test] public async Task CheckCurrntStateWhenStartWorkflowCommand() { var saga = await InitializeSaga(); Assert.IsNotNull(saga); Assert.AreEqual(StateMachine .AwaitingORDTApproveTaskCreatedInPlanner.Name, saga.CurrentState); } } public class WhenTaskCreated : SagaTestsBase { private async Task<WorkflowSaga> InitializeState() { var saga = await InitializeSaga(true); saga.CurrentState = StateMachine.AwaitingTaskCreated.Name; InitializeRelatedTask(saga); await SendTaskCreatedNotification(); Assert.IsTrue(Harness.Consumed .Select<TaskCreatedNotification>().Any()); return saga; } [Test] public async Task SaveWorkflowDataWhenTaskCreated() { var saga = await InitializeState(); var taskInfo = saga.RelatedPlannerTasks .First(task => task.PlannerTaskType == PlannerTaskType.DPORPApprove); Assert.AreEqual(TaskNumber, taskInfo.Number); Assert.AreEqual(TaskUrl, taskInfo.TaskUrl); Assert.AreEqual(SagaId, taskInfo.SagaCorrelationId); Assert.AreEqual(TaskStatus.Created, taskInfo.Status); Assert.AreEqual(User, taskInfo.ModifiedBy); Assert.AreEqual(saga.CurrentState, StateMachine.AwaitingTaskTakedToWork.Name); } [Test] public async Task SendsMailWhenTaskCreated() { var mailConsumer = Harness .Consumer<MockConsumer<ISendEmailMessageWithTemplateCommand>> (RabbitMqRouting.QueueNames .SendEmailsQueueName); await InitializeState(); Assert.IsTrue(mailConsumer.Consumed .Select<ISendEmailMessageWithTemplateCommand>().Any()); } private async Task SendTaskCreatedNotification() { await Harness.InputQueueSendEndpoint .Send(new TaskCreatedNotification { TaskUrl = TaskUrl, Number = TaskNumber, TaskModifiedBy = User, CorrelationId = SagaId }); } } 

Las pruebas se ejecutan bastante rápido. Por ejemplo, en la computadora de un solo desarrollador, 850 pruebas se ejecutan en aproximadamente 21 segundos.


Consejos útiles


En conclusión, damos una lista de consejos útiles basados ​​en nuestra experiencia.


  1. Los contratos y los esquemas de comunicación en autobús se colocan mejor en un nuget privado. Por lo tanto, no tendrá diferencias en los nombres en los lados emisores y receptores. También puede poner constantes con nombres de colas y hosts en nuget. Nuget es personalizable por día. Y también algunos controles de fuente admiten nuget, hay feeds privados pagados.


  2. Comprende las diferencias entre Enviar y Publicar. Use Enviar si tiene un suscriptor y sabe exactamente el nombre de la cola a la que envía el comando. Publicar está diseñado para enviar alertas de difusión. Detalles en el enlace .


  3. Si necesita crear un mensaje de Solicitud / Respuesta, es mejor agregar el nombre de la cola para la respuesta al contrato que usar el esquema de Solicitud / Respuesta de Mass transit, que Mass mision sugiere evitar. Dado que esto reduce en gran medida la fiabilidad. Estás perdiendo todos los beneficios de la asincronía. Pero si aún necesita obtener una respuesta en un tiempo limitado, es mejor usar una llamada directa. Esto está mejor escrito en el mismo libro, "Plantillas de integración de aplicaciones empresariales".


  4. La saga debería ser delgada. Intenta llevar toda la lógica pesada a otros sistemas. Y Saga debe saltar a través de los estados y difundir mensajes de izquierda a derecha.


  5. Agregue a todos los mensajes CorrelationId, que se ejecutará entre sistemas. Entonces es mucho más fácil analizar los registros y vincular todos los mensajes en una sola imagen. Masstransit también hace lo mismo. CorrelationId se agrega a los mensajes cuando se hereda de la interfaz CorrelatedBy.


    Configure registros y monitoreo en sus sistemas, nunca dolerá. Nuestra experiencia en este artículo .

Source: https://habr.com/ru/post/es412793/


All Articles