Hallo Habr!
Wir haben die letzten Reserven des Buches "
Apache Kafka. Stream-Verarbeitung und Datenanalyse " aufgedeckt und an die Druckvorstufe gesendet. Darüber hinaus haben wir einen Vertrag für das Buch "
Kafka Streams in Action " erhalten und beginnen nächste Woche damit, es wörtlich zu übersetzen.

Um den interessanten Fall der Verwendung der Kafka Streams-Bibliothek aufzuzeigen, haben wir beschlossen, den Artikel über das Event Sourcing-Paradigma in Kafka von Adam Worski zu übersetzen, dessen
Artikel über die Scala-Sprache vor zwei Wochen veröffentlicht wurde. Noch interessanter ist, dass die Meinung von Adam Worski nicht unbestreitbar ist:
Hier wird beispielsweise argumentiert, dass dieses Paradigma definitiv nicht für Kafka geeignet ist. Umso denkwürdiger, wir hoffen, wir bekommen den Eindruck des Artikels.
Der Begriff „Event Sourcing“ wird sowohl in unserer Veröffentlichung von
Clean Architecture von Robert Martin als auch in diesem Artikel als „Event Logging“ übersetzt. Wenn jemand von der Übersetzung von "Pumping Events" beeindruckt ist, lassen Sie es mich bitte wissen.
Wenn wir ein System erstellen, das die Registrierung von Ereignissen (Event Sourcing) ermöglicht, stehen wir früher oder später vor dem Problem der Persistenz (Persistenz) - und hier haben wir einige Optionen. Erstens gibt es
EventStore , eine ausgereifte Implementierung, die im Kampf gehärtet wurde. Alternativ können Sie die
Akka-Persistenz verwenden , um
die Skalierbarkeit von
Cassandra voll auszunutzen und sich auf die Leistung des
Akteurmodells zu verlassen. Eine weitere Option ist die gute alte
relationale Datenbank , in der der
CRUD
Ansatz mit der Verwendung von Ereignissen kombiniert wird und der maximale Nutzen aus Transaktionen herausgepresst wird.
Zusätzlich zu diesen (und vielleicht vielen anderen) Möglichkeiten, die sich aufgrund mehrerer kürzlich implementierter Dinge ergeben haben, ist es heute recht einfach geworden, die Registrierung von Veranstaltungen über
Kafka zu organisieren. Schauen wir uns an, wie.
Was ist Ereignisprotokollierung?Es gibt eine Reihe
ausgezeichneter Einführungsartikel zu diesem Thema, daher beschränke ich mich auf die prägnanteste Einführung. Bei der Registrierung von Ereignissen speichern wir nicht den „aktuellen“ Status der in unserem System verwendeten Entitäten, sondern den Ereignisstrom, der sich auf diese Entitäten bezieht. Jedes
Ereignis ist eine
Tatsache , die eine Zustandsänderung (bereits!) Beschreibt,
die mit dem Objekt
aufgetreten ist . Wie Sie wissen, werden die Fakten nicht diskutiert und bleiben
unverändert .
Wenn wir einen Strom solcher Ereignisse haben, kann der aktuelle Status einer Entität geklärt werden, indem alle damit verbundenen Ereignisse minimiert werden. Beachten Sie jedoch, dass das Gegenteil nicht möglich ist. Wenn wir nur den "aktuellen" Zustand beibehalten, verwerfen wir viele wertvolle chronologische Informationen.
Die Ereignisprotokollierung kann friedlich mit traditionelleren Methoden zum Speichern von Status
koexistieren . In der Regel verarbeitet das System eine Reihe von Entitätstypen (z. B. Benutzer, Bestellungen, Waren usw.), und es ist durchaus möglich, dass die Registrierung von Ereignissen nur für einige dieser Kategorien nützlich ist. Es ist wichtig anzumerken, dass wir hier nicht vor der Wahl von „alles oder nichts“ stehen; Es geht nur um die zusätzliche Statusverwaltungsfunktion in unserer Anwendung.
Veranstaltungsspeicher in KafkaDas erste zu lösende Problem: Wie speichere ich Ereignisse in Kafka? Es gibt drei mögliche Strategien:
- Speichern Sie alle Ereignisse für alle Arten von Entitäten in einem einzigen Thema (mit vielen Segmenten).
- Nach Thema für jeden Entitätstyp, d. H. Wir nehmen alle Ereignisse, die sich auf den Benutzer beziehen, in einem separaten Thema, in einem separaten - alle auf das Produkt bezogenen - usw. auf.
- Nach Themen, d. H. Nach einem separaten Thema für jeden bestimmten Benutzer und jeden Produktnamen
Die dritte Strategie (thematisch) ist praktisch nicht praktikabel. Wenn jeder neue Benutzer, wenn er im System erscheint, ein separates Thema starten müsste, würde die Anzahl der Themen bald unbegrenzt sein. Eine Aggregation wäre in diesem Fall sehr schwierig, zum Beispiel wäre es schwierig, alle Benutzer in einer Suchmaschine zu indizieren. Sie müssten nicht nur eine Vielzahl von Themen konsumieren, sondern auch nicht alle waren im Voraus bekannt.
Daher bleibt die Wahl zwischen 1 und 2. Beide Optionen haben ihre Vor- und Nachteile. Ein einziges Thema erleichtert es, eine
globale Ansicht aller Ereignisse zu erhalten. Auf der anderen Seite können Sie durch Hervorheben des Themas für jeden Entitätstyp den Fluss jeder Entität einzeln skalieren und segmentieren. Die Wahl einer von zwei Strategien hängt vom jeweiligen Anwendungsfall ab.
Darüber hinaus können Sie beide Strategien gleichzeitig implementieren, wenn Sie über zusätzlichen Speicherplatz verfügen: Erstellen Sie Themen nach Entitätstyp aus einem umfassenden Thema.

Im Rest des Artikels werden wir nur mit einem Entitätstyp und mit einem einzelnen Thema arbeiten, obwohl das präsentierte Material leicht extrapoliert und auf die Arbeit mit vielen Themen oder Entitätstypen angewendet werden kann.
(BEARBEITEN: Wie
Chris Hunt feststellte, gibt es
einen ausgezeichneten Artikel von Martin Kleppman , in dem ausführlich untersucht wurde, wie Ereignisse nach Thema und Segment verteilt werden können.)
Die einfachsten Speicheroperationen im EreignisprotokollierungsparadigmaDie einfachste Operation, die von einem Geschäft, das die Ereignisprotokollierung unterstützt, logisch zu erwarten ist, besteht darin, den "aktuellen" (minimierten) Status einer bestimmten Entität zu lesen. In der Regel hat jede Entität die eine oder andere
id
. Wenn wir diese
id
, sollte unser Speichersystem den aktuellen Status des Objekts zurückgeben.
Die Wahrheit im letzten Ausweg ist das Ereignisprotokoll: Der aktuelle Status kann immer aus dem Ereignisstrom abgeleitet werden, der einer bestimmten Entität zugeordnet ist. Dazu benötigt das Datenbankmodul eine reine Funktion (ohne Nebenwirkungen), die das Ereignis und den Anfangszustand akzeptiert und den geänderten Zustand zurückgibt:
Event = > State => State
. Bei Vorhandensein einer solchen Funktion und des
Werts des Anfangszustands ist der aktuelle Zustand eine
Faltung des Ereignisflusses (die Zustandsänderungsfunktion muss
sauber sein, damit sie wiederholt frei auf dieselben Ereignisse angewendet werden kann.)
Eine vereinfachte Implementierung der Operation "Aktuellen Status lesen" in Kafka sammelt einen Stream
aller Ereignisse aus dem Thema, filtert sie, lässt nur Ereignisse mit der angegebenen
id
übrig und wird mit der angegebenen Funktion reduziert. Wenn es viele Ereignisse gibt (und im Laufe der Zeit nur die Anzahl der Ereignisse zunimmt), kann dieser Vorgang langsam werden und viele Ressourcen verbrauchen. Selbst wenn das Ergebnis im Speicher zwischengespeichert und auf dem Dienstknoten gespeichert wird, müssen diese Informationen regelmäßig neu erstellt werden, z. B. aufgrund von Knotenfehlern oder aufgrund von Verdrängung der Cache-Daten.

