Abonnieren Sie Kafka über HTTP oder vereinfachen Sie Ihre Web-Hooks

Es gibt viele Möglichkeiten, Nachrichten von Pub-Sub-Systemen zu verarbeiten: Verwenden eines separaten Dienstes, Isolieren eines isolierten Prozesses, Orchestrieren eines Pools von Prozessen / Threads, komplexer IPC, Poll-over-Http und viele andere. Heute möchte ich über die Verwendung von Pub-Sub über HTTP und über meinen speziell dafür geschriebenen Dienst sprechen.

Die Verwendung eines vorgefertigten HTTP-Dienst-Backends ist in einigen Fällen eine ideale Lösung für die Verarbeitung einer Nachrichtenwarteschlange:

  1. Aus der Box balancieren. Normalerweise befindet sich das Backend bereits hinter dem Balancer und verfügt über eine einladbare Infrastruktur, die die Arbeit mit Nachrichten erheblich vereinfacht.
  2. Verwenden eines regulären REST-Controllers (eine beliebige HTTP-Ressource). Durch das Konsumieren von HTTP-Nachrichten werden die Kosten für die Implementierung von Computern für verschiedene Sprachen minimiert, wenn das Backend gemischt ist.
  3. Vereinfachung der Verwendung von Web-Hooks anderer Dienste. Jetzt unterstützt fast jeder Dienst (Jira, Gitlab, Mattermost, Slack ...) irgendwie Web-Hooks für die Interaktion mit der Außenwelt. Sie können das Leben erleichtern, wenn Sie der Warteschlange beibringen, die Funktionen eines HTTP-Dispatchers auszuführen.

Dieser Ansatz hat auch Nachteile:

  1. Sie können die Leichtigkeit der Lösung vergessen. HTTP ist ein umfangreiches Protokoll, und die Verwendung von Frameworks auf der Seite des Verbrauchers erhöht sofort die Latenz und die Last.
  2. Wir verlieren die Stärken des Poll-Ansatzes und bekommen die Schwächen von Push.
  3. Das Verarbeiten von Nachrichten durch dieselben Dienstinstanzen, die Clients verarbeiten, kann die Reaktionsfähigkeit beeinträchtigen. Dies ist nicht signifikant, da es mit Ausgleich und Isolation behandelt wird.

Ich habe die Idee als Queue-Over-Http-Dienst implementiert, auf den später noch eingegangen wird. Das Projekt wird in Kotlin mit Spring Boot 2.1 geschrieben. Als Broker ist derzeit nur Apache Kafka verfügbar.

Weiter im Artikel wird angenommen, dass der Leser mit Kafka vertraut ist und die Commits (Commit) und Offsets (Offset) von Nachrichten, die Prinzipien von Gruppen (Gruppe) und Konsumenten (Konsumenten) kennt und auch versteht, wie sich Partition (Partition) vom Thema (Thema) unterscheidet. . Wenn es Lücken gibt, empfehle ich Ihnen, diesen Abschnitt der Kafka-Dokumentation zu lesen, bevor Sie fortfahren.

Inhalt



Rückblick


Queue-Over-Http ist ein Dienst, der als Vermittler zwischen einem Nachrichtenbroker und dem endgültigen HTTP-Konsumenten fungiert (der Dienst erleichtert die Implementierung der Unterstützung für das Senden von Nachrichten an Verbraucher auf andere Weise, z. B. auf verschiedene * RPCs). Derzeit ist nur das Abonnieren, Abbestellen und Anzeigen der Verbraucherliste verfügbar. Das Senden von Nachrichten an den Broker (Produzieren) über HTTP wurde noch nicht implementiert, da die Reihenfolge der Nachrichten ohne besondere Unterstützung des Herstellers nicht garantiert werden kann.

Die Schlüsselfigur des Dienstes ist der Verbraucher, der bestimmte Partitionen oder nur Themen abonnieren kann (das Themenmuster wird unterstützt). Im ersten Fall ist die automatische Balance von Partitionen deaktiviert. Nach dem Abonnieren empfängt die angegebene HTTP-Ressource Nachrichten von den zugewiesenen Kafka-Partitionen. Architektonisch ist jeder Abonnent einem nativen Kafka Java-Client zugeordnet.

