
在上一篇文章中,我们研究了反应式体系结构的理论基础。 现在该讨论数据流,实现响应式Erlang / Elixir系统的方式以及其中的消息传递模式:
- 请求回复
- 请求分组响应
- 回应要求
- 发布-订阅
- 反向发布-订阅
- 任务分配
SOA,MSA和消息传递
SOA,MSA-定义用于构建系统的规则的系统体系结构,而消息传递则为其实现提供原语。
我不想推广这种或那种建筑系统的体系结构。 我将对特定项目和业务使用最有效和有用的做法。 无论我们选择哪种范式,最好都以Unix方式创建系统块:具有最小连接性的组件负责单个实体。 API方法使用实体执行最简单的操作。
消息传递-顾名思义,是一个消息代理。 它的主要目的是接收和发出消息。 他负责发送信息的接口,用于在系统内传输信息的逻辑通道的形成,路由和平衡,以及系统级别的故障处理。
正在开发的消息传递并不试图与Rabbitmq竞争或取代Rabbitmq。 其主要特点:
- 经销
可以在群集的所有节点上尽可能接近使用交换点的代码来创建交换点。 - 简单性。
专注于最小化样板代码和可用性。 - 最佳性能。
我们不是在尝试重复Rabbitmq的功能,而只是强调了体系结构和传输层,这在OTP中尽可能简单,从而将成本降至最低。 - 灵活性。
每个服务可以组合许多交换模板。 - 设计固有的容错能力。
- 可扩展性。
消息随着应用程序的增长而增长。 随着负载的增加,您可以将交换点移至单个计算机。
备注。 从代码组织的角度来看,元项目非常适合使用Erlang / Elixir的复杂系统。 所有项目代码都在一个存储库中-一个伞形项目。 同时,微服务尽可能地隔离,并执行负责单独实体的简单操作。 使用这种方法,很容易支持整个系统的API,只需进行更改,就可以方便地编写单元和集成测试。
系统组件直接或通过代理进行交互。 从消息传递的角度来看,每个服务都有几个生命阶段:
- 服务初始化。
在此阶段,将执行服务执行过程和相关性的配置和启动。 - 创建交换点。
该服务可以使用节点配置中指定的静态交换点,也可以动态创建交换点。 - 服务注册。
为了使该服务能够满足请求,必须在交换点进行注册。 - 正常运行。
服务产生有用的工作。 - 关机
有两种关闭方式:常规和紧急关闭。 使用常规服务时,它会与交换点断开连接并停止。 在紧急情况下,消息传递将运行故障转移方案之一。
它看起来很复杂,但是并不是代码中的所有内容都如此令人恐惧。 带注释的代码示例将在稍后的模板分析中给出。
交流交流
交换点是一种消息传递过程,实现与消息传递模板内的组件进行交互的逻辑。 在下面的所有示例中,组件通过交换点进行交互,交换点的组合形成消息传递。
消息交换模式(MEP)
在全球范围内,共享模式可以分为双向和单向。 前者意味着对收到的消息的响应,后者则不是。 客户端-服务器体系结构中双向模式的经典示例是请求-响应模式。 考虑模板及其修改。
请求–响应或RPC
当我们需要从另一个进程获取响应时,将使用RPC。 该过程可以在同一站点或不同大陆上启动。 下面是客户端和服务器通过消息传递进行交互的示意图。

