Le livre Kafka Streams en action. Applications et microservices en temps réel »

image Salut, habrozhiteli! Ce livre convient Ă  tout dĂ©veloppeur qui souhaite comprendre le traitement en streaming. Comprendre la programmation distribuĂ©e vous aidera Ă  mieux comprendre Kafka et Kafka Streams. Ce serait bien de connaĂźtre le framework Kafka lui-mĂȘme, mais ce n'est pas nĂ©cessaire: je vais vous dire tout ce dont vous avez besoin. GrĂące Ă  ce livre, les dĂ©veloppeurs Kafka expĂ©rimentĂ©s, comme les novices, apprendront Ă  crĂ©er des applications de streaming intĂ©ressantes Ă  l'aide de la bibliothĂšque Kafka Streams. Les dĂ©veloppeurs Java intermĂ©diaires et de haut niveau familiarisĂ©s avec des concepts tels que la sĂ©rialisation apprendront Ă  appliquer leurs compĂ©tences pour crĂ©er des applications Kafka Streams. Le code source du livre est Ă©crit en Java 8 et utilise essentiellement la syntaxe des expressions lambda de Java 8, donc la possibilitĂ© de travailler avec des fonctions lambda (mĂȘme dans un autre langage de programmation) vous est utile.

Extrait. 5.3. OpĂ©rations d'agrĂ©gation et de fenĂȘtres


Dans cette section, nous passons aux parties les plus prometteuses de Kafka Streams. Jusqu'à présent, nous avons couvert les aspects suivants des flux Kafka:

  • crĂ©er une topologie de traitement;
  • utilisation de l'Ă©tat dans les applications de streaming;
  • Ă©tablir des connexions de flux de donnĂ©es;
  • diffĂ©rences entre les flux d'Ă©vĂ©nements (KStream) et les flux de mise Ă  jour (KTable).

Dans les exemples suivants, nous allons rassembler tous ces Ă©lĂ©ments. De plus, vous serez initiĂ© aux opĂ©rations sur les fenĂȘtres - une autre grande fonctionnalitĂ© des applications de streaming. Notre premier exemple sera l'agrĂ©gation simple.

5.3.1. Agrégation des ventes d'actions par industrie


L'agrégation et le regroupement sont des outils essentiels pour travailler avec des données en streaming. L'examen des dossiers individuels sur une base d'admission n'est souvent pas suffisant. Pour extraire des informations supplémentaires des données, leur regroupement et leur combinaison sont nécessaires.

Dans cet exemple, vous devez essayer la combinaison d'un trader intrajournalier qui doit suivre le volume des ventes d'actions de sociétés dans plusieurs secteurs. En particulier, vous vous intéressez aux cinq sociétés qui réalisent les plus grandes parts de ventes dans chaque industrie.

Pour une telle agrégation, vous aurez besoin de plusieurs des étapes suivantes pour traduire les données sous la forme souhaitée (en termes généraux).

  1. Créez une source thématique qui publie des informations brutes sur les transactions boursiÚres. Nous devrons mapper un objet de type StockTransaction à un objet de type ShareVolume. Le fait est que l'objet StockTransaction contient des métadonnées de vente, et nous n'avons besoin que de données sur le nombre d'actions vendues.
  2. Groupez les données de volume de partage par symboles boursiers. AprÚs avoir regroupé par symboles, vous pouvez réduire ces données en sous-totaux des ventes d'actions. Il convient de noter que la méthode KStream.groupBy renvoie une instance de type KGroupedStream. Et vous pouvez obtenir une instance de KTable en appelant la méthode KGroupedStream.reduce plus tard.

Qu'est-ce que l'interface KGroupedStream

Les méthodes KStream.groupBy et KStream.groupByKey renvoient une instance de KGroupedStream. KGroupedStream est une représentation intermédiaire du flux d'événements aprÚs regroupement par clé. Il n'est pas du tout destiné à fonctionner directement avec lui. Au lieu de cela, KGroupedStream est utilisé pour les opérations d'agrégation, dont le résultat est toujours KTable. Et comme le résultat des opérations d'agrégation est KTable et qu'elles utilisent le stockage d'état, il est possible que toutes les mises à jour en conséquence ne soient pas envoyées plus loin dans le pipeline.

La méthode KTable.groupBy renvoie un KGroupedTable similaire - une représentation intermédiaire du flux de mises à jour regroupées par clé.

