Integration von Spark Streaming und Kafka

Hallo Kollegen! Wir erinnern Sie daran, dass wir vor nicht allzu langer Zeit ein Buch über Spark veröffentlicht haben und derzeit ein Buch über Kafka das neueste Korrekturlesen durchläuft.


Wir hoffen, dass diese Bücher erfolgreich genug sind, um das Thema fortzusetzen - zum Beispiel für die Übersetzung und Veröffentlichung von Literatur zu Spark Streaming. Wir wollten Ihnen heute eine Übersetzung zur Integration dieser Technologie in Kafka anbieten.

1. Begründung

Apache Kafka + Spark Streaming ist eine der besten Kombinationen zum Erstellen von Echtzeitanwendungen. In diesem Artikel werden wir die Details einer solchen Integration ausführlich diskutieren. Außerdem sehen wir uns ein Beispiel mit Spark Streaming-Kafka an. Anschließend diskutieren wir den „Empfängeransatz“ und die Option der direkten Integration von Kafka und Spark Streaming. Beginnen wir also mit der Integration von Kafka und Spark Streaming.



2. Integration von Kafka und Spark Streaming

Bei der Integration von Apache Kafka und Spark Streaming gibt es zwei mögliche Ansätze zum Konfigurieren von Spark Streaming zum Empfangen von Daten von Kafka - d. H. zwei Ansätze zur Integration von Kafka und Spark Streaming. Erstens können Sie Empfänger und die übergeordnete Kafka-API verwenden. Der zweite (neuere) Ansatz ist die Arbeit ohne Empfänger. Für beide Ansätze gibt es unterschiedliche Programmiermodelle, die sich beispielsweise in Bezug auf Leistung und semantische Garantien unterscheiden.



Lassen Sie uns diese Ansätze genauer betrachten.

a. Empfängerbasierter Ansatz

In diesem Fall wird der Empfang der Daten vom Empfänger bereitgestellt. Mit der von Kafka bereitgestellten High-Level-Verbrauchs-API implementieren wir den Empfänger. Ferner werden die empfangenen Daten in Spark Artists gespeichert. Anschließend werden Jobs in Kafka - Spark Streaming gestartet, in denen Daten verarbeitet werden.

Bei Verwendung dieses Ansatzes bleibt jedoch das Risiko eines Datenverlusts im Fehlerfall (mit der Standardkonfiguration) bestehen. Infolgedessen muss in Kafka - Spark Streaming zusätzlich ein Write-Ahead-Protokoll eingefügt werden, um Datenverluste zu vermeiden. Somit werden alle von Kafka empfangenen Daten synchron im Write-Ahead-Protokoll in einem verteilten Dateisystem gespeichert. Deshalb können auch nach einem Systemausfall alle Daten wiederhergestellt werden.

Als Nächstes sehen wir uns an, wie dieser Ansatz mit Empfängern in einer Anwendung mit Kafka - Spark Streaming verwendet wird.

ich. Bindung

Jetzt verbinden wir unsere Streaming-Anwendung mit dem folgenden Artefakt für Scala / Java-Anwendungen. Wir verwenden die Projektdefinitionen für SBT / Maven.

groupId = org.apache.spark artifactId = spark-streaming-kafka-0-8_2.11 version = 2.2.0 

Bei der Bereitstellung unserer Anwendung müssen wir jedoch die oben genannte Bibliothek und ihre Abhängigkeiten hinzufügen. Dies wird für Python-Anwendungen benötigt.

ii. Programmierung

Erstellen Sie als Nächstes einen DStream Eingabestream, indem Sie KafkaUtils in den Stream-Anwendungscode importieren:

 import org.apache.spark.streaming.kafka._ val kafkaStream = KafkaUtils.createStream(streamingContext, [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume]) 

Darüber hinaus können Sie mit den Optionen createStream Schlüsselklassen und Werteklassen sowie die entsprechenden Klassen für deren Dekodierung angeben.

