Permintaan-Respons Sinkron menggunakan Apache Kafka

Arsitektur Event Driven pada umumnya, dan Apache Kafka pada khususnya, telah menarik banyak perhatian baru-baru ini. Untuk mengambil keuntungan penuh dari arsitektur yang digerakkan oleh peristiwa, mekanisme pendelegasian acara harus pada dasarnya tidak sinkron. Namun, mungkin ada beberapa skenario penggunaan khusus / aliran yang memerlukan semantik dari Permintaan-Respons Sinkron . Rilis ini menunjukkan bagaimana menerapkan Permintaan-Respons menggunakan Apache Kafka .

Diterjemahkan oleh @middle_java

Tanggal artikel asli: 26 Oktober 2018

Apache Kafka secara inheren tidak sinkron. Oleh karena itu, semantik Permintaan-Respons untuk Apache Kafka tidak wajar. Namun, tantangan ini bukanlah hal baru. Permintaan-Balas Pola Integrasi Perusahaan menyediakan mekanisme yang terbukti untuk pesan sinkron melalui saluran asinkron:



Pola Alamat Pengembalian melengkapi pola Permintaan-Balas dengan mekanisme bagi pemohon untuk menunjukkan alamat yang harus dikirimi respons:



Baru-baru ini, Spring Kafka 2.1.3 menambahkan dukungan dari kotak pola “Request Reply”, dan dalam versi 2.2 beberapa kekasarannya telah dipoles. Mari kita lihat bagaimana dukungan ini bekerja:

Sisi Klien: ReplyingKafkaTemplate


Abstraksi Templat yang terkenal ini membentuk dasar bagi bagian klien dari mekanisme Permintaan-Jawab di Musim Semi.

@Bean public ReplyingKafkaTemplate < String, Request, Reply > replyKafkaTemplate( ProducerFactory < String, Request > pf, KafkaMessageListenerContainer < String, Reply > lc) { return new ReplyingKafkaTemplate < > (pf, lc); } 

Semuanya sangat mudah di sini: kami menyiapkan ReplyingKafkaTemplate , yang mengirim pesan permintaan dengan kunci String dan menerima pesan respons dengan kunci String. Namun, ReplyingKafkaTemplate harus didasarkan pada Permintaan ProducerFactory, Respons ConsumerFactory, dan MessageListenerContainer dengan konfigurasi konsumen dan produsen yang sesuai. Oleh karena itu, konfigurasi yang diperlukan cukup berat:

  @Value("${kafka.topic.car.reply}") private String replyTopic; @Bean public Map < String, Object > consumerConfigs() { Map < String, Object > props = new HashMap < > (); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); return props; } @Bean public Map < String, Object > producerConfigs() { Map < String, Object > props = new HashMap < > (); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return props; } @Bean public ProducerFactory < String, Request > requestProducerFactory() { return new DefaultKafkaProducerFactory < > (producerConfigs()); } @Bean public ConsumerFactory < String, Reply > replyConsumerFactory() { return new DefaultKafkaConsumerFactory < > (consumerConfigs(), new StringDeserializer(), new JsonSerializer < Reply > ()); } @Bean public KafkaMessageListenerContainer < String, Reply > replyListenerContainer() { ContainerProperties containerProperties = new ContainerProperties(replyTopic); return new KafkaMessageListenerContainer < > (replyConsumerFactory(), containerProperties); } 

Dalam hal ini, menggunakan replyKafkaTemplate untuk mengirim permintaan sinkron dan menerima respons adalah sebagai berikut:

 @Value("${kafka.topic.car.request}") private String requestTopic; @Value("${kafka.topic.car.reply}") private String replyTopic; @Autowired private ReplyingKafkaTemplate < String, Request, Reply > requestReplyKafkaTemplate; ... RequestReply request = RequestReply.request(...); // producer record ProducerRecord < String, Request > record = new ProducerRecord < String, Request > (requestTopic, request); //       record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes())); //     Kafka          RequestReplyFuture < String, Request, Reply > sendAndReceive = requestReplyKafkaTemplate.sendAndReceive(record); sendAndReceive.addCallback(new ListenableFutureCallback < ConsumerRecord < String, Reply >> () { @Override public void onSuccess(ConsumerRecord < String, Reply > result) { //   consumer record Reply reply = result.value(); System.out.println("Reply: " + reply.toString()); } }); 

Ada juga banyak boilerplate dan API tingkat rendah, dan bahkan API ListenableFuture yang usang ini bukan CompletableFuture modern.

