Synchronous Request-Response à l'aide d'Apache Kafka

L'architecture événementielle en général, et Apache Kafka en particulier, ont récemment attiré beaucoup d'attention. Pour tirer pleinement parti de l'architecture pilotée par les événements, le mécanisme de délégation d'événements doit être essentiellement asynchrone. Cependant, certains scénarios / flux d'utilisation spécifiques peuvent nécessiter la sémantique d'une demande-réponse synchrone . Cette version montre comment implémenter Request-Response à l' aide d' Apache Kafka .

Traduit par @middle_java

Date de l'article original: 26 octobre 2018

Apache Kafka est intrinsèquement asynchrone. Par conséquent, la sémantique Requête-Réponse pour Apache Kafka n'est pas naturelle. Cependant, ce défi n'est pas nouveau. Le modèle Enterprise Integration Pattern Request-Reply fournit un mécanisme éprouvé pour la messagerie synchrone sur des canaux asynchrones:



Le modèle d' adresse de retour complète le modèle de demande-réponse avec un mécanisme permettant au demandeur d'indiquer l'adresse à laquelle la réponse doit être envoyée:



Récemment, Spring Kafka 2.1.3 a ajouté la prise en charge de la boîte de configuration Request Request et, dans la version 2.2, une partie de sa rugosité a été polie. Voyons comment fonctionne ce support:

Côté client: ReplyingKafkaTemplate


L'abstraction bien connue du modèle constitue la base de la partie client du mécanisme de demande-réponse dans Spring.

@Bean public ReplyingKafkaTemplate < String, Request, Reply > replyKafkaTemplate( ProducerFactory < String, Request > pf, KafkaMessageListenerContainer < String, Reply > lc) { return new ReplyingKafkaTemplate < > (pf, lc); } 

Tout est assez simple ici: nous avons configuré ReplyingKafkaTemplate , qui envoie des messages de demande avec des clés de chaîne et reçoit des messages de réponse avec des clés de chaîne. Cependant, le ReplyingKafkaTemplate doit être basé sur la demande ProducerFactory, la réponse ConsumerFactory et le MessageListenerContainer avec les configurations de consommateur et de producteur appropriées. Par conséquent, la configuration requise est assez lourde:

  @Value("${kafka.topic.car.reply}") private String replyTopic; @Bean public Map < String, Object > consumerConfigs() { Map < String, Object > props = new HashMap < > (); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); return props; } @Bean public Map < String, Object > producerConfigs() { Map < String, Object > props = new HashMap < > (); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return props; } @Bean public ProducerFactory < String, Request > requestProducerFactory() { return new DefaultKafkaProducerFactory < > (producerConfigs()); } @Bean public ConsumerFactory < String, Reply > replyConsumerFactory() { return new DefaultKafkaConsumerFactory < > (consumerConfigs(), new StringDeserializer(), new JsonSerializer < Reply > ()); } @Bean public KafkaMessageListenerContainer < String, Reply > replyListenerContainer() { ContainerProperties containerProperties = new ContainerProperties(replyTopic); return new KafkaMessageListenerContainer < > (replyConsumerFactory(), containerProperties); } 

Dans ce cas, l'utilisation de replyKafkaTemplate pour envoyer une demande synchrone et recevoir une réponse est la suivante:

 @Value("${kafka.topic.car.request}") private String requestTopic; @Value("${kafka.topic.car.reply}") private String replyTopic; @Autowired private ReplyingKafkaTemplate < String, Request, Reply > requestReplyKafkaTemplate; ... RequestReply request = RequestReply.request(...); // producer record ProducerRecord < String, Request > record = new ProducerRecord < String, Request > (requestTopic, request); //       record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes())); //     Kafka          RequestReplyFuture < String, Request, Reply > sendAndReceive = requestReplyKafkaTemplate.sendAndReceive(record); sendAndReceive.addCallback(new ListenableFutureCallback < ConsumerRecord < String, Reply >> () { @Override public void onSuccess(ConsumerRecord < String, Reply > result) { //   consumer record Reply reply = result.value(); System.out.println("Reply: " + reply.toString()); } }); 

Il y a aussi beaucoup de passe-partout et une API de bas niveau, et même cette API ListenableFuture obsolète au lieu de la CompletableFuture moderne.

