A Arquitetura Orientada a Eventos em geral, e o
Apache Kafka em particular, atraíram muita atenção recentemente. Para aproveitar ao máximo a arquitetura orientada a eventos, o mecanismo de delegação de eventos deve ser essencialmente assíncrono. No entanto, pode haver alguns cenários / fluxos de uso específicos que exigem a semântica de uma
Resposta de solicitação síncrona . Esta versão mostra como implementar a
solicitação-resposta usando o
Apache Kafka .
Traduzido por
@middle_javaData do artigo original: 26 de outubro de 2018
O Apache Kafka é inerentemente assíncrono. Portanto,
a semântica de
solicitação-resposta para o Apache Kafka não é natural. No entanto, esse desafio não é novo. O Enterprise Integration Pattern
Request-Reply fornece um mecanismo comprovado para mensagens síncronas através de canais assíncronos:

O padrão
Endereço de Retorno complementa o padrão
Solicitação de Resposta com um mecanismo para o solicitante indicar o endereço para o qual a resposta deve ser enviada:

Recentemente, o
Spring Kafka 2.1.3 adicionou suporte da caixa padrão de solicitação e, na versão
2.2, parte de sua rugosidade foi polida. Vamos ver como esse suporte funciona:
Lado do cliente: ReplyKafkaTemplate
A conhecida abstração do
modelo forma a base da parte do cliente do mecanismo de solicitação e resposta no Spring.
@Bean public ReplyingKafkaTemplate < String, Request, Reply > replyKafkaTemplate( ProducerFactory < String, Request > pf, KafkaMessageListenerContainer < String, Reply > lc) { return new ReplyingKafkaTemplate < > (pf, lc); }
Tudo é bem direto aqui: configuramos o
ReplyKafkaTemplate , que envia mensagens de solicitação com chaves String e recebe mensagens de resposta com chaves String. No entanto, o ReplyKafkaTemplate deve ser baseado na solicitação ProducerFactory, na resposta ConsumerFactory e no MessageListenerContainer com as configurações apropriadas de consumidor e produtor. Portanto, a configuração necessária é bastante pesada:
@Value("${kafka.topic.car.reply}") private String replyTopic; @Bean public Map < String, Object > consumerConfigs() { Map < String, Object > props = new HashMap < > (); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); return props; } @Bean public Map < String, Object > producerConfigs() { Map < String, Object > props = new HashMap < > (); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return props; } @Bean public ProducerFactory < String, Request > requestProducerFactory() { return new DefaultKafkaProducerFactory < > (producerConfigs()); } @Bean public ConsumerFactory < String, Reply > replyConsumerFactory() { return new DefaultKafkaConsumerFactory < > (consumerConfigs(), new StringDeserializer(), new JsonSerializer < Reply > ()); } @Bean public KafkaMessageListenerContainer < String, Reply > replyListenerContainer() { ContainerProperties containerProperties = new ContainerProperties(replyTopic); return new KafkaMessageListenerContainer < > (replyConsumerFactory(), containerProperties); }
Nesse caso, o uso de
replyKafkaTemplate para enviar uma solicitação síncrona e receber uma resposta é o seguinte:
@Value("${kafka.topic.car.request}") private String requestTopic; @Value("${kafka.topic.car.reply}") private String replyTopic; @Autowired private ReplyingKafkaTemplate < String, Request, Reply > requestReplyKafkaTemplate; ... RequestReply request = RequestReply.request(...);
Também há muitos clichês e uma API de baixo nível, e até essa API
ListenableFuture obsoleta, em vez da
CompletableFuture moderna.
requestReplyKafkaTemplate cuida da geração e configuração do cabeçalho
KafkaHeaders.CORRELATION_ID , mas devemos definir explicitamente o cabeçalho
KafkaHeaders.REPLY_TOPIC para a solicitação. Observe também que o mesmo tópico para a resposta não foi intencional acima em
replyListenerContainer . Um pouco de sujeira. Não é exatamente o que eu esperava da abstração da Primavera.
Lado do Servidor: @SendTo
No lado do servidor, o
KafkaListener comum que
está ouvindo o tópico da solicitação também é decorado com a anotação
@SendTo para fornecer uma mensagem de resposta. O objeto retornado pelo método listener é quebrado automaticamente na mensagem de resposta,
CORRELATION_ID é incluído e a resposta é publicada no tópico especificado no cabeçalho
REPLY_TOPIC .
@Bean public Map < String, Object > consumerConfigs() { Map < String, Object > props = new HashMap < > (); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); return props; } @Bean public Map < String, Object > producerConfigs() { Map < String, Object > props = new HashMap < > (); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return props; } @Bean public ConsumerFactory < String, Request > requestConsumerFactory() { return new DefaultKafkaConsumerFactory < > (consumerConfigs(), new StringDeserializer(), new JsonSerializer < Request > ()); } @Bean public KafkaListenerContainerFactory < ConcurrentMessageListenerContainer < String, Request >> requestListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory < String, Request > factory = new ConcurrentKafkaListenerContainerFactory < > (); factory.setConsumerFactory(requestConsumerFactory()); factory.setReplyTemplate(replyTemplate()); return factory; } @Bean public ProducerFactory < String, Reply > replyProducerFactory() { return new DefaultKafkaProducerFactory < > (producerConfigs()); } @Bean public KafkaTemplate < String, Reply > replyTemplate() { return new KafkaTemplate < > (replyProducerFactory()); }
Alguma configuração também é necessária aqui, mas a configuração do ouvinte é mais simples:
@KafkaListener(topics = "${kafka.topic.car.request}", containerFactory = "requestListenerContainerFactory") @SendTo() public Reply receive(Request request) { Reply reply = ...; return reply; }
Mas e as várias instâncias do consumidor?
Tudo parece funcionar até usarmos várias instâncias do consumidor. Se tivermos várias instâncias do cliente, precisamos garantir que a resposta seja enviada à instância correta do cliente. A documentação do Spring Kafka pressupõe que cada consumidor possa usar um tópico exclusivo ou que um valor de cabeçalho
KafkaHeaders adicional seja enviado com a solicitação
RESPONSE_PARTITION é um campo de quatro bytes que contém uma representação BIG-ENDIAN do número da seção inteira. O uso de tópicos separados para diferentes clientes claramente não é muito flexível, portanto, escolhemos a configuração
REPLY_PARTITION explícita. Em seguida, o cliente deve saber a qual partição está atribuída. A documentação sugere o uso de uma configuração explícita para selecionar uma partição específica. Vamos adicioná-lo ao nosso exemplo:
@Value("${kafka.topic.car.reply.partition}") private int replyPartition; ... @Bean public KafkaMessageListenerContainer < String, RequestReply > replyListenerContainer() { ContainerProperties containerProperties = new ContainerProperties(replyTopic); TopicPartitionInitialOffset initialOffset = new TopicPartitionInitialOffset(replyTopic, replyPartition); return new KafkaMessageListenerContainer < > (replyConsumerFactory(), containerProperties, initialOffset); } private static byte[] intToBytesBigEndian(final int data) { return new byte[] { (byte)((data >> 24) & 0xff), (byte)((data >> 16) & 0xff), (byte)((data >> 8) & 0xff), (byte)((data >> 0) & 0xff), }; } ... record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes())); record.headers().add(new RecordHeader(KafkaHeaders.REPLY_PARTITION, intToBytesBigEndian(replyPartition))); RequestReplyFuture < String, RequestReply, RequestReply > sendAndReceive = requestReplyKafkaTemplate.sendAndReceive(record); ...
Não é muito bonito, mas funciona. A configuração necessária é extensa e a API parece de baixo nível. A necessidade de configurar explicitamente partições complica o processo de dimensionar dinamicamente o número de clientes. Obviamente, você pode fazer melhor.
Encapsulando o processamento de tópicos para resposta e partição
Vamos começar encapsulando o padrão
Endereço de Retorno , passando o tópico da resposta e da partição. O tópico para a resposta deve ser injetado no
RequestReplyTemplate e, portanto, não deve estar presente na API. Quando se trata de partições para uma resposta, faremos o oposto: extrair a (s) partição (ões) atribuída (s) ao ouvinte de tópico da resposta e transferir essa partição automaticamente. Isso elimina a necessidade de o cliente cuidar desses cabeçalhos.
Ao mesmo tempo, vamos fazer a API parecer o
KafkaTemplate padrão (sobrecarregar o método
sendAndReceive () com parâmetros simplificados e adicionar os métodos sobrecarregados correspondentes usando o tópico padrão):
public class PartitionAwareReplyingKafkaTemplate < K, V, R > extends ReplyingKafkaTemplate < K, V, R > { public PartitionAwareReplyingKafkaTemplate(ProducerFactory < K, V > producerFactory, GenericMessageListenerContainer < K, R > replyContainer) { super(producerFactory, replyContainer); } private TopicPartition getFirstAssignedReplyTopicPartition() { if (getAssignedReplyTopicPartitions() != null && getAssignedReplyTopicPartitions().iterator().hasNext()) { TopicPartition replyPartition = getAssignedReplyTopicPartitions().iterator().next(); if (this.logger.isDebugEnabled()) { this.logger.debug("Using partition " + replyPartition.partition()); } return replyPartition; } else { throw new KafkaException("Illegal state: No reply partition is assigned to this instance"); } } private static byte[] intToBytesBigEndian(final int data) { return new byte[] { (byte)((data >> 24) & 0xff), (byte)((data >> 16) & 0xff), (byte)((data >> 8) & 0xff), (byte)((data >> 0) & 0xff), }; } public RequestReplyFuture < K, V, R > sendAndReceiveDefault(@Nullable V data) { return sendAndReceive(getDefaultTopic(), data); } public RequestReplyFuture < K, V, R > sendAndReceiveDefault(K key, @Nullable V data) { return sendAndReceive(getDefaultTopic(), key, data); } ... public RequestReplyFuture < K, V, R > sendAndReceive(String topic, @Nullable V data) { ProducerRecord < K, V > record = new ProducerRecord < > (topic, data); return doSendAndReceive(record); } public RequestReplyFuture < K, V, R > sendAndReceive(String topic, K key, @Nullable V data) { ProducerRecord < K, V > record = new ProducerRecord < > (topic, key, data); return doSendAndReceive(record); } ... @Override public RequestReplyFuture < K, V, R > sendAndReceive(ProducerRecord < K, V > record) { return doSendAndReceive(record); } protected RequestReplyFuture < K, V, R > doSendAndReceive(ProducerRecord < K, V > record) { TopicPartition replyPartition = getFirstAssignedReplyTopicPartition(); record.headers() .add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, replyPartition.topic().getBytes())) .add(new RecordHeader(KafkaHeaders.REPLY_PARTITION, intToBytesBigEndian(replyPartition.partition()))); return super.sendAndReceive(record); } }
Próxima etapa: adaptando o
ListenableFuture ao mais moderno
CompletableFuture .
public class CompletableFutureReplyingKafkaTemplate < K, V, R > extends PartitionAwareReplyingKafkaTemplate < K, V, R > { public CompletableFutureReplyingKafkaTemplate(ProducerFactory < K, V > producerFactory, GenericMessageListenerContainer < K, R > replyContainer) { super(producerFactory, replyContainer); } public CompletableFuture < R > requestReplyDefault(V value) { return adapt(sendAndReceiveDefault(value)); } public CompletableFuture < R > requestReplyDefault(K key, V value) { return adapt(sendAndReceiveDefault(key, value)); } ... public CompletableFuture < R > requestReply(String topic, V value) { return adapt(sendAndReceive(topic, value)); } public CompletableFuture < R > requestReply(String topic, K key, V value) { return adapt(sendAndReceive(topic, key, value)); } ... private CompletableFuture < R > adapt(RequestReplyFuture < K, V, R > requestReplyFuture) { CompletableFuture < R > completableResult = new CompletableFuture < R > () { @Override public boolean cancel(boolean mayInterruptIfRunning) { boolean result = requestReplyFuture.cancel(mayInterruptIfRunning); super.cancel(mayInterruptIfRunning); return result; } };
Vamos empacotar isso em uma biblioteca de utilitários e agora temos uma API muito mais consistente com
a principal filosofia de design do Spring,
"Convenção sobre configuração" . Aqui está o código final do cliente:
@Autowired private CompletableFutureReplyingKafkaTemplate < String, Request, Reply > requestReplyKafkaTemplate; ... requestReplyKafkaTemplate.requestReply(request).thenAccept(reply - > System.out.println("Reply: " + reply.toString()); );
Resumir
Para resumir, o Spring for Kafka 2.2 fornece uma implementação totalmente funcional do padrão
Request-Reply no Apache Kafka, mas a API ainda possui algumas arestas. Nesta edição, vimos que algumas abstrações e adaptações adicionais da API podem fornecer uma API de alto nível mais lógica.
Aviso 1:Uma das principais vantagens de uma arquitetura orientada a eventos é a dissociação de produtores e consumidores de eventos, o que possibilita a criação de sistemas muito mais flexíveis e em evolução. Usando semântica síncrona “Solicitação-resposta” é exatamente o oposto quando as partes solicitantes e respondentes estão fortemente relacionadas. Portanto, ele deve ser usado apenas se necessário.
Aviso 2:Se uma
solicitação-resposta síncrona for necessária, o
protocolo baseado em
HTTP será muito mais simples e mais eficiente do que usar
um canal assíncrono como o Apache Kafka .
No entanto, pode haver cenários em que uma
resposta de solicitação síncrona via Kafka faça sentido. Escolha razoavelmente a melhor ferramenta para o trabalho.
Um exemplo completo pode ser encontrado em
github.com/callistaenterprise/blog-synchronous-kafka .
Comentários
Federico • 7 meses atrásE quando temos necessidades híbridas, por exemplo, em 50% dos casos precisamos de uma Resposta de solicitação e em 50% precisamos de gerenciamento de eventos? Como fazemos isso? A configuração necessária para o Spring Kafka parece bastante terrível.
Jehanzeb Qayyum • 6 meses atrásO Spring agora tem suporte padrão usando partições em um tópico comum para a resposta.
A partir da versão 2.2, o modelo tenta determinar o tópico para a resposta ou partição do contêiner de resposta configurado (contêiner de resposta).
https://docs.spring.io/spring-kafka/reference/html/#replying-templatenir rozenberg • 8 meses atrásOi
No último parágrafo, você escreveu que pode haver cenários em que uma resposta de solicitação síncrona via Kafka faz sentido em comparação ao HTTP.
Você pode dar exemplos de tais cenários?
Obrigada
Nir
Janne Keskitalo nir rozenberg • 8 meses atrásVamos dividir um sistema de processamento de transações de grande volume em vários microsserviços, e tenho uma ideia de usar o sistema de mensagens Request-Response da Kafka para obter afinidade de processamento semelhante. Basicamente, o Kafka é usado para rotear todas as chamadas de um cliente para o mesmo processo do processador de transações, que as executa sequencialmente uma por vez. Esse tipo de processamento garante linearidade (
https://stackoverflow.com/a/19515375/7430325 ), conectividade causal e também permite armazenamento em cache eficiente. Essencialmente, os esforços de coordenação seriam transferidos do banco de dados para Kafka, e poderíamos iniciar o banco de dados no modo de isolamento estrito serializável.
Ainda tenho que me aprofundar nos detalhes de nossa semântica de transações para ver onde ela fica aquém, então isso é apenas uma ideia.
Traduzido por
@middle_java