iii. Bereitstellung

Wie bei jeder Spark-Anwendung wird der Befehl spark-submit zum Starten verwendet. Die Details unterscheiden sich jedoch geringfügig in Scala / Java-Anwendungen und in Python-Anwendungen.

Darüber hinaus können Sie mit –packages das spark-streaming-Kafka-0-8_2.11 und seine Abhängigkeiten direkt zum spark-submit spark-streaming-Kafka-0-8_2.11 hinzufügen. Dies ist nützlich für Python-Anwendungen, bei denen es unmöglich ist, Projekte mit SBT / Maven zu verwalten.

 ./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 ... 

Sie können auch das JAR-Archiv der Maven-Artefakt- spark-streaming-Kafka-0-8-assembly aus dem Maven-Repository herunterladen. Fügen Sie es dann zu spark-submit mit - jars .

b. Direkte Ansprache (keine Empfänger)

Nach dem Ansatz unter Verwendung von Empfängern wurde ein neuerer Ansatz entwickelt - der „direkte“. Es bietet zuverlässige End-to-End-Garantien. In diesem Fall fragen wir Kafka regelmäßig nach Offsets von Offsets für jedes Thema / jeden Abschnitt und sorgen nicht für die Datenlieferung durch die Empfänger. Zusätzlich wird die Größe des Lesefragments bestimmt, dies ist für die korrekte Verarbeitung jedes Pakets notwendig. Schließlich wird eine einfache konsumierende API verwendet, um Bereiche mit Daten von Kafka mit den angegebenen Offsets zu lesen, insbesondere wenn Datenverarbeitungsjobs gestartet werden. Der gesamte Vorgang ist wie das Lesen von Dateien aus einem Dateisystem.

Hinweis: Diese Funktion wurde in Spark 1.3 für Scala und die Java-API sowie in Spark 1.4 für die Python-API angezeigt.

Lassen Sie uns nun diskutieren, wie dieser Ansatz in unserer Streaming-Anwendung angewendet wird.
Die Consumer-API wird unter folgendem Link ausführlicher beschrieben:

Apache Kafka Verbraucher | Beispiele für Kafka Consumer

ich. Bindung

Dieser Ansatz wird zwar nur in Scala / Java-Anwendungen unterstützt. Erstellen Sie mit dem folgenden Artefakt das SBT / Maven-Projekt.

 groupId = org.apache.spark artifactId = spark-streaming-kafka-0-8_2.11 version = 2.2.0 

ii. Programmierung

Importieren Sie als Nächstes KafkaUtils und erstellen Sie einen Eingabe- DStream im Stream-Anwendungscode:

 import org.apache.spark.streaming.kafka._ val directKafkaStream = KafkaUtils.createDirectStream[ [key class], [value class], [key decoder class], [value decoder class] ]( streamingContext, [map of Kafka parameters], [set of topics to consume]) 

In den Kafka-Parametern müssen Sie entweder metadata.broker.list oder bootstrap.servers angeben. Daher werden standardmäßig Daten ab dem letzten Versatz in jedem Abschnitt von Kafka verwendet. Wenn Sie jedoch möchten, dass der Messwert vom kleinsten Fragment auto.offset.reset müssen Sie in den Kafka-Parametern die Konfigurationsoption auto.offset.reset .

KafkaUtils.createDirectStream Sie mit den Optionen KafkaUtils.createDirectStream , können Sie außerdem von einem beliebigen Versatz aus mit dem Lesen beginnen. Dann werden wir Folgendes tun, um auf die in jedem Paket verbrauchten Kafka-Fragmente zuzugreifen.

 //      ,        var offsetRanges = Array.empty[OffsetRange] directKafkaStream.transform { rdd => offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges rdd }.map { ... }.foreachRDD { rdd => for (o <- offsetRanges) { println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}") } ... } 