Prenons une courte pause et regardons la fig. 5.9, qui montre ce que nous avons accompli. Cette topologie devrait dĂ©jĂ  vous ĂȘtre familiĂšre.

image

Voyons maintenant le code de cette topologie (il se trouve dans le fichier src / main / java / bbejeck / chapter_5 / AggregationsAndReducingExample.java) (Listing 5.2).

image

Le code donnĂ© diffĂšre par sa briĂšvetĂ© et un grand volume d'actions effectuĂ©es sur plusieurs lignes. Dans le premier paramĂštre de la mĂ©thode builder.stream, vous pouvez remarquer quelque chose de nouveau par vous-mĂȘme: la valeur du type Ă©numĂ©rĂ© AutoOffsetReset.EARLIEST (il existe Ă©galement LATEST), dĂ©finie Ă  l'aide de la mĂ©thode Consumed.withOffsetResetPolicy. En utilisant ce type Ă©numĂ©rĂ©, vous pouvez spĂ©cifier une stratĂ©gie de rĂ©initialisation des dĂ©calages pour chacun de KStream ou KTable; il a prioritĂ© sur le paramĂštre de rĂ©initialisation des dĂ©calages de la configuration.

GroupByKey et GroupBy

L'interface KStream propose deux méthodes de regroupement des enregistrements: GroupByKey et GroupBy. Les deux renvoient KGroupedTable, vous pourriez donc avoir une question légitime: quelle est la différence entre eux et quand utiliser lequel?

La méthode GroupByKey est utilisée lorsque les clés de KStream sont déjà non vides. Et surtout, l'indicateur «nécessite une nouvelle partition» n'a jamais été défini.

La méthode GroupBy suppose que vous avez modifié les clés de regroupement, donc l'indicateur de re-partitionnement est défini sur true. Effectuer des connexions, des agrégations, etc. aprÚs la méthode GroupBy entraßnera un re-partitionnement automatique.
Résumé: Vous devez utiliser GroupByKey plutÎt que GroupBy dans la mesure du possible.

Ce que font les mĂ©thodes mapValues ​​et groupBy est comprĂ©hensible, alors jetez un Ɠil Ă  la mĂ©thode sum () (elle se trouve dans le fichier src / main / java / bbejeck / model / ShareVolume.java) (Listing 5.3).

image

La méthode ShareVolume.sum renvoie le sous-total du volume des ventes d'actions et le résultat de toute la chaßne de calcul est un objet KTable <String, ShareVolume>. Vous comprenez maintenant quel rÎle joue KTable. Lorsque les objets ShareVolume arrivent, la derniÚre mise à jour actuelle est enregistrée dans le KTable correspondant. Il est important de ne pas oublier que toutes les mises à jour sont reflétées dans le précédent shareVolumeKTable, mais toutes ne sont pas envoyées plus loin.

De plus, avec l'aide de ce tableau, nous effectuons une agrégation (par le nombre d'actions vendues) afin d'obtenir les cinq sociétés avec les ventes d'actions les plus élevées dans chaque industrie. Nos actions dans ce cas seront similaires aux actions lors de la premiÚre agrégation.

  1. Effectuez une autre opération groupBy pour regrouper des objets ShareVolume individuels par secteur.
  2. Passez à résumer les objets ShareVolume. Cette fois, l'objet d'agrégation est une file d'attente prioritaire de taille fixe. Seules cinq sociétés avec le plus grand nombre d'actions vendues sont conservées dans une telle file d'attente de taille fixe.
  3. Affichez les lignes du paragraphe précédent dans une valeur de chaßne et retournez les cinq meilleures ventes par le nombre d'actions par industrie.
  4. Écrivez les rĂ©sultats sous forme de chaĂźne dans la rubrique.

Dans la fig. 5.10 montre un graphique de la topologie du mouvement des données. Comme vous pouvez le voir, le deuxiÚme cycle de traitement est assez simple.

image

Maintenant, aprÚs avoir bien compris la structure de ce deuxiÚme cycle de traitement, vous pouvez vous référer à son code source (vous le trouverez dans le fichier src / main / java / bbejeck / chapter_5 / AggregationsAndReducingExample.java) (Listing 5.4).

Il existe une variable fixedQueue dans cet initialiseur. Il s'agit d'un objet personnalisé - un adaptateur pour java.util.TreeSet, qui est utilisé pour suivre les résultats les plus élevés dans l'ordre décroissant du nombre de parts vendues.

image

