了解消息代理。 通过ActiveMQ和Kafka学习消息传递的机制。 第三章。卡夫卡

继续翻译一本小书:
“了解消息经纪人”,
作者:Jakub Korab,出版商:O'Reilly Media,Inc.,出版日期:2017年6月,ISBN:9781492049296。

翻译完成: tele.gg/middle_java

上一部分: 了解消息代理。 通过ActiveMQ和Kafka学习消息传递的机制。 第2章ActiveMQ

第三章


卡夫卡


Kafka是在LinkedIn上开发的,旨在规避传统消息代理的某些限制,并避免为不同的点对点交互而配置多个消息代理,这在本书第28页的“垂直和水平缩放”部分中进行了介绍。 LinkedIn严重依赖单向吸收大量数据,例如页面点击和访问日志,同时允许多个系统使用这些数据。 上午,在不影响其他生产商或konsyumerov的性能。 实际上,存在Kafka的原因是为了获得通用数据管道描述的消息传递体系结构。

鉴于这个最终目标,自然会产生其他要求。 卡夫卡必须:

  • 极快
  • 提供更大的消息传递吞吐量
  • 支持发布者-订阅者和点对点模型
  • 不要随着增加消费者而放慢脚步。 例如,ActiveMQ中的队列和主题的性能都随着目标方使用者数量的增加而降低
  • 可水平扩展; 如果一条持久消息只能以最大磁盘速度执行此操作,则要提高性能,有必要超越一个代理实例的限制
  • 描绘对消息存储和检索的访问

为了实现所有这些,Kafka采用了一种体系结构,该体系结构重新定义了客户端和消息传递代理的角色和职责。 JMS模型非常关注代理,由代理负责消息的分发,客户只需要担心发送和接收消息。 另一方面,Kafka是面向客户的,客户承担了传统经纪人的许多功能,例如在消费者之间公平分配相关消息,以换取一个非常快速和可扩展的经纪人。 对于使用传统消息传递系统的人,使用Kafka要求从根本上改变态度。
这种工程方向导致了消息传递基础结构的创建,该消息传递基础结构与常规代理相比可以将吞吐量提高许多数量级。 就像我们将看到的那样,这种方法充满折衷,这意味着Kafka不适合某些类型的负载和已安装的软件。

统一目的地模型


为了满足上述要求,Kafka将发布订阅和点对点消息传递组合在一种收件人主题中 。 对于使用消息传递系统的人来说,这是令人困惑的,其中“主题”一词是指一种广播机制,从该机制(从主题中)读取不可靠(不持久)。 如本书简介中所定义,Kafka主题应被视为目的地的一种混合类型。
在本章的其余部分中,除非我们另外明确指定,否则术语“主题”将指代Kafka主题。

为了完全理解主题的行为方式以及它们提供的保证,我们首先需要考虑如何在Kafka中实现它们。
卡夫卡中的每个主题都有自己的日记。
向Kafka发送消息的生产者会附加到该杂志上,而消费者会使用不断向前移动的指针来阅读该杂志。 Kafka会定期删除日志的最旧部分,无论是否已阅读这些部分中的消息。 Kafka设计的核心部分是,代理程序不关心是否读取消息-这是客户的责任。
Kafka文档中找不到术语“ journal”和“ index”。 这些众所周知的术语在这里用于帮助理解。

此模型与ActiveMQ完全不同,在ActiveMQ中,所有队列中的消息都存储在一个日志中,并且代理在读取消息后将消息标记为已删除。
现在,让我们更深入地了解主题杂志。
《卡夫卡杂志》由几个分区组成( 图3-1 )。 Kafka保证每个分区的订购严格。 这意味着将以相同顺序读取写入分区的消息。 每个分区都实现为滚动(日志)日志文件,其中包含由其生产者发送给主题所有消息的子集 。 默认情况下,创建的主题包含一个分区。 分区是Kafka进行水平缩放的中心思想。


图3-1。 分区卡夫卡

当生产者向Kafka主题发送消息时,他决定将消息发送到哪个分区。 我们将在以后更详细地考虑这一点。

阅读讯息