requestReplyKafkaTemplate menangani pembuatan dan pengaturan tajuk KafkaHeaders.CORRELATION_ID , tetapi kami harus menetapkan tajuk KafkaHeaders.REPLY_TOPIC secara eksplisit untuk permintaan tersebut. Harap perhatikan juga bahwa topik yang sama untuk jawaban itu terlalu tidak disengaja di atas di replyListenerContainer . Beberapa kotoran. Tidak seperti yang saya harapkan dari abstraksi Spring.

Sisi Server: @KirimKe


Di sisi server, KafkaListener yang biasa mendengarkan topik untuk permintaan juga dihiasi dengan penjelasan @ SendTo untuk memberikan pesan respons. Objek yang dikembalikan oleh metode pendengar secara otomatis dibungkus dengan pesan respons, CORRELATION_ID ditambahkan , dan responsnya diterbitkan dalam topik yang ditentukan dalam tajuk REPLY_TOPIC .

  @Bean public Map < String, Object > consumerConfigs() { Map < String, Object > props = new HashMap < > (); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); return props; } @Bean public Map < String, Object > producerConfigs() { Map < String, Object > props = new HashMap < > (); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return props; } @Bean public ConsumerFactory < String, Request > requestConsumerFactory() { return new DefaultKafkaConsumerFactory < > (consumerConfigs(), new StringDeserializer(), new JsonSerializer < Request > ()); } @Bean public KafkaListenerContainerFactory < ConcurrentMessageListenerContainer < String, Request >> requestListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory < String, Request > factory = new ConcurrentKafkaListenerContainerFactory < > (); factory.setConsumerFactory(requestConsumerFactory()); factory.setReplyTemplate(replyTemplate()); return factory; } @Bean public ProducerFactory < String, Reply > replyProducerFactory() { return new DefaultKafkaProducerFactory < > (producerConfigs()); } @Bean public KafkaTemplate < String, Reply > replyTemplate() { return new KafkaTemplate < > (replyProducerFactory()); } 

Beberapa konfigurasi juga diperlukan di sini, tetapi konfigurasi pendengar lebih sederhana:

  @KafkaListener(topics = "${kafka.topic.car.request}", containerFactory = "requestListenerContainerFactory") @SendTo() public Reply receive(Request request) { Reply reply = ...; return reply; } 

Tetapi bagaimana dengan beberapa contoh dari konsumen?


Segalanya tampak bekerja sampai kita menggunakan beberapa contoh dari konsumen. Jika kami memiliki banyak instance klien, kami perlu memastikan bahwa respons dikirim ke instance klien yang benar. Dokumentasi Spring Kafka mengasumsikan bahwa setiap konsumen dapat menggunakan topik unik atau bahwa nilai header KafkaHeaders tambahan dikirim dengan permintaan tersebut. RESPONSE_PARTITION adalah bidang empat byte yang berisi representasi BIG-ENDIAN dari nomor bagian integer. Menggunakan topik terpisah untuk klien yang berbeda jelas tidak terlalu fleksibel, jadi kami memilih pengaturan REPLY_PARTITION eksplisit. Maka klien harus tahu ke partisi mana ia ditugaskan. Dokumentasi menyarankan menggunakan konfigurasi eksplisit untuk memilih partisi tertentu. Mari tambahkan ke contoh kita:

  @Value("${kafka.topic.car.reply.partition}") private int replyPartition; ... @Bean public KafkaMessageListenerContainer < String, RequestReply > replyListenerContainer() { ContainerProperties containerProperties = new ContainerProperties(replyTopic); TopicPartitionInitialOffset initialOffset = new TopicPartitionInitialOffset(replyTopic, replyPartition); return new KafkaMessageListenerContainer < > (replyConsumerFactory(), containerProperties, initialOffset); } private static byte[] intToBytesBigEndian(final int data) { return new byte[] { (byte)((data >> 24) & 0xff), (byte)((data >> 16) & 0xff), (byte)((data >> 8) & 0xff), (byte)((data >> 0) & 0xff), }; } ... record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes())); record.headers().add(new RecordHeader(KafkaHeaders.REPLY_PARTITION, intToBytesBigEndian(replyPartition))); RequestReplyFuture < String, RequestReply, RequestReply > sendAndReceive = requestReplyKafkaTemplate.sendAndReceive(record); ... 

Tidak terlalu cantik, tetapi berhasil. Konfigurasi yang diperlukan luas dan API terlihat tingkat rendah. Kebutuhan untuk secara eksplisit mengkonfigurasi partisi mempersulit proses penskalaan jumlah klien secara dinamis. Jelas, Anda bisa melakukan yang lebih baik.

Enkapsulasi pemrosesan topik untuk respons dan partisi


