Solicitud-respuesta sincrónica usando Apache Kafka

Event Driven Architecture en general, y Apache Kafka en particular, han atraído mucha atención recientemente. Para aprovechar al máximo la arquitectura controlada por eventos, el mecanismo de delegación de eventos debe ser esencialmente asíncrono. Sin embargo, puede haber algunos escenarios / flujos de uso específicos que requieren la semántica de una solicitud-respuesta sincrónica . Esta versión muestra cómo implementar Solicitud-Respuesta usando Apache Kafka .

Traducido por @middle_java

Fecha del artículo original: 26 de octubre de 2018

Apache Kafka es inherentemente asíncrono. Por lo tanto, la semántica de Solicitud-Respuesta para Apache Kafka no es natural. Sin embargo, este desafío no es nuevo. El patrón de integración -solicitud de respuesta empresarial proporciona un mecanismo probado para la mensajería sincrónica a través de canales asincrónicos:



El patrón de dirección de retorno complementa el patrón de solicitud-respuesta con un mecanismo para que el solicitante indique la dirección a la que se debe enviar la respuesta:



Recientemente, Spring Kafka 2.1.3 agregó soporte desde el cuadro de patrón Solicitud Solicitud, y en la versión 2.2 se pulió parte de su aspereza. Veamos cómo funciona este soporte:

Lado del cliente: ReplyingKafkaTemplate


La conocida abstracción de la Plantilla forma la base para la parte del cliente del mecanismo de Solicitud-Respuesta en Spring.

@Bean public ReplyingKafkaTemplate < String, Request, Reply > replyKafkaTemplate( ProducerFactory < String, Request > pf, KafkaMessageListenerContainer < String, Reply > lc) { return new ReplyingKafkaTemplate < > (pf, lc); } 

Aquí todo es bastante sencillo: configuramos ReplyingKafkaTemplate , que envía mensajes de solicitud con claves de cadena y recibe mensajes de respuesta con claves de cadena. Sin embargo, ReplyingKafkaTemplate debe basarse en la solicitud ProducerFactory, la respuesta ConsumerFactory y el MessageListenerContainer con las configuraciones apropiadas de consumidor y productor. Por lo tanto, la configuración requerida es bastante pesada:

  @Value("${kafka.topic.car.reply}") private String replyTopic; @Bean public Map < String, Object > consumerConfigs() { Map < String, Object > props = new HashMap < > (); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); return props; } @Bean public Map < String, Object > producerConfigs() { Map < String, Object > props = new HashMap < > (); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return props; } @Bean public ProducerFactory < String, Request > requestProducerFactory() { return new DefaultKafkaProducerFactory < > (producerConfigs()); } @Bean public ConsumerFactory < String, Reply > replyConsumerFactory() { return new DefaultKafkaConsumerFactory < > (consumerConfigs(), new StringDeserializer(), new JsonSerializer < Reply > ()); } @Bean public KafkaMessageListenerContainer < String, Reply > replyListenerContainer() { ContainerProperties containerProperties = new ContainerProperties(replyTopic); return new KafkaMessageListenerContainer < > (replyConsumerFactory(), containerProperties); } 

En este caso, el uso de replyKafkaTemplate para enviar una solicitud síncrona y recibir una respuesta es la siguiente:

 @Value("${kafka.topic.car.request}") private String requestTopic; @Value("${kafka.topic.car.reply}") private String replyTopic; @Autowired private ReplyingKafkaTemplate < String, Request, Reply > requestReplyKafkaTemplate; ... RequestReply request = RequestReply.request(...); // producer record ProducerRecord < String, Request > record = new ProducerRecord < String, Request > (requestTopic, request); //       record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes())); //     Kafka          RequestReplyFuture < String, Request, Reply > sendAndReceive = requestReplyKafkaTemplate.sendAndReceive(record); sendAndReceive.addCallback(new ListenableFutureCallback < ConsumerRecord < String, Reply >> () { @Override public void onSuccess(ConsumerRecord < String, Reply > result) { //   consumer record Reply reply = result.value(); System.out.println("Reply: " + reply.toString()); } }); 

También hay una gran cantidad de repeticiones y una API de bajo nivel, e incluso esta obsoleta API ListenableFuture en lugar de la CompletableFuture moderna.

requestReplyKafkaTemplate se encarga de generar y configurar el encabezado KafkaHeaders.CORRELATION_ID , pero debemos establecer explícitamente el encabezado KafkaHeaders.REPLY_TOPIC para la solicitud. Tenga en cuenta también que el mismo tema para la respuesta fue demasiado involuntario anteriormente en replyListenerContainer . Un poco de estiércol. No es exactamente lo que esperaba de la abstracción de primavera.

Lado del servidor: @SendTo


En el lado del servidor, el KafkaListener habitual que escucha el tema de la solicitud se decora adicionalmente con la anotación @SendTo para proporcionar un mensaje de respuesta. El objeto devuelto por el método de escucha se ajusta automáticamente en el mensaje de respuesta, se agrega CORRELATION_ID y la respuesta se publica en el tema especificado en el encabezado REPLY_TOPIC .

  @Bean public Map < String, Object > consumerConfigs() { Map < String, Object > props = new HashMap < > (); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); return props; } @Bean public Map < String, Object > producerConfigs() { Map < String, Object > props = new HashMap < > (); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return props; } @Bean public ConsumerFactory < String, Request > requestConsumerFactory() { return new DefaultKafkaConsumerFactory < > (consumerConfigs(), new StringDeserializer(), new JsonSerializer < Request > ()); } @Bean public KafkaListenerContainerFactory < ConcurrentMessageListenerContainer < String, Request >> requestListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory < String, Request > factory = new ConcurrentKafkaListenerContainerFactory < > (); factory.setConsumerFactory(requestConsumerFactory()); factory.setReplyTemplate(replyTemplate()); return factory; } @Bean public ProducerFactory < String, Reply > replyProducerFactory() { return new DefaultKafkaProducerFactory < > (producerConfigs()); } @Bean public KafkaTemplate < String, Reply > replyTemplate() { return new KafkaTemplate < > (replyProducerFactory()); } 

Aquí también se requiere alguna configuración, pero la configuración del oyente es más simple:

  @KafkaListener(topics = "${kafka.topic.car.request}", containerFactory = "requestListenerContainerFactory") @SendTo() public Reply receive(Request request) { Reply reply = ...; return reply; } 

¿Pero qué pasa con múltiples instancias del consumidor?


Todo parece funcionar hasta que usamos varias instancias del consumidor. Si tenemos varias instancias de cliente, debemos asegurarnos de que la respuesta se envíe a la instancia de cliente correcta. La documentación de Spring Kafka supone que cada consumidor puede usar un tema único o que se envía un valor de encabezado KafkaHeaders adicional con la solicitud. RESPONSE_PARTITION es un campo de cuatro bytes que contiene una representación BIG-ENDIAN del número de sección entero. El uso de temas separados para diferentes clientes claramente no es muy flexible, por lo que elegimos la configuración explícita REPLY_PARTITION . Entonces el cliente debe saber a qué partición está asignado. La documentación sugiere usar una configuración explícita para seleccionar una partición particular. Añádalo a nuestro ejemplo:

  @Value("${kafka.topic.car.reply.partition}") private int replyPartition; ... @Bean public KafkaMessageListenerContainer < String, RequestReply > replyListenerContainer() { ContainerProperties containerProperties = new ContainerProperties(replyTopic); TopicPartitionInitialOffset initialOffset = new TopicPartitionInitialOffset(replyTopic, replyPartition); return new KafkaMessageListenerContainer < > (replyConsumerFactory(), containerProperties, initialOffset); } private static byte[] intToBytesBigEndian(final int data) { return new byte[] { (byte)((data >> 24) & 0xff), (byte)((data >> 16) & 0xff), (byte)((data >> 8) & 0xff), (byte)((data >> 0) & 0xff), }; } ... record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes())); record.headers().add(new RecordHeader(KafkaHeaders.REPLY_PARTITION, intToBytesBigEndian(replyPartition))); RequestReplyFuture < String, RequestReply, RequestReply > sendAndReceive = requestReplyKafkaTemplate.sendAndReceive(record); ... 

No es muy bonita, pero funciona. La configuración requerida es extensa y la API parece de bajo nivel. La necesidad de configurar particiones explícitamente complica el proceso de escalar dinámicamente el número de clientes. Obviamente, puedes hacerlo mejor.

Encapsulando el procesamiento de temas para respuesta y partición


