جذبت
حدثا الهندسة المعمارية بشكل عام ،
وأباتشي كافكا على وجه الخصوص ، الكثير من الاهتمام مؤخرا. للاستفادة الكاملة من بنية الحدث ، يجب أن تكون آلية تفويض الحدث غير متزامنة بشكل أساسي. ومع ذلك ، قد يكون هناك بعض سيناريوهات / تدفقات استخدام محددة تتطلب دلالات "
طلب استجابة متزامن" . يوضح هذا الإصدار كيفية تنفيذ
طلب - استجابة باستخدام
Apache Kafka .
ترجم بواسطة
middle_javaتاريخ المقالة الأصلية: 26 أكتوبر 2018
اباتشي كافكا بطبيعته غير متزامن. لذلك ،
فإن دلالات
طلب الاستجابة لـ Apache Kafka ليست طبيعية. ومع ذلك ، فإن هذا التحدي ليس جديدا. يوفر
طلب - نمط
طلب تكامل المؤسسة آلية مثبتة للرسائل المتزامنة عبر قنوات غير متزامنة:

يكمل نمط عنوان المرسل نموذج
طلب الرد بآلية للمقدم للإشارة إلى العنوان الذي يجب إرسال الرد إليه:

في الآونة الأخيرة ، أضاف
Spring Kafka 2.1.3 دعمًا من مربع نموذج "Request Reply" ، وفي الإصدار
2.2 تم تلميع بعض خشونه. دعونا نرى كيف يعمل هذا الدعم:
جانب العميل: AnswerKafkaTemplate
يشكل التجريد المعروف للقالب الأساس لجزء العميل من آلية طلب الرد في الربيع.
@Bean public ReplyingKafkaTemplate < String, Request, Reply > replyKafkaTemplate( ProducerFactory < String, Request > pf, KafkaMessageListenerContainer < String, Reply > lc) { return new ReplyingKafkaTemplate < > (pf, lc); }
كل شيء بسيط إلى الأمام هنا: لقد قمنا بإعداد
ReplyKafkaTemplate ، والذي يرسل رسائل الطلب مع مفاتيح السلسلة ويتلقى رسائل الاستجابة مع مفاتيح السلسلة. ومع ذلك ، يجب أن تستند ReplyKafkaTemplate على طلب ProducerFactory ، واستجابة ConsumerFactory ، و MessageListenerContainer مع تكوينات المستهلك والمنتج المناسبة. لذلك ، التكوين المطلوب ذو وزن كبير:
@Value("${kafka.topic.car.reply}") private String replyTopic; @Bean public Map < String, Object > consumerConfigs() { Map < String, Object > props = new HashMap < > (); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); return props; } @Bean public Map < String, Object > producerConfigs() { Map < String, Object > props = new HashMap < > (); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return props; } @Bean public ProducerFactory < String, Request > requestProducerFactory() { return new DefaultKafkaProducerFactory < > (producerConfigs()); } @Bean public ConsumerFactory < String, Reply > replyConsumerFactory() { return new DefaultKafkaConsumerFactory < > (consumerConfigs(), new StringDeserializer(), new JsonSerializer < Reply > ()); } @Bean public KafkaMessageListenerContainer < String, Reply > replyListenerContainer() { ContainerProperties containerProperties = new ContainerProperties(replyTopic); return new KafkaMessageListenerContainer < > (replyConsumerFactory(), containerProperties); }
في هذه الحالة ، يكون استخدام
replyKafkaTemplate لإرسال طلب متزامن وتلقي استجابة كما يلي:
@Value("${kafka.topic.car.request}") private String requestTopic; @Value("${kafka.topic.car.reply}") private String replyTopic; @Autowired private ReplyingKafkaTemplate < String, Request, Reply > requestReplyKafkaTemplate; ... RequestReply request = RequestReply.request(...);
هناك أيضًا الكثير من
القواعد النمطيّة وواجهة برمجة التطبيقات منخفضة المستوى ، وحتى واجهة برمجة تطبيقات
ListenableFuture القديمة التي عفا عليها الزمن بدلًا من برنامج
CompleteableFuture الحديث.
requestReplyKafkaTemplate يهتم بإنشاء
وتهيئة رأس
KafkaHeaders.CORRELATION_ID ، لكن يجب علينا تعيين رأس
KafkaHeaders بشكل صريح.
REPLY_TOPIC رأس الطلب. يرجى أيضًا ملاحظة أن نفس موضوع الإجابة كان غير مقصود أعلاه في
replyListenerContainer . بعض الوحل. ليس تماما ما كنت أتوقع من الربيع التجريد.
جانب الخادم: SENDTo
على جانب الخادم ، فإن
تطبيق KafkaListener المعتاد الذي يستمع إلى موضوع الطلب مصمم بشكل إضافي مع التعليق التوضيحي
SendTo لتقديم رسالة استجابة. يتم تلقائيًا التفاف الكائن الذي يتم إرجاعه بواسطة طريقة المستمع في رسالة الاستجابة ،
وتتم إضافة
CORRELATION_ID ، ويتم نشر الاستجابة في الموضوع المحدد في رأس
REPLY_TOPIC .
@Bean public Map < String, Object > consumerConfigs() { Map < String, Object > props = new HashMap < > (); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); return props; } @Bean public Map < String, Object > producerConfigs() { Map < String, Object > props = new HashMap < > (); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return props; } @Bean public ConsumerFactory < String, Request > requestConsumerFactory() { return new DefaultKafkaConsumerFactory < > (consumerConfigs(), new StringDeserializer(), new JsonSerializer < Request > ()); } @Bean public KafkaListenerContainerFactory < ConcurrentMessageListenerContainer < String, Request >> requestListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory < String, Request > factory = new ConcurrentKafkaListenerContainerFactory < > (); factory.setConsumerFactory(requestConsumerFactory()); factory.setReplyTemplate(replyTemplate()); return factory; } @Bean public ProducerFactory < String, Reply > replyProducerFactory() { return new DefaultKafkaProducerFactory < > (producerConfigs()); } @Bean public KafkaTemplate < String, Reply > replyTemplate() { return new KafkaTemplate < > (replyProducerFactory()); }
بعض التكوين مطلوب أيضًا هنا ، لكن تكوين المستمع أبسط:
@KafkaListener(topics = "${kafka.topic.car.request}", containerFactory = "requestListenerContainerFactory") @SendTo() public Reply receive(Request request) { Reply reply = ...; return reply; }
ولكن ماذا عن حالات متعددة للمستهلك؟
يبدو أن كل شيء يعمل حتى نستخدم عدة حالات للمستهلك. إذا كان لدينا العديد من مثيلات العملاء ، فنحن بحاجة إلى التأكد من إرسال الاستجابة إلى مثيل العميل الصحيح. تفترض وثائق Spring Kafka أن كل مستهلك يمكنه استخدام موضوع فريد أو أنه يتم إرسال قيمة رأس
KafkaHeaders إضافية مع الطلب.
RESPONSE_PARTITION هو حقل من أربعة بايت يحتوي على تمثيل BIG-ENDIAN لرقم قسم عدد صحيح. من الواضح أن استخدام موضوعات منفصلة لعملاء مختلفين ليس مرنًا للغاية ، لذلك نختار إعداد
REPLY_PARTITION الصريح. ثم يجب على العميل معرفة القسم الذي تم تعيينه له. تقترح الوثائق استخدام تكوين صريح لتحديد قسم معين. دعونا إضافته إلى مثالنا:
@Value("${kafka.topic.car.reply.partition}") private int replyPartition; ... @Bean public KafkaMessageListenerContainer < String, RequestReply > replyListenerContainer() { ContainerProperties containerProperties = new ContainerProperties(replyTopic); TopicPartitionInitialOffset initialOffset = new TopicPartitionInitialOffset(replyTopic, replyPartition); return new KafkaMessageListenerContainer < > (replyConsumerFactory(), containerProperties, initialOffset); } private static byte[] intToBytesBigEndian(final int data) { return new byte[] { (byte)((data >> 24) & 0xff), (byte)((data >> 16) & 0xff), (byte)((data >> 8) & 0xff), (byte)((data >> 0) & 0xff), }; } ... record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes())); record.headers().add(new RecordHeader(KafkaHeaders.REPLY_PARTITION, intToBytesBigEndian(replyPartition))); RequestReplyFuture < String, RequestReply, RequestReply > sendAndReceive = requestReplyKafkaTemplate.sendAndReceive(record); ...
ليست جميلة جدا ، لكنها تعمل. التكوين المطلوب واسع النطاق وواجهة برمجة التطبيقات تبدو منخفضة المستوى. تؤدي الحاجة إلى تكوين أقسام بشكل صريح إلى تعقيد عملية زيادة عدد العملاء بشكل ديناميكي. من الواضح ، يمكنك أن تفعل أفضل.
تغليف معالجة الموضوع للاستجابة والقسم
لنبدأ بتغليف نمط
عنوان المرسل ، مع تمرير الموضوع للاستجابة والقسم. يجب حقن موضوع الاستجابة في
RequestReplyTemplate ، وبالتالي ، يجب ألا يكون موجودًا في واجهة برمجة التطبيقات على الإطلاق. عندما يتعلق الأمر بالأقسام للحصول على إجابة ، سنفعل العكس: سنقوم باستخراج القسم (الأقسام) المعين لمستمع الموضوع للإجابة ، ونقل هذا القسم تلقائيًا. هذا يلغي الحاجة للعميل لرعاية هذه الرؤوس.
في الوقت نفسه ، لنقم أيضًا
بإنشاء واجهة برمجة التطبيقات (API) بحيث تشبه طريقة
KafkaTemplate القياسية (التحميل الزائد
لأسلوب sendAndReceive () مع معلمات مبسطة وإضافة الطرق المحمّلة الزائدة باستخدام الموضوع الافتراضي):
public class PartitionAwareReplyingKafkaTemplate < K, V, R > extends ReplyingKafkaTemplate < K, V, R > { public PartitionAwareReplyingKafkaTemplate(ProducerFactory < K, V > producerFactory, GenericMessageListenerContainer < K, R > replyContainer) { super(producerFactory, replyContainer); } private TopicPartition getFirstAssignedReplyTopicPartition() { if (getAssignedReplyTopicPartitions() != null && getAssignedReplyTopicPartitions().iterator().hasNext()) { TopicPartition replyPartition = getAssignedReplyTopicPartitions().iterator().next(); if (this.logger.isDebugEnabled()) { this.logger.debug("Using partition " + replyPartition.partition()); } return replyPartition; } else { throw new KafkaException("Illegal state: No reply partition is assigned to this instance"); } } private static byte[] intToBytesBigEndian(final int data) { return new byte[] { (byte)((data >> 24) & 0xff), (byte)((data >> 16) & 0xff), (byte)((data >> 8) & 0xff), (byte)((data >> 0) & 0xff), }; } public RequestReplyFuture < K, V, R > sendAndReceiveDefault(@Nullable V data) { return sendAndReceive(getDefaultTopic(), data); } public RequestReplyFuture < K, V, R > sendAndReceiveDefault(K key, @Nullable V data) { return sendAndReceive(getDefaultTopic(), key, data); } ... public RequestReplyFuture < K, V, R > sendAndReceive(String topic, @Nullable V data) { ProducerRecord < K, V > record = new ProducerRecord < > (topic, data); return doSendAndReceive(record); } public RequestReplyFuture < K, V, R > sendAndReceive(String topic, K key, @Nullable V data) { ProducerRecord < K, V > record = new ProducerRecord < > (topic, key, data); return doSendAndReceive(record); } ... @Override public RequestReplyFuture < K, V, R > sendAndReceive(ProducerRecord < K, V > record) { return doSendAndReceive(record); } protected RequestReplyFuture < K, V, R > doSendAndReceive(ProducerRecord < K, V > record) { TopicPartition replyPartition = getFirstAssignedReplyTopicPartition(); record.headers() .add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, replyPartition.topic().getBytes())) .add(new RecordHeader(KafkaHeaders.REPLY_PARTITION, intToBytesBigEndian(replyPartition.partition()))); return super.sendAndReceive(record); } }
الخطوة التالية: تكييف
ListenableFuture مع
CompleteableFuture الأكثر حداثة.
public class CompletableFutureReplyingKafkaTemplate < K, V, R > extends PartitionAwareReplyingKafkaTemplate < K, V, R > { public CompletableFutureReplyingKafkaTemplate(ProducerFactory < K, V > producerFactory, GenericMessageListenerContainer < K, R > replyContainer) { super(producerFactory, replyContainer); } public CompletableFuture < R > requestReplyDefault(V value) { return adapt(sendAndReceiveDefault(value)); } public CompletableFuture < R > requestReplyDefault(K key, V value) { return adapt(sendAndReceiveDefault(key, value)); } ... public CompletableFuture < R > requestReply(String topic, V value) { return adapt(sendAndReceive(topic, value)); } public CompletableFuture < R > requestReply(String topic, K key, V value) { return adapt(sendAndReceive(topic, key, value)); } ... private CompletableFuture < R > adapt(RequestReplyFuture < K, V, R > requestReplyFuture) { CompletableFuture < R > completableResult = new CompletableFuture < R > () { @Override public boolean cancel(boolean mayInterruptIfRunning) { boolean result = requestReplyFuture.cancel(mayInterruptIfRunning); super.cancel(mayInterruptIfRunning); return result; } };
نحن نضعها في مكتبة أدوات ، والآن لدينا واجهة برمجة تطبيقات تتوافق أكثر مع فلسفة Spring الأساسية للتصميم ،
"Convention over Configuration" . هنا هو رمز العميل النهائي:
@Autowired private CompletableFutureReplyingKafkaTemplate < String, Request, Reply > requestReplyKafkaTemplate; ... requestReplyKafkaTemplate.requestReply(request).thenAccept(reply - > System.out.println("Reply: " + reply.toString()); );
لتلخيص
للتلخيص ، يوفر Spring for Kafka 2.2 تطبيقًا وظيفيًا بالكامل لنمط
طلب الرد في Apache Kafka ، لكن واجهة برمجة التطبيقات لا تزال بها بعض الحواف الخشنة. في هذه المشكلة ، رأينا أن بعض التجريدات والتعديلات الإضافية لواجهة برمجة التطبيقات يمكن أن توفر واجهة برمجة تطبيقات عالية المستوى أكثر منطقية.
تحذير 1:واحدة من المزايا الرئيسية للهيكل الذي يحركه الحدث هو الفصل بين منتجي الحدث والمستهلكين ، مما يجعل من الممكن إنشاء أنظمة أكثر مرونة وتطوراً. إن استخدام دلالات متزامنة "طلب - استجابة" هو العكس تمامًا عندما ترتبط الأطراف الطالبة والمستجيبة ارتباطًا وثيقًا. لذلك ، يجب استخدامه فقط إذا لزم الأمر.
تحذير 2:إذا كانت هناك حاجة إلى
طلب استجابة متزامن ، فإن
البروتوكول القائم على
HTTP يكون أبسط وأكثر فعالية من استخدام
قناة غير متزامنة مثل Apache Kafka .
ومع ذلك ، قد تكون هناك سيناريوهات يكون فيها
طلب - استجابة متزامن عبر كافكا منطقيًا. اختيار معقول أفضل أداة لهذا المنصب.
يمكن العثور على مثال يعمل بشكل كامل في
github.com/callistaenterprise/blog-synchronous-kafka .
تعليقات
Federico • قبل 7 أشهروعندما تكون لدينا احتياجات مختلطة ، على سبيل المثال ، في 50٪ من الحالات نحتاج إلى طلب استجابة وفي 50٪ نحتاج إلى إدارة الأحداث؟ كيف نفعل هذا؟ التكوين الذي يحتاجه Spring Kafka يبدو فظيعًا جدًا.
جهانزيب قيوم • منذ 6 أشهريتمتع Spring الآن بدعم افتراضي باستخدام أقسام في موضوع واحد شائع للاستجابة.
بدءًا من الإصدار 2.2 ، يحاول القالب تحديد موضوع الاستجابة أو القسم من حاوية الاستجابة المكونة (حاوية الرد).
https://docs.spring.io/spring-kafka/reference/html/#replying-templateنير روزنبرغ • منذ 8 أشهرمرحبا،
في الفقرة الأخيرة ، كتبت أنه قد تكون هناك سيناريوهات عندما يكون طلب - استجابة متزامن عبر كافكا منطقيًا مقارنةً بـ HTTP.
هل يمكنك إعطاء أمثلة على مثل هذه السيناريوهات؟
شكرا لك
نير
Janne Keskitalo nir rozenberg • منذ 8 أشهرسنقوم بتقسيم نظام معالجة المعاملات كبير الحجم إلى عدة خدمات مصغرة ولدي فكرة لاستخدام مراسلة Kafka للطلب والاستجابة لتحقيق تقارب مماثل في المعالجة. بشكل أساسي ، يتم استخدام كافكا لتوجيه جميع المكالمات من عميل واحد إلى نفس عملية معالج المعاملات ، والتي تنفذ بعد ذلك بالتسلسل واحد في وقت واحد. يضمن هذا النوع من المعالجة إمكانية الخطية (
https://stackoverflow.com/a/19515375/7430325 ) ، والاتصال السببي ، كما يتيح التخزين المؤقت الفعال. بشكل أساسي ، سيتم نقل جهود التنسيق من قاعدة البيانات إلى كافكا ، ويمكننا بدء تشغيل قاعدة البيانات في وضع عزل تسلسلي صارم.
لا يزال يتعين علي الخوض في تفاصيل دلالات معاملاتنا لمعرفة أين تنخفض ، لذلك هذه مجرد فكرة.
ترجم بواسطة
middle_java