MassTransit,Saga和RabbitMQ用于实施流程管理器

曾经,我们面临着使一家大公司的各种工作流程自动化的任务。 对我们来说,这意味着在启动时要组装大约10个系统。 而且,所有组件都必须异步,可扩展,可靠地连接。


简化的过程可以描述为不同系统中的一系列动作,由于需要人工参与,因此无法完全自动化。 例如,选择某些动作或基本协调,这对于进入过程的下一阶段是必需的。


为了解决此问题,我们决定通过数据总线使用消息传递体系结构,MassTransit及其Saga与RabbitMQ完美匹配。


图片

佐贺是什么样的?


Saga是Enterprise Application Integration Templates一书中Process Manager模板的实现,它使您可以将流程描述为状态机。 事件到达入口,Saga执行一系列动作。 同时,在佐贺县的任何阶段,都可能需要做出决定。 然后,她在跟踪器中创建一个任务,并无限期地“入睡”,等待新事件。

Saga基于自动机 。 在从MassTransitStateMachine <>继承的类中声明性地描述了它。 对于Saga,您需要描述所有状态,发生的事件以及发生某些事件时采取的措施。 当前状态存储在数据库中。


首先,我们描述佐贺的所有州和事件,并给它们起易于理解的名称。 看起来像这样:


public sealed partial class StateMachine { public State AwaitingTaskCreated { get; set; } public State AwaitingTaskTakedToWork { get; set; } public State AwaitingDecisionAboutTask { get; set; } public State Rejected { get; set; } public Event<IStartWorkflowCommand> StartWorkflowCommandReceived { get; set; } public Event<TaskCreatedNotification> TaskCreated { get; set; } public Event<TaskTakedToWorkNotification> TaskTakedToWork { get; set; } public Event<TaskDeclinedNotification> TaskDeclined { get; set; } public Event<TaskApprovedNotification> TaskApproved { get; set; } private void BuildStateMachine() { InstanceState(x => x.CurrentState); Event(() => StartWorkflowCommandReceived, x => x.CorrelateById(ctx => ctx.Message.CorrelationId) .SelectId(context => context.Message.CorrelationId)); Event(() => TaskCreated, x => x.CorrelateById(ctx => ctx.Message.CorrelationId)); Event(() => TaskTakedToWork, x => x.CorrelateById(ctx => ctx.Message.CorrelationId)); Event(() => TaskDeclined, x => x.CorrelateById(ctx => ctx.Message.CorrelationId)); Event(() => TaskApproved, x => x.CorrelateById(ctx => ctx.Message.CorrelationId)); } } 

我们启动了一个局部类,在其中声明了所有状态和事件的列表,以及BuildStateMachine方法,该方法描述了事件与Saga的关联。 为此,在每个事件中传递一个特殊的参数CorrelationId-这是Guid,它在所有连接的系统之间和监视系统中运行。


因此,如果出现任何问题,我们可以从所有连接的系统中还原日志所发生情况的全部图景。 我们在来自Saga的消息中发送CorrelationId,系统将其发送回通知中,以便我们可以将消息与特定的Saga相关联。


