Bonjour, Habr!
Nous avons découvert les dernières réserves du livre "
Apache Kafka. Stream Processing and Data Analysis " et l'avons envoyé au prépresse. De plus, nous avons reçu un contrat pour le livre "
Kafka Streams in Action " et commençons à le traduire littéralement la semaine prochaine.

Pour montrer le cas intéressant de l'utilisation de la bibliothèque Kafka Streams, nous avons décidé de traduire l'article sur le paradigme Event Sourcing en Kafka du très Adam Worski, dont l'
article sur la langue Scala a été publié il y a deux semaines. Il est encore plus intéressant de noter que l’avis d’Adam Worski n’est pas indéniable:
ici , par exemple, on soutient que ce paradigme ne convient certainement pas à Kafka. D'autant plus mémorable, nous l'espérons, que nous avons l'impression de l'article.
Le terme «Event Sourcing» est traduit par «Event Logging» à la fois dans notre publication de
Clean Architecture de Robert Martin et dans cet article. Si quelqu'un est impressionné par la traduction des "événements de pompage" - faites-le moi savoir.
Création d'un système qui fournit l'enregistrement des événements (sourcing d'événements), tôt ou tard, nous sommes confrontés au problème de la persistance (persistance) - et ici, nous avons quelques options. Il y a tout d'abord
EventStore , une implémentation mature endurcie au combat. Alternativement, vous pouvez utiliser
akka-persistence pour tirer pleinement parti de
l' évolutivité de
Cassandra , ainsi que compter sur les performances du modèle d'acteur. Une autre option est la bonne vieille
base de données relationnelle , où l'approche
CRUD
est combinée avec l'utilisation d'événements et le maximum d'avantages est évincé des transactions.
En plus de ces opportunités (et peut-être de nombreuses autres) qui se sont présentées grâce à plusieurs choses récemment mises en œuvre, il est devenu assez simple aujourd'hui d'organiser l'enregistrement d'événements au-dessus de
Kafka . Voyons comment.
Qu'est-ce que la journalisation des événements?Il existe un certain nombre d'
excellents articles d' introduction à ce sujet, je me limiterai donc à l'introduction la plus concise. Lors de l'enregistrement des événements, nous ne sauvegardons pas l'état «actuel» des entités utilisées dans notre système, mais le flux d'événements liés à ces entités. Chaque
événement est un
fait qui décrit un changement d'état (déjà!) Qui s'est
produit avec l'objet. Comme vous le savez, les faits ne sont pas discutés et
inchangés .
Lorsque nous avons un flux de tels événements, l'état actuel d'une entité peut être clarifié en minimisant tous les événements qui lui sont liés; cependant, gardez à l'esprit que l'inverse n'est pas possible - en ne conservant que l'état "actuel", nous rejetons de nombreuses informations chronologiques précieuses.
La journalisation des événements peut
coexister pacifiquement avec des méthodes plus traditionnelles de stockage de l'état. En règle générale, le système traite un certain nombre de types d'entités (par exemple: utilisateurs, commandes, marchandises, ...) et il est fort possible que l'enregistrement d'événements ne soit utile que pour certaines de ces catégories. Il est important de noter qu'ici nous ne sommes pas confrontés au choix du «tout ou rien»; il s'agit simplement de la fonction de gestion d'état supplémentaire dans notre application.
Stockage d'événements à KafkaLe premier problème à résoudre: comment stocker les événements à Kafka? Il existe trois stratégies possibles:
- Stockez tous les événements pour tous les types d'entités dans un seul sujet (avec de nombreux segments)
- Par sujet par type d'entité, c'est-à-dire que nous supprimons tous les événements liés à l'utilisateur dans un sujet distinct, dans un sujet distinct - tous liés au produit, etc.
- Par sujet par essence, c'est-à-dire par un sujet distinct pour chaque utilisateur spécifique et chaque nom de produit
La troisième stratégie (sujet par essence) est pratiquement impraticable. Si, lorsque chaque nouvel utilisateur apparaissait dans le système, il devait démarrer un sujet distinct, le nombre de sujets deviendrait bientôt illimité. Toute agrégation dans ce cas serait très difficile, par exemple, il serait difficile d'indexer tous les utilisateurs dans un moteur de recherche; non seulement vous auriez à consommer un grand nombre de sujets - mais tous n'étaient pas connus à l'avance.
Il reste donc à choisir entre 1 et 2. Les deux options ont leurs avantages et leurs inconvénients. Le fait d'avoir un seul sujet facilite
la visualisation globale de tous les événements. D'un autre côté, en mettant en surbrillance le sujet pour chaque type d'entité, vous pouvez mettre à l'échelle et segmenter le flux de chaque entité individuellement. Le choix de l'une des deux stratégies dépend du cas d'utilisation spécifique.
De plus, vous pouvez implémenter les deux stratégies à la fois, si vous disposez d'un espace de stockage supplémentaire: produire des rubriques par type d'entité à partir d'une rubrique complète.

Dans le reste de l'article, nous travaillerons avec un seul type d'entité et avec un seul sujet, bien que le matériel présenté puisse être facilement extrapolé et appliqué pour fonctionner avec de nombreux sujets ou types d'entité.
(EDIT: comme l'a noté
Chris Hunt , il y a
un excellent article de Martin Kleppman , qui a examiné en détail comment répartir les événements par sujet et segment).
Les opérations de stockage les plus simples dans le paradigme de la journalisation des événementsL'opération la plus simple, logique à attendre d'un magasin qui prend en charge la journalisation des événements, consiste à lire l'état "actuel" (minimisé) d'une entité particulière. En règle générale, chaque entité a un ou un autre
id
. Par conséquent, connaissant cet
id
, notre système de stockage doit retourner l'état actuel de l'objet.
La vérité en dernier ressort sera le journal des événements: l'état actuel peut toujours être déduit du flux d'événements associés à une entité particulière. Pour cela, le moteur de base de données aura besoin d'une fonction pure (sans effets secondaires) qui accepte l'événement et l'état initial et renvoie l'état modifié:
Event = > State => State
. En présence d'une telle fonction et de la
valeur de l'état initial, l' état actuel est une
convolution du flux d'événements (la fonction de changement d'état doit être
propre afin qu'elle puisse être librement appliquée à plusieurs reprises aux mêmes événements.)
Une implémentation simplifiée de l'opération «lire l'état actuel» dans Kafka collecte un flux de
tous les événements du sujet, les filtre, ne laissant que les événements avec l'
id
donné et s'effondre à l'aide de la fonction spécifiée. S'il y a beaucoup d'événements (et au fil du temps le nombre d'événements ne fait qu'augmenter), cette opération peut devenir lente et consommer beaucoup de ressources. Même si son résultat sera mis en cache dans la mémoire et stocké sur le nœud de service, ces informations devront toujours être recréées périodiquement, par exemple, en raison de défaillances de nœud ou en raison de l'éviction des données de cache.

Par conséquent, un moyen plus rationnel est nécessaire. C'est là que les kafka-streams et les dépôts d'état sont utiles. Les applications Kafka-streams s'exécutent sur tout un cluster de nœuds qui consomment certains sujets ensemble. Chaque nœud se voit attribuer une série de segments de sujet consommés, tout comme avec le consommateur Kafka habituel. Cependant, kafka-streams fournit des opérations de données de niveau supérieur qui facilitent la création de flux dérivés.
Une telle opération dans les
flux kafka est la convolution d'un flux dans le stockage local. Chaque stockage local contient des données provenant uniquement des segments consommés par un nœud donné.
Prêt à l'
emploi , deux implémentations de stockage local sont disponibles:
en RAM et basées sur
RocksDB .
Revenant au sujet de l'enregistrement des événements, nous notons qu'il est possible de réduire le flux d'événements dans
le magasin d'état en maintenant sur le nœud local «l'état actuel» de chaque entité à partir des segments affectés au nœud. Si nous utilisons l'implémentation du magasin d'état basé sur RocksDB, le nombre d'entités que nous pouvons suivre sur un seul nœud dépend uniquement de la quantité d'espace disque.
Voici à quoi ressemble la convolution des événements dans le stockage local lors de l'utilisation de l'API Java (serde signifie "sérialiseur / désérialiseur"):
KStreamBuilder builder = new KStreamBuilder(); builder.stream(keySerde, valueSerde, "my_entity_events") .groupByKey(keySerde, valueSerde)
Un exemple complet
de traitement des commandes basé sur des microservices est disponible sur le site Internet de Confluent.
(EDIT: comme indiqué par
Sergei Egorov et
Nikita Salnikov sur Twitter, pour un système avec journalisation des événements, vous devez probablement modifier les paramètres de stockage de données par défaut dans Kafka afin qu'aucune limite de temps ou de taille ne fonctionne, et aussi, éventuellement, , activez la compression des données.)
Afficher l'état actuelNous avons créé un référentiel d'états où se trouvent les états actuels de toutes les entités provenant des segments affectés au nœud, mais comment demander ce référentiel maintenant? Si la demande est locale (c'est-à-dire qu'elle provient du même noeud où se trouve le référentiel), alors tout est assez simple:
streams .store("my_entity_store", QueryableStoreTypes.keyValueStore()); .get(entityId);
Mais que se passe-t-il si nous voulons demander des données situées sur un autre nœud? Et comment savoir ce qu'est ce nœud? Ici, une autre fonctionnalité récemment introduite dans Kafka est très pratique:
les requêtes interactives . Avec leur aide, vous pouvez accéder aux métadonnées Kafka et découvrir quel nœud traite le segment de rubrique avec l'
id
donné (dans ce cas, l'outil de segmentation de rubrique est implicitement utilisé):
metadataService .streamsMetadataForStoreAndKey("my_entity_store", entityId, keySerde)
Ensuite, vous devez rediriger la demande vers le nœud correct. Veuillez noter: la manière spécifique dont la communication intersite est mise en œuvre et gérée - que ce soit REST, akka-remote ou autre - n'appartient pas à la zone de responsabilité de kafka-streams. Kafka fournit simplement un accès au magasin d'état et fournit des informations sur le nœud où se trouve le magasin d'état pour l'
id
donné.
Reprise après sinistreLes magasins d'état sont beaux, mais que se passe-t-il lorsqu'un nœud tombe en panne? La reconstruction d'un magasin d'état local pour un segment donné peut également être une opération coûteuse. Cela peut provoquer une augmentation des retards ou une perte de demandes pendant une longue période, car les flux kafka devront être rééquilibrés (après l'ajout ou la suppression d'un nœud).
C'est pourquoi, par défaut, les magasins d'état à long terme sont enregistrés: c'est-à-dire que toutes les modifications apportées au magasin sont également écrites dans le journal des modifications. Cette rubrique est compressée (car pour chaque
id
nous ne sommes intéressés que par le dernier enregistrement, sans historique des modifications, car l'historique est stocké dans les événements eux-mêmes) - par conséquent, il est aussi petit que possible. C'est pourquoi la recréation du stockage sur un autre nœud peut se produire beaucoup plus rapidement.
Cependant, avec le rééquilibrage dans ce cas, des retards sont encore possibles. Pour les réduire davantage, kafka-streams offre la possibilité de contenir plusieurs
réplicas de sauvegarde (
num.standby.replicas
) pour chaque référentiel. Ces répliques appliquent toutes les mises à jour récupérées à partir des rubriques avec les journaux des modifications à mesure qu'elles deviennent disponibles et sont prêtes à basculer vers le mode de magasin d'état principal pour un segment donné dès que le magasin principal actuel échoue.
CohérenceAvec les paramètres par défaut, Kafka fournit au moins une livraison unique. Autrement dit, en cas de défaillance d'un nœud, certains messages peuvent être remis plusieurs fois. Par exemple, il est possible qu'un événement particulier soit appliqué deux fois au magasin d'état si le système se bloque après que le magasin d'état a été modifié dans le journal, mais avant que le décalage de cet événement particulier n'ait été effectué. Cela ne posera peut-être aucune difficulté: notre fonction de mise à jour d'état (
Event = > State => State
) peut tout à fait normalement faire face à de telles situations. Cependant, il peut ne pas être en mesure de faire face: dans ce cas, les garanties de
livraison strictement ponctuelle fournies par Kafka peuvent être utilisées. Ces garanties s'appliquent uniquement lors de la lecture et de l'écriture des rubriques Kafka, mais c'est ce que nous faisons ici:
en arrière-plan, toutes les entrées des rubriques Kafka sont réduites à la mise à jour du journal des modifications du magasin d'état et à l'exécution de décalages. Tout cela peut se faire
sous forme de transactions .
Par conséquent, si notre fonction de mise à jour de l'état l'exige, nous pouvons activer la sémantique du traitement de flux «livraison strictement ponctuelle» en utilisant la seule option de configuration:
processing.guarantee
. Pour cette raison, les performances chutent, mais rien ne vient en vain.
Écoute de l'événementMaintenant que nous avons couvert les bases - interroger «l'état actuel» et le mettre à jour pour chaque entité - qu'en est-il du déclenchement
des effets secondaires ? À un moment donné, cela deviendra nécessaire, par exemple, pour:
- Envoi d'e-mails de notification
- Indexation des entités des moteurs de recherche
- Appel de services externes via REST (ou SOAP, CORBA, etc.)
Toutes ces tâches sont, à un degré ou à un autre, bloquantes et liées aux opérations d'E / S (ce qui est naturel pour les effets secondaires), donc ce n'est probablement pas une bonne idée de les exécuter dans le cadre de la logique de mise à jour de l'état: en conséquence, la fréquence des échecs dans la boucle principale peut augmenter événements, et en termes de performances, il y aura un goulot d'étranglement.
De plus, une fonction avec une logique de mise à jour d'état (E
Event = > State => State
) peut être exécutée plusieurs fois (en cas d'échecs ou de redémarrages), et le plus souvent, nous voulons minimiser le nombre de cas dans lesquels les effets secondaires d'un événement particulier sont exécutés plusieurs fois.
Heureusement, puisque nous travaillons avec des sujets Kafka, nous avons une assez grande flexibilité. Au stade des flux, lorsque le magasin d'état est mis à jour, les événements peuvent être émis sous forme inchangée (ou, si nécessaire, également sous une forme modifiée), et le flux / sujet résultant (dans Kafka ces concepts sont équivalents) peut être consommé comme vous le souhaitez. De plus, il peut être consommé avant ou après l'étape de mise à jour de l'état. Enfin, nous pouvons contrôler la façon dont nous lançons les effets secondaires: au moins une fois ou au maximum une fois. La première option est fournie si vous n'effectuez le décalage de l'événement-sujet consommé qu'après que tous les effets secondaires se sont correctement déroulés. Inversement, avec un maximum d'un run, nous effectuons des changements jusqu'à ce que des effets secondaires se déclenchent.
Il existe plusieurs options pour déclencher des effets secondaires, elles dépendent de la situation pratique spécifique. Tout d'abord, vous pouvez définir l'étape des flux Kafka où les effets secondaires de chaque événement sont déclenchés dans le cadre de la fonction de traitement des flux.
La mise en place d'un tel mécanisme est assez simple, mais cette solution n'est pas flexible lorsque vous devez gérer des tentatives, contrôler les décalages et concurrencer les compensations pour de nombreux événements à la fois. Dans ces cas plus complexes, il peut être plus approprié de déterminer le traitement en utilisant, disons,
réactif-kafka ou un autre mécanisme qui consomme les sujets de Kafka "directement".
Il est également possible qu'un événement
déclenche d'autres événements - par exemple, l'événement «commande» peut déclencher les événements «préparation pour l'expédition» et «notification client». Cela peut également être mis en œuvre au stade des flux kafka.
Enfin, si nous voulions stocker des événements ou des données extraites d'événements dans une base de données ou un moteur de recherche, par exemple, dans ElasticSearch ou PostgreSQL, nous pourrions utiliser le
connecteur Kafka Connect , qui traitera pour nous tous les détails liés à la consommation de sujets.
Création de vues et de projectionsEn règle générale, la configuration système requise ne se limite pas à interroger et à traiter uniquement des flux d'entité unique. L'agrégation et la combinaison de plusieurs flux d'événements doivent également être prises en charge. Ces flux combinés sont souvent appelés
projections , et lorsqu'ils sont réduits, ils peuvent être utilisés pour créer des
représentations de données . Est-il possible de les implémenter avec Kafka?

Encore une fois, oui! N'oubliez pas qu'en principe, nous traitons simplement du sujet Kafka, où nos événements sont stockés; par conséquent, nous avons toute la puissance du consommateur / producteur Kafka brut, du combineur kafka-streams et même de
KSQL - tout cela nous est utile pour définir les projections. Par exemple, en utilisant kafka-streams, vous pouvez filtrer un flux, l'afficher, le grouper par clé, l'agréger dans des fenêtres temporaires ou de session, etc. soit au niveau du code, soit en utilisant KSQL de type SQL.
De tels flux peuvent être stockés et fournis pour des requêtes pendant une longue période à l'aide de magasins d'état et de requêtes interactives, tout comme nous l'avons fait avec des flux d'entités individuelles.
Et ensuitePour éviter le flux infini d'événements au fur et à mesure que le système se développe, une option de compression telle que la sauvegarde des
instantanés de «l'état actuel» peut être utile. Ainsi, nous pouvons nous limiter à ne stocker que quelques instantanés récents et les événements survenus après leur création.
Bien que Kafka ne prenne pas directement en charge les instantanés (et dans certains autres systèmes fonctionnant sur le principe de l'enregistrement des événements, c'est le cas), vous pouvez certainement ajouter ce type de fonctionnalité vous-même, en utilisant certains des mécanismes ci-dessus, tels que les flux, les consommateurs, les magasins d'État, etc. d.
RésuméBien que, au départ, Kafka n'ait pas été conçu avec un œil sur le paradigme d'enregistrement des événements, il s'agit en fait d'un moteur de données en streaming avec prise en charge de la
réplication de sujets , de la segmentation,
des référentiels d'état et
des API de streaming , et il est très flexible en même temps. Par conséquent, en plus de Kafka, vous pouvez facilement implémenter un système d'enregistrement d'événements. De plus, comme dans le contexte de tout ce qui se passe, nous aurons toujours un sujet Kafka, nous gagnerons en flexibilité supplémentaire, car nous pouvons travailler avec des API de streaming de haut niveau ou des consommateurs de bas niveau.