Vous avez dĂ©jĂ  rencontrĂ© des appels Ă  groupBy et mapValues, nous ne nous arrĂȘterons donc pas sur eux (nous appelons la mĂ©thode KTable.toStream, car la mĂ©thode KTable.print est dĂ©conseillĂ©e). Mais vous n'avez pas encore vu la version KTable de la mĂ©thode d'agrĂ©gat (), nous allons donc passer un peu de temps Ă  en discuter.

Comme vous vous en souvenez, KTable se distingue par le fait que les enregistrements avec les mĂȘmes clĂ©s sont considĂ©rĂ©s comme des mises Ă  jour. KTable remplace l'ancien enregistrement par le nouveau. L'agrĂ©gation se dĂ©roule de la mĂȘme maniĂšre: les derniers enregistrements avec une clĂ© sont agrĂ©gĂ©s. Lorsqu'un enregistrement arrive, il est ajoutĂ© Ă  une instance de la classe FixedSizePriorityQueue Ă  l'aide d'un additionneur (le deuxiĂšme paramĂštre de l'appel Ă  la mĂ©thode d'agrĂ©gation), mais si un autre enregistrement avec la mĂȘme clĂ© existe dĂ©jĂ , l'ancien enregistrement est supprimĂ© Ă  l'aide du soustracteur (le troisiĂšme paramĂštre de l'appel Ă  la mĂ©thode d'agrĂ©gation).

Cela signifie que notre agrégateur, FixedSizePriorityQueue, n'agrÚge pas toutes les valeurs avec une seule clé, mais stocke la somme mobile des quantités N des types de stocks les plus vendus. Chaque entrée contient le nombre total d'actions vendues jusqu'à présent. KTable vous fournira des informations sur les actions des sociétés qui sont actuellement les plus vendues; l'agrégation continue de chaque mise à jour n'est pas requise.

Nous avons appris Ă  faire deux choses importantes:

  • regrouper les valeurs dans KTable par une clĂ© qui leur est commune;
  • Effectuez des opĂ©rations utiles telles que la convolution et l'agrĂ©gation sur ces valeurs groupĂ©es.

La capacité d'effectuer ces opérations est importante pour comprendre la signification des données qui transitent par l'application Kafka Streams et déterminer quelles informations elles contiennent.

Nous avons Ă©galement rassemblĂ© certains des concepts clĂ©s discutĂ©s plus haut dans ce livre. Au chapitre 4, nous avons parlĂ© de l'importance d'un Ă©tat local Ă  sĂ©curitĂ© intĂ©grĂ©e pour une application de streaming. Le premier exemple de ce chapitre a montrĂ© pourquoi l'État local est si important - il permet de suivre les informations que vous avez dĂ©jĂ  vues. L'accĂšs local Ă©vite les retards rĂ©seau, rendant l'application plus productive et rĂ©sistante aux erreurs.

Lorsque vous effectuez une opération de convolution ou d'agrégation, vous devez spécifier le nom du magasin d'état. Les opérations de convolution et d'agrégation renvoient une instance de KTable, et KTable utilise un magasin d'état pour remplacer les anciens résultats par de nouveaux. Comme vous l'avez vu, toutes les mises à jour ne sont pas envoyées plus loin dans le pipeline, ce qui est important, car les opérations d'agrégation sont conçues pour obtenir les informations finales. Si l'état local n'est pas appliqué, KTable enverra en outre tous les résultats d'agrégation et de convolution.

Ensuite, nous examinons l'exĂ©cution d'opĂ©rations telles que l'agrĂ©gation, dans une pĂ©riode de temps spĂ©cifique - les opĂ©rations dites de fenĂȘtrage.

5.3.2. OpĂ©rations de fenĂȘtre


Dans la section précédente, nous avons introduit la convolution et l'agrégation «roulante». L'application a effectué une convolution continue des ventes d'actions avec l'agrégation subséquente des cinq actions les plus vendues.

Parfois, une telle agrĂ©gation continue et convolution des rĂ©sultats est nĂ©cessaire. Et parfois, vous devez effectuer des opĂ©rations uniquement sur une pĂ©riode de temps donnĂ©e. Par exemple, calculez combien de transactions boursiĂšres ont Ă©tĂ© effectuĂ©es avec des actions d'une entreprise particuliĂšre au cours des 10 derniĂšres minutes. Ou combien d'utilisateurs ont cliquĂ© sur une nouvelle banniĂšre publicitaire au cours des 15 derniĂšres minutes. Une application peut effectuer de telles opĂ©rations plusieurs fois, mais avec des rĂ©sultats liĂ©s uniquement Ă  des intervalles de temps spĂ©cifiĂ©s (fenĂȘtres de temps).

