Vor ein paar Monaten habe ich angefangen, Spark zu studieren, und irgendwann hatte ich das Problem, strukturierte Streaming-Berechnungen in der Cassandra-Datenbank zu speichern.
In diesem Beitrag gebe ich ein einfaches Beispiel für das Erstellen und Verwenden von Cassandra Sink für Spark Structured Streaming. Ich hoffe, dass der Beitrag für diejenigen nützlich sein wird, die kürzlich mit Spark Structured Streaming begonnen haben und sich fragen, wie sie die Berechnungsergebnisse in die Datenbank hochladen können.
Die Idee der Anwendung ist sehr einfach: Nachrichten von Kafka zu empfangen und zu analysieren, einfache Transformationen in einem Paar durchzuführen und die Ergebnisse in Cassandra zu speichern.
Vorteile von strukturiertem Streaming
Weitere Informationen zu Structured Streaming finden Sie in der
Dokumentation . Kurz gesagt, Structured Streaming ist eine gut skalierbare Streaming-Informationsverarbeitungs-Engine, die auf der Spark SQL-Engine basiert. Mit Dataset / DataFrame können Sie Daten aggregieren, Fensterfunktionen, Verbindungen usw. berechnen. Mit Structured Streaming können Sie das gute alte SQL für die Arbeit mit Datenströmen verwenden.
Was ist das Problem?
Die stabile Version von Spark Structured Streaming wurde 2017 veröffentlicht. Das heißt, dies ist eine ziemlich neue API, die die Grundfunktionalität implementiert, aber einige Dinge müssen von uns selbst erledigt werden. Beispielsweise verfügt Structured Streaming über Standardfunktionen zum Schreiben von Ausgaben in eine Datei, eine Kachel, eine Konsole oder einen Speicher. Um jedoch Daten in der Datenbank zu speichern, müssen Sie den in Structured Streaming verfügbaren
foreach- Empfänger verwenden und die
ForeachWriter- Schnittstelle implementieren.
Ab Spark 2.3.1 kann diese Funktionalität nur in Scala und Java implementiert werden .
Ich gehe davon aus, dass der Leser bereits weiß, wie strukturiertes Streaming allgemein funktioniert, wie er die erforderlichen Transformationen implementiert und nun bereit ist, die Ergebnisse in die Datenbank hochzuladen. Wenn einige der oben genannten Schritte unklar sind, kann die offizielle Dokumentation als guter Ausgangspunkt für das Erlernen von strukturiertem Streaming dienen. In diesem Artikel möchte ich mich auf den letzten Schritt konzentrieren, wenn Sie die Ergebnisse in einer Datenbank speichern müssen.
Im Folgenden werde ich eine Beispielimplementierung der Cassandra-Senke für strukturiertes Streaming beschreiben und erläutern, wie sie in einem Cluster ausgeführt wird. Den vollständigen Code finden Sie
hier .
Als ich zum ersten Mal auf das oben genannte Problem stieß, erwies sich
dieses Projekt als sehr nützlich. Es mag jedoch etwas kompliziert erscheinen, wenn der Leser gerade mit Structured Streaming begonnen hat und nach einem einfachen Beispiel für das Hochladen von Daten auf Cassandra sucht. Darüber hinaus ist das Projekt für die Arbeit im lokalen Modus geschrieben und erfordert einige Änderungen, um im Cluster ausgeführt zu werden.
Ich möchte auch Beispiele geben, wie Daten mit
JDBC in
MongoDB und jeder anderen Datenbank gespeichert werden.
Einfache Lösung
Um Daten auf ein externes System hochzuladen, müssen Sie den
foreach- Empfänger verwenden. Lesen Sie hier mehr darüber. Kurz gesagt, die
ForeachWriter- Schnittstelle muss implementiert sein. Das heißt, es muss festgelegt werden, wie die Verbindung geöffnet werden soll, wie jedes Datenelement verarbeitet werden soll und wie die Verbindung am Ende der Verarbeitung geschlossen werden soll. Der Quellcode lautet wie folgt:
class CassandraSinkForeach() extends ForeachWriter[org.apache.spark.sql.Row] {
Ich werde die Definition von
CassandraDriver und die Struktur der Ausgabetabelle später beschreiben, aber jetzt schauen wir uns genauer an, wie der obige Code funktioniert. Um von Spark aus eine Verbindung zu Kasandra herzustellen, erstelle ich ein
CassandraDriver- Objekt, das den Zugriff auf den von
DataStax entwickelten
CassandraConnector ermöglicht . Der CassandraConnector ist für das Öffnen und Schließen der Verbindung zur Datenbank verantwortlich. Daher zeige ich nur Debugging-Meldungen in den Methoden
open und
close der
CassandraSinkForeach- Klasse an.
Der obige Code wird von der Hauptanwendung wie folgt aufgerufen:
val sink = parsed .writeStream .queryName("KafkaToCassandraForeach") .outputMode("update") .foreach(new CassandraSinkForeach()) .start()
CassandraSinkForeach wird für jede
Datenzeile erstellt, sodass jeder Arbeitsknoten seinen Teil der Zeilen in die Datenbank einfügt. Das heißt, jeder Arbeitsknoten führt
val cassandraDriver = new CassandraDriver () aus; So sieht CassandraDriver aus:
class CassandraDriver extends SparkSessionBuilder {
Schauen wir uns das Funkenobjekt genauer an. Der Code für
SparkSessionBuilder lautet wie folgt:
class SparkSessionBuilder extends Serializable {
Auf jedem Arbeitsknoten bietet
SparkSessionBuilder Zugriff auf die
SparkSession , die auf dem Treiber erstellt wurde. Um einen solchen Zugriff zu ermöglichen, muss
SparkSessionBuilder serialisiert und ein
vorübergehender Lazy Val verwendet werden , wodurch das Serialisierungssystem
conf- und
spark- Objekte ignorieren
kann , wenn das Programm initialisiert wird und bis auf die Objekte zugegriffen wird. Wenn das
buildSparkSession- Programm
gestartet wird
, wird es serialisiert und an jeden Arbeitsknoten gesendet.
Conf- und
Spark- Objekte sind jedoch nur zulässig, wenn der Arbeitsknoten auf sie zugreift.
Schauen wir uns nun den Hauptanwendungscode an:
object KafkaToCassandra extends SparkSessionBuilder {
Wenn die Anwendung zur Ausführung
gesendet wird , wird
buildSparkSession serialisiert und an die Arbeitsknoten gesendet.
Conf- und
Spark- Objekte bleiben jedoch ungelöst. Anschließend erstellt der Treiber ein
Funkenobjekt in
KafkaToCassandra und verteilt die Arbeit auf die Arbeitsknoten. Jeder Arbeitsknoten liest Daten aus Kafka, führt einfache Transformationen für den empfangenen Teil der Datensätze durch. Wenn der Arbeitsknoten bereit ist, die Ergebnisse in die Datenbank zu schreiben, ermöglicht er
Conf- und
Spark- Objekte und erhält so Zugriff auf die auf dem Treiber erstellte
SparkSession .
Wie erstelle ich die Anwendung und führe sie aus?
Als ich von PySpark zu Scala wechselte, brauchte ich eine Weile, um herauszufinden, wie die Anwendung erstellt werden sollte. Deshalb habe ich Maven
pom.xml in mein Projekt aufgenommen. Der Reader kann die Anwendung mit Maven erstellen, indem er den Befehl
mvn package ausführt . Nachdem die Anwendung zur Ausführung gesendet werden kann mit
./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1,datastax:spark-cassandra-connector:2.3.0-s_2.11 --class com.insight.app.CassandraSink.KafkaToCassandra --master spark://ec2-18-232-26-53.compute-1.amazonaws.com:7077 target/cassandra-sink-0.0.1-SNAPSHOT.jar
Um die Anwendung zu erstellen und auszuführen, müssen Sie die Namen meiner AWS-Computer durch Ihre eigenen ersetzen (d. H. Alles ersetzen, was wie ec2-xx-xxx-xx-xx.compute-1.amazonaws.com aussieht).
Insbesondere Spark und Structured Streaming ist für mich ein neues Thema, daher bin ich den Lesern für Kommentare, Diskussionen und Korrekturen sehr dankbar.