Comment Kafka est devenu réalité


Bonjour, Habr!


Je travaille dans l'équipe Tinkoff, qui développe son propre centre de notification. Pour la plupart, je développe en Java en utilisant Spring Boot et je résous divers problèmes techniques qui surviennent dans le projet.


La plupart de nos microservices interagissent de manière asynchrone les uns avec les autres via un courtier de messages. Auparavant, nous utilisions IBM MQ en tant que courtier, qui a cessé de faire face à la charge, mais en même temps, avait des garanties de livraison élevées.


En remplacement, Apache Kafka nous a été proposé, qui a une grande évolutivité, mais, malheureusement, nécessite une approche de configuration presque individuelle pour différents scénarios. De plus, le mécanisme de livraison au moins une fois, qui fonctionne par défaut dans Kafka, n'a pas permis de maintenir le niveau de cohérence requis hors de la boîte. Ensuite, je partagerai notre expérience dans la configuration de Kafka, en particulier, je vous dirai comment configurer et vivre avec une seule livraison.


Livraison garantie et plus


Les paramètres qui seront discutés plus tard permettront d'éviter un certain nombre de problèmes avec les paramètres de connexion par défaut. Mais d'abord, je veux faire attention à un paramètre qui facilitera un éventuel débogage.


Client.id pour le producteur et le consommateur vous y aidera. À première vue, vous pouvez utiliser le nom de l'application comme valeur, et dans la plupart des cas, cela fonctionnera. Bien que la situation lorsque plusieurs consommateurs sont utilisés dans l'application et que vous leur donnez le même client.id mène à l'avertissement suivant:


org.apache.kafka.common.utils.AppInfoParser — Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka.test-0 

Si vous souhaitez utiliser JMX dans une application avec Kafka, cela peut être un problème. Dans ce cas, il est préférable d'utiliser une combinaison du nom de l'application et, par exemple, du nom de rubrique, comme valeur de client.id. Le résultat de notre configuration est visible dans la sortie de la commande kafka-consumer-groups des utilitaires de Confluent:



Nous allons maintenant analyser le scénario de livraison garantie des messages. Kafka Producer a un paramètre acks qui vous permet de configurer après combien d'acquittements le chef de cluster doit considérer le message enregistré avec succès. Ce paramètre peut prendre les valeurs suivantes:


  • 0 - accusé de réception ne sera pas pris en considération.
  • 1 - paramètre par défaut, un accusé de réception est requis à partir d'une seule réplique.
  • −1 - un accusé de réception est requis de toutes les répliques synchronisées ( configuration de cluster min.insync.replicas ).

Il peut être vu à partir des valeurs ci-dessus que acks égal à -1 donne les meilleures garanties que le message ne sera pas perdu.


Comme nous le savons tous, les systèmes distribués ne sont pas fiables. Pour se protéger contre les dysfonctionnements temporaires, Kafka Producer fournit un paramètre de nouvelles tentatives qui vous permet de définir le nombre de tentatives de relance pendant delivery.timeout.ms . Étant donné que le paramètre retries par défaut est Integer.MAX_VALUE (2147483647), le nombre de retransmissions d'un message peut être contrôlé en modifiant uniquement delivery.timeout.ms.


Vers une livraison exactement une fois


Ces paramètres permettent à notre producteur de délivrer des messages avec une garantie élevée. Voyons maintenant comment garantir l'enregistrement d'une seule copie d'un message dans un sujet Kafka? Dans le cas le plus simple, pour ce faire sur Producer, définissez le paramètre enable.idempotence sur true. Idempotency garantit l'enregistrement d'un seul message dans une partition particulière d'un sujet. Une condition préalable à l'activation de l'idempotence est acks = all, réessayez> 0, max.in.flight.requests.per.connection ≤ 5 . Si ces paramètres ne sont pas définis par le développeur, les valeurs ci-dessus seront automatiquement définies.


Lorsque idempotency est configuré, il est nécessaire de s'assurer que les mêmes messages tombent à chaque fois dans les mêmes partitions. Cela peut être fait en configurant la clé et le paramètre partitioner.class sur Producer. Commençons par la clé. Pour chaque envoi, il doit être le même. Ceci est facilement réalisé en utilisant n'importe quel identifiant d'entreprise du message d'origine. Le paramètre partitioner.class a la valeur par défaut DefaultPartitioner . Avec cette stratégie de partitionnement, le comportement par défaut est le suivant:


  • Si la partition est explicitement spécifiée lors de l'envoi du message, nous l'utilisons.
  • Si la partition n'est pas spécifiée, mais que la clé est spécifiée, sélectionnez la partition par hachage dans la clé.
  • Si la partition et la clé ne sont pas spécifiées, sélectionnez les partitions tour à tour (round-robin).

