Arsitektur Event Driven pada umumnya, dan
Apache Kafka pada khususnya, telah menarik banyak perhatian baru-baru ini. Untuk mengambil keuntungan penuh dari arsitektur yang digerakkan oleh peristiwa, mekanisme pendelegasian acara harus pada dasarnya tidak sinkron. Namun, mungkin ada beberapa skenario penggunaan khusus / aliran yang memerlukan semantik dari
Permintaan-Respons Sinkron . Rilis ini menunjukkan bagaimana menerapkan
Permintaan-Respons menggunakan
Apache Kafka .
Diterjemahkan oleh
@middle_javaTanggal artikel asli: 26 Oktober 2018
Apache Kafka secara inheren tidak sinkron. Oleh karena itu, semantik
Permintaan-Respons untuk Apache Kafka tidak wajar. Namun, tantangan ini bukanlah hal baru.
Permintaan-Balas Pola Integrasi Perusahaan menyediakan mekanisme yang terbukti untuk pesan sinkron melalui saluran asinkron:

Pola
Alamat Pengembalian melengkapi pola
Permintaan-Balas dengan mekanisme bagi pemohon untuk menunjukkan alamat yang harus dikirimi respons:

Baru-baru ini,
Spring Kafka 2.1.3 menambahkan dukungan dari kotak pola “Request Reply”, dan dalam versi
2.2 beberapa kekasarannya telah dipoles. Mari kita lihat bagaimana dukungan ini bekerja:
Sisi Klien: ReplyingKafkaTemplate
Abstraksi
Templat yang terkenal ini membentuk dasar bagi bagian klien dari mekanisme Permintaan-Jawab di Musim Semi.
@Bean public ReplyingKafkaTemplate < String, Request, Reply > replyKafkaTemplate( ProducerFactory < String, Request > pf, KafkaMessageListenerContainer < String, Reply > lc) { return new ReplyingKafkaTemplate < > (pf, lc); }
Semuanya sangat mudah di sini: kami menyiapkan
ReplyingKafkaTemplate , yang mengirim pesan permintaan dengan kunci String dan menerima pesan respons dengan kunci String. Namun, ReplyingKafkaTemplate harus didasarkan pada Permintaan ProducerFactory, Respons ConsumerFactory, dan MessageListenerContainer dengan konfigurasi konsumen dan produsen yang sesuai. Oleh karena itu, konfigurasi yang diperlukan cukup berat:
@Value("${kafka.topic.car.reply}") private String replyTopic; @Bean public Map < String, Object > consumerConfigs() { Map < String, Object > props = new HashMap < > (); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); return props; } @Bean public Map < String, Object > producerConfigs() { Map < String, Object > props = new HashMap < > (); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return props; } @Bean public ProducerFactory < String, Request > requestProducerFactory() { return new DefaultKafkaProducerFactory < > (producerConfigs()); } @Bean public ConsumerFactory < String, Reply > replyConsumerFactory() { return new DefaultKafkaConsumerFactory < > (consumerConfigs(), new StringDeserializer(), new JsonSerializer < Reply > ()); } @Bean public KafkaMessageListenerContainer < String, Reply > replyListenerContainer() { ContainerProperties containerProperties = new ContainerProperties(replyTopic); return new KafkaMessageListenerContainer < > (replyConsumerFactory(), containerProperties); }
Dalam hal ini, menggunakan
replyKafkaTemplate untuk mengirim permintaan sinkron dan menerima respons adalah sebagai berikut:
@Value("${kafka.topic.car.request}") private String requestTopic; @Value("${kafka.topic.car.reply}") private String replyTopic; @Autowired private ReplyingKafkaTemplate < String, Request, Reply > requestReplyKafkaTemplate; ... RequestReply request = RequestReply.request(...);
Ada juga banyak boilerplate dan API tingkat rendah, dan bahkan API
ListenableFuture yang usang ini bukan
CompletableFuture modern.
requestReplyKafkaTemplate menangani pembuatan dan pengaturan tajuk
KafkaHeaders.CORRELATION_ID , tetapi kami harus menetapkan tajuk
KafkaHeaders.REPLY_TOPIC secara eksplisit untuk permintaan tersebut. Harap perhatikan juga bahwa topik yang sama untuk jawaban itu terlalu tidak disengaja di atas di
replyListenerContainer . Beberapa kotoran. Tidak seperti yang saya harapkan dari abstraksi Spring.
Sisi Server: @KirimKe
Di sisi server,
KafkaListener yang biasa mendengarkan topik untuk permintaan juga dihiasi dengan penjelasan
@ SendTo untuk memberikan pesan respons. Objek yang dikembalikan oleh metode pendengar secara otomatis dibungkus dengan pesan respons,
CORRELATION_ID ditambahkan
, dan responsnya diterbitkan dalam topik yang ditentukan dalam tajuk
REPLY_TOPIC .
@Bean public Map < String, Object > consumerConfigs() { Map < String, Object > props = new HashMap < > (); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); return props; } @Bean public Map < String, Object > producerConfigs() { Map < String, Object > props = new HashMap < > (); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return props; } @Bean public ConsumerFactory < String, Request > requestConsumerFactory() { return new DefaultKafkaConsumerFactory < > (consumerConfigs(), new StringDeserializer(), new JsonSerializer < Request > ()); } @Bean public KafkaListenerContainerFactory < ConcurrentMessageListenerContainer < String, Request >> requestListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory < String, Request > factory = new ConcurrentKafkaListenerContainerFactory < > (); factory.setConsumerFactory(requestConsumerFactory()); factory.setReplyTemplate(replyTemplate()); return factory; } @Bean public ProducerFactory < String, Reply > replyProducerFactory() { return new DefaultKafkaProducerFactory < > (producerConfigs()); } @Bean public KafkaTemplate < String, Reply > replyTemplate() { return new KafkaTemplate < > (replyProducerFactory()); }
Beberapa konfigurasi juga diperlukan di sini, tetapi konfigurasi pendengar lebih sederhana:
@KafkaListener(topics = "${kafka.topic.car.request}", containerFactory = "requestListenerContainerFactory") @SendTo() public Reply receive(Request request) { Reply reply = ...; return reply; }
Tetapi bagaimana dengan beberapa contoh dari konsumen?
Segalanya tampak bekerja sampai kita menggunakan beberapa contoh dari konsumen. Jika kami memiliki banyak instance klien, kami perlu memastikan bahwa respons dikirim ke instance klien yang benar. Dokumentasi Spring Kafka mengasumsikan bahwa setiap konsumen dapat menggunakan topik unik atau bahwa nilai header
KafkaHeaders tambahan dikirim dengan permintaan tersebut.
RESPONSE_PARTITION adalah bidang empat byte yang berisi representasi BIG-ENDIAN dari nomor bagian integer. Menggunakan topik terpisah untuk klien yang berbeda jelas tidak terlalu fleksibel, jadi kami memilih pengaturan
REPLY_PARTITION eksplisit. Maka klien harus tahu ke partisi mana ia ditugaskan. Dokumentasi menyarankan menggunakan konfigurasi eksplisit untuk memilih partisi tertentu. Mari tambahkan ke contoh kita:
@Value("${kafka.topic.car.reply.partition}") private int replyPartition; ... @Bean public KafkaMessageListenerContainer < String, RequestReply > replyListenerContainer() { ContainerProperties containerProperties = new ContainerProperties(replyTopic); TopicPartitionInitialOffset initialOffset = new TopicPartitionInitialOffset(replyTopic, replyPartition); return new KafkaMessageListenerContainer < > (replyConsumerFactory(), containerProperties, initialOffset); } private static byte[] intToBytesBigEndian(final int data) { return new byte[] { (byte)((data >> 24) & 0xff), (byte)((data >> 16) & 0xff), (byte)((data >> 8) & 0xff), (byte)((data >> 0) & 0xff), }; } ... record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes())); record.headers().add(new RecordHeader(KafkaHeaders.REPLY_PARTITION, intToBytesBigEndian(replyPartition))); RequestReplyFuture < String, RequestReply, RequestReply > sendAndReceive = requestReplyKafkaTemplate.sendAndReceive(record); ...
Tidak terlalu cantik, tetapi berhasil. Konfigurasi yang diperlukan luas dan API terlihat tingkat rendah. Kebutuhan untuk secara eksplisit mengkonfigurasi partisi mempersulit proses penskalaan jumlah klien secara dinamis. Jelas, Anda bisa melakukan yang lebih baik.
Enkapsulasi pemrosesan topik untuk respons dan partisi
Mari kita mulai dengan merangkum pola
Alamat Pengembalian , meneruskan topik untuk respons dan partisi. Topik untuk respons harus disuntikkan dalam
RequestReplyTemplate dan, oleh karena itu, tidak boleh ada di API sama sekali. Ketika datang ke partisi untuk jawaban, kami akan melakukan yang sebaliknya: ekstrak partisi yang ditugaskan untuk pendengar topik untuk jawabannya, dan transfer partisi ini secara otomatis. Ini menghilangkan kebutuhan klien untuk mengurus tajuk ini.
Pada saat yang sama, mari kita juga membuat API terlihat seperti standar
KafkaTemplate (membebani metode
sendAndReceive () dengan parameter yang disederhanakan dan menambahkan metode kelebihan beban yang sesuai menggunakan topik default):
public class PartitionAwareReplyingKafkaTemplate < K, V, R > extends ReplyingKafkaTemplate < K, V, R > { public PartitionAwareReplyingKafkaTemplate(ProducerFactory < K, V > producerFactory, GenericMessageListenerContainer < K, R > replyContainer) { super(producerFactory, replyContainer); } private TopicPartition getFirstAssignedReplyTopicPartition() { if (getAssignedReplyTopicPartitions() != null && getAssignedReplyTopicPartitions().iterator().hasNext()) { TopicPartition replyPartition = getAssignedReplyTopicPartitions().iterator().next(); if (this.logger.isDebugEnabled()) { this.logger.debug("Using partition " + replyPartition.partition()); } return replyPartition; } else { throw new KafkaException("Illegal state: No reply partition is assigned to this instance"); } } private static byte[] intToBytesBigEndian(final int data) { return new byte[] { (byte)((data >> 24) & 0xff), (byte)((data >> 16) & 0xff), (byte)((data >> 8) & 0xff), (byte)((data >> 0) & 0xff), }; } public RequestReplyFuture < K, V, R > sendAndReceiveDefault(@Nullable V data) { return sendAndReceive(getDefaultTopic(), data); } public RequestReplyFuture < K, V, R > sendAndReceiveDefault(K key, @Nullable V data) { return sendAndReceive(getDefaultTopic(), key, data); } ... public RequestReplyFuture < K, V, R > sendAndReceive(String topic, @Nullable V data) { ProducerRecord < K, V > record = new ProducerRecord < > (topic, data); return doSendAndReceive(record); } public RequestReplyFuture < K, V, R > sendAndReceive(String topic, K key, @Nullable V data) { ProducerRecord < K, V > record = new ProducerRecord < > (topic, key, data); return doSendAndReceive(record); } ... @Override public RequestReplyFuture < K, V, R > sendAndReceive(ProducerRecord < K, V > record) { return doSendAndReceive(record); } protected RequestReplyFuture < K, V, R > doSendAndReceive(ProducerRecord < K, V > record) { TopicPartition replyPartition = getFirstAssignedReplyTopicPartition(); record.headers() .add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, replyPartition.topic().getBytes())) .add(new RecordHeader(KafkaHeaders.REPLY_PARTITION, intToBytesBigEndian(replyPartition.partition()))); return super.sendAndReceive(record); } }
Langkah selanjutnya: Menyesuaikan
Future yang Dapat
Didengarkan ke
CompletableFuture yang lebih modern.
public class CompletableFutureReplyingKafkaTemplate < K, V, R > extends PartitionAwareReplyingKafkaTemplate < K, V, R > { public CompletableFutureReplyingKafkaTemplate(ProducerFactory < K, V > producerFactory, GenericMessageListenerContainer < K, R > replyContainer) { super(producerFactory, replyContainer); } public CompletableFuture < R > requestReplyDefault(V value) { return adapt(sendAndReceiveDefault(value)); } public CompletableFuture < R > requestReplyDefault(K key, V value) { return adapt(sendAndReceiveDefault(key, value)); } ... public CompletableFuture < R > requestReply(String topic, V value) { return adapt(sendAndReceive(topic, value)); } public CompletableFuture < R > requestReply(String topic, K key, V value) { return adapt(sendAndReceive(topic, key, value)); } ... private CompletableFuture < R > adapt(RequestReplyFuture < K, V, R > requestReplyFuture) { CompletableFuture < R > completableResult = new CompletableFuture < R > () { @Override public boolean cancel(boolean mayInterruptIfRunning) { boolean result = requestReplyFuture.cancel(mayInterruptIfRunning); super.cancel(mayInterruptIfRunning); return result; } };
Kami akan mengemas ini ke perpustakaan utilitas dan sekarang kami memiliki API yang jauh lebih konsisten dengan filosofi desain utama Spring,
"Convention over Configuration" . Ini adalah kode klien terakhir:
@Autowired private CompletableFutureReplyingKafkaTemplate < String, Request, Reply > requestReplyKafkaTemplate; ... requestReplyKafkaTemplate.requestReply(request).thenAccept(reply - > System.out.println("Reply: " + reply.toString()); );
Untuk meringkas
Untuk meringkas, Spring untuk Kafka 2.2 menyediakan implementasi berfungsi penuh dari pola
Permintaan-Balas di Apache Kafka, tetapi API masih memiliki beberapa tepi yang kasar. Dalam masalah ini, kami melihat bahwa beberapa abstraksi tambahan dan adaptasi API dapat memberikan API tingkat tinggi yang lebih logis.
Peringatan 1:Salah satu keuntungan utama dari arsitektur yang digerakkan oleh peristiwa adalah decoupling antara produsen dan konsumen acara, yang memungkinkan untuk menciptakan sistem yang jauh lebih fleksibel dan berkembang. Menggunakan semantik sinkron "Permintaan-Respons" adalah kebalikannya ketika pihak yang meminta dan merespons sangat terkait. Karena itu, harus digunakan hanya jika perlu.
Peringatan 2:Jika
Request-Response sinkron diperlukan, maka
protokol berbasis
HTTP jauh lebih sederhana dan lebih efisien daripada menggunakan
saluran asinkron seperti Apache Kafka .
Namun, mungkin ada skenario di mana
Permintaan-Respons sinkron melalui Kafka masuk akal. Cukup pilih alat terbaik untuk pekerjaan itu.
Contoh yang berfungsi penuh dapat ditemukan di
github.com/callistaenterprise/blog-synchronous-kafka .
Komentar
Federico • 7 bulan laluDan ketika kita memiliki kebutuhan hibrid, misalnya, dalam 50% kasus kita membutuhkan Permintaan-Respons dan dalam 50% kita membutuhkan manajemen acara? Bagaimana kita melakukan ini? Konfigurasi yang dibutuhkan oleh Spring Kafka terlihat sangat buruk.
Jehanzeb Qayyum • 6 bulan laluSpring sekarang memiliki dukungan default menggunakan partisi dalam satu topik umum untuk respons.
Dimulai dengan versi 2.2, templat mencoba menentukan topik respons atau partisi dari wadah respons yang dikonfigurasi (wadah balasan).
https://docs.spring.io/spring-kafka/reference/html/#replying-templatenir rozenberg • 8 bulan laluHai
Dalam paragraf terakhir, Anda menulis bahwa mungkin ada skenario ketika Permintaan-Respons sinkron melalui Kafka masuk akal dibandingkan dengan HTTP.
Bisakah Anda memberikan contoh skenario seperti itu?
Terima kasih
Nir
Janne Keskitalo nir rozenberg • 8 bulan laluKami akan membagi sistem pemrosesan transaksi volume besar menjadi beberapa layanan microser, dan saya punya ide untuk menggunakan pesan Permintaan-Respons Kafka untuk mencapai afinitas pemrosesan yang serupa. Pada dasarnya, Kafka digunakan untuk merutekan semua panggilan dari satu klien ke proses prosesor transaksi yang sama, yang kemudian secara berurutan mengeksekusi mereka satu per satu. Jenis pemrosesan ini memastikan kemampuan linieritas (
https://stackoverflow.com/a/19515375/7430325 ), konektivitas kausal, dan juga memungkinkan caching yang efisien. Pada dasarnya, upaya koordinasi akan ditransfer dari database ke Kafka, dan kami dapat memulai database dalam mode isolasi ketat Serializable.
Saya belum menyelidiki rincian semantik transaksi kami untuk melihat di mana itu gagal, jadi ini hanya sebuah ide.
Diterjemahkan oleh
@middle_java