
Olá Habr!
Trabalho na equipe de Tinkoff, que está desenvolvendo seu próprio centro de notificações. Na maioria das vezes, desenvolvo em Java usando o Spring boot e resolvo vários problemas técnicos que surgem no projeto.
A maioria dos nossos microsserviços interage de forma assíncrona por meio de um intermediário de mensagens. Anteriormente, usamos o IBM MQ como um broker, que deixou de lidar com a carga, mas, ao mesmo tempo, possuía altas garantias de entrega.
Como substituição, nos foi oferecido o Apache Kafka, que tem alta escalabilidade, mas, infelizmente, requer uma abordagem de configuração quase individual para diferentes cenários. Além disso, o mecanismo de entrega, pelo menos uma vez, que funciona em Kafka por padrão, não permitiu manter pronto o nível de consistência necessário. Em seguida, compartilharei nossa experiência na configuração do Kafka, em particular, explicarei como configurar e viver exatamente com a entrega única.
Entrega garantida e muito mais
Os parâmetros que serão discutidos posteriormente ajudarão a evitar vários problemas com as configurações de conexão padrão. Mas primeiro, quero prestar atenção a um parâmetro que facilitará uma possível depuração.
Client.id para produtor e consumidor ajudará com isso. À primeira vista, você pode usar o nome do aplicativo como um valor e, na maioria dos casos, isso funcionará. Embora a situação em que vários Consumidores sejam usados no aplicativo e você lhes dê o mesmo client.id leve ao seguinte aviso:
org.apache.kafka.common.utils.AppInfoParser — Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka.test-0
Se você deseja usar o JMX em um aplicativo com Kafka, isso pode ser um problema. Nesse caso, é melhor usar uma combinação do nome do aplicativo e, por exemplo, o nome do tópico, como o valor de client.id. O resultado de nossa configuração pode ser visto na saída do comando kafka-consumer-groups dos utilitários do Confluent:

