Event Driven Architecture en general, y
Apache Kafka en particular, han atraído mucha atención recientemente. Para aprovechar al máximo la arquitectura controlada por eventos, el mecanismo de delegación de eventos debe ser esencialmente asíncrono. Sin embargo, puede haber algunos escenarios / flujos de uso específicos que requieren la semántica de una
solicitud-respuesta sincrónica . Esta versión muestra cómo implementar
Solicitud-Respuesta usando
Apache Kafka .
Traducido por
@middle_javaFecha del artículo original: 26 de octubre de 2018
Apache Kafka es inherentemente asíncrono. Por lo tanto,
la semántica de
Solicitud-Respuesta para Apache Kafka no es natural. Sin embargo, este desafío no es nuevo. El patrón de integración
-solicitud de respuesta empresarial proporciona un mecanismo probado para la mensajería sincrónica a través de canales asincrónicos:

El patrón de
dirección de retorno complementa el patrón de
solicitud-respuesta con un mecanismo para que el solicitante indique la dirección a la que se debe enviar la respuesta:

Recientemente,
Spring Kafka 2.1.3 agregó soporte desde el cuadro de patrón Solicitud Solicitud, y en la versión
2.2 se pulió parte de su aspereza. Veamos cómo funciona este soporte:
Lado del cliente: ReplyingKafkaTemplate
La conocida abstracción de la
Plantilla forma la base para la parte del cliente del mecanismo de Solicitud-Respuesta en Spring.
@Bean public ReplyingKafkaTemplate < String, Request, Reply > replyKafkaTemplate( ProducerFactory < String, Request > pf, KafkaMessageListenerContainer < String, Reply > lc) { return new ReplyingKafkaTemplate < > (pf, lc); }
Aquí todo es bastante sencillo: configuramos
ReplyingKafkaTemplate , que envía mensajes de solicitud con claves de cadena y recibe mensajes de respuesta con claves de cadena. Sin embargo, ReplyingKafkaTemplate debe basarse en la solicitud ProducerFactory, la respuesta ConsumerFactory y el MessageListenerContainer con las configuraciones apropiadas de consumidor y productor. Por lo tanto, la configuración requerida es bastante pesada:
@Value("${kafka.topic.car.reply}") private String replyTopic; @Bean public Map < String, Object > consumerConfigs() { Map < String, Object > props = new HashMap < > (); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); return props; } @Bean public Map < String, Object > producerConfigs() { Map < String, Object > props = new HashMap < > (); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return props; } @Bean public ProducerFactory < String, Request > requestProducerFactory() { return new DefaultKafkaProducerFactory < > (producerConfigs()); } @Bean public ConsumerFactory < String, Reply > replyConsumerFactory() { return new DefaultKafkaConsumerFactory < > (consumerConfigs(), new StringDeserializer(), new JsonSerializer < Reply > ()); } @Bean public KafkaMessageListenerContainer < String, Reply > replyListenerContainer() { ContainerProperties containerProperties = new ContainerProperties(replyTopic); return new KafkaMessageListenerContainer < > (replyConsumerFactory(), containerProperties); }
En este caso, el uso de
replyKafkaTemplate para enviar una solicitud síncrona y recibir una respuesta es la siguiente:
@Value("${kafka.topic.car.request}") private String requestTopic; @Value("${kafka.topic.car.reply}") private String replyTopic; @Autowired private ReplyingKafkaTemplate < String, Request, Reply > requestReplyKafkaTemplate; ... RequestReply request = RequestReply.request(...);
También hay una gran cantidad de repeticiones y una API de bajo nivel, e incluso esta obsoleta API
ListenableFuture en lugar de la
CompletableFuture moderna.
requestReplyKafkaTemplate se encarga de generar y configurar el encabezado
KafkaHeaders.CORRELATION_ID , pero debemos establecer explícitamente el encabezado
KafkaHeaders.REPLY_TOPIC para la solicitud. Tenga en cuenta también que el mismo tema para la respuesta fue demasiado involuntario anteriormente en
replyListenerContainer . Un poco de estiércol. No es exactamente lo que esperaba de la abstracción de primavera.
Lado del servidor: @SendTo
En el lado del servidor, el
KafkaListener habitual que escucha el tema de la solicitud se decora adicionalmente con la anotación
@SendTo para proporcionar un mensaje de respuesta. El objeto devuelto por el método de escucha se ajusta automáticamente en el mensaje de respuesta,
se agrega
CORRELATION_ID y la respuesta se publica en el tema especificado en el encabezado
REPLY_TOPIC .
@Bean public Map < String, Object > consumerConfigs() { Map < String, Object > props = new HashMap < > (); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); return props; } @Bean public Map < String, Object > producerConfigs() { Map < String, Object > props = new HashMap < > (); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return props; } @Bean public ConsumerFactory < String, Request > requestConsumerFactory() { return new DefaultKafkaConsumerFactory < > (consumerConfigs(), new StringDeserializer(), new JsonSerializer < Request > ()); } @Bean public KafkaListenerContainerFactory < ConcurrentMessageListenerContainer < String, Request >> requestListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory < String, Request > factory = new ConcurrentKafkaListenerContainerFactory < > (); factory.setConsumerFactory(requestConsumerFactory()); factory.setReplyTemplate(replyTemplate()); return factory; } @Bean public ProducerFactory < String, Reply > replyProducerFactory() { return new DefaultKafkaProducerFactory < > (producerConfigs()); } @Bean public KafkaTemplate < String, Reply > replyTemplate() { return new KafkaTemplate < > (replyProducerFactory()); }
Aquí también se requiere alguna configuración, pero la configuración del oyente es más simple:
@KafkaListener(topics = "${kafka.topic.car.request}", containerFactory = "requestListenerContainerFactory") @SendTo() public Reply receive(Request request) { Reply reply = ...; return reply; }
¿Pero qué pasa con múltiples instancias del consumidor?
Todo parece funcionar hasta que usamos varias instancias del consumidor. Si tenemos varias instancias de cliente, debemos asegurarnos de que la respuesta se envíe a la instancia de cliente correcta. La documentación de Spring Kafka supone que cada consumidor puede usar un tema único o que se envía un valor de encabezado
KafkaHeaders adicional con la solicitud.
RESPONSE_PARTITION es un campo de cuatro bytes que contiene una representación BIG-ENDIAN del número de sección entero. El uso de temas separados para diferentes clientes claramente no es muy flexible, por lo que elegimos la configuración explícita
REPLY_PARTITION . Entonces el cliente debe saber a qué partición está asignado. La documentación sugiere usar una configuración explícita para seleccionar una partición particular. Añádalo a nuestro ejemplo:
@Value("${kafka.topic.car.reply.partition}") private int replyPartition; ... @Bean public KafkaMessageListenerContainer < String, RequestReply > replyListenerContainer() { ContainerProperties containerProperties = new ContainerProperties(replyTopic); TopicPartitionInitialOffset initialOffset = new TopicPartitionInitialOffset(replyTopic, replyPartition); return new KafkaMessageListenerContainer < > (replyConsumerFactory(), containerProperties, initialOffset); } private static byte[] intToBytesBigEndian(final int data) { return new byte[] { (byte)((data >> 24) & 0xff), (byte)((data >> 16) & 0xff), (byte)((data >> 8) & 0xff), (byte)((data >> 0) & 0xff), }; } ... record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes())); record.headers().add(new RecordHeader(KafkaHeaders.REPLY_PARTITION, intToBytesBigEndian(replyPartition))); RequestReplyFuture < String, RequestReply, RequestReply > sendAndReceive = requestReplyKafkaTemplate.sendAndReceive(record); ...
No es muy bonita, pero funciona. La configuración requerida es extensa y la API parece de bajo nivel. La necesidad de configurar particiones explícitamente complica el proceso de escalar dinámicamente el número de clientes. Obviamente, puedes hacerlo mejor.
Encapsulando el procesamiento de temas para respuesta y partición
Comencemos encapsulando el patrón de
Dirección de retorno , pasando el tema de la respuesta y la partición. El tema de la respuesta debe inyectarse en
RequestReplyTemplate y, por lo tanto, no debe estar presente en la API. Cuando se trata de particiones para una respuesta, haremos lo contrario: extraer las particiones asignadas al oyente del tema para la respuesta y transferir esta partición automáticamente. Esto elimina la necesidad de que el cliente se encargue de estos encabezados.
Al mismo tiempo, hagamos que la API se parezca al
KafkaTemplate estándar (sobrecargue el método
sendAndReceive () con parámetros simplificados y agregue los métodos sobrecargados correspondientes utilizando el tema predeterminado)
public class PartitionAwareReplyingKafkaTemplate < K, V, R > extends ReplyingKafkaTemplate < K, V, R > { public PartitionAwareReplyingKafkaTemplate(ProducerFactory < K, V > producerFactory, GenericMessageListenerContainer < K, R > replyContainer) { super(producerFactory, replyContainer); } private TopicPartition getFirstAssignedReplyTopicPartition() { if (getAssignedReplyTopicPartitions() != null && getAssignedReplyTopicPartitions().iterator().hasNext()) { TopicPartition replyPartition = getAssignedReplyTopicPartitions().iterator().next(); if (this.logger.isDebugEnabled()) { this.logger.debug("Using partition " + replyPartition.partition()); } return replyPartition; } else { throw new KafkaException("Illegal state: No reply partition is assigned to this instance"); } } private static byte[] intToBytesBigEndian(final int data) { return new byte[] { (byte)((data >> 24) & 0xff), (byte)((data >> 16) & 0xff), (byte)((data >> 8) & 0xff), (byte)((data >> 0) & 0xff), }; } public RequestReplyFuture < K, V, R > sendAndReceiveDefault(@Nullable V data) { return sendAndReceive(getDefaultTopic(), data); } public RequestReplyFuture < K, V, R > sendAndReceiveDefault(K key, @Nullable V data) { return sendAndReceive(getDefaultTopic(), key, data); } ... public RequestReplyFuture < K, V, R > sendAndReceive(String topic, @Nullable V data) { ProducerRecord < K, V > record = new ProducerRecord < > (topic, data); return doSendAndReceive(record); } public RequestReplyFuture < K, V, R > sendAndReceive(String topic, K key, @Nullable V data) { ProducerRecord < K, V > record = new ProducerRecord < > (topic, key, data); return doSendAndReceive(record); } ... @Override public RequestReplyFuture < K, V, R > sendAndReceive(ProducerRecord < K, V > record) { return doSendAndReceive(record); } protected RequestReplyFuture < K, V, R > doSendAndReceive(ProducerRecord < K, V > record) { TopicPartition replyPartition = getFirstAssignedReplyTopicPartition(); record.headers() .add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, replyPartition.topic().getBytes())) .add(new RecordHeader(KafkaHeaders.REPLY_PARTITION, intToBytesBigEndian(replyPartition.partition()))); return super.sendAndReceive(record); } }
Siguiente paso: Adaptar el
ListenableFuture al
CompletableFuture más moderno.
public class CompletableFutureReplyingKafkaTemplate < K, V, R > extends PartitionAwareReplyingKafkaTemplate < K, V, R > { public CompletableFutureReplyingKafkaTemplate(ProducerFactory < K, V > producerFactory, GenericMessageListenerContainer < K, R > replyContainer) { super(producerFactory, replyContainer); } public CompletableFuture < R > requestReplyDefault(V value) { return adapt(sendAndReceiveDefault(value)); } public CompletableFuture < R > requestReplyDefault(K key, V value) { return adapt(sendAndReceiveDefault(key, value)); } ... public CompletableFuture < R > requestReply(String topic, V value) { return adapt(sendAndReceive(topic, value)); } public CompletableFuture < R > requestReply(String topic, K key, V value) { return adapt(sendAndReceive(topic, key, value)); } ... private CompletableFuture < R > adapt(RequestReplyFuture < K, V, R > requestReplyFuture) { CompletableFuture < R > completableResult = new CompletableFuture < R > () { @Override public boolean cancel(boolean mayInterruptIfRunning) { boolean result = requestReplyFuture.cancel(mayInterruptIfRunning); super.cancel(mayInterruptIfRunning); return result; } };
Empaquetaremos esto en una biblioteca de utilidades y ahora tenemos una API que es mucho más consistente con
la filosofía de diseño principal de Spring,
"Convención sobre configuración" . Aquí está el código final del cliente:
@Autowired private CompletableFutureReplyingKafkaTemplate < String, Request, Reply > requestReplyKafkaTemplate; ... requestReplyKafkaTemplate.requestReply(request).thenAccept(reply - > System.out.println("Reply: " + reply.toString()); );
Para resumir
Para resumir, Spring for Kafka 2.2 proporciona una implementación completamente funcional del patrón de
Solicitud-Respuesta en Apache Kafka, pero la API todavía tiene algunas asperezas. En este número, vimos que algunas abstracciones y adaptaciones adicionales de la API pueden proporcionar una API de alto nivel más lógica.
Advertencia 1:Una de las principales ventajas de una arquitectura basada en eventos es el desacoplamiento de los productores y consumidores de eventos, lo que hace posible crear sistemas mucho más flexibles y en evolución. El uso de la semántica síncrona "Solicitud-Respuesta" es exactamente lo contrario cuando las partes solicitantes y las que responden están fuertemente relacionadas. Por lo tanto, debe usarse solo si es necesario.
Advertencia 2:Si se requiere una
solicitud-respuesta sincrónica , entonces el
protocolo basado en
HTTP es mucho más simple y más eficiente que usar
un canal asíncrono como Apache Kafka .
Sin embargo, puede haber escenarios en los que tenga sentido una
solicitud-respuesta sincrónica a través de Kafka . Razonablemente elija la mejor herramienta para el trabajo.
Puede encontrar un ejemplo completamente funcional en
github.com/callistaenterprise/blog-synchronous-kafka .
Comentarios
Federico • Hace 7 meses¿Y cuando tenemos necesidades híbridas, por ejemplo, en el 50% de los casos necesitamos una solicitud de respuesta y en el 50% necesitamos la gestión de eventos? ¿Cómo hacemos esto? La configuración que necesita Spring Kafka se ve bastante horrible.
Jehanzeb Qayyum • Hace 6 mesesSpring ahora tiene soporte predeterminado usando particiones en un tema común para la respuesta.
A partir de la versión 2.2, la plantilla intenta determinar el tema para la respuesta o partición desde el contenedor de respuesta configurado (contenedor de respuesta).
https://docs.spring.io/spring-kafka/reference/html/#replying-templatenir rozenberg • Hace 8 mesesHola
En el último párrafo, escribió que puede haber escenarios en los que una solicitud-respuesta sincrónica a través de Kafka tiene sentido en comparación con HTTP.
¿Puedes dar ejemplos de tales escenarios?
Gracias
Nir
Janne Keskitalo nir rozenberg • Hace 8 mesesVamos a dividir un sistema de procesamiento de transacciones de gran volumen en varios microservicios, y tengo la idea de usar el mensaje de Solicitud-Respuesta de Kafka para lograr una afinidad de procesamiento similar. Básicamente, Kafka se utiliza para enrutar todas las llamadas de un cliente al mismo proceso del procesador de transacciones, que luego las ejecuta secuencialmente de una en una. Este tipo de procesamiento garantiza la linealización (
https://stackoverflow.com/a/19515375/7430325 ), conectividad causal y también permite un almacenamiento en caché eficiente. Esencialmente, los esfuerzos de coordinación se transferirían de la base de datos a Kafka, y podríamos iniciar la base de datos en modo de aislamiento estricto serializable.
Todavía tengo que profundizar en los detalles de nuestra semántica de transacciones para ver dónde se queda corto, así que esto es solo una idea.
Traducido por
@middle_java