
Hallo habrozhiteli! Dieses Buch eignet sich für alle Entwickler, die die Streaming-Verarbeitung verstehen möchten. Wenn Sie die verteilte Programmierung verstehen, können Sie Kafka und Kafka Streams besser verstehen. Es wäre schön, das Kafka-Framework selbst zu kennen, aber das ist nicht notwendig: Ich werde Ihnen alles erzählen, was Sie brauchen. Dank dieses Buches lernen erfahrene Kafka-Entwickler wie Anfänger, wie sie mithilfe der Kafka Streams-Bibliothek interessante Streaming-Anwendungen erstellen. Fortgeschrittene und hochrangige Java-Entwickler, die bereits mit Konzepten wie der Serialisierung vertraut sind, lernen, wie sie ihre Fähigkeiten anwenden können, um Kafka Streams-Anwendungen zu erstellen. Der Quellcode des Buches ist in Java 8 geschrieben und verwendet im Wesentlichen die Syntax von Lambda-Ausdrücken von Java 8, sodass die Fähigkeit, mit Lambda-Funktionen (auch in einer anderen Programmiersprache) zu arbeiten, für Sie nützlich ist.
Auszug. 5.3. Aggregations- und Fensteroperationen
In diesem Abschnitt gehen wir zu den vielversprechendsten Teilen von Kafka Streams über. Bisher haben wir folgende Aspekte von Kafka Streams behandelt:
- Erstellen einer Verarbeitungstopologie;
- Verwendung des Status in Streaming-Anwendungen;
- Herstellen von Datenstromverbindungen;
- Unterschiede zwischen Ereignisströmen (KStream) und Aktualisierungsströmen (KTable).
In den folgenden Beispielen werden wir alle diese Elemente zusammenfassen. Darüber hinaus werden Sie in die Fensteroperationen eingeführt - eine weitere großartige Funktion von Streaming-Anwendungen. Unser erstes Beispiel wird die einfache Aggregation sein.
5.3.1. Aggregation der Aktienverkäufe nach Branchen
Aggregation und Gruppierung sind wichtige Werkzeuge für die Arbeit mit Streaming-Daten. Die Prüfung einzelner Unterlagen auf Zulassungsbasis reicht oft nicht aus. Um zusätzliche Informationen aus den Daten zu extrahieren, sind deren Gruppierung und Kombination erforderlich.
In diesem Beispiel müssen Sie den Anzug eines Intraday-Händlers anprobieren, der das Verkaufsvolumen von Aktien von Unternehmen in verschiedenen Branchen verfolgen muss. Sie interessieren sich insbesondere für die fünf Unternehmen mit den größten Umsatzverkäufen in jeder Branche.
Für eine solche Aggregation benötigen Sie mehrere der folgenden Schritte, um die Daten (allgemein) in die gewünschte Form zu übersetzen.
- Erstellen Sie eine themenbasierte Quelle, die Informationen zum Rohstoffhandel veröffentlicht. Wir müssen ein Objekt vom Typ StockTransaction einem Objekt vom Typ ShareVolume zuordnen. Tatsache ist, dass das StockTransaction-Objekt Verkaufsmetadaten enthält und wir nur Daten zur Anzahl der verkauften Aktien benötigen.
- Gruppieren Sie ShareVolume-Daten nach Aktiensymbolen. Nach der Gruppierung nach Symbolen können Sie diese Daten auf Zwischensummen der Aktienverkäufe reduzieren. Es ist erwähnenswert, dass die KStream.groupBy-Methode eine Instanz vom Typ KGroupedStream zurückgibt. Und Sie können eine KTable-Instanz erhalten, indem Sie später die KGroupedStream.reduce-Methode aufrufen.
Was ist die KGroupedStream-Schnittstelle?
Die Methoden KStream.groupBy und KStream.groupByKey geben eine Instanz von KGroupedStream zurück. KGroupedStream ist eine Zwischendarstellung des Ereignisstroms nach Gruppierung nach Schlüssel. Es ist überhaupt nicht dafür gedacht, direkt damit zu arbeiten. Stattdessen wird KGroupedStream für Aggregationsoperationen verwendet, deren Ergebnis immer KTable ist. Und da das Ergebnis von Aggregationsoperationen KTable ist und sie den Statusspeicher verwenden, ist es möglich, dass nicht alle Aktualisierungen als Ergebnis weiter unten in der Pipeline gesendet werden.
Die KTable.groupBy-Methode gibt eine ähnliche KGroupedTable zurück - eine Zwischendarstellung des nach Schlüssel neu gruppierten Aktualisierungsstroms.
Machen wir eine kurze Pause und schauen uns Abb. 5.9 zeigt, was wir erreicht haben. Diese Topologie sollte Ihnen bereits bekannt sein.
Schauen wir uns nun den Code für diese Topologie an (er befindet sich in der Datei src / main / java / bbejeck / kapitel_5 / AggregationsAndReducingExample.java) (Listing 5.2).
Der angegebene Code unterscheidet sich in der Kürze und in einer großen Anzahl von Aktionen, die in mehreren Zeilen ausgeführt werden. Im ersten Parameter der Methode builder.stream können Sie etwas Neues für sich feststellen: den Wert des Aufzählungstyps AutoOffsetReset.EARLIEST (es gibt auch LATEST), der mit der Methode Consumed.withOffsetResetPolicy festgelegt wurde. Mit diesem Aufzählungstyp können Sie eine Strategie zum Zurücksetzen von Offsets für KStream oder KTable angeben, die Vorrang vor dem Parameter zum Zurücksetzen von Offsets aus der Konfiguration hat.
GroupByKey und GroupBy
Die KStream-Schnittstelle verfügt über zwei Methoden zum Gruppieren von Datensätzen: GroupByKey und GroupBy. Beide geben KGroupedTable zurück, sodass Sie möglicherweise eine berechtigte Frage haben: Was ist der Unterschied zwischen ihnen und wann welche zu verwenden sind?
Die GroupByKey-Methode wird verwendet, wenn die Schlüssel in KStream bereits nicht leer sind. Und am wichtigsten ist, dass das Flag "Neupartitionierung erforderlich" nie gesetzt wurde.
Bei der GroupBy-Methode wird davon ausgegangen, dass Sie die Schlüssel für die Gruppierung geändert haben, sodass das Flag für die erneute Partitionierung auf true gesetzt ist. Das Durchführen von Verbindungen, Aggregationen usw. nach der GroupBy-Methode führt zu einer automatischen Neupartitionierung.
Zusammenfassung: Sie sollten nach Möglichkeit GroupByKey anstelle von GroupBy verwenden.
Was die Methoden mapValues und groupBy tun, ist verständlich. Schauen Sie sich also die sum () -Methode an (sie befindet sich in der Datei src / main / java / bbejeck / model / ShareVolume.java) (Listing 5.3).
Die ShareVolume.sum-Methode gibt die Zwischensumme des Aktienverkaufsvolumens zurück, und das Ergebnis der gesamten Berechnungskette ist ein KTable <String, ShareVolume> -Objekt. Jetzt verstehen Sie, welche Rolle KTable spielt. Wenn ShareVolume-Objekte eintreffen, wird das neueste aktuelle Update in der entsprechenden KTable gespeichert. Es ist wichtig, nicht zu vergessen, dass alle Aktualisierungen in der vorherigen shareVolumeKTable wiedergegeben werden, aber nicht alle weiter gesendet werden.
Darüber hinaus führen wir mit Hilfe dieser KTable eine Aggregation (nach Anzahl der verkauften Aktien) durch, um die fünf Unternehmen mit den höchsten Aktienverkäufen in jeder Branche zu erhalten. Unsere Aktionen in diesem Fall ähneln den Aktionen während der ersten Aggregation.
- Führen Sie eine weitere groupBy-Operation aus, um einzelne ShareVolume-Objekte nach Branchen zu gruppieren.
- Fahren Sie fort, um ShareVolume-Objekte zusammenzufassen. Dieses Mal ist das Aggregationsobjekt eine Prioritätswarteschlange fester Größe. Nur fünf Unternehmen mit der größten Anzahl verkaufter Aktien werden in einer solchen Warteschlange mit fester Größe gehalten.
- Zeigen Sie die Zeilen aus dem vorherigen Absatz in einem Zeichenfolgenwert an und geben Sie die fünf meistverkauften nach Anzahl der Aktien nach Branchen zurück.
- Schreiben Sie die Ergebnisse in Zeichenfolgenform in das Thema.
In Abb. 5.10 zeigt ein Diagramm der Topologie der Datenbewegung. Wie Sie sehen können, ist die zweite Verarbeitungsrunde recht einfach.
Nachdem Sie die Struktur dieser zweiten Verarbeitungsrunde klar verstanden haben, können Sie auf den Quellcode verweisen (Sie finden ihn in der Datei src / main / java / bbejeck / kapitel_5 / AggregationsAndReducingExample.java) (Listing 5.4).
In diesem Initialisierer befindet sich eine Variable fixedQueue. Dies ist ein benutzerdefiniertes Objekt - ein Adapter für java.util.TreeSet, mit dem N höchste Ergebnisse in absteigender Reihenfolge der Anzahl der verkauften Aktien verfolgt werden.
Sie haben bereits Aufrufe von groupBy und mapValues festgestellt, sodass wir nicht damit aufhören (wir rufen die KTable.toStream-Methode auf, da die KTable.print-Methode veraltet ist). Sie haben die KTable-Version der aggregate () -Methode jedoch noch nicht gesehen, daher werden wir einige Zeit damit verbringen, sie zu diskutieren.
Wie Sie sich erinnern, zeichnet sich KTable dadurch aus, dass Datensätze mit denselben Schlüsseln als Aktualisierungen betrachtet werden. KTable ersetzt den alten Datensatz durch den neuen. Die Aggregation erfolgt auf die gleiche Weise: Die letzten Datensätze mit einem Schlüssel werden aggregiert. Wenn ein Datensatz eintrifft, wird er mithilfe eines Addierers (der zweite Parameter im Aufruf der Aggregatmethode) zu einer Instanz der FixedSizePriorityQueue-Klasse hinzugefügt. Wenn jedoch bereits ein anderer Datensatz mit demselben Schlüssel vorhanden ist, wird der alte Datensatz mit dem Subtrahierer gelöscht (der dritte Parameter im Aufruf der Aggregatmethode).
Dies alles bedeutet, dass unser Aggregator FixedSizePriorityQueue nicht alle Werte mit einem Schlüssel aggregiert, sondern die gleitende Summe der Mengen N der meistverkauften Arten von Aktien speichert. Jeder Eintrag enthält die Gesamtzahl der bisher verkauften Aktien. KTable gibt Ihnen Auskunft darüber, welche Aktien von Unternehmen derzeit am meisten verkauft werden. Eine fortlaufende Aggregation jedes Updates ist nicht erforderlich.
Wir haben gelernt, zwei wichtige Dinge zu tun:
- Gruppieren Sie die Werte in KTable nach einem ihnen gemeinsamen Schlüssel.
- Führen Sie nützliche Operationen wie Faltung und Aggregation für diese gruppierten Werte aus.
Die Fähigkeit, diese Vorgänge auszuführen, ist wichtig, um die Bedeutung der Daten zu verstehen, die durch die Kafka Streams-Anwendung übertragen werden, und um herauszufinden, welche Informationen sie enthalten.
Wir haben auch einige der Schlüsselkonzepte zusammengestellt, die weiter oben in diesem Buch erörtert wurden. In Kapitel 4 haben wir darüber gesprochen, wie wichtig ein ausfallsicherer lokaler Status für eine Streaming-Anwendung ist. Das erste Beispiel in diesem Kapitel hat gezeigt, warum der lokale Status so wichtig ist - es ermöglicht, zu verfolgen, welche Informationen Sie bereits gesehen haben. Durch den lokalen Zugriff werden Netzwerkverzögerungen vermieden, wodurch die Anwendung produktiver und fehlerresistenter wird.
Wenn Sie eine Faltungs- oder Aggregationsoperation ausführen, müssen Sie den Namen des Statusspeichers angeben. Faltungs- und Aggregationsoperationen geben eine KTable-Instanz zurück, und KTable verwendet einen Statusspeicher, um alte Ergebnisse durch neue zu ersetzen. Wie Sie gesehen haben, werden nicht alle Aktualisierungen weiter unten in der Pipeline gesendet. Dies ist wichtig, da Aggregationsvorgänge darauf ausgelegt sind, die endgültigen Informationen zu erhalten. Wenn der lokale Status nicht angewendet wird, sendet KTable alle Aggregations- und Faltungsergebnisse weiter.
Als nächstes betrachten wir die Ausführung von Operationen wie der Aggregation innerhalb eines bestimmten Zeitraums - der sogenannten Fensteroperationen.
5.3.2. Fensteroperationen
Im vorherigen Abschnitt haben wir die „rollende“ Faltung und Aggregation eingeführt. Die Anwendung führte eine kontinuierliche Faltung der Aktienverkäufe mit anschließender Aggregation der fünf meistverkauften Aktien durch.
Manchmal ist eine solche kontinuierliche Aggregation und Faltung der Ergebnisse notwendig. Und manchmal müssen Sie Operationen nur in einem bestimmten Zeitraum ausführen. Berechnen Sie beispielsweise, wie viele Börsentransaktionen mit Aktien eines bestimmten Unternehmens in den letzten 10 Minuten durchgeführt wurden. Oder wie viele Nutzer in den letzten 15 Minuten auf ein neues Werbebanner geklickt haben. Eine Anwendung kann solche Vorgänge mehrmals ausführen, wobei sich die Ergebnisse jedoch nur auf bestimmte Zeitintervalle (Zeitfenster) beziehen.
Zählen von Umtauschtransaktionen durch den Käufer
Im folgenden Beispiel werden wir Börsentransaktionen für mehrere Händler verfolgen - entweder große Organisationen oder intelligente Einhandfinanzierer.
Es gibt zwei mögliche Gründe für diese Verfolgung. Eine davon ist die Notwendigkeit zu wissen, welche Marktführer kaufen / verkaufen. Wenn diese großen Akteure und anspruchsvollen Investoren Chancen für sich selbst sehen, ist es sinnvoll, ihrer Strategie zu folgen. Der zweite Grund ist der Wunsch, mögliche Anzeichen für illegale Transaktionen mithilfe von Insiderinformationen zu erkennen. Dazu müssen Sie die Korrelation großer Umsatzspitzen mit wichtigen Pressemitteilungen analysieren.
Eine solche Verfolgung besteht aus folgenden Schritten:
- Erstellen eines Streams zum Lesen aus dem Thema Börsentransaktionen;
- Gruppierung eingehender Datensätze nach Kunden-ID und Bestandsymbol des Bestands. Ein Aufruf der groupBy-Methode gibt eine Instanz der KGroupedStream-Klasse zurück.
- KGroupedStream.windowedBy gibt einen Datenstrom zurück, der durch ein temporäres Fenster begrenzt ist und die Fensteraggregation ermöglicht. Je nach Fenstertyp wird entweder TimeWindowedKStream oder SessionWindowedKStream zurückgegeben.
- Zählen von Transaktionen für eine Aggregationsoperation. Der Fensterdatenstrom bestimmt, ob ein bestimmter Datensatz bei dieser Berechnung berücksichtigt wird.
- Schreiben Sie Ergebnisse in ein Thema oder geben Sie sie während der Entwicklung an die Konsole aus.
Die Topologie dieser Anwendung ist einfach, aber das visuelle Bild tut nicht weh. Schauen Sie sich das Bild an. 5.11.
Weiter werden wir die Funktionalität von Fensteroperationen und den entsprechenden Code betrachten.
Fenstertypen
In Kafka Streams gibt es drei Arten von Fenstern:
- Sitzung
- Stolpern (Stolpern);
- gleiten / "springen" (gleiten / hüpfen).
Welche Sie wählen müssen, hängt von den Geschäftsanforderungen ab. Die Fenster "Tumbling" und "Jumping" sind zeitlich begrenzt, während Sitzungsbeschränkungen mit Benutzeraktionen verbunden sind. Die Dauer der Sitzung (en) hängt ausschließlich davon ab, wie aktiv sich der Benutzer verhält. Die Hauptsache ist nicht zu vergessen, dass alle Fenstertypen auf Datums- / Zeitstempeln von Datensätzen und nicht auf der Systemzeit basieren.
Als nächstes implementieren wir unsere Topologie mit jedem der Fenstertypen. Der vollständige Code wird nur im ersten Beispiel angegeben. Für andere Fenstertypen ändert sich nichts, außer für die Art der Fensteroperation.
Sitzungsfenster
Sitzungsfenster unterscheiden sich stark von allen anderen Fenstertypen. Sie sind weniger zeitlich als vielmehr durch die Aktivität des Benutzers (oder die Aktivität der Entität, die Sie verfolgen möchten) begrenzt. Sitzungsfenster werden durch Inaktivitätsperioden begrenzt.
Abbildung 5.12 zeigt das Konzept der Sitzungsfenster. Eine kleinere Sitzung wird mit der Sitzung auf der linken Seite zusammengeführt. Und die Sitzung auf der rechten Seite wird getrennt sein, da sie auf eine lange Zeit der Inaktivität folgt. Sitzungsfenster basieren auf Benutzeraktionen, wenden jedoch Datums- / Zeitstempel aus Datensätzen an, um zu bestimmen, zu welcher Sitzung der Datensatz gehört.
Verwenden von Sitzungsfenstern zum Verfolgen von Exchange-Transaktionen
Wir werden Sitzungsfenster verwenden, um Informationen über Austauschtransaktionen zu erfassen. Die Implementierung der Sitzungsfenster ist in Listing 5.5 dargestellt (zu finden in src / main / java / bbejeck / kapitel_5 / CountingWindowingAndKTableJoinExample.java).
Sie haben die meisten Operationen dieser Topologie bereits ausgeführt, sodass Sie sie hier nicht erneut betrachten müssen. Es gibt jedoch einige neue Elemente, die wir jetzt diskutieren werden.
Für jede groupBy-Operation wird normalerweise eine Art Aggregationsoperation (Aggregation, Faltung oder Zählung) ausgeführt. Sie können entweder eine kumulative Aggregation mit einer kumulierten Summe oder eine Fensteraggregation durchführen, bei der Datensätze innerhalb eines bestimmten Zeitfensters berücksichtigt werden.
Der Code in Listing 5.5 zählt die Anzahl der Transaktionen in Sitzungsfenstern. In Abb. 5.13 Diese Aktionen werden Schritt für Schritt analysiert.
Durch Aufrufen von windowedBy (SessionWindows.with (zwanzig Sekunden). Bis (fünfzehn Minuten)) erstellen wir ein Sitzungsfenster mit einem Leerlaufintervall von 20 Sekunden und einem Aufbewahrungsintervall von 15 Minuten. Ein Inaktivitätsintervall von 20 Sekunden bedeutet, dass die Anwendung alle Datensätze enthält, die innerhalb von 20 Sekunden nach dem Ende oder Beginn der aktuellen Sitzung in der aktuellen (aktiven) Sitzung eintreffen.

