Klasifikasi sejumlah besar data pada Apache Spark menggunakan model pembelajaran mesin sewenang-wenang

Bagian 1: Pernyataan Masalah


Halo, Habr! Saya seorang arsitek solusi di CleverDATA. Hari ini saya akan berbicara tentang bagaimana kita mengklasifikasikan data dalam jumlah besar menggunakan model yang dibangun menggunakan hampir semua perpustakaan pembelajaran mesin yang tersedia. Dalam seri dua bagian ini, kami akan mempertimbangkan pertanyaan-pertanyaan berikut.

  • Bagaimana cara menyajikan model pembelajaran mesin sebagai layanan (Model as a Service)?
  • Bagaimana tugas pemrosesan terdistribusi data dalam jumlah besar dilakukan secara fisik menggunakan Apache Spark?
  • Masalah apa yang muncul ketika Apache Spark berinteraksi dengan layanan eksternal?
  • Bagaimana interaksi Apache Spark dengan layanan eksternal diorganisasikan menggunakan pustaka akka-stream dan akka-http, serta pendekatan Reactive Streams?

Awalnya, saya berencana untuk menulis satu artikel, tetapi karena volume materi ternyata cukup besar, saya memutuskan untuk memecahnya menjadi dua bagian. Hari ini di bagian pertama kita akan mempertimbangkan pernyataan umum masalah, serta masalah utama yang perlu diselesaikan selama implementasi. Pada bagian kedua, kita akan berbicara tentang implementasi praktis dari solusi untuk masalah ini menggunakan pendekatan Reactive Streams.


Perusahaan kami CleverDATA memiliki tim analis data yang, dengan bantuan berbagai alat (seperti scikit-learn, facebook fastText, xgboost, tensorFlow, dll.), Melatih model pembelajaran mesin. Bahasa pemrograman inti de facto yang digunakan analis adalah Python. Hampir semua perpustakaan untuk pembelajaran mesin, bahkan awalnya diimplementasikan dalam bahasa lain, memiliki antarmuka Python dan terintegrasi dengan perpustakaan Python utama (terutama dengan NumPy).

Di sisi lain, ekosistem Hadoop banyak digunakan untuk menyimpan dan memproses sejumlah besar data tidak terstruktur. Di dalamnya, data disimpan pada sistem file HDFS dalam bentuk blok replikasi terdistribusi dengan ukuran tertentu (biasanya 128 MB, tetapi dimungkinkan untuk dikonfigurasi). Algoritma pemrosesan data terdistribusi yang paling efisien mencoba meminimalkan interaksi jaringan antara mesin cluster. Untuk melakukan ini, data harus diproses pada mesin yang sama di mana mereka disimpan.

Tentu saja, dalam banyak kasus, interaksi jaringan tidak dapat sepenuhnya dihindari, tetapi, bagaimanapun, Anda perlu mencoba untuk melakukan semua tugas secara lokal dan meminimalkan jumlah data yang perlu dikirim melalui jaringan.

Prinsip pemrosesan data terdistribusi ini disebut "pindahkan perhitungan dekat dengan data". Semua kerangka kerja utama, terutama Hadoop MapReduce dan Apache Spark, mematuhi prinsip ini. Mereka menentukan komposisi dan urutan operasi spesifik yang perlu dijalankan pada mesin di mana blok data yang diperlukan disimpan.

Gambar 1. Cluster HDFS terdiri dari beberapa mesin, salah satunya adalah Name Node, dan sisanya adalah Data Node. Name Node menyimpan informasi tentang file yang membentuk blok mereka, dan tentang mesin di mana mereka berada secara fisik. Blok itu sendiri disimpan di Data Node, yang direplikasi ke beberapa mesin untuk meningkatkan keandalan. Data Node juga menjalankan tugas pemrosesan data. Tugas terdiri dari proses utama (Master, M), yang mengoordinasikan peluncuran proses kerja (Pekerja, W) pada mesin di mana blok data yang diperlukan disimpan.