Agora vamos analisar o cenário de entrega garantida de mensagens. O Kafka Producer possui um parâmetro acks que permite que você configure depois de quantos reconhecem que o líder do cluster precisa considerar a mensagem gravada com êxito. Este parâmetro pode assumir os seguintes valores:
- 0 - o reconhecimento não será considerado.
- 1 - parâmetro padrão, é necessário reconhecimento a partir de apenas 1 réplica.
- −1 - é necessário reconhecimento de todas as réplicas sincronizadas ( configuração do cluster min.insync.replicas ).
Pode ser visto pelos valores acima que acks iguais a -1 dão as garantias mais fortes de que a mensagem não será perdida.
Como todos sabemos, sistemas distribuídos não são confiáveis. Para proteger contra falhas temporárias, o Kafka Producer fornece um parâmetro de novas tentativas que permite definir o número de tentativas de repetição durante delivery.timeout.ms . Como o parâmetro de novas tentativas é padronizado como Integer.MAX_VALUE (2147483647), o número de retransmissões de uma mensagem pode ser ajustado alterando apenas delivery.timeout.ms.
Movendo-se exatamente para a entrega única
Essas configurações permitem que nosso produtor envie mensagens com alta garantia. Vamos agora falar sobre como garantir a gravação de apenas uma cópia de uma mensagem em um tópico Kafka? No caso mais simples, para fazer isso no Producer, configure o parâmetro enable.idempotence como true. Idempotency garante a gravação de apenas uma mensagem em uma partição específica de um tópico. Um pré-requisito para ativar a idempotência é acks = all, tente novamente> 0, max.in.flight.requests.per.connection ≤ 5 . Se esses parâmetros não forem definidos pelo desenvolvedor, os valores acima serão definidos automaticamente.
Quando a idempotência é configurada, é necessário garantir que as mesmas mensagens caiam nas mesmas partições todas as vezes. Isso pode ser feito configurando a chave e o parâmetro partitioner.class no Producer. Vamos começar com a chave. Para cada remessa, deve ser o mesmo. Isso é facilmente alcançado usando qualquer identificador de negócios da mensagem original. O parâmetro partitioner.class possui um valor padrão de DefaultPartitioner . Com essa estratégia de particionamento, o comportamento padrão é o seguinte:
- Se a partição for especificada explicitamente ao enviar a mensagem, nós a usaremos.
- Se a partição não estiver especificada, mas a chave estiver especificada, selecione a partição por hash na chave.
- Se a partição e a chave não forem especificadas, selecione as partições por vez (round-robin).
Além disso, o uso da chave e do envio idempotente com o parâmetro max.in.flight.requests.per.connection = 1 fornece um processamento ordenado de mensagens no Consumer. Separadamente, vale lembrar que, se o controle de acesso estiver configurado em seu cluster, você precisará dos direitos de gravação idempotente no tópico.
Se você repentinamente não possuir os recursos de envio idempotente por chave ou a lógica no lado do Produtor exigir a preservação da consistência dos dados entre diferentes partições, as transações serão úteis. Além disso, usando uma transação em cadeia, você pode sincronizar condicionalmente um registro no Kafka, por exemplo, com um registro no banco de dados. Para habilitar o envio transacional para o Producer, é necessário que ele possua idempotência e, opcionalmente, defina transactional.id . Se o controle de acesso estiver configurado no cluster Kafka, para gravação transacional e também idempotente, você precisará de permissões de gravação, que podem ser concedidas pela máscara usando o valor armazenado em transactional.id.
Formalmente, você pode usar qualquer sequência, por exemplo, o nome do aplicativo, como um identificador de transação. Mas se você executar várias instâncias de um aplicativo com o mesmo transactional.id, a primeira instância iniciada será interrompida com um erro, pois o Kafka o considerará um processo zumbi.
org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
Para resolver esse problema, adicionamos um sufixo ao nome do aplicativo na forma do nome do host, obtido das variáveis de ambiente.
O produtor está configurado, mas as transações no Kafka controlam apenas o escopo da mensagem. Independentemente do status da transação, a mensagem cai imediatamente no tópico, mas possui atributos adicionais do sistema.
Para impedir que essas mensagens sejam lidas antecipadamente pelo Consumidor, ele precisa definir o parâmetro isolamento.level como read_committed. Esse consumidor poderá ler mensagens não transacionais como antes e transacionais somente após uma confirmação.
Se você instalou todas as configurações listadas acima, configurou exatamente uma vez a entrega. Parabéns!
Mas há mais uma nuance. Transactional.id, que configuramos acima, é na verdade um prefixo de transação. No gerenciador de transações, um número de série é adicionado a ele. O identificador recebido é emitido em transactional.id.expiration.ms , configurado no cluster Kafka e com um valor padrão de "7 dias". Se, durante esse período, o aplicativo não recebeu nenhuma mensagem, ao tentar o próximo envio transacional, você receberá uma InvalidPidMappingException . Depois disso, o coordenador da transação emitirá um novo número de sequência para a próxima transação. No entanto, a mensagem pode ser perdida se o InvalidPidMappingException não for processado corretamente.
Em vez de totais
Como você pode ver, apenas enviar mensagens para Kafka não é suficiente. Você precisa escolher uma combinação de parâmetros e estar preparado para fazer alterações rápidas. Neste artigo, tentei mostrar exatamente uma vez as configurações de entrega em detalhes e descrevi vários problemas de configuração client.id e transactional.id que encontramos. O resumo das configurações do produtor e do consumidor está resumido abaixo.
Produtor:
- acks = all
- tentativas> 0
- enable.idempotence = true
- max.in.flight.requests.per.connection ≤ 5 (1 - para envio ordenado)
- transactional.id = $ {application-name} - $ {hostname}
Consumidor:
- isolamento.level = read_committed
Para minimizar erros em aplicativos futuros, criamos nosso invólucro na configuração de mola, onde os valores para alguns dos parâmetros listados já estão definidos.
E aqui estão alguns materiais para estudo independente: