Wie Kafka wahr wurde


Hallo habr


Ich arbeite im Tinkoff-Team, das ein eigenes Benachrichtigungszentrum entwickelt. Zum größten Teil entwickle ich in Java mit Spring Boot und löse verschiedene technische Probleme, die im Projekt auftreten.


Die meisten unserer Microservices interagieren asynchron miteinander über einen Nachrichtenbroker. Zuvor verwendeten wir IBM MQ als Broker, der die Last nicht mehr bewältigte, aber gleichzeitig hohe Zustellgarantien hatte.


Als Ersatz wurde uns Apache Kafka angeboten, das eine hohe Skalierbarkeit aufweist, aber leider einen fast individuellen Konfigurationsansatz für verschiedene Szenarien erfordert. Darüber hinaus ermöglichte der mindestens einmalige Übermittlungsmechanismus, der standardmäßig in Kafka funktioniert, nicht die Einhaltung des erforderlichen Konsistenzniveaus. Als nächstes werde ich unsere Erfahrungen mit der Konfiguration von Kafka teilen und Ihnen insbesondere erklären, wie Sie genau eine Lieferung konfigurieren und damit leben können.


Garantierte Lieferung und mehr


Die Parameter, die später erläutert werden, tragen dazu bei, eine Reihe von Problemen mit den Standardverbindungseinstellungen zu vermeiden. Aber zuerst möchte ich auf einen Parameter achten, der ein mögliches Debugging erleichtert.


Client.id für Produzenten und Konsumenten hilft dabei. Auf den ersten Blick können Sie den Namen der Anwendung als Wert verwenden. In den meisten Fällen funktioniert dies. Die Situation, in der mehrere Consumer in der Anwendung verwendet werden und Sie denselben Client angeben, führt zu der folgenden Warnung:


org.apache.kafka.common.utils.AppInfoParser — Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka.test-0 

Wenn Sie JMX in einer Anwendung mit Kafka verwenden möchten, kann dies ein Problem sein. In diesem Fall ist es am besten, eine Kombination aus dem Anwendungsnamen und beispielsweise dem Themennamen als Wert für client.id zu verwenden. Das Ergebnis unserer Konfiguration ist in der Ausgabe des Befehls kafka-consumer-groups der Dienstprogramme von Confluent zu sehen:



Jetzt analysieren wir das Szenario der garantierten Nachrichtenübermittlung. Kafka Producer verfügt über einen Acks- Parameter, mit dem Sie konfigurieren können, nach wie vielen Bestätigungen der Cluster-Leiter die erfolgreich aufgezeichnete Nachricht berücksichtigen muss. Dieser Parameter kann folgende Werte annehmen:


  • 0 - Acknowledge wird nicht berücksichtigt.
  • 1 - Standardparameter, Bestätigung ist nur ab 1 Replikat erforderlich.
  • −1 - Bestätigung ist für alle synchronisierten Replikate erforderlich ( min.insync.replicas- Clusterkonfiguration).

Aus den obigen Werten ist ersichtlich, dass Acks gleich -1 die stärksten Garantien dafür geben, dass die Nachricht nicht verloren geht.


Wie wir alle wissen, sind verteilte Systeme unzuverlässig. Zum Schutz vor vorübergehenden Fehlfunktionen bietet Kafka Producer einen Wiederholungsparameter , mit dem Sie die Anzahl der Wiederholungsversuche während der Zeitüberschreitung der Zustellung festlegen können. Da der Parameter retries standardmäßig Integer.MAX_VALUE (2147483647) ist, kann die Anzahl der erneuten Übertragungen einer Nachricht gesteuert werden, indem nur delivery.timeout.ms geändert wird.


Auf genau eine Lieferung zu


Mit diesen Einstellungen kann unser Produzent Nachrichten mit hoher Garantie übermitteln. Lassen Sie uns nun darüber sprechen, wie Sie sicherstellen, dass nur eine Kopie einer Nachricht in einem Kafka-Thema aufgezeichnet wird. Setzen Sie im einfachsten Fall den Parameter enable.idempotence für Producer auf true. Idempotency garantiert die Aufzeichnung von nur einer Nachricht in einer bestimmten Partition eines Themas. Voraussetzung für die Aktivierung der Idempotenz ist acks = all, erneut versuchen> 0, max.in.flight.requests.per.connection ≤ 5 . Wenn diese Parameter nicht vom Entwickler festgelegt werden, werden die oben genannten Werte automatisch festgelegt.


Wenn die Idempotenz eingerichtet ist, muss sichergestellt werden, dass jedes Mal dieselben Nachrichten in dieselben Partitionen fallen. Dies kann durch Konfigurieren des Schlüssels und des Parameters partitioner.class auf Producer erfolgen. Beginnen wir mit dem Schlüssel. Für jede Sendung muss es gleich sein. Dies lässt sich leicht mit einer beliebigen Geschäftskennung aus der Originalnachricht erreichen. Der Parameter partitioner.class hat den Standardwert DefaultPartitioner . Bei dieser Partitionierungsstrategie ist das Standardverhalten wie folgt:


  • Wenn die Partition beim Senden der Nachricht explizit angegeben wird, verwenden wir sie.
  • Wenn die Partition nicht angegeben ist, der Schlüssel jedoch angegeben ist, wählen Sie die Partition per Hash aus dem Schlüssel aus.
  • Wenn Partition und Schlüssel nicht angegeben sind, wählen Sie die Partitionen der Reihe nach aus (Round-Robin).