Mari kita mulai dengan merangkum pola Alamat Pengembalian , meneruskan topik untuk respons dan partisi. Topik untuk respons harus disuntikkan dalam RequestReplyTemplate dan, oleh karena itu, tidak boleh ada di API sama sekali. Ketika datang ke partisi untuk jawaban, kami akan melakukan yang sebaliknya: ekstrak partisi yang ditugaskan untuk pendengar topik untuk jawabannya, dan transfer partisi ini secara otomatis. Ini menghilangkan kebutuhan klien untuk mengurus tajuk ini.
Pada saat yang sama, mari kita juga membuat API terlihat seperti standar KafkaTemplate (membebani metode sendAndReceive () dengan parameter yang disederhanakan dan menambahkan metode kelebihan beban yang sesuai menggunakan topik default):

 public class PartitionAwareReplyingKafkaTemplate < K, V, R > extends ReplyingKafkaTemplate < K, V, R > { public PartitionAwareReplyingKafkaTemplate(ProducerFactory < K, V > producerFactory, GenericMessageListenerContainer < K, R > replyContainer) { super(producerFactory, replyContainer); } private TopicPartition getFirstAssignedReplyTopicPartition() { if (getAssignedReplyTopicPartitions() != null && getAssignedReplyTopicPartitions().iterator().hasNext()) { TopicPartition replyPartition = getAssignedReplyTopicPartitions().iterator().next(); if (this.logger.isDebugEnabled()) { this.logger.debug("Using partition " + replyPartition.partition()); } return replyPartition; } else { throw new KafkaException("Illegal state: No reply partition is assigned to this instance"); } } private static byte[] intToBytesBigEndian(final int data) { return new byte[] { (byte)((data >> 24) & 0xff), (byte)((data >> 16) & 0xff), (byte)((data >> 8) & 0xff), (byte)((data >> 0) & 0xff), }; } public RequestReplyFuture < K, V, R > sendAndReceiveDefault(@Nullable V data) { return sendAndReceive(getDefaultTopic(), data); } public RequestReplyFuture < K, V, R > sendAndReceiveDefault(K key, @Nullable V data) { return sendAndReceive(getDefaultTopic(), key, data); } ... public RequestReplyFuture < K, V, R > sendAndReceive(String topic, @Nullable V data) { ProducerRecord < K, V > record = new ProducerRecord < > (topic, data); return doSendAndReceive(record); } public RequestReplyFuture < K, V, R > sendAndReceive(String topic, K key, @Nullable V data) { ProducerRecord < K, V > record = new ProducerRecord < > (topic, key, data); return doSendAndReceive(record); } ... @Override public RequestReplyFuture < K, V, R > sendAndReceive(ProducerRecord < K, V > record) { return doSendAndReceive(record); } protected RequestReplyFuture < K, V, R > doSendAndReceive(ProducerRecord < K, V > record) { TopicPartition replyPartition = getFirstAssignedReplyTopicPartition(); record.headers() .add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, replyPartition.topic().getBytes())) .add(new RecordHeader(KafkaHeaders.REPLY_PARTITION, intToBytesBigEndian(replyPartition.partition()))); return super.sendAndReceive(record); } } 