Comptage des transactions d'échange par acheteur


Dans l'exemple suivant, nous serons engagés dans le suivi des transactions de change pour plusieurs commerçants - soit de grandes organisations, soit de simples financiers intelligents.

Il y a deux raisons possibles Ă  ce suivi. L'un d'eux est la nĂ©cessitĂ© de savoir ce que les leaders du marchĂ© achĂštent / vendent. Si ces grands acteurs et investisseurs avertis voient des opportunitĂ©s par eux-mĂȘmes, il est logique de suivre leur stratĂ©gie. La deuxiĂšme raison est le dĂ©sir de remarquer tout signe possible de transactions illĂ©gales utilisant des informations privilĂ©giĂ©es. Pour ce faire, vous devrez analyser la corrĂ©lation des fortes hausses des ventes avec les communiquĂ©s de presse importants.

Un tel suivi comprend des étapes telles que:

  • crĂ©er un flux pour la lecture du sujet des transactions boursiĂšres;
  • regroupement des enregistrements entrants par ID client et symbole boursier du stock. Un appel Ă  la mĂ©thode groupBy renvoie une instance de la classe KGroupedStream;
  • KGroupedStream.windowedBy renvoie un flux de donnĂ©es dĂ©limitĂ© par une fenĂȘtre temporaire, qui permet l'agrĂ©gation de fenĂȘtres. Selon le type de fenĂȘtre, TimeWindowedKStream ou SessionWindowedKStream est renvoyĂ©;
  • Comptage des transactions pour une opĂ©ration d'agrĂ©gation. Le flux de donnĂ©es de fenĂȘtre dĂ©termine si un enregistrement particulier est pris en compte dans ce calcul;
  • Ă©crire des rĂ©sultats dans une rubrique ou les afficher sur la console pendant le dĂ©veloppement.

La topologie de cette application est simple, mais son image visuelle ne fait pas de mal. Jetez un oeil Ă  la photo. 5.11.

De plus, nous considĂ©rerons la fonctionnalitĂ© des opĂ©rations de fenĂȘtre et le code correspondant.

image

Types de fenĂȘtres


Il existe trois types de fenĂȘtres dans Kafka Streams:

  • session
  • Tumbling (tumbling);
  • glissement / "saut" (glissement / saut).

Le choix dĂ©pend des besoins de l'entreprise. Les fenĂȘtres "Tumbling" et "jumping" sont limitĂ©es dans le temps, tandis que les restrictions de session sont associĂ©es aux actions de l'utilisateur - la durĂ©e de la ou des sessions est dĂ©terminĂ©e uniquement par le comportement actif de l'utilisateur. L'essentiel est de ne pas oublier que tous les types de fenĂȘtres sont basĂ©s sur les horodatages des enregistrements et non sur l'heure systĂšme.

Ensuite, nous implĂ©mentons notre topologie avec chacun des types de fenĂȘtres. Le code complet ne sera donnĂ© que dans le premier exemple, rien ne changera pour les autres types de fenĂȘtres, Ă  l'exception du type d'opĂ©ration de fenĂȘtre.

FenĂȘtres de session


Les fenĂȘtres de session sont trĂšs diffĂ©rentes de tous les autres types de fenĂȘtres. Ils sont limitĂ©s non pas tant par le temps que par l'activitĂ© de l'utilisateur (ou l'activitĂ© de l'entitĂ© que vous souhaitez suivre). Les fenĂȘtres de session sont dĂ©limitĂ©es par des pĂ©riodes d'inactivitĂ©.

La figure 5.12 illustre le concept des fenĂȘtres de session. Une session plus petite fusionnera avec la session Ă  sa gauche. Et la session de droite sera sĂ©parĂ©e, car elle suit une longue pĂ©riode d'inactivitĂ©. Les fenĂȘtres de session sont basĂ©es sur les actions de l'utilisateur, mais appliquent des horodatages Ă  partir des enregistrements pour dĂ©terminer Ă  quelle session l'enregistrement appartient.

image


Utilisation des fenĂȘtres de session pour suivre les transactions Exchange


Nous utiliserons des fenĂȘtres de session pour capturer des informations sur les transactions d'Ă©change. L'implĂ©mentation des fenĂȘtres de session est prĂ©sentĂ©e dans le Listing 5.5 (qui se trouve dans src / main / java / bbejeck / chapter_5 / CountingWindowingAndKTableJoinExample.java).

