Spark SQL. Sedikit tentang pengoptimal permintaan

Halo semuanya. Sebagai pengantar, saya ingin memberi tahu Anda bagaimana saya sampai pada kehidupan seperti itu.


Sebelum bertemu dengan Big Data dan Spark, khususnya, saya punya banyak dan sering untuk mengoptimalkan query SQL, pertama untuk MSSQL, kemudian untuk Oracle, dan sekarang saya menemukan SparkSQL.


Dan jika sudah ada banyak buku bagus untuk DBMS yang menjelaskan metodologi dan "pena" yang dapat Anda putar untuk mendapatkan rencana kueri yang optimal, maka saya belum melihat buku seperti itu untuk Spark. Saya menemukan lebih banyak artikel dan serangkaian praktik, lebih terkait dengan bekerja melalui RDD / Dataset API, daripada SQL murni. Bagi saya, salah satu buku referensi tentang optimasi SQL adalah buku J. Lewis, Oracle. Dasar-dasar optimasi biaya. " Saya mencari sesuatu yang serupa dalam studi mendalam. Mengapa subjek penelitian khusus SparkSQL, dan bukan API yang mendasarinya? Kemudian ketertarikan itu disebabkan oleh fitur-fitur proyek yang sedang saya kerjakan.




Untuk salah satu pelanggan kami, perusahaan kami sedang mengembangkan data warehouse, lapisan terperinci di mana dan bagian dari jendela ada di cluster Hadoop, dan jendela terakhir di Oracle. Proyek ini melibatkan lapisan konversi data yang luas, yang diimplementasikan pada Spark. Untuk mempercepat pengembangan dan konektivitas pengembang ETL yang tidak terbiasa dengan seluk-beluk teknologi Big Data, tetapi akrab dengan alat SQL dan ETL, alat telah dikembangkan yang secara ideologis mengingatkan alat ETL lainnya, misalnya, Informatica, dan memungkinkan Anda merancang proses ETL secara visual dengan generasi berikutnya. kode untuk Spark. Karena kompleksitas algoritma dan banyaknya transformasi, pengembang terutama menggunakan kueri SparkSQL.


Di sinilah cerita dimulai, karena saya harus menjawab sejumlah besar pertanyaan dari bentuk "Mengapa kueri tidak bekerja / bekerja lambat / tidak berfungsi seperti di Oracle?". Yang ini ternyata menjadi bagian yang paling menarik bagi saya: "Mengapa itu bekerja lambat?". Selain itu, tidak seperti DBMS yang pernah saya pakai sebelumnya, Anda dapat masuk ke kode sumber dan mendapatkan jawaban atas pertanyaan Anda.


Keterbatasan dan Asumsi


Spark 2.3.0 digunakan untuk menjalankan contoh dan menganalisis kode sumber.
Diasumsikan bahwa pembaca sudah terbiasa dengan arsitektur Spark, dan prinsip umum pengoptimal kueri untuk salah satu DBMS. Paling tidak, frasa "rencana kueri" tentu tidak mengherankan.


Juga, artikel ini mencoba untuk tidak menjadi terjemahan kode optimizer Spark ke dalam bahasa Rusia, jadi untuk hal-hal yang sangat menarik dari sudut pandang optimizer, tetapi yang dapat dibaca dalam kode sumber, mereka hanya akan disebutkan secara singkat di sini dengan tautan ke kelas yang sesuai.


Lanjutkan untuk belajar


Mari kita mulai dengan kueri kecil untuk menjelajahi tahapan dasar yang digunakan mulai dari penguraian ke eksekusi.


scala> spark.read.orc("/user/test/balance").createOrReplaceTempView("bal") scala> spark.read.orc("/user/test/customer").createOrReplaceTempView("cust") scala> val df = spark.sql(""" | select bal.account_rk, cust.full_name | from bal | join cust | on bal.party_rk = cust.party_rk | and bal.actual_date = cust.actual_date | where bal.actual_date = cast('2017-12-31' as date) | """) df: org.apache.spark.sql.DataFrame = [account_rk: decimal(38,18), full_name: string] scala> df.explain(true) 

Modul utama yang bertanggung jawab untuk mem-parsing SQL dan mengoptimalkan rencana eksekusi permintaan adalah Spark Catalyst.


Output yang diperluas dalam deskripsi rencana permintaan (df.explain (true)) memungkinkan Anda untuk melacak semua tahapan yang dilalui permintaan:


  • Parsed Logical Plan - dapatkan setelah parsing SQL. Pada tahap ini, hanya kebenaran sintaksis dari permintaan yang diperiksa.

 == Parsed Logical Plan == 'Project ['bal.account_rk, 'cust.full_name] +- 'Filter ('bal.actual_date = cast(2017-12-31 as date)) +- 'Join Inner, (('bal.party_rk = 'cust.party_rk) && ('bal.actual_date = 'cust.actual_date)) :- 'UnresolvedRelation `bal` +- 'UnresolvedRelation `cust` 

  • Analisis Logical Plan - pada tahap ini, informasi tentang struktur entitas yang digunakan ditambahkan, korespondensi struktur dan atribut yang diminta diperiksa.

 == Analyzed Logical Plan == account_rk: decimal(38,18), full_name: string Project [account_rk#1, full_name#59] +- Filter (actual_date#27 = cast(2017-12-31 as date)) +- Join Inner, ((party_rk#18 = party_rk#57) && (actual_date#27 = actual_date#88)) :- SubqueryAlias bal : +- Relation[ACTUAL_END_DATE#0,ACCOUNT_RK#1,... 4 more fields] orc +- SubqueryAlias cust +- Relation[ACTUAL_END_DATE#56,PARTY_RK#57... 9 more fields] orc 

  • Optimized Logical Plan adalah yang paling menarik bagi kami. Pada tahap ini, pohon kueri yang dihasilkan dikonversi berdasarkan aturan optimasi yang tersedia.

 == Optimized Logical Plan == Project [account_rk#1, full_name#59] +- Join Inner, ((party_rk#18 = party_rk#57) && (actual_date#27 = actual_date#88)) :- Project [ACCOUNT_RK#1, PARTY_RK#18, ACTUAL_DATE#27] : +- Filter ((isnotnull(actual_date#27) && (actual_date#27 = 17531)) && isnotnull(party_rk#18)) : +- Relation[ACTUAL_END_DATE#0,ACCOUNT_RK#1,... 4 more fields] orc +- Project [PARTY_RK#57, FULL_NAME#59, ACTUAL_DATE#88] +- Filter ((isnotnull(actual_date#88) && isnotnull(party_rk#57)) && (actual_date#88 = 17531)) +- Relation[ACTUAL_END_DATE#56,PARTY_RK#57,... 9 more fields] orc 

  • Rencana Fisik - fitur akses ke data sumber mulai diperhitungkan, termasuk optimisasi untuk menyaring partisi dan data untuk meminimalkan set data yang dihasilkan. Strategi eksekusi gabungan dipilih (untuk detail lebih lanjut tentang opsi yang tersedia, lihat di bawah).

 == Physical Plan == *(2) Project [account_rk#1, full_name#59] +- *(2) BroadcastHashJoin [party_rk#18, actual_date#27], [party_rk#57, actual_date#88], Inner, BuildRight :- *(2) Project [ACCOUNT_RK#1, PARTY_RK#18, ACTUAL_DATE#27] : +- *(2) Filter isnotnull(party_rk#18) : +- *(2) FileScan orc [ACCOUNT_RK#1,PARTY_RK#18,ACTUAL_DATE#27] Batched: false, Format: ORC, Location: InMemoryFileIndex[hdfs://cluster:8020/user/test/balance], PartitionCount: 1, PartitionFilters: [isnotnull(ACTUAL_DATE#27), (ACTUAL_DATE#27 = 17531)], PushedFilters: [IsNotNull(PARTY_RK)], ReadSchema: struct<ACCOUNT_RK:decimal(38,18),PARTY_RK:decimal(38,18)> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, decimal(38,18), true], input[2, date, true])) +- *(1) Project [PARTY_RK#57, FULL_NAME#59, ACTUAL_DATE#88] +- *(1) Filter isnotnull(party_rk#57) +- *(1) FileScan orc [PARTY_RK#57,FULL_NAME#59,ACTUAL_DATE#88] Batched: false, Format: ORC, Location: InMemoryFileIndex[hdfs://cluster:8020/user/test/customer], PartitionCount: 1, PartitionFilters: [isnotnull(ACTUAL_DATE#88), (ACTUAL_DATE#88 = 17531)], PushedFilters: [IsNotNull(PARTY_RK)], ReadSchema: struct<PARTY_RK:decimal(38,18),FULL_NAME:string> 

Tahapan optimasi dan eksekusi berikut (misalnya, WholeStageCodegen) berada di luar ruang lingkup artikel ini, tetapi dijelaskan secara sangat rinci (serta tahapan yang dijelaskan di atas) di Menguasai Spark Sql .


Membaca rencana eksekusi permintaan biasanya terjadi "dari dalam" dan "dari bawah ke atas", yaitu, bagian yang paling bersarang dijalankan terlebih dahulu, dan secara bertahap maju ke proyeksi akhir yang terletak di bagian paling atas.


Jenis pengoptimal kueri


Dua jenis pengoptimal kueri dapat dibedakan:


  • Pengoptimal berbasis aturan (RBO).
  • Pengoptimal berdasarkan pada perkiraan biaya eksekusi permintaan (Pengoptimal berbasis biaya, KSM).

Yang pertama difokuskan pada penggunaan seperangkat aturan tetap, misalnya, penerapan kondisi penyaringan dari mana pada tahap sebelumnya, jika mungkin, perhitungan konstanta, dll.


Untuk mengevaluasi kualitas rencana yang dihasilkan, pengoptimal CBO menggunakan fungsi biaya, yang biasanya tergantung pada jumlah data yang diproses, jumlah baris yang termasuk dalam filter, dan biaya untuk melakukan operasi tertentu.


Untuk mempelajari lebih lanjut tentang spesifikasi desain CBO untuk Apache Spark, silakan ikuti tautan: spesifikasi dan tugas utama JIRA untuk implementasi .


Titik awal untuk menjelajahi berbagai optimasi yang ada adalah kode Optimizer.scala.


Berikut ini kutipan singkat dari daftar panjang optimisasi yang tersedia:


 def batches: Seq[Batch] = { val operatorOptimizationRuleSet = Seq( // Operator push down PushProjectionThroughUnion, ReorderJoin, EliminateOuterJoin, PushPredicateThroughJoin, PushDownPredicate, LimitPushDown, ColumnPruning, InferFiltersFromConstraints, // Operator combine CollapseRepartition, CollapseProject, CollapseWindow, CombineFilters, CombineLimits, CombineUnions, // Constant folding and strength reduction NullPropagation, ConstantPropagation, ........ 

Perlu dicatat bahwa daftar optimisasi ini mencakup optimisasi berbasis aturan dan optimisasi berdasarkan estimasi biaya kueri, yang akan dibahas di bawah ini.


Fitur CBO adalah bahwa untuk operasi yang benar perlu mengetahui dan menyimpan informasi tentang statistik data yang digunakan dalam kueri - jumlah catatan, ukuran catatan, histogram distribusi data dalam kolom tabel.


Untuk mengumpulkan statistik, seperangkat perintah SQL ANALYZE TABLE ... COMPUTE STATISTICS digunakan, di samping itu, satu set tabel diperlukan untuk menyimpan informasi, API disediakan melalui ExternalCatalog, lebih tepatnya melalui HiveExternalCatalog.


Karena CBO saat ini dinonaktifkan secara default, penekanan utama akan ditempatkan pada penelitian optimasi dan nuansa RBO yang tersedia.


Jenis dan pilihan strategi bergabung


Pada tahap pembentukan rencana fisik untuk mengeksekusi permintaan, strategi bergabung dipilih. Opsi berikut saat ini tersedia di Spark (Anda dapat mulai mempelajari kode dari kode di SparkStrategies.scala).


Siaran hash bergabung


Opsi terbaik adalah jika salah satu pihak bergabung cukup kecil (kriteria kecukupan diatur oleh parameter spark.sql.autoBroadcastJoinThreshold di SQLConf). Dalam hal ini, sisi ini sepenuhnya disalin ke semua pelaksana, di mana ada hash bergabung dengan tabel utama. Selain ukuran, harus dicatat bahwa dalam kasus sambungan luar, hanya sisi luar yang dapat disalin, oleh karena itu, jika mungkin, sebagai tabel utama dalam kasus sambungan luar, Anda harus menggunakan tabel dengan jumlah data terbesar.


   ,    ,     SQL      Oracle,   /*+ broadcast(t1, t2) */ 

Sortir gabungan bergabung


Dengan spark.sql.join.preferSortMergeJoin diaktifkan secara default, metode ini diterapkan secara default jika kunci untuk bergabung dapat diurutkan.
Dari fitur-fiturnya, dapat dicatat bahwa, tidak seperti metode sebelumnya, optimisasi pembuatan kode untuk melakukan operasi hanya tersedia untuk gabungan internal.


Kocok hash bergabung


Jika kunci tidak dapat diurutkan, atau opsi pemilihan gabungan pilihan gabungan dinonaktifkan, Catalyst mencoba menerapkan gabungan hash acak. Selain memeriksa pengaturan, juga diperiksa bahwa Spark memiliki cukup memori untuk membangun peta hash lokal untuk satu partisi (jumlah total partisi diatur dengan mengatur spark.sql.shuffle.partitions )


BroadcastNestedLoopJoin dan CartesianProduct


Dalam kasus di mana tidak ada kemungkinan perbandingan langsung dengan kunci (misalnya, kondisi seperti) atau tidak ada kunci untuk bergabung dengan tabel, tergantung pada ukuran tabel, baik jenis ini atau Produk Cartesian dipilih.


Urutan menentukan tabel di join'ah


Bagaimanapun, bergabung membutuhkan tabel acak dengan kunci. Oleh karena itu, saat ini, urutan tabel penentu, terutama dalam hal melakukan beberapa gabungan dalam satu baris, adalah penting (jika Anda membosankan, maka jika CBO tidak dihidupkan dan pengaturan JOIN_REORDER_ENABLED tidak diaktifkan).


Jika memungkinkan, urutan tabel bergabung harus meminimalkan jumlah operasi acak untuk tabel besar, yang bergabung dengan kunci yang sama harus berurutan. Juga, jangan lupa untuk meminimalkan data untuk bergabung, untuk mengaktifkan Broadcast Hash Join.


Aplikasi kondisi filter yang transitif


Pertimbangkan pertanyaan berikut:


 select bal.account_rk, cust.full_name from balance bal join customer cust on bal.party_rk = cust.party_rk and bal.actual_date = cust.actual_date where bal.actual_date = cast('2017-12-31' as date) 

Di sini kita menghubungkan dua tabel yang dipartisi dengan cara yang sama, sesuai dengan bidang actual_date dan menerapkan filter eksplisit hanya untuk partisi sesuai dengan tabel saldo.


Seperti yang dapat dilihat dari rencana kueri yang dioptimalkan, filter berdasarkan tanggal juga berlaku untuk pelanggan, dan pada saat membaca data dari disk, ditentukan bahwa tepat satu partisi diperlukan.


 == Optimized Logical Plan == Project [account_rk#1, full_name#59] +- Join Inner, ((party_rk#18 = party_rk#57) && (actual_date#27 = actual_date#88)) :- Project [ACCOUNT_RK#1, PARTY_RK#18, ACTUAL_DATE#27] : +- Filter ((isnotnull(actual_date#27) && (actual_date#27 = 17531)) && isnotnull(party_rk#18)) : +- Relation[,... 4 more fields] orc +- Project [PARTY_RK#57, FULL_NAME#59, ACTUAL_DATE#88] +- Filter (((actual_date#88 = 17531) && isnotnull(actual_date#88)) && isnotnull(party_rk#57)) +- Relation[,... 9 more fields] orc 

Tetapi Anda hanya perlu mengganti gabungan dalam dengan bagian luar kiri dalam kueri, karena predikat push untuk tabel pelanggan langsung jatuh, dan pemindaian penuh terjadi, yang merupakan efek yang tidak diinginkan.


 == Optimized Logical Plan == Project [account_rk#1, full_name#59] +- Join LeftOuter, ((party_rk#18 = party_rk#57) && (actual_date#27 = actual_date#88)) :- Project [ACCOUNT_RK#1, PARTY_RK#18, ACTUAL_DATE#27] : +- Filter (isnotnull(actual_date#27) && (actual_date#27 = 17531)) : +- Relation[,... 4 more fields] orc +- Project [PARTY_RK#57, FULL_NAME#59, ACTUAL_DATE#88] +- Relation[,... 9 more fields] orc 

Jenis konversi


Pertimbangkan contoh sederhana pemilihan dari tabel dengan pemfilteran menurut jenis klien, dalam skema, jenis bidang party_type adalah string.


 select party_rk, full_name from cust where actual_date = cast('2017-12-31' as date) and party_type = 101 --   -- and party_type = '101' --     

Dan bandingkan dua rencana yang dihasilkan, yang pertama - ketika kita merujuk ke tipe yang salah (akan ada pemeran implisit ke int), yang kedua - ketika jenisnya sesuai dengan skema.


 PushedFilters: [IsNotNull(PARTY_TYPE)] //            . PushedFilters: [IsNotNull(PARTY_TYPE), EqualTo(PARTY_TYPE,101)] //             . 

Masalah serupa diamati untuk kasus membandingkan tanggal dengan string, akan ada filter untuk membandingkan string. Contoh:


 where OPER_DATE = '2017-12-31' Filter (isnotnull(oper_date#0) && (cast(oper_date#0 as string) = 2017-12-31) PushedFilters: [IsNotNull(OPER_DATE)] where OPER_DATE = cast('2017-12-31' as date) PushedFilters: [IsNotNull(OPER_DATE), EqualTo(OPER_DATE,2017-12-31)] 

Untuk kasus ketika konversi tipe implisit dimungkinkan, misalnya, int -> desimal, pengoptimal melakukannya sendiri.


Penelitian lebih lanjut


Banyak informasi menarik tentang "kenop" yang dapat digunakan untuk menyempurnakan Catalyst, serta tentang kemungkinan (sekarang dan masa depan) dari pengoptimal, dapat diperoleh dari SQLConf.scala.


Secara khusus, seperti yang Anda lihat secara default, pengoptimal biaya masih dimatikan saat ini.


 val CBO_ENABLED = buildConf("spark.sql.cbo.enabled") .doc("Enables CBO for estimation of plan statistics when set true.") .booleanConf .createWithDefault(false) 

Serta optimalisasi dependen yang terkait dengan pemesanan ulang join'ov.


 val JOIN_REORDER_ENABLED = buildConf("spark.sql.cbo.joinReorder.enabled") .doc("Enables join reorder in CBO.") .booleanConf .createWithDefault(false) 

atau


 val STARSCHEMA_DETECTION = buildConf("spark.sql.cbo.starSchemaDetection") .doc("When true, it enables join reordering based on star schema detection. ") .booleanConf .createWithDefault(false) 

Ringkasan Singkat


Hanya sebagian kecil dari optimasi yang ada telah disentuh, eksperimen dengan optimasi biaya, yang dapat memberikan lebih banyak ruang untuk konversi kueri, ada di depan. Juga, pertanyaan menarik yang terpisah adalah perbandingan dari serangkaian optimisasi ketika membaca file dari Parket dan Orc, dilihat dari jira proyek, ini tentang paritas, tetapi benarkah demikian?


Selain itu:


  • Analisis dan optimalisasi permintaan menarik dan mengasyikkan, terutama mengingat ketersediaan kode sumber.
  • Dimasukkannya CBO akan memberikan ruang untuk optimasi dan penelitian lebih lanjut.
  • Penting untuk memantau penerapan aturan-aturan dasar yang memungkinkan Anda menyaring sebanyak mungkin data "ekstra" sedini mungkin.
  • Bergabung adalah kejahatan yang perlu, tetapi jika memungkinkan, meminimalkan mereka dan melacak implementasi mana yang digunakan di bawah tenda.

Source: https://habr.com/ru/post/id417103/


All Articles