requestRhnessKafkaTemplate se charge de générer et de définir l'en - tête KafkaHeaders.CORRELATION_ID , mais nous devons explicitement définir l'en - tête KafkaHeaders.REPLY_TOPIC pour la demande. Veuillez également noter que le même sujet pour la réponse était trop involontaire ci-dessus dans replyListenerContainer . Un peu de boue. Pas tout à fait ce que j'attendais de l'abstraction du printemps.

Côté serveur: @SendTo


Côté serveur, l'écoute habituelle de KafkaListener sur le sujet de la demande est en outre décorée de l'annotation @SendTo pour fournir un message de réponse. L'objet renvoyé par la méthode d'écoute est automatiquement encapsulé dans le message de réponse, CORRELATION_ID est ajouté et la réponse est publiée dans la rubrique spécifiée dans l'en-tête REPLY_TOPIC .

  @Bean public Map < String, Object > consumerConfigs() { Map < String, Object > props = new HashMap < > (); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); return props; } @Bean public Map < String, Object > producerConfigs() { Map < String, Object > props = new HashMap < > (); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return props; } @Bean public ConsumerFactory < String, Request > requestConsumerFactory() { return new DefaultKafkaConsumerFactory < > (consumerConfigs(), new StringDeserializer(), new JsonSerializer < Request > ()); } @Bean public KafkaListenerContainerFactory < ConcurrentMessageListenerContainer < String, Request >> requestListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory < String, Request > factory = new ConcurrentKafkaListenerContainerFactory < > (); factory.setConsumerFactory(requestConsumerFactory()); factory.setReplyTemplate(replyTemplate()); return factory; } @Bean public ProducerFactory < String, Reply > replyProducerFactory() { return new DefaultKafkaProducerFactory < > (producerConfigs()); } @Bean public KafkaTemplate < String, Reply > replyTemplate() { return new KafkaTemplate < > (replyProducerFactory()); } 

Une configuration est également requise ici, mais la configuration de l'écouteur est plus simple:

  @KafkaListener(topics = "${kafka.topic.car.request}", containerFactory = "requestListenerContainerFactory") @SendTo() public Reply receive(Request request) { Reply reply = ...; return reply; } 

Mais qu'en est-il des instances multiples du consommateur?


Tout semble fonctionner jusqu'à ce que nous utilisions plusieurs instances du consommateur. Si nous avons plusieurs instances client, nous devons nous assurer que la réponse est envoyée à la bonne instance client. La documentation Spring Kafka suppose que chaque consommateur peut utiliser une rubrique unique ou qu'une valeur d'en-tête KafkaHeaders supplémentaire est envoyée avec la demande. RESPONSE_PARTITION est un champ de quatre octets contenant une représentation BIG-ENDIAN du numéro de section entier. L'utilisation de rubriques distinctes pour différents clients n'est clairement pas très flexible, nous choisissons donc le paramètre REPLY_PARTITION explicite. Le client doit alors savoir à quelle partition il est affecté. La documentation suggère d'utiliser une configuration explicite pour sélectionner une partition particulière. Ajoutons-le à notre exemple:

  @Value("${kafka.topic.car.reply.partition}") private int replyPartition; ... @Bean public KafkaMessageListenerContainer < String, RequestReply > replyListenerContainer() { ContainerProperties containerProperties = new ContainerProperties(replyTopic); TopicPartitionInitialOffset initialOffset = new TopicPartitionInitialOffset(replyTopic, replyPartition); return new KafkaMessageListenerContainer < > (replyConsumerFactory(), containerProperties, initialOffset); } private static byte[] intToBytesBigEndian(final int data) { return new byte[] { (byte)((data >> 24) & 0xff), (byte)((data >> 16) & 0xff), (byte)((data >> 8) & 0xff), (byte)((data >> 0) & 0xff), }; } ... record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes())); record.headers().add(new RecordHeader(KafkaHeaders.REPLY_PARTITION, intToBytesBigEndian(replyPartition))); RequestReplyFuture < String, RequestReply, RequestReply > sendAndReceive = requestReplyKafkaTemplate.sendAndReceive(record); ... 

Pas très joli, mais ça marche. La configuration requise est étendue et l'API semble de bas niveau. La nécessité de configurer explicitement les partitions complique le processus de mise à l'échelle dynamique du nombre de clients. De toute évidence, vous pouvez faire mieux.

Encapsulation du traitement des rubriques pour la réponse et la partition