想要读取消息的客户端控制名为“ 使用者组”的命名指针,该指针指示分区中消息的偏移量 。 偏移量是一个位置递增的位置,该位置在分区的开头从0开始。 API中通过用户定义的标识符group_id引用的这组使用者对应于一个逻辑使用者或系统

大多数消息传递系统通过多个实例和线程从收件人读取数据,以并行处理消息。 因此,通常会有许多共享同一组消费者的消费者实例。

阅读问题可以表示为:

  • 该主题有几个分区
  • 多个消费者群体可以同时使用该主题。
  • 一组消费者可以具有多个单独的实例。

这是一个不平凡的多对多问题。 要了解Kafka如何处理消费者群体,消费者实例和分区之间的关系,让我们看一下一系列日益复杂的阅读脚本。

消费者和消费者群体


让我们以单分区主题为起点( 图3-2 )。


图3-2。 消费者从分区读取

当使用者实例使用其自己的group_id连接到该主题时,将为其分配要读取的分区以及该分区中的偏移量。 此偏移量的位置在客户端中配置为指向最新位置(最新消息)或最早位置(最旧消息)的指针。 消费者从主题中请求(轮询)消息,从而导致他们从日记中顺序读取。
偏移位置会定期提交回Kafka,并另存为_consumer_offsets内部主题中的消息。 与常规代理不同,已读消息仍不会删除,并且客户端可以倒转偏移量以重新处理已查看的消息。

使用另一个group_id连接第二个逻辑使用者时,它控制与第一个指针无关的第二个指针( 图3-3 )。 因此,Kafka主题充当一个队列,在该队列中有一个消费者,作为常规主题的发布者-订阅者(pub-sub)订阅了多个消费者,其附加优点是所有消息都可以保存并可以处理多次。


图3-3。 不同消费者群中的两个消费者从同一分区读取

消费者组中的消费者


如上一节所述,当使用者的一个实例从分区读取数据时,它将完全控制指针并处理消息。
如果几个使用者的实例使用相同的group_id连接到具有一个分区的主题,则最后连接的实例将获得对指针的控制权,此后它将接收所有消息( 图3-4 )。


图3-4。 同一组使用者中的两个使用者从同一分区读取

消费者的实例数超过分区数的这种处理模式可以被视为一种垄断消费者。 如果您需要消费者实例的“主动-被动”(或“热-热”)群集,这将很有用,尽管多个消费者(“主动-主动”或“热-热”)的并行操作比消费者更典型在待机模式下。
与常规JMS队列的行为相比,上述消息分发行为可能令人惊讶。 在此模型中,发送到队列的消息将在两个使用者之间平均分配。

通常,当我们创建几个编译器实例时,我们要么对消息进行并行处理,要么提高读取速度,要么提高读取过程的稳定性。 由于只有使用者的一个实例可以从分区读取数据,因此如何在Kafka中实现?

一种方法是使用使用者的一个实例来读取所有消息并将其发送到线程池。 尽管此方法增加了处理吞吐量,但它增加了使用方逻辑的复杂性,无助于提高读取系统的稳定性。 如果使用者的一个实例由于电源故障或类似事件而关闭,则校对将停止。

解决Kafka中此问题的规范方法是使用更多分区。

分区


分区是使主题的读取和扩展超出代理的一个实例的带宽的并行化的主要机制。 为了更好地理解这一点,我们来看一个情况,该主题有两个分区,一个使用者订阅了该主题( 图3-5 )。


图3-5。 一位消费者从多个分区读取

在这种情况下,给顾问提供了对两个分区中与其group_id对应的指针的控制,然后开始从两个分区读取消息。
当为同一group_id向该主题添加额外的计算器时,Kafka从第一个分区向第二个分区重新分配(重新分配)一个分区。 之后,将从该主题的一个分区中减去使用者的每个实例( 图3-6 )。

为了确保在20个线程中并行处理消息,您至少需要20个分区。 如果会有更少的分区,那么您将仍然有不需要处理的使用者,如前面在专有监视器的讨论中所述。


图3-6。 同一组使用者中的两个使用者从不同的分区读取

与支持JMS队列所需的消息分发相比,此方案大大降低了Kafka代理的复杂性。 无需注意以下几点:

  • 哪个使用者应基于循环分布,当前预取缓冲区容量或先前的消息(如JMS消息组)接收下一条消息。
  • 发生故障时,向哪些使用者发送了什么消息,应该重新发送给它们。

Kafka经纪人要做的就是在顾问请求时,始终向顾问发送消息。

但是,并行化校对和重新发送不成功消息的要求不会消失-对它们的责任只是从经纪人传递给客户。 这意味着必须将它们纳入您的代码中。

传送讯息


决定消息发送到哪个分区的责任是消息的产生者。 要了解执行此操作的机制,您首先需要考虑我们实际发送的确切内容。

在JMS中,我们使用具有元数据(标头和属性)和包含有效内容的主体的消息结构,而在Kafka中,消息是键值对 。 消息有效负载作为值发送。 另一方面,密钥主要用于分区,并且必须包含特定业务逻辑的密钥才能将相关消息放入同一分区。

在第2章中,我们讨论了在线博彩场景,何时应该由单个消费者按顺序处理相关事件:

  1. 用户帐户已配置。
  2. 钱记入帐户。
  3. 下注从账户中提款。

如果每个事件都是发送给该主题的消息,则在这种情况下,帐户标识符将是自然键。
使用Kafka Producer API发送消息时,消息将传递到分区函数,该分区函数根据消息和Kafka集群的当前状态,返回应将消息发送到的分区的标识符。 此功能是通过Partitioner接口用Java实现的。

该界面如下:

interface Partitioner { int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster); } 

如果未指定密钥来确定分区,则Partitioner实现将在密钥或轮询上使用默认的通用哈希算法。 在大多数情况下,此默认值效果很好。 但是,将来您会想要编写自己的。

编写自己的分区策略


让我们看一个示例,当您希望将元数据与消息有效载荷一起发送时。 在我们的示例中,有效负载是用于向游戏帐户存款的指令。 一条指令是我们要保证在传输期间不会修改的东西,并且我们要确保只有受信任的高级系统才能启动该指令。 在这种情况下,发送和接收系统会同意使用签名来验证消息。
在常规JMS中,我们只需定义消息签名属性并将其添加到消息中即可。 但是,Kafka并没有为我们提供一种传输元数据的机制-仅密钥和值。

由于该值是银行转账的有效负载(银行转账的有效负载),我们希望保持其完整性,因此我们别无选择,只能确定密钥中使用的数据结构。 假设我们需要一个用于分区的帐户标识符,因为与该帐户相关的所有消息都必须按顺序处理,因此我们将提出以下JSON结构:

 { "signature": "541661622185851c248b41bf0cea7ad0", "accountId": "10007865234" } 

因为签名值将根据有效负载而变化,所以默认的Partitioner接口哈希策略将无法可靠地对相关消息进行分组。 因此,我们将需要编写自己的策略,该策略将分析此密钥并共享accountId的值。
Kafka在存储库中包括用于检测消息损坏的校验和,并具有一整套安全功能。 即使这样,有时也会出现行业特定要求,例如上述要求。

用户分区策略应确保所有相关消息都位于同一分区中。 尽管这看起来很简单,但是由于订购相关消息的重要性以及主题中分区数的固定程度,要求可能会变得复杂。

主题中的分区数可以随时间变化,因为如果流量超出了最初的期望,可以添加分区数。 因此,消息密钥可以与它们最初发送到的分区关联,这意味着状态的一部分必须在生产者实例之间分配。

要考虑的另一个因素是分区之间消息的均匀分布。 通常,密钥在消息中的分布不均匀,并且哈希函数不能保证一小组密钥对消息的公平分配。
重要的是要注意,无论您决定如何拆分消息,分隔符本身都可能需要重用。

考虑在不同地理位置的Kafka群集之间进行数据复制的要求。 为此,Kafka附带了一个称为MirrorMaker的命令行工具,该工具可用于从一个群集读取消息并将其传输到另一个群集。

MirrorMaker必须了解复制主题的键,才能在集群之间进行复制时保持消息之间的相对顺序,因为在两个集群中该主题的分区数量可能不一致。

自定义分区策略相对很少见,因为默认散列或循环轮询在大多数情况下都能成功工作。 但是,如果您需要严格保证顺序,或者需要从有效负载中提取元数据,则应该仔细研究分区。

