Synchrone Anfrage-Antwort mit Apache Kafka

Event Driven Architecture im Allgemeinen und Apache Kafka im Besonderen haben in letzter Zeit viel Aufmerksamkeit erregt. Um die ereignisgesteuerte Architektur voll ausnutzen zu können, muss der Ereignisdelegierungsmechanismus im Wesentlichen asynchron sein. Es kann jedoch bestimmte Verwendungsszenarien / -flüsse geben, die die Semantik einer synchronen Anforderungsantwort erfordern. Diese Version zeigt, wie Request-Response mit Apache Kafka implementiert wird.

Übersetzt von @middle_java

Originalartikel Datum: 26. Oktober 2018

Apache Kafka ist von Natur aus asynchron. Daher ist die Request-Response- Semantik für Apache Kafka nicht selbstverständlich. Diese Herausforderung ist jedoch nicht neu. Das Enterprise Integration Pattern Request-Reply bietet einen bewährten Mechanismus für synchrones Messaging über asynchrone Kanäle:



Das Rücksendeadressenmuster ergänzt das Anforderungs- / Antwortmuster um einen Mechanismus, mit dem der Anforderer die Adresse angibt, an die die Antwort gesendet werden soll:



Vor kurzem hat Spring Kafka 2.1.3 die Unterstützung aus dem Musterfeld „Request Reply“ hinzugefügt, und in Version 2.2 wurde ein Teil der Rauheit poliert. Mal sehen, wie diese Unterstützung funktioniert:

Client-Seite: ReplyingKafkaTemplate


Die bekannte Abstraktion des Templates bildet die Basis für den Client-Teil des Request-Reply-Mechanismus im Frühjahr.

@Bean public ReplyingKafkaTemplate < String, Request, Reply > replyKafkaTemplate( ProducerFactory < String, Request > pf, KafkaMessageListenerContainer < String, Reply > lc) { return new ReplyingKafkaTemplate < > (pf, lc); } 

Hier ist alles ganz einfach: Wir haben ReplyingKafkaTemplate eingerichtet , das Anforderungsnachrichten mit String-Schlüsseln sendet und Antwortnachrichten mit String-Schlüsseln empfängt. Das ReplyingKafkaTemplate muss jedoch auf der ProducerFactory-Anforderung, der ConsumerFactory-Antwort und dem MessageListenerContainer mit den entsprechenden Consumer- und Producer-Konfigurationen basieren. Daher ist die erforderliche Konfiguration ziemlich schwer:

  @Value("${kafka.topic.car.reply}") private String replyTopic; @Bean public Map < String, Object > consumerConfigs() { Map < String, Object > props = new HashMap < > (); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); return props; } @Bean public Map < String, Object > producerConfigs() { Map < String, Object > props = new HashMap < > (); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return props; } @Bean public ProducerFactory < String, Request > requestProducerFactory() { return new DefaultKafkaProducerFactory < > (producerConfigs()); } @Bean public ConsumerFactory < String, Reply > replyConsumerFactory() { return new DefaultKafkaConsumerFactory < > (consumerConfigs(), new StringDeserializer(), new JsonSerializer < Reply > ()); } @Bean public KafkaMessageListenerContainer < String, Reply > replyListenerContainer() { ContainerProperties containerProperties = new ContainerProperties(replyTopic); return new KafkaMessageListenerContainer < > (replyConsumerFactory(), containerProperties); } 

In diesem Fall lautet die Verwendung von replyKafkaTemplate zum Senden einer synchronen Anforderung und zum Empfangen einer Antwort wie folgt:

 @Value("${kafka.topic.car.request}") private String requestTopic; @Value("${kafka.topic.car.reply}") private String replyTopic; @Autowired private ReplyingKafkaTemplate < String, Request, Reply > requestReplyKafkaTemplate; ... RequestReply request = RequestReply.request(...); // producer record ProducerRecord < String, Request > record = new ProducerRecord < String, Request > (requestTopic, request); //       record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes())); //     Kafka          RequestReplyFuture < String, Request, Reply > sendAndReceive = requestReplyKafkaTemplate.sendAndReceive(record); sendAndReceive.addCallback(new ListenableFutureCallback < ConsumerRecord < String, Reply >> () { @Override public void onSuccess(ConsumerRecord < String, Reply > result) { //   consumer record Reply reply = result.value(); System.out.println("Reply: " + reply.toString()); } }); 

Es gibt auch viel Boilerplate und eine Low-Level-API und sogar diese veraltete ListenableFuture- API anstelle der modernen CompletableFuture .

requestReplyKafkaTemplate übernimmt das Generieren und Festlegen des Headers KafkaHeaders.CORRELATION_ID. Der Header KafkaHeaders.REPLY_TOPIC muss jedoch explizit für die Anforderung festgelegt werden. Bitte beachten Sie auch, dass dasselbe Thema für die Antwort oben in replyListenerContainer zu ungewollt war. Etwas Mist. Nicht ganz das, was ich von der Frühlingsabstraktion erwartet hatte.

Serverseite: @SendTo


Auf der Serverseite wird der übliche KafkaListener , der das Thema auf die Anforderung abhört, zusätzlich mit der Anmerkung @SendTo versehen , um eine Antwortnachricht bereitzustellen. Das von der Listener-Methode zurückgegebene Objekt wird automatisch in die Antwortnachricht eingeschlossen, CORRELATION_ID wird hinzugefügt , und die Antwort wird in dem im REPLY_TOPIC- Header angegebenen Thema veröffentlicht.

  @Bean public Map < String, Object > consumerConfigs() { Map < String, Object > props = new HashMap < > (); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); return props; } @Bean public Map < String, Object > producerConfigs() { Map < String, Object > props = new HashMap < > (); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return props; } @Bean public ConsumerFactory < String, Request > requestConsumerFactory() { return new DefaultKafkaConsumerFactory < > (consumerConfigs(), new StringDeserializer(), new JsonSerializer < Request > ()); } @Bean public KafkaListenerContainerFactory < ConcurrentMessageListenerContainer < String, Request >> requestListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory < String, Request > factory = new ConcurrentKafkaListenerContainerFactory < > (); factory.setConsumerFactory(requestConsumerFactory()); factory.setReplyTemplate(replyTemplate()); return factory; } @Bean public ProducerFactory < String, Reply > replyProducerFactory() { return new DefaultKafkaProducerFactory < > (producerConfigs()); } @Bean public KafkaTemplate < String, Reply > replyTemplate() { return new KafkaTemplate < > (replyProducerFactory()); } 

Hier ist ebenfalls eine gewisse Konfiguration erforderlich, die Listener-Konfiguration ist jedoch einfacher:

  @KafkaListener(topics = "${kafka.topic.car.request}", containerFactory = "requestListenerContainerFactory") @SendTo() public Reply receive(Request request) { Reply reply = ...; return reply; } 

Aber was ist mit mehreren Instanzen des Verbrauchers?


Alles scheint zu funktionieren, bis wir mehrere Instanzen des Verbrauchers verwenden. Wenn wir mehrere Client-Instanzen haben, müssen wir sicherstellen, dass die Antwort an die richtige Client-Instanz gesendet wird. In der Spring Kafka-Dokumentation wird davon ausgegangen, dass jeder Consumer ein eindeutiges Thema verwenden kann oder dass mit der Anforderung ein zusätzlicher KafkaHeaders-Headerwert gesendet wird RESPONSE_PARTITION ist ein 4-Byte-Feld, das eine BIG-ENDIAN-Darstellung der Ganzzahl-Abschnittsnummer enthält. Die Verwendung separater Themen für verschiedene Clients ist eindeutig nicht sehr flexibel, daher wählen wir die explizite Einstellung REPLY_PARTITION . Dann sollte der Client wissen, welcher Partition er zugeordnet ist. In der Dokumentation wird vorgeschlagen, eine explizite Konfiguration zu verwenden, um eine bestimmte Partition auszuwählen. Fügen wir es unserem Beispiel hinzu:

  @Value("${kafka.topic.car.reply.partition}") private int replyPartition; ... @Bean public KafkaMessageListenerContainer < String, RequestReply > replyListenerContainer() { ContainerProperties containerProperties = new ContainerProperties(replyTopic); TopicPartitionInitialOffset initialOffset = new TopicPartitionInitialOffset(replyTopic, replyPartition); return new KafkaMessageListenerContainer < > (replyConsumerFactory(), containerProperties, initialOffset); } private static byte[] intToBytesBigEndian(final int data) { return new byte[] { (byte)((data >> 24) & 0xff), (byte)((data >> 16) & 0xff), (byte)((data >> 8) & 0xff), (byte)((data >> 0) & 0xff), }; } ... record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes())); record.headers().add(new RecordHeader(KafkaHeaders.REPLY_PARTITION, intToBytesBigEndian(replyPartition))); RequestReplyFuture < String, RequestReply, RequestReply > sendAndReceive = requestReplyKafkaTemplate.sendAndReceive(record); ... 

Nicht sehr hübsch, aber es funktioniert. Die erforderliche Konfiguration ist umfangreich und die API sieht auf niedriger Ebene aus. Die Notwendigkeit, Partitionen explizit zu konfigurieren, erschwert das dynamische Skalieren der Anzahl der Clients. Natürlich können Sie es besser machen.