Commençons par encapsuler le modèle d' adresse de retour , en passant le sujet de la réponse et de la partition. La rubrique de la réponse doit être injectée dans le RequestRhnessTemplate et, par conséquent, ne doit pas du tout être présente dans l'API. En ce qui concerne les partitions pour une réponse, nous ferons le contraire: extraire la ou les partitions affectées à l'écouteur de rubrique pour la réponse, et transférer cette partition automatiquement. Cela élimine la nécessité pour le client de prendre soin de ces en-têtes.
Dans le même temps, faisons également ressembler l'API à la méthode KafkaTemplate standard (surchargez la méthode sendAndReceive () avec des paramètres simplifiés et ajoutez les méthodes surchargées correspondantes en utilisant la rubrique par défaut):

 public class PartitionAwareReplyingKafkaTemplate < K, V, R > extends ReplyingKafkaTemplate < K, V, R > { public PartitionAwareReplyingKafkaTemplate(ProducerFactory < K, V > producerFactory, GenericMessageListenerContainer < K, R > replyContainer) { super(producerFactory, replyContainer); } private TopicPartition getFirstAssignedReplyTopicPartition() { if (getAssignedReplyTopicPartitions() != null && getAssignedReplyTopicPartitions().iterator().hasNext()) { TopicPartition replyPartition = getAssignedReplyTopicPartitions().iterator().next(); if (this.logger.isDebugEnabled()) { this.logger.debug("Using partition " + replyPartition.partition()); } return replyPartition; } else { throw new KafkaException("Illegal state: No reply partition is assigned to this instance"); } } private static byte[] intToBytesBigEndian(final int data) { return new byte[] { (byte)((data >> 24) & 0xff), (byte)((data >> 16) & 0xff), (byte)((data >> 8) & 0xff), (byte)((data >> 0) & 0xff), }; } public RequestReplyFuture < K, V, R > sendAndReceiveDefault(@Nullable V data) { return sendAndReceive(getDefaultTopic(), data); } public RequestReplyFuture < K, V, R > sendAndReceiveDefault(K key, @Nullable V data) { return sendAndReceive(getDefaultTopic(), key, data); } ... public RequestReplyFuture < K, V, R > sendAndReceive(String topic, @Nullable V data) { ProducerRecord < K, V > record = new ProducerRecord < > (topic, data); return doSendAndReceive(record); } public RequestReplyFuture < K, V, R > sendAndReceive(String topic, K key, @Nullable V data) { ProducerRecord < K, V > record = new ProducerRecord < > (topic, key, data); return doSendAndReceive(record); } ... @Override public RequestReplyFuture < K, V, R > sendAndReceive(ProducerRecord < K, V > record) { return doSendAndReceive(record); } protected RequestReplyFuture < K, V, R > doSendAndReceive(ProducerRecord < K, V > record) { TopicPartition replyPartition = getFirstAssignedReplyTopicPartition(); record.headers() .add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, replyPartition.topic().getBytes())) .add(new RecordHeader(KafkaHeaders.REPLY_PARTITION, intToBytesBigEndian(replyPartition.partition()))); return super.sendAndReceive(record); } } 