Daher ist ein rationalerer Weg erforderlich. Hier bieten sich Kafka-Streams und State Repositories an. Kafka-Streams-Anwendungen werden auf einem ganzen Cluster von Knoten ausgeführt, die bestimmte Themen zusammen verwenden. Jedem Knoten wird eine Reihe von konsumierten Themensegmenten zugewiesen, genau wie beim normalen Kafka-Konsumenten. Kafka-Streams bieten jedoch Datenoperationen auf höherer Ebene, die das Erstellen abgeleiteter Streams erheblich vereinfachen.
Eine solche Operation in
Kafka-Streams ist die Faltung eines Streams im lokalen Speicher. Jeder lokale Speicher enthält nur Daten aus den Segmenten, die von einem bestimmten Knoten verwendet werden.
Standardmäßig sind zwei lokale Speicherimplementierungen verfügbar:
im RAM und basierend auf
RocksDB .
Zurück zum Thema Ereignisregistrierung: Wir stellen fest, dass es möglich ist, den Ereignisstrom im
Statusspeicher zu reduzieren, indem auf dem lokalen Knoten der "aktuelle Status" jeder Entität aus den dem Knoten zugewiesenen Segmenten gehalten wird. Wenn wir die Implementierung des auf RocksDB basierenden Statusspeichers verwenden, hängt die Anzahl der Entitäten, die wir auf einem einzelnen Knoten verfolgen können, nur von der Größe des Speicherplatzes ab.
So sieht die Faltung von Ereignissen im lokalen Speicher bei Verwendung der Java-API aus (serde bedeutet "Serializer / Deserializer"):
KStreamBuilder builder = new KStreamBuilder(); builder.stream(keySerde, valueSerde, "my_entity_events") .groupByKey(keySerde, valueSerde)
Ein vollständiges Beispiel
für die Auftragsabwicklung auf Basis von Microservices finden Sie auf der Confluent-Website.
(BEARBEITEN: Wie von
Sergei Egorov und
Nikita Salnikov auf Twitter festgestellt, müssen Sie für ein System mit Ereignisprotokollierung wahrscheinlich die Standard-Datenspeichereinstellungen in Kafka ändern, damit keine Zeit- oder Größenbeschränkungen funktionieren, und optional , Datenkomprimierung aktivieren.)
Aktuellen Status anzeigenWir haben ein Status-Repository erstellt, in dem sich die aktuellen Status aller Entitäten befinden, die aus Segmenten stammen, die dem Knoten zugewiesen sind. Wie kann dieses Repository jetzt angefordert werden? Wenn die Anforderung lokal ist (dh von demselben Knoten stammt, auf dem sich das Repository befindet), ist alles ziemlich einfach:
streams .store("my_entity_store", QueryableStoreTypes.keyValueStore()); .get(entityId);
Was aber, wenn wir Daten anfordern möchten, die sich auf einem anderen Knoten befinden? Und wie kann man herausfinden, was dieser Knoten ist? Hier ist eine weitere kürzlich in Kafka eingeführte Funktion nützlich:
interaktive Abfragen . Mit ihrer Hilfe können Sie auf die Kafka-Metadaten zugreifen und herausfinden, welcher Knoten das Themensegment mit der angegebenen
id
(in diesem Fall wird implizit das Tool zur Themensegmentierung verwendet):
metadataService .streamsMetadataForStoreAndKey("my_entity_store", entityId, keySerde)
Als nächstes müssen Sie die Anforderung irgendwie an den richtigen Knoten umleiten. Bitte beachten Sie: Die spezifische Art und Weise, wie die standortübergreifende Kommunikation implementiert und gehandhabt wird - ob REST, Akka-Remote oder eine andere -, gehört nicht zum Verantwortungsbereich von Kafka-Streams. Kafka bietet einfach Zugriff auf den Statusspeicher und gibt Auskunft darüber, auf welchem Knoten sich der Statusspeicher für die angegebene
id
.
NotfallwiederherstellungState Stores sehen gut aus, aber was passiert, wenn ein Knoten ausfällt? Die Rekonstruktion eines lokalen staatlichen Speichers für ein bestimmtes Segment kann ebenfalls eine kostspielige Operation sein. Dies kann für längere Zeit zu erhöhten Verzögerungen oder zum Verlust von Anforderungen führen, da Kafka-Streams neu ausgeglichen werden müssen (nach dem Hinzufügen oder Entfernen eines Knotens).
Aus diesem Grund werden standardmäßig langfristige Statusspeicher protokolliert. Das heißt, alle am Speicher vorgenommenen Änderungen werden zusätzlich in das Changelog-Thema geschrieben. Dieses Thema ist komprimiert (da wir für jede
id
nur den letzten Datensatz ohne Änderungsverlauf interessieren, da der Verlauf in den Ereignissen selbst gespeichert ist) - daher ist er so klein wie möglich. Aus diesem Grund kann die Wiederherstellung des Speichers auf einem anderen Knoten viel schneller erfolgen.
Bei einer Neuausrichtung sind in diesem Fall jedoch immer noch Verzögerungen möglich. Um sie weiter zu reduzieren, bieten kafka-Streams die Möglichkeit, mehrere
Sicherungsreplikate (
num.standby.replicas
) für jedes Repository zu
num.standby.replicas
. Diese Replikate wenden alle Aktualisierungen an, die aus Themen mit Änderungsprotokollen abgerufen wurden, sobald sie verfügbar sind, und können für ein bestimmtes Segment in den Hauptstatus-Speichermodus wechseln, sobald der aktuelle Hauptspeicher ausfällt.
KohärenzMit den Standardeinstellungen bietet Kafka mindestens eine einmalige Lieferung. Das heißt, im Falle eines Knotenausfalls können einige Nachrichten mehrmals zugestellt werden. Beispielsweise ist es möglich, dass ein bestimmtes Ereignis zweimal auf den Statusspeicher angewendet wird, wenn das System abstürzt, nachdem der Statusspeicher in das Protokoll geändert wurde, jedoch bevor der Offset für dieses bestimmte Ereignis ausgeführt wurde. Vielleicht verursacht dies keine Schwierigkeiten: Unsere Statusaktualisierungsfunktion (
Event = > State => State
) kann solche Situationen ganz normal bewältigen. Es ist jedoch möglicherweise nicht in der Lage, dies zu bewältigen: In einem solchen Fall können die von Kafka gewährten Garantien einer
streng einmaligen Lieferung verwendet werden. Solche Garantien gelten nur beim Lesen und Schreiben von Kafka-Themen. Dies tun wir jedoch hier:
Im Hintergrund werden alle Einträge in Kafka-Themen darauf reduziert, das Änderungsprotokoll für den Statusspeicher
zu aktualisieren und Offsets durchzuführen. All dies kann
in Form von Transaktionen erfolgen .
Wenn unsere Funktion zum Aktualisieren des Status dies erfordert, können wir daher die Semantik der Verarbeitung von Flüssen "ausschließlich einmalig" mithilfe einer einzigen Konfigurationsoption aktivieren:
processing.guarantee
. Aus diesem Grund sinkt die Leistung, aber nichts ist umsonst.
Ereignis hörenNachdem wir uns nun mit den Grundlagen befasst haben - den „aktuellen Status“ abzufragen und ihn für jede Entität zu aktualisieren -, was ist mit dem Auslösen von
Nebenwirkungen ? Irgendwann wird dies notwendig sein, zum Beispiel für:
- Senden von Benachrichtigungs-E-Mails
- Indizierung von Suchmaschinenentitäten
- Aufrufen externer Dienste über REST (oder SOAP, CORBA usw.)
Alle diese Aufgaben sind bis zu einem gewissen Grad blockierend und beziehen sich auf E / A-Vorgänge (dies ist natürlich für Nebenwirkungen). Daher ist es wahrscheinlich keine gute Idee, sie im Rahmen der Statusaktualisierungslogik auszuführen: Infolgedessen kann die Häufigkeit von Fehlern in der Hauptschleife zunehmen Ereignisse und in Bezug auf die Leistung wird es einen Engpass geben.
Darüber hinaus kann eine Funktion mit Statusaktualisierungslogik (E
Event = > State => State
Status
Event = > State => State
Status) mehrmals ausgeführt werden (bei Fehlern oder Neustarts), und meistens möchten wir die Anzahl der Fälle minimieren, in denen Nebenwirkungen für ein bestimmtes Ereignis mehrmals ausgeführt werden.
Glücklicherweise haben wir, da wir mit Kafka-Themen arbeiten, einiges an Flexibilität. In der Flows-Phase, in der der Statusspeicher aktualisiert wird, können Ereignisse unverändert (oder, falls erforderlich, auch in modifizierter Form) ausgegeben werden, und der resultierende Stream / das resultierende Thema (in Kafka sind diese Konzepte gleichwertig) kann nach Belieben verwendet werden. Darüber hinaus kann es entweder vor oder nach der Statusaktualisierungsphase verwendet werden. Schließlich können wir steuern, wie wir Nebenwirkungen auslösen: mindestens einmal oder höchstens einmal. Die erste Option ist verfügbar, wenn Sie den Versatz des verbrauchten Themenereignisses erst ausführen, nachdem alle Nebenwirkungen erfolgreich abgeschlossen wurden. Umgekehrt führen wir mit maximal einem Lauf Verschiebungen durch, bis Nebenwirkungen ausgelöst werden.
Es gibt verschiedene Möglichkeiten, um Nebenwirkungen auszulösen, die von der spezifischen praktischen Situation abhängen. Zunächst können Sie die Kafka-Streams-Phase definieren, in der Nebenwirkungen für jedes Ereignis als Teil der Stream-Verarbeitungsfunktion ausgelöst werden.
Das Einrichten eines solchen Mechanismus ist recht einfach, aber diese Lösung ist nicht flexibel, wenn Sie Wiederholungsversuche durchführen, Offsets steuern und Offsets für viele Ereignisse gleichzeitig konkurrieren müssen. In solch komplexeren Fällen kann es zweckmäßiger sein, die Verarbeitung unter Verwendung von beispielsweise
reaktivem Kafka oder einem anderen Mechanismus zu bestimmen, der Kafka-Themen "direkt" konsumiert.
Es ist auch möglich, dass ein Ereignis
andere Ereignisse auslöst. Beispielsweise kann das Ereignis "Bestellung" die Ereignisse "Vorbereitung für den Versand" und "Kundenbenachrichtigung" auslösen. Dies kann auch in der Kafka-Streams-Phase implementiert werden.
Wenn wir Ereignisse oder Daten, die aus Ereignissen extrahiert wurden, in einer Datenbank oder Suchmaschine speichern möchten, beispielsweise in ElasticSearch oder PostgreSQL, können wir den
Kafka Connect- Connector verwenden, der für uns alle Details zum Verbrauch von Themen verarbeitet.
Ansichten und Projektionen erstellenIn der Regel beschränken sich die Systemanforderungen nicht darauf, nur einzelne Entitätsströme abzufragen und zu verarbeiten. Aggregation, Kombination mehrerer Ereignisströme sollte ebenfalls unterstützt werden. Solche kombinierten Streams werden oft als
Projektionen bezeichnet . Wenn sie reduziert sind, können sie zum Erstellen von
Darstellungen von Daten verwendet werden . Ist es möglich, sie mit Kafka umzusetzen?