De plus, l'utilisation de la clé et l'envoi idempotent avec le paramètre max.in.flight.requests.per.connection = 1 vous donne un traitement ordonné des messages sur Consumer. Séparément, il convient de se rappeler que si le contrôle d'accès est configuré sur votre cluster, vous aurez alors besoin des droits d'écriture idempotente sur le sujet.


Si vous manquez soudainement des capacités d'envoi idempotent par clé ou si la logique du côté producteur nécessite la préservation de la cohérence des données entre les différentes partitions, les transactions viendront à la rescousse. De plus, à l'aide d'une transaction en chaîne, vous pouvez synchroniser conditionnellement un enregistrement dans Kafka, par exemple, avec un enregistrement dans la base de données. Pour activer l'envoi transactionnel au producteur, il est nécessaire qu'il possède l'idempotency, et éventuellement définissez transactional.id . Si le contrôle d'accès est configuré sur votre cluster Kafka, alors pour l'enregistrement transactionnel, ainsi que pour idempotent, vous aurez besoin d'autorisations d'écriture, qui peuvent être accordées par masque en utilisant la valeur stockée dans transactional.id.


Formellement, n'importe quelle chaîne peut être utilisée comme identifiant de transaction, par exemple, le nom d'une application. Mais si vous exécutez plusieurs instances de la même application avec le même transactional.id, la première instance lancée sera arrêtée avec une erreur, car Kafka le considérera comme un processus zombie.


 org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker. 

Pour résoudre ce problème, nous ajoutons un suffixe au nom de l'application sous la forme du nom d'hôte, qui est obtenu à partir des variables d'environnement.


Le producteur est configuré, mais les transactions sur Kafka ne contrôlent que la portée du message. Quel que soit le statut de la transaction, le message tombe immédiatement dans la rubrique, mais possède des attributs système supplémentaires.


Pour empêcher que de tels messages soient lus à l'avance par Consumer, il doit définir le paramètre isolation.level sur read_committed. Un tel consommateur pourra lire les messages non transactionnels comme auparavant, et les messages transactionnels uniquement après une validation.
Si vous avez installé tous les paramètres répertoriés ci-dessus, vous avez configuré exactement une fois la livraison. Félicitations!


Mais il y a encore une nuance. Transactional.id, que nous avons configuré ci-dessus, est en fait un préfixe de transaction. Sur le gestionnaire de transactions, un numéro de série lui est ajouté. L'identifiant reçu est émis sur transactional.id.expiration.ms , qui est configuré sur le cluster Kafka et a une valeur par défaut de "7 jours". Si pendant ce temps l'application n'a reçu aucun message, alors lorsque vous essayez le prochain envoi transactionnel, vous recevrez une exception InvalidPidMappingException . Après cela, le coordinateur de transaction émettra un nouveau numéro de séquence pour la prochaine transaction. Toutefois, le message peut être perdu si l'InvalidPidMappingException n'est pas correctement traité.


Au lieu de totaux


Comme vous pouvez le voir, il ne suffit pas d'envoyer des messages à Kafka. Vous devez choisir une combinaison de paramètres et être prêt à effectuer des changements rapides. Dans cet article, j'ai essayé de montrer en détail les paramètres de livraison une seule fois et j'ai décrit plusieurs problèmes de configuration client.id et transactional.id que nous avons rencontrés. Le résumé des paramètres Producteur et Consommateur est résumé ci-dessous.


Producteur:


  1. acks = tout
  2. nouvelles tentatives> 0
  3. enable.idempotence = true
  4. max.in.flight.requests.per.connection ≤ 5 (1 - pour l'envoi ordonné)
  5. transactional.id = $ {nom-application} - $ {hostname}

Consommateur:


  1. isolation.level = read_committed

Pour minimiser les erreurs dans les applications futures, nous avons créé notre wrapper sur la configuration du ressort, où les valeurs de certains des paramètres répertoriés sont déjà définies.


Et voici quelques documents pour une étude indépendante:


Source: https://habr.com/ru/post/fr481784/


All Articles