Étape suivante: adapter le ListenableFuture au CompletableFuture plus moderne.

 public class CompletableFutureReplyingKafkaTemplate < K, V, R > extends PartitionAwareReplyingKafkaTemplate < K, V, R > { public CompletableFutureReplyingKafkaTemplate(ProducerFactory < K, V > producerFactory, GenericMessageListenerContainer < K, R > replyContainer) { super(producerFactory, replyContainer); } public CompletableFuture < R > requestReplyDefault(V value) { return adapt(sendAndReceiveDefault(value)); } public CompletableFuture < R > requestReplyDefault(K key, V value) { return adapt(sendAndReceiveDefault(key, value)); } ... public CompletableFuture < R > requestReply(String topic, V value) { return adapt(sendAndReceive(topic, value)); } public CompletableFuture < R > requestReply(String topic, K key, V value) { return adapt(sendAndReceive(topic, key, value)); } ... private CompletableFuture < R > adapt(RequestReplyFuture < K, V, R > requestReplyFuture) { CompletableFuture < R > completableResult = new CompletableFuture < R > () { @Override public boolean cancel(boolean mayInterruptIfRunning) { boolean result = requestReplyFuture.cancel(mayInterruptIfRunning); super.cancel(mayInterruptIfRunning); return result; } }; //       requestReplyFuture.getSendFuture().addCallback(new ListenableFutureCallback < SendResult < K, V >> () { @Override public void onSuccess(SendResult < K, V > sendResult) { // NOOP } @Override public void onFailure(Throwable t) { completableResult.completeExceptionally(t); } }); //     requestReplyFuture.addCallback(new ListenableFutureCallback < ConsumerRecord < K, R >> () { @Override public void onSuccess(ConsumerRecord < K, R > result) { completableResult.complete(result.value()); } @Override public void onFailure(Throwable t) { completableResult.completeExceptionally(t); } }); return completableResult; } } 

Nous allons l'intégrer dans une bibliothèque d'utilitaires et nous avons maintenant une API qui est beaucoup plus cohérente avec la philosophie de conception principale de Spring, «Convention over Configuration» . Voici le code client final:

  @Autowired private CompletableFutureReplyingKafkaTemplate < String, Request, Reply > requestReplyKafkaTemplate; ... requestReplyKafkaTemplate.requestReply(request).thenAccept(reply - > System.out.println("Reply: " + reply.toString()); ); 

Pour résumer


Pour résumer, Spring for Kafka 2.2 fournit une implémentation entièrement fonctionnelle du modèle de demande-réponse dans Apache Kafka, mais l'API a encore quelques aspérités. Dans ce numéro, nous avons vu que certaines abstractions et adaptations supplémentaires de l'API peuvent fournir une API de haut niveau plus logique.

Avertissement 1:
L'un des principaux avantages d'une architecture événementielle est le découplage des producteurs d'événements et des consommateurs, ce qui permet de créer des systèmes beaucoup plus flexibles et évolutifs. L'utilisation de la sémantique synchrone «Request-Response» est exactement le contraire lorsque les parties requérantes et répondantes sont fortement liées. Par conséquent, il ne doit être utilisé qu'en cas de besoin.

Avertissement 2:
Si une demande-réponse synchrone est requise, le protocole basé sur HTTP est beaucoup plus simple et plus efficace que l'utilisation d' un canal asynchrone comme Apache Kafka .
Cependant, il peut y avoir des scénarios où une demande-réponse synchrone via Kafka est logique. Choisissez raisonnablement le meilleur outil pour le travail.

Un exemple pleinement fonctionnel peut être trouvé sur github.com/callistaenterprise/blog-synchronous-kafka .

Commentaires


Federico • il y a 7 mois
Et lorsque nous avons des besoins hybrides, par exemple, dans 50% des cas, nous avons besoin d'une demande-réponse et dans 50%, nous avons besoin de la gestion des événements? Comment fait-on cela? La configuration requise par Spring Kafka est assez horrible.

Jehanzeb Qayyum • il y a 6 mois
Spring prend désormais en charge par défaut l'utilisation de partitions dans un sujet commun pour la réponse.

À partir de la version 2.2, le modèle tente de déterminer la rubrique pour la réponse ou la partition à partir du conteneur de réponses configuré (conteneur de réponses).

https://docs.spring.io/spring-kafka/reference/html/#replying-template

nir rozenberg • il y a 8 mois
Salut
Dans le dernier paragraphe, vous avez écrit qu'il peut y avoir des scénarios où une demande-réponse synchrone via Kafka est logique par rapport à HTTP.
Pouvez-vous donner des exemples de tels scénarios?
Je vous remercie
Nir

Janne Keskitalo nir rozenberg • il y a 8 mois
Nous allons diviser un système de traitement de transactions à gros volume en plusieurs microservices, et j'ai une idée d'utiliser la messagerie Request-Response de Kafka pour obtenir une affinité de traitement similaire. Fondamentalement, Kafka est utilisé pour router tous les appels d'un client vers le même processus de processeur de transactions, qui les exécute ensuite séquentiellement un par un. Ce type de traitement garantit la linéarisation ( https://stackoverflow.com/a/19515375/7430325 ), la connectivité causale et permet également une mise en cache efficace. Essentiellement, les efforts de coordination seraient transférés de la base de données vers Kafka, et nous pourrions démarrer la base de données en mode d'isolement strict sérialisable.
Je n'ai pas encore fouillé dans les détails de notre sémantique de transaction pour voir où elle échoue, donc ce n'est qu'une idée.

Traduit par @middle_java

Source: https://habr.com/ru/post/fr476156/


All Articles