unterhaltsame Geschichte über KafkaConsumer
Kafka hat einen wunderbaren Java-Client, der viel kann. Ich verwende es im Warteschlangenadapter, um Nachrichten vom Broker zu empfangen und sie dann an die lokalen Servicewarteschlangen zu senden. Erwähnenswert ist, dass der Client ausschließlich im Kontext eines einzelnen Threads arbeitet.

Die Idee des Adapters ist einfach. Wir beginnen in einem Thread und schreiben den einfachsten Scheduler für native Clients, wobei wir uns auf die Reduzierung der Latenz konzentrieren. Das heißt, wir schreiben etwas Ähnliches:

while (!Thread.interrupted()) { var hasWork = false for (consumer in kafkaConsumers) { val queueGroup = consumers[consumer] ?: continue invalidateSubscription(consumer, queueGroup) val records = consumer.poll(Duration.ZERO) /*      */ if (!records.isEmpty) { hasWork = true } } val committed = doCommit() if (!hasWork && committed == 0) { // ,    Thread.sleep(1) } } 

Es scheint, dass alles wunderbar ist, die Latenz ist selbst bei Dutzenden von Verbrauchern minimal. In der Praxis stellte sich heraus, dass KafkaConsumer auf diese Betriebsart KafkaConsumer und in der Leerlaufzeit eine Zuordnungsrate von ca. 1,5 MB / s ergibt. Bei 100 Kurieren erreicht die Zuweisungsrate 150 MB / s und lässt GC häufig an die Anwendung denken. Natürlich ist all dieser Müll in der jungen Gegend, GC ist durchaus in der Lage, damit umzugehen, aber die Lösung ist immer noch nicht perfekt.

KafkaConsumer müssen Sie den für KafkaConsumer typischen Weg KafkaConsumer und jetzt platziere ich jeden Abonnenten in meinem Stream. Dies bedeutet einen Overhead für Speicher und Zeitplanung, aber es gibt keinen anderen Weg.

Ich schreibe den Code von oben um, entferne die innere Schleife und ändere Duration.ZERO in Duration.ofMillis(100) . Es stellt sich heraus, dass die Zuweisungsrate auf akzeptable 80-150 KB / s pro Verbraucher fällt. Eine Abfrage mit einer Zeitüberschreitung von 100 ms verzögert jedoch die gesamte Warteschlange der Festschreibungen auf dieselben 100 ms, und dies ist häufig inakzeptabel.

Bei der Suche nach Lösungen für das Problem erinnere KafkaConsumer::wakeup mich an KafkaConsumer::wakeup , das eine WakeupException und alle Blockierungsvorgänge für den Verbraucher unterbricht. Mit dieser Methode ist der Weg zu einer geringen Latenz einfach: Wenn eine neue Anforderung für das Festschreiben eintrifft, stellen wir sie in die Warteschlange und rufen auf dem nativen Konsumenten wakeup . Fangen WakeupException im Arbeitszyklus WakeupException und WakeupException Sie fest, was sich angesammelt hat. Für die Übertragung der Kontrolle mit Hilfe von Ausnahmen müssen Sie diese sofort in Ihre Hände geben, aber da sonst nichts ...

Es stellt sich heraus, dass diese Option WakeupException perfekt ist, da jede Operation auf dem nativen Verbraucher jetzt eine WakeupException , einschließlich des Commits selbst. wakeup diese Situation verarbeiten, wird der Code mit einem Flag wakeup , das das wakeup ermöglicht.

Ich komme zu dem Schluss, dass es schön wäre, die KafkaConsumer::poll Methode so zu ändern, dass sie gemäß einem zusätzlichen Flag normal unterbrochen werden kann. Infolgedessen wurde Frankenstein aus der Reflexion geboren, die genau die ursprüngliche Abfragemethode kopiert und einen Ausgang aus der Schleife durch die Flagge hinzufügt. Dieses Flag wird durch eine separate InterruptPoll-Methode gesetzt, die darüber hinaus Wakeup auf dem Client-Selektor aufruft, um die Thread-Sperre für E / A-Operationen aufzuheben.

Nachdem ich den Client auf diese Weise implementiert habe, erhalte ich die Reaktionsgeschwindigkeit ab dem Moment, in dem eine Anforderung für ein Commit bei der Verarbeitung eingeht, bis zu 100 Mikrosekunden und eine hervorragende Latenz für das Abrufen von Nachrichten von einem Broker, was in Ordnung ist.

Jede Partition wird durch eine separate lokale Warteschlange dargestellt, in die der Adapter Nachrichten vom Broker schreibt. Der Worker nimmt Nachrichten von ihm und sendet sie zur Ausführung, dh zum Senden über HTTP.

Der Dienst unterstützt die Stapelnachrichtenverarbeitung, um den Durchsatz zu erhöhen. Beim Abonnieren können Sie den concurrencyFactor jedes Thema angeben (gilt für jede zugewiesene Partition unabhängig). concurrencyFactor=1000 bedeutet beispielsweise, dass 1000 Nachrichten in Form von HTTP-Anforderungen gleichzeitig an den Verbraucher gesendet werden können. Sobald alle Nachrichten aus dem Paket vom Verbraucher eindeutig ausgearbeitet wurden, entscheidet der Dienst über das nächste Festschreiben des Offsets der letzten Nachricht in Kafka. Daher ist der zweite Wert von concurrencyFactor die maximale Anzahl von Nachrichten, die vom Verbraucher im Falle eines Kafka- oder Queue-Over-Http-Absturzes verarbeitet werden.

Um Verzögerungen zu loadFactor = concurrencyFactor * 2 , verfügt die Warteschlange über loadFactor = concurrencyFactor * 2 , loadFactor = concurrencyFactor * 2 Sie doppelt so viele Nachrichten vom Broker lesen können, wie gesendet werden können. Da die automatische Festschreibung auf dem nativen Client deaktiviert ist, verstößt ein solches Schema nicht gegen die Mindestgarantien.
Ein hoher concurrencyFactor Wert erhöht den Durchsatz der Warteschlange, indem die Anzahl der Commits verringert wird, die im schlimmsten Fall bis zu 10 ms dauern. Gleichzeitig steigt die Belastung des Verbrauchers.

Die Reihenfolge des Sendens von Nachrichten innerhalb des Bundles ist nicht garantiert, kann jedoch durch Setzen von concurrencyFactor=1 .

Commits


Commits sind ein wichtiger Bestandteil des Service. Wenn das nächste Datenpaket bereit ist, wird der Offset der letzten Nachricht aus dem Paket sofort an Kafka festgeschrieben, und erst nach einem erfolgreichen Festschreiben wird das nächste Paket für die Verarbeitung verfügbar. Oft reicht dies nicht aus und ein automatisches Festschreiben ist erforderlich. Zu diesem autoCommitPeriodMs gibt es den Parameter autoCommitPeriodMs , der mit der klassischen Autocommit-Periode für native Clients, die die zuletzt von der Partition gelesene Nachricht autoCommitPeriodMs , wenig gemein hat. Stellen Sie sich concurrencyFactor=10 . Der Dienst hat alle 10 Nachrichten gesendet und wartet darauf, dass jede von ihnen bereit ist. Die Verarbeitung von Nachricht 3 ist zuerst abgeschlossen, dann von Nachricht 1 und dann von Nachricht 10. Zu diesem Zeitpunkt ist es Zeit für die automatische Festschreibung. Es ist wichtig, die Semantik von mindestens einmal nicht zu verletzen. Daher können Sie nur die erste Nachricht festschreiben, dh Offset 2, da zu diesem Zeitpunkt nur die Nachricht erfolgreich verarbeitet wurde. Bis zur nächsten automatischen Festschreibung werden die Nachrichten 2, 5, 6, 4 und 8 verarbeitet. Jetzt müssen Sie nur noch den Offset 7 festschreiben und so weiter. Autocommit hat fast keinen Einfluss auf den Durchsatz.

Fehlerbehandlung


Im normalen Betriebsmodus sendet der Dienst einmal eine Nachricht an den Supervisor. Wenn aus irgendeinem Grund ein 4xx- oder 5xx-Fehler verursacht wurde, sendet der Dienst die Nachricht erneut und wartet auf die erfolgreiche Verarbeitung. Die Zeit zwischen den Versuchen kann als separater Parameter konfiguriert werden.

Es ist auch möglich, die Anzahl der Versuche festzulegen, nach denen die Nachricht als verarbeitet markiert wird, wodurch Neuübertragungen unabhängig vom Status der Antwort gestoppt werden. Ich rate nicht, dies für sensible Daten zu verwenden. Versagenssituationen von Verbrauchern sollten immer manuell angepasst werden. Sticky-Nachrichten können durch Serviceprotokolle und die Überwachung des Status der Antwort des Verbrauchers überwacht werden.

über das Festhalten
Normalerweise sendet der HTTP-Server, der 4xx oder 5xx den Status der Antwort gibt, auch den Header Connection: close . Eine auf diese Weise geschlossene TCP-Verbindung bleibt im Status TIME_WAITED , bis sie nach einiger Zeit vom Betriebssystem gelöscht wird. Das Problem ist, dass solche Verbindungen einen gesamten Port belegen, der erst freigegeben werden kann. Dies kann dazu führen, dass auf dem Computer keine freien Ports zum Herstellen einer TCP-Verbindung vorhanden sind und der Dienst bei jedem Senden mit Ausnahmen in den Protokollen ausgelöst wird. In der Praxis enden die Ports unter Windows 10 nach 10 bis 20 000 fehlerhaften Nachrichten innerhalb von 1 bis 2 Minuten. Im Standardmodus ist dies kein Problem.

Nachrichten


Jede vom Broker extrahierte Nachricht wird über HTTP an den Berater an die im Abonnement angegebene Ressource gesendet. Standardmäßig wird eine Nachricht durch eine POST-Anforderung im Hauptteil gesendet. Dieses Verhalten kann durch Angabe einer anderen Methode geändert werden. Wenn die Methode das Senden von Daten im Hauptteil nicht unterstützt, können Sie den Namen des Zeichenfolgenparameters angeben, in dem die Nachricht gesendet wird. Darüber hinaus können Sie beim Abonnieren zusätzliche Header angeben, die jeder Nachricht hinzugefügt werden. Dies ist praktisch für die grundlegende Autorisierung mithilfe von Token. Zu jeder Nachricht werden Header mit der Kennung des Verbrauchers, des Themas und der Partition, von der die Nachricht gelesen wurde, der Nachrichtennummer, ggf. des Partitionsschlüssels sowie dem Namen des Brokers hinzugefügt.

Leistung


Um die Leistung zu bewerten, verwendete ich einen PC (Windows 10, OpenJDK-11 (G1 ohne Optimierung), i7-6700K, 16 GB), auf dem der Dienst ausgeführt wird, und einen Laptop (Windows 10, i5-8250U, 8 GB), auf dem der Nachrichtenproduzent HTTP arbeitet Resource Consumer und Kafka mit Standardeinstellungen. Der PC ist über eine 1-Gbit / s-Kabelverbindung mit dem Router verbunden, der Laptop über 802.11ac. Der Produzent schreibt alle 110 ms alle 100 ms für 110 Byte Nachrichten aus verschiedenen Gruppen zu den festgelegten Themen, für die die Follower abonniert sind ( concurrencyFactor=500 , Auto-Commit ist deaktiviert). Der Stand ist alles andere als ideal, aber Sie können sich ein Bild machen.

Ein wichtiger Messparameter ist die Auswirkung des Dienstes auf die Latenz.

Lassen Sie:
- t q - Zeitstempel des Dienstes, der Nachrichten vom nativen Client empfängt
- d t0 ist die Zeit zwischen t q und der Zeit, zu der die Nachricht von der lokalen Warteschlange an den Pool von Führungskräften gesendet wurde
- d t ist die Zeit zwischen t q und dem Zeitpunkt, zu dem die HTTP-Anforderung gesendet wurde. Dies ist der Einfluss des Dienstes auf die Latenz der Nachricht.

Während der Messungen wurden folgende Ergebnisse erhalten (C - Verbraucher, T - Themen, M - Nachrichten):



In der Standardbetriebsart hat der Dienst selbst fast keinen Einfluss auf die Latenz, und der Speicherverbrauch ist minimal. Die Maximalwerte von d t (ca. 60 ms) sind nicht speziell angegeben, da sie vom Betrieb des GC und nicht vom Dienst selbst abhängen. Eine spezielle Abstimmung des GC oder das Ersetzen von G1 durch Shenandoah kann dazu beitragen, die Verteilung der Maximalwerte auszugleichen.

Alles ändert sich dramatisch, wenn der Verbraucher den Nachrichtenfluss aus der Warteschlange nicht bewältigt und der Dienst den Drosselungsmodus aktiviert. In diesem Modus steigt der Speicherverbrauch, da die Antwortzeit auf Anforderungen erheblich zunimmt, wodurch eine rechtzeitige Bereinigung der Ressourcen verhindert wird. Die Auswirkung auf die Latenz bleibt hier auf dem Niveau der vorherigen Ergebnisse, und hohe dt-Werte werden durch das Vorladen von Nachrichten in der lokalen Warteschlange verursacht.

Leider ist es nicht möglich, bei einer höheren Last zu testen, da sich der Laptop bereits bei 1300 RPS verbiegt. Wenn jemand bei der Organisation von Messungen bei hohen Lasten helfen kann, stelle ich gerne eine Baugruppe für Tests zur Verfügung.

Demonstration


Fahren wir nun mit der Demonstration fort. Dafür brauchen wir:

  • Kafka Broker, bereit zu gehen. Ich werde die am 192.168.99.100:9092 von Bitnami erhobene Instanz übernehmen.
  • Eine HTTP-Ressource, die Nachrichten empfängt. Aus Gründen der Klarheit habe ich Slack Web-Hooks abgenommen.

Zunächst müssen Sie den Queue-Over-Http-Dienst selbst aufrufen. Erstellen Sie dazu die folgenden Inhalte in einem leeren Verzeichnis application.yml :

 spring: profiles: default logging: level: com: viirrtus: queueOverHttp: DEBUG app: persistence: file: storageDirectory: "persist" brokers: - name: "Kafka" origin: "kafka" config: bootstrap.servers: "192.168.99.100:9092" 

Hier geben wir dem Dienst die Verbindungsparameter eines bestimmten Brokers sowie den Speicherort für Abonnenten an, damit diese zwischen den Starts nicht verloren gehen. In `app.brokers []. Config` können Sie alle Verbindungsparameter angeben, die vom nativen Kafka-Client unterstützt werden. Eine vollständige Liste finden Sie hier .

Da die Konfigurationsdatei von Spring verarbeitet wird, können Sie dort viele interessante Dinge schreiben. Konfigurieren Sie auch die Protokollierung.

Führen Sie nun den Dienst selbst aus. Wir verwenden den einfachsten Weg - docker-compose.yml :

 version: "2" services: app: image: viirrtus/queue-over-http:0.1.3 restart: unless-stopped command: --debug ports: - "8080:8080" volumes: - ./application.yml:/application.yml - ./persist:/persist 

Wenn diese Option nicht zu Ihnen passt, können Sie den Dienst aus der Quelle kompilieren. Montageanleitung im Readme-Projekt, auf die am Ende des Artikels verwiesen wird.

Der nächste Schritt ist die Registrierung des ersten Teilnehmers. Dazu müssen Sie eine HTTP-Anforderung an den Dienst mit einer Beschreibung des Verbrauchers ausführen:

 POST localhost:8080/broker/subscription Content-Type: application/json { "id": "my-first-consumer", "group": { "id": "consumers" }, "broker": "Kafka", "topics": [ { "name": "slack.test", "config": { "concurrencyFactor": 10, "autoCommitPeriodMs": 100 } } ], "subscriptionMethod": { "type": "http", "delayOnErrorMs": 1000, "retryBeforeCommit": 10, "uri": "<slack-wh-uri>", "additionalHeaders": { "Content-Type": "application/json" } } } 

Wenn alles gut gegangen ist, wird die Antwort fast der gleiche gesendete Inhalt sein.

Lassen Sie uns jeden Parameter durchgehen:

  • Consumer.id - ID unseres Abonnenten
  • Consumer.group.id - Gruppenkennung
  • Consumer.broker - Geben Sie an, welchen der Service Broker Sie abonnieren müssen
  • Consumer.topics[0].name - Der Name des Themas, von dem wir Nachrichten empfangen möchten
  • Consumer.topics[0].config. concurrencyFactor Consumer.topics[0].config. concurrencyFactor - maximale Anzahl gleichzeitig gesendeter Nachrichten
  • Consumer.topics[0].config. autoCommitPeriodMs Consumer.topics[0].config. autoCommitPeriodMs - erzwungene Consumer.topics[0].config. autoCommitPeriodMs für fertige Nachrichten
  • Consumer.subscriptionMethod.type - Abonnementtyp. Derzeit ist nur HTTP verfügbar.
  • Consumer.subscriptionMethod.delayOnErrorMs - Zeit vor dem erneuten Senden einer Nachricht, die mit einem Fehler endete
  • Consumer.subscriptionMethod.retryBeforeCommit - Die Anzahl der Versuche, die Fehlermeldung erneut zu senden. Wenn 0 - wird die Nachricht bis zur erfolgreichen Verarbeitung gedreht. In unserem Fall ist die Garantie der vollständigen Lieferung nicht so wichtig wie die Konstanz des Flusses.
  • Consumer.subscriptionMethod.uri - die Ressource, an die Nachrichten gesendet werden
  • Consumer.subscriptionMethod.additionalHeader - zusätzliche Header, die mit jeder Nachricht gesendet werden. Beachten Sie, dass sich im Text jeder Nachricht JSON befindet, damit Slack die Anforderung korrekt interpretieren kann.

In dieser Anforderung wird die HTTP-Methode weggelassen, da die Standardeinstellung POST, Slack in Ordnung ist.

Ab diesem Moment überwacht der Dienst die zugewiesenen Partitionen des Themas slack.test auf neue Nachrichten.

Um Nachrichten zum Thema zu schreiben, verwende ich die in Kafka integrierten Dienstprogramme, die sich in /opt/bitnami/kafka/bin gestarteten Kafka-Images befinden (die Position der Dienstprogramme in anderen Kafka-Instanzen kann unterschiedlich sein):

 kafka-console-producer.sh --broker-list localhost:9092 --topic slack.test > {“text”: “Hello!”} 

Gleichzeitig benachrichtigt Sie Slack über eine neue Nachricht:



Um einen Verbraucher abzumelden, reicht es aus, eine POST-Anfrage an "Broker / Abbestellen" mit demselben Inhalt wie während des Abonnements zu stellen.

Fazit


Derzeit ist nur die Grundfunktionalität implementiert. Darüber hinaus ist geplant, die Stapelverarbeitung zu verbessern, eine einmalige Semantik zu implementieren, die Möglichkeit zum Senden von Nachrichten an den Broker über HTTP hinzuzufügen und vor allem Unterstützung für andere beliebte Pub-Sub-Programme hinzuzufügen.

Der Queue-Over-Http-Dienst befindet sich derzeit in der aktiven Entwicklung. Version 0.1.3 ist stabil genug zum Testen auf Entwicklungs- und Bühnenständern. Die Leistung wurde unter Windows 10, Debian 9 und Ubuntu 18.04 getestet. Sie können prod auf eigenes Risiko verwenden. Wenn Sie bei der Entwicklung helfen oder Feedback zum Service geben möchten, heißen wir Sie beim Github- Projekt willkommen.

Source: https://habr.com/ru/post/de435346/


All Articles