
哈Ha!
我在Tinkoff团队工作,该团队正在开发自己的通知中心。 在大多数情况下,我使用Spring boot在Java中进行开发,并解决了该项目中出现的各种技术问题。
我们的大多数微服务都通过消息代理彼此异步交互。 以前,我们使用IBM MQ作为代理,该代理停止应付负载,但同时具有很高的交付保证。
作为替代,我们提供了具有高可伸缩性的Apache Kafka,但不幸的是,它需要针对不同场景的几乎单独的配置方法。 另外,默认情况下,默认情况下,Kafka至少有一次传递机制不允许立即保持所需的一致性级别。 接下来,我将分享我们在配置Kafka方面的经验,特别是,我将告诉您如何配置并在交付后立即使用。
保证交货等
稍后将讨论的参数将有助于防止默认连接设置出现许多问题。 但是首先,我要注意一个参数,它有助于进行可能的调试。
生产者和消费者的Client.id将对此提供帮助。 乍一看,您可以将应用程序的名称用作值,并且在大多数情况下,这是可行的。 尽管在应用程序中使用了多个使用者并给他们提供相同的client.id时,会导致以下警告:
org.apache.kafka.common.utils.AppInfoParser — Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka.test-0
如果要在具有Kafka的应用程序中使用JMX,则可能会出现问题。 在这种情况下,最好使用应用程序名称和主题名称的组合作为client.id的值。 从Confluent的实用程序的kafka-consumer-groups命令的输出中可以看到我们配置的结果:

现在,我们将分析保证消息传递的场景。 Kafka Producer有一个acks参数,通过该参数,您可以配置在确认集群领导者需要考虑成功记录的消息的确认数量之后。 此参数可以采用以下值:
- 0-不考虑确认。
- 1-默认参数,仅1个副本需要确认。
- -1-所有同步副本都需要确认( min.insync.replicas集群配置 )。
从上述值可以看出,等于-1的acks提供了最强的保证,即消息不会丢失。
众所周知,分布式系统是不可靠的。 为了防止出现临时故障,Kafka Producer提供了一个重试参数,该参数允许您设置delivery.timeout.ms期间的重试次数。 由于retries参数默认为Integer.MAX_VALUE(2147483647),因此可以通过仅更改delivery.timeout.ms来控制消息的重传次数。
朝着正好一次交货的方向发展
这些设置使我们的生产者可以高度保证地传递消息。 现在让我们谈谈如何确保在Kafka主题中仅记录一条消息的副本? 在最简单的情况下,要在Producer上执行此操作,请将enable.idempotence参数设置为true。 幂等保证在一个主题的特定分区中仅记录一条消息。 启用幂等性的先决条件是acks = all,重试> 0,最大in.flight.requests.per.connection≤5 。 如果开发人员未设置这些参数,则将自动设置以上值。
设置幂等时,必须确保每次都将相同的消息放入相同的分区中。 这可以通过在Producer上配置键和partitioner.class参数来完成。 让我们从钥匙开始。 对于每次装运,必须相同。 使用原始消息中的任何业务标识符可以轻松实现这一点。 partitioner.class参数的默认值为DefaultPartitioner 。 使用此分区策略,默认行为如下:
- 如果在发送消息时明确指定了分区,那么我们将使用它。
- 如果未指定分区,但指定了键,请从键中通过哈希选择分区。
- 如果未指定分区和键,则依次选择分区(循环轮询)。
此外,通过使用参数max.in.flight.requests.per.connection = 1的键和幂等发送,可以有序地处理Consumer上的消息。 另外,值得记住的是,如果在群集上配置了访问控制,那么您将需要有权写入该主题的权限。
如果突然之间没有足够的可能性来通过密钥进行幂等发送,或者生产者端的逻辑要求保持不同分区之间的数据一致性,那么事务就可以解决。 此外,使用链式交易,您可以有条件地将Kafka中的记录与数据库中的记录进行同步。 为了使事务发送到Producer,必须具有幂等性,并可以选择设置transactional.id 。 如果在您的Kafka集群上配置了访问控制,则对于事务记录以及幂等,您将需要写许可权,可以通过mask使用transactional.id中存储的值来授予写许可权。
正式地,您可以使用任何字符串(例如,应用程序的名称)作为事务标识符。 但是,如果您使用相同的transactional.id运行同一应用程序的多个实例,则第一个启动的实例将因错误而停止,因为Kafka会将其视为僵尸进程。
org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
为了解决此问题,我们以主机名的形式向应用程序名称添加后缀,该后缀是从环境变量获取的。
配置了生产者,但是Kafka上的事务仅控制消息范围。 无论事务状态如何,该消息都会立即成为主题,但是具有其他系统属性。
为了防止消费者提前读取此类消息,他需要将isolation.level参数设置为read_committed。 这样的使用者将能够像以前一样读取非事务性消息,并且仅在提交之后才能读取事务性消息。
如果您安装了上面列出的所有设置,则只需配置一次交付即可。 恭喜你!
但是还有一点细微差别。 我们在上面配置的Transactional.id实际上是交易前缀。 在事务管理器上,向其添加序列号。 接收的标识符在transactional.id.expiration.ms上发布,该标识符在Kafka集群上配置,默认值为“ 7 days”。 如果在这段时间内应用程序没有收到任何消息,那么当您尝试下一次事务发送时,您将收到InvalidPidMappingException 。 此后,事务协调器将为下一个事务发布新的序列号。 但是,如果未正确处理InvalidPidMappingException,则该消息可能会丢失。
代替总数
如您所见,仅向Kafka发送消息是不够的。 您需要选择参数的组合,并准备进行快速更改。 在本文中,我试图详细显示一次传递设置,并描述了我们遇到的几个client.id和transactional.id配置问题。 生产者和消费者设置摘要如下。
制片人:
- 臀部=全部
- 重试> 0
- enable.idempotence = true
- 最大飞行请求连接数≤5(1-有序发送)
- transactional.id = $ {应用程序名称}-$ {主机名}
消费者:
- isolated.level = read_committed
为了使将来的应用程序中的错误最小化,我们对spring配置进行了包装,其中已设置了某些列出的参数的值。
以下是一些可供独立学习的材料: