Event Driven Architecture im Allgemeinen und
Apache Kafka im Besonderen haben in letzter Zeit viel Aufmerksamkeit erregt. Um die ereignisgesteuerte Architektur voll ausnutzen zu können, muss der Ereignisdelegierungsmechanismus im Wesentlichen asynchron sein. Es kann jedoch bestimmte Verwendungsszenarien / -flüsse geben, die die Semantik einer
synchronen Anforderungsantwort erfordern. Diese Version zeigt, wie
Request-Response mit
Apache Kafka implementiert wird.
Übersetzt von
@middle_javaOriginalartikel Datum: 26. Oktober 2018
Apache Kafka ist von Natur aus asynchron. Daher ist
die Request-Response- Semantik für Apache Kafka nicht selbstverständlich. Diese Herausforderung ist jedoch nicht neu. Das Enterprise Integration Pattern
Request-Reply bietet einen bewährten Mechanismus für synchrones Messaging über asynchrone Kanäle:

Das
Rücksendeadressenmuster ergänzt das
Anforderungs- / Antwortmuster um einen Mechanismus, mit dem der Anforderer die Adresse angibt, an die die Antwort gesendet werden soll:

Vor kurzem hat
Spring Kafka 2.1.3 die Unterstützung aus dem Musterfeld „Request Reply“ hinzugefügt, und in Version
2.2 wurde ein Teil der Rauheit poliert. Mal sehen, wie diese Unterstützung funktioniert:
Client-Seite: ReplyingKafkaTemplate
Die bekannte Abstraktion des
Templates bildet die Basis für den Client-Teil des Request-Reply-Mechanismus im Frühjahr.
@Bean public ReplyingKafkaTemplate < String, Request, Reply > replyKafkaTemplate( ProducerFactory < String, Request > pf, KafkaMessageListenerContainer < String, Reply > lc) { return new ReplyingKafkaTemplate < > (pf, lc); }
Hier ist alles ganz einfach: Wir haben
ReplyingKafkaTemplate eingerichtet , das Anforderungsnachrichten mit String-Schlüsseln sendet und Antwortnachrichten mit String-Schlüsseln empfängt. Das ReplyingKafkaTemplate muss jedoch auf der ProducerFactory-Anforderung, der ConsumerFactory-Antwort und dem MessageListenerContainer mit den entsprechenden Consumer- und Producer-Konfigurationen basieren. Daher ist die erforderliche Konfiguration ziemlich schwer:
@Value("${kafka.topic.car.reply}") private String replyTopic; @Bean public Map < String, Object > consumerConfigs() { Map < String, Object > props = new HashMap < > (); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); return props; } @Bean public Map < String, Object > producerConfigs() { Map < String, Object > props = new HashMap < > (); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return props; } @Bean public ProducerFactory < String, Request > requestProducerFactory() { return new DefaultKafkaProducerFactory < > (producerConfigs()); } @Bean public ConsumerFactory < String, Reply > replyConsumerFactory() { return new DefaultKafkaConsumerFactory < > (consumerConfigs(), new StringDeserializer(), new JsonSerializer < Reply > ()); } @Bean public KafkaMessageListenerContainer < String, Reply > replyListenerContainer() { ContainerProperties containerProperties = new ContainerProperties(replyTopic); return new KafkaMessageListenerContainer < > (replyConsumerFactory(), containerProperties); }
In diesem Fall
lautet die Verwendung von
replyKafkaTemplate zum Senden einer synchronen Anforderung und zum Empfangen einer Antwort wie folgt:
@Value("${kafka.topic.car.request}") private String requestTopic; @Value("${kafka.topic.car.reply}") private String replyTopic; @Autowired private ReplyingKafkaTemplate < String, Request, Reply > requestReplyKafkaTemplate; ... RequestReply request = RequestReply.request(...);
Es gibt auch viel Boilerplate und eine Low-Level-API und sogar diese veraltete
ListenableFuture- API anstelle der modernen
CompletableFuture .
requestReplyKafkaTemplate übernimmt das Generieren und Festlegen des Headers
KafkaHeaders.CORRELATION_ID. Der Header
KafkaHeaders.REPLY_TOPIC muss jedoch explizit für die Anforderung festgelegt werden. Bitte beachten Sie auch, dass dasselbe Thema für die Antwort oben in
replyListenerContainer zu ungewollt war. Etwas Mist. Nicht ganz das, was ich von der Frühlingsabstraktion erwartet hatte.
Serverseite: @SendTo
Auf der Serverseite wird der übliche
KafkaListener , der das Thema auf die Anforderung
abhört, zusätzlich mit der Anmerkung
@SendTo versehen , um eine Antwortnachricht bereitzustellen. Das von der Listener-Methode zurückgegebene Objekt wird automatisch in die Antwortnachricht eingeschlossen,
CORRELATION_ID wird hinzugefügt
, und die Antwort wird in dem im
REPLY_TOPIC- Header angegebenen Thema veröffentlicht.
@Bean public Map < String, Object > consumerConfigs() { Map < String, Object > props = new HashMap < > (); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); return props; } @Bean public Map < String, Object > producerConfigs() { Map < String, Object > props = new HashMap < > (); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return props; } @Bean public ConsumerFactory < String, Request > requestConsumerFactory() { return new DefaultKafkaConsumerFactory < > (consumerConfigs(), new StringDeserializer(), new JsonSerializer < Request > ()); } @Bean public KafkaListenerContainerFactory < ConcurrentMessageListenerContainer < String, Request >> requestListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory < String, Request > factory = new ConcurrentKafkaListenerContainerFactory < > (); factory.setConsumerFactory(requestConsumerFactory()); factory.setReplyTemplate(replyTemplate()); return factory; } @Bean public ProducerFactory < String, Reply > replyProducerFactory() { return new DefaultKafkaProducerFactory < > (producerConfigs()); } @Bean public KafkaTemplate < String, Reply > replyTemplate() { return new KafkaTemplate < > (replyProducerFactory()); }
Hier ist ebenfalls eine gewisse Konfiguration erforderlich, die Listener-Konfiguration ist jedoch einfacher:
@KafkaListener(topics = "${kafka.topic.car.request}", containerFactory = "requestListenerContainerFactory") @SendTo() public Reply receive(Request request) { Reply reply = ...; return reply; }
Aber was ist mit mehreren Instanzen des Verbrauchers?
Alles scheint zu funktionieren, bis wir mehrere Instanzen des Verbrauchers verwenden. Wenn wir mehrere Client-Instanzen haben, müssen wir sicherstellen, dass die Antwort an die richtige Client-Instanz gesendet wird. In der Spring Kafka-Dokumentation wird davon
ausgegangen, dass jeder Consumer ein eindeutiges Thema verwenden kann oder dass mit der Anforderung ein zusätzlicher
KafkaHeaders-Headerwert gesendet wird
RESPONSE_PARTITION ist ein 4-Byte-Feld, das eine BIG-ENDIAN-Darstellung der Ganzzahl-Abschnittsnummer enthält. Die Verwendung separater Themen für verschiedene Clients ist eindeutig nicht sehr flexibel, daher wählen wir die explizite Einstellung
REPLY_PARTITION . Dann sollte der Client wissen, welcher Partition er zugeordnet ist. In der Dokumentation wird vorgeschlagen, eine explizite Konfiguration zu verwenden, um eine bestimmte Partition auszuwählen. Fügen wir es unserem Beispiel hinzu:
@Value("${kafka.topic.car.reply.partition}") private int replyPartition; ... @Bean public KafkaMessageListenerContainer < String, RequestReply > replyListenerContainer() { ContainerProperties containerProperties = new ContainerProperties(replyTopic); TopicPartitionInitialOffset initialOffset = new TopicPartitionInitialOffset(replyTopic, replyPartition); return new KafkaMessageListenerContainer < > (replyConsumerFactory(), containerProperties, initialOffset); } private static byte[] intToBytesBigEndian(final int data) { return new byte[] { (byte)((data >> 24) & 0xff), (byte)((data >> 16) & 0xff), (byte)((data >> 8) & 0xff), (byte)((data >> 0) & 0xff), }; } ... record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes())); record.headers().add(new RecordHeader(KafkaHeaders.REPLY_PARTITION, intToBytesBigEndian(replyPartition))); RequestReplyFuture < String, RequestReply, RequestReply > sendAndReceive = requestReplyKafkaTemplate.sendAndReceive(record); ...
Nicht sehr hübsch, aber es funktioniert. Die erforderliche Konfiguration ist umfangreich und die API sieht auf niedriger Ebene aus. Die Notwendigkeit, Partitionen explizit zu konfigurieren, erschwert das dynamische Skalieren der Anzahl der Clients. Natürlich können Sie es besser machen.
Kapselung der Themenverarbeitung für Antwort und Partition
Beginnen wir mit der Kapselung des
Rücksprungadressenmusters und geben das Thema für die Antwort und die Partition weiter. Das Thema für die Antwort muss in das
RequestReplyTemplate eingefügt werden und sollte daher überhaupt nicht in der API vorhanden sein. Wenn es um Partitionen für eine Antwort geht, machen wir das Gegenteil: Extrahieren Sie die Partition (en), die dem Themenlistener für die Antwort zugewiesen sind, und übertragen Sie diese Partition automatisch. Dadurch muss sich der Client nicht mehr um diese Header kümmern.
Nehmen wir zur gleichen Zeit auch die API so vor, dass sie der Standard-
KafkaTemplate ähnelt (überladen Sie die
sendAndReceive () -Methode
mit vereinfachten Parametern und fügen Sie die entsprechenden überladenen Methoden unter Verwendung des Standardthemas hinzu):
public class PartitionAwareReplyingKafkaTemplate < K, V, R > extends ReplyingKafkaTemplate < K, V, R > { public PartitionAwareReplyingKafkaTemplate(ProducerFactory < K, V > producerFactory, GenericMessageListenerContainer < K, R > replyContainer) { super(producerFactory, replyContainer); } private TopicPartition getFirstAssignedReplyTopicPartition() { if (getAssignedReplyTopicPartitions() != null && getAssignedReplyTopicPartitions().iterator().hasNext()) { TopicPartition replyPartition = getAssignedReplyTopicPartitions().iterator().next(); if (this.logger.isDebugEnabled()) { this.logger.debug("Using partition " + replyPartition.partition()); } return replyPartition; } else { throw new KafkaException("Illegal state: No reply partition is assigned to this instance"); } } private static byte[] intToBytesBigEndian(final int data) { return new byte[] { (byte)((data >> 24) & 0xff), (byte)((data >> 16) & 0xff), (byte)((data >> 8) & 0xff), (byte)((data >> 0) & 0xff), }; } public RequestReplyFuture < K, V, R > sendAndReceiveDefault(@Nullable V data) { return sendAndReceive(getDefaultTopic(), data); } public RequestReplyFuture < K, V, R > sendAndReceiveDefault(K key, @Nullable V data) { return sendAndReceive(getDefaultTopic(), key, data); } ... public RequestReplyFuture < K, V, R > sendAndReceive(String topic, @Nullable V data) { ProducerRecord < K, V > record = new ProducerRecord < > (topic, data); return doSendAndReceive(record); } public RequestReplyFuture < K, V, R > sendAndReceive(String topic, K key, @Nullable V data) { ProducerRecord < K, V > record = new ProducerRecord < > (topic, key, data); return doSendAndReceive(record); } ... @Override public RequestReplyFuture < K, V, R > sendAndReceive(ProducerRecord < K, V > record) { return doSendAndReceive(record); } protected RequestReplyFuture < K, V, R > doSendAndReceive(ProducerRecord < K, V > record) { TopicPartition replyPartition = getFirstAssignedReplyTopicPartition(); record.headers() .add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, replyPartition.topic().getBytes())) .add(new RecordHeader(KafkaHeaders.REPLY_PARTITION, intToBytesBigEndian(replyPartition.partition()))); return super.sendAndReceive(record); } }
Nächster Schritt: Anpassung der
ListenableFuture an die modernere
CompletableFuture .
public class CompletableFutureReplyingKafkaTemplate < K, V, R > extends PartitionAwareReplyingKafkaTemplate < K, V, R > { public CompletableFutureReplyingKafkaTemplate(ProducerFactory < K, V > producerFactory, GenericMessageListenerContainer < K, R > replyContainer) { super(producerFactory, replyContainer); } public CompletableFuture < R > requestReplyDefault(V value) { return adapt(sendAndReceiveDefault(value)); } public CompletableFuture < R > requestReplyDefault(K key, V value) { return adapt(sendAndReceiveDefault(key, value)); } ... public CompletableFuture < R > requestReply(String topic, V value) { return adapt(sendAndReceive(topic, value)); } public CompletableFuture < R > requestReply(String topic, K key, V value) { return adapt(sendAndReceive(topic, key, value)); } ... private CompletableFuture < R > adapt(RequestReplyFuture < K, V, R > requestReplyFuture) { CompletableFuture < R > completableResult = new CompletableFuture < R > () { @Override public boolean cancel(boolean mayInterruptIfRunning) { boolean result = requestReplyFuture.cancel(mayInterruptIfRunning); super.cancel(mayInterruptIfRunning); return result; } };
Wir werden dies in eine Hilfsprogrammbibliothek packen und haben jetzt eine API, die viel besser mit Spring
' Hauptentwurfsphilosophie
„Convention over Configuration“ übereinstimmt. Hier ist der endgültige Client-Code:
@Autowired private CompletableFutureReplyingKafkaTemplate < String, Request, Reply > requestReplyKafkaTemplate; ... requestReplyKafkaTemplate.requestReply(request).thenAccept(reply - > System.out.println("Reply: " + reply.toString()); );
Um es zusammenzufassen
Zusammenfassend bietet Spring for Kafka 2.2 eine voll funktionsfähige Implementierung des
Request-Reply- Musters in Apache Kafka, die API weist jedoch noch einige Ecken und Kanten auf. In dieser Ausgabe haben wir gesehen, dass einige zusätzliche Abstraktionen und Anpassungen der API eine logischere API auf hoher Ebene bieten können.
Warnung 1:Einer der Hauptvorteile einer ereignisgesteuerten Architektur ist die Entkopplung von Ereignisproduzenten und -konsumenten, die es ermöglicht, flexiblere und sich weiterentwickelnde Systeme zu erstellen. Verwendung der synchronen Semantik „Request-Response“ ist das genaue Gegenteil, wenn die anfordernden und antwortenden Parteien eng miteinander verwandt sind. Daher sollte es nur bei Bedarf verwendet werden.
Warnung 2:Wenn eine
synchrone Request-Response erforderlich ist, ist das
HTTP- basierte
Protokoll viel einfacher und effizienter als die Verwendung
eines asynchronen Kanals wie Apache Kafka .
Es kann jedoch Szenarien geben, in denen eine
synchrone Anfrage-Antwort über Kafka sinnvoll ist. Wählen Sie vernünftigerweise das beste Werkzeug für den Job.
Ein voll funktionsfähiges Beispiel finden Sie unter
github.com/callistaenterprise/blog-synchronous-kafka .
Kommentare
Federico • vor 7 MonatenUnd wenn wir hybride Bedürfnisse haben, brauchen wir zum Beispiel in 50% der Fälle eine Anfrage-Antwort und in 50% ein Event-Management? Wie machen wir das? Die Konfiguration, die Spring Kafka benötigt, sieht ziemlich schrecklich aus.
Jehanzeb Qayyum • vor 6 MonatenSpring unterstützt jetzt standardmäßig Partitionen in einem gemeinsamen Thema für die Antwort.
Ab Version 2.2 versucht die Vorlage, das Thema für die Antwort oder Partition aus dem konfigurierten Antwortcontainer (Antwortcontainer) zu ermitteln.
https://docs.spring.io/spring-kafka/reference/html/#replying-templatenir rozenberg • vor 8 MonatenHallo,
Im letzten Absatz haben Sie geschrieben, dass es Szenarien geben kann, in denen eine synchrone Request-Response über Kafka im Vergleich zu HTTP sinnvoll ist.
Können Sie Beispiele für solche Szenarien nennen?
Vielen Dank,
Nir
Janne Keskitalo nir rozenberg • vor 8 MonatenWir werden ein großvolumiges Transaktionsverarbeitungssystem in mehrere Microservices aufteilen, und ich habe die Idee, Kafkas Request-Response-Messaging zu verwenden, um eine ähnliche Verarbeitungsaffinität zu erzielen. Grundsätzlich wird Kafka verwendet, um alle Aufrufe von einem Client an denselben Transaktionsprozessorprozess weiterzuleiten, der sie dann nacheinander einzeln ausführt. Diese Art der Verarbeitung gewährleistet Linearisierbarkeit (
https://stackoverflow.com/a/19515375/7430325 ), kausale Konnektivität und ermöglicht auch effizientes Caching. Im Wesentlichen würden die Koordinierungsbemühungen von der Datenbank auf Kafka übertragen, und wir könnten die Datenbank im seriellen strengen Isolationsmodus starten.
Ich muss mich noch mit den Details unserer Transaktionssemantik befassen, um zu sehen, wo es nicht funktioniert. Dies ist also nur eine Idee.
Übersetzt von
@middle_java