Beberapa bulan yang lalu saya mulai mempelajari Spark, dan pada titik tertentu saya dihadapkan dengan masalah penghematan perhitungan Streaming Terstruktur dalam database Cassandra.
Dalam posting ini, saya memberikan contoh sederhana tentang membuat dan menggunakan Cassandra Sink untuk Spark Structured Streaming. Saya berharap posting ini akan bermanfaat bagi mereka yang baru saja mulai bekerja dengan Spark Structured Streaming dan bertanya-tanya bagaimana cara mengunggah hasil perhitungan ke database.
Gagasan aplikasi ini sangat sederhana - untuk menerima dan mem-parsing pesan dari Kafka, melakukan transformasi sederhana berpasangan dan menyimpan hasilnya dalam cassandra.
Kelebihan Streaming Terstruktur
Anda dapat membaca lebih lanjut tentang Streaming Terstruktur dalam
dokumentasi . Singkatnya, Structured Streaming adalah mesin pengolah informasi streaming berskala baik yang didasarkan pada mesin Spark SQL. Hal ini memungkinkan Anda untuk menggunakan Dataset / DataFrame untuk mengumpulkan data, menghitung fungsi jendela, koneksi, dll. Artinya, Streaming Terstruktur memungkinkan Anda untuk menggunakan SQL lama yang baik untuk bekerja dengan aliran data.
Apa masalahnya?
Rilis stabil Spark Structured Streaming dirilis pada 2017. Artinya, ini adalah API yang cukup baru yang mengimplementasikan fungsionalitas dasar, tetapi beberapa hal harus dilakukan sendiri. Misalnya, Streaming Terstruktur memiliki fungsi standar untuk menulis output ke file, ubin, konsol atau memori, tetapi untuk menyimpan data ke database, Anda harus menggunakan penerima
foreach yang tersedia di Structured Streaming dan mengimplementasikan antarmuka
ForeachWriter .
Dimulai dengan Spark 2.3.1, fungsi ini hanya dapat diimplementasikan di Scala dan Java .
Saya berasumsi bahwa pembaca sudah tahu bagaimana Structured Streaming bekerja secara umum, tahu bagaimana menerapkan transformasi yang diperlukan dan sekarang siap untuk mengunggah hasilnya ke database. Jika beberapa langkah di atas tidak jelas, dokumentasi resmi dapat berfungsi sebagai titik awal yang baik dalam mempelajari Streaming Terstruktur. Pada artikel ini, saya ingin fokus pada langkah terakhir ketika Anda perlu menyimpan hasilnya dalam database.
Di bawah ini, saya akan menjelaskan contoh penerapan wastafel Cassandra untuk Structured Streaming dan menjelaskan cara menjalankannya dalam sebuah cluster. Kode lengkap tersedia di
sini .
Ketika saya pertama kali menemukan masalah di atas,
proyek ini ternyata sangat berguna. Namun, mungkin tampak sedikit rumit jika pembaca baru saja mulai bekerja dengan Structured Streaming dan sedang mencari contoh sederhana tentang cara mengunggah data ke cassandra. Selain itu, proyek ditulis untuk bekerja dalam mode lokal dan memerlukan beberapa perubahan untuk dijalankan di cluster.
Saya juga ingin memberikan contoh cara menyimpan data ke
MongoDB dan basis data lainnya menggunakan
JDBC .
Solusi sederhana
Untuk mengunggah data ke sistem eksternal, Anda harus menggunakan penerima di
muka . Baca lebih lanjut tentang ini di
sini . Singkatnya, antarmuka
ForeachWriter harus diimplementasikan. Artinya, perlu untuk menentukan cara membuka koneksi, cara memproses setiap bagian data dan cara menutup koneksi pada akhir pemrosesan. Kode sumber adalah sebagai berikut:
class CassandraSinkForeach() extends ForeachWriter[org.apache.spark.sql.Row] {
Definisi
CassandraDriver dan struktur tabel output akan saya jelaskan nanti, tetapi untuk sekarang, mari kita lihat lebih dekat bagaimana cara kerja kode di atas. Untuk terhubung ke Kasandra dari Spark, saya membuat objek
CassandraDriver yang menyediakan akses ke
CassandraConnector , konektor yang dikembangkan oleh
DataStax . CassandraConnector bertanggung jawab untuk membuka dan menutup koneksi ke database, jadi saya hanya menampilkan pesan debug pada metode
buka dan
tutup dari kelas
CassandraSinkForeach .
Kode di atas dipanggil dari aplikasi utama sebagai berikut:
val sink = parsed .writeStream .queryName("KafkaToCassandraForeach") .outputMode("update") .foreach(new CassandraSinkForeach()) .start()
CassandraSinkForeach dibuat untuk setiap baris data, sehingga setiap node yang bekerja memasukkan bagian dari baris ke dalam database. Artinya, setiap simpul yang bekerja mengeksekusi
val cassandraDriver = new CassandraDriver (); Seperti inilah bentuk CassandraDriver:
class CassandraDriver extends SparkSessionBuilder {
Mari kita melihat lebih dekat pada objek
percikan . Kode untuk
SparkSessionBuilder adalah sebagai berikut:
class SparkSessionBuilder extends Serializable {
Pada setiap node yang berfungsi,
SparkSessionBuilder menyediakan akses ke
SparkSession yang dibuat pada driver. Untuk memungkinkan akses seperti itu, perlu untuk membuat serial
SparkSessionBuilder dan menggunakan
transient lazy val , yang memungkinkan sistem serialisasi untuk mengabaikan
conf dan
spark objek ketika program diinisialisasi dan sampai objek diakses. Jadi, ketika program
buildSparkSession dimulai, ia diserialisasi dan dikirim ke setiap simpul yang berfungsi, tetapi objek
conf dan
spark hanya diizinkan ketika simpul yang bekerja mengaksesnya.
Sekarang mari kita lihat kode aplikasi utama:
object KafkaToCassandra extends SparkSessionBuilder {
Ketika aplikasi dikirim untuk dieksekusi,
buildSparkSession adalah serial dan dikirim ke node yang bekerja, namun,
conf dan
spark objek tetap tidak terselesaikan. Kemudian pengemudi membuat objek percikan di dalam
KafkaToCassandra dan mendistribusikan pekerjaan antara node yang bekerja. Setiap node yang bekerja membaca data dari Kafka, membuat transformasi sederhana pada bagian yang diterima dari catatan, dan ketika node yang bekerja siap untuk menulis hasilnya ke database, itu memungkinkan
conf dan
spark objek, sehingga mendapatkan akses ke
SparkSession yang dibuat pada driver.
Bagaimana cara membangun dan menjalankan aplikasi?
Ketika saya pindah dari PySpark ke Scala, butuh beberapa saat untuk mencari cara membangun aplikasi. Karena itu, saya memasukkan Maven
pom.xml dalam proyek saya. Pembaca dapat membangun aplikasi menggunakan Maven dengan menjalankan perintah
paket mvn . Setelah aplikasi dapat dikirim untuk dieksekusi menggunakan
./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1,datastax:spark-cassandra-connector:2.3.0-s_2.11 --class com.insight.app.CassandraSink.KafkaToCassandra --master spark://ec2-18-232-26-53.compute-1.amazonaws.com:7077 target/cassandra-sink-0.0.1-SNAPSHOT.jar
Untuk membangun dan menjalankan aplikasi, perlu mengganti nama mesin AWS saya dengan milik Anda (mis. Ganti semua yang tampak seperti ec2-xx-xxx-xx-xx.xx.compute-1.amazonaws.com).
Spark dan Structured Streaming pada khususnya adalah topik baru bagi saya, jadi saya akan sangat berterima kasih kepada pembaca atas komentar, diskusi dan koreksi.