Cassandra Sink für Spark Structured Streaming

Vor ein paar Monaten habe ich angefangen, Spark zu studieren, und irgendwann hatte ich das Problem, strukturierte Streaming-Berechnungen in der Cassandra-Datenbank zu speichern.

In diesem Beitrag gebe ich ein einfaches Beispiel für das Erstellen und Verwenden von Cassandra Sink für Spark Structured Streaming. Ich hoffe, dass der Beitrag für diejenigen nützlich sein wird, die kürzlich mit Spark Structured Streaming begonnen haben und sich fragen, wie sie die Berechnungsergebnisse in die Datenbank hochladen können.

Die Idee der Anwendung ist sehr einfach: Nachrichten von Kafka zu empfangen und zu analysieren, einfache Transformationen in einem Paar durchzuführen und die Ergebnisse in Cassandra zu speichern.

Vorteile von strukturiertem Streaming


Weitere Informationen zu Structured Streaming finden Sie in der Dokumentation . Kurz gesagt, Structured Streaming ist eine gut skalierbare Streaming-Informationsverarbeitungs-Engine, die auf der Spark SQL-Engine basiert. Mit Dataset / DataFrame können Sie Daten aggregieren, Fensterfunktionen, Verbindungen usw. berechnen. Mit Structured Streaming können Sie das gute alte SQL für die Arbeit mit Datenströmen verwenden.

Was ist das Problem?


Die stabile Version von Spark Structured Streaming wurde 2017 veröffentlicht. Das heißt, dies ist eine ziemlich neue API, die die Grundfunktionalität implementiert, aber einige Dinge müssen von uns selbst erledigt werden. Beispielsweise verfügt Structured Streaming über Standardfunktionen zum Schreiben von Ausgaben in eine Datei, eine Kachel, eine Konsole oder einen Speicher. Um jedoch Daten in der Datenbank zu speichern, müssen Sie den in Structured Streaming verfügbaren foreach- Empfänger verwenden und die ForeachWriter- Schnittstelle implementieren. Ab Spark 2.3.1 kann diese Funktionalität nur in Scala und Java implementiert werden .

Ich gehe davon aus, dass der Leser bereits weiß, wie strukturiertes Streaming allgemein funktioniert, wie er die erforderlichen Transformationen implementiert und nun bereit ist, die Ergebnisse in die Datenbank hochzuladen. Wenn einige der oben genannten Schritte unklar sind, kann die offizielle Dokumentation als guter Ausgangspunkt für das Erlernen von strukturiertem Streaming dienen. In diesem Artikel möchte ich mich auf den letzten Schritt konzentrieren, wenn Sie die Ergebnisse in einer Datenbank speichern müssen.

Im Folgenden werde ich eine Beispielimplementierung der Cassandra-Senke für strukturiertes Streaming beschreiben und erläutern, wie sie in einem Cluster ausgeführt wird. Den vollständigen Code finden Sie hier .

Als ich zum ersten Mal auf das oben genannte Problem stieß, erwies sich dieses Projekt als sehr nützlich. Es mag jedoch etwas kompliziert erscheinen, wenn der Leser gerade mit Structured Streaming begonnen hat und nach einem einfachen Beispiel für das Hochladen von Daten auf Cassandra sucht. Darüber hinaus ist das Projekt für die Arbeit im lokalen Modus geschrieben und erfordert einige Änderungen, um im Cluster ausgeführt zu werden.

Ich möchte auch Beispiele geben, wie Daten mit JDBC in MongoDB und jeder anderen Datenbank gespeichert werden.

Einfache Lösung


Um Daten auf ein externes System hochzuladen, müssen Sie den foreach- Empfänger verwenden. Lesen Sie hier mehr darüber. Kurz gesagt, die ForeachWriter- Schnittstelle muss implementiert sein. Das heißt, es muss festgelegt werden, wie die Verbindung geöffnet werden soll, wie jedes Datenelement verarbeitet werden soll und wie die Verbindung am Ende der Verarbeitung geschlossen werden soll. Der Quellcode lautet wie folgt:

class CassandraSinkForeach() extends ForeachWriter[org.apache.spark.sql.Row] { // This class implements the interface ForeachWriter, which has methods that get called // whenever there is a sequence of rows generated as output val cassandraDriver = new CassandraDriver(); def open(partitionId: Long, version: Long): Boolean = { // open connection println(s"Open connection") true } def process(record: org.apache.spark.sql.Row) = { println(s"Process new $record") cassandraDriver.connector.withSessionDo(session => session.execute(s""" insert into ${cassandraDriver.namespace}.${cassandraDriver.foreachTableSink} (fx_marker, timestamp_ms, timestamp_dt) values('${record(0)}', '${record(1)}', '${record(2)}')""") ) } def close(errorOrNull: Throwable): Unit = { // close the connection println(s"Close connection") } } 

Ich werde die Definition von CassandraDriver und die Struktur der Ausgabetabelle später beschreiben, aber jetzt schauen wir uns genauer an, wie der obige Code funktioniert. Um von Spark aus eine Verbindung zu Kasandra herzustellen, erstelle ich ein CassandraDriver- Objekt, das den Zugriff auf den von DataStax entwickelten CassandraConnector ermöglicht . Der CassandraConnector ist für das Öffnen und Schließen der Verbindung zur Datenbank verantwortlich. Daher zeige ich nur Debugging-Meldungen in den Methoden open und close der CassandraSinkForeach- Klasse an.

Der obige Code wird von der Hauptanwendung wie folgt aufgerufen:

 val sink = parsed .writeStream .queryName("KafkaToCassandraForeach") .outputMode("update") .foreach(new CassandraSinkForeach()) .start() 

CassandraSinkForeach wird für jede Datenzeile erstellt, sodass jeder Arbeitsknoten seinen Teil der Zeilen in die Datenbank einfügt. Das heißt, jeder Arbeitsknoten führt val cassandraDriver = new CassandraDriver () aus; So sieht CassandraDriver aus:

 class CassandraDriver extends SparkSessionBuilder { // This object will be used in CassandraSinkForeach to connect to Cassandra DB from an executor. // It extends SparkSessionBuilder so to use the same SparkSession on each node. val spark = buildSparkSession import spark.implicits._ val connector = CassandraConnector(spark.sparkContext.getConf) // Define Cassandra's table which will be used as a sink /* For this app I used the following table: CREATE TABLE fx.spark_struct_stream_sink ( fx_marker text, timestamp_ms timestamp, timestamp_dt date, primary key (fx_marker)); */ val namespace = "fx" val foreachTableSink = "spark_struct_stream_sink" } 

Schauen wir uns das Funkenobjekt genauer an. Der Code für SparkSessionBuilder lautet wie folgt:

 class SparkSessionBuilder extends Serializable { // Build a spark session. Class is made serializable so to get access to SparkSession in a driver and executors. // Note here the usage of @transient lazy val def buildSparkSession: SparkSession = { @transient lazy val conf: SparkConf = new SparkConf() .setAppName("Structured Streaming from Kafka to Cassandra") .set("spark.cassandra.connection.host", "ec2-52-23-103-178.compute-1.amazonaws.com") .set("spark.sql.streaming.checkpointLocation", "checkpoint") @transient lazy val spark = SparkSession .builder() .config(conf) .getOrCreate() spark } } 

Auf jedem Arbeitsknoten bietet SparkSessionBuilder Zugriff auf die SparkSession , die auf dem Treiber erstellt wurde. Um einen solchen Zugriff zu ermöglichen, muss SparkSessionBuilder serialisiert und ein vorübergehender Lazy Val verwendet werden , wodurch das Serialisierungssystem conf- und spark- Objekte ignorieren kann , wenn das Programm initialisiert wird und bis auf die Objekte zugegriffen wird. Wenn das buildSparkSession- Programm gestartet wird , wird es serialisiert und an jeden Arbeitsknoten gesendet. Conf- und Spark- Objekte sind jedoch nur zulässig, wenn der Arbeitsknoten auf sie zugreift.

Schauen wir uns nun den Hauptanwendungscode an:

 object KafkaToCassandra extends SparkSessionBuilder { // Main body of the app. It also extends SparkSessionBuilder. def main(args: Array[String]) { val spark = buildSparkSession import spark.implicits._ // Define location of Kafka brokers: val broker = "ec2-18-209-75-68.compute-1.amazonaws.com:9092,ec2-18-205-142-57.compute-1.amazonaws.com:9092,ec2-50-17-32-144.compute-1.amazonaws.com:9092" /*Here is an example massage which I get from a Kafka stream. It contains multiple jsons separated by \n {"timestamp_ms": "1530305100936", "fx_marker": "EUR/GBP"} {"timestamp_ms": "1530305100815", "fx_marker": "USD/CHF"} {"timestamp_ms": "1530305100969", "fx_marker": "EUR/CHF"} {"timestamp_ms": "1530305100011", "fx_marker": "USD/CAD"} */ // Read incoming stream val dfraw = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", broker) .option("subscribe", "currency_exchange") .load() val schema = StructType( Seq( StructField("fx_marker", StringType, false), StructField("timestamp_ms", StringType, false) ) ) val df = dfraw .selectExpr("CAST(value AS STRING)").as[String] .flatMap(_.split("\n")) val jsons = df.select(from_json($"value", schema) as "data").select("data.*") // Process data. Create a new date column val parsed = jsons .withColumn("timestamp_dt", to_date(from_unixtime($"timestamp_ms"/1000.0, "yyyy-MM-dd HH:mm:ss.SSS"))) .filter("fx_marker != ''") // Output results into a database val sink = parsed .writeStream .queryName("KafkaToCassandraForeach") .outputMode("update") .foreach(new CassandraSinkForeach()) .start() sink.awaitTermination() } } 

Wenn die Anwendung zur Ausführung gesendet wird , wird buildSparkSession serialisiert und an die Arbeitsknoten gesendet. Conf- und Spark- Objekte bleiben jedoch ungelöst. Anschließend erstellt der Treiber ein Funkenobjekt in KafkaToCassandra und verteilt die Arbeit auf die Arbeitsknoten. Jeder Arbeitsknoten liest Daten aus Kafka, führt einfache Transformationen für den empfangenen Teil der Datensätze durch. Wenn der Arbeitsknoten bereit ist, die Ergebnisse in die Datenbank zu schreiben, ermöglicht er Conf- und Spark- Objekte und erhält so Zugriff auf die auf dem Treiber erstellte SparkSession .

Wie erstelle ich die Anwendung und führe sie aus?


Als ich von PySpark zu Scala wechselte, brauchte ich eine Weile, um herauszufinden, wie die Anwendung erstellt werden sollte. Deshalb habe ich Maven pom.xml in mein Projekt aufgenommen. Der Reader kann die Anwendung mit Maven erstellen, indem er den Befehl mvn package ausführt . Nachdem die Anwendung zur Ausführung gesendet werden kann mit

 ./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1,datastax:spark-cassandra-connector:2.3.0-s_2.11 --class com.insight.app.CassandraSink.KafkaToCassandra --master spark://ec2-18-232-26-53.compute-1.amazonaws.com:7077 target/cassandra-sink-0.0.1-SNAPSHOT.jar 

Um die Anwendung zu erstellen und auszuführen, müssen Sie die Namen meiner AWS-Computer durch Ihre eigenen ersetzen (d. H. Alles ersetzen, was wie ec2-xx-xxx-xx-xx.compute-1.amazonaws.com aussieht).

Insbesondere Spark und Structured Streaming ist für mich ein neues Thema, daher bin ich den Lesern für Kommentare, Diskussionen und Korrekturen sehr dankbar.

Source: https://habr.com/ru/post/de425503/


All Articles