image

Vous avez déjà rencontré la plupart des opérations de cette topologie, il n'est donc pas nécessaire de les considérer ici à nouveau. Mais il y a plusieurs nouveaux éléments que nous allons discuter maintenant.

Pour toute opĂ©ration groupBy, une sorte d'opĂ©ration d'agrĂ©gation (agrĂ©gation, convolution ou comptage) est gĂ©nĂ©ralement effectuĂ©e. Vous pouvez effectuer une agrĂ©gation cumulative avec un total cumulĂ© ou une agrĂ©gation de fenĂȘtres, dans laquelle les enregistrements sont pris en compte dans une fenĂȘtre de temps donnĂ©e.

Le code du Listing 5.5 compte le nombre de transactions dans les fenĂȘtres de session. Dans la fig. 5.13 ces actions sont analysĂ©es Ă©tape par Ă©tape.

En appelant windowedBy (SessionWindows. With (vingt secondes). Jusqu'Ă  (quinze minutes)), nous crĂ©ons une fenĂȘtre de session avec un intervalle d'inactivitĂ© de 20 secondes et un intervalle de rĂ©tention de 15 minutes. Un intervalle d'inactivitĂ© de 20 secondes signifie que l'application inclura tout enregistrement qui arrive dans les 20 secondes suivant la fin ou le dĂ©but de la session en cours dans la session en cours (active).

image

Ensuite, nous indiquons quelle opĂ©ration d'agrĂ©gation effectuer dans la fenĂȘtre de session - dans ce cas, comptez. Si l'enregistrement entrant tombe en dehors de l'intervalle d'inactivitĂ© (de chaque cĂŽtĂ© du cachet de date / heure), l'application crĂ©e une nouvelle session. Un intervalle de sauvegarde signifie le maintien d'une session pendant un certain temps et permet des donnĂ©es en retard qui vont au-delĂ  de la pĂ©riode d'inactivitĂ© de la session mais peuvent toujours ĂȘtre attachĂ©es. De plus, le dĂ©but et la fin d'une nouvelle session rĂ©sultant de la fusion correspondent Ă  l'horodatage le plus ancien et le plus rĂ©cent.

Examinons quelques entrées de la méthode count pour voir comment les sessions fonctionnent (tableau 5.1).

image

Lors de la rĂ©ception des enregistrements, nous recherchons les sessions dĂ©jĂ  existantes avec la mĂȘme clĂ©, l'heure de fin est infĂ©rieure Ă  la date / heure actuelle - l'intervalle d'inactivitĂ© et l'heure de dĂ©but sont supĂ©rieures Ă  la date / heure actuelle + intervalle d'inactivitĂ©. Dans cet esprit, quatre enregistrements de la table. 5.1 fusionner en une seule session comme suit.

1. L'enregistrement 1 vient en premier, donc l'heure de début est égale à l'heure de fin et est 00:00:00.

2. Vient ensuite l'enregistrement 2, et nous recherchons des sessions qui se terminent au plus tÎt à 23:59:55 et commencent au plus tard à 00:00:35. Recherchez l'enregistrement 1 et combinez les sessions 1 et 2. Prenez l'heure de début de la session 1 (plus tÎt) et l'heure de fin de la session 2 (plus tard), de sorte que notre nouvelle session commence à 00:00:00 et se termine à 00:00:15.

3. L'enregistrement 3 arrive, nous recherchons des sessions entre 00:00:30 et 00:01:10 et n'en trouvons aucune. Ajoutez une deuxiÚme session pour la clé 123-345-654, FFBE, commençant et se terminant à 00:00:50.

4. L'enregistrement 4 arrive et nous recherchons des sessions entre 23:59:45 et 00:00:25. Cette fois, il y a deux sessions - 1 et 2. Les trois sessions sont combinées en une seule, avec une heure de début de 00:00:00 et une heure de fin de 00:00:15.

D'aprĂšs ce qui est dit dans cette section, il convient de se rappeler les nuances importantes suivantes:

  • Les sessions ne sont pas des fenĂȘtres de taille fixe. La durĂ©e d'une session est dĂ©terminĂ©e par l'activitĂ© dans une pĂ©riode de temps donnĂ©e;
  • Les horodatages dans les donnĂ©es dĂ©terminent si un Ă©vĂ©nement tombe dans une session existante ou dans une pĂ©riode d'inactivitĂ©.

Plus loin, nous discuterons du type de fenĂȘtres suivant - les fenĂȘtres de "saut pĂ©rilleux".

FenĂȘtres Ă  bascule


Les fenĂȘtres «tumbling» capturent les Ă©vĂ©nements qui se produisent dans une certaine pĂ©riode de temps. Imaginez que vous devez capturer toutes les transactions d'Ă©change d'une entreprise toutes les 20 secondes, afin de collecter tous les Ă©vĂ©nements de cette pĂ©riode. À la fin de l'intervalle de 20 secondes, la fenĂȘtre «bascule» et passe Ă  un nouvel intervalle d'observation de 20 secondes. La figure 5.14 illustre cette situation.

image

Comme vous pouvez le voir, tous les Ă©vĂ©nements reçus au cours des 20 derniĂšres secondes sont inclus dans la fenĂȘtre. À la fin de cette pĂ©riode, une nouvelle fenĂȘtre est créée.

Le listing 5.6 montre le code qui illustre l'utilisation de fenĂȘtres tumbling pour capturer les transactions d'Ă©change toutes les 20 secondes (vous pouvez le trouver dans src / main / java / bbejeck / chapter_5 / CountingWindowingAndKtableJoinExample.java).

image

GrĂące Ă  cette petite modification de l'appel Ă  la mĂ©thode TimeWindows.of, vous pouvez utiliser la fenĂȘtre tumbling. Dans cet exemple, il n'y a aucun appel Ă  la mĂ©thode until (), Ă  la suite de quoi l'intervalle de sauvegarde par dĂ©faut de 24 heures sera utilisĂ©.

Enfin, il est temps de passer Ă  la derniĂšre des options de fenĂȘtre - saut de fenĂȘtres.

FenĂȘtres coulissantes ("sautantes")


Les fenĂȘtres coulissantes / «sautillantes» sont similaires au «culbutage», mais avec une lĂ©gĂšre diffĂ©rence. Les fenĂȘtres coulissantes n'attendent pas la fin de l'intervalle de temps avant de crĂ©er une nouvelle fenĂȘtre pour gĂ©rer les Ă©vĂ©nements rĂ©cents. Ils commencent de nouveaux calculs aprĂšs un intervalle d'attente plus court que la durĂ©e de la fenĂȘtre.

Pour illustrer les diffĂ©rences entre les fenĂȘtres «saut pĂ©rilleux» et «sautant», revenons Ă  l'exemple du calcul des opĂ©rations de change. Notre objectif, comme prĂ©cĂ©demment, est de compter le nombre de transactions, mais nous ne voudrions pas attendre tout le temps avant de mettre Ă  jour le compteur. Au lieu de cela, nous mettrons Ă  jour le compteur Ă  des intervalles plus courts. Par exemple, nous continuerons Ă  compter le nombre de transactions toutes les 20 secondes, mais Ă  mettre Ă  jour le compteur toutes les 5 secondes, comme le montre la Fig. 5.15. Dans le mĂȘme temps, nous avons trois fenĂȘtres de rĂ©sultats avec des donnĂ©es qui se chevauchent.

image

Le listing 5.7 montre le code pour spĂ©cifier les fenĂȘtres coulissantes (il peut ĂȘtre trouvĂ© dans src / main / java / bbejeck / chapter_5 / CountingWindowingAndKtableJoinExample.java).

image

«» «» advanceBy(). 15 .

, . , , :

  • , ;
  • «» ;
  • «» , .

, KTable KStream .

5.3.3. KStream KTable


4 KStream. KTable KStream. . KStream — , KTable — , KTable.

. , .

  1. KTable KStream , , .
  2. KTable, . KTable .
  3. .

, .

KTable KStream


KTable KStream .

  1. KTable.toStream().
  2. KStream.map , Windowed TransactionSummary.

( src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) ( 5.8).

image

KStream.map, KStream .

, KTable .

KTable


, KTable ( src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) ( 5.9).

image

, Serde , Serde. EARLIEST .

— .


. , ( src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) ( 5.10).

image

leftJoin . 4, JoinWindow , KStream-KTable KTable . : KTable, . : KTable KStream .

KStream.

5.3.4. GlobalKTable


, . 4 KStream, — KStream KTable. . , Kafka Streams . , , ( 4, « » 4.2.4).


— , ; . , , .


, , , . Kafka Streams GlobalKTable.

GlobalKTable , . , , . GlobalKTable . .

KStream GlobalKTable


Dans la sous-section 5.3.2, nous avons procĂ©dĂ© Ă  l'agrĂ©gation des fenĂȘtres des transactions d'Ă©change par les clients. Les rĂ©sultats de cette agrĂ©gation ressemblaient Ă  ceci:

{customerId='074-09-3705', stockTicker='GUTM'}, 17 {customerId='037-34-5184', stockTicker='CORK'}, 16 

Bien que ces résultats soient conformes à l'objectif, il serait plus pratique d'afficher également le nom du client et le nom complet de l'entreprise. Pour ajouter le nom d'un client et le nom d'une entreprise, vous pouvez effectuer des connexions normales, mais vous devrez effectuer deux mappages de clés et re-partitionner. Avec GlobalKTable, vous pouvez éviter le coût de telles opérations.

Pour ce faire, nous allons utiliser l'objet countStream du Listing 5.11 (le code correspondant peut ĂȘtre trouvĂ© dans le fichier src / main / java / bbejeck / chapter_5 / GlobalKTableExample.java), en le connectant avec deux objets GlobalKTable.

image

Nous en avons déjà discuté auparavant, donc je ne le répéterai pas. Mais je note que le code dans la fonction toStream (). Map est abstrait dans l'objet fonction pour des raisons de lisibilité au lieu de l'expression lambda intégrée.

L'Ă©tape suivante consiste Ă  dĂ©clarer deux instances de GlobalKTable (le code affichĂ© peut ĂȘtre trouvĂ© dans src / main / java / bbejeck / chapter_5 / GlobalKTableExample.java) (Listing 5.12).

image


Notez que les noms de rubrique sont décrits à l'aide de types énumérés.

Maintenant que nous avons préparé tous les composants, il reste à écrire le code de la connexion (qui se trouve dans le fichier src / main / java / bbejeck / chapter_5 / GlobalKTableExample.java) (Listing 5.13).

image

Bien qu'il existe deux composés dans ce code, ils sont organisés en chaßne, car aucun de leurs résultats n'est utilisé séparément. Les résultats sont affichés à la fin de toute l'opération.

Lorsque vous démarrez l'opération de connexion ci-dessus, vous obtiendrez les résultats suivants:

 {customer='Barney, Smith' company="Exxon", transactions= 17} 

L'essence n'a pas changé, mais ces résultats semblent plus clairs.

En comptant le chapitre 4, vous avez déjà vu plusieurs types de connexions en action. Ils sont répertoriés dans le tableau. 5.2. Ce tableau reflÚte la connectivité pertinente à la version 1.0.0 de Kafka Streams; quelque chose va changer dans les prochaines versions.

image

En conclusion, je vous rappelle l'essentiel: vous pouvez connecter des flux d'Ă©vĂ©nements (KStream) et des flux de mise Ă  jour (KTable) en utilisant l'Ă©tat local. De plus, si la taille des donnĂ©es de rĂ©fĂ©rence n'est pas trop grande, vous pouvez utiliser l'objet GlobalKTable. GlobalKTable rĂ©plique toutes les sections sur chacun des nƓuds de l'application Kafka Streams, garantissant ainsi la disponibilitĂ© de toutes les donnĂ©es quelle que soit la section Ă  laquelle la clĂ© correspond.

Ensuite, nous verrons la possibilité de flux Kafka, grùce auxquels vous pouvez observer les changements d'état sans consommer les données du sujet Kafka.

5.3.5. Statut de la demande


Nous avons déjà effectué plusieurs opérations impliquant l'état et toujours restituer les résultats à la console (à des fins de développement) ou les écrire dans le sujet (pour une opération industrielle). Lorsque vous écrivez des résultats dans un sujet, vous devez utiliser le consommateur Kafka pour les afficher.

La lecture des donnĂ©es de ces sujets peut ĂȘtre considĂ©rĂ©e comme une sorte de vues matĂ©rialisĂ©es. Pour nos tĂąches, nous pouvons utiliser la dĂ©finition d'une vue matĂ©rialisĂ©e de Wikipedia: «... un objet de base de donnĂ©es physique contenant les rĂ©sultats d'une requĂȘte. Par exemple, il peut s'agir d'une copie locale des donnĂ©es supprimĂ©es, ou d'un sous-ensemble des lignes et / ou colonnes d'une table ou des rĂ©sultats de jointure, ou d'un tableau croisĂ© dynamique obtenu Ă  l'aide de l'agrĂ©gation »(https://en.wikipedia.org/wiki/Materialized_view).

Kafka Streams vous permet Ă©galement d'effectuer des requĂȘtes interactives sur les magasins d'Ă©tat, ce qui vous permet de lire directement ces vues matĂ©rialisĂ©es. Il est important de noter que la demande au magasin d'Ă©tat est de la nature d'une opĂ©ration en lecture seule. GrĂące Ă  cela, vous ne pouvez pas avoir peur de rendre accidentellement l'Ă©tat d'une application incohĂ©rente lors du traitement des donnĂ©es.

La possibilité d'interroger directement les magasins d'état est importante. Cela signifie que vous pouvez créer des applications - des tableaux de bord sans avoir à recevoir au préalable les données d'un consommateur Kafka. Il augmente l'efficacité de l'application, car il n'est pas nécessaire d'enregistrer à nouveau les données:

  • En raison de la localisation des donnĂ©es, vous pouvez y accĂ©der rapidement;
  • La duplication des donnĂ©es est exclue, car elles ne sont pas Ă©crites sur un stockage externe.

La chose principale dont je voudrais que vous vous souveniez: vous pouvez exĂ©cuter directement les requĂȘtes d'Ă©tat depuis l'application. Vous ne pouvez pas surestimer les opportunitĂ©s que cela vous offre. Au lieu de consommer des donnĂ©es de Kafka et de stocker des enregistrements dans la base de donnĂ©es pour l'application, vous pouvez interroger les magasins d'Ă©tat avec le mĂȘme rĂ©sultat. Les demandes directes aux magasins d'Ă©tat signifient moins de code (pas de consommateur) et moins de logiciel (pas besoin d'une table de base de donnĂ©es pour stocker les rĂ©sultats).

Nous avons couvert une quantitĂ© considĂ©rable d'informations dans ce chapitre, nous allons donc arrĂȘter temporairement notre discussion sur les requĂȘtes interactives aux magasins d'État. Mais ne vous inquiĂ©tez pas: au chapitre 9, nous allons crĂ©er une application simple - un panneau d'informations avec des requĂȘtes interactives. Pour dĂ©montrer les requĂȘtes interactives et les possibilitĂ©s de les ajouter aux applications Kafka Streams, il utilisera certains des exemples de ce chapitre et des prĂ©cĂ©dents.

Résumé


  • Les objets KStream reprĂ©sentent des flux d'Ă©vĂ©nements comparables aux insertions de base de donnĂ©es. Les objets KTable reprĂ©sentent des flux de mise Ă  jour, ils sont plus similaires aux mises Ă  jour de la base de donnĂ©es. La taille de l'objet KTable n'augmente pas; les anciens enregistrements sont remplacĂ©s par de nouveaux.
  • Les objets KTable sont requis pour les opĂ©rations d'agrĂ©gation.
  • À l'aide des opĂ©rations de fenĂȘtre, vous pouvez diviser les donnĂ©es agrĂ©gĂ©es en paniers de temps.
  • GrĂące aux objets GlobalKTable, vous pouvez accĂ©der aux donnĂ©es de rĂ©fĂ©rence n'importe oĂč dans l'application, indĂ©pendamment de la section.
  • Les connexions entre les objets KStream, KTable et GlobalKTable sont possibles.

Jusqu'Ă  prĂ©sent, nous nous sommes concentrĂ©s sur la crĂ©ation d'applications Kafka Streams Ă  l'aide du DSL de haut niveau KStream. Bien qu'une approche de haut niveau vous permette de crĂ©er des programmes soignĂ©s et concis, son utilisation est un compromis certain. Travailler avec DSL KStream signifie augmenter la concision du code en rĂ©duisant le degrĂ© de contrĂŽle. Dans le chapitre suivant, nous allons examiner l'API de bas niveau des nƓuds de gestionnaire et essayer d'autres compromis. Les programmes deviendront plus longs qu'ils ne l'Ă©taient jusqu'Ă  prĂ©sent, mais nous aurons la possibilitĂ© de crĂ©er presque tous les nƓuds de traitement dont nous pourrions avoir besoin.

→ Plus de dĂ©tails sur le livre peuvent ĂȘtre trouvĂ©s sur le site Web de l'Ă©diteur

→ Pour Khabrozhiteley 25% de rĂ©duction sur le coupon - Kafka Streams

→ Lors du paiement de la version papier du livre, un livre Ă©lectronique est envoyĂ© par e-mail.

Source: https://habr.com/ru/post/fr457756/


All Articles