Spark SQL. قليلا عن محسن الاستعلام

مرحبا بالجميع. كمقدمة ، أود أن أخبركم كيف جئت إلى مثل هذه الحياة.


قبل الاجتماع مع Big Data و Spark ، على وجه الخصوص ، كان لدي الكثير وغالباً لتحسين استعلامات SQL ، أولاً لـ MSSQL ، ثم لـ Oracle ، والآن صادفت SparkSQL.


وإذا كان هناك بالفعل الكثير من الكتب الجيدة لـ DBMS التي تصف المنهجية و "الأقلام" التي يمكنك تحريفها للحصول على خطة الاستعلام المثلى ، فعندئذ لم أر مثل هذه الكتب لـ Spark. لقد صادفت المزيد من المقالات ومجموعات الممارسات ، والتي تتعلق أكثر بالعمل من خلال واجهة برمجة تطبيقات RDD / Dataset ، بدلاً من SQL خالص. بالنسبة لي ، أحد الكتب المرجعية حول تحسين SQL هو كتاب J. Lewis ، Oracle. أساسيات تحسين التكلفة ". بحثت عن شيء مماثل في عمق الدراسة. لماذا كان موضوع البحث بالتحديد SparkSQL ، وليس واجهة برمجة التطبيقات الأساسية؟ ثم كان سبب الاهتمام ميزات المشروع الذي أعمل عليه.




بالنسبة لأحد عملائنا ، تعمل شركتنا على تطوير مستودع بيانات ، وطبقة تفصيلية منها وجزء من حالات العرض في مجموعة Hadoop ، وحالات العرض النهائية في Oracle. يتضمن هذا المشروع طبقة تحويل بيانات واسعة النطاق ، والتي يتم تنفيذها على Spark. لتسريع التطوير والاتصال بمطوري ETL الذين ليسوا على دراية بتعقيدات تقنيات البيانات الكبيرة ، ولكنهم على دراية بأدوات SQL و ETL ، تم تطوير أداة تذكر أيديولوجياً أدوات ETL الأخرى ، على سبيل المثال ، Informatica ، وتسمح لك بتصميم عمليات ETL بصريًا مع الجيل التالي كود سبارك. نظرًا لتعقيد الخوارزميات والعدد الكبير من التحويلات ، يستخدم المطورون بشكل أساسي استعلامات SparkSQL.


وهنا تبدأ القصة ، حيث كان عليّ أن أجيب على عدد كبير من الأسئلة في النموذج "لماذا لا يعمل / يعمل الطلب ببطء / هل يعمل بشكل مختلف عن Oracle؟". تبين أن هذا هو الجزء الأكثر إثارة للاهتمام بالنسبة لي: "لماذا يعمل ببطء؟". علاوة على ذلك ، على عكس نظام إدارة قواعد البيانات (DBMS) الذي عملت معه من قبل ، يمكنك الدخول إلى شفرة المصدر والحصول على إجابة لأسئلتك.


القيود والافتراضات


يستخدم Spark 2.3.0 لتشغيل الأمثلة وتحليل شفرة المصدر.
من المفترض أن القارئ على دراية بهندسة Spark ، والمبادئ العامة لمحسن الاستعلام لأحد أنظمة DBMS. كحد أدنى ، يجب ألا تكون عبارة "خطة الاستعلام" مفاجئة بالتأكيد.


أيضًا ، لا تحاول هذه المقالة أن تصبح ترجمة لشفرة مُحسِّن Spark إلى اللغة الروسية ، لذا بالنسبة للأشياء المثيرة للاهتمام جدًا من وجهة نظر المُحسِّن ، والتي يمكن قراءتها في شفرة المصدر ، سيتم ذكرها لفترة وجيزة هنا مع روابط للفئات المقابلة.


انتقل إلى الدراسة


لنبدأ باستعلام صغير لاستكشاف المراحل الأساسية التي يمر من خلالها التحليل اللغوي إلى التنفيذ.


scala> spark.read.orc("/user/test/balance").createOrReplaceTempView("bal") scala> spark.read.orc("/user/test/customer").createOrReplaceTempView("cust") scala> val df = spark.sql(""" | select bal.account_rk, cust.full_name | from bal | join cust | on bal.party_rk = cust.party_rk | and bal.actual_date = cust.actual_date | where bal.actual_date = cast('2017-12-31' as date) | """) df: org.apache.spark.sql.DataFrame = [account_rk: decimal(38,18), full_name: string] scala> df.explain(true) 