由于消息传递是完全异步的,因此对于客户端,交换分为两个阶段:
要求提交
messaging:request(Exchange, ResponseMatchingTag, RequestDefinition, HandlerProcess).
交换 -交换点的唯一名称
ResponseMatchingTag-处理响应的本地标签。 例如,在发送属于不同用户的几个相同请求的情况下。
RequestDefinition-请求主体
HandlerProcess -PID处理程序。 此过程将收到服务器的响应。
响应处理
handle_info(#'$msg'{exchange = EXCHANGE, tag = ResponseMatchingTag,message = ResponsePayload}, State)
ResponsePayload-服务器响应。
对于服务器,该过程还包括两个阶段:
- 交换点初始化
- 处理传入的请求
让我们用代码说明这个模板。 假设我们需要实现一个提供唯一准确时间方法的简单服务。
服务器代码
在api.hrl中删除服务API的定义:
在time_controller.erl中定义一个服务控制器
客户代码
为了向服务发送请求,您可以在客户端中的任何位置调用消息传递请求API:
case messaging:request(?EXCHANGE, tag, #time_req{opts = #{}}, self()) of ok -> ok; _ ->
在分布式系统中,组件的配置可能会非常不同,并且在请求消息传递时可能尚未启动,否则服务控制器将无法准备服务请求。 因此,我们需要检查消息传递响应并处理失败情况。
成功发送后,客户端将收到来自服务的响应或错误。
在handle_info中处理这两种情况:
handle_info(#'$msg'{exchange = ?EXCHANGE, tag = tag, message = #time_resp{result = #time{unixtime = Utime}}}, State) -> ?debugVal(Utime), {noreply, State}; handle_info(#'$msg'{exchange = ?EXCHANGE, tag = tag, message = #time_resp{result = #time_error{code = ErrorCode}}}, State) -> ?debugVal({error, ErrorCode}), {noreply, State};
请求分组响应
最好不要允许传输大量消息。 整个系统的响应能力和稳定运行取决于此。 如果对请求的响应占用了大量内存,则必须细分为多个部分。

我将举几个这样的例子:
- 组件交换二进制数据,例如文件。 将答案分解成小部分,可以有效地处理任何大小的文件,而不会捕获内存溢出。
- 列表。 例如,我们需要从数据库中的一个巨大表中选择所有记录,然后将其转移到另一个组件。
我称这些答案为机车。 在任何情况下,1024个1 MB消息都比单个1 GB消息要好。
在Erlang集群中,我们获得了额外的收益-减少了交换点和网络的负载,因为答案会绕过交换点立即发送给收件人。
回应要求
这是用于构建交互式系统的RPC模式的相当罕见的修改。

发布-订阅(数据分发树)
面向事件的系统在数据可用时将数据传递给消费者。 因此,与拉动或轮询相比,系统更倾向于推送模型。 此功能使您不会通过不断查询和等待数据来浪费资源。
该图显示了将消息分发给订阅特定主题的消费者的过程。

使用此模板的经典示例是状态的分布:计算机游戏中的游戏世界,交易所的市场数据,数据馈送中的有用信息。
考虑订户代码:
init(_Args) ->
源可以在任何方便的地方调用发布后功能:
messaging:publish_message(Exchange, Key, Message).
Exchange-交换点的名称,
键 -路由键
消息 -有效负载
反向发布-订阅

通过扩展pub-sub,您可以获得一个便于记录的模式。 来源和消费者的集合可以完全不同。 该图显示了一个有一个消费者和许多来源的情况。
任务分配模式
在几乎每个项目中,都会出现延迟处理的任务,例如生成报告,传递通知,从第三方系统接收数据。 通过添加处理程序,可以轻松地扩展执行这些任务的系统的吞吐量。 对我们而言,剩下的就是形成处理程序集群,并在它们之间平均分配任务。
考虑3个处理程序示例出现的情况。 即使在任务分配阶段,也会出现分配公平和处理器溢出的问题。 循环分发将负责正义,并且为了避免处理程序溢出的情况,我们引入了prefetch_limit限制。 在瞬态模式下, prefetch_limit将阻止一个处理程序接收所有任务。
消息管理队列和处理优先级。 处理程序会在任务可用时接收任务。 任务可能成功或失败:
messaging:ack(Tack)
-在成功处理消息的情况下调用messaging:nack(Tack)
-在所有紧急情况下调用。 任务返回后,消息传递会将其转移到另一个处理程序。

假设在处理三个任务期间发生了复杂的故障:处理程序1,在接收到任务之后,在可以与交换点进行通信之前崩溃了。 在这种情况下,确认超时到期后的交换点会将作业转移到另一个处理程序。 处理程序3由于某种原因放弃了该任务并发送了nack,结果该任务也传递给另一个成功完成该处理程序的处理程序。
初步结果
我们分解了分布式系统的基本构建块,并对它们在Erlang / Elixir中的应用有了基本的了解。
通过组合基本模式,您可以构建复杂的范例来解决新出现的问题。
在本周期的最后部分,我们将考虑服务的组织,路由和平衡的一般问题,并讨论系统的可伸缩性和容错性的实际方面。
第二部分结束。
照片马里乌斯·克里斯滕森
websequencediagrams.com制作的插图