Kafka的可伸缩性和性能优势来自将传统经纪人的部分职责转移给客户。 在这种情况下,将决定在多个并行工作的使用者之间分配潜在的相关消息。
JMS经纪人还必须处理此类要求。 有趣的是,通过JMS消息组实现的将相关消息发送到同一帐户的机制(一种粘性负载平衡(SLB)平衡策略)也要求发送者将消息标记为相关。 对于JMS,经纪人负责将这组相关消息发送给许多客户之一,如果客户跌落,则负责转让该组的所有权。

生产者协议


发送消息时,分区并不是唯一要考虑的事情。 让我们看一下Java API中Producer类的send()方法:

 Future < RecordMetadata > send(ProducerRecord < K, V > record); Future < RecordMetadata > send(ProducerRecord < K, V > record, Callback callback); 

应当立即注意到,这两种方法都返回Future,这表示未立即执行发送操作。 结果,事实证明消息(ProducerRecord)被写入每个活动分区的发送缓冲区,并在Kafka客户端库的后台流中传输到代理。 尽管这使工作变得异常快速,但这意味着如果程序停止运行,则经验不足的应用程序可能会丢失消息。

与往常一样,由于性能,有一种方法可以使发送操作更可靠。 该缓冲区的大小可以设置为0,发送应用程序的线程将被迫等待,直到消息发送到代理为止,如下所示:

 RecordMetadata metadata = producer.send(record).get(); 

再次关于阅读消息


阅读消息还有其他需要考虑的困难。 与JMS API可以启动消息侦听器以响应消息不同, Consumer Kafka接口仅轮询。 让我们仔细看看用于此目的的poll()方法:

 ConsumerRecords < K, V > poll(long timeout); 

该方法的返回值是一个容器结构,其中包含来自多个分区的多个ConsumerRecord对象。 ConsumerRecord本身是具有关联元数据(例如从中派生它的分区)的键/值对的持有者对象。

正如第2章中讨论的那样,我们必须不断记住在成功或未成功处理消息之后会发生什么情况,例如,如果客户端无法处理消息或中断工作。 在JMS中,这是通过确认模式处理的。 代理将删除成功处理的消息,或重新传递原始消息或翻转的消息(前提是已使用事务)。
卡夫卡的工作方式完全不同。 校对后,消息不会在代理中删除,失败时所发生的事情的责任在于代码本身。

正如我们已经说过的,一群消费者与杂志中的偏移量相关联。 与该偏差相关的日志位置对应于将响应poll()发出的下一条消息。 阅读的关键是该偏移量增加的时间点。

回到前面讨论的阅读模型,消息处理包括三个阶段:

  1. 检索要阅读的消息。
  2. 处理消息。
  3. 确认消息。

Kafka Consumer Advisor带有enable.auto.commit配置选项 。 这是一个常用的默认设置,通常包含“ auto”一词的设置就是这种情况。

在Kafka 0.10之前,使用此参数的客户端在处理后的下一个poll()调用中发送了上次读取消息的偏移量。 这意味着,如果客户端已经处理了所有已获取的消息,则可以对其进行重新处理,但是在调用poll()之前意外地将其销毁了。 由于代理人不维护有关已阅读该消息多少次的任何状态,因此下一个检索此消息的使用者将不知道发生了什么不良情况。 此行为是伪事务。 仅在成功处理消息的情况下才提交偏移,但是如果客户端中断,则代理再次将相同的消息发送给另一个客户端。 此行为与“ 至少一次 ”消息传递保证一致。

在Kafka 0.10中,按照设置auto.commit.interval.ms更改了客户端代码,以使客户端库开始定期启动提交。 此行为介于JMS AUTO_ACKNOWLEDGE和DUPS_OK_ACKNOWLEDGE模式之间。 使用自动提交时,无论消息是否被实际处理,都可以对其进行确认-如果使用速度较慢,则可能会发生这种情况。 如果计算机中断,则下一个计算机从固定位置开始检索消息,这可能导致消息跳过。 在这种情况下,Kafka不会丢失消息,阅读代码根本不会处理它们。

此模式的前景与0.9版相同:可以处理消息,但是如果发生故障,则可能无法关闭偏移量,这有可能导致传递重复。 执行poll()时检索到的消息越多,这个问题就越大。

