Apache Kafka dan Streaming dengan Spark Streaming

Halo, Habr! Hari ini kita akan membangun sistem yang akan menggunakan Apark Kafka untuk memproses aliran pesan menggunakan Spark Streaming dan menulis hasil pemrosesan ke basis data cloud AWS RDS.

Bayangkan bahwa lembaga kredit tertentu telah menetapkan tugas memproses transaksi masuk dengan cepat di semua cabangnya. Ini dapat dilakukan untuk menghitung dengan cepat posisi mata uang terbuka untuk Perbendaharaan, batasan atau hasil keuangan pada transaksi, dll.

Bagaimana menerapkan kasus ini tanpa menggunakan sihir dan mantra sihir - kita baca di bawah! Ayo pergi!


(Sumber gambar)

Pendahuluan


Tentu saja, memproses array data yang besar secara real time memberikan banyak peluang untuk digunakan dalam sistem modern. Salah satu kombinasi paling populer untuk ini adalah Apache Kafka dan Spark Streaming tandem, di mana Kafka menciptakan aliran paket pesan masuk, dan Spark Streaming memproses paket-paket ini pada interval waktu tertentu.

Untuk meningkatkan toleransi kesalahan aplikasi, kami akan menggunakan pos pemeriksaan - pos pemeriksaan. Menggunakan mekanisme ini, ketika modul Spark Streaming perlu memulihkan data yang hilang, hanya perlu kembali ke titik kontrol terakhir dan melanjutkan perhitungan dari itu.

Arsitektur sistem dalam pengembangan




Komponen yang Digunakan:

  • Apache Kafka adalah sistem pesan terdistribusi dengan menerbitkan dan berlangganan. Cocok untuk konsumsi pesan offline dan online. Untuk mencegah kehilangan data, pesan Kafka disimpan di disk dan direplikasi di dalam cluster. Sistem Kafka dibangun di atas layanan sinkronisasi ZooKeeper;
  • Apache Spark Streaming - Komponen Spark untuk memproses data streaming. Modul Spark Streaming dibangun menggunakan arsitektur mikro-batch, ketika aliran data ditafsirkan sebagai urutan berkelanjutan dari paket data kecil. Spark Streaming menerima data dari berbagai sumber dan menggabungkannya ke dalam paket kecil. Paket-paket baru dibuat secara berkala. Pada awal setiap interval waktu, paket baru dibuat, dan data apa pun yang diterima selama interval ini termasuk dalam paket. Pada akhir interval, pertumbuhan paket berhenti. Ukuran interval ditentukan oleh parameter yang disebut interval batch;
  • Apache Spark SQL - Menggabungkan pemrosesan relasional dengan pemrograman fungsional Spark. Data terstruktur mengacu pada data yang memiliki skema, yaitu, satu set bidang untuk semua catatan. Spark SQL mendukung input dari berbagai sumber data terstruktur dan, berkat ketersediaan informasi tentang skema tersebut, Spark secara efisien hanya dapat mengambil bidang rekaman yang diperlukan, dan juga menyediakan API DataFrame;
  • AWS RDS adalah basis data relasional berbasis cloud yang relatif murah, layanan web yang menyederhanakan konfigurasi, operasi, dan penskalaan, dan secara langsung dikelola oleh Amazon.

Instal dan mulai server Kafka


Sebelum menggunakan Kafka secara langsung, Anda perlu memastikan Java tersedia, sebagai JVM digunakan untuk bekerja:

sudo apt-get update sudo apt-get install default-jre java -version 

Buat pengguna baru untuk bekerja dengan Kafka:

 sudo useradd kafka -m sudo passwd kafka sudo adduser kafka sudo 

Selanjutnya, unduh distribusi dari situs web resmi Apache Kafka:

 wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz" 

Buka paket arsip yang diunduh:
 tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka 

Langkah selanjutnya adalah opsional. Faktanya adalah bahwa pengaturan default tidak memungkinkan penggunaan penuh semua fitur dari Apache Kafka. Misalnya, hapus topik, kategori, grup yang pesannya dapat dipublikasikan. Untuk mengubah ini, edit file konfigurasi:

 vim ~/kafka/config/server.properties 

