使用Apache Kafka的同步请求-响应

总体而言, 事件驱动架构 ,尤其是Apache Kafka ,最近引起了很多关注。 为了充分利用事件驱动的体系结构,事件委托机制必须实质上是异步的。 但是,可能存在一些特定的使用场景/流程,这些场景/流程需要使用Synchronous Request-Response的语义。 此版本显示了如何使用Apache Kafka实现请求-响应

@middle_java翻译

原始文章日期:2018年10月26日

Apache Kafka本质上是异步的。 因此,Apache Kafka 的请求-响应语义是不自然的。 但是,这一挑战并不新鲜。 企业集成模式请求-应答为异步通道上的同步消息传递提供了一种行之有效的机制:



Return Address模式用一种机制来补充Request-Reply模式,该机制使请求者可以指示将响应发送到的地址:



最近, Spring Kafka 2.1.3增加了来自Request Request模式框的支持,在2.2版中,它的一些粗糙度被抛光。 让我们看看这种支持是如何工作的:

客户端:ReplyingKafkaTemplate


模板的众所周知的抽象构成了Spring中Request-Reply机制的客户端部分的基础。

@Bean public ReplyingKafkaTemplate < String, Request, Reply > replyKafkaTemplate( ProducerFactory < String, Request > pf, KafkaMessageListenerContainer < String, Reply > lc) { return new ReplyingKafkaTemplate < > (pf, lc); } 

这里的一切都非常简单:我们设置了ReplyingKafkaTemplate ,它使用String键发送请求消息,并使用String键接收响应消息。 但是,ReplyingKafkaTemplate必须基于具有适当使用者和生产者配置的ProducerFactory请求,ConsumerFactory响应和MessageListenerContainer。 因此,所需的配置非常重要:

  @Value("${kafka.topic.car.reply}") private String replyTopic; @Bean public Map < String, Object > consumerConfigs() { Map < String, Object > props = new HashMap < > (); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); return props; } @Bean public Map < String, Object > producerConfigs() { Map < String, Object > props = new HashMap < > (); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return props; } @Bean public ProducerFactory < String, Request > requestProducerFactory() { return new DefaultKafkaProducerFactory < > (producerConfigs()); } @Bean public ConsumerFactory < String, Reply > replyConsumerFactory() { return new DefaultKafkaConsumerFactory < > (consumerConfigs(), new StringDeserializer(), new JsonSerializer < Reply > ()); } @Bean public KafkaMessageListenerContainer < String, Reply > replyListenerContainer() { ContainerProperties containerProperties = new ContainerProperties(replyTopic); return new KafkaMessageListenerContainer < > (replyConsumerFactory(), containerProperties); } 

在这种情况下,使用replyKafkaTemplate发送同步请求并接收响应如下:

 @Value("${kafka.topic.car.request}") private String requestTopic; @Value("${kafka.topic.car.reply}") private String replyTopic; @Autowired private ReplyingKafkaTemplate < String, Request, Reply > requestReplyKafkaTemplate; ... RequestReply request = RequestReply.request(...); // producer record ProducerRecord < String, Request > record = new ProducerRecord < String, Request > (requestTopic, request); //       record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes())); //     Kafka          RequestReplyFuture < String, Request, Reply > sendAndReceive = requestReplyKafkaTemplate.sendAndReceive(record); sendAndReceive.addCallback(new ListenableFutureCallback < ConsumerRecord < String, Reply >> () { @Override public void onSuccess(ConsumerRecord < String, Reply > result) { //   consumer record Reply reply = result.value(); System.out.println("Reply: " + reply.toString()); } }); 

还有很多样板文件和一个低级API,甚至是这个过时的ListenableFuture API,而不是现代的CompletableFuture

requestReplyKafkaTemplate负责生成和设置KafkaHeaders.CORRELATION_ID标头,但是我们必须为请求显式设置KafkaHeaders.REPLY_TOPIC标头。 另请注意,在replyListenerContainer中 ,答案的同一主题太无意了。 糟透了 不完全符合我对Spring抽象的期望。

服务器端:@SendTo


在服务器端,通常在KafkaListener上侦听请求主题的地方还附加了@SendTo批注,以提供响应消息。 由侦听器方法返回的对象将自动包装在响应消息中,添加CORRELATION_ID,并将响应发布在REPLY_TOPIC标头中指定的主题中。

  @Bean public Map < String, Object > consumerConfigs() { Map < String, Object > props = new HashMap < > (); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); return props; } @Bean public Map < String, Object > producerConfigs() { Map < String, Object > props = new HashMap < > (); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return props; } @Bean public ConsumerFactory < String, Request > requestConsumerFactory() { return new DefaultKafkaConsumerFactory < > (consumerConfigs(), new StringDeserializer(), new JsonSerializer < Request > ()); } @Bean public KafkaListenerContainerFactory < ConcurrentMessageListenerContainer < String, Request >> requestListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory < String, Request > factory = new ConcurrentKafkaListenerContainerFactory < > (); factory.setConsumerFactory(requestConsumerFactory()); factory.setReplyTemplate(replyTemplate()); return factory; } @Bean public ProducerFactory < String, Reply > replyProducerFactory() { return new DefaultKafkaProducerFactory < > (producerConfigs()); } @Bean public KafkaTemplate < String, Reply > replyTemplate() { return new KafkaTemplate < > (replyProducerFactory()); } 

