总体而言,
事件驱动架构 ,尤其是
Apache Kafka ,最近引起了很多关注。 为了充分利用事件驱动的体系结构,事件委托机制必须实质上是异步的。 但是,可能存在一些特定的使用场景/流程,这些场景/流程需要使用
Synchronous Request-Response的语义。 此版本显示了如何使用
Apache Kafka实现
请求-响应 。
由
@middle_java翻译
原始文章日期:2018年10月26日
Apache Kafka本质上是异步的。 因此,Apache Kafka
的请求-响应语义是不自然的。 但是,这一挑战并不新鲜。 企业集成模式
请求-应答为异步通道上的同步消息传递提供了一种行之有效的机制:
Return Address模式用一种机制来补充
Request-Reply模式,该机制使请求者可以指示将响应发送到的地址:

最近,
Spring Kafka 2.1.3增加了来自Request Request模式框的支持,在
2.2版中,它的一些粗糙度被抛光。 让我们看看这种支持是如何工作的:
客户端:ReplyingKafkaTemplate
模板的众所周知的抽象构成了Spring中Request-Reply机制的客户端部分的基础。
@Bean public ReplyingKafkaTemplate < String, Request, Reply > replyKafkaTemplate( ProducerFactory < String, Request > pf, KafkaMessageListenerContainer < String, Reply > lc) { return new ReplyingKafkaTemplate < > (pf, lc); }
这里的一切都非常简单:我们设置了
ReplyingKafkaTemplate ,它使用String键发送请求消息,并使用String键接收响应消息。 但是,ReplyingKafkaTemplate必须基于具有适当使用者和生产者配置的ProducerFactory请求,ConsumerFactory响应和MessageListenerContainer。 因此,所需的配置非常重要:
@Value("${kafka.topic.car.reply}") private String replyTopic; @Bean public Map < String, Object > consumerConfigs() { Map < String, Object > props = new HashMap < > (); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); return props; } @Bean public Map < String, Object > producerConfigs() { Map < String, Object > props = new HashMap < > (); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return props; } @Bean public ProducerFactory < String, Request > requestProducerFactory() { return new DefaultKafkaProducerFactory < > (producerConfigs()); } @Bean public ConsumerFactory < String, Reply > replyConsumerFactory() { return new DefaultKafkaConsumerFactory < > (consumerConfigs(), new StringDeserializer(), new JsonSerializer < Reply > ()); } @Bean public KafkaMessageListenerContainer < String, Reply > replyListenerContainer() { ContainerProperties containerProperties = new ContainerProperties(replyTopic); return new KafkaMessageListenerContainer < > (replyConsumerFactory(), containerProperties); }
在这种情况下,使用
replyKafkaTemplate发送同步请求并接收响应如下:
@Value("${kafka.topic.car.request}") private String requestTopic; @Value("${kafka.topic.car.reply}") private String replyTopic; @Autowired private ReplyingKafkaTemplate < String, Request, Reply > requestReplyKafkaTemplate; ... RequestReply request = RequestReply.request(...);
还有很多样板文件和一个低级API,甚至是这个过时的
ListenableFuture API,而不是现代的
CompletableFuture 。
requestReplyKafkaTemplate负责生成和设置
KafkaHeaders.CORRELATION_ID标头,但是我们必须为请求显式设置
KafkaHeaders.REPLY_TOPIC标头。 另请注意,在
replyListenerContainer中 ,答案的同一主题太无意了。 糟透了 不完全符合我对Spring抽象的期望。
服务器端:@SendTo
在服务器端,通常在
KafkaListener上侦听请求主题的地方还附加了
@SendTo批注,以提供响应消息。 由侦听器方法返回的对象将自动包装在响应消息中,添加
CORRELATION_ID,并将响应发布在
REPLY_TOPIC标头中指定的主题中。
@Bean public Map < String, Object > consumerConfigs() { Map < String, Object > props = new HashMap < > (); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); return props; } @Bean public Map < String, Object > producerConfigs() { Map < String, Object > props = new HashMap < > (); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return props; } @Bean public ConsumerFactory < String, Request > requestConsumerFactory() { return new DefaultKafkaConsumerFactory < > (consumerConfigs(), new StringDeserializer(), new JsonSerializer < Request > ()); } @Bean public KafkaListenerContainerFactory < ConcurrentMessageListenerContainer < String, Request >> requestListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory < String, Request > factory = new ConcurrentKafkaListenerContainerFactory < > (); factory.setConsumerFactory(requestConsumerFactory()); factory.setReplyTemplate(replyTemplate()); return factory; } @Bean public ProducerFactory < String, Reply > replyProducerFactory() { return new DefaultKafkaProducerFactory < > (producerConfigs()); } @Bean public KafkaTemplate < String, Reply > replyTemplate() { return new KafkaTemplate < > (replyProducerFactory()); }
这里也需要一些配置,但是侦听器配置更简单:
@KafkaListener(topics = "${kafka.topic.car.request}", containerFactory = "requestListenerContainerFactory") @SendTo() public Reply receive(Request request) { Reply reply = ...; return reply; }
但是消费者的多个实例呢?
直到我们使用了多个消费者实例,一切似乎都可以正常工作。 如果我们有多个客户端实例,则需要确保将响应发送到正确的客户端实例。 Spring Kafka文档假定每个使用者都可以使用唯一的主题,或者随请求一起发送附加的
KafkaHeaders标头值
RESPONSE_PARTITION是一个四字节字段,其中包含整数部分编号的BIG-ENDIAN表示形式。 为不同的客户使用单独的主题显然不是很灵活,因此我们选择显式的
REPLY_PARTITION设置。 然后,客户端应知道将其分配给哪个分区。 该文档建议使用显式配置来选择特定分区。 让我们将其添加到我们的示例中:
@Value("${kafka.topic.car.reply.partition}") private int replyPartition; ... @Bean public KafkaMessageListenerContainer < String, RequestReply > replyListenerContainer() { ContainerProperties containerProperties = new ContainerProperties(replyTopic); TopicPartitionInitialOffset initialOffset = new TopicPartitionInitialOffset(replyTopic, replyPartition); return new KafkaMessageListenerContainer < > (replyConsumerFactory(), containerProperties, initialOffset); } private static byte[] intToBytesBigEndian(final int data) { return new byte[] { (byte)((data >> 24) & 0xff), (byte)((data >> 16) & 0xff), (byte)((data >> 8) & 0xff), (byte)((data >> 0) & 0xff), }; } ... record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes())); record.headers().add(new RecordHeader(KafkaHeaders.REPLY_PARTITION, intToBytesBigEndian(replyPartition))); RequestReplyFuture < String, RequestReply, RequestReply > sendAndReceive = requestReplyKafkaTemplate.sendAndReceive(record); ...
不是很漂亮,但是可以。 所需的配置广泛,API看起来很底层。 显式配置分区的需求使动态扩展客户端数量的过程变得复杂。 显然,您可以做得更好。
封装主题处理以进行响应和分区
让我们从封装“
返回地址”模式开始,然后传递响应和分区的主题。 响应的主题必须注入
RequestReplyTemplate中 ,因此,根本不应该出现在API中。 当涉及到答案的分区时,我们将执行相反的操作:提取分配给该主题侦听器的答案的分区,然后自动转移该分区。 这消除了客户端处理这些标头的需要。
同时,让我们使API看起来像标准的
KafkaTemplate (
使用简化的参数重载
sendAndReceive()方法
,并使用默认主题添加相应的重载方法):
public class PartitionAwareReplyingKafkaTemplate < K, V, R > extends ReplyingKafkaTemplate < K, V, R > { public PartitionAwareReplyingKafkaTemplate(ProducerFactory < K, V > producerFactory, GenericMessageListenerContainer < K, R > replyContainer) { super(producerFactory, replyContainer); } private TopicPartition getFirstAssignedReplyTopicPartition() { if (getAssignedReplyTopicPartitions() != null && getAssignedReplyTopicPartitions().iterator().hasNext()) { TopicPartition replyPartition = getAssignedReplyTopicPartitions().iterator().next(); if (this.logger.isDebugEnabled()) { this.logger.debug("Using partition " + replyPartition.partition()); } return replyPartition; } else { throw new KafkaException("Illegal state: No reply partition is assigned to this instance"); } } private static byte[] intToBytesBigEndian(final int data) { return new byte[] { (byte)((data >> 24) & 0xff), (byte)((data >> 16) & 0xff), (byte)((data >> 8) & 0xff), (byte)((data >> 0) & 0xff), }; } public RequestReplyFuture < K, V, R > sendAndReceiveDefault(@Nullable V data) { return sendAndReceive(getDefaultTopic(), data); } public RequestReplyFuture < K, V, R > sendAndReceiveDefault(K key, @Nullable V data) { return sendAndReceive(getDefaultTopic(), key, data); } ... public RequestReplyFuture < K, V, R > sendAndReceive(String topic, @Nullable V data) { ProducerRecord < K, V > record = new ProducerRecord < > (topic, data); return doSendAndReceive(record); } public RequestReplyFuture < K, V, R > sendAndReceive(String topic, K key, @Nullable V data) { ProducerRecord < K, V > record = new ProducerRecord < > (topic, key, data); return doSendAndReceive(record); } ... @Override public RequestReplyFuture < K, V, R > sendAndReceive(ProducerRecord < K, V > record) { return doSendAndReceive(record); } protected RequestReplyFuture < K, V, R > doSendAndReceive(ProducerRecord < K, V > record) { TopicPartition replyPartition = getFirstAssignedReplyTopicPartition(); record.headers() .add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, replyPartition.topic().getBytes())) .add(new RecordHeader(KafkaHeaders.REPLY_PARTITION, intToBytesBigEndian(replyPartition.partition()))); return super.sendAndReceive(record); } }
下一步:使
ListenableFuture适应更现代的
CompletableFuture 。
public class CompletableFutureReplyingKafkaTemplate < K, V, R > extends PartitionAwareReplyingKafkaTemplate < K, V, R > { public CompletableFutureReplyingKafkaTemplate(ProducerFactory < K, V > producerFactory, GenericMessageListenerContainer < K, R > replyContainer) { super(producerFactory, replyContainer); } public CompletableFuture < R > requestReplyDefault(V value) { return adapt(sendAndReceiveDefault(value)); } public CompletableFuture < R > requestReplyDefault(K key, V value) { return adapt(sendAndReceiveDefault(key, value)); } ... public CompletableFuture < R > requestReply(String topic, V value) { return adapt(sendAndReceive(topic, value)); } public CompletableFuture < R > requestReply(String topic, K key, V value) { return adapt(sendAndReceive(topic, key, value)); } ... private CompletableFuture < R > adapt(RequestReplyFuture < K, V, R > requestReplyFuture) { CompletableFuture < R > completableResult = new CompletableFuture < R > () { @Override public boolean cancel(boolean mayInterruptIfRunning) { boolean result = requestReplyFuture.cancel(mayInterruptIfRunning); super.cancel(mayInterruptIfRunning); return result; } };
我们将其打包到一个实用程序库中,现在我们有了一个与Spring
的主要设计理念
“ Convention over Configuration”更加一致的API。 这是最终的客户代码:
@Autowired private CompletableFutureReplyingKafkaTemplate < String, Request, Reply > requestReplyKafkaTemplate; ... requestReplyKafkaTemplate.requestReply(request).thenAccept(reply - > System.out.println("Reply: " + reply.toString()); );
总结一下
总而言之,Spring for Kafka 2.2在Apache Kafka中提供了
Request-Reply模式的全功能实现,但是API仍然有些粗糙。 在本期中,我们看到API的一些其他抽象和改编可以提供更合乎逻辑的高级API。
警告1:事件驱动的体系结构的主要优点之一是事件产生者和消费者的解耦,这使得创建更加灵活和不断发展的系统成为可能。 当请求方和响应方密切相关时,使用同步语义“请求-响应”是完全相反的。 因此,仅应在必要时使用。
警告2:如果需要
同步的Request-Response ,那么与使用
诸如Apache Kafka之类的异步通道相比,基于
HTTP的
协议要简单得多且效率更高。
但是,在某些情况下,
通过Kafka进行
同步请求-响应很有意义。 合理地选择最佳的工作工具。
可以在
github.com/callistaenterprise/blog-synchronous-kafka找到完整的工作示例。
留言
Federico•7个月前当我们有混合需求时,例如,在50%的情况下,我们需要请求-响应,而在50%的情况下,我们需要事件管理? 我们该怎么做? Spring Kafka所需的配置看起来非常糟糕。
Jehanzeb Qayyum•6个月前现在,Spring在响应的一个常见主题中使用分区来提供默认支持。
从版本2.2开始,模板尝试从已配置的响应容器(答复容器)确定响应或分区的主题。
https://docs.spring.io/spring-kafka/reference/html/#replying-template尼尔·罗森伯格•8个月前你好
在上一段中,您写道,在某些情况下,与HTTP相比,通过Kafka进行同步的Request-Response有意义。
你能举这样的例子吗?
谢谢啦
尼尔
Janne Keskitalo nir rozenberg•8个月前我们将把一个大容量的交易处理系统分成几个微服务,我有一个想法,就是使用Kafka的Request-Response消息传递来实现类似的处理亲和力。 基本上,Kafka用于将所有呼叫从一个客户端路由到同一事务处理器进程,然后依次一次执行一个。 这种类型的处理可确保线性化(
https://stackoverflow.com/a/19515375/7430325 ),因果关系,并且还可以进行有效的缓存。 本质上,协调工作将从数据库转移到Kafka,我们可以在Serializable严格隔离模式下启动数据库。
我还没有深入研究事务语义的细节,以了解它的不足之处,所以这只是一个主意。
由
@middle_java翻译