Wieder ja! Denken Sie daran, dass wir uns im Prinzip nur mit dem Thema Kafka befassen, in dem unsere Ereignisse gespeichert sind. Daher haben wir die ganze Macht der rohen Kafka-Verbraucher / Produzenten, des Kafka-Streams-Kombinierers und sogar von
KSQL - all dies wird für uns nützlich sein, um Projektionen zu definieren. Mit Kafka-Streams können Sie beispielsweise einen Stream filtern, anzeigen, nach Schlüsseln gruppieren, in temporären Fenstern oder Sitzungsfenstern aggregieren usw. entweder auf Codeebene oder mit SQL-ähnlichem KSQL.
Solche Flows können mithilfe von State Stores und interaktiven Abfragen für eine lange Zeit gespeichert und für Abfragen bereitgestellt werden, genau wie wir es mit einzelnen Entity Flows getan haben.
Was weiterUm den unendlichen Fluss von Ereignissen während der Entwicklung des Systems zu verhindern, kann eine Komprimierungsoption wie das Speichern von
Schnappschüssen des „aktuellen Status“ hilfreich sein. Daher können wir uns darauf beschränken, nur einige aktuelle Schnappschüsse und Ereignisse zu speichern, die nach ihrer Erstellung aufgetreten sind.
Obwohl Kafka keine direkte Unterstützung für Snapshots bietet (und in einigen anderen Systemen, die nach dem Prinzip der Ereignisaufzeichnung arbeiten), können Sie diese Art von Funktionalität auf jeden Fall selbst hinzufügen, indem Sie einige der oben genannten Mechanismen verwenden, z. B. Streams, Verbraucher, staatliche Geschäfte usw. d.
ZusammenfassungObwohl Kafka anfangs nicht mit Blick auf das Ereignisregistrierungsparadigma entwickelt wurde, handelt es sich tatsächlich um eine Streaming-Daten-Engine mit Unterstützung für
Themenreplikation , Segmentierung,
Status-Repositorys und
Streaming-APIs und ist gleichzeitig sehr flexibel. Daher können Sie zusätzlich zu Kafka problemlos ein Ereignisregistrierungssystem implementieren. Da wir vor dem Hintergrund von allem, was passiert, immer ein Kafka-Thema haben, erhalten wir zusätzliche Flexibilität, da wir entweder mit Streaming-APIs auf hoher Ebene oder mit Verbrauchern auf niedriger Ebene arbeiten können.