Kapselung der Themenverarbeitung für Antwort und Partition


Beginnen wir mit der Kapselung des Rücksprungadressenmusters und geben das Thema für die Antwort und die Partition weiter. Das Thema für die Antwort muss in das RequestReplyTemplate eingefügt werden und sollte daher überhaupt nicht in der API vorhanden sein. Wenn es um Partitionen für eine Antwort geht, machen wir das Gegenteil: Extrahieren Sie die Partition (en), die dem Themenlistener für die Antwort zugewiesen sind, und übertragen Sie diese Partition automatisch. Dadurch muss sich der Client nicht mehr um diese Header kümmern.
Nehmen wir zur gleichen Zeit auch die API so vor, dass sie der Standard- KafkaTemplate ähnelt (überladen Sie die sendAndReceive () -Methode mit vereinfachten Parametern und fügen Sie die entsprechenden überladenen Methoden unter Verwendung des Standardthemas hinzu):

 public class PartitionAwareReplyingKafkaTemplate < K, V, R > extends ReplyingKafkaTemplate < K, V, R > { public PartitionAwareReplyingKafkaTemplate(ProducerFactory < K, V > producerFactory, GenericMessageListenerContainer < K, R > replyContainer) { super(producerFactory, replyContainer); } private TopicPartition getFirstAssignedReplyTopicPartition() { if (getAssignedReplyTopicPartitions() != null && getAssignedReplyTopicPartitions().iterator().hasNext()) { TopicPartition replyPartition = getAssignedReplyTopicPartitions().iterator().next(); if (this.logger.isDebugEnabled()) { this.logger.debug("Using partition " + replyPartition.partition()); } return replyPartition; } else { throw new KafkaException("Illegal state: No reply partition is assigned to this instance"); } } private static byte[] intToBytesBigEndian(final int data) { return new byte[] { (byte)((data >> 24) & 0xff), (byte)((data >> 16) & 0xff), (byte)((data >> 8) & 0xff), (byte)((data >> 0) & 0xff), }; } public RequestReplyFuture < K, V, R > sendAndReceiveDefault(@Nullable V data) { return sendAndReceive(getDefaultTopic(), data); } public RequestReplyFuture < K, V, R > sendAndReceiveDefault(K key, @Nullable V data) { return sendAndReceive(getDefaultTopic(), key, data); } ... public RequestReplyFuture < K, V, R > sendAndReceive(String topic, @Nullable V data) { ProducerRecord < K, V > record = new ProducerRecord < > (topic, data); return doSendAndReceive(record); } public RequestReplyFuture < K, V, R > sendAndReceive(String topic, K key, @Nullable V data) { ProducerRecord < K, V > record = new ProducerRecord < > (topic, key, data); return doSendAndReceive(record); } ... @Override public RequestReplyFuture < K, V, R > sendAndReceive(ProducerRecord < K, V > record) { return doSendAndReceive(record); } protected RequestReplyFuture < K, V, R > doSendAndReceive(ProducerRecord < K, V > record) { TopicPartition replyPartition = getFirstAssignedReplyTopicPartition(); record.headers() .add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, replyPartition.topic().getBytes())) .add(new RecordHeader(KafkaHeaders.REPLY_PARTITION, intToBytesBigEndian(replyPartition.partition()))); return super.sendAndReceive(record); } } 