这里也需要一些配置,但是侦听器配置更简单:

  @KafkaListener(topics = "${kafka.topic.car.request}", containerFactory = "requestListenerContainerFactory") @SendTo() public Reply receive(Request request) { Reply reply = ...; return reply; } 

但是消费者的多个实例呢?


直到我们使用了多个消费者实例,一切似乎都可以正常工作。 如果我们有多个客户端实例,则需要确保将响应发送到正确的客户端实例。 Spring Kafka文档假定每个使用者都可以使用唯一的主题,或者随请求一起发送附加的KafkaHeaders标头值RESPONSE_PARTITION是一个四字节字段,其中包含整数部分编号的BIG-ENDIAN表示形式。 为不同的客户使用单独的主题显然不是很灵活,因此我们选择显式的REPLY_PARTITION设置。 然后,客户端应知道将其分配给哪个分区。 该文档建议使用显式配置来选择特定分区。 让我们将其添加到我们的示例中:

  @Value("${kafka.topic.car.reply.partition}") private int replyPartition; ... @Bean public KafkaMessageListenerContainer < String, RequestReply > replyListenerContainer() { ContainerProperties containerProperties = new ContainerProperties(replyTopic); TopicPartitionInitialOffset initialOffset = new TopicPartitionInitialOffset(replyTopic, replyPartition); return new KafkaMessageListenerContainer < > (replyConsumerFactory(), containerProperties, initialOffset); } private static byte[] intToBytesBigEndian(final int data) { return new byte[] { (byte)((data >> 24) & 0xff), (byte)((data >> 16) & 0xff), (byte)((data >> 8) & 0xff), (byte)((data >> 0) & 0xff), }; } ... record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes())); record.headers().add(new RecordHeader(KafkaHeaders.REPLY_PARTITION, intToBytesBigEndian(replyPartition))); RequestReplyFuture < String, RequestReply, RequestReply > sendAndReceive = requestReplyKafkaTemplate.sendAndReceive(record); ... 

不是很漂亮,但是可以。 所需的配置广泛,API看起来很底层。 显式配置分区的需求使动态扩展客户端数量的过程变得复杂。 显然,您可以做得更好。

封装主题处理以进行响应和分区


让我们从封装“ 返回地址”模式开始,然后传递响应和分区的主题。 响应的主题必须注入RequestReplyTemplate中 ,因此,根本不应该出现在API中。 当涉及到答案的分区时,我们将执行相反的操作:提取分配给该主题侦听器的答案的分区,然后自动转移该分区。 这消除了客户端处理这些标头的需要。
同时,让我们使API看起来像标准的KafkaTemplate使用简化的参数重载sendAndReceive()方法并使用默认主题添加相应的重载方法):

 public class PartitionAwareReplyingKafkaTemplate < K, V, R > extends ReplyingKafkaTemplate < K, V, R > { public PartitionAwareReplyingKafkaTemplate(ProducerFactory < K, V > producerFactory, GenericMessageListenerContainer < K, R > replyContainer) { super(producerFactory, replyContainer); } private TopicPartition getFirstAssignedReplyTopicPartition() { if (getAssignedReplyTopicPartitions() != null && getAssignedReplyTopicPartitions().iterator().hasNext()) { TopicPartition replyPartition = getAssignedReplyTopicPartitions().iterator().next(); if (this.logger.isDebugEnabled()) { this.logger.debug("Using partition " + replyPartition.partition()); } return replyPartition; } else { throw new KafkaException("Illegal state: No reply partition is assigned to this instance"); } } private static byte[] intToBytesBigEndian(final int data) { return new byte[] { (byte)((data >> 24) & 0xff), (byte)((data >> 16) & 0xff), (byte)((data >> 8) & 0xff), (byte)((data >> 0) & 0xff), }; } public RequestReplyFuture < K, V, R > sendAndReceiveDefault(@Nullable V data) { return sendAndReceive(getDefaultTopic(), data); } public RequestReplyFuture < K, V, R > sendAndReceiveDefault(K key, @Nullable V data) { return sendAndReceive(getDefaultTopic(), key, data); } ... public RequestReplyFuture < K, V, R > sendAndReceive(String topic, @Nullable V data) { ProducerRecord < K, V > record = new ProducerRecord < > (topic, data); return doSendAndReceive(record); } public RequestReplyFuture < K, V, R > sendAndReceive(String topic, K key, @Nullable V data) { ProducerRecord < K, V > record = new ProducerRecord < > (topic, key, data); return doSendAndReceive(record); } ... @Override public RequestReplyFuture < K, V, R > sendAndReceive(ProducerRecord < K, V > record) { return doSendAndReceive(record); } protected RequestReplyFuture < K, V, R > doSendAndReceive(ProducerRecord < K, V > record) { TopicPartition replyPartition = getFirstAssignedReplyTopicPartition(); record.headers() .add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, replyPartition.topic().getBytes())) .add(new RecordHeader(KafkaHeaders.REPLY_PARTITION, intToBytesBigEndian(replyPartition.partition()))); return super.sendAndReceive(record); } } 