Durch die Verwendung des Schlüssels und des idempotenten Sendens mit dem Parameter max.in.flight.requests.per.connection = 1 erhalten Sie außerdem eine ordnungsgemäße Verarbeitung von Nachrichten an Consumer. Unabhängig davon sollten Sie bedenken, dass Sie, wenn die Zugriffssteuerung in Ihrem Cluster konfiguriert ist, die Rechte zum vorübergehenden Schreiben in das Thema benötigen.


Wenn Sie plötzlich nicht mehr über genügend Möglichkeiten zum idempotenten Senden per Schlüssel verfügen oder die Logik auf der Producer-Seite die Aufrechterhaltung der Datenkonsistenz zwischen verschiedenen Partitionen erfordert, werden Transaktionen zu Hilfe kommen. Darüber hinaus können Sie mithilfe einer Kettentransaktion einen Datensatz in Kafka bedingt mit einem Datensatz in der Datenbank synchronisieren. Um das Transaktionssenden an Producer zu ermöglichen, muss das Gerät über die ID-Potenz verfügen und optional die transactional.id festlegen. Wenn die Zugriffssteuerung in Ihrem Kafka-Cluster konfiguriert ist, benötigen Sie für die Transaktionsaufzeichnung und für die idempotente Aufzeichnung Schreibberechtigungen, die mithilfe des in transactional.id gespeicherten Werts per Maske erteilt werden können.


Formal kann eine beliebige Zeichenfolge als Transaktionskennung verwendet werden, z. B. der Name einer Anwendung. Wenn Sie jedoch mehrere Instanzen derselben Anwendung mit derselben transactional.id ausführen, wird die erste gestartete Instanz mit einem Fehler beendet, da Kafka sie als Zombie-Prozess betrachtet.


 org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker. 

Um dieses Problem zu lösen, fügen wir dem Anwendungsnamen ein Suffix in Form des Hostnamens hinzu, der aus den Umgebungsvariablen abgerufen wird.


Der Produzent ist konfiguriert, aber Transaktionen auf Kafka steuern nur den Nachrichtenbereich. Unabhängig vom Status der Transaktion fällt die Nachricht sofort in das Thema, verfügt jedoch über zusätzliche Systemattribute.


Um zu verhindern, dass solche Nachrichten von Consumer vorzeitig gelesen werden, muss der Parameter isolation.level auf read_committed gesetzt werden. Ein solcher Consumer kann nicht-transaktionale Nachrichten wie zuvor und transaktionale Nachrichten erst nach einem Commit lesen.
Wenn Sie alle oben aufgeführten Einstellungen installiert haben, haben Sie genau eine Lieferung konfiguriert. Herzlichen Glückwunsch!


Aber es gibt noch eine Nuance. Transactional.id, die wir oben konfiguriert haben, ist eigentlich ein Transaktionspräfix. Auf dem Transaktionsmanager wird eine Seriennummer hinzugefügt. Die empfangene ID wird in transactional.id.expiration.ms ausgegeben, das im Kafka-Cluster konfiguriert ist und einen Standardwert von "7 Tage" hat. Wenn die Anwendung während dieser Zeit keine Nachrichten empfangen hat, erhalten Sie beim nächsten Transaktionsversuch eine InvalidPidMappingException . Danach vergibt der Transaktionskoordinator eine neue Sequenznummer für die nächste Transaktion. Die Nachricht kann jedoch verloren gehen, wenn die InvalidPidMappingException nicht korrekt verarbeitet wird.


Anstelle von Summen


Wie Sie sehen, reicht es nicht aus, nur Nachrichten an Kafka zu senden. Sie müssen eine Kombination von Parametern auswählen und auf schnelle Änderungen vorbereitet sein. In diesem Artikel habe ich versucht, genau einmal die Übermittlungseinstellungen im Detail darzustellen, und einige Probleme bei der Konfiguration von client.id und transactional.id beschrieben, auf die wir gestoßen sind. Die Zusammenfassung der Einstellungen für Hersteller und Verbraucher ist nachstehend zusammengefasst.


Produzent:


  1. Acks = alle
  2. Wiederholungen> 0
  3. enable.idempotence = true
  4. max.in.flight.requests.per.connection ≤ 5 (1 - für ordentliches senden)
  5. transactional.id = $ {Anwendungsname} - $ {Hostname}

Verbraucher:


  1. isolation.level = read_committed

Um Fehler in zukünftigen Anwendungen zu minimieren, haben wir unseren Wrapper über die Federkonfiguration erstellt, in der Werte für einige der aufgelisteten Parameter bereits festgelegt sind.


Und hier sind ein paar Materialien für das Selbststudium:


Source: https://habr.com/ru/post/de481784/


All Articles