Langkah selanjutnya: Menyesuaikan Future yang Dapat Didengarkan ke CompletableFuture yang lebih modern.

 public class CompletableFutureReplyingKafkaTemplate < K, V, R > extends PartitionAwareReplyingKafkaTemplate < K, V, R > { public CompletableFutureReplyingKafkaTemplate(ProducerFactory < K, V > producerFactory, GenericMessageListenerContainer < K, R > replyContainer) { super(producerFactory, replyContainer); } public CompletableFuture < R > requestReplyDefault(V value) { return adapt(sendAndReceiveDefault(value)); } public CompletableFuture < R > requestReplyDefault(K key, V value) { return adapt(sendAndReceiveDefault(key, value)); } ... public CompletableFuture < R > requestReply(String topic, V value) { return adapt(sendAndReceive(topic, value)); } public CompletableFuture < R > requestReply(String topic, K key, V value) { return adapt(sendAndReceive(topic, key, value)); } ... private CompletableFuture < R > adapt(RequestReplyFuture < K, V, R > requestReplyFuture) { CompletableFuture < R > completableResult = new CompletableFuture < R > () { @Override public boolean cancel(boolean mayInterruptIfRunning) { boolean result = requestReplyFuture.cancel(mayInterruptIfRunning); super.cancel(mayInterruptIfRunning); return result; } }; //       requestReplyFuture.getSendFuture().addCallback(new ListenableFutureCallback < SendResult < K, V >> () { @Override public void onSuccess(SendResult < K, V > sendResult) { // NOOP } @Override public void onFailure(Throwable t) { completableResult.completeExceptionally(t); } }); //     requestReplyFuture.addCallback(new ListenableFutureCallback < ConsumerRecord < K, R >> () { @Override public void onSuccess(ConsumerRecord < K, R > result) { completableResult.complete(result.value()); } @Override public void onFailure(Throwable t) { completableResult.completeExceptionally(t); } }); return completableResult; } } 

Kami akan mengemas ini ke perpustakaan utilitas dan sekarang kami memiliki API yang jauh lebih konsisten dengan filosofi desain utama Spring, "Convention over Configuration" . Ini adalah kode klien terakhir:

  @Autowired private CompletableFutureReplyingKafkaTemplate < String, Request, Reply > requestReplyKafkaTemplate; ... requestReplyKafkaTemplate.requestReply(request).thenAccept(reply - > System.out.println("Reply: " + reply.toString()); ); 

Untuk meringkas


Untuk meringkas, Spring untuk Kafka 2.2 menyediakan implementasi berfungsi penuh dari pola Permintaan-Balas di Apache Kafka, tetapi API masih memiliki beberapa tepi yang kasar. Dalam masalah ini, kami melihat bahwa beberapa abstraksi tambahan dan adaptasi API dapat memberikan API tingkat tinggi yang lebih logis.

Peringatan 1:
Salah satu keuntungan utama dari arsitektur yang digerakkan oleh peristiwa adalah decoupling antara produsen dan konsumen acara, yang memungkinkan untuk menciptakan sistem yang jauh lebih fleksibel dan berkembang. Menggunakan semantik sinkron "Permintaan-Respons" adalah kebalikannya ketika pihak yang meminta dan merespons sangat terkait. Karena itu, harus digunakan hanya jika perlu.

Peringatan 2:
Jika Request-Response sinkron diperlukan, maka protokol berbasis HTTP jauh lebih sederhana dan lebih efisien daripada menggunakan saluran asinkron seperti Apache Kafka .
Namun, mungkin ada skenario di mana Permintaan-Respons sinkron melalui Kafka masuk akal. Cukup pilih alat terbaik untuk pekerjaan itu.

Contoh yang berfungsi penuh dapat ditemukan di github.com/callistaenterprise/blog-synchronous-kafka .

Komentar


Federico • 7 bulan lalu
Dan ketika kita memiliki kebutuhan hibrid, misalnya, dalam 50% kasus kita membutuhkan Permintaan-Respons dan dalam 50% kita membutuhkan manajemen acara? Bagaimana kita melakukan ini? Konfigurasi yang dibutuhkan oleh Spring Kafka terlihat sangat buruk.

Jehanzeb Qayyum • 6 bulan lalu
Spring sekarang memiliki dukungan default menggunakan partisi dalam satu topik umum untuk respons.

Dimulai dengan versi 2.2, templat mencoba menentukan topik respons atau partisi dari wadah respons yang dikonfigurasi (wadah balasan).

https://docs.spring.io/spring-kafka/reference/html/#replying-template

nir rozenberg • 8 bulan lalu
Hai
Dalam paragraf terakhir, Anda menulis bahwa mungkin ada skenario ketika Permintaan-Respons sinkron melalui Kafka masuk akal dibandingkan dengan HTTP.
Bisakah Anda memberikan contoh skenario seperti itu?
Terima kasih
Nir

Janne Keskitalo nir rozenberg • 8 bulan lalu
Kami akan membagi sistem pemrosesan transaksi volume besar menjadi beberapa layanan microser, dan saya punya ide untuk menggunakan pesan Permintaan-Respons Kafka untuk mencapai afinitas pemrosesan yang serupa. Pada dasarnya, Kafka digunakan untuk merutekan semua panggilan dari satu klien ke proses prosesor transaksi yang sama, yang kemudian secara berurutan mengeksekusi mereka satu per satu. Jenis pemrosesan ini memastikan kemampuan linieritas ( https://stackoverflow.com/a/19515375/7430325 ), konektivitas kausal, dan juga memungkinkan caching yang efisien. Pada dasarnya, upaya koordinasi akan ditransfer dari database ke Kafka, dan kami dapat memulai database dalam mode isolasi ketat Serializable.
Saya belum menyelidiki rincian semantik transaksi kami untuk melihat di mana itu gagal, jadi ini hanya sebuah ide.

Diterjemahkan oleh @middle_java

Source: https://habr.com/ru/post/id476156/


All Articles