这是状态机类本身的一个示例:


 public sealed partial class StateMachine : MassTransitStateMachine<WorkflowSaga> { public StateMachine() { BuildStateMachine(); Initially(WhenStartWorkflowCommandReceived()); During(AwaitingTaskCreatedInPlanner, WhenTaskCreated()); During(AwaitingTaskTakedToWork, WhenTaskTakedToWork()); During(AwaitingDecisionAboutTask, WhenTaskApproved(), WhenTaskDeclined()); } private EventActivityBinder<WorkflowSaga, IStartWorkflowCommand> WhenStartWorkflowCommandReceived() { return When(StartWorkflowCommandReceived) .Then(ctx => ctx.Instance.SaveConfigurationRequestInfo(ctx.Data)) .Send(TaskManagerQueue, ctx => new CreateTaskCommand(ctx.Instance)) .TransitionTo(AwaitingTaskCreated); } private EventActivityBinder<WorkflowSaga, TaskCreatedNotification> WhenTaskCreated() { return When(DPORPApproveTaskCreatedInPlanner) .Then(ctx => ctx.Instance.SaveCreatedTaskInfo(ctx.Data)) .Send(MailServiceQueue, ctx => new NotifyRequestAuthorThatWorkflowStarted(ctx.Instance)) .TransitionTo(AwaitingTaskTakedToWork); } private EventActivityBinder<WorkflowSaga, TaskTakedToWorkNotification> WhenTaskTakedToWork() { return When(TaskTakedToWork) .Then(ctx => ctx.Instance.MarkTaskAsTakedToWork(ctx.Data)) .TransitionTo(AwaitingDecisionAboutTask); } private EventActivityBinder<WorkflowSaga, TaskApprovedNotification> WhenTaskApproved() { return When(TaskApproved) .Then(ctx => ctx.Instance.MarkTaskAsApproved(ctx.Data)) .Finalize(); } private EventActivityBinder<WorkflowSaga, TaskDeclinedNotification> WhenTaskDeclined() { return When(TaskDeclined) .Then(ctx => ctx.Instance.MarkTaskAsDeclined(ctx.Data)) .TransitionTo(Rejected); } } 

构造函数描述状态。 为了保持可读性,使用单独的方法来接收每个事件。 构造消息的整个逻辑是在消息本身中执行的,否则,随着系统复杂性的提高,Saga迅速膨胀。


在制定约定和保持可读性时应格外小心。 由于C#的必要性,很难在其中声明状态和动作的描述。 即使对于简单的状态机,真正的地狱也开始了。


现在谈谈SagaInstance。 SagaInstance是从SagaStateMachineInstance继承的类。 它由表征状态机的对象和字段组成。 粗略地说,这就是佐贺的记忆。 我们存储了她一生所需的所有Saga数据。 此类中还描述了在工作过程中此数据更改的逻辑。


这是一个例子:


 public class WorkflowSaga : SagaStateMachineInstance , ISagaWithState , ICreatedOnOffset , IModifiedOnOffset , ICreatedBy<string> , IModifiedBy<string> { public Guid CorrelationId { get; set; } public string CurrentState { get; set; } public string InitialRequestViewUrl { get; set; } public string RequestNumber { get; set; } public string RequestAuthor { get; set; } public string RequestText { get; set; } public byte[] RowVersion { get; set; } public string CreatedBy { get; set; } public string ModifiedBy { get; set; } public DateTimeOffset CreatedOn { get; set; } public DateTimeOffset ModifiedOn { get; set; } public DateTimeOffset CompletedOn { get; set; } public virtual ICollection<RelatedTask> RelatedTasks { get; set; } public void SaveGabrielConfigurationRequestInfo( ICreateGabrielConfigurationRequestCommand command) { CorrelationId = command.CorrelationId; RequestNumber = command.RequestNumber; RequestAuthor = command.Author; RequestText = command.RequestText; InitialRequestViewUrl = command.InitialRequestViewUrl; CreatedOn = RuntimeContext.Current.DateTimeOffset.Now; } public void SaveCreatedTaskInfo(ITaskCreationInfo taskCreationInfo) { RelatedPlannerTasks.Add(new RelatedPlannerTask(taskCreationInfo)); } public void MarkTaskAsTakedToWork(ITaskUpdatedInfo taskInfo) { UpdateTaskInfo(taskInfo, TaskStatus.TakedToWork); } public void MarkTaskAsApproved(TaskApprovedNotification taskInfo) { UpdateTaskInfo(taskInfo, TaskStatus.Completed, taskInfo.Comment); CompletedOn = RuntimeContext.Current.DateTimeOffset.Now; } public void MarkTaskAsDeclined(TaskDeclinedNotification taskInfo) { UpdateTaskInfo(taskInfo, TaskStatus.Declined, taskInfo.Comment); CompletedOn = RuntimeContext.Current.DateTimeOffset.Now; } private void UpdateTaskInfo(ITaskUpdatedInfo taskInfo, TaskStatus taskStatus, string comment = null) { var task = RelatedTasks.Single(t => t.Number == taskInfo.Number); task.ModifiedBy = taskInfo.TaskModifiedBy; task.Comment = comment; task.Status = taskStatus; } } 