第2章的“从队列中减去消息” 一节所述,给定故障模式,在消息传递系统中不存在一次性消息传递。

在Kafka中,有两种方法来修复(提交)偏移量(偏移量):自动和手动。 在这两种情况下,如果消息已处理但在提交之前失败,则可以多次处理消息。 如果提交是在后台进行的,并且您的代码在开始处理之前已完成(可能在Kafka 0.9和更早版本中),则也根本无法处理该消息。

您可以通过将enable.auto.commit设置为false并显式调用以下方法之一来控制在Kafka Consumer API中手动提交偏移量的过程:

 void commitSync(); void commitAsync(); 

如果要“至少一次”处理消息,则必须在处理消息后立即执行此命令,并使用commitSync()手动提交偏移量。

这些方法不允许在处理已确认的消息之前对其进行处理,但是它们并不能消除潜在的处理重复,同时会产生事务性的外观。 Kafka没有交易。 客户没有机会执行以下操作:

  • 自动回滚回滚消息。 消费者自己必须处理由于有问题的有效负载和后端断开而引起的异常,因为他们不能依靠代理重新传递消息。
  • 在一个原子操作中将消息发送到多个主题。 正如我们将很快看到的,对各种主题和分区的控制可以位于Kafka集群中的不同计算机上,它们在发送时不协调事务。 在撰写本文时,已经完成了一些工作,以使KIP-98成为可能。
  • 将从一个主题中读取一条消息与将另一条消息发送至另一主题相关联。 同样,Kafka的体系结构依赖于许多独立的机器作为一条总线工作,并且没有试图隐藏它。 例如,没有API组件允许消费者生产者在交易中链接。 在JMS中,这是由创建MessageProducersMessageConsumersSession对象提供的。

如果我们不能依赖事务,那么我们如何提供比传统消息传递系统所提供的语义更接近的语义?

如果在处理消息之前(例如在客户失败期间)消费者的偏移量有可能增加,那么当分配分区时,客户将无法得知客户组是否错过了消息。 因此,一种策略是将偏移量倒回先前的位置。 Kafka Consumer Advisor API为此提供了以下方法:

 void seek(TopicPartition partition, long offset); void seekToBeginning(Collection < TopicPartition > partitions); 

seek()方法可以与该方法一起使用
offsetsForTimes(映射<TopicPartition,Long> timestampsToSearch)可以倒退到过去任何特定时间点的状态。

隐式地,使用这种方法意味着很可能会读取并再次处理先前处理过的某些消息。 为避免这种情况,我们可以使用第4章所述的幂等读取来跟踪以前查看的消息并消除重复项。

或者,如果允许丢失或重复消息,则使用者的代码可以很简单。 当我们考虑通常使用Kafka的使用场景(例如,处理日志事件,指标,点击跟踪等)时,我们知道丢失单个消息不太可能对周围的应用程序产生重大影响。 在这种情况下,默认值是可以接受的。 另一方面,如果您的应用程序需要转移付款,则必须小心处理每条单独的消息。 一切都取决于上下文。

个人观察表明,随着消息强度的增加,每条消息的价值都会降低。 以汇总形式查看时,高容量消息往往变得很有价值。

高可用性


Kafka的高可用性方法与ActiveMQ完全不同。 Kafka是在水平可伸缩群集的基础上开发的,在该群集中,代理的所有实例同时接收和分发消息。

Kafka群集由运行在不同服务器上的几个代理实例组成。 Kafka旨在在常规的独立硬件上工作,其中每个节点都有自己的专用存储。 不建议使用网络附加存储(SAN),因为多个计算节点可以竞争存储时隙并产生冲突。

Kafka是一个持续运行的系统。 许多大型Kafka用户从未熄灭集群,并且该软件始终通过一致的重启来提供更新。 这是通过保证与消息代理之间的消息和交互的先前版本的兼容性来实现的。

代理连接到ZooKeeper服务器群集 ,该服务器群集充当给定的配置注册表,并用于协调每个代理的角色。 ZooKeeper本身是一个分布式系统,可通过建立仲裁来通过信息复制提供高可用性。