Tambahkan berikut ini ke akhir file:

 delete.topic.enable = true 

Sebelum memulai server Kafka, Anda perlu memulai server ZooKeeper, kami akan menggunakan skrip bantu yang datang dengan distribusi Kafka:

 Cd ~/kafka bin/zookeeper-server-start.sh config/zookeeper.properties 

Setelah ZooKeeper berhasil dimulai, di terminal terpisah kami meluncurkan server Kafka:

 bin/kafka-server-start.sh config/server.properties 

Buat topik baru yang disebut Transaction:

 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction 

Pastikan topik dengan jumlah partisi dan replikasi yang tepat telah dibuat:

 bin/kafka-topics.sh --describe --zookeeper localhost:2181 



Kami akan kehilangan momen menguji produsen dan konsumen untuk topik yang baru dibuat. Untuk detail lebih lanjut tentang cara menguji pengiriman dan penerimaan pesan, lihat dokumentasi resmi - Kirim beberapa pesan . Baiklah, kami beralih ke menulis produser dengan Python menggunakan API KafkaProducer.

Penulisan Produser


Produser akan menghasilkan data acak - 100 pesan setiap detik. Dengan data acak yang kami maksud adalah kamus yang terdiri dari tiga bidang:

  • Cabang - nama tempat penjualan lembaga kredit;
  • Mata uang - mata uang transaksi;
  • Jumlah - jumlah transaksi. Jumlah tersebut akan menjadi angka positif jika itu adalah pembelian mata uang oleh Bank, dan negatif jika itu adalah penjualan.

Kode untuk produsen adalah sebagai berikut:

 from numpy.random import choice, randint def get_random_value(): new_dict = {} branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut'] currency_list = ['RUB', 'USD', 'EUR', 'GBP'] new_dict['branch'] = choice(branch_list) new_dict['currency'] = choice(currency_list) new_dict['amount'] = randint(-100, 100) return new_dict 

Selanjutnya, menggunakan metode kirim, kami mengirim pesan ke server, dalam topik yang kami butuhkan, dalam format JSON:

 from kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda x:dumps(x).encode('utf-8'), compression_type='gzip') my_topic = 'transaction' data = get_random_value() try: future = producer.send(topic = my_topic, value = data) record_metadata = future.get(timeout=10) print('--> The message has been sent to a topic: \ {}, partition: {}, offset: {}' \ .format(record_metadata.topic, record_metadata.partition, record_metadata.offset )) except Exception as e: print('--> It seems an Error occurred: {}'.format(e)) finally: producer.flush() 

Saat menjalankan skrip, kami menerima pesan berikut di terminal:


Ini berarti bahwa semuanya berfungsi seperti yang kita inginkan - produser menghasilkan dan mengirim pesan ke topik yang kita butuhkan.

Langkah selanjutnya adalah menginstal Spark dan memproses aliran pesan ini.

Instal Apache Spark


Apache Spark adalah platform komputasi cluster serbaguna dan berkinerja tinggi.

Dalam hal kinerja, Spark melampaui implementasi populer dari model MapReduce, secara bersamaan memberikan dukungan untuk jenis perhitungan yang lebih luas, termasuk kueri interaktif dan pemrosesan aliran. Kecepatan memainkan peran penting dalam memproses data dalam jumlah besar, karena kecepatan itulah yang memungkinkan Anda untuk bekerja secara interaktif tanpa menghabiskan beberapa menit atau berjam-jam menunggu. Salah satu kekuatan terbesar Spark pada kecepatan tinggi adalah kemampuannya untuk melakukan perhitungan dalam memori.

Kerangka kerja ini ditulis dalam Scala, jadi Anda harus menginstalnya terlebih dahulu:

 sudo apt-get install scala 

Unduh distribusi Spark dari situs web resmi:

 wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz" 

Buka paket arsip:

 sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark 

Tambahkan path ke Spark di file bash:

 vim ~/.bashrc 

Tambahkan baris berikut melalui editor:

 SPARK_HOME=/usr/local/spark export PATH=$SPARK_HOME/bin:$PATH 

Jalankan perintah di bawah ini setelah membuat perubahan ke bashrc:

 source ~/.bashrc 

Penerapan AWS PostgreSQL


Tetap menyebarkan database, tempat kami akan mengunggah informasi yang diproses dari stream. Untuk ini, kami akan menggunakan layanan AWS RDS.

Pergi ke konsol AWS -> AWS RDS -> Databases -> Buat database:


Pilih PostgreSQL dan klik tombol Berikutnya:


Karena Contoh ini dipahami hanya untuk tujuan pendidikan, kami akan menggunakan server gratis "minimal" (Tier Gratis):


Selanjutnya, beri tanda centang di blok Free Tier, dan setelah itu kami akan secara otomatis ditawari turunan dari kelas t2.micro - meskipun lemah, ini gratis dan cukup cocok untuk tugas kami:

Hal-hal yang sangat penting berikut: nama instance database, nama pengguna master dan kata sandinya. Sebutkan contohnya: myHabrTest, pengguna master: habr , kata sandi: habr12345 dan klik tombol Next:



Halaman berikutnya berisi parameter yang bertanggung jawab atas ketersediaan server database kami dari luar (Aksesibilitas publik) dan ketersediaan port:


Mari kita buat konfigurasi baru untuk grup keamanan VPC, yang akan memungkinkan kita untuk mengakses server basis data kita dari luar melalui port 5432 (PostgreSQL).

Di jendela browser terpisah, buka konsol AWS di Dasbor VPC -> Grup Keamanan -> Buat bagian grup keamanan:

Tetapkan nama untuk grup Keamanan - PostgreSQL, deskripsi, menunjukkan VPC mana yang harus dikaitkan dengan grup ini dan klik tombol Buat:


Isi grup aturan masuk yang baru dibuat untuk port 5432, seperti yang ditunjukkan pada gambar di bawah ini. Anda tidak harus menentukan porta manual, tetapi pilih PostgreSQL dari daftar drop-down Type.

Sebenarnya, nilai :: / 0 berarti ketersediaan lalu lintas masuk untuk server dari seluruh dunia, yang secara kanonik tidak sepenuhnya benar, tetapi untuk menguraikan contoh, mari kita gunakan pendekatan ini:


Kami kembali ke halaman browser, di mana kami memiliki "Konfigurasi pengaturan lanjutan" terbuka dan pilih di bagian Grup keamanan VPC -> Pilih grup keamanan VPC yang ada -> PostgreSQL:


Selanjutnya, di bagian Opsi basis data -> Nama basis data -> atur nama - habrDB .

Kami dapat membiarkan parameter lainnya, dengan pengecualian menonaktifkan cadangan (periode penyimpanan cadangan - 0 hari), pemantauan, dan Wawasan Kinerja, secara default. Klik pada tombol Create database :


Penangan arus


Langkah terakhir adalah pengembangan Spark-jobs, yang akan memproses setiap dua detik data baru yang berasal dari Kafka dan memasukkan hasilnya ke dalam basis data.

Seperti disebutkan di atas, pos-pos pemeriksaan adalah mekanisme utama dalam SparkStreaming yang harus dikonfigurasi untuk memberikan toleransi kesalahan. Kami akan menggunakan titik kontrol dan, jika terjadi penurunan prosedur, modul Spark Streaming hanya perlu kembali ke titik kontrol terakhir dan melanjutkan perhitungan darinya untuk mengembalikan data yang hilang.

Anda dapat mengaktifkan pos pemeriksaan dengan mengatur direktori dalam sistem file yang toleran terhadap kesalahan, andal (misalnya, HDFS, S3, dll.), Di mana informasi pos pemeriksaan akan disimpan. Ini dilakukan dengan menggunakan, misalnya:

 streamingContext.checkpoint(checkpointDirectory) 

Dalam contoh kita, kita akan menggunakan pendekatan berikut, yaitu, jika checkpointDirectory ada, maka konteksnya akan dibuat kembali dari data titik kontrol. Jika direktori tidak ada (mis. Itu dijalankan untuk pertama kali), fungsi functionToCreateContext dipanggil untuk membuat konteks baru dan mengonfigurasi DStreams:

 from pyspark.streaming import StreamingContext context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext) 

Buat objek DirectStream untuk terhubung ke topik "transaksi" menggunakan metode createDirectStream dari pustaka KafkaUtils:

 from pyspark.streaming.kafka import KafkaUtils sc = SparkContext(conf=conf) ssc = StreamingContext(sc, 2) broker_list = 'localhost:9092' topic = 'transaction' directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": broker_list}) 

Parsing data yang masuk dalam format JSON:

 rowRdd = rdd.map(lambda w: Row(branch=w['branch'], currency=w['currency'], amount=w['amount'])) testDataFrame = spark.createDataFrame(rowRdd) testDataFrame.createOrReplaceTempView("treasury_stream") 

Menggunakan Spark SQL, kami membuat pengelompokan sederhana dan mencetak hasilnya ke konsol:

 select from_unixtime(unix_timestamp()) as curr_time, t.branch as branch_name, t.currency as currency_code, sum(amount) as batch_value from treasury_stream t group by t.branch, t.currency 

Mendapatkan teks kueri dan menjalankannya melalui Spark SQL:

 sql_query = get_sql_query() testResultDataFrame = spark.sql(sql_query) testResultDataFrame.show(n=5) 

Dan kemudian kami menyimpan data agregat yang diterima ke dalam tabel di AWS RDS. Untuk menyimpan hasil agregasi ke tabel database, kami akan menggunakan metode tulis objek DataFrame:

 testResultDataFrame.write \ .format("jdbc") \ .mode("append") \ .option("driver", 'org.postgresql.Driver') \ .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") \ .option("dbtable", "transaction_flow") \ .option("user", "habr") \ .option("password", "habr12345") \ .save() 

Beberapa kata tentang pengaturan koneksi ke AWS RDS. Kami menciptakan pengguna dan kata sandi untuk itu pada langkah "Menyebarkan AWS PostgreSQL". Untuk url server database, gunakan Endpoint, yang ditampilkan di bagian Konektivitas & keamanan:


Untuk menghubungkan Spark dan Kafka dengan benar, Anda harus menjalankan pekerjaan melalui smark-submit menggunakan artefak spark-streaming-kafka-0-8_2.11 . Selain itu, kami juga menerapkan artefak untuk berinteraksi dengan database PostgreSQL, kami akan mentransfernya melalui paket -.

Untuk fleksibilitas skrip, kami juga mengambil nama server pesan dan topik dari mana kami ingin menerima data sebagai parameter input.

Jadi, saatnya untuk memulai dan menguji sistem:

 spark-submit \ --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,\ org.postgresql:postgresql:9.4.1207 \ spark_job.py localhost:9092 transaction 

Semuanya berhasil! Seperti yang dapat Anda lihat dalam gambar di bawah ini, selama pekerjaan aplikasi, hasil agregasi baru ditampilkan setiap 2 detik, karena kami menetapkan interval batching menjadi 2 detik ketika membuat objek StreamingContext:


Selanjutnya, kami membuat kueri sederhana ke database untuk memeriksa catatan di tabel transaction_flow :


Kesimpulan


Artikel ini membahas contoh pemrosesan informasi streaming menggunakan Spark Streaming bersamaan dengan Apache Kafka dan PostgreSQL. Dengan pertumbuhan data dari berbagai sumber, sulit untuk melebih-lebihkan nilai praktis dari Spark Streaming untuk membuat aplikasi streaming dan aplikasi yang beroperasi secara real time.

Anda dapat menemukan kode sumber lengkap di repositori saya di GitHub .

Saya siap untuk membahas artikel ini dengan senang hati, saya menantikan komentar Anda, dan saya juga berharap untuk kritik yang membangun dari semua pembaca yang peduli.

Semoga sukses!

PS Awalnya direncanakan menggunakan database PostgreSQL lokal, tetapi karena saya mencintai AWS, saya memutuskan untuk meletakkan database di cloud. Pada artikel selanjutnya tentang topik ini, saya akan menunjukkan bagaimana menerapkan seluruh sistem yang dijelaskan di atas dalam AWS menggunakan AWS Kinesis dan AWS EMR. Ikuti beritanya!

Source: https://habr.com/ru/post/id451160/


All Articles