
Dalam posting ini, kita akan melihat mengakses API Spark dari berbagai bahasa pemrograman di JVM, serta beberapa masalah kinerja ketika melampaui bahasa Scala. Bahkan jika Anda bekerja di luar JVM, bagian ini mungkin berguna, karena bahasa non-JVM sering bergantung pada Java API, dan bukan pada Scala API.
Bekerja dalam bahasa pemrograman lain tidak selalu berarti Anda harus melampaui JVM, dan bekerja di JVM memiliki banyak keuntungan dalam hal kinerja - terutama karena Anda tidak perlu menyalin data. Meskipun tidak perlu menggunakan pustaka atau adapter yang mengikat khusus untuk mengakses Spark dari luar bahasa Scala, memohon kode Scala dari bahasa pemrograman lain bisa jadi sulit. Kerangka kerja Spark mendukung penggunaan Java 8 dalam ekspresi lambda, dan mereka yang menggunakan versi JDK yang lebih lama memiliki kesempatan untuk mengimplementasikan antarmuka yang sesuai dari paket fungsi org.apache.spark.api.ja.java.fungsi. Bahkan dalam kasus di mana Anda tidak perlu menyalin data, bekerja dalam bahasa pemrograman lain mungkin memiliki nuansa kecil namun penting terkait dengan kinerja.
Yang paling mencolok adalah kesulitan dalam mengakses berbagai Scala API ketika memanggil fungsi dengan tag kelas atau ketika menggunakan properti yang disediakan menggunakan konversi tipe implisit (misalnya, semua fungsi set RDD terkait dengan kelas Double dan Tuple). Untuk mekanisme yang bergantung pada konversi tipe implisit, kelas konkret yang setara sering diberikan bersama dengan konversi eksplisit kepada mereka. Tag kelas Dummy (katakanlah, AnyRef) dapat diteruskan ke fungsi yang bergantung pada tag kelas (seringkali adaptor melakukan ini secara otomatis. Menerapkan kelas spesifik alih-alih konversi tipe implisit biasanya tidak mengarah ke overhead tambahan, tetapi tag kelas dummy dapat memberlakukan batasan pada beberapa optimisasi kompiler.
API Java tidak terlalu berbeda dari Scala API dalam hal properti, hanya kadang-kadang beberapa fungsionalitas atau API pengembang hilang. Bahasa pemrograman JVM lainnya, seperti bahasa Clojure dengan DSL
Flambo dan perpustakaan
gemerlap ,
didukung menggunakan berbagai API Java alih-alih memanggil Scala API secara langsung. Karena sebagian besar binding bahasa, bahkan bahasa non-JVM seperti Python dan R, melalui
Java API, akan sangat berguna untuk menghadapinya.
API Java sangat mirip dengan API Scala, meskipun mereka tidak tergantung pada tag kelas dan konversi implisit. Tidak adanya yang terakhir berarti bahwa alih-alih secara otomatis mengonversi set RDD dari Tuple atau objek ganda ke kelas khusus dengan fungsi tambahan, Anda harus menggunakan fungsi konversi tipe eksplisit (misalnya, mapToDouble atau mapToPair). Fungsi yang ditentukan hanya ditentukan untuk set RDD Java; untungnya untuk kompatibilitas, tipe khusus ini hanya adapter untuk set Scala RDD. Selain itu, fungsi-fungsi khusus ini mengembalikan berbagai tipe data, seperti JavaDoubleRDD dan JavaPairRDD, dengan fitur-fitur yang disediakan oleh transformasi bahasa Scala implisit.
Mari kita kembali ke contoh kanonik penghitungan kata menggunakan Java API (Contoh 7.1). Karena memanggil Scala API dari Java kadang-kadang bisa menjadi tugas yang menakutkan, hampir semua API framework Java Spark diimplementasikan dalam bahasa Scala dengan tag kelas tersembunyi dan konversi tersirat. Karena itu, adapter Java adalah lapisan yang sangat tipis, rata-rata hanya terdiri dari beberapa baris kode, dan menulis ulang mereka hampir tanpa usaha.
Contoh 7.1 Penghitungan Kata (Jawa)
import scala.Tuple2; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaPairRDD import org.apache.spark.api.java.JavaSparkContext; import java.util.regex.Pattern; import java.util.Arrays; public final class WordCount { private static final Pattern pattern = Pattern.compile(" "); public static void main(String[] args) throws Exception { JavaSparkContext jsc = new JavaSparkContext(); JavaRDD<String> lines = jsc.textFile(args[0]); JavaRDD<String> words = lines.flatMap(e -> Arrays.asList( pattern.split(e)).iterator()); JavaPairRDD<String, Integer> wordsIntial = words.mapToPair( e -> new Tuple2<String, Integer>(e, 1)); } }
Terkadang Anda mungkin perlu mengubah RDD Java ke RDD Scala atau sebaliknya. Ini paling sering diperlukan untuk pustaka yang membutuhkan input atau mengembalikan set Scala RDD, tapi kadang-kadang properti Spark dasar mungkin belum tersedia di Java API. Mengubah RDD Java ke Scala RDD adalah cara termudah untuk menggunakan fitur-fitur baru ini.
Jika Anda perlu mentransfer RDD Java yang diatur ke pustaka Scala, yang mengharapkan Spark RDD biasa pada input, Anda dapat mengakses RDD Scala yang mendasarinya menggunakan metode rdd (). Paling sering ini cukup untuk mentransfer RDD akhir ke perpustakaan Scala yang diinginkan; Di antara pengecualian yang menonjol adalah Scala libraries, yang mengandalkan konversi implisit dari jenis set konten konten atau informasi tag kelas dalam pekerjaan mereka. Dalam hal ini, cara termudah untuk mengakses konversi tersirat adalah dengan menulis adaptor kecil di Scala. Jika kerang Scala tidak dapat digunakan, maka Anda dapat memanggil fungsi yang sesuai dari kelas
JavaConverters dan membentuk tag kelas dummy.
Untuk membuat tag kelas dummy, Anda dapat menggunakan metode scala.reflect.ClassTag $ .MODULE $ .AnyRef () atau mendapatkan yang asli menggunakan scala.reflect.ClassTag $ .MODULE $ .apply (CLASS), seperti yang ditunjukkan dalam contoh 7.2 dan 7.3.
Untuk mengonversi dari Scala RDD ke RDD Java, informasi tag kelas seringkali lebih penting daripada kebanyakan perpustakaan Spark. Alasannya adalah bahwa meskipun berbagai kelas JavaRDD menyediakan konstruktor yang dapat diakses publik yang menggunakan Scala RDD sebagai argumen, mereka dimaksudkan untuk dipanggil dari kode Scala, dan oleh karena itu memerlukan informasi tentang tag kelas.
Tag kelas Dummy paling sering digunakan dalam kode generik atau templat, di mana jenis pastinya tidak diketahui pada waktu kompilasi. Tag semacam itu sering kali cukup, meskipun ada kemungkinan kehilangan beberapa nuansa di sisi kode Scala; dalam kasus yang sangat jarang, kode Scala memerlukan informasi tag kelas yang akurat. Dalam hal ini, Anda harus menggunakan tag asli. Dalam kebanyakan kasus, ini tidak memerlukan banyak usaha dan meningkatkan kinerja, jadi cobalah untuk menggunakan tag tersebut sedapat mungkin.
Contoh 7.2. Membuat RDD Java / Scala Kompatibel dengan Tag Kelas Dummy
public static JavaPairRDD wrapPairRDDFakeCt( RDD<Tuple2<String, Object>> RDD) { // AnyRef — // , // , // // ClassTag<Object> fake = ClassTag$.MODULE$.AnyRef(); return new JavaPairRDD(rdd, fake, fake); }
Contoh 7.3. Memastikan Kompatibilitas RDD Java / Scala
public static JavaPairRDD wrapPairRDD( RDD<Tuple2<String, Object>> RDD) { // ClassTag<String> strCt = ClassTag$.MODULE$.apply(String.class); ClassTag<Long> longCt = ClassTag$.MODULE$.apply(scala.Long.class); return new JavaPairRDD(rdd, strCt, longCt); }
Spark SQL dan API pipa ML sebagian besar dibuat konsisten di Jawa dan Scala. Namun, fungsi pembantu khusus Java ada, dan fungsi Scala yang setara dengannya tidak mudah untuk dipanggil. Berikut adalah contohnya: berbagai fungsi numerik, seperti plus, minus, dll., Untuk kelas Kolom. Sulit untuk memanggil ekivalen kelebihan muatan mereka dari bahasa Scala (+, -). Alih-alih menggunakan JavaDataFrame dan JavaSQLContext, metode yang disyaratkan Java tersedia di SQLContext dan set DataFrame biasa. Ini mungkin membingungkan Anda, karena beberapa metode yang disebutkan dalam dokumentasi Java tidak dapat digunakan dari kode Java, tetapi dalam kasus seperti itu fungsi dengan nama yang sama disediakan untuk panggilan dari Jawa.
Fungsi yang ditentukan pengguna (UDFs) dalam bahasa Jawa, dan dalam hal ini, di sebagian besar bahasa lain kecuali Scala, mereka memerlukan menentukan jenis nilai yang dikembalikan oleh fungsi, karena itu tidak dapat dideduksi secara logis, mirip dengan bagaimana hal itu dilakukan dalam Scala (contoh 7.4) .
Contoh 7.4. Sampel UDF untuk Java
sqlContext.udf() .register("strlen", (String s) -> s.length(), DataTypes.StringType);
Meskipun jenis yang dibutuhkan oleh Scala dan Java API berbeda, pembungkus tipe pengumpulan Java tidak memerlukan penyalinan tambahan. Dalam kasus iterator, konversi tipe yang diperlukan untuk adaptor dilakukan secara tertunda ketika elemen diakses, yang memungkinkan kerangka Spark untuk membuang data jika perlu (seperti yang dibahas dalam bagian "Melakukan transformasi iterator-iterator menggunakan fungsi mapPartitions" di halaman 121). Ini sangat penting karena bagi banyak operasi sederhana biaya menyalin data mungkin lebih tinggi daripada biaya perhitungan itu sendiri.
Melampaui Scala dan JVM
Jika Anda tidak membatasi diri pada JVM, maka jumlah bahasa pemrograman yang tersedia untuk pekerjaan meningkat secara dramatis. Namun, dengan arsitektur Spark saat ini, bekerja di luar JVM - terutama pada node kerja - dapat menyebabkan kenaikan biaya yang signifikan karena menyalin data dalam node kerja antara JVM dan kode bahasa target. Dalam operasi yang kompleks, bagian dari biaya menyalin data relatif kecil, tetapi dalam operasi sederhana dapat dengan mudah menyebabkan penggandaan total biaya komputasi.
Bahasa pemrograman non-JVM pertama yang secara langsung didukung di luar Spark adalah Python, API dan antarmuka-nya telah menjadi model yang menjadi dasar implementasi untuk bahasa pemrograman non-JVM lainnya.
Bagaimana PySpark Bekerja
PySpark terhubung ke JVM Spark menggunakan campuran saluran pada pekerja dan Py4J, perpustakaan khusus yang menyediakan interaksi Python / Java, pada driver. Di bawah ini, pada pandangan pertama, arsitektur sederhana menyembunyikan banyak nuansa kompleks, berkat yang berfungsi PySpark, seperti yang ditunjukkan pada Gambar. 7.1. Salah satu masalah utama: bahkan ketika data disalin dari pekerja Python ke JVM, itu bukan dalam bentuk bahwa mesin virtual dapat dengan mudah diurai. Upaya khusus diperlukan oleh pekerja Python dan Java untuk memastikan bahwa JVM memiliki informasi yang cukup untuk operasi seperti partisi.
Kit RDD PySpark
Biaya sumber daya untuk mentransfer data ke dan dari JVM, serta untuk menjalankan pelaksana Python, adalah signifikan. Anda dapat menghindari banyak masalah kinerja dengan API RDD Suite PySpark menggunakan API DataFrame / Dataset, karena data tetap berada di JVM selama mungkin.
Menyalin data dari JVM ke Python dilakukan dengan menggunakan soket dan byte serial. Versi yang lebih umum untuk berinteraksi dengan program dalam bahasa lain tersedia melalui antarmuka PipedRDD, aplikasi yang diperlihatkan dalam subbagian “Menggunakan pipa”.
Organisasi saluran untuk pertukaran data (dalam dua arah) untuk setiap transformasi akan terlalu mahal. Akibatnya, PySpark mengatur (jika mungkin) Python mengubah pipa di dalam juru bahasa Python, merantai operasi filter, dan setelah itu memetakannya, pada objek Python iterator menggunakan kelas PipelinedRDD khusus. Bahkan ketika Anda perlu mengacak data dan PySpark tidak dapat menghubungkan konversi di mesin virtual pekerja individu, Anda dapat menggunakan kembali juru bahasa Python, sehingga biaya memulai juru bahasa tidak akan melambat lebih jauh.
Ini hanya bagian dari teka-teki. PipedRDD biasa bekerja dengan tipe String, yang tidak begitu mudah untuk diacak karena kurangnya kunci alami. Di PySpark, dan dalam gambar dan kesamaan dalam pustaka yang mengikat banyak bahasa pemrograman lain, jenis khusus PairwiseRDD digunakan, di mana kuncinya adalah bilangan bulat panjang, dan deserialisasi dilakukan oleh kode pengguna dalam bahasa Scala, yang dimaksudkan untuk mengurai nilai Python. Biaya deserialisasi ini tidak terlalu tinggi, tetapi ini menunjukkan bahwa Scala dalam kerangka kerja Spark pada dasarnya menganggap hasil kode Python berfungsi sebagai array byte "buram".
Untuk semua kesederhanaannya, pendekatan integrasi ini bekerja sangat baik, dan sebagian besar operasi pada set Scala RDD tersedia dalam Python. Di beberapa tempat yang paling sulit dalam kode, perpustakaan diakses, misalnya, MLlib, serta memuat / menyimpan data dari berbagai sumber.
Bekerja dengan berbagai format data juga membebankan keterbatasannya, karena sebagian besar kode untuk memuat / menyimpan data dari kerangka kerja Spark didasarkan pada antarmuka Hadoop Java. Ini berarti bahwa semua data yang dimuat pertama kali dimuat ke dalam JVM, dan baru kemudian dipindahkan ke Python.
Dua pendekatan biasanya digunakan untuk berinteraksi dengan MLlib: PySpark menggunakan tipe data khusus dengan konversi tipe Scala, atau algoritmanya diimplementasikan kembali dengan Python. Masalah-masalah ini dapat dihindari dengan paket Spark ML, yang menggunakan antarmuka DataFrame / Dataset, yang biasanya menyimpan data dalam JVM.
PySpark DataFrame dan Kit Data
Set DataFrame dan Dataset tidak memiliki banyak masalah kinerja dengan API Set RDD Python karena mereka menyimpan data dalam JVM selama mungkin. Tes kinerja yang sama yang kami lakukan untuk menggambarkan keunggulan set DataFrame atas set RDD (lihat Gambar 3.1) menunjukkan perbedaan yang signifikan ketika berjalan dengan Python (Gambar 7.2).
Untuk banyak operasi dengan set DataFrame dan Dataset, Anda mungkin tidak perlu memindahkan data dari JVM sama sekali, meskipun menggunakan berbagai ekspresi UDF, UDAF, dan Python lambda secara alami membutuhkan pemindahan sebagian data ke dalam JVM. Ini mengarah ke skema yang disederhanakan berikut untuk banyak operasi, yang terlihat seperti yang ditunjukkan pada Gambar. 7.3.
Akses ke objek Java yang mendasari dan kode campuran dalam Scala
Konsekuensi penting dari arsitektur PySpark adalah bahwa banyak kelas kerangka kerja Spark Python sebenarnya adalah adaptor untuk menerjemahkan panggilan dari kode Python ke dalam bentuk JVM yang dapat dimengerti.
Jika Anda bekerja dengan pengembang Scala / Java dan ingin berinteraksi dengan kode mereka, maka sebelumnya tidak akan ada adaptor untuk mengakses kode Anda, tetapi Anda dapat mendaftarkan Java / Scala UDF Anda dan menggunakannya dari kode Python. Dimulai dengan Spark 2.1, ini dapat dilakukan dengan menggunakan metode registerJavaFunction dari objek sqlContext.
Terkadang adaptor ini tidak memiliki semua mekanisme yang diperlukan, dan karena Python tidak memiliki perlindungan yang kuat terhadap pemanggilan metode pribadi, Anda dapat segera beralih ke JVM. Teknik yang sama akan memungkinkan Anda untuk mengakses kode Anda sendiri di JVM dan, dengan sedikit usaha, mengubah hasilnya kembali ke objek Python.
Di subbagian "Rencana kueri besar dan algoritme berulang" pada hal. 91 kami mencatat pentingnya menggunakan versi JVM dari DataFrame dan set RDD untuk mengurangi rencana kueri. Ini adalah solusi, karena ketika rencana kueri menjadi terlalu besar untuk diproses oleh pengoptimal Spark SQL, pengoptimal SQL, karena menempatkan set RDD di tengah, kehilangan kemampuan untuk melihat melampaui saat data muncul di RDD. Hal yang sama dapat dicapai dengan bantuan API Python publik, namun, banyak keuntungan dari set DataFrame akan hilang, karena semua data harus bolak-balik melalui simpul kerja Python. Sebagai gantinya, Anda dapat mengurangi grafik asal dengan terus menyimpan data dalam JVM (seperti yang ditunjukkan pada Contoh 7.5).
Contoh 7.5 Memotong rencana permintaan besar untuk DataFrame menggunakan Python
def cutLineage(df): """ DataFrame — .. : >>> df = RDD.toDF() >>> cutDf = cutLineage(df) >>> cutDf.count() 3 """ jRDD = df._jdf.toJavaRDD() jSchema = df._jdf.schema() jRDD.cache() sqlCtx = df.sql_ctx try: javaSqlCtx = sqlCtx._jsqlContext except: javaSqlCtx = sqlCtx._ssql_ctx newJavaDF = javaSqlCtx.createDataFrame(jRDD, jSchema) newDF = DataFrame(newJavaDF, sqlCtx) return newDF
Secara umum, dengan konvensi, sintaks _j [disingkat_name] digunakan untuk mengakses versi Java internal sebagian besar objek Python. Jadi, misalnya, objek SparkContext memiliki _jsc, yang memungkinkan Anda untuk mendapatkan objek Java SparkContext internal. Ini hanya mungkin dalam program driver, jadi ketika Anda mengirim objek PySpark ke node kerja, Anda tidak akan dapat mengakses komponen Java internal dan sebagian besar API tidak akan berfungsi.
Untuk mengakses kelas Spark di JVM, yang tidak memiliki adaptor Python, Anda dapat menggunakan gateway Py4J pada driver. Objek SparkContext berisi tautan ke gateway di properti _gateway. Sintaksis sc._gateway.jvm. [Full_class_name_in_JVM] akan memungkinkan akses ke objek Java apa pun.
Teknik serupa akan bekerja untuk kelas Scala Anda sendiri jika diatur menurut classpath. Anda bisa menambahkan file JAR ke classpath menggunakan perintah spark-submit dengan parameter --jars atau dengan mengatur properti konfigurasi spark.driver.extraClassPath. Contoh 7.6, yang membantu menghasilkan beras. 7.2, sengaja dirancang untuk menghasilkan data untuk pengujian kinerja menggunakan kode Scala yang ada.
Contoh 7.6 Memanggil kelas non-Spark-JVM menggunakan Py4J
sc = sqlCtx._sc # SQL Context, 2.1, 2.0 , # 2.0, — , :p try: try: javaSqlCtx = sqlCtx._jsqlContext except: javaSqlCtx = sqlCtx._ssql_ctx except: javaSqlCtx = sqlCtx._jwrapped jsc = sc._jsc scalasc = jsc.sc() gateway = sc._gateway # java-, RDD JVM- # Row (Int, Double). RDD Python # RDD Java ( Row), # , . # Java-RDD Row — # DataFrame, # RDD Row. java_rdd = (gateway.jvm.com.highperformancespark.examples. tools.GenerateScalingData. generateMiniScaleRows(scalasc, rows, numCols)) # JSON . # Python- Java-. schema = StructType([ StructField("zip", IntegerType()), StructField("fuzzyness", DoubleType())]) # 2.1 / 2.1 try: jschema = javaSqlCtx.parseDataType(schema.json()) except: jschema = sqlCtx._jsparkSession.parseDataType(schema.json()) # RDD (Java) DataFrame (Java) java_dataframe = javaSqlCtx.createDataFrame(java_rdd, jschema) # DataFrame (Java) DataFrame (Python) python_dataframe = DataFrame(java_dataframe, sqlCtx) # DataFrame (Python) RDD pairRDD = python_dataframe.rdd.map(lambda row: (row[0], row[1])) return (python_dataframe, pairRDD)
Meskipun banyak kelas Python hanyalah adapter dari objek Java, tidak semua objek Java dapat dibungkus dengan objek Python dan kemudian digunakan dalam Spark. Sebagai contoh, objek dalam set RDD PySpark direpresentasikan sebagai string serial, yang hanya dapat diurai dengan mudah dalam kode Python. Untungnya, objek DataFrame distandarisasi antara bahasa pemrograman yang berbeda, jadi jika Anda dapat mengonversi data Anda menjadi set DataFrame, Anda kemudian dapat membungkusnya dalam objek Python dan menggunakannya secara langsung sebagai Python DataFrame, atau mengonversi Python DataFrame ke RDD dari ini. bahasa yang sama.
»Informasi lebih lanjut tentang buku ini dapat ditemukan di
situs web penerbit»
Isi»
Kutipan20% diskon kupon untuk Penyemprot -
Spark