Nächster Schritt: Anpassung der ListenableFuture an die modernere CompletableFuture .

 public class CompletableFutureReplyingKafkaTemplate < K, V, R > extends PartitionAwareReplyingKafkaTemplate < K, V, R > { public CompletableFutureReplyingKafkaTemplate(ProducerFactory < K, V > producerFactory, GenericMessageListenerContainer < K, R > replyContainer) { super(producerFactory, replyContainer); } public CompletableFuture < R > requestReplyDefault(V value) { return adapt(sendAndReceiveDefault(value)); } public CompletableFuture < R > requestReplyDefault(K key, V value) { return adapt(sendAndReceiveDefault(key, value)); } ... public CompletableFuture < R > requestReply(String topic, V value) { return adapt(sendAndReceive(topic, value)); } public CompletableFuture < R > requestReply(String topic, K key, V value) { return adapt(sendAndReceive(topic, key, value)); } ... private CompletableFuture < R > adapt(RequestReplyFuture < K, V, R > requestReplyFuture) { CompletableFuture < R > completableResult = new CompletableFuture < R > () { @Override public boolean cancel(boolean mayInterruptIfRunning) { boolean result = requestReplyFuture.cancel(mayInterruptIfRunning); super.cancel(mayInterruptIfRunning); return result; } }; //       requestReplyFuture.getSendFuture().addCallback(new ListenableFutureCallback < SendResult < K, V >> () { @Override public void onSuccess(SendResult < K, V > sendResult) { // NOOP } @Override public void onFailure(Throwable t) { completableResult.completeExceptionally(t); } }); //     requestReplyFuture.addCallback(new ListenableFutureCallback < ConsumerRecord < K, R >> () { @Override public void onSuccess(ConsumerRecord < K, R > result) { completableResult.complete(result.value()); } @Override public void onFailure(Throwable t) { completableResult.completeExceptionally(t); } }); return completableResult; } } 

Wir werden dies in eine Hilfsprogrammbibliothek packen und haben jetzt eine API, die viel besser mit Spring ' Hauptentwurfsphilosophie „Convention over Configuration“ übereinstimmt. Hier ist der endgültige Client-Code:

  @Autowired private CompletableFutureReplyingKafkaTemplate < String, Request, Reply > requestReplyKafkaTemplate; ... requestReplyKafkaTemplate.requestReply(request).thenAccept(reply - > System.out.println("Reply: " + reply.toString()); ); 

Um es zusammenzufassen


Zusammenfassend bietet Spring for Kafka 2.2 eine voll funktionsfähige Implementierung des Request-Reply- Musters in Apache Kafka, die API weist jedoch noch einige Ecken und Kanten auf. In dieser Ausgabe haben wir gesehen, dass einige zusätzliche Abstraktionen und Anpassungen der API eine logischere API auf hoher Ebene bieten können.

Warnung 1:
Einer der Hauptvorteile einer ereignisgesteuerten Architektur ist die Entkopplung von Ereignisproduzenten und -konsumenten, die es ermöglicht, flexiblere und sich weiterentwickelnde Systeme zu erstellen. Verwendung der synchronen Semantik „Request-Response“ ist das genaue Gegenteil, wenn die anfordernden und antwortenden Parteien eng miteinander verwandt sind. Daher sollte es nur bei Bedarf verwendet werden.

Warnung 2:
Wenn eine synchrone Request-Response erforderlich ist, ist das HTTP- basierte Protokoll viel einfacher und effizienter als die Verwendung eines asynchronen Kanals wie Apache Kafka .
Es kann jedoch Szenarien geben, in denen eine synchrone Anfrage-Antwort über Kafka sinnvoll ist. Wählen Sie vernünftigerweise das beste Werkzeug für den Job.

Ein voll funktionsfähiges Beispiel finden Sie unter github.com/callistaenterprise/blog-synchronous-kafka .

Kommentare


Federico • vor 7 Monaten
Und wenn wir hybride Bedürfnisse haben, brauchen wir zum Beispiel in 50% der Fälle eine Anfrage-Antwort und in 50% ein Event-Management? Wie machen wir das? Die Konfiguration, die Spring Kafka benötigt, sieht ziemlich schrecklich aus.

Jehanzeb Qayyum • vor 6 Monaten
Spring unterstützt jetzt standardmäßig Partitionen in einem gemeinsamen Thema für die Antwort.

Ab Version 2.2 versucht die Vorlage, das Thema für die Antwort oder Partition aus dem konfigurierten Antwortcontainer (Antwortcontainer) zu ermitteln.

https://docs.spring.io/spring-kafka/reference/html/#replying-template

nir rozenberg • vor 8 Monaten
Hallo,
Im letzten Absatz haben Sie geschrieben, dass es Szenarien geben kann, in denen eine synchrone Request-Response über Kafka im Vergleich zu HTTP sinnvoll ist.
Können Sie Beispiele für solche Szenarien nennen?
Vielen Dank,
Nir

Janne Keskitalo nir rozenberg • vor 8 Monaten
Wir werden ein großvolumiges Transaktionsverarbeitungssystem in mehrere Microservices aufteilen, und ich habe die Idee, Kafkas Request-Response-Messaging zu verwenden, um eine ähnliche Verarbeitungsaffinität zu erzielen. Grundsätzlich wird Kafka verwendet, um alle Aufrufe von einem Client an denselben Transaktionsprozessorprozess weiterzuleiten, der sie dann nacheinander einzeln ausführt. Diese Art der Verarbeitung gewährleistet Linearisierbarkeit ( https://stackoverflow.com/a/19515375/7430325 ), kausale Konnektivität und ermöglicht auch effizientes Caching. Im Wesentlichen würden die Koordinierungsbemühungen von der Datenbank auf Kafka übertragen, und wir könnten die Datenbank im seriellen strengen Isolationsmodus starten.
Ich muss mich noch mit den Details unserer Transaktionssemantik befassen, um zu sehen, wo es nicht funktioniert. Dies ist also nur eine Idee.

Übersetzt von @middle_java

Source: https://habr.com/ru/post/de476156/


All Articles