在基本情况下,该主题是在Kafka集群中创建的,具有以下属性:

  • 分区数。 如前所述,此处使用的确切值取决于所需的并发读取级别。
  • 复制系数(因子)确定群集中应包含该分区日志的代理实例数。

通过使用ZooKeepers进行协调,Kafka试图在集群中的代理之间公平地分配新分区。 这是由一个实例完成的,该实例充当控制器。

在运行时, 对于主题的每个分区, 控制器将代理(领导,领导,领导,领导)和跟随者 (跟随者,从属,下属)的角色分配给代理。 代理充当此分区的领导者,负责接收生产者发送给他的所有消息,并将消息分发给消费者。 将消息发送到主题分区时,它们将被复制到充当该分区的跟随者的所有代理节点。 包含分区日志的每个节点称为副本 。 经纪人可以充当某些分区的领导者,而可以充当其他分区的追随者。

包含由领导者存储的所有消息的关注者称为同步副本 (处于同步状态的副本,同步副本)。 如果充当分区领导者的代理断开连接,则处于该分区的更新或同步状态的任何代理都可以担任领导者角色。 这是一个难以置信的可持续性设计。

生产者配置的一部分是acks参数,该参数确定在应用程序流继续发送之前,应确认多少个副本确认消息的接收:0、1或全部。 如果该值设置为all ,那么当接收到该消息时,领导者将收到由min.insync.replicas主题设置 (默认为1)定义的多个副本(包括他自己)的确认后,立即将确认发送回生产者。 如果无法成功复制消息,则生产者将引发该应用程序的异常( NotEnoughReplicasNotEnoughReplicasAfterAppend )。

在典型配置中,创建的主题的复制系数为3(每个分区1个领导者,每个分区2个跟随者),并且min.insync.replicas参数设置为2。在这种情况下,群集将允许断开管理该分区的代理之一的连接。而不影响客户端应用程序。

这使我们回到了性能和可靠性之间已经熟悉的折衷方案。由于来自追随者的确认(确认)的额外等待时间而导致复制发生。尽管由于并行运行,所以至少三个节点的复制具有与两个节点相同的性能(忽略网络带宽使用量的增加)。

使用这种复制方案,Kafka巧妙地避免了使用sync()操作将每个消息物理写入磁盘的需要。生产者发送的每个消息都将写入分区日志,但是,如第2章所述,写入文件的操作最初是在操作系统缓冲区中进行的。如果此消息被复制到Kafka的另一个实例并在其内存中,则丢失领导者并不意味着该消息本身已丢失-同步副本可以将其自身接收。
选择不同步()操作意味着Kafka可以以将消息写入内存的速度接收消息。相反,避免将内存刷新到磁盘的时间越长越好。因此,Kafka代理分配64 GB或更多的内存并不罕见。这种内存使用情况意味着Kafka的一个实例可以轻松地以比传统消息代理快数千倍的速度工作。

Kafka也可以配置为使用sync()消息包。由于Kafka的所有内容都是面向包的,因此对于许多用例来说,它实际上都可以很好地工作,并且对于需要非常有力保证的用户来说是有用的工具。 Kafka的大多数纯粹性能与以数据包形式发送给代理的消息有关,并且这些消息是使用零复制操作(不执行将数据从一个存储区复制到存储区的任务)从代理连续读取的,这是事实。另一个)。后者在性能和资源方面是一大收获,并且只有通过使用定义分区方案的基础日志数据结构才能实现。

在Kafka集群中,与使用单个Kafka代理相比,可以实现更高的性能,因为可以在许多不同的计算机上水平缩放主题分区。

总结


在本章中,我们研究了Kafka架构如何重新解释客户端与代理之间的关系,以提供令人难以置信的健壮的消息传递管道,其带宽是常规消息代理的数十倍。我们讨论了用于实现此目标的功能,并简要回顾了提供此功能的应用程序的体系结构。在下一章中,我们将讨论消息传递应用程序需要解决的常见问题,并讨论解决这些问题的策略。在本章的结尾,我们概述了如何一般性地讨论消息传递技术,以便您可以评估它们对用例的适用性。

翻译完成:tele.gg/middle_java

待续...

Source: https://habr.com/ru/post/zh-CN466585/


All Articles