L'architecture événementielle en général, et
Apache Kafka en particulier, ont récemment attiré beaucoup d'attention. Pour tirer pleinement parti de l'architecture pilotée par les événements, le mécanisme de délégation d'événements doit être essentiellement asynchrone. Cependant, certains scénarios / flux d'utilisation spécifiques peuvent nécessiter la sémantique d'une
demande-réponse synchrone . Cette version montre comment implémenter
Request-Response à l' aide d'
Apache Kafka .
Traduit par
@middle_javaDate de l'article original: 26 octobre 2018
Apache Kafka est intrinsèquement asynchrone. Par conséquent,
la sémantique
Requête-Réponse pour Apache Kafka n'est pas naturelle. Cependant, ce défi n'est pas nouveau. Le modèle Enterprise Integration Pattern
Request-Reply fournit un mécanisme éprouvé pour la messagerie synchrone sur des canaux asynchrones:

Le modèle d'
adresse de retour complète le modèle de
demande-réponse avec un mécanisme permettant au demandeur d'indiquer l'adresse à laquelle la réponse doit être envoyée:

Récemment,
Spring Kafka 2.1.3 a ajouté la prise en charge de la boîte de configuration Request Request et, dans la version
2.2, une partie de sa rugosité a été polie. Voyons comment fonctionne ce support:
Côté client: ReplyingKafkaTemplate
L'abstraction bien connue du
modèle constitue la base de la partie client du mécanisme de demande-réponse dans Spring.
@Bean public ReplyingKafkaTemplate < String, Request, Reply > replyKafkaTemplate( ProducerFactory < String, Request > pf, KafkaMessageListenerContainer < String, Reply > lc) { return new ReplyingKafkaTemplate < > (pf, lc); }
Tout est assez simple ici: nous avons configuré
ReplyingKafkaTemplate , qui envoie des messages de demande avec des clés de chaîne et reçoit des messages de réponse avec des clés de chaîne. Cependant, le ReplyingKafkaTemplate doit être basé sur la demande ProducerFactory, la réponse ConsumerFactory et le MessageListenerContainer avec les configurations de consommateur et de producteur appropriées. Par conséquent, la configuration requise est assez lourde:
@Value("${kafka.topic.car.reply}") private String replyTopic; @Bean public Map < String, Object > consumerConfigs() { Map < String, Object > props = new HashMap < > (); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); return props; } @Bean public Map < String, Object > producerConfigs() { Map < String, Object > props = new HashMap < > (); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return props; } @Bean public ProducerFactory < String, Request > requestProducerFactory() { return new DefaultKafkaProducerFactory < > (producerConfigs()); } @Bean public ConsumerFactory < String, Reply > replyConsumerFactory() { return new DefaultKafkaConsumerFactory < > (consumerConfigs(), new StringDeserializer(), new JsonSerializer < Reply > ()); } @Bean public KafkaMessageListenerContainer < String, Reply > replyListenerContainer() { ContainerProperties containerProperties = new ContainerProperties(replyTopic); return new KafkaMessageListenerContainer < > (replyConsumerFactory(), containerProperties); }
Dans ce cas, l'utilisation de
replyKafkaTemplate pour envoyer une demande synchrone et recevoir une réponse est la suivante:
@Value("${kafka.topic.car.request}") private String requestTopic; @Value("${kafka.topic.car.reply}") private String replyTopic; @Autowired private ReplyingKafkaTemplate < String, Request, Reply > requestReplyKafkaTemplate; ... RequestReply request = RequestReply.request(...);
Il y a aussi beaucoup de passe-partout et une API de bas niveau, et même cette API
ListenableFuture obsolète au lieu de la
CompletableFuture moderne.
requestRhnessKafkaTemplate se charge de générer et de définir l'en
- tête
KafkaHeaders.CORRELATION_ID , mais nous devons explicitement définir l'en
- tête
KafkaHeaders.REPLY_TOPIC pour la demande. Veuillez également noter que le même sujet pour la réponse était trop involontaire ci-dessus dans
replyListenerContainer . Un peu de boue. Pas tout à fait ce que j'attendais de l'abstraction du printemps.
Côté serveur: @SendTo
Côté serveur, l'écoute habituelle de
KafkaListener sur le sujet de la demande est en outre décorée de l'annotation
@SendTo pour fournir un message de réponse. L'objet renvoyé par la méthode d'écoute est automatiquement encapsulé dans le message de réponse,
CORRELATION_ID est ajouté et la réponse est publiée dans la rubrique spécifiée dans l'en-tête
REPLY_TOPIC .
@Bean public Map < String, Object > consumerConfigs() { Map < String, Object > props = new HashMap < > (); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); return props; } @Bean public Map < String, Object > producerConfigs() { Map < String, Object > props = new HashMap < > (); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return props; } @Bean public ConsumerFactory < String, Request > requestConsumerFactory() { return new DefaultKafkaConsumerFactory < > (consumerConfigs(), new StringDeserializer(), new JsonSerializer < Request > ()); } @Bean public KafkaListenerContainerFactory < ConcurrentMessageListenerContainer < String, Request >> requestListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory < String, Request > factory = new ConcurrentKafkaListenerContainerFactory < > (); factory.setConsumerFactory(requestConsumerFactory()); factory.setReplyTemplate(replyTemplate()); return factory; } @Bean public ProducerFactory < String, Reply > replyProducerFactory() { return new DefaultKafkaProducerFactory < > (producerConfigs()); } @Bean public KafkaTemplate < String, Reply > replyTemplate() { return new KafkaTemplate < > (replyProducerFactory()); }
Une configuration est également requise ici, mais la configuration de l'écouteur est plus simple:
@KafkaListener(topics = "${kafka.topic.car.request}", containerFactory = "requestListenerContainerFactory") @SendTo() public Reply receive(Request request) { Reply reply = ...; return reply; }
Mais qu'en est-il des instances multiples du consommateur?
Tout semble fonctionner jusqu'à ce que nous utilisions plusieurs instances du consommateur. Si nous avons plusieurs instances client, nous devons nous assurer que la réponse est envoyée à la bonne instance client. La documentation Spring Kafka suppose que chaque consommateur peut utiliser une rubrique unique ou qu'une valeur d'en-tête
KafkaHeaders supplémentaire est envoyée avec la demande.
RESPONSE_PARTITION est un champ de quatre octets contenant une représentation BIG-ENDIAN du numéro de section entier. L'utilisation de rubriques distinctes pour différents clients n'est clairement pas très flexible, nous choisissons donc le paramètre
REPLY_PARTITION explicite. Le client doit alors savoir à quelle partition il est affecté. La documentation suggère d'utiliser une configuration explicite pour sélectionner une partition particulière. Ajoutons-le à notre exemple:
@Value("${kafka.topic.car.reply.partition}") private int replyPartition; ... @Bean public KafkaMessageListenerContainer < String, RequestReply > replyListenerContainer() { ContainerProperties containerProperties = new ContainerProperties(replyTopic); TopicPartitionInitialOffset initialOffset = new TopicPartitionInitialOffset(replyTopic, replyPartition); return new KafkaMessageListenerContainer < > (replyConsumerFactory(), containerProperties, initialOffset); } private static byte[] intToBytesBigEndian(final int data) { return new byte[] { (byte)((data >> 24) & 0xff), (byte)((data >> 16) & 0xff), (byte)((data >> 8) & 0xff), (byte)((data >> 0) & 0xff), }; } ... record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes())); record.headers().add(new RecordHeader(KafkaHeaders.REPLY_PARTITION, intToBytesBigEndian(replyPartition))); RequestReplyFuture < String, RequestReply, RequestReply > sendAndReceive = requestReplyKafkaTemplate.sendAndReceive(record); ...
Pas très joli, mais ça marche. La configuration requise est étendue et l'API semble de bas niveau. La nécessité de configurer explicitement les partitions complique le processus de mise à l'échelle dynamique du nombre de clients. De toute évidence, vous pouvez faire mieux.
Encapsulation du traitement des rubriques pour la réponse et la partition
Commençons par encapsuler le modèle d'
adresse de retour , en passant le sujet de la réponse et de la partition. La rubrique de la réponse doit être injectée dans le
RequestRhnessTemplate et, par conséquent, ne doit pas du tout être présente dans l'API. En ce qui concerne les partitions pour une réponse, nous ferons le contraire: extraire la ou les partitions affectées à l'écouteur de rubrique pour la réponse, et transférer cette partition automatiquement. Cela élimine la nécessité pour le client de prendre soin de ces en-têtes.
Dans le même temps, faisons également ressembler l'API à la
méthode KafkaTemplate standard (surchargez la méthode
sendAndReceive () avec des paramètres simplifiés et ajoutez les méthodes surchargées correspondantes en utilisant la rubrique par défaut):
public class PartitionAwareReplyingKafkaTemplate < K, V, R > extends ReplyingKafkaTemplate < K, V, R > { public PartitionAwareReplyingKafkaTemplate(ProducerFactory < K, V > producerFactory, GenericMessageListenerContainer < K, R > replyContainer) { super(producerFactory, replyContainer); } private TopicPartition getFirstAssignedReplyTopicPartition() { if (getAssignedReplyTopicPartitions() != null && getAssignedReplyTopicPartitions().iterator().hasNext()) { TopicPartition replyPartition = getAssignedReplyTopicPartitions().iterator().next(); if (this.logger.isDebugEnabled()) { this.logger.debug("Using partition " + replyPartition.partition()); } return replyPartition; } else { throw new KafkaException("Illegal state: No reply partition is assigned to this instance"); } } private static byte[] intToBytesBigEndian(final int data) { return new byte[] { (byte)((data >> 24) & 0xff), (byte)((data >> 16) & 0xff), (byte)((data >> 8) & 0xff), (byte)((data >> 0) & 0xff), }; } public RequestReplyFuture < K, V, R > sendAndReceiveDefault(@Nullable V data) { return sendAndReceive(getDefaultTopic(), data); } public RequestReplyFuture < K, V, R > sendAndReceiveDefault(K key, @Nullable V data) { return sendAndReceive(getDefaultTopic(), key, data); } ... public RequestReplyFuture < K, V, R > sendAndReceive(String topic, @Nullable V data) { ProducerRecord < K, V > record = new ProducerRecord < > (topic, data); return doSendAndReceive(record); } public RequestReplyFuture < K, V, R > sendAndReceive(String topic, K key, @Nullable V data) { ProducerRecord < K, V > record = new ProducerRecord < > (topic, key, data); return doSendAndReceive(record); } ... @Override public RequestReplyFuture < K, V, R > sendAndReceive(ProducerRecord < K, V > record) { return doSendAndReceive(record); } protected RequestReplyFuture < K, V, R > doSendAndReceive(ProducerRecord < K, V > record) { TopicPartition replyPartition = getFirstAssignedReplyTopicPartition(); record.headers() .add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, replyPartition.topic().getBytes())) .add(new RecordHeader(KafkaHeaders.REPLY_PARTITION, intToBytesBigEndian(replyPartition.partition()))); return super.sendAndReceive(record); } }
Étape suivante: adapter le
ListenableFuture au
CompletableFuture plus moderne.
public class CompletableFutureReplyingKafkaTemplate < K, V, R > extends PartitionAwareReplyingKafkaTemplate < K, V, R > { public CompletableFutureReplyingKafkaTemplate(ProducerFactory < K, V > producerFactory, GenericMessageListenerContainer < K, R > replyContainer) { super(producerFactory, replyContainer); } public CompletableFuture < R > requestReplyDefault(V value) { return adapt(sendAndReceiveDefault(value)); } public CompletableFuture < R > requestReplyDefault(K key, V value) { return adapt(sendAndReceiveDefault(key, value)); } ... public CompletableFuture < R > requestReply(String topic, V value) { return adapt(sendAndReceive(topic, value)); } public CompletableFuture < R > requestReply(String topic, K key, V value) { return adapt(sendAndReceive(topic, key, value)); } ... private CompletableFuture < R > adapt(RequestReplyFuture < K, V, R > requestReplyFuture) { CompletableFuture < R > completableResult = new CompletableFuture < R > () { @Override public boolean cancel(boolean mayInterruptIfRunning) { boolean result = requestReplyFuture.cancel(mayInterruptIfRunning); super.cancel(mayInterruptIfRunning); return result; } };
Nous allons l'intégrer dans une bibliothèque d'utilitaires et nous avons maintenant une API qui est beaucoup plus cohérente avec
la philosophie de conception principale de Spring,
«Convention over Configuration» . Voici le code client final:
@Autowired private CompletableFutureReplyingKafkaTemplate < String, Request, Reply > requestReplyKafkaTemplate; ... requestReplyKafkaTemplate.requestReply(request).thenAccept(reply - > System.out.println("Reply: " + reply.toString()); );
Pour résumer
Pour résumer, Spring for Kafka 2.2 fournit une implémentation entièrement fonctionnelle du modèle de
demande-réponse dans Apache Kafka, mais l'API a encore quelques aspérités. Dans ce numéro, nous avons vu que certaines abstractions et adaptations supplémentaires de l'API peuvent fournir une API de haut niveau plus logique.
Avertissement 1:L'un des principaux avantages d'une architecture événementielle est le découplage des producteurs d'événements et des consommateurs, ce qui permet de créer des systèmes beaucoup plus flexibles et évolutifs. L'utilisation de la sémantique synchrone «Request-Response» est exactement le contraire lorsque les parties requérantes et répondantes sont fortement liées. Par conséquent, il ne doit être utilisé qu'en cas de besoin.
Avertissement 2:Si une
demande-réponse synchrone est requise, le
protocole basé sur
HTTP est beaucoup plus simple et plus efficace que l'utilisation d'
un canal asynchrone comme Apache Kafka .
Cependant, il peut y avoir des scénarios où une
demande-réponse synchrone via Kafka est logique. Choisissez raisonnablement le meilleur outil pour le travail.
Un exemple pleinement fonctionnel peut être trouvé sur
github.com/callistaenterprise/blog-synchronous-kafka .
Commentaires
Federico • il y a 7 moisEt lorsque nous avons des besoins hybrides, par exemple, dans 50% des cas, nous avons besoin d'une demande-réponse et dans 50%, nous avons besoin de la gestion des événements? Comment fait-on cela? La configuration requise par Spring Kafka est assez horrible.
Jehanzeb Qayyum • il y a 6 moisSpring prend désormais en charge par défaut l'utilisation de partitions dans un sujet commun pour la réponse.
À partir de la version 2.2, le modèle tente de déterminer la rubrique pour la réponse ou la partition à partir du conteneur de réponses configuré (conteneur de réponses).
https://docs.spring.io/spring-kafka/reference/html/#replying-templatenir rozenberg • il y a 8 moisSalut
Dans le dernier paragraphe, vous avez écrit qu'il peut y avoir des scénarios où une demande-réponse synchrone via Kafka est logique par rapport à HTTP.
Pouvez-vous donner des exemples de tels scénarios?
Je vous remercie
Nir
Janne Keskitalo nir rozenberg • il y a 8 moisNous allons diviser un système de traitement de transactions à gros volume en plusieurs microservices, et j'ai une idée d'utiliser la messagerie Request-Response de Kafka pour obtenir une affinité de traitement similaire. Fondamentalement, Kafka est utilisé pour router tous les appels d'un client vers le même processus de processeur de transactions, qui les exécute ensuite séquentiellement un par un. Ce type de traitement garantit la linéarisation (
https://stackoverflow.com/a/19515375/7430325 ), la connectivité causale et permet également une mise en cache efficace. Essentiellement, les efforts de coordination seraient transférés de la base de données vers Kafka, et nous pourrions démarrer la base de données en mode d'isolement strict sérialisable.
Je n'ai pas encore fouillé dans les détails de notre sémantique de transaction pour voir où elle échoue, donc ce n'est qu'une idée.
Traduit par
@middle_java