一个示例显示,在SagaInstance中存储了CorrelationId以将事件与Saga和CurrentState相关联以存储Saga的当前状态。


错误处理


如果在消息处理过程中发生错误,Saga会如何处理? 这是一个重要的问题,因为每个人都希望状态机始终保持一致,即使出了问题。 借助MassTransit,Saga的表现不错。


正如您已经注意到的,在上面的示例中,没有一个try catch块可以处理异常。 原因很简单:在那里不需要它们。 如果在消息处理期间发生异常,则消息将返回到队列,并且所有更改都将回滚。 由于我们在与Saga相同的事务中进行所有数据操作,因此不会关闭该事务。


一般而言,在Saga本身中操纵Saga以外的其他东西是不好的做法。 根据“企业应用程序的集成模板”一书,流程管理器应保持尽可能的精简和笨拙:仅向系统发出命令并监视状态,但他本人不应该做任何事情。


当然,当您需要执行一些补偿操作来处理异常时,会有更复杂的场景。 然后,使用“ .Catch”状态机处理程序来捕获某种类型的异常,然后执行补偿逻辑。


而且,如果您只需要承诺一个例外,那么最好使用观察者(Observer)。


现在想象一下在消息处理期间我们已经执行了Send命令的情况,此后发生了异常。 在此步骤发送的命令将发生什么? 毕竟,所有已经消失的东西都无法退还吗? 但是,这里一切都经过深思熟虑。


配置总线时,可以启用UseInMemoryOutbox选项。 此选项允许您在当前步骤完成之前不发送消息。 如果发生异常,将完全不发送消息。 以下是文档摘录:


 /// <summary> /// Includes an outbox in the consume filter path, which delays outgoing messages until the return path /// of the pipeline returns to the outbox filter. At this point, the message execution pipeline should be /// nearly complete with only the ack remaining. If an exception is thrown, the messages are not sent/published. /// </summary> /// <param name="configurator">The pipe configurator</param> public static void UseInMemoryOutbox(this IConsumePipeConfigurator configurator) 

测验


乍一看,测试异步状态机仍然是一种乐趣。 但是这里一切都很好。 MassTransit提供了一个很好的测试编写框架,可以完全满足我们测试状态机的所有需求。


该框架为InMemory提供了数据总线(InMemoryTestHarness)的实现,它使您可以绕过RabbitMQ或其他队列发送和接收消息。