Hampir semua komponen ekosistem Hadoop diluncurkan menggunakan Java Virtual Machine (JVM) dan terintegrasi erat satu sama lain. Misalnya, untuk menjalankan tugas yang ditulis menggunakan Apache Spark untuk bekerja dengan data yang disimpan pada HDFS, hampir tidak ada manipulasi tambahan yang diperlukan: kerangka kerja ini menyediakan fungsionalitas ini di luar kotak.

Sayangnya, sebagian besar perpustakaan yang dirancang untuk pembelajaran mesin menganggap bahwa data disimpan dan diproses secara lokal. Pada saat yang sama, ada perpustakaan yang terintegrasi erat dengan ekosistem Hadoop, misalnya, Spark ML atau Apache Mahout. Namun, mereka memiliki sejumlah kelemahan signifikan. Pertama, mereka menyediakan implementasi algoritma pembelajaran mesin yang jauh lebih sedikit. Kedua, tidak semua analis data dapat bekerja dengannya. Keuntungan dari perpustakaan ini termasuk fakta bahwa mereka dapat digunakan untuk melatih model pada volume data yang besar menggunakan komputasi terdistribusi.

Namun, analis data sering menggunakan metode alternatif untuk melatih model, khususnya perpustakaan yang memungkinkan penggunaan GPU. Saya tidak akan mempertimbangkan masalah model pelatihan dalam artikel ini, karena saya ingin fokus pada penggunaan model yang sudah jadi yang dibangun menggunakan perpustakaan pembelajaran mesin yang tersedia untuk mengklasifikasikan sejumlah besar data.

Jadi, tugas utama yang kami coba selesaikan di sini adalah untuk menerapkan model pembelajaran mesin pada sejumlah besar data yang disimpan di HDFS. Jika kita dapat menggunakan modul SparkML dari perpustakaan Apache Spark, yang mengimplementasikan algoritma pembelajaran mesin dasar, maka mengklasifikasikan sejumlah besar data akan menjadi tugas yang sepele:

val model: LogisticRegressionModel = LogisticRegressionModel.load("/path/to/model") val dataset = spark.read.parquet("/path/to/data") val result = model.transform(dataset) 

Sayangnya, pendekatan ini hanya berfungsi untuk algoritma yang diterapkan dalam modul SparkML (daftar lengkap dapat ditemukan di sini ). Dalam hal menggunakan perpustakaan lain, apalagi, diimplementasikan bukan pada JVM, semuanya menjadi jauh lebih rumit.

Untuk mengatasi masalah ini, kami memutuskan untuk membungkus model dalam layanan REST. Karena itu, ketika memulai tugas mengklasifikasikan data yang disimpan pada HDFS, perlu untuk mengatur interaksi antara mesin tempat data disimpan dan mesin (atau kelompok mesin) tempat layanan klasifikasi berjalan.

Gambar 2. Konsep Model sebagai Layanan

Deskripsi layanan klasifikasi python


Untuk menyajikan model sebagai layanan, perlu untuk menyelesaikan tugas-tugas berikut:

  1. menerapkan akses efisien ke model melalui HTTP;
  2. memastikan penggunaan sumber daya mesin yang paling efisien (terutama semua inti prosesor dan memori);
  3. memberikan ketahanan terhadap beban tinggi;
  4. memberikan kemampuan untuk skala secara horizontal.

Akses ke model melalui HTTP cukup sederhana untuk diterapkan: sejumlah besar perpustakaan telah dikembangkan untuk Python yang memungkinkan Anda untuk menerapkan titik akses REST menggunakan sejumlah kecil kode. Salah satu dari microframes ini adalah Flask . Implementasi layanan klasifikasi pada Flask adalah sebagai berikut:

 from flask import Flask, request, Response model = load_model() n_features = 100 app = Flask(__name__) @app.route("/score", methods=['PUT']) def score(): inp = np.frombuffer(request.data, dtype='float32').reshape(-1, n_features) result = model.predict(inp) return Response(result.tobytes(), mimetype='application/octet-stream') if __name__ == "__main__": app.run() 

Di sini, ketika layanan dimulai, kami memuat model ke dalam memori, dan kemudian menggunakannya saat memanggil metode klasifikasi. Fungsi load_model memuat model dari beberapa sumber eksternal, baik itu sistem file, penyimpanan nilai kunci, dll.

Model adalah objek yang memiliki metode prediksi. Dalam kasus klasifikasi, dibutuhkan input ke beberapa vektor fitur dengan ukuran tertentu dan menghasilkan nilai Boolean yang menunjukkan apakah vektor yang ditentukan cocok untuk model ini, atau beberapa nilai dari 0 hingga 1, yang kemudian dapat Anda terapkan ambang batas cutoff: semuanya di atas ambang batas, adalah hasil positif dari klasifikasi, sisanya tidak.

Vektor fitur yang kita perlu mengklasifikasikan dilewatkan dalam bentuk biner dan deserialized ke array numpy. Ini akan menjadi overhead untuk membuat permintaan HTTP untuk setiap vektor. Misalnya, dalam kasus vektor 100 dimensi dan menggunakan untuk nilai tipe float32, permintaan HTTP lengkap, termasuk header, akan terlihat seperti ini:

 PUT /score HTTP/1.1 Host: score-node-1:8099 User-Agent: curl/7.58.0 Accept: */* Content-Type: application/binary Content-Length: 400 [400 bytes of data] 

Seperti yang Anda lihat, efisiensi permintaan seperti itu sangat rendah (400 byte payload / (133 byte byte + 400 byte tubuh) = 75%). Untungnya, di hampir semua perpustakaan, metode prediksi memungkinkan Anda untuk menerima bukan vektor [1 xn], tetapi matriks [mxn], dan, dengan demikian, hasilkan segera untuk nilai input m.

Selain itu, perpustakaan numpy dioptimalkan untuk bekerja dengan matriks besar, memungkinkan Anda untuk secara efektif menggunakan semua sumber daya mesin yang tersedia. Jadi, kita dapat mengirim bukan hanya satu tetapi sejumlah besar fitur vektor dalam satu permintaan, deserialize mereka menjadi matriks numpy ukuran [mxn], mengklasifikasikan, dan mengembalikan vektor [mx 1] dari nilai Boolean atau float32. Akibatnya, efisiensi interaksi HTTP saat menggunakan matriks 1000 baris menjadi hampir sama dengan 100%. Ukuran tajuk HTTP dalam hal ini dapat diabaikan.

Untuk menguji layanan Flask di mesin lokal, Anda dapat menjalankannya dari baris perintah. Namun, metode ini sama sekali tidak cocok untuk penggunaan industri. Faktanya adalah bahwa Flask adalah single-threaded dan, jika kita melihat diagram beban prosesor saat layanan sedang berjalan, kita akan melihat bahwa satu core 100% dimuat, dan sisanya tidak aktif. Untungnya, ada cara untuk menggunakan semua kernel mesin: untuk ini, Flask perlu dijalankan melalui server aplikasi web uwsgi. Ini memungkinkan Anda untuk mengkonfigurasi jumlah proses dan utas secara optimal untuk memastikan beban yang seragam pada semua inti prosesor. Rincian lebih lanjut tentang semua opsi untuk mengkonfigurasi uwsgi dapat ditemukan di sini .

Lebih baik menggunakan nginx sebagai titik masuk HTTP, karena uwsgi dapat bekerja secara tidak stabil jika bebannya tinggi. Nginx, di sisi lain, mengambil seluruh aliran input permintaan ke dirinya sendiri, menyaring permintaan yang tidak valid, dan dosis beban pada uwsgi. Nginx berkomunikasi dengan uwsgi melalui soket linux menggunakan file proses. Contoh konfigurasi nginx ditunjukkan di bawah ini:

 server { listen 80; server_name 127.0.0.1; location / { try_files $uri @score; } location @score { include uwsgi_params; uwsgi_pass unix:/tmp/score.sock; } } 

Seperti yang dapat kita lihat, itu ternyata merupakan konfigurasi yang agak rumit untuk satu mesin. Jika kita perlu mengklasifikasikan data dalam jumlah besar, sejumlah besar permintaan akan datang ke layanan ini, dan itu bisa menjadi hambatan. Solusi untuk masalah ini adalah penskalaan horizontal.

Untuk kenyamanan, kami mengemas layanan dalam wadah Docker dan kemudian menyebarkannya pada jumlah mesin yang diperlukan. Jika diinginkan, Anda dapat menggunakan alat penyebaran otomatis seperti Kubernetes. Contoh struktur Dockerfile untuk membuat wadah dengan layanan diberikan di bawah ini.

 FROM ubuntu #Installing required ubuntu and python modules RUN apt-get update RUN apt-get -y install python3 python3-pip nginx RUN update-alternatives --install /usr/bin/python python /usr/bin/python3 1 RUN update-alternatives --install /usr/bin/pip pip /usr/bin/pip3 1 RUN pip install uwsgi flask scipy scikit-learn #copying script files WORKDIR /etc/score COPY score.py . COPY score.ini . COPY start.sh . RUN chmod +x start.sh RUN rm /etc/nginx/sites-enabled/default COPY score.nginx /etc/nginx/sites-enabled/ EXPOSE 80 ENTRYPOINT ["./start.sh"] 

Jadi, struktur layanan untuk klasifikasi adalah sebagai berikut:

Gambar 3. Skema layanan untuk klasifikasi

Ringkasan singkat pekerjaan Apache Spark di ekosistem Hadoop


Sekarang pertimbangkan proses pemrosesan data yang disimpan pada HDFS. Seperti yang saya sebutkan sebelumnya, prinsip mentransfer perhitungan ke data digunakan untuk ini. Untuk mulai memproses tugas, Anda perlu tahu di mesin mana blok data yang kami butuhkan disimpan untuk menjalankan proses yang terlibat langsung dalam memprosesnya. Anda juga perlu mengoordinasikan peluncuran proses ini, memulai kembali jika terjadi keadaan darurat, jika perlu, mengagregasikan hasil dari berbagai subtugas, dll.

Semua tugas ini diselesaikan oleh berbagai kerangka kerja yang bekerja dengan ekosistem Hadoop. Salah satu yang paling populer dan nyaman adalah Apache Spark. Konsep utama di mana seluruh kerangka kerja dibangun adalah RDD (Resilient Distributed Dataset). Secara umum, RDD dapat dianggap sebagai koleksi terdistribusi yang tahan-drop. RDD dapat diperoleh dengan dua cara utama:

  1. pembuatan dari sumber eksternal, seperti koleksi dalam memori, file atau direktori pada sistem file, dll.
  2. konversi dari RDD lain dengan menerapkan operasi transformasi. RDD mendukung semua operasi dasar bekerja dengan koleksi, seperti peta, flatMap, filter, groupBy, gabung, dll.

Penting untuk dipahami bahwa RDD, tidak seperti koleksi, bukan secara langsung data, tetapi urutan operasi yang harus dilakukan pada data. Oleh karena itu, ketika operasi transformasi dipanggil, tidak ada pekerjaan yang benar-benar terjadi, dan kami hanya mendapatkan RDD baru, yang akan berisi satu operasi lebih banyak daripada yang sebelumnya. Pekerjaan itu sendiri dimulai ketika yang disebut operasi terminal, atau tindakan, dipanggil. Ini termasuk menyimpan ke file, menyimpan ke koleksi di memori, menghitung jumlah elemen, dll.

Ketika memulai operasi terminal, Spark membangun grafik operasi asiklik (DAG, Grafik Asiklik Langsung) berdasarkan RDD yang dihasilkan dan menjalankannya secara berurutan pada kluster sesuai dengan grafik yang diterima. Ketika membangun DAG berdasarkan RDD, Spark melakukan sejumlah optimasi, misalnya, jika mungkin, menggabungkan beberapa transformasi berturut-turut menjadi satu operasi.

RDD adalah unit utama interaksi dengan API Spark dalam versi Spark 1.x. Dalam Spark 2.x, para pengembang mengatakan bahwa sekarang konsep utama untuk interaksi adalah Dataset. Dataset adalah add-on untuk RDD dengan dukungan untuk interaksi seperti SQL. Saat menggunakan Dataset API, Spark memungkinkan Anda untuk menggunakan berbagai optimisasi, termasuk yang tingkatannya cukup rendah. Tetapi secara umum, prinsip-prinsip dasar yang berlaku untuk RDD juga berlaku untuk Dataset.

Rincian lebih lanjut tentang karya Spark dapat ditemukan dalam dokumentasi di situs web resmi .

Mari kita perhatikan contoh klasifikasi paling sederhana tentang Spark tanpa menggunakan layanan eksternal. Algoritma yang agak tidak berarti diterapkan di sini, yang mempertimbangkan proporsi masing-masing huruf Latin dalam teks, dan kemudian mempertimbangkan standar deviasi. Di sini, pertama-tama, penting untuk memperhatikan langsung langkah-langkah dasar yang digunakan saat bekerja dengan Spark.

 case class Data(id: String, text: String) case class Features(id: String, vector: Array[Float]) case class Score(id: String, score: Float) //(1) def std(vector: Array[Float]): Float = ??? //(2) val ds: Dataset[Data] = spark.read.parquet("/path/to/data").as[Data] //(3) val result: Dataset[Score] = ds.map {d: Data => //(4) val filteredText = d.text.toLowerCase.filter { letter => 'a' <= letter && letter <= 'z' } val featureVector = new Array[Float](26) if (filteredText.nonEmpty) { filteredText.foreach(letter => featureVector(letter) += 1) featureVector.indicies.foreach { i => featureVector(i) = featureVector(i) / filteredText.length() } } Features(d.id, featureVector) }.map {f: Features => Score(f.id, std(f.vector)) //(5) } result.write.parquet("/path/to/result") //(6) 

Dalam contoh ini, kami:

  1. kami menentukan struktur input, menengah dan data keluaran (data input didefinisikan sebagai beberapa teks yang terkait dengan pengidentifikasi tertentu, data menengah cocok dengan pengidentifikasi dengan vektor fitur, dan output cocok dengan pengidentifikasi dengan beberapa nilai numerik);
  2. kami mendefinisikan fungsi untuk menghitung nilai yang dihasilkan oleh vektor fitur (misalnya, standar deviasi, implementasi tidak ditampilkan);
  3. mendefinisikan Dataset asli sebagai data yang disimpan pada HDFS dalam format parket di sepanjang jalur / path / ke / data;
  4. Tetapkan Dataset perantara sebagai peta bitmap dari Dataset asli.
  5. Demikian pula, kami menentukan Dataset yang dihasilkan melalui transformasi bitwise dari perantara;
  6. simpan Dataset yang dihasilkan ke HDFS dalam format parket di sepanjang jalur / path / ke / hasil. Karena menyimpan ke file adalah operasi terminal, perhitungan sendiri diluncurkan tepat pada tahap ini.

Apache Spark bekerja berdasarkan prinsip master-pekerja. Ketika aplikasi dimulai, proses utama, yang disebut driver, dimulai. Itu mengeksekusi kode yang bertanggung jawab untuk pembentukan RDD, atas dasar di mana perhitungan akan dilakukan.

Ketika operasi terminal dipanggil, pengemudi menghasilkan DAG berdasarkan RDD yang dihasilkan. Kemudian pengemudi memulai peluncuran alur kerja yang disebut pelaksana, di mana data akan diproses secara langsung. Setelah memulai alur kerja, driver melewatinya blok yang dapat dieksekusi yang perlu dieksekusi, dan juga menunjukkan bagian data mana yang harus diterapkan.

Di bawah ini adalah kode dari contoh kita, di mana bagian-bagian kode yang dieksekusi pada eksekutor (antara baris eksekutor bagian dimulai dan eksekutor bagian akhir) disorot. Sisa kode dieksekusi pada driver.

 case class Data(id: String, text: String) case class Features(id: String, vector: Array[Float]) case class Score(id: String, score: Float) def std(vector: Array[Float]): Float = ??? val ds: Dataset[Data] = spark.read.parquet("/path/to/data").as[Data] val result: Dataset[Score] = ds.map { // --------------- EXECUTOR PART BEGIN ----------------------- d: Data => val filteredText = d.text.toLowerCase.filter { letter => 'a' <= letter && letter <= 'z' } val featureVector = new Array[Float](26) if (filteredText.nonEmpty) { filteredText.foreach(letter => featureVector(letter) += 1) featureVector.indicies.foreach { i => featureVector(i) = featureVector(i) / filteredText.length() } } Features(d.id, featureVector) // --------------- EXECUTOR PART END ----------------------- }.map { // --------------- EXECUTOR PART BEGIN ----------------------- f: Features => Score(f.id, std(f.vector)) // --------------- EXECUTOR PART END ----------------------- } result.write.parquet(β€œ/path/to/result”) 

Dalam ekosistem Hadoop, semua aplikasi berjalan dalam wadah. Wadah adalah proses yang berjalan di salah satu mesin dalam sebuah cluster yang dialokasikan sejumlah sumber daya. Peluncuran kontainer ditangani oleh YARN Resource Manager. Ini menentukan mesin mana yang memiliki jumlah inti prosesor dan RAM yang cukup, serta apakah itu berisi blok data yang diperlukan untuk diproses.

Saat meluncurkan aplikasi Spark, YARN membuat dan menjalankan kontainer di salah satu mesin cluster di mana ia meluncurkan driver. Kemudian, ketika pengemudi menyiapkan DAG dari operasi yang perlu dijalankan pada eksekutor, YARN meluncurkan kontainer tambahan pada mesin yang diinginkan.

Sebagai aturan, sudah cukup bagi pengemudi untuk mengalokasikan satu inti dan sejumlah kecil memori (kecuali, tentu saja, maka hasil perhitungan tidak akan diagregasi pada driver ke dalam memori). Untuk pelaksana, untuk mengoptimalkan sumber daya dan mengurangi jumlah total proses dalam sistem, lebih dari satu inti dapat dibedakan: dalam hal ini, pelaksana akan dapat melakukan beberapa tugas secara bersamaan.

Tetapi di sini penting untuk memahami bahwa jika terjadi kegagalan dari salah satu tugas yang berjalan di wadah atau jika sumber daya tidak mencukupi, YARN dapat memutuskan untuk menghentikan wadah, dan kemudian semua tugas yang dieksekusi di dalamnya harus dimulai kembali pada pelaksana lain. Selain itu, jika kami mengalokasikan jumlah core yang cukup besar per kontainer, maka kemungkinan YARN tidak akan dapat memulainya. Sebagai contoh, jika kita memiliki dua mesin yang dua core tidak digunakan, maka kita dapat memulai pada setiap kontainer yang membutuhkan dua core, tetapi tidak dapat memulai satu kontainer yang membutuhkan empat core.

Sekarang mari kita lihat bagaimana kode dari contoh kita akan dieksekusi langsung di cluster. Bayangkan bahwa ukuran data sumber adalah 2 Terabyte. Dengan demikian, jika ukuran blok pada HDFS adalah 128 megabita, maka akan ada total 16384 blok. Setiap blok direplikasi ke beberapa mesin untuk memastikan keandalan. Untuk kesederhanaan, kita ambil faktor replikasi sama dengan dua, yaitu, akan ada total 32768 blok yang tersedia. Misalkan kita menggunakan sekelompok 16 mesin untuk penyimpanan. Dengan demikian, pada masing-masing mesin dalam hal distribusi seragam akan ada sekitar 2048 blok, atau 256 Gigabytes per mesin. Di setiap mesin, kami memiliki 8 inti prosesor dan 64 gigabytes RAM.

Untuk tugas kami, driver tidak memerlukan banyak sumber daya, jadi kami akan mengalokasikan 1 inti dan 1 GB memori untuk itu. Kami akan memberikan pemain 2 core dan 4 GB memori. Misalkan kita ingin memaksimalkan penggunaan sumber daya klaster. Dengan demikian, kami mendapatkan 64 kontainer: satu untuk pengemudi, dan 63 untuk pemain.

Gambar 4. Proses yang berjalan pada Node Data dan sumber daya yang mereka gunakan.

Karena dalam kasus kami, kami hanya menggunakan operasi peta, DAG kami akan terdiri dari satu operasi. Ini terdiri dari tindakan berikut:

  1. ambil satu blok data dari hard drive lokal,
  2. Konversi data
  3. simpan hasilnya ke blok baru di disk lokal Anda sendiri.

Secara total, kita perlu memproses 16384 blok, sehingga setiap pelaksana harus melakukan 16384 / (63 pelaksana * 2 inti) = 130 operasi. Dengan demikian, siklus hidup pelaksana sebagai proses yang terpisah (jika semuanya terjadi tanpa jatuh) akan terlihat sebagai berikut.

  1. Peluncuran kontainer.
  2. Menerima dari pengemudi tugas di mana akan ada pengidentifikasi blok dan operasi yang diperlukan. Karena kami mengalokasikan dua inti ke wadah, pelaksana menerima dua tugas sekaligus.
  3. Melakukan tugas dan mengirimkan hasilnya ke pengemudi.
  4. Dapatkan tugas berikutnya dari driver dan ulangi langkah 2 dan 3 sampai semua blok untuk mesin lokal ini diproses.
  5. Kontainer Berhenti

Catatan : DAG yang lebih kompleks diperoleh jika perlu untuk mendistribusikan kembali data antara antara mesin, biasanya untuk operasi pengelompokan (groupBy, lessByKey, dll.) Dan koneksi (bergabung), pertimbangan yang berada di luar ruang lingkup artikel ini.

Masalah utama interaksi antara Apache Spark dan layanan eksternal


Jika, dalam kerangka operasi peta, kita perlu mengakses beberapa layanan eksternal, tugas menjadi kurang sepele. Misalkan objek kelas ExternalServiceClient bertanggung jawab untuk berinteraksi dengan layanan eksternal. Secara umum, sebelum mulai bekerja, kita perlu menginisialisasi, dan menyebutnya seperlunya:

 val client = ExternalServiceClient.create() // val score = client.score(featureVector) // . 

Biasanya, inisialisasi klien membutuhkan waktu, oleh karena itu, sebagai aturan, inisialisasi diinisialisasi pada startup aplikasi, dan kemudian digunakan untuk mendapatkan instance klien dari beberapa konteks global atau kumpulan. Oleh karena itu, ketika sebuah wadah dengan pelaksana Spark menerima tugas yang memerlukan interaksi dengan layanan eksternal, alangkah baiknya untuk mendapatkan klien yang sudah diinisialisasi sebelum mulai bekerja pada array data, dan kemudian menggunakannya kembali untuk setiap elemen.

Ada dua cara untuk melakukan ini di Spark. Pertama, jika klien serializable (klien itu sendiri dan semua bidangnya harus memperluas antarmuka java.io.Serializable), maka dapat diinisialisasi pada driver dan kemudian diteruskan ke pelaksana melalui mekanisme variabel siaran .

 val client = ExternalServiceClient.create() val clientBroadcast = sparkContext.broadcast(client) ds.map { f: Features => val score = clientBroadcast.value.score(f.vector) Score(f.id, score) } 

Dalam hal klien tidak serializable, atau inisialisasi klien adalah proses yang tergantung pada pengaturan mesin tertentu yang menjalankannya (misalnya, untuk menyeimbangkan, permintaan dari satu bagian mesin harus pergi ke mesin layanan pertama, dan untuk yang lain ke yang kedua), maka klien dapat diinisialisasi langsung pada pelaksana.

Untuk ini, RDD (dan Dataset) memiliki operasi mapPartitions, yang merupakan versi umum dari operasi peta (jika Anda melihat kode sumber dari kelas RDD, maka operasi peta diimplementasikan melalui mapPartitions). Fungsi yang diteruskan ke operasi mapPartitions dijalankan sekali untuk setiap blok. , , , :

 ds.mapPartitions {fi: Iterator[Features] => val client = ExternalServiceClient.create() fi.map { f: Features => val score = client.score(f.vector) Score(f.id, score) } } 

. , , , , . , , , .

. , hasNext next:

 while (i.hasNext()) { val item = i.next() … } 

, , . , 8 , YARN 4 2 , , 8 . , . .

. , , , , . : , , . , hasNext , . (, , ) , , , . , .

5. , , mapPartitions, . .

, , . , , , .

6.

, , , -, , , , -, , .


, . , . , . , . , , , , , , .

.

  1. , , , .
  2. , , . , . , .
  3. , hasNext false, , , , . : hasNext = false, , , . , , , .

, . Stay tuned!

Source: https://habr.com/ru/post/id413137/


All Articles