Wenn wir die Überwachung von Kafka basierend auf Zookeeper mithilfe spezieller Tools organisieren möchten, können wir Zookeeper mithilfe ihrer Hilfe selbst aktualisieren.

iii. Bereitstellung

Der Bereitstellungsprozess ähnelt in diesem Fall dem Bereitstellungsprozess in der Variante mit dem Empfänger.

3. Die Vorteile eines direkten Ansatzes

Der zweite Ansatz zur Integration von Spark Streaming in Kafka übertrifft den ersten aus folgenden Gründen:

a. Vereinfachte Parallelität

In diesem Fall müssen Sie nicht viele Kafka-Eingabestreams erstellen und kombinieren. Mit Kafka-Spark-Streaming werden jedoch so viele RDD-Segmente erstellt, wie Kafka-Segmente für den Verbrauch vorhanden sind. Alle diese Kafka-Daten werden parallel gelesen. Daher können wir sagen, dass wir eine Eins-zu-Eins-Entsprechung zwischen den Segmenten Kafka und RDD haben werden, und ein solches Modell ist verständlicher und einfacher zu konfigurieren.

b. Wirksamkeit

Um Datenverluste während des ersten Ansatzes vollständig zu vermeiden, mussten die Informationen in einem Protokoll führender Datensätze gespeichert und dann repliziert werden. Tatsächlich ist dies ineffizient, da die Daten zweimal repliziert werden: das erste Mal von Kafka selbst und das zweite Mal vom Write-Ahead-Protokoll. Beim zweiten Ansatz wird dieses Problem beseitigt, da kein Empfänger vorhanden ist und daher kein führendes Schreibjournal benötigt wird. Wenn wir einen ausreichend langen Datenspeicher in Kafka haben, können Sie Nachrichten direkt von Kafka wiederherstellen.

s Genau einmalige Semantik

Grundsätzlich haben wir beim ersten Ansatz die übergeordnete Kafka-API verwendet, um verbrauchte Lesefragmente in Zookeeper zu speichern. Dies ist jedoch die Gewohnheit, Daten von Kafka zu konsumieren. Während Datenverluste zuverlässig beseitigt werden können, besteht eine geringe Wahrscheinlichkeit, dass bei einigen Fehlern einzelne Datensätze zweimal verwendet werden können. Der springende Punkt ist die Inkonsistenz zwischen dem zuverlässigen Datenübertragungsmechanismus in Kafka - Spark Streaming und dem Fragmentlesen in Zookeeper. Daher verwenden wir im zweiten Ansatz die einfache Kafka-API, für die kein Zugriff auf Zookeeper erforderlich ist. Hier werden die gelesenen Fragmente in Kafka - Spark Streaming verfolgt, dazu werden Kontrollpunkte verwendet. In diesem Fall wird die Inkonsistenz zwischen Spark Streaming und Zookeeper / Kafka beseitigt.

Daher empfängt Spark Streaming auch bei Fehlern jeden Datensatz streng einmal. Hier müssen wir sicherstellen, dass unsere Ausgabeoperation, in der die Daten in einem externen Speicher gespeichert werden, entweder idempotent oder eine atomare Transaktion ist, in der sowohl die Ergebnisse als auch die Offsets gespeichert werden. So wird genau einmalige Semantik bei der Ableitung unserer Ergebnisse erreicht.

Es gibt jedoch einen Nachteil: Offsets in Zookeeper werden nicht aktualisiert. Mit den auf Zookeeper basierenden Überwachungstools von Kafka können Sie daher den Fortschritt nicht verfolgen.
Wenn die Verarbeitung auf diese Weise angeordnet ist, können wir jedoch weiterhin auf Offsets verweisen. Wir wenden uns an jedes Paket und aktualisieren Zookeeper selbst.

Das ist alles, worüber wir über die Integration von Apache Kafka und Spark Streaming sprechen wollten. Wir hoffen es hat euch gefallen.

Source: https://habr.com/ru/post/de417123/


All Articles