Als nächstes geben wir an, welche Aggregationsoperation im Sitzungsfenster ausgeführt werden soll - in diesem Fall zählen. Wenn der eingehende Datensatz außerhalb des Inaktivitätsintervalls (auf beiden Seiten des Datums- / Zeitstempels) liegt, erstellt die Anwendung eine neue Sitzung. Ein Speicherintervall bedeutet, dass eine Sitzung für eine bestimmte Zeit beibehalten wird und späte Daten berücksichtigt werden, die über den Zeitraum der Inaktivität der Sitzung hinausgehen, aber dennoch angehängt werden können. Darüber hinaus entsprechen der Beginn und das Ende einer neuen Sitzung, die sich aus der Zusammenführung ergibt, dem frühesten und spätesten Datums- / Zeitstempel.
Schauen wir uns einige Einträge aus der Zählmethode an, um zu sehen, wie die Sitzungen funktionieren (Tabelle 5.1).
Nach Erhalt der Aufzeichnungen suchen wir nach bereits vorhandenen Sitzungen mit demselben Schlüssel. Die Endzeit ist kleiner als der aktuelle Datums- / Zeitstempel - das Inaktivitätsintervall und die Startzeit sind länger als der aktuelle Datums- / Zeitstempel + Inaktivitätsintervall. In diesem Sinne vier Datensätze aus der Tabelle 5.1 wie folgt zu einer einzigen Sitzung zusammenführen.
1. Datensatz 1 steht an erster Stelle, daher entspricht die Startzeit der Endzeit und ist 00:00:00.
2. Als nächstes kommt Datensatz 2, und wir suchen nach Sitzungen, die frühestens um 23:59:55 Uhr enden und spätestens um 00:00:35 Uhr beginnen. Suchen Sie Datensatz 1 und kombinieren Sie die Sitzungen 1 und 2. Nehmen Sie die Startzeit von Sitzung 1 (früher) und die Endzeit von Sitzung 2 (später), sodass unsere neue Sitzung um 00:00:00 Uhr beginnt und um 00:00:15 Uhr endet.
3. Datensatz 3 kommt an, wir suchen nach Sitzungen zwischen 00:00:30 und 00:01:10 und finden keine. Fügen Sie eine zweite Sitzung für den Schlüssel 123-345-654, FFBE, hinzu, die um 00:00:50 beginnt und endet.
4. Datensatz 4 kommt an und wir suchen nach Sitzungen zwischen 23:59:45 und 00:00:25. Diesmal gibt es beide Sitzungen - 1 und 2. Alle drei Sitzungen werden zu einer zusammengefasst, mit einer Startzeit von 00:00:00 und einer Endzeit von 00:00:15.
Nach dem, was in diesem Abschnitt gesagt wird, sollten die folgenden wichtigen Nuancen beachtet werden:
- Sitzungen sind keine Fenster mit fester Größe. Die Dauer einer Sitzung wird durch die Aktivität innerhalb eines bestimmten Zeitraums bestimmt.
- Datums- / Zeitstempel in den Daten bestimmen, ob ein Ereignis in eine vorhandene Sitzung oder in einen Zeitraum der Inaktivität fällt.
Weiter werden wir den folgenden Fenstertyp diskutieren - "Salto" -Fenster.
Fenster stürzen
"Tumbling" -Fenster erfassen Ereignisse, die in einen bestimmten Zeitraum fallen. Stellen Sie sich vor, Sie müssen alle 20 Sekunden alle Umtauschtransaktionen eines Unternehmens erfassen, damit Sie alle Ereignisse für diesen Zeitraum erfassen können. Am Ende des 20-Sekunden-Intervalls „stolpert“ das Fenster und wechselt zu einem neuen 20-Sekunden-Beobachtungsintervall. Abbildung 5.14 zeigt diese Situation.
Wie Sie sehen können, werden alle Ereignisse, die in den letzten 20 Sekunden empfangen wurden, in das Fenster aufgenommen. Am Ende dieses Zeitraums wird ein neues Fenster erstellt.
Listing 5.6 zeigt den Code, der die Verwendung von Tumbling-Fenstern zum Erfassen von Austauschtransaktionen alle 20 Sekunden demonstriert (Sie finden ihn in src / main / java / bbejeck / kapitel_5 / CountingWindowingAndKtableJoinExample.java).
Dank dieser kleinen Änderung beim Aufruf der Methode TimeWindows.of können Sie das Tumbling-Fenster verwenden. In diesem Beispiel wird die Methode till () nicht aufgerufen, wodurch das Standardspeicherintervall von 24 Stunden verwendet wird.
Schließlich ist es Zeit, mit der letzten der Fensteroptionen fortzufahren - dem Hüpfen von Fenstern.
Schiebefenster ("springen")
Schiebe- / "Sprung" -Fenster ähneln dem "Stolpern", unterscheiden sich jedoch geringfügig. Schiebefenster warten nicht auf das Ende des Zeitintervalls, bevor sie ein neues Fenster erstellen, um die letzten Ereignisse zu verarbeiten. Sie starten neue Berechnungen nach einem Wartezeitintervall, das kürzer als die Fensterdauer ist.
Um die Unterschiede zwischen "Salto" - und "Sprung" -Fenstern zu veranschaulichen, kehren wir zum Beispiel mit der Berechnung von Börsentransaktionen zurück. Unser Ziel ist es nach wie vor, die Anzahl der Transaktionen zu zählen, aber wir möchten nicht die ganze Zeit warten, bevor wir den Zähler aktualisieren. Stattdessen aktualisieren wir den Zähler in kürzeren Intervallen. Zum Beispiel werden wir weiterhin alle 20 Sekunden die Anzahl der Transaktionen zählen, aber den Zähler alle 5 Sekunden aktualisieren, wie in Abb. 2 gezeigt. 5.15. Gleichzeitig haben wir drei Ergebnisfenster mit überlappenden Daten.
Listing 5.7 zeigt den Code zum Festlegen von Schiebefenstern (er befindet sich in src / main / java / bbejeck / kapitel_5 / CountingWindowingAndKtableJoinExample.java).
«» «» advanceBy(). 15 .
, . , , :
, KTable KStream .
5.3.3. KStream KTable
4 KStream. KTable KStream. . KStream — , KTable — , KTable.
. , .
- KTable KStream , , .
- KTable, . KTable .
- .
, .
KTable KStream
KTable KStream .
- KTable.toStream().
- KStream.map , Windowed TransactionSummary.
( src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) ( 5.8).
KStream.map, KStream .
, KTable .
KTable
, KTable ( src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) ( 5.9).
, Serde , Serde. EARLIEST .
— .
. , ( src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) ( 5.10).
leftJoin . 4, JoinWindow , KStream-KTable KTable . : KTable, . : KTable KStream .
KStream.
5.3.4. GlobalKTable
, . 4 KStream, — KStream KTable. . , Kafka Streams . , , ( 4, « » 4.2.4).
— , ; . , , .
, , , . Kafka Streams GlobalKTable.
GlobalKTable , . , , . GlobalKTable . .
KStream GlobalKTable
5.3.2 . :
{customerId='074-09-3705', stockTicker='GUTM'}, 17 {customerId='037-34-5184', stockTicker='CORK'}, 16
Obwohl diese Ergebnisse mit dem Ziel übereinstimmten, wäre es bequemer, wenn auch der Name des Kunden und der vollständige Name des Unternehmens angezeigt würden. Um den Namen eines Kunden und eines Unternehmens hinzuzufügen, können Sie normale Verbindungen herstellen, müssen jedoch zwei Schlüsselzuordnungen vornehmen und neu partitionieren. Mit GlobalKTable können Sie die Kosten solcher Vorgänge vermeiden.
Dazu verwenden wir das countStream-Objekt aus Listing 5.11 (der entsprechende Code befindet sich in der Datei src / main / java / bbejeck / kapitel_5 / GlobalKTableExample.java) und verbinden es mit zwei GlobalKTable-Objekten.
Wir haben dies bereits zuvor besprochen, daher werde ich es nicht wiederholen. Ich stelle jedoch fest, dass der Code in der toStream (). Map-Funktion aus Gründen der Lesbarkeit anstelle des eingebetteten Lambda-Ausdrucks in das Funktionsobjekt abstrahiert wird.
Der nächste Schritt besteht darin, zwei Instanzen von GlobalKTable zu deklarieren (der angezeigte Code befindet sich in src / main / java / bbejeck / kapitel_5 / GlobalKTableExample.java) (Listing 5.12).
Beachten Sie, dass Themennamen mit Aufzählungstypen beschrieben werden.
Nachdem wir alle Komponenten vorbereitet haben, muss noch der Code für die Verbindung geschrieben werden (der in der Datei src / main / java / bbejeck / kapitel_5 / GlobalKTableExample.java zu finden ist) (Listing 5.13).
Obwohl dieser Code zwei Verbindungen enthält, sind sie in einer Kette organisiert, da keines ihrer Ergebnisse separat verwendet wird. Die Ergebnisse werden am Ende des gesamten Vorgangs angezeigt.
Wenn Sie den obigen Verbindungsvorgang starten, erhalten Sie die folgenden Ergebnisse:
{customer='Barney, Smith' company="Exxon", transactions= 17}
Das Wesen hat sich nicht geändert, aber diese Ergebnisse sehen klarer aus.
Wenn Sie Kapitel 4 zählen, haben Sie bereits verschiedene Arten von Verbindungen in Aktion gesehen. Sie sind in der Tabelle aufgeführt. 5.2. Diese Tabelle spiegelt die für Version 1.0.0 von Kafka Streams relevante Konnektivität wider. In zukünftigen Versionen wird sich etwas ändern.
Abschließend möchte ich Sie an die Hauptsache erinnern: Sie können Ereignisströme (KStream) und Aktualisierungsströme (KTable) über den lokalen Status verbinden. Wenn die Referenzdaten nicht zu groß sind, können Sie außerdem das GlobalKTable-Objekt verwenden. GlobalKTable repliziert alle Abschnitte auf jeden der Knoten der Kafka Streams-Anwendung und stellt so die Verfügbarkeit aller Daten sicher, unabhängig davon, welchem Abschnitt der Schlüssel entspricht.
Als nächstes sehen wir die Möglichkeit von Kafka-Streams, dank derer Sie Statusänderungen beobachten können, ohne Daten aus dem Kafka-Thema zu verbrauchen.
5.3.5. Status anfordern
Wir haben bereits mehrere Operationen durchgeführt, an denen der Staat beteiligt ist, und geben die Ergebnisse immer an die Konsole aus (für Entwicklungszwecke) oder schreiben sie in das Thema (für den industriellen Betrieb). Wenn Sie Ergebnisse zu einem Thema schreiben, müssen Sie den Kafka-Consumer verwenden, um sie anzuzeigen.
Das Lesen von Daten aus diesen Themen kann als eine Art materialisierte Ansichten betrachtet werden. Für unsere Aufgaben können wir die Definition einer materialisierten Ansicht aus Wikipedia verwenden: „... ein physisches Datenbankobjekt, das die Ergebnisse einer Abfrage enthält. Dies kann beispielsweise eine lokale Kopie gelöschter Daten oder eine Teilmenge der Zeilen und / oder Spalten einer Tabelle oder eines Verknüpfungsergebnisses oder eine mithilfe der Aggregation erhaltene Pivot-Tabelle sein “(https://en.wikipedia.org/wiki/Materialized_view).
Mit Kafka Streams können Sie auch interaktive Abfragen in State Stores durchführen, mit denen Sie diese materialisierten Ansichten direkt lesen können. Es ist wichtig zu beachten, dass die Anforderung an den Statusspeicher eine schreibgeschützte Operation ist. Dank dessen können Sie keine Angst haben, den Status versehentlich zu einer Anwendung zu machen, die während der Datenverarbeitung inkonsistent ist.
Die Möglichkeit, Statusspeicher direkt abzufragen, ist wichtig. Dies bedeutet, dass Sie Anwendungen erstellen können - Dashboards, ohne zuvor Daten von einem Kafka-Verbraucher empfangen zu müssen. Dies erhöht die Effizienz der Anwendung, da keine erneuten Datenaufzeichnungen erforderlich sind:
- Aufgrund der Lokalität der Daten kann schnell auf sie zugegriffen werden.
- Eine Vervielfältigung von Daten ist ausgeschlossen, da diese nicht in einen externen Speicher geschrieben werden.
Die Hauptsache, an die Sie sich erinnern sollten: Sie können Statusanforderungen direkt von der Anwendung ausführen. Sie können die Chancen, die sich daraus ergeben, nicht überschätzen. Anstatt Daten von Kafka zu verbrauchen und Datensätze in der Datenbank für die Anwendung zu speichern, können Sie Statusspeicher mit demselben Ergebnis abfragen. Direkte Anfragen an staatliche Speicher bedeuten weniger Code (kein Verbraucher) und weniger Software (keine Datenbanktabelle zum Speichern der Ergebnisse erforderlich).
Wir haben in diesem Kapitel eine beträchtliche Menge an Informationen behandelt, daher werden wir unsere Diskussion über interaktive Anfragen an staatliche Geschäfte vorübergehend einstellen. Aber keine Sorge: In Kapitel 9 erstellen wir eine einfache Anwendung - ein Informationsfenster mit interaktiven Abfragen. Um interaktive Abfragen und die Möglichkeiten zum Hinzufügen zu Kafka Streams-Anwendungen zu demonstrieren, werden einige Beispiele aus diesem und den vorherigen Kapiteln verwendet.
Zusammenfassung
- KStream-Objekte stellen Ereignisströme dar, die mit Datenbankeinfügungen vergleichbar sind. KTable-Objekte stellen Aktualisierungsströme dar. Sie ähneln Aktualisierungen in der Datenbank. Die Größe des KTable-Objekts wächst nicht, alte Datensätze werden durch neue ersetzt.
- KTable-Objekte sind für Aggregationsvorgänge erforderlich.
- Mithilfe von Fensteroperationen können Sie aggregierte Daten in Zeitkörbe aufteilen.
- Dank GlobalKTable-Objekten können Sie unabhängig von der Aufteilung überall in der Anwendung auf Referenzdaten zugreifen.
- Verbindungen zwischen Objekten KStream, KTable und GlobalKTable sind möglich.
Bisher haben wir uns auf die Erstellung von Kafka Streams-Anwendungen mit dem hochrangigen KStream DSL konzentriert. Obwohl Sie mit einem Ansatz auf hoher Ebene übersichtliche und präzise Programme erstellen können, ist die Verwendung ein eindeutiger Kompromiss. Die Arbeit mit DSL KStream bedeutet, die Prägnanz des Codes zu erhöhen, indem der Grad der Kontrolle verringert wird. Im nächsten Kapitel werden wir uns die Low-Level-API der Handlerknoten ansehen und andere Kompromisse ausprobieren. Programme werden länger als bisher, aber wir haben die Möglichkeit, fast jeden Verarbeitungsknoten zu erstellen, den wir möglicherweise benötigen.
→ Weitere Informationen zum Buch finden Sie auf
der Website des Herausgebers→ Für Khabrozhiteley 25% Rabatt auf Gutschein -
Kafka Streams→ Nach Zahlung der Papierversion des Buches wird ein elektronisches Buch per E-Mail verschickt.