下一步:使ListenableFuture适应更现代的CompletableFuture

 public class CompletableFutureReplyingKafkaTemplate < K, V, R > extends PartitionAwareReplyingKafkaTemplate < K, V, R > { public CompletableFutureReplyingKafkaTemplate(ProducerFactory < K, V > producerFactory, GenericMessageListenerContainer < K, R > replyContainer) { super(producerFactory, replyContainer); } public CompletableFuture < R > requestReplyDefault(V value) { return adapt(sendAndReceiveDefault(value)); } public CompletableFuture < R > requestReplyDefault(K key, V value) { return adapt(sendAndReceiveDefault(key, value)); } ... public CompletableFuture < R > requestReply(String topic, V value) { return adapt(sendAndReceive(topic, value)); } public CompletableFuture < R > requestReply(String topic, K key, V value) { return adapt(sendAndReceive(topic, key, value)); } ... private CompletableFuture < R > adapt(RequestReplyFuture < K, V, R > requestReplyFuture) { CompletableFuture < R > completableResult = new CompletableFuture < R > () { @Override public boolean cancel(boolean mayInterruptIfRunning) { boolean result = requestReplyFuture.cancel(mayInterruptIfRunning); super.cancel(mayInterruptIfRunning); return result; } }; //       requestReplyFuture.getSendFuture().addCallback(new ListenableFutureCallback < SendResult < K, V >> () { @Override public void onSuccess(SendResult < K, V > sendResult) { // NOOP } @Override public void onFailure(Throwable t) { completableResult.completeExceptionally(t); } }); //     requestReplyFuture.addCallback(new ListenableFutureCallback < ConsumerRecord < K, R >> () { @Override public void onSuccess(ConsumerRecord < K, R > result) { completableResult.complete(result.value()); } @Override public void onFailure(Throwable t) { completableResult.completeExceptionally(t); } }); return completableResult; } } 

我们将其打包到一个实用程序库中,现在我们有了一个与Spring 主要设计理念“ Convention over Configuration”更加一致的API。 这是最终的客户代码:

  @Autowired private CompletableFutureReplyingKafkaTemplate < String, Request, Reply > requestReplyKafkaTemplate; ... requestReplyKafkaTemplate.requestReply(request).thenAccept(reply - > System.out.println("Reply: " + reply.toString()); ); 

总结一下


总而言之,Spring for Kafka 2.2在Apache Kafka中提供了Request-Reply模式的全功能实现,但是API仍然有些粗糙。 在本期中,我们看到API的一些其他抽象和改编可以提供更合乎逻辑的高级API。

警告1:
事件驱动的体系结构的主要优点之一是事件产生者和消费者的解耦,这使得创建更加灵活和不断发展的系统成为可能。 当请求方和响应方密切相关时,使用同步语义“请求-响应”是完全相反的。 因此,仅应在必要时使用。

警告2:
如果需要同步的Request-Response ,那么与使用诸如Apache Kafka之类的异步通道相比,基于HTTP协议要简单得多且效率更高。
但是,在某些情况下, 通过Kafka进行同步请求-响应很有意义。 合理地选择最佳的工作工具。

可以在github.com/callistaenterprise/blog-synchronous-kafka找到完整的工作示例。

留言


Federico•7个月前
当我们有混合需求时,例如,在50%的情况下,我们需要请求-响应,而在50%的情况下,我们需要事件管理? 我们该怎么做? Spring Kafka所需的配置看起来非常糟糕。

Jehanzeb Qayyum•6个月前
现在,Spring在响应的一个常见主题中使用分区来提供默认支持。

从版本2.2开始,模板尝试从已配置的响应容器(答复容器)确定响应或分区的主题。

https://docs.spring.io/spring-kafka/reference/html/#replying-template

尼尔·罗森伯格•8个月前
你好
在上一段中,您写道,在某些情况下,与HTTP相比,通过Kafka进行同步的Request-Response有意义。
你能举这样的例子吗?
谢谢啦
尼尔

Janne Keskitalo nir rozenberg•8个月前
我们将把一个大容量的交易处理系统分成几个微服务,我有一个想法,就是使用Kafka的Request-Response消息传递来实现类似的处理亲和力。 基本上,Kafka用于将所有呼叫从一个客户端路由到同一事务处理器进程,然后依次一次执行一个。 这种类型的处理可确保线性化( https://stackoverflow.com/a/19515375/7430325 ),因果关系,并且还可以进行有效的缓存。 本质上,协调工作将从数据库转移到Kafka,我们可以在Serializable严格隔离模式下启动数据库。
我还没有深入研究事务语义的细节,以了解它的不足之处,所以这只是一个主意。

@middle_java翻译

Source: https://habr.com/ru/post/zh-CN476156/


All Articles