Abonnez-vous à Kafka via HTTP ou comment simplifier vos hooks web

Il existe de nombreuses façons de traiter les messages des systèmes Pub-Sub: en utilisant un service distinct, en isolant un processus isolé, en orchestrant un pool de processus / threads, IPC complexe, Poll-over-Http et bien d'autres. Aujourd'hui, je veux parler de la façon d'utiliser Pub-Sub sur HTTP et de mon service écrit spécifiquement pour cela.

Dans certains cas, l'utilisation d'un backend de service HTTP prêt à l'emploi est une solution idéale pour traiter une file d'attente de messages:

  1. Équilibrage hors de la boîte. Habituellement, le backend est déjà derrière l'équilibreur et dispose d'une infrastructure prête à charger, ce qui simplifie considérablement le travail avec les messages.
  2. Utilisation d'un contrôleur REST standard (n'importe quelle ressource HTTP). La consommation de messages HTTP minimise le coût d'implémentation de compumers pour différentes langues si le backend est mixte.
  3. Simplification de l'utilisation des hooks Web d'autres services. Désormais, presque tous les services (Jira, Gitlab, Mattermost, Slack ...) prennent en charge les crochets Web pour interagir avec le monde extérieur. Vous pouvez vous simplifier la vie si vous apprenez à la file d'attente à exécuter les fonctions d'un répartiteur HTTP.

Cette approche présente également des inconvénients:

  1. Vous pouvez oublier la légèreté de la solution. HTTP est un protocole lourd, et l'utilisation de frameworks du côté du consommateur augmentera instantanément la latence et la charge.
  2. Nous perdons les forces de l'approche Poll, obtenant les faiblesses de Push.
  3. Le traitement des messages par les mêmes instances de service qui traitent les clients peut affecter la réactivité. Ce n'est pas significatif, car il est traité avec équilibrage et isolation.

J'ai implémenté l'idée en tant que service Queue-Over-Http, qui sera discuté plus tard. Le projet est écrit en Kotlin en utilisant Spring Boot 2.1. En tant que courtier, seul Apache Kafka est actuellement disponible.

Plus loin dans l'article, il est supposé que le lecteur connaît Kafka et connaît les validations (validation) et les décalages (décalage) des messages, les principes des groupes (groupe) et des consommateurs (consommateur), et comprend également en quoi la partition (partition) diffère du sujet (sujet) . S'il y a des lacunes, je vous conseille de lire cette section de la documentation Kafka avant de continuer.

Table des matières



Revue


Queue-Over-Http est un service qui agit en tant qu'intermédiaire entre un courtier de messages et le consommateur HTTP final (le service facilite la mise en œuvre de la prise en charge de l'envoi de messages aux consommateurs de toute autre manière, par exemple, divers * RPC). Pour le moment, seuls l'abonnement, le désabonnement et l'affichage de la liste des consommateurs sont disponibles. L'envoi de messages au courtier (produit) via HTTP n'a pas encore été mis en œuvre en raison de l'impossibilité de garantir l'ordre des messages sans un soutien spécial du producteur.

Le personnage clé du service est le consommateur, qui peut s'abonner à des partitions spécifiques ou simplement à des sujets (le modèle de sujet est pris en charge). Dans le premier cas, l'équilibre automatique des partitions est désactivé. Après s'être abonné, la ressource HTTP spécifiée commence à recevoir des messages des partitions Kafka attribuées. Sur le plan architectural, chaque abonné est associé à un client Java Kafka natif.

histoire divertissante sur KafkaConsumer
Kafka a un merveilleux client Java qui peut faire beaucoup. Je l'utilise dans l'adaptateur de file d'attente pour recevoir des messages du courtier, puis je l'envoie aux files d'attente de service local. Il est à noter que le client travaille exclusivement dans le cadre d'un seul thread.

L'idée de l'adaptateur est simple. Nous commençons dans un thread, nous écrivons le planificateur le plus simple des clients natifs, en nous concentrant sur la réduction de la latence. Autrement dit, nous écrivons quelque chose de similaire:

while (!Thread.interrupted()) { var hasWork = false for (consumer in kafkaConsumers) { val queueGroup = consumers[consumer] ?: continue invalidateSubscription(consumer, queueGroup) val records = consumer.poll(Duration.ZERO) /*      */ if (!records.isEmpty) { hasWork = true } } val committed = doCommit() if (!hasWork && committed == 0) { // ,    Thread.sleep(1) } } 

Il semblerait que tout soit merveilleux, la latence est minime même avec des dizaines de consommateurs. En pratique, il s'est avéré que KafkaConsumer à ce mode de fonctionnement et donne un taux d'allocation d'environ 1,5 Mo / s en temps d'inactivité. Avec 100 courriers, le taux d'allocation atteint 150 Mo / s et fait penser souvent à l'application GC. Bien sûr, toutes ces ordures sont dans la zone jeune, GC est tout à fait capable de gérer cela, mais toujours, la solution n'est pas parfaite.

Évidemment, vous devez suivre la voie typique de KafkaConsumer et maintenant je place chaque abonné dans mon flux. Cela donne une surcharge pour la mémoire et la planification, mais il n'y a pas d'autre moyen.

Je réécris le code d'en haut, supprimant la boucle interne et changeant Duration.ZERO en Duration.ofMillis(100) . Il s'avère que le taux d'allocation tombe à un niveau acceptable de 80 à 150 Ko / s par consommateur. Cependant, Poll avec un délai d'expiration de 100 ms retarde toute la file d'attente des validations sur ces mêmes 100 ms, ce qui est beaucoup inacceptable.

Dans le processus de recherche de solutions au problème, je me souviens de KafkaConsumer::wakeup , qui lève une WakeupException et interrompt toute opération de blocage sur le consommateur. Avec cette méthode, le chemin vers une faible latence est simple: lorsqu'une nouvelle demande de validation arrive, nous la mettons dans la file d'attente, et sur le consommateur natif, nous appelons le wakeup . Dans le cycle de travail, WakeupException et allez valider ce qui s'est accumulé. Pour le transfert de contrôle à l'aide d'exceptions, vous devez immédiatement le remettre entre vos mains, mais puisque rien d'autre ...

Il s'avère que cette option est loin d'être parfaite, car toute opération sur le consommateur natif WakeupException désormais une WakeupException , y compris la validation elle-même. Le traitement de cette situation encombrera le code avec un indicateur permettant d'effectuer le wakeup .

J'arrive à la conclusion qu'il serait bien de modifier la méthode KafkaConsumer::poll afin qu'elle puisse être interrompue normalement, selon un indicateur supplémentaire. En conséquence, Frankenstein est né de la réflexion, qui copie exactement la méthode de sondage d'origine, en ajoutant une sortie de la boucle par le drapeau. Cet indicateur est défini par une méthode interruptPoll distincte, qui, en outre, appelle le réveil sur le sélecteur client pour libérer le verrou de thread sur les opérations d'E / S.

Après avoir implémenté le client de cette manière, j'obtiens la vitesse de réaction à partir du moment où une demande de validation arrive jusqu'à ce qu'elle soit traitée jusqu'à 100 microsecondes, et une excellente latence pour récupérer les messages d'un courtier, ce qui est très bien.

Chaque partition est représentée par une file d'attente locale distincte, où l'adaptateur écrit des messages à partir du courtier. Le travailleur en prend des messages et les envoie pour exécution, c'est-à-dire pour l'envoi via HTTP.

Le service prend en charge le traitement des messages par lots pour augmenter le débit. Lors de l'abonnement, vous pouvez spécifier le concurrencyFactor chaque rubrique (s'applique à chaque partition affectée indépendamment). Par exemple, concurrencyFactor=1000 signifie que 1000 messages sous forme de requêtes HTTP peuvent être envoyés au consommateur en même temps. Dès que tous les messages du pack ont ​​été établis sans ambiguïté par le consommateur, le service décide de la prochaine validation de l'offset du dernier message dans Kafka. Par conséquent, la deuxième valeur de concurrencyFactor est le nombre maximal de messages traités par le consommateur en cas de panne de Kafka ou Queue-Over-Http.

Pour réduire les délais, la file d'attente a loadFactor = concurrencyFactor * 2 , ce qui vous permet de lire deux fois plus de messages du courtier que vous pouvez envoyer. Étant donné que la validation automatique est désactivée sur le client natif, un tel schéma ne viole pas les garanties At-Least-Once.
Une valeur concurrencyFactor élevée augmente le débit de la file d'attente en réduisant le nombre de validations qui prennent jusqu'à 10 ms dans le pire des cas. Dans le même temps, la charge sur le consommateur augmente.

L'ordre d'envoi des messages dans le bundle n'est pas garanti, mais il peut être atteint en définissant concurrencyFactor=1 .

S'engage


Les commits sont une partie importante du service. Lorsque le prochain paquet de données est prêt, le décalage du dernier message du paquet est immédiatement validé dans Kafka, et ce n'est qu'après une validation réussie que le prochain paquet devient disponible pour le traitement. Souvent, cela ne suffit pas et une validation automatique est requise. Pour ce faire, il existe le paramètre autoCommitPeriodMs , qui a peu de choses en commun avec la période classique d'autocommit pour les clients natifs qui valident le dernier message lu depuis la partition. Imaginez concurrencyFactor=10 . Le service a envoyé les 10 messages et attend que chacun d'eux soit prêt. Le traitement du message 3 est terminé en premier, puis le message 1, puis le message 10. À ce stade, il est temps pour la validation automatique. Il est important de ne pas violer la sémantique At-Least-Once. Par conséquent, vous ne pouvez valider que le premier message, c'est-à-dire l'offset 2, car seul il a été traité avec succès à ce moment. De plus, jusqu'à la prochaine validation automatique, les messages 2, 5, 6, 4 et 8. sont traités. Vous devez maintenant valider uniquement l'offset 7, etc. Autocommit n'a presque aucun effet sur le débit.

Gestion des erreurs


En mode de fonctionnement normal, le service envoie une fois un message au superviseur. Si, pour une raison quelconque, il a provoqué une erreur 4xx ou 5xx, le service renverra le message en attendant la réussite du traitement. Le temps entre les tentatives peut être configuré comme un paramètre distinct.

Il est également possible de définir le nombre de tentatives après lesquelles le message sera marqué comme traité, ce qui arrêtera les retransmissions quel que soit l'état de la réponse. Je déconseille de l'utiliser pour des données sensibles, les situations de défaillance des consommateurs doivent toujours être ajustées manuellement. Les messages persistants peuvent être surveillés par les journaux de service et la surveillance de l'état de la réponse du consommateur.

de coller
Habituellement, le serveur HTTP, donnant à 4xx ou 5xx l'état de la réponse, envoie également l'en-tête Connection: close . Une connexion TCP qui est fermée de cette manière reste à l'état TIME_WAITED jusqu'à ce qu'elle soit effacée par le système d'exploitation après un certain temps. Le problème est que ces connexions occupent un port entier qui ne peut pas être réutilisé avant d'être libéré. Cela peut entraîner l'absence de ports libres sur la machine pour établir une connexion TCP et le service sera levé avec des exceptions dans les journaux pour chaque envoi. En pratique, sur Windows 10, les ports se terminent après 10 à 20 000 envois de messages erronés en 1 à 2 minutes. En mode standard, ce n'est pas un problème.

Des messages


Chaque message extrait du courtier est envoyé au conseiller via HTTP vers la ressource spécifiée lors de l'abonnement. Par défaut, un message est envoyé par une requête POST dans le corps. Ce comportement peut être modifié en spécifiant toute autre méthode. Si la méthode ne prend pas en charge l'envoi de données dans le corps, vous pouvez spécifier le nom du paramètre de chaîne dans lequel le message sera envoyé. De plus, lors de l'abonnement, vous pouvez spécifier des en-têtes supplémentaires qui seront ajoutés à chaque message, ce qui est pratique pour l'autorisation de base à l'aide de jetons. Des en-têtes sont ajoutés à chaque message avec l'identifiant du consommateur, le sujet et la partition, d'où le message a été lu, le numéro du message, la clé de partition, le cas échéant, ainsi que le nom du courtier.

Performances


Pour évaluer les performances, j'ai utilisé un PC (Windows 10, OpenJDK-11 (G1 sans réglage), i7-6700K, 16 Go), qui exécute le service et un ordinateur portable (Windows 10, i5-8250U, 8 Go), sur lequel le producteur de messages, HTTP Consommateur de ressources et Kafka avec les paramètres par défaut. Le PC est connecté au routeur via une connexion filaire 1Gb / s, l'ordinateur portable via 802.11ac. Le producteur écrit toutes les 110 ms toutes les 100 ms pour 110 octets de messages dans les sujets désignés pour lesquels les consommateurs sont abonnés ( concurrencyFactor=500 , la validation automatique est désactivée) à partir de différents groupes. Le support est loin d'être idéal, mais vous pouvez obtenir une image.

Un paramètre de mesure clé est l'effet du service sur la latence.

Soit:
- t q - horodatage du service recevant les messages du client natif
- d t0 est le temps entre t q et le moment où le message a été envoyé de la file d'attente locale au pool de cadres
- d t est le temps entre t q et le moment où la requête HTTP a été envoyée. Ce d t est l'influence du service sur la latence du message.

Au cours des mesures, les résultats suivants ont été obtenus (C - consommateurs, T - thèmes, M - messages):



En mode de fonctionnement standard, le service lui-même n'affecte presque pas la latence et la consommation de mémoire est minime. Les valeurs maximales de d t (environ 60 ms) ne sont pas spécifiquement indiquées, car elles dépendent du fonctionnement du GC, et non du service lui-même. Un réglage spécial de GC ou le remplacement de G1 par Shenandoah peut aider à lisser la propagation des valeurs maximales.

Tout change radicalement lorsque le consommateur ne gère pas le flux de messages de la file d'attente et que le service active le mode de limitation. Dans ce mode, la consommation de mémoire augmente, car le temps de réponse aux requêtes augmente considérablement, ce qui empêche un nettoyage rapide des ressources. L'effet sur la latence reste ici au niveau des résultats précédents et les valeurs dt élevées sont causées par le préchargement des messages dans la file d'attente locale.

Malheureusement, il n'est pas possible de tester à une charge plus élevée, car l'ordinateur portable se plie déjà à 1300 RPS. Si quelqu'un peut aider à l'organisation des mesures à des charges élevées, je fournirai volontiers un ensemble pour les tests.

Démonstration


Passons maintenant à la démonstration. Pour cela, nous avons besoin de:

  • Courtier Kafka, prêt à partir. Je prendrai l'instance déclenchée le 192.168.99.100:9092 de Bitnami.
  • Une ressource HTTP qui recevra des messages. Pour plus de clarté, j'ai pris des crochets Web de Slack.

Tout d'abord, vous devez augmenter le service Queue-Over-Http lui-même. Pour ce faire, créez le contenu suivant dans un répertoire application.yml vide:

 spring: profiles: default logging: level: com: viirrtus: queueOverHttp: DEBUG app: persistence: file: storageDirectory: "persist" brokers: - name: "Kafka" origin: "kafka" config: bootstrap.servers: "192.168.99.100:9092" 

Ici, nous indiquons au service les paramètres de connexion d'un courtier spécifique, ainsi que l'emplacement de stockage des abonnés afin qu'ils ne soient pas perdus entre les démarrages. Dans `app.brokers []. Config`, vous pouvez spécifier tous les paramètres de connexion pris en charge par le client Kafka natif; une liste complète peut être trouvée ici .

Comme le fichier de configuration est traité par Spring, vous pouvez y écrire beaucoup de choses intéressantes. Y compris, configurer la journalisation.

Exécutez maintenant le service lui-même. Nous utilisons le moyen le plus simple - docker-compose.yml :

 version: "2" services: app: image: viirrtus/queue-over-http:0.1.3 restart: unless-stopped command: --debug ports: - "8080:8080" volumes: - ./application.yml:/application.yml - ./persist:/persist 

Si cette option ne vous convient pas, vous pouvez compiler le service à partir de la source. Instructions d'assemblage dans le projet Readme, dont un lien est donné à la fin de l'article.

L'étape suivante consiste à enregistrer le premier abonné. Pour ce faire, vous devez effectuer une demande HTTP auprès du service avec une description du consommateur:

 POST localhost:8080/broker/subscription Content-Type: application/json { "id": "my-first-consumer", "group": { "id": "consumers" }, "broker": "Kafka", "topics": [ { "name": "slack.test", "config": { "concurrencyFactor": 10, "autoCommitPeriodMs": 100 } } ], "subscriptionMethod": { "type": "http", "delayOnErrorMs": 1000, "retryBeforeCommit": 10, "uri": "<slack-wh-uri>", "additionalHeaders": { "Content-Type": "application/json" } } } 

Si tout s'est bien passé, la réponse sera presque le même contenu envoyé.

Passons en revue chaque paramètre:

  • Consumer.id - ID de notre abonné
  • Consumer.group.id - identifiant de groupe
  • Consumer.broker - indiquez à quels courtiers de services vous devez vous abonner
  • Consumer.topics[0].name - le nom du sujet à partir duquel nous voulons recevoir des messages
  • Consumer.topics[0].config. concurrencyFactor Consumer.topics[0].config. concurrencyFactor - nombre maximum de messages envoyés simultanément
  • Consumer.topics[0].config. autoCommitPeriodMs Consumer.topics[0].config. autoCommitPeriodMs - période de validation forcée pour les messages prêts
  • Consumer.subscriptionMethod.type - type d'abonnement. Seul HTTP est actuellement disponible.
  • Consumer.subscriptionMethod.delayOnErrorMs - délai avant de renvoyer un message qui s'est terminé par une erreur
  • Consumer.subscriptionMethod.retryBeforeCommit - le nombre de tentatives pour renvoyer le message d'erreur. Si 0 - le message tournera jusqu'à ce que le traitement soit réussi. Dans notre cas, la garantie d'une livraison complète n'est pas aussi importante que la constance du flux.
  • Consumer.subscriptionMethod.uri - la ressource à laquelle les messages seront envoyés
  • Consumer.subscriptionMethod.additionalHeader - en-têtes supplémentaires qui seront envoyés avec chaque message. Notez qu'il y aura du JSON dans le corps de chaque message afin que Slack puisse interpréter correctement la demande.

Dans cette demande, la méthode HTTP est omise, car la valeur par défaut, POST, Slack est assez bonne.

À partir de ce moment, le service surveille les partitions affectées de la rubrique slack.test pour les nouveaux messages.

Pour écrire des messages sur le sujet, j'utiliserai les utilitaires intégrés dans Kafka qui se trouvent dans /opt/bitnami/kafka/bin image Kafka lancée (l'emplacement des utilitaires dans d'autres instances de Kafka peut différer):

 kafka-console-producer.sh --broker-list localhost:9092 --topic slack.test > {“text”: “Hello!”} 

Dans le même temps, Slack vous informera d'un nouveau message:



Pour désinscrire un consommateur, il suffit de faire une demande POST de «courtier / désinscrire» avec le même contenu que lors de l'abonnement.

Conclusion


Pour le moment, seule la fonctionnalité de base est implémentée. De plus, il est prévu d'améliorer le traitement par lots, d'essayer d'implémenter une sémantique exacte, d'ajouter la possibilité d'envoyer des messages au courtier via HTTP et, plus important encore, de prendre en charge d'autres Pub-Sub populaires.

Le service Queue-Over-Http est actuellement en cours de développement. La version 0.1.3 est suffisamment stable pour les tests sur les stands de développement et de scène. Les performances ont été testées sur Windows 10, Debian 9 et Ubuntu 18.04. Vous pouvez utiliser prod à vos risques et périls. Si vous souhaitez aider au développement ou donner des commentaires sur le service - bienvenue dans le projet Github .

Source: https://habr.com/ru/post/fr435346/


All Articles