了解消息代理。 通过ActiveMQ和Kafka学习消息传递的机制。 第2章ActiveMQ

继续翻译一本小书:
“了解消息经纪人”,
作者:Jakub Korab,出版商:O'Reilly Media,Inc.,出版日期:2017年6月,ISBN:9781492049296。

翻译完成

上一部分: 了解消息代理。 通过ActiveMQ和Kafka学习消息传递的机制。 第1章简介

第二章


Activemq


最好将ActiveMQ描述为经典的消息传递系统。 它写于2004年,旨在满足对开源消息代理的需求。 那时,如果您想在应用程序中使用消息传递,那么唯一的选择就是昂贵的商业产品。

ActiveMQ是作为Java消息服务(JMS)规范的实现而开发的。 做出此决定是为了满足在Apache Geronimo项目(一个开源J2EE应用服务器)中实现符合JMS的消息传递的要求。
实现JMS规范的消息传递系统(或有时称为消息定向的中间件)由以下组件组成:

经纪人

中间件分发消息的中央部分。

顾客

通过代理发送消息的软件。 反过来,它包含以下构件:

  • 使用JMS API进行编码。
  • JMS API是一组接口,用于根据JMS规范中提出的保证与代理进行交互。
  • 提供API实现并与代理进行交互的系统的客户端库。

客户端和代理通过应用程序层协议(也称为交互协议 相互通信 (图2-1) 。 JMS规范将该协议的细节留给了特定的实现。


图2-1。 JMS评论

JMS使用术语“ 提供者”来描述供应商对JMS API底层消息传递系统的实现,其中包括代理及其客户端库。

赞成实施JMS的选择对ActiveMQ作者做出的实施决策产生了深远的影响。 该规范本身就消息传递系统的客户端和与之通信的代理的职责提供了明确的指导,优先考虑了代理分发和传递消息的义务。 客户的主要职责是与他发送的消息的收件人(队列或主题)进行交互。 该规范本身旨在使API与代理的交互相对简单。

稍后我们将看到,该区域对ActiveMQ性能产生了重大影响。 除了代理程序的复杂性之外,Sun Microsystems提供的针对该规范的兼容性软件包还有许多细微差别,它们对性能都有自己的影响。 为了将ActiveMQ视为与JMS兼容,应该考虑所有这些细微差别。

沟通交流


尽管在JMS规范中对API和预期的行为进行了很好的定义,但是实际的客户端-经纪人通信协议被故意排除在规范之外,以便现有的代理可以与JMS兼容。 因此,ActiveMQ可以自由定义自己的交互协议OpenWire。 ActiveMQ JMS客户端库实现以及.Net和C ++:NMS和CMS中的对应版本均使用OpenWire,这是由Apache Software Foundation托管的ActiveMQ子项目。

随着时间的推移,ActiveMQ中增加了对其他交互协议的支持,从而增强了与其他语言和环境交互的能力:

AMQP 1.0

请勿将高级消息队列协议(ISO / IEC 19464:2014)与它的前身0.X混淆,后者在其他消息传递系统(尤其是RabbitMQ)中使用0.9.1实现。 AMQP 1.0是用于在两个节点之间交换消息的通用二进制协议。 它没有客户端或代理的概念,并且包含诸如流控制,事务和各种QoS之类的功能(不超过一次,至少一次和恰好一次)。

脚踩

简单/流文本定向消息协议,一种易于实现的协议,具有数十种不同语言的客户端实现。

Xmpp

可扩展的消息传递和状态协议。 (可扩展消息传递和状态协议)。 该基于XML的协议以前称为Jabber,最初是为聊天系统开发的,但已扩展到其原始用例之外,还包括发布-订阅消息传递。

MQTT

用于机器对机器(M2M)和物联网(IoT)应用程序的轻量级发布-订阅协议(ISO / IEC 20922:2016)。

ActiveMQ还支持在WebSockets上强加上述协议,从而在Web浏览器中的应用程序与代理中的目标之间提供全双工数据交换。

鉴于此,现在当我们谈论ActiveMQ时,我们不再专门引用基于JMS / NMS / CMS库和OpenWire协议的交互堆栈。 最适合此应用程序的语言,平台和外部库的组合和选择正变得越来越流行。 例如,JavaScript应用程序可以在使用Eclipse Paho MQTT库的浏览器中运行,以通过Web套接字将消息发送到ActiveMQ,并且这些消息由使用AMQP通过Apache Qpid Proton库的C ++服务器进程读取。 从这个角度来看,消息传递的格局正在变得越来越多样化。

展望未来,尤其是AMQP将比现在拥有更多的机会,因为既不是客户也不是经纪人的组件正在成为消息传递领域中更加熟悉的一部分。 例如, Apache Qpid Dispatch Router充当消息路由器,客户端直接连接到该消息路由器,从而允许不同的目的地处理不同的地址,并提供分片(分离)的可能性。

使用第三方库和外部组件时,请注意它们具有可变的质量,并且可能与ActiveMQ中提供的功能不兼容。 作为一个非常简单的示例-无法通过MQTT将消息发送到队列(无需在代理中设置路由)。 因此,您将需要花费一些时间来使用选项来确定最适合您的应用程序需求的消息传递系统的堆栈。

性能与可靠性之间的权衡


在深入研究ActiveMQ中点对点消息传递如何工作的细节之前,我们需要先谈谈所有具有大量数据处理功能的系统所面对的问题:性能与可靠性之间的权衡。

应该指导任何接受数据的系统(无论是消息代理还是数据库)在发生故障时如何处理这些数据。 故障有很多种形式,但为简单起见,我们将其范围缩小到系统断电并立即关闭的情况。 在这种情况下,我们需要推测系统中的数据将会发生什么。 如果数据(在这种情况下为消息)位于内存中或在硬件的易失性部分(例如在高速缓存中),则该数据将丢失。 但是,如果将数据发送到非易失性存储(例如磁盘),则在系统恢复工作后将再次可用。

从这个角度来看,如果我们不希望在代理发生故障时丢失消息,则需要将它们写入永久存储。 不幸的是,这种特定解决方案的成本很高。

请注意,将兆字节的数据写入磁盘比写入内存慢100-1000倍。 因此,应用程序开发人员必须确定消息的可靠性是否值得牺牲性能。 此类决策应根据使用场景做出。

性能和可靠性之间的权衡基于一系列选择。 可靠性越高,性能越低。 如果您决定降低系统可靠性,例如仅将消息存储在内存中,则生产率将大大提高。 默认情况下,JMS配置为开箱即用以提高可靠性。 有许多机制可让您配置代理并与代理进行交互,使其在此频谱中的某个位置最适合使用消息传递系统的特定情况。

此折衷适用于单个经纪人级别。 但是,在完成设置单个代理后,可以通过仔细检查消息流并在几个代理之间共享流量来扩展消息系统。 这可以通过为特定的收件人提供他们自己的代理,或者通过在应用程序级别或使用中间组件划分总的消息流来实现。 稍后,我们将更详细地考虑如何考虑经纪人的拓扑。

储存讯息


ActiveMQ附带了许多可插入的消息保留策略。 它们以持久性(persistence)适配器的形式出现,可以将其视为消息存储引擎。 这些包括基于磁盘的解决方案,例如KahaDB和LevelDB,以及通过JDBC使用数据库的能力。 由于前者是最常用的,因此我们将重点讨论它们。

当代理接收持久消息时,它们首先被写入日记中的磁盘。 日志是磁盘上的一种数据结构,您只能在其中添加数据并且由几个文件组成。 代理将传入的消息序列化为对象的独立于协议的表示形式,然后以二进制形式封送,然后将其写入日志的末尾。 该日志包含所有传入消息的日志,以及有关已被客户端确认为已读取的那些消息的信息。

持久性磁盘适配器支持索引文件,该文件跟踪以下转发的消息在日志中的位置。 读取日志文件中的所有消息后,它们将被ActiveMQ后台工作流程删除或存档。 如果该日志在代理故障期间损坏,则ActiveMQ将基于日志文件中的信息对其进行重建。

来自所有队列的消息被写入相同的日志文件,这意味着如果不读取一条消息,则无法清除整个文件(通常,默认值为32 MB或100 MB,具体取决于持久性适配器)。 随着时间的推移,这可能会导致磁盘空间不足的问题。
经典消息代理不适用于长期存储-阅读您的消息!
日志是用于存储和随后检索消息的极其有效的机制,因为磁盘访问对于这两种操作都是顺序的。 在传统的硬盘上,由于磁盘上的磁头仅继续在磁盘的旋转基板上继续读取或写入扇区,因此这可以最大程度地减少通过圆柱搜索磁盘的次数。 同样,在SSD上,顺序访问比随机访问要快得多,因为前者可以更好地利用驱动器的内存页。

磁盘性能因素


有许多因素决定磁盘的运行速度。 要了解这一点,请考虑通过简化的管道心理模型( 图2-2 )写入光盘的方法。


图2-2。 磁盘性能管模型

管道具有三个尺寸:

长度

对应于预期完成一项操作的等待时间 。 对于大多数本地驱动器来说,它是相当不错的,但是在本地驱动器实际上在线的云环境中,它可能成为主要的限制因素。 例如,在撰写本文时(2017年4月),亚马逊保证向其EBS存储的写入将“少于2毫秒”。 如果我们按顺序进行记录,则最大吞吐量为每秒500条记录。

幅宽

确定单个操作的承载能力或带宽 。 文件系统缓存通过将许多小记录合并到磁盘上执行的较小的一组较大的写操作中来使用此属性。

随着时间的带宽

该想法以一系列事件的形式表示,这些事件可以同时出现在管道中,用称为IOPS(每秒I / O操作数)的度量表示。 存储制造商和云提供商通常使用IOPS来衡量性能。 硬盘在不同的上下文中将具有不同的IOPS值:工作负载是否主要由读取,写入或这些的组合组成,以及这些操作是顺序的,任意的还是混合的。 从代理的角度来看,最有趣的IOPS测量是顺序读取和写入操作,因为它们对应于读取和写入日志日志。

消息代理的最大吞吐量取决于这些限制中的第一个限制,并且代理配置在很大程度上取决于您与磁盘交互的方式。 这不仅是例如配置代理的方式的一个因素,而且还取决于生产者如何与代理进行交互。 与所有与性能相关的内容一样,有必要在代表性的工作负载(即,尽可能接近真实消息)和将在PROM中使用的实际存储配置上测试代理。 这样做是为了了解系统在现实中的行为。

JMS API


在深入了解ActiveMQ与客户端通信的方式之前,我们首先需要学习JMS API。 API定义了一组由客户端代码使用的编程接口:

连接工厂

这是用于与代理建立连接的顶级接口。 在典型的消息传递应用程序中,此接口只有一个实例。 在ActiveMQ中,这是一个ActiveMQConnectionFactory。 在顶层,此设计告诉消息代理位置,以及如何与之交互的低级详细信息。 顾名思义,ConnectionFactory是用于创建Connection对象的机制。

连接方式

这是一个寿命很长的对象,大致类似于TCP连接-创建后,它通常存在于应用程序的整个生命周期中,直到关闭为止。 连接是线程安全的,并且可以同时使用多个线程。 连接对象使您可以创建会话对象。

届会

与代理进行交互时,这是一个流句柄。 会话对象不是线程安全的,这意味着多个线程不能同时访问它们。 会话是主要的事务描述符,程序员处于事务模式时可以使用它提交和回滚回滚消息。 使用此对象,您可以创建Message,MessageConsumer和MessageProducer对象,并获得指向Topic和Queue对象的指针(描述符)。

消息制作人

该界面允许您向收件人发送消息。

消息消费者

该接口允许开发人员接收消息。 有两种消息检索机制:

  • 注册MessageListener。 这是您已实现的消息处理程序接口,它将使用一个流顺序处理代理发布的所有消息。
  • 使用receive()方法轮询消息。

留言内容

在传输数据时,这可能是最重要的结构。 JMS中的消息包括两个方面:

  • 消息元数据。 该消息包含标题和属性。 两者都可以视为地图的元素。 标头是JMS规范定义的众所周知的元素,可以通过API直接使用,例如JMSDestination和JMSTimestamp。 属性是任意条消息信息,这些信息被设置为简化消息的处理或路由,而不必读取消息有效负载本身。 例如,您可以将标题设置为AccountID或OrderType。
  • 邮件正文。 在Session中,可以根据正文中发送的内容类型创建几种不同类型的消息,其中最常见的是字符串的TextMessage和二进制数据的BytesMessage。

队列如何工作:两脑的故事


一个有用的(尽管不准确的)ActiveMQ工作模型是两个大脑两半的模型。 一部分负责从生产者接收消息,另一部分负责将这些消息发送给消费者。 对于性能优化而言,关系实际上更为复杂,但是该模型足以满足基本的理解。

将消息发送到队列


让我们看一下发送消息时发生的交互。 图2-3为我们显示了代理接收消息的过程的简化模型。它并不完全对应于每种情况下的行为,但是非常适合获得基本了解。


图2-3。将消息发送到JMS

在客户端应用程序中,线程接收指向MessageProducer的指针。它使用估计的消息有效负载创建一个Message并调用MessageProducer.send(“ orders”,消息),并将队列作为消息的最终目标。由于程序员不想在代理中断时丢失消息,因此消息头JMSDeliveryMode被设置为PERSISTENT(默认行为)。

此时(1),发送流将调用客户端库并以OpenWire格式封送消息。然后,该消息将发送到代理。

在代理中,接收流从行中删除消息并将其解组到内部对象。然后,将消息对象传输到持久性适配器,该适配器使用Google协议缓冲区格式将消息封送,并将其写入存储设备(2)。
在将消息记录到存储中之后,持久性适配器应收到确认消息已被实际记录的消息(3)。这通常是整个交互过程中最慢的部分。稍后再讨论。

经纪人一旦确保已保存该消息,就会将确认响应(4)发送给客户端。之后,最初调用send()操作的客户端线程可以继续其工作。

持久性消息的未决确认是JMS API提供的保证的基础-如果您希望保存消息,那么对于消息是否首先由代理接收就可能也很重要。有多种原因可能导致这种情况无法实现,例如,已达到内存或磁盘限制。代理不是失败,而是挂起发送操作,迫使生产者等待直到有足够的系统资源可用于处理消息(称为生产者流控制的过程),或者他将否定确认发送给生产者,并引发异常。可以为每个经纪人定制确切的行为。

在这种简单的操作中,发生了大量的I / O交互:生产者和代理之间的两次网络操作,一次保存操作和一个确认步骤。保存操作可以是对磁盘的简单写入,也可以是到存储服务器的另一网络过渡。

这就引起了一个有关消息代理的重要问题:消息代理的工作与非常密集的I / O操作流相关,并且它们对所使用的基础结构(特别是磁盘)非常敏感。

让我们仔细看看上述交互中的确认步骤(3)。如果持久性适配器是基于文件的,则存储消息涉及写入文件系统。如果是这样,那为什么我需要确认写操作已经完成?完成录制的行为是否真的意味着发生了录制?
不完全是通常情况下,您学习的内容越深入,结果就会越复杂。在这种特殊情况下,缓存是元凶

缓存,无处不在的缓存


当诸如代理之类的操作系统进程将数据写入磁盘时,它与文件系统进行交互。文件系统是一种抽象过程,用于抽象与所使用的存储介质的交互细节,并为文件操作(例如OPEN,CLOSE,READ和WRITE)提供API。这些功能之一是通过将操作系统写入的数据缓冲到可以用一种方法保存到磁盘的块中,最大程度地减少写入操作的次数。看起来像与磁盘交互的文件系统写操作实际上已写入此缓冲区高速缓存

顺便说一句,这就是为什么您的计算机在您不安全弹出USB驱动器时会抱怨的原因-您复制的文件可能尚未真正写入!
一旦数据超出缓冲区高速缓存,它将进入下一个高速缓存级别,这次是在硬件级别- 磁盘控制器高速缓存。它们对于基于RAID的系统特别重要,并且具有与操作系统级别的缓存相同的功能:最小化驱动器本身所需的交互次数。这些缓存分为两类:

直写

收到后立即将写操作传输到磁盘。

回写

仅当缓冲区已满时达到特定阈值时,才在光盘上执行记录。

停电期间,存储在这些缓存中的数据很容易丢失,因为它们使用的内存通常是易失性的(volatile)。更昂贵的卡具有冗余电池组(BBU),它们支持高速缓存电源,直到整个系统可以恢复电源为止,之后数据将被写入磁盘。
最后一个缓存级别在磁盘本身上。磁盘缓存位于硬盘驱动器(标准硬盘驱动器和固态驱动器上)上,并且可以是直写式或回写式。大多数商用驱动器使用回写式缓存,并且易失性,这再次意味着在电源故障的情况下可能会丢失数据。

返回到消息代理,您需要完成确认步骤,以确保数据确实到达磁盘。不幸的是,与这些硬件缓冲区的交互取决于文件系统,因此ActiveMQ之类的过程所能做的就是向文件系统发送一个信号,表明它希望将所有系统缓冲区与使用中的设备同步。为此,代理调用java.io.FileDescriptor.sync()方法,该方法反过来启动POSIX fsync()操作。

此同步行为是JMS的要求,以确保所有标记为持久性的消息实际上都保存到磁盘上,并因此在接收到事务中的每个消息或一组相关消息之后执行。因此,磁盘执行sync()的速度对于代理的性能至关重要。

内部冲突


对所有队列使用一个日志会增加额外的复杂性。在任何给定时间,可能有多个生产者同时发送消息。代理有几个流,这些流从传入的套接字接收这些消息。每个线程必须将其消息保存到日志。由于多个线程无法同时写入同一文件,因为记录将彼此冲突,然后应使用互斥机制将记录排入队列。我们称此线程冲突

在处理下一条消息之前,必须完全记录并同步每条消息。此限制会同时影响代理中的所有队列。因此,接收消息的速度是写入磁盘所花费的时间,加上其他流完成记录所需要的等待时间。

ActiveMQ包含一个写缓冲区,接收流在其中写入其消息,以等待先前记录的完成。然后,当消息可用时,以一种动作写入缓冲区。完成后,将通知线程。因此,代理最大化了存储带宽的使用。

为了最大程度地减少线程冲突的影响,可以使用mKahaDB适配器为队列集分配其自己的日志。这种方法减少了写入等待时间,因为线程在任何时候都很可能会写入不同的日志,并且它们无需为争用一个日志文件而相互竞争。

交易次数


对所有队列使用单个日志的优点是,从经纪人的作者的角度来看,实现交易要容易得多。

让我们看一个示例,其中生产者将多个消息发送到多个队列。使用事务意味着要发送的整个消息集应被视为一个原子操作。在这种交互中,ActiveMQ客户端库能够进行一些优化,从而显着提高发送速度。

图2-4所示的操作中,生产者发送三个消息,所有消息都在不同的队列中。确认每条消息后,客户端将异步发送所有三个消息,即无需等待响应,而无需与代理进行通常的交互。这些消息存储在代理的内存中。一旦操作完成,生产者就将需要提交的消息通知其会话,这又迫使代理使用一次同步操作执行一项大记录。


图2-4。在事务中发送消息

在这种类型的操作中,ActiveMQ使用两种优化来提高速度:

  • 消除生产者下一次调度之前的等待时间
  • 将许多小磁盘操作组合成一个大磁盘-这使您可以使用磁盘总线的整个带宽

如果将此与每个队列存储在其自己的日志中的情况进行比较,那么代理将必须提供诸如协调所有记录之间的事务之类的内容。

从队列中减去消息


当消费者通过设置MessageListener来处理到达的消息或调用MessageConsumer.receive()方法(图2-5来表达其接受消息的意愿时,便开始读取消息


图2-5。通过JMS读取消息

当ActiveMQ知道使用方时,它(ActiveMQ)从存储到分发内存(1)逐页读取(页面)消息。然后,这些消息通常分几部分重定向(分派)到会计师(2),以减少网络交互的数量。代理跟踪哪些消息已被重定向以及哪个消费者。

使用者收到的消息不会立即由应用程序处理,而是放置在称为“内存”的存储区中。预取缓冲器(预取缓冲器)。此缓冲区的目的是简化消息流,以便代理可以在主管可以发送时向主管发布消息,而消费者可以一次有序地接收消息。

在进入预取缓冲区后的某个时刻,应用程序逻辑(X)读取消息,并将校对确认发送给代理(3)。消息处理和确认之间的时间安排是使用称为确认模式的JMS会话参数进行配置的,我们将在稍后进行讨论。
代理一旦接受消息传递确认,就会从内存和消息存储中将其删除(4)。术语“删除”在某种程度上具有误导性,因为实际上将确认记录写入日志,并且索引中的索引增加。包含消息的日志文件的实际删除将由垃圾收集器在后台线程中基于此信息执行。

上述行为是为了简化理解而进行的简化。实际上,ActiveMQ不仅从磁盘逐页读取数据,还使用代理的接收和重定向部分之间的游标机制,以尽可能减少与代理的存储库的交互。如上所述,分页是此机制中使用的模式之一。游标可以看作是应用程序级缓存,需要与代理的存储库保持同步。使用的一致性协议是使ActiveMQ调度机制比下一章描述的Kafka机制更加复杂的重要组成部分。

确认和交易方式


确定校对和确认之间的顺序的各种确认模式,对需要在客户端中实现的逻辑有重大影响。它们如下:

AUTO_ACKNOWLEDGE

这是最常用的模式,可能是因为它有单词AUTO。此模式强制客户端库在用receive()调用读取消息的同时确认消息。这意味着,如果由消息启动的业务逻辑引发异常,则该消息将丢失,因为该消息已在代理上被删除。如果通过侦听器读取了消息,则只有在侦听器成功完成工作之后,才会确认消息。

CLIENT_ACKNOWLEDGE

仅当使用者代码明确调用Message.acknowledge()方法时,才会发送确认。

DUPS_OK_ACKNOWLEDGE

这里,确认将在同时发送之前在使用方中进行缓冲,以减少网络流量。但是,如果客户端系统关闭,确认将丢失,并且消息将重新发送并再次处理。因此,该代码应考虑重复消息的可能性。

交易阅读工具补充了确认模式。创建会话时,可以将其标记为事务性的。这意味着程序员必须显式调用Session.commit()或Session.rollback()。在消费者方面,事务扩展了代码可以作为一个原子操作执行的交互范围。例如,您可以整体上读取和处理多条消息,或者从一个队列中减去一条消息,然后使用同一Session对象将其发送到另一条消息。

调度和几个消费者


到目前为止,我们一直在讨论与单个消费者一起阅读消息的行为。现在,让我们看看该模型如何适用于多个消费者。

当多个使用者订阅队列时,代理的默认行为是将循环消息发送给在预取缓冲区中有位置的那些使用者。消息将按照到达队列的顺序发送-这是唯一提供的FIFO保证(先进先出;先进先出)。

当消费者突然关闭时,发送给他但尚未确认的所有消息将重新发送给另一个可用的客户。

这就提出了一个重要的问题:即使在使用消费者交易的地方,也无法保证不会多次处理该消息。

考虑使用者内部的以下处理逻辑:

  1. 从队列中减去该消息。交易开始。
  2. 使用消息的内容调用Web服务。
  3. 事务已提交。确认将发送给经纪人。

如果客户端在步骤2和3之间完成,则消息的校对已经通过调用Web服务影响了其他系统。 Web服务调用是HTTP请求,因此不是事务性的。

此行为对所有排队系统都是正确的-即使它们是事务性的,也不能保证在其中处理消息时不会有副作用。 详细检查了消息的处理之后,我们可以自信地说:

没有消息传递一次这样的事情。

队列至少保证一次传递并且代码的敏感部分应始终考虑接收重复消息的可能性。 稍后我们将讨论消息传递客户端如何使用幂等阅读来跟踪已被查看的消息并避免重复。

讯息排序


对于以[A,B,C,D]顺序到达的一组消息,以及对于两个使用者C1和C2,消息的正态分布如下:

C1: [A, C]
C2: [B, D]

由于代理人不控制读取过程的操作并且处理顺序是并行的,因此它是不确定的。 如果C1慢于C2,则可以将初始消息集处理为[B,D,A,C]。

此行为可能会使初学者感到惊讶,他们期望消息将按顺序处理,并在此基础上开发自己的消息传递应用程序。 由同一发件人发送的消息必须按相对彼此的顺序进行处理(也称为因果排序 )的要求非常普遍。

以以下来自在线博彩的用例为例:

  1. 用户帐户已配置。
  2. 钱记入帐户。
  3. 下注从账户中提款。

在这里有意义的是,将按照发送消息的顺序来处理消息,以便考虑帐户的一般状态。 如果系统尝试从没有资金的帐户中取出资金,则会发生奇怪的事情。 当然,有一些解决方法。

专有客户模型包括将所有消息从队列发送到一位客户。 使用这种方法,将应用程序或线程的多个实例连接到队列时,将使用特殊的收件人参数对它们进行签名: my.queue?consumer.exclusive=true 。 当您连接一个垄断消费者时,他会收到所有消息。 连接第二个使用者时,直到第一个使用者断开连接后,它才会收到任何消息。 第二个使用者实际上是一个热储备,而第一个使用者现在将完全按照因果关系接收消息的顺序,将消息记录在日志中。
这种方法的缺点是,尽管消息处理是一致的,但是它是性能瓶颈,因为所有消息都必须由单个计算器处理。

为了更智能地理解此用例,您需要重新考虑问题。 是否需要按顺序处理所有消息? 在处理上述投标的情况下,仅需要顺序处理与一个帐户有关的消息。 ActiveMQ提供了一种称为JMS消息组的机制来处理这种情况。

消息组是一种分区机制,允许生产者将消息分配到将根据业务密钥顺序处理的组中。 此业务密钥在名为JMSXGroupID的消息属性中JMSXGroupID

在处理投标的情况下,自然键将是帐户标识符。
为了说明发送的工作方式,请考虑按以下顺序到达的一组消息:

 [(A, Group1), (B, Group1), (C, Group2), (D, Group3), (E, Group2)] 

当消息由ActiveMQ中的调度机制处理时,并且看到以前不存在的JMSXGroupID ,会将该密钥周期性地分配给使用者。 从现在开始,所有带有此密钥的消息都将发送给该​​会计师。

这里,将在两个使用者(C1和C2)之间分配组,如下所示:

 C1: [Group1, Group3] C2: [Group2] 

邮件将按照以下方式重定向和处理:

 C2: [B, D] C2: [(C, Group2), (E, Group2)] 

如果消费者崩溃了,那么分配给他的所有组将在其余消费者之间重新分配,并且所有未确认的消息将再次重定向。 因此,尽管我们可以保证所有相关消息都将按顺序处理,但是我们不能声称它们将由同一使用者处理。

高可用性


ActiveMQ通过基于共享存储的主从提供高可用性。 在此方案中,在单独的服务器上配置了两个或更多的代理(尽管通常是两个),并且它们的消息存储在位于外部位置的消息存储中。 消息存储不能被代理的多个实例同时使用,因此,它的(仓库)辅助功能是充当阻止机制,以确定哪个代理将获得独占访问权( 图2-6 )。


图2-6。 经纪人A是带头人;经纪人B作为奴隶处于待命状态

为了连接到存储库,第一个代理(代理A)充当领导者的角色,并打开其端口以进行消息通信。 当第二个代理(代理B)连接到存储库时,他尝试获取锁定,并且由于未成功,因此会停一小段时间,然后再尝试获取锁定。 这被称为驱动遏制。

同时,客户端会交替更改两个代理的地址,以尝试连接到入站端口(称为传输连接器)。 一旦主代理可用,客户端就会连接到其端口并可以发送和读取消息。
当代理A作为领导者,由于流程失败而失败时( 图2-7 ),将发生以下事件:

  1. 客户端断开连接,并立即尝试重新连接,从而交替使用两个代理的地址。
  2. 消息中的锁定被释放。 此时间取决于存储实现。
  3. 处于从属模式的代理B定期尝试获取锁,最终成功并承担了主角色的角色,打开了端口。
  4. 客户连接到代理B并继续他的工作。


图2-7。 代理A通过丢失与存储库的连接来终止。 经纪人B带头
与JMS / NMS / CMS实现一样,不能保证在客户端库中内置多个代理地址之间的交替逻辑。 如果该库仅提供重新连接到单个地址,则您可能需要在负载平衡器后面放置几个代理,该平衡器也应具有很高的可用性。
这种方法的主要缺点是,为了简化一个逻辑代理的工作,需要多个物理服务器。 在这种情况下,代理的两个服务器之一处于空闲状态,等待其伙伴断开连接后才能开始工作。

这种方法还增加了复杂性,即所使用的代理存储(无论是共享网络文件系统还是数据库)也必须具有高度可访问性。 这导致设备和经纪人设置管理的额外费用。 在这种情况下,很想重用基础架构的其他部分(例如数据库)使用的现有高可用性存储库,但这是一个错误。

重要的是要记住,磁盘是代理整体性能的主要限制因素。 如果磁盘本身被消息代理之外的其他进程同时使用,则该进程与磁盘的交互可能会减慢代理的录制速度,因此会减慢消息通过系统的速度。 这样的减速很难诊断,并且解决它们的唯一方法是将两个进程分离到不同的存储卷中。

为了确保代理的稳定运行,需要专用的专用存储。

垂直和水平缩放


在项目生命周期的某个时刻,您可能会遇到消息代理的性能限制。 这些限制通常与资源有关,特别是与使用的存储的ActiveMQ交互。 这些问题通常是由于收件人之间的消息量或带宽冲突而引起的,例如,当一个队列在高峰时段溢出代理时。

有很多方法可以从代理的基础结构中获得更高的性能:

  • 如果不需要,请不要使用持久性。 某些使用情况允许在崩溃期间丢失消息,尤其是当一个系统定期或按需通过队列将另一个完整快照状态发送给另一个系统时。
  • 在更快的驱动器上运行代理。 在实际情况下,记录的带宽在标准HDD和基于内存的替代方案之间存在明显差异。
  • 充分利用磁盘大小。 如上述磁盘管道交互模型所示,可以通过使用事务发送消息组来实现更高的吞吐量,从而将多个写操作组合为一个较大的写操作。
  • 使用流量分区。 您可以通过以下方式之一分割目标,从而提高吞吐量:

  1. 例如,一个代理中的多个磁盘对多个目录使用mKahaDB持久性适配器,每个目录都安装在单独的磁盘上。
  2. 几个代理,流量的划分是由客户端应用程序手动执行的。 ActiveMQ不为此目的提供任何本机功能。

代理性能问题的最常见原因之一就是尝试对一个实例执行过多操作。 通常,这种情况发生在以下情况下:将代理程序天真地划分为几个应用程序,而不考虑代理程序上的现有负载或不了解卷。 随着时间的流逝,一个经纪人的负担越来越重,直到他停止适当表现为止。

当系统架构师可以提出如图2-8所示的方案时,通常会在系统设计阶段出现问题。


图2-8。 消息基础结构的概念视图

目标是使多个应用程序通过ActiveMQ彼此异步通信。 不再指定目标,然后方案确定实际代理配置的基础。 这种方法称为通用数据管道。

它没有考虑上述概念设计和物理实现之间的基本分析步骤。 在进行特定配置的构建之前,有必要进行分析,然后将其用于证明物理项目的合理性。 此过程的第一步是确定哪些系统彼此交互-带有矩形和箭头的相当简单的图( 图2-9 )。


图2-9。 系统之间的草图消息流

获得批准后,您可以转到详细信息来回答以下问题:

  • 将使用多少个队列和主题?
  • 它们每个预期有什么消息量?
  • 每个收件人中的邮件有多大? 大消息可能会在分页过程中引起问题,从而导致超出内存限制并阻塞代理。
  • 消息流将全天统一还是由于批处理作业而导致峰值? 一个较少使用的队列中的大批处理可能会干扰及时向高性能目标写入磁盘。
  • 这些系统是在同一数据中心还是在不同数据中心? 远程通信涉及某种网络代理。

这个想法是定义单独的消息传递场景,这些场景可以由单个代理组合或拆分( 图2-10 )。
发生此类故障后,可以使用ActiveMQ性能模块相互结合以识别任何问题,从而模拟使用场景。


图2-10。 识别个别经纪人

确定适当数量的逻辑代理后,您可以确定如何使用高度可访问的配置和代理网络在物理级别上实现它们。

总结


在本章中,我们研究了ActiveMQ接收和分发消息的机制。 我们讨论了此体系结构支持的功能,包括相关消息和事务的粘性负载平衡。 同时,我们介绍了所有消息系统通用的一组概念,包括通信协议和杂志。 我们还详细研究了写入磁盘涉及的困难以及代理如何使用诸如数据包写入之类的技术来提高性能。 最后,我们研究了如何使ActiveMQ变得高度可用以及如何将其扩展到单个经纪人能力之外。

在下一章中,我们将研究Apache Kafka及其架构如何重新定义客户端和代理之间的关系,以提供带宽比常规消息代理大许多倍的健壮消息管道。 我们将讨论其用于实现此目标的功能,并简要考虑提供此功能的应用程序的体系结构。

下一部分: 了解消息代理。 通过ActiveMQ和Kafka学习消息传递的机制。 第三章。卡夫卡

翻译完成: tele.gg/middle_java

Source: https://habr.com/ru/post/zh-CN471268/


All Articles