Cassandra Sink untuk Streaming Terstruktur Spark

Beberapa bulan yang lalu saya mulai mempelajari Spark, dan pada titik tertentu saya dihadapkan dengan masalah penghematan perhitungan Streaming Terstruktur dalam database Cassandra.

Dalam posting ini, saya memberikan contoh sederhana tentang membuat dan menggunakan Cassandra Sink untuk Spark Structured Streaming. Saya berharap posting ini akan bermanfaat bagi mereka yang baru saja mulai bekerja dengan Spark Structured Streaming dan bertanya-tanya bagaimana cara mengunggah hasil perhitungan ke database.

Gagasan aplikasi ini sangat sederhana - untuk menerima dan mem-parsing pesan dari Kafka, melakukan transformasi sederhana berpasangan dan menyimpan hasilnya dalam cassandra.

Kelebihan Streaming Terstruktur


Anda dapat membaca lebih lanjut tentang Streaming Terstruktur dalam dokumentasi . Singkatnya, Structured Streaming adalah mesin pengolah informasi streaming berskala baik yang didasarkan pada mesin Spark SQL. Hal ini memungkinkan Anda untuk menggunakan Dataset / DataFrame untuk mengumpulkan data, menghitung fungsi jendela, koneksi, dll. Artinya, Streaming Terstruktur memungkinkan Anda untuk menggunakan SQL lama yang baik untuk bekerja dengan aliran data.

Apa masalahnya?


Rilis stabil Spark Structured Streaming dirilis pada 2017. Artinya, ini adalah API yang cukup baru yang mengimplementasikan fungsionalitas dasar, tetapi beberapa hal harus dilakukan sendiri. Misalnya, Streaming Terstruktur memiliki fungsi standar untuk menulis output ke file, ubin, konsol atau memori, tetapi untuk menyimpan data ke database, Anda harus menggunakan penerima foreach yang tersedia di Structured Streaming dan mengimplementasikan antarmuka ForeachWriter . Dimulai dengan Spark 2.3.1, fungsi ini hanya dapat diimplementasikan di Scala dan Java .

Saya berasumsi bahwa pembaca sudah tahu bagaimana Structured Streaming bekerja secara umum, tahu bagaimana menerapkan transformasi yang diperlukan dan sekarang siap untuk mengunggah hasilnya ke database. Jika beberapa langkah di atas tidak jelas, dokumentasi resmi dapat berfungsi sebagai titik awal yang baik dalam mempelajari Streaming Terstruktur. Pada artikel ini, saya ingin fokus pada langkah terakhir ketika Anda perlu menyimpan hasilnya dalam database.

Di bawah ini, saya akan menjelaskan contoh penerapan wastafel Cassandra untuk Structured Streaming dan menjelaskan cara menjalankannya dalam sebuah cluster. Kode lengkap tersedia di sini .

Ketika saya pertama kali menemukan masalah di atas, proyek ini ternyata sangat berguna. Namun, mungkin tampak sedikit rumit jika pembaca baru saja mulai bekerja dengan Structured Streaming dan sedang mencari contoh sederhana tentang cara mengunggah data ke cassandra. Selain itu, proyek ditulis untuk bekerja dalam mode lokal dan memerlukan beberapa perubahan untuk dijalankan di cluster.

Saya juga ingin memberikan contoh cara menyimpan data ke MongoDB dan basis data lainnya menggunakan JDBC .

Solusi sederhana


Untuk mengunggah data ke sistem eksternal, Anda harus menggunakan penerima di muka . Baca lebih lanjut tentang ini di sini . Singkatnya, antarmuka ForeachWriter harus diimplementasikan. Artinya, perlu untuk menentukan cara membuka koneksi, cara memproses setiap bagian data dan cara menutup koneksi pada akhir pemrosesan. Kode sumber adalah sebagai berikut:

class CassandraSinkForeach() extends ForeachWriter[org.apache.spark.sql.Row] { // This class implements the interface ForeachWriter, which has methods that get called // whenever there is a sequence of rows generated as output val cassandraDriver = new CassandraDriver(); def open(partitionId: Long, version: Long): Boolean = { // open connection println(s"Open connection") true } def process(record: org.apache.spark.sql.Row) = { println(s"Process new $record") cassandraDriver.connector.withSessionDo(session => session.execute(s""" insert into ${cassandraDriver.namespace}.${cassandraDriver.foreachTableSink} (fx_marker, timestamp_ms, timestamp_dt) values('${record(0)}', '${record(1)}', '${record(2)}')""") ) } def close(errorOrNull: Throwable): Unit = { // close the connection println(s"Close connection") } } 

Definisi CassandraDriver dan struktur tabel output akan saya jelaskan nanti, tetapi untuk sekarang, mari kita lihat lebih dekat bagaimana cara kerja kode di atas. Untuk terhubung ke Kasandra dari Spark, saya membuat objek CassandraDriver yang menyediakan akses ke CassandraConnector , konektor yang dikembangkan oleh DataStax . CassandraConnector bertanggung jawab untuk membuka dan menutup koneksi ke database, jadi saya hanya menampilkan pesan debug pada metode buka dan tutup dari kelas CassandraSinkForeach .

Kode di atas dipanggil dari aplikasi utama sebagai berikut:

 val sink = parsed .writeStream .queryName("KafkaToCassandraForeach") .outputMode("update") .foreach(new CassandraSinkForeach()) .start() 

CassandraSinkForeach dibuat untuk setiap baris data, sehingga setiap node yang bekerja memasukkan bagian dari baris ke dalam database. Artinya, setiap simpul yang bekerja mengeksekusi val cassandraDriver = new CassandraDriver (); Seperti inilah bentuk CassandraDriver:

 class CassandraDriver extends SparkSessionBuilder { // This object will be used in CassandraSinkForeach to connect to Cassandra DB from an executor. // It extends SparkSessionBuilder so to use the same SparkSession on each node. val spark = buildSparkSession import spark.implicits._ val connector = CassandraConnector(spark.sparkContext.getConf) // Define Cassandra's table which will be used as a sink /* For this app I used the following table: CREATE TABLE fx.spark_struct_stream_sink ( fx_marker text, timestamp_ms timestamp, timestamp_dt date, primary key (fx_marker)); */ val namespace = "fx" val foreachTableSink = "spark_struct_stream_sink" } 

Mari kita melihat lebih dekat pada objek percikan . Kode untuk SparkSessionBuilder adalah sebagai berikut:

 class SparkSessionBuilder extends Serializable { // Build a spark session. Class is made serializable so to get access to SparkSession in a driver and executors. // Note here the usage of @transient lazy val def buildSparkSession: SparkSession = { @transient lazy val conf: SparkConf = new SparkConf() .setAppName("Structured Streaming from Kafka to Cassandra") .set("spark.cassandra.connection.host", "ec2-52-23-103-178.compute-1.amazonaws.com") .set("spark.sql.streaming.checkpointLocation", "checkpoint") @transient lazy val spark = SparkSession .builder() .config(conf) .getOrCreate() spark } } 

Pada setiap node yang berfungsi, SparkSessionBuilder menyediakan akses ke SparkSession yang dibuat pada driver. Untuk memungkinkan akses seperti itu, perlu untuk membuat serial SparkSessionBuilder dan menggunakan transient lazy val , yang memungkinkan sistem serialisasi untuk mengabaikan conf dan spark objek ketika program diinisialisasi dan sampai objek diakses. Jadi, ketika program buildSparkSession dimulai, ia diserialisasi dan dikirim ke setiap simpul yang berfungsi, tetapi objek conf dan spark hanya diizinkan ketika simpul yang bekerja mengaksesnya.

Sekarang mari kita lihat kode aplikasi utama:

 object KafkaToCassandra extends SparkSessionBuilder { // Main body of the app. It also extends SparkSessionBuilder. def main(args: Array[String]) { val spark = buildSparkSession import spark.implicits._ // Define location of Kafka brokers: val broker = "ec2-18-209-75-68.compute-1.amazonaws.com:9092,ec2-18-205-142-57.compute-1.amazonaws.com:9092,ec2-50-17-32-144.compute-1.amazonaws.com:9092" /*Here is an example massage which I get from a Kafka stream. It contains multiple jsons separated by \n {"timestamp_ms": "1530305100936", "fx_marker": "EUR/GBP"} {"timestamp_ms": "1530305100815", "fx_marker": "USD/CHF"} {"timestamp_ms": "1530305100969", "fx_marker": "EUR/CHF"} {"timestamp_ms": "1530305100011", "fx_marker": "USD/CAD"} */ // Read incoming stream val dfraw = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", broker) .option("subscribe", "currency_exchange") .load() val schema = StructType( Seq( StructField("fx_marker", StringType, false), StructField("timestamp_ms", StringType, false) ) ) val df = dfraw .selectExpr("CAST(value AS STRING)").as[String] .flatMap(_.split("\n")) val jsons = df.select(from_json($"value", schema) as "data").select("data.*") // Process data. Create a new date column val parsed = jsons .withColumn("timestamp_dt", to_date(from_unixtime($"timestamp_ms"/1000.0, "yyyy-MM-dd HH:mm:ss.SSS"))) .filter("fx_marker != ''") // Output results into a database val sink = parsed .writeStream .queryName("KafkaToCassandraForeach") .outputMode("update") .foreach(new CassandraSinkForeach()) .start() sink.awaitTermination() } } 

Ketika aplikasi dikirim untuk dieksekusi, buildSparkSession adalah serial dan dikirim ke node yang bekerja, namun, conf dan spark objek tetap tidak terselesaikan. Kemudian pengemudi membuat objek percikan di dalam KafkaToCassandra dan mendistribusikan pekerjaan antara node yang bekerja. Setiap node yang bekerja membaca data dari Kafka, membuat transformasi sederhana pada bagian yang diterima dari catatan, dan ketika node yang bekerja siap untuk menulis hasilnya ke database, itu memungkinkan conf dan spark objek, sehingga mendapatkan akses ke SparkSession yang dibuat pada driver.

Bagaimana cara membangun dan menjalankan aplikasi?


Ketika saya pindah dari PySpark ke Scala, butuh beberapa saat untuk mencari cara membangun aplikasi. Karena itu, saya memasukkan Maven pom.xml dalam proyek saya. Pembaca dapat membangun aplikasi menggunakan Maven dengan menjalankan perintah paket mvn . Setelah aplikasi dapat dikirim untuk dieksekusi menggunakan

 ./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1,datastax:spark-cassandra-connector:2.3.0-s_2.11 --class com.insight.app.CassandraSink.KafkaToCassandra --master spark://ec2-18-232-26-53.compute-1.amazonaws.com:7077 target/cassandra-sink-0.0.1-SNAPSHOT.jar 

Untuk membangun dan menjalankan aplikasi, perlu mengganti nama mesin AWS saya dengan milik Anda (mis. Ganti semua yang tampak seperti ec2-xx-xxx-xx-xx.xx.compute-1.amazonaws.com).

Spark dan Structured Streaming pada khususnya adalah topik baru bagi saya, jadi saya akan sangat berterima kasih kepada pembaca atas komentar, diskusi dan koreksi.

Source: https://habr.com/ru/post/id425503/


All Articles