好吧,例如:


 [TestFixture] public class SagaTests : TestFixtureBase { protected const string HostName = "HostName"; protected InMemoryTestHarness Harness; protected StateMachine StateMachine; protected StateMachineSagaTestHarness<GabrielConfigurationRequestSaga, StateMachine> Saga; [SetUp] public void SetUp() { StateMachine = (StateMachine)Kernel. Get<MassTransitStateMachine<WorkflowSaga>>(); Harness = new InMemoryTestHarness(HostName); Saga = Harness .StateMachineSaga<WorkflowSaga, StateMachine>(StateMachine); } [TearDown] public async Task TearDown() { await Harness.Stop(); } protected async Task<WorkflowSaga> InitializeSaga() { await Harness.Start(); var command = new TestStartWorkflowCommand { CorrelationId = SagaId, Author = RequestAuthor, InitialRequestViewUrl = InitialRequestViewUrl, RequestText = RequestText, RequestNumber = RequestNumber, }; await Harness.InputQueueSendEndpoint .Send<IStartWorkflowCommand>(command); //    ,  consume    , // ,  Saga  ,    Assert.IsTrue(Harness.Consumed .Select<IStartWorkflowCommand>().Any()); var currentSaga = Saga.Created.Contains(SagaId); currentSaga.RelatedPlannerTasks = new List<RelatedPlannerTask>(); return currentSaga; } [Test] public async Task CheckCurrntStateWhenStartWorkflowCommand() { var saga = await InitializeSaga(); Assert.IsNotNull(saga); Assert.AreEqual(StateMachine .AwaitingORDTApproveTaskCreatedInPlanner.Name, saga.CurrentState); } } public class WhenTaskCreated : SagaTestsBase { private async Task<WorkflowSaga> InitializeState() { var saga = await InitializeSaga(true); saga.CurrentState = StateMachine.AwaitingTaskCreated.Name; InitializeRelatedTask(saga); await SendTaskCreatedNotification(); Assert.IsTrue(Harness.Consumed .Select<TaskCreatedNotification>().Any()); return saga; } [Test] public async Task SaveWorkflowDataWhenTaskCreated() { var saga = await InitializeState(); var taskInfo = saga.RelatedPlannerTasks .First(task => task.PlannerTaskType == PlannerTaskType.DPORPApprove); Assert.AreEqual(TaskNumber, taskInfo.Number); Assert.AreEqual(TaskUrl, taskInfo.TaskUrl); Assert.AreEqual(SagaId, taskInfo.SagaCorrelationId); Assert.AreEqual(TaskStatus.Created, taskInfo.Status); Assert.AreEqual(User, taskInfo.ModifiedBy); Assert.AreEqual(saga.CurrentState, StateMachine.AwaitingTaskTakedToWork.Name); } [Test] public async Task SendsMailWhenTaskCreated() { var mailConsumer = Harness .Consumer<MockConsumer<ISendEmailMessageWithTemplateCommand>> (RabbitMqRouting.QueueNames .SendEmailsQueueName); await InitializeState(); Assert.IsTrue(mailConsumer.Consumed .Select<ISendEmailMessageWithTemplateCommand>().Any()); } private async Task SendTaskCreatedNotification() { await Harness.InputQueueSendEndpoint .Send(new TaskCreatedNotification { TaskUrl = TaskUrl, Number = TaskNumber, TaskModifiedBy = User, CorrelationId = SagaId }); } } 

测试运行非常快。 例如,在一台开发人员的计算机上,大约21秒内运行850个测试。


有用的提示


总之,我们根据经验提供了一些有用的提示。


  1. 合同和公交车通信方案最好放在私人空间中。 因此,发送方和接收方的名称不会有差异。 您还可以将带有命名队列和主机的常量放入nuget中。 Nuget每天可定制。 还有一些源代码控制支持nuget,有收费的私人供稿。


  2. 了解发送和发布之间的区别。 如果您有一个订阅者,并且确切知道要将命令发送到的队列的名称,请使用“发送”。 发布旨在发送广播警报。 链接上的详细信息。


  3. 如果您需要构建请求/响应消息,则最好为合同添加响应的队列名称,而不是使用MassTransit自己建议避免的MassTransit的请求/响应方案。 由于这大大降低了可靠性。 您正在失去异步的所有好处。 但是,如果您仍然需要在有限的时间内获得答案,则最好使用直接呼叫。 这最好写在同一本书《企业应用程序集成模板》中。


  4. 传奇应该是瘦的。 尝试将所有繁琐的逻辑传递给其他系统。 Saga必须跳过状态并向左右散布消息。


  5. 将所有消息CorrelationId添加到系统之间运行。 因此,分析日志并将所有消息链接成一张图片要容易得多。 大众运输也是如此。 从CorrelatedBy接口继承时,会将CorrelationId添加到消息中。


    在系统中设置日志和监视,将永远不会造成伤害。 我们在本文中的经验。

Source: https://habr.com/ru/post/zh-CN412793/


All Articles