Comencemos encapsulando el patrón de Dirección de retorno , pasando el tema de la respuesta y la partición. El tema de la respuesta debe inyectarse en RequestReplyTemplate y, por lo tanto, no debe estar presente en la API. Cuando se trata de particiones para una respuesta, haremos lo contrario: extraer las particiones asignadas al oyente del tema para la respuesta y transferir esta partición automáticamente. Esto elimina la necesidad de que el cliente se encargue de estos encabezados.
Al mismo tiempo, hagamos que la API se parezca al KafkaTemplate estándar (sobrecargue el método sendAndReceive () con parámetros simplificados y agregue los métodos sobrecargados correspondientes utilizando el tema predeterminado)

 public class PartitionAwareReplyingKafkaTemplate < K, V, R > extends ReplyingKafkaTemplate < K, V, R > { public PartitionAwareReplyingKafkaTemplate(ProducerFactory < K, V > producerFactory, GenericMessageListenerContainer < K, R > replyContainer) { super(producerFactory, replyContainer); } private TopicPartition getFirstAssignedReplyTopicPartition() { if (getAssignedReplyTopicPartitions() != null && getAssignedReplyTopicPartitions().iterator().hasNext()) { TopicPartition replyPartition = getAssignedReplyTopicPartitions().iterator().next(); if (this.logger.isDebugEnabled()) { this.logger.debug("Using partition " + replyPartition.partition()); } return replyPartition; } else { throw new KafkaException("Illegal state: No reply partition is assigned to this instance"); } } private static byte[] intToBytesBigEndian(final int data) { return new byte[] { (byte)((data >> 24) & 0xff), (byte)((data >> 16) & 0xff), (byte)((data >> 8) & 0xff), (byte)((data >> 0) & 0xff), }; } public RequestReplyFuture < K, V, R > sendAndReceiveDefault(@Nullable V data) { return sendAndReceive(getDefaultTopic(), data); } public RequestReplyFuture < K, V, R > sendAndReceiveDefault(K key, @Nullable V data) { return sendAndReceive(getDefaultTopic(), key, data); } ... public RequestReplyFuture < K, V, R > sendAndReceive(String topic, @Nullable V data) { ProducerRecord < K, V > record = new ProducerRecord < > (topic, data); return doSendAndReceive(record); } public RequestReplyFuture < K, V, R > sendAndReceive(String topic, K key, @Nullable V data) { ProducerRecord < K, V > record = new ProducerRecord < > (topic, key, data); return doSendAndReceive(record); } ... @Override public RequestReplyFuture < K, V, R > sendAndReceive(ProducerRecord < K, V > record) { return doSendAndReceive(record); } protected RequestReplyFuture < K, V, R > doSendAndReceive(ProducerRecord < K, V > record) { TopicPartition replyPartition = getFirstAssignedReplyTopicPartition(); record.headers() .add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, replyPartition.topic().getBytes())) .add(new RecordHeader(KafkaHeaders.REPLY_PARTITION, intToBytesBigEndian(replyPartition.partition()))); return super.sendAndReceive(record); } } 

Siguiente paso: Adaptar el ListenableFuture al CompletableFuture más moderno.

 public class CompletableFutureReplyingKafkaTemplate < K, V, R > extends PartitionAwareReplyingKafkaTemplate < K, V, R > { public CompletableFutureReplyingKafkaTemplate(ProducerFactory < K, V > producerFactory, GenericMessageListenerContainer < K, R > replyContainer) { super(producerFactory, replyContainer); } public CompletableFuture < R > requestReplyDefault(V value) { return adapt(sendAndReceiveDefault(value)); } public CompletableFuture < R > requestReplyDefault(K key, V value) { return adapt(sendAndReceiveDefault(key, value)); } ... public CompletableFuture < R > requestReply(String topic, V value) { return adapt(sendAndReceive(topic, value)); } public CompletableFuture < R > requestReply(String topic, K key, V value) { return adapt(sendAndReceive(topic, key, value)); } ... private CompletableFuture < R > adapt(RequestReplyFuture < K, V, R > requestReplyFuture) { CompletableFuture < R > completableResult = new CompletableFuture < R > () { @Override public boolean cancel(boolean mayInterruptIfRunning) { boolean result = requestReplyFuture.cancel(mayInterruptIfRunning); super.cancel(mayInterruptIfRunning); return result; } }; //       requestReplyFuture.getSendFuture().addCallback(new ListenableFutureCallback < SendResult < K, V >> () { @Override public void onSuccess(SendResult < K, V > sendResult) { // NOOP } @Override public void onFailure(Throwable t) { completableResult.completeExceptionally(t); } }); //     requestReplyFuture.addCallback(new ListenableFutureCallback < ConsumerRecord < K, R >> () { @Override public void onSuccess(ConsumerRecord < K, R > result) { completableResult.complete(result.value()); } @Override public void onFailure(Throwable t) { completableResult.completeExceptionally(t); } }); return completableResult; } } 

Empaquetaremos esto en una biblioteca de utilidades y ahora tenemos una API que es mucho más consistente con la filosofía de diseño principal de Spring, "Convención sobre configuración" . Aquí está el código final del cliente:

  @Autowired private CompletableFutureReplyingKafkaTemplate < String, Request, Reply > requestReplyKafkaTemplate; ... requestReplyKafkaTemplate.requestReply(request).thenAccept(reply - > System.out.println("Reply: " + reply.toString()); ); 

Para resumir


Para resumir, Spring for Kafka 2.2 proporciona una implementación completamente funcional del patrón de Solicitud-Respuesta en Apache Kafka, pero la API todavía tiene algunas asperezas. En este número, vimos que algunas abstracciones y adaptaciones adicionales de la API pueden proporcionar una API de alto nivel más lógica.

Advertencia 1:
Una de las principales ventajas de una arquitectura basada en eventos es el desacoplamiento de los productores y consumidores de eventos, lo que hace posible crear sistemas mucho más flexibles y en evolución. El uso de la semántica síncrona "Solicitud-Respuesta" es exactamente lo contrario cuando las partes solicitantes y las que responden están fuertemente relacionadas. Por lo tanto, debe usarse solo si es necesario.

Advertencia 2:
Si se requiere una solicitud-respuesta sincrónica , entonces el protocolo basado en HTTP es mucho más simple y más eficiente que usar un canal asíncrono como Apache Kafka .
Sin embargo, puede haber escenarios en los que tenga sentido una solicitud-respuesta sincrónica a través de Kafka . Razonablemente elija la mejor herramienta para el trabajo.

Puede encontrar un ejemplo completamente funcional en github.com/callistaenterprise/blog-synchronous-kafka .

Comentarios


Federico • Hace 7 meses
¿Y cuando tenemos necesidades híbridas, por ejemplo, en el 50% de los casos necesitamos una solicitud de respuesta y en el 50% necesitamos la gestión de eventos? ¿Cómo hacemos esto? La configuración que necesita Spring Kafka se ve bastante horrible.

Jehanzeb Qayyum • Hace 6 meses
Spring ahora tiene soporte predeterminado usando particiones en un tema común para la respuesta.

A partir de la versión 2.2, la plantilla intenta determinar el tema para la respuesta o partición desde el contenedor de respuesta configurado (contenedor de respuesta).

https://docs.spring.io/spring-kafka/reference/html/#replying-template

nir rozenberg • Hace 8 meses
Hola
En el último párrafo, escribió que puede haber escenarios en los que una solicitud-respuesta sincrónica a través de Kafka tiene sentido en comparación con HTTP.
¿Puedes dar ejemplos de tales escenarios?
Gracias
Nir

Janne Keskitalo nir rozenberg • Hace 8 meses
Vamos a dividir un sistema de procesamiento de transacciones de gran volumen en varios microservicios, y tengo la idea de usar el mensaje de Solicitud-Respuesta de Kafka para lograr una afinidad de procesamiento similar. Básicamente, Kafka se utiliza para enrutar todas las llamadas de un cliente al mismo proceso del procesador de transacciones, que luego las ejecuta secuencialmente de una en una. Este tipo de procesamiento garantiza la linealización ( https://stackoverflow.com/a/19515375/7430325 ), conectividad causal y también permite un almacenamiento en caché eficiente. Esencialmente, los esfuerzos de coordinación se transferirían de la base de datos a Kafka, y podríamos iniciar la base de datos en modo de aislamiento estricto serializable.
Todavía tengo que profundizar en los detalles de nuestra semántica de transacciones para ver dónde se queda corto, así que esto es solo una idea.

Traducido por @middle_java

Source: https://habr.com/ru/post/476156/


All Articles