الوحدة الرئيسية المسؤولة عن تحليل SQL وتحسين خطة تنفيذ الاستعلام هي Spark Catalyst.


يتيح لك الإخراج الموسع في وصف خطة الطلب (df.explain (true)) تتبع جميع المراحل التي يمر بها الطلب:


  • خطة منطقية محللة - احصل على بعد تحليل SQL. في هذه المرحلة ، يتم التحقق من صحة النحوي للطلب فقط.

 == Parsed Logical Plan == 'Project ['bal.account_rk, 'cust.full_name] +- 'Filter ('bal.actual_date = cast(2017-12-31 as date)) +- 'Join Inner, (('bal.party_rk = 'cust.party_rk) && ('bal.actual_date = 'cust.actual_date)) :- 'UnresolvedRelation `bal` +- 'UnresolvedRelation `cust` 

  • الخطة المنطقية المحللة - في هذه المرحلة ، يتم إضافة معلومات حول هيكل الكيانات المستخدمة ، ويتم التحقق من مراسلات الهيكل والسمات المطلوبة.

 == Analyzed Logical Plan == account_rk: decimal(38,18), full_name: string Project [account_rk#1, full_name#59] +- Filter (actual_date#27 = cast(2017-12-31 as date)) +- Join Inner, ((party_rk#18 = party_rk#57) && (actual_date#27 = actual_date#88)) :- SubqueryAlias bal : +- Relation[ACTUAL_END_DATE#0,ACCOUNT_RK#1,... 4 more fields] orc +- SubqueryAlias cust +- Relation[ACTUAL_END_DATE#56,PARTY_RK#57... 9 more fields] orc 

  • الخطة المنطقية المحسنة هي الأكثر إثارة للاهتمام بالنسبة لنا. في هذه المرحلة ، يتم تحويل شجرة الاستعلام الناتجة بناءً على قواعد التحسين المتاحة.

 == Optimized Logical Plan == Project [account_rk#1, full_name#59] +- Join Inner, ((party_rk#18 = party_rk#57) && (actual_date#27 = actual_date#88)) :- Project [ACCOUNT_RK#1, PARTY_RK#18, ACTUAL_DATE#27] : +- Filter ((isnotnull(actual_date#27) && (actual_date#27 = 17531)) && isnotnull(party_rk#18)) : +- Relation[ACTUAL_END_DATE#0,ACCOUNT_RK#1,... 4 more fields] orc +- Project [PARTY_RK#57, FULL_NAME#59, ACTUAL_DATE#88] +- Filter ((isnotnull(actual_date#88) && isnotnull(party_rk#57)) && (actual_date#88 = 17531)) +- Relation[ACTUAL_END_DATE#56,PARTY_RK#57,... 9 more fields] orc 

  • الخطة المادية - بدأت تؤخذ ميزات الوصول إلى بيانات المصدر في الاعتبار ، بما في ذلك التحسينات لتصفية الأقسام والبيانات لتقليل مجموعة البيانات الناتجة. تم تحديد استراتيجية تنفيذ الانضمام (لمزيد من التفاصيل حول الخيارات المتاحة ، انظر أدناه).

 == Physical Plan == *(2) Project [account_rk#1, full_name#59] +- *(2) BroadcastHashJoin [party_rk#18, actual_date#27], [party_rk#57, actual_date#88], Inner, BuildRight :- *(2) Project [ACCOUNT_RK#1, PARTY_RK#18, ACTUAL_DATE#27] : +- *(2) Filter isnotnull(party_rk#18) : +- *(2) FileScan orc [ACCOUNT_RK#1,PARTY_RK#18,ACTUAL_DATE#27] Batched: false, Format: ORC, Location: InMemoryFileIndex[hdfs://cluster:8020/user/test/balance], PartitionCount: 1, PartitionFilters: [isnotnull(ACTUAL_DATE#27), (ACTUAL_DATE#27 = 17531)], PushedFilters: [IsNotNull(PARTY_RK)], ReadSchema: struct<ACCOUNT_RK:decimal(38,18),PARTY_RK:decimal(38,18)> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, decimal(38,18), true], input[2, date, true])) +- *(1) Project [PARTY_RK#57, FULL_NAME#59, ACTUAL_DATE#88] +- *(1) Filter isnotnull(party_rk#57) +- *(1) FileScan orc [PARTY_RK#57,FULL_NAME#59,ACTUAL_DATE#88] Batched: false, Format: ORC, Location: InMemoryFileIndex[hdfs://cluster:8020/user/test/customer], PartitionCount: 1, PartitionFilters: [isnotnull(ACTUAL_DATE#88), (ACTUAL_DATE#88 = 17531)], PushedFilters: [IsNotNull(PARTY_RK)], ReadSchema: struct<PARTY_RK:decimal(38,18),FULL_NAME:string> 

تتجاوز مراحل التحسين والتنفيذ التالية (على سبيل المثال ، WholeStageCodegen) نطاق هذه المقالة ، ولكن تم وصفها بتفصيل كبير (بالإضافة إلى المراحل الموضحة أعلاه) في Mastering Spark Sql .


عادةً ما تتم قراءة خطة تنفيذ الاستعلام "من الداخل" و "من الأسفل إلى الأعلى" ، أي يتم تنفيذ الأجزاء الأكثر تداخلًا أولاً ، وتتقدم تدريجيًا إلى الإسقاط النهائي الموجود في الأعلى.


أنواع محسنات الاستعلام


يمكن تمييز نوعين من محسنات الاستعلام:


  • محسنات تستند إلى القواعد (RBOs).
  • المُحسِّنون بناءً على تقدير تكلفة تنفيذ الاستعلام (مُحسِّن قائم على التكلفة ، CBO).

تركز القواعد الأولى على استخدام مجموعة من القواعد الثابتة ، على سبيل المثال ، تطبيق شروط الترشيح من حيث في المراحل المبكرة ، إن أمكن ، حساب الثوابت ، إلخ.


لتقييم جودة الخطة الناتجة ، يستخدم مُحسِّن CBO دالة تكلفة ، والتي تعتمد عادةً على كمية البيانات المُعالجة ، وعدد الصفوف التي تقع تحت المرشحات ، وتكلفة تنفيذ عمليات معينة.


لمعرفة المزيد عن مواصفات تصميم CBO لـ Apache Spark ، يرجى اتباع الروابط: المواصفات ومهمة JIRA الرئيسية للتنفيذ .


نقطة البداية لاستكشاف مجموعة كاملة من التحسينات الحالية هي رمز Optimizer.scala.


فيما يلي مقتطف قصير من قائمة طويلة من التحسينات المتاحة:


 def batches: Seq[Batch] = { val operatorOptimizationRuleSet = Seq( // Operator push down PushProjectionThroughUnion, ReorderJoin, EliminateOuterJoin, PushPredicateThroughJoin, PushDownPredicate, LimitPushDown, ColumnPruning, InferFiltersFromConstraints, // Operator combine CollapseRepartition, CollapseProject, CollapseWindow, CombineFilters, CombineLimits, CombineUnions, // Constant folding and strength reduction NullPropagation, ConstantPropagation, ........ 

وتجدر الإشارة إلى أن قائمة هذه التحسينات تتضمن كلا من التحسينات المستندة إلى القواعد والتحسينات استنادًا إلى تقديرات تكلفة الاستعلام ، والتي سيتم مناقشتها أدناه.


إحدى ميزات CBO هي أنه من أجل التشغيل الصحيح ، تحتاج إلى معرفة وتخزين المعلومات حول إحصائيات البيانات المستخدمة في الاستعلام - عدد السجلات ، حجم التسجيل ، الرسوم البيانية لتوزيع البيانات في أعمدة الجدول.


لجمع الإحصائيات ، مجموعة من أوامر SQL ANALYZE TABLE ... يتم استخدام إحصائيات الكمبيوتر ، بالإضافة إلى ذلك ، هناك حاجة إلى مجموعة من الجداول لتخزين المعلومات ، يتم توفير واجهة برمجة التطبيقات من خلال ExternalCatalog ، وبشكل أكثر دقة من خلال HiveExternalCatalog.


نظرًا لأن CBO معطل حاليًا بشكل افتراضي ، فسيتم التركيز بشكل رئيسي على البحث عن التحسينات الدقيقة والفروق الدقيقة في RBO.


أنواع واختيار استراتيجية الانضمام


في مرحلة تشكيل الخطة المادية لتنفيذ الطلب ، يتم اختيار استراتيجية الانضمام. الخيارات التالية متاحة حاليًا في Spark (يمكنك بدء تعلم التعليمات البرمجية من التعليمات البرمجية في SparkStrategies.scala).


ربط تجزئة البث


الخيار الأفضل هو إذا كان أحد أطراف الانضمام صغيرًا بما فيه الكفاية (يتم تعيين معيار الاكتفاء بواسطة spark.sql.autoBroadcastJoinThreshold في SQLConf). في هذه الحالة ، يتم نسخ هذا الجانب بالكامل إلى جميع المنفذين ، حيث يوجد رابط تجزئة مع الجدول الرئيسي. بالإضافة إلى الحجم ، تجدر الإشارة إلى أنه في حالة الصلة الخارجية ، يمكن فقط نسخ الجانب الخارجي ، لذلك ، إذا أمكن ، كجدول رائد في حالة الصلة الخارجية ، يجب عليك استخدام الجدول بأكبر كمية من البيانات.


   ,    ,     SQL      Oracle,   /*+ broadcast(t1, t2) */ 

دمج دمج الفرز


مع تشغيل spark.sql.join.preferSortMergeJoin افتراضيًا ، يتم تطبيق هذه الطريقة افتراضيًا إذا كان من الممكن فرز مفاتيح الانضمام.
من الميزات ، يمكن ملاحظة أنه ، على عكس الطريقة السابقة ، لا يتوفر تحسين إنشاء التعليمات البرمجية لأداء العملية إلا للربط الداخلي.


انضمام خلط التجزئة


إذا لم يكن من الممكن فرز المفاتيح ، أو تم تعطيل خيار تحديد دمج دمج الفرز الافتراضي ، فسيحاول Catalyst تطبيق ربط تجزئة عشوائي. بالإضافة إلى التحقق من الإعدادات ، تم التحقق أيضًا من أن Spark لديه ذاكرة كافية لإنشاء خريطة تجزئة محلية لقسم واحد (يتم تعيين العدد الإجمالي للأقسام عن طريق تعيين spark.sql.shuffle.partitions )


BroadcastNestedLoopJoin و CartesianProduct


في حالة عدم وجود إمكانية للمقارنة المباشرة حسب المفتاح (على سبيل المثال ، حالة مثل) أو لا توجد مفاتيح لربط الجداول ، اعتمادًا على حجم الجداول ، يتم تحديد هذا النوع أو CartesianProduct.


ترتيب تحديد الجداول في الانضمام


في أي حال ، يتطلب الربط جداول تبديل حسب المفتاح. لذلك ، في الوقت الحالي ، يعد ترتيب تحديد الجداول ، خاصةً في حالة إجراء عدة وصلات متتالية ، مهمًا (إذا كنت مملًا ، إذا لم يتم تمكين CBO ولم يتم تمكين إعداد JOIN_REORDER_ENABLED).


إذا كان ذلك ممكنًا ، يجب أن يقلل ترتيب الانضمام إلى الجداول من عدد عمليات التبديل العشوائي للجداول الكبيرة ، والتي يجب أن تتم عمليات الربط لها على نفس المفتاح بالتسلسل. ولا تنس أيضًا تقليل البيانات للانضمام لتمكين بث Hash Join.


التطبيق التبادلي لظروف المرشح


خذ بعين الاعتبار الاستعلام التالي:


 select bal.account_rk, cust.full_name from balance bal join customer cust on bal.party_rk = cust.party_rk and bal.actual_date = cust.actual_date where bal.actual_date = cast('2017-12-31' as date) 

هنا نقوم بتوصيل جدولين مقسمين بنفس الطريقة ، وفقًا لحقل التاريخ الفعلي ونطبق مرشحًا صريحًا فقط على القسم وفقًا لجدول التوازن.


كما يمكن رؤيته من خطة الاستعلام المحسنة ، يتم تطبيق عامل التصفية حسب التاريخ أيضًا على العميل ، وفي وقت قراءة البيانات من القرص ، يتم تحديد قسم واحد بالضبط.


 == Optimized Logical Plan == Project [account_rk#1, full_name#59] +- Join Inner, ((party_rk#18 = party_rk#57) && (actual_date#27 = actual_date#88)) :- Project [ACCOUNT_RK#1, PARTY_RK#18, ACTUAL_DATE#27] : +- Filter ((isnotnull(actual_date#27) && (actual_date#27 = 17531)) && isnotnull(party_rk#18)) : +- Relation[,... 4 more fields] orc +- Project [PARTY_RK#57, FULL_NAME#59, ACTUAL_DATE#88] +- Filter (((actual_date#88 = 17531) && isnotnull(actual_date#88)) && isnotnull(party_rk#57)) +- Relation[,... 9 more fields] orc 

ولكن تحتاج فقط إلى استبدال الصلة الداخلية بالجانب الأيسر الأيسر في الاستعلام ، حيث ينخفض ​​الضغط المسند لجدول العملاء على الفور ، ويحدث مسح كامل ، وهو تأثير غير مرغوب فيه.


 == Optimized Logical Plan == Project [account_rk#1, full_name#59] +- Join LeftOuter, ((party_rk#18 = party_rk#57) && (actual_date#27 = actual_date#88)) :- Project [ACCOUNT_RK#1, PARTY_RK#18, ACTUAL_DATE#27] : +- Filter (isnotnull(actual_date#27) && (actual_date#27 = 17531)) : +- Relation[,... 4 more fields] orc +- Project [PARTY_RK#57, FULL_NAME#59, ACTUAL_DATE#88] +- Relation[,... 9 more fields] orc 

نوع التحويل


ضع في اعتبارك مثالاً بسيطًا للتحديد من جدول مع التصفية حسب نوع العميل ، في نوع الحقل نوع party_type هو سلسلة.


 select party_rk, full_name from cust where actual_date = cast('2017-12-31' as date) and party_type = 101 --   -- and party_type = '101' --     

وقارن بين الخطتين الناتجتين ، الأولى - عندما نشير إلى النوع غير الصحيح (سيكون هناك ضمنيًا ضمنيًا إلى int) ، والثانية - عندما يتوافق النوع مع المخطط.


 PushedFilters: [IsNotNull(PARTY_TYPE)] //            . PushedFilters: [IsNotNull(PARTY_TYPE), EqualTo(PARTY_TYPE,101)] //             . 

لوحظت مشكلة مماثلة في حالة مقارنة التواريخ بسلسلة ، سيكون هناك مرشح لمقارنة السلاسل. مثال:


 where OPER_DATE = '2017-12-31' Filter (isnotnull(oper_date#0) && (cast(oper_date#0 as string) = 2017-12-31) PushedFilters: [IsNotNull(OPER_DATE)] where OPER_DATE = cast('2017-12-31' as date) PushedFilters: [IsNotNull(OPER_DATE), EqualTo(OPER_DATE,2017-12-31)] 

بالنسبة للحالة التي يكون فيها تحويل نوع ضمنيًا ممكنًا ، على سبيل المثال ، int -> عشري ، يقوم المحسن بذلك من تلقاء نفسه.


مزيد من البحث


يمكن الحصول على الكثير من المعلومات المثيرة للاهتمام حول "المقابض" التي يمكن استخدامها لضبط المحفز ، وكذلك حول إمكانيات (الحاضر والمستقبل) للمحسن من SQLConf.scala.


على وجه الخصوص ، كما ترى بشكل افتراضي ، لا يزال مُحسِّن التكلفة متوقفًا في الوقت الحالي.


 val CBO_ENABLED = buildConf("spark.sql.cbo.enabled") .doc("Enables CBO for estimation of plan statistics when set true.") .booleanConf .createWithDefault(false) 

فضلا عن التحسينات التابعة لها المرتبطة بإعادة ترتيب join'ov.


 val JOIN_REORDER_ENABLED = buildConf("spark.sql.cbo.joinReorder.enabled") .doc("Enables join reorder in CBO.") .booleanConf .createWithDefault(false) 

أو


 val STARSCHEMA_DETECTION = buildConf("spark.sql.cbo.starSchemaDetection") .doc("When true, it enables join reordering based on star schema detection. ") .booleanConf .createWithDefault(false) 

ملخص موجز


لم يلمس سوى جزء صغير من التحسينات الحالية ، وتلك التجارب مع تحسين التكلفة ، والتي يمكن أن تعطي مساحة أكبر بكثير لتحويل الاستعلام ، في المستقبل. أيضًا ، سؤال مثير للاهتمام منفصل هو مقارنة مجموعة من التحسينات عند قراءة الملفات من Parquet و Orc ، إذا حكمنا من قبل jira للمشروع ، فإن الأمر يتعلق بالتكافؤ ، ولكن هل هو كذلك حقًا؟


بالإضافة إلى:


  • تحليل الطلبات وتحسينها مثير للاهتمام ومثير للاهتمام ، خاصة بالنظر إلى توفر رموز المصدر.
  • إدراج CBO سيوفر المجال لمزيد من التحسينات والبحوث.
  • من الضروري مراقبة تطبيق القواعد الأساسية التي تسمح لك بتصفية أكبر قدر ممكن من البيانات "الإضافية" في المراحل المبكرة الممكنة.
  • الانضمام هو شر لا بد منه ، ولكن إذا كان ذلك ممكنًا ، فإنه يستحق التقليل منه وتتبع التنفيذ الذي يتم استخدامه تحت غطاء المحرك.

Source: https://habr.com/ru/post/ar417103/


All Articles