
في هذا المنشور ، سننظر في الوصول إلى واجهة برمجة تطبيقات Spark من لغات برمجة مختلفة في JVM ، بالإضافة إلى بعض مشكلات الأداء عند تجاوز لغة Scala. حتى إذا كنت تعمل خارج JVM ، فقد يكون هذا القسم مفيدًا ، نظرًا لأن اللغات غير JVM غالبًا ما تعتمد على Java API ، وليس على Scala API.
لا يعني العمل بلغات برمجة أخرى دائمًا أنك بحاجة إلى تجاوز JVM ، والعمل في JVM له مزايا عديدة من حيث الأداء - ويرجع ذلك أساسًا إلى أنك لست بحاجة إلى نسخ البيانات. على الرغم من أنه ليس من الضروري استخدام مكتبات أو محولات ربط خاصة للوصول إلى Spark من خارج لغة Scala ، إلا أن استدعاء كود Scala من لغات البرمجة الأخرى قد يكون صعبًا. يدعم إطار عمل Spark استخدام Java 8 في تعبيرات lambda ، وأولئك الذين يستخدمون إصدارات قديمة من JDK لديهم الفرصة لتطبيق الواجهة المناسبة من حزمة org.apache.spark.api.java.function. حتى في الحالات التي لا تحتاج فيها إلى نسخ البيانات ، قد يكون للعمل بلغة برمجة أخرى فروق دقيقة صغيرة ولكنها مهمة تتعلق بالأداء.
تظهر الصعوبات في الوصول إلى واجهات برمجة تطبيقات Scala المختلفة بشكل خاص عند استدعاء وظائف باستخدام علامات الفئة أو عند استخدام الخصائص المقدمة باستخدام تحويلات النوع الضمنية (على سبيل المثال ، جميع وظائف مجموعات RDD المتعلقة بالفئتين المزدوجة و Tuple). بالنسبة للآليات التي تعتمد على تحويلات النوع الضمنية ، غالبًا ما يتم توفير فئات ملموسة مكافئة مع تحويلات صريحة لها. يمكن تمرير علامات فئة وهمية (على سبيل المثال ، AnyRef) إلى الوظائف التي تعتمد على علامات الفئة (غالبًا ما تقوم المحولات بذلك تلقائيًا. عادةً لا يؤدي استخدام فئات معينة بدلاً من تحويلات النوع الضمنية إلى زيادة في الحمل ، ولكن يمكن أن تفرض علامات الفئة الوهمية قيودًا على بعض تحسينات المترجم.
لا تختلف واجهة برمجة تطبيقات Java كثيرًا عن Scala API من حيث الخصائص ، وفي بعض الأحيان تكون بعض الوظائف أو واجهات برمجة تطبيقات المطور مفقودة. يتم دعم لغات برمجة JVM الأخرى ، مثل لغة Clojure مع DSL
Flambo والمكتبة
البراقة ، باستخدام واجهات برمجة تطبيقات Java مختلفة بدلاً من استدعاء Scala API مباشرة. نظرًا لأن معظم روابط اللغات ، حتى اللغات غير JVM مثل Python و R ، تمر عبر واجهة برمجة تطبيقات
Java ، سيكون من المفيد التعامل معها.
تتشابه واجهات برمجة تطبيقات Java إلى حد كبير مع واجهات برمجة تطبيقات Scala ، على الرغم من أنها مستقلة عن علامات الفئة والتحويلات الضمنية. يعني غياب هذا الأخير أنه بدلاً من تحويل مجموعات RDD تلقائيًا من كائنات Tuple أو مزدوجة إلى فئات خاصة مع وظائف إضافية ، يجب عليك استخدام وظائف تحويل النوع الصريح (على سبيل المثال ، mapToDouble أو mapToPair). يتم تحديد الوظائف المحددة فقط لمجموعات Java RDD ؛ لحسن الحظ من أجل التوافق ، هذه الأنواع الخاصة هي مجرد محولات لمجموعات Scala RDD. بالإضافة إلى ذلك ، ترجع هذه الوظائف الخاصة أنواعًا مختلفة من البيانات ، مثل JavaDoubleRDD و JavaPairRDD ، مع الميزات التي توفرها تحويلات لغة Scala الضمنية.
دعنا ننتقل مرة أخرى إلى المثال المتعارف عليه لعد الكلمات باستخدام Java API (مثال 7.1). نظرًا لأن استدعاء Scala API من Java قد يكون أمرًا صعبًا في بعض الأحيان ، فإن واجهات برمجة تطبيقات Java لإطار عمل Spark يتم تنفيذها تقريبًا في لغة Scala مع علامات فئة مخفية وتحويلات ضمنية. وبسبب هذا ، فإن محولات Java هي طبقة رقيقة جدًا ، في المتوسط تتكون من بضعة أسطر من التعليمات البرمجية ، وإعادة كتابتها بدون جهد تقريبًا.
مثال 7.1 عد الكلمات (جافا)
import scala.Tuple2; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaPairRDD import org.apache.spark.api.java.JavaSparkContext; import java.util.regex.Pattern; import java.util.Arrays; public final class WordCount { private static final Pattern pattern = Pattern.compile(" "); public static void main(String[] args) throws Exception { JavaSparkContext jsc = new JavaSparkContext(); JavaRDD<String> lines = jsc.textFile(args[0]); JavaRDD<String> words = lines.flatMap(e -> Arrays.asList( pattern.split(e)).iterator()); JavaPairRDD<String, Integer> wordsIntial = words.mapToPair( e -> new Tuple2<String, Integer>(e, 1)); } }
في بعض الأحيان قد تحتاج إلى تحويل جافا RDDs إلى Scala RDDs أو العكس. غالبًا ما يكون هذا مطلوبًا للمكتبات التي تتطلب إدخال أو إعادة مجموعات Scala RDD ، ولكن في بعض الأحيان قد لا تكون خصائص Spark الأساسية متاحة بعد في واجهة برمجة تطبيقات Java. يعد تحويل RDDs إلى Java إلى Scala RDDs أسهل طريقة لاستخدام هذه الميزات الجديدة.
إذا كنت بحاجة إلى نقل مجموعة Java RDD إلى مكتبة Scala ، والتي تتوقع وجود RDD Spark عادي عند الإدخال ، يمكنك الوصول إلى Scala RDD الأساسي باستخدام طريقة rdd (). غالبًا ما يكون هذا كافيًا لنقل RDD النهائي إلى أي مكتبة Scala مطلوبة ؛ من بين الاستثناءات البارزة هي مكتبات Scala ، التي تعتمد على التحويلات الضمنية لأنواع أنواع أنواع المحتوى أو معلومات علامات الصف في عملها. في هذه الحالة ، فإن أسهل طريقة للوصول إلى التحويلات الضمنية هي كتابة محول صغير في Scala. إذا كان لا يمكن استخدام الأصداف Scala ، فيمكنك استدعاء الوظيفة المقابلة لفئة
JavaConverters وتشكيل علامة فئة وهمية.
لإنشاء علامة فئة وهمية ، يمكنك استخدام طريقة scala.reflect.ClassTag $ .MODULE $ .AnyRef () أو الحصول على العلامة الحقيقية باستخدام scala.reflect.ClassTag $ .MODULE $ .apply (CLASS) ، كما هو موضح في الأمثلة 7.2 و 7.3.
للتحويل من Scala RDD إلى RDD Java ، غالبًا ما تكون معلومات علامة الفصل أكثر أهمية من معظم مكتبات Spark. والسبب هو أنه على الرغم من أن فئات JavaRDD المختلفة توفر منشئين يمكن الوصول إليهم بشكل عام يأخذون Scala RDD كوسيطات ، إلا أنهم يقصدون استدعاؤهم من كود Scala ، وبالتالي يتطلبون معلومات حول علامة الفئة.
غالبًا ما يتم استخدام علامات فئة وهمية في التعليمات البرمجية العامة أو القالب ، حيث تكون الأنواع الدقيقة غير معروفة في وقت الترجمة. غالبًا ما تكون هذه العلامات كافية ، على الرغم من وجود احتمال لفقدان بعض الفروق الدقيقة على جانب رمز Scala ؛ في حالات نادرة جدًا ، يتطلب رمز Scala معلومات دقيقة لعلامات الفئة. في هذه الحالة ، سيكون عليك استخدام علامة حقيقية. في معظم الحالات ، لا يتطلب هذا الكثير من الجهد ويحسن الأداء ، لذا حاول استخدام هذه العلامات حيثما أمكن.
مثال 7.2. جعل جافا / سكالا RDD متوافق مع علامة فئة وهمية
public static JavaPairRDD wrapPairRDDFakeCt( RDD<Tuple2<String, Object>> RDD) { // AnyRef — // , // , // // ClassTag<Object> fake = ClassTag$.MODULE$.AnyRef(); return new JavaPairRDD(rdd, fake, fake); }
مثال 7.3. ضمان توافق Java / Scala RDD
public static JavaPairRDD wrapPairRDD( RDD<Tuple2<String, Object>> RDD) { // ClassTag<String> strCt = ClassTag$.MODULE$.apply(String.class); ClassTag<Long> longCt = ClassTag$.MODULE$.apply(scala.Long.class); return new JavaPairRDD(rdd, strCt, longCt); }
كل من Spark SQL و API لخطوط الأنابيب ML كانت في معظمها متناسقة في Java و Scala. ومع ذلك ، توجد وظائف المساعد الخاصة بـ Java ، ولا يسهل استدعاء وظائف Scala المكافئة لها. فيما يلي أمثلةهم: وظائف عددية مختلفة ، مثل الجمع والطرح ، وما إلى ذلك ، لفئة العمود. من الصعب استدعاء مكافئاتها الزائدة من لغة سكالا (+ ، -). بدلاً من استخدام JavaDataFrame و JavaSQLContext ، يتم توفير الطرق المطلوبة لـ Java في SQLContext ومجموعات DataFrame العادية. قد يربكك هذا ، لأن بعض الطرق المذكورة في وثائق Java لا يمكن استخدامها من كود Java ، ولكن في مثل هذه الحالات يتم توفير وظائف بنفس الأسماء للاتصال من Java.
الوظائف المعرفة من قبل المستخدم (UDFs) في لغة Java ، ولهذا السبب ، في معظم اللغات الأخرى باستثناء Scala ، تتطلب تحديد نوع القيمة التي يتم إرجاعها بواسطة الوظيفة ، حيث لا يمكن استنتاجها منطقيًا ، على غرار كيفية تنفيذها في Scala (مثال 7.4) .
مثال 7.4. عينة UDF لجافا
sqlContext.udf() .register("strlen", (String s) -> s.length(), DataTypes.StringType);
على الرغم من أن الأنواع المطلوبة من قبل Scala و Java APIs مختلفة ، فإن التفاف أنواع مجموعات Java لا يتطلب نسخًا إضافيًا. في حالة التكرارات ، يتم إجراء تحويل النوع المطلوب للمحول بطريقة متأخرة عند الوصول إلى العناصر ، مما يسمح لإطار عمل Spark بتفريغ البيانات إذا لزم الأمر (كما هو موضح في قسم "تنفيذ تحويلات المكرر - المكرر باستخدام وظيفة mapPartitions" في الصفحة 121). هذا مهم للغاية لأنه بالنسبة للعديد من العمليات البسيطة ، قد تكون تكلفة نسخ البيانات أعلى من تكلفة الحساب نفسه.
ما بعد سكالا و JVM
إذا لم تقتصر على JVM ، فسيزداد عدد لغات البرمجة المتاحة للعمل بشكل كبير. ومع ذلك ، مع بنية Spark الحالية ، يمكن أن يؤدي العمل خارج JVM - خاصة في عقد العمل - إلى زيادات كبيرة في التكلفة بسبب نسخ البيانات في عقد العمل بين JVM ورمز اللغة الهدف. في العمليات المعقدة ، تكون حصة تكلفة نسخ البيانات صغيرة نسبيًا ، ولكن في العمليات البسيطة يمكن أن تؤدي بسهولة إلى مضاعفة التكلفة الحسابية الإجمالية.
أول لغة برمجة غير JVM مدعومة مباشرة خارج Spark هي Python ، وأصبحت واجهة برمجة التطبيقات والواجهة الخاصة بها هي النموذج الذي تعتمد عليه تطبيقات لغات البرمجة الأخرى غير JVM.
كيف يعمل PySpark
يتصل PySpark بـ JVM Spark باستخدام مزيج من القنوات على العمال و Py4J ، وهي مكتبة متخصصة توفر تفاعل Python / Java ، على السائق. تحت هذا ، للوهلة الأولى ، تخفي العمارة البسيطة الكثير من الفروق الدقيقة المعقدة ، بفضلها يعمل PySpark ، كما هو موضح في الشكل. 7.1. إحدى المشاكل الرئيسية: حتى عندما يتم نسخ البيانات من عامل Python إلى JVM ، فإنه ليس في الشكل الذي يمكن للجهاز الظاهري تحليله بسهولة. يلزم بذل جهود خاصة من جانب كل من عامل Python و Java لضمان أن JVM لديه معلومات كافية لعمليات مثل التقسيم.
أطقم PySpark RDD
تعتبر تكلفة الموارد اللازمة لنقل البيانات من وإلى JVM ، وكذلك لتشغيل منفذ Python التنفيذي ، كبيرة. يمكنك تجنب العديد من مشكلات الأداء في واجهات برمجة تطبيقات PySpark RDD Suite باستخدام واجهات برمجة التطبيقات DataFrame / Dataset ، لأن البيانات تبقى في JVM لأطول فترة ممكنة.
يتم نسخ البيانات من JVM إلى Python باستخدام المقابس والبايتات المتسلسلة. يتوفر إصدار أكثر عمومية للتفاعل مع البرامج بلغات أخرى من خلال واجهة PipedRDD ، ويظهر تطبيقه في القسم الفرعي "استخدام الأنبوب".
سيكون تنظيم قنوات تبادل البيانات (في اتجاهين) مكلفًا للغاية. نتيجة لذلك ، يقوم PySpark بتنظيم (إن أمكن) خط أنابيب تحويل Python داخل مترجم Python ، وتسلسل عملية التصفية ، وبعدها الخريطة ، على مكرر كائن Python باستخدام فئة PipelinedRDD المتخصصة. حتى عندما تحتاج إلى تبديل البيانات ولا يكون PySpark قادرًا على ربط التحويلات في آلة افتراضية للعامل الفردي ، يمكنك إعادة استخدام مترجم Python ، وبالتالي فإن تكلفة بدء تشغيل المترجم لن تتباطأ أكثر.
هذا ليس سوى جزء من اللغز. تعمل أقراص PipedRDD العادية مع نوع السلسلة ، والذي ليس من السهل تبديله بسبب عدم وجود مفتاح طبيعي. في PySpark ، وفي صورتها وتشابهها في المكتبات المرتبطة بالعديد من لغات البرمجة الأخرى ، يتم استخدام نوع خاص من PairwiseRDD ، حيث يكون المفتاح عددًا صحيحًا طويلًا ، ويتم تنفيذ عملية إلغاء التسلسل بواسطة رمز المستخدم بلغة سكالا ، والمخصصة لتحليل قيم Python. إن تكلفة إزالة التسلسل هذه ليست باهظة للغاية ، ولكنها توضح أن Scala في إطار عمل Spark يعتبر بشكل أساسي أن نتائج كود Python تعمل كمصفوفات بايت "غير شفافة".
على الرغم من بساطته ، فإن نهج التكامل هذا يعمل بشكل جيد بشكل مدهش ، ومعظم العمليات على مجموعات Scala RDD متاحة في Python. في بعض أصعب الأماكن في الكود ، يتم الوصول إلى المكتبات ، على سبيل المثال ، MLlib ، بالإضافة إلى تحميل / حفظ البيانات من مصادر مختلفة.
كما يفرض العمل مع تنسيقات البيانات المختلفة حدوده ، حيث يعتمد جزء كبير من التعليمات البرمجية لتحميل / حفظ البيانات من إطار عمل Spark على واجهات Hadoop Java. هذا يعني أن جميع البيانات المحملة يتم تحميلها أولاً في JVM ، ثم يتم نقلها إلى Python فقط.
عادة ما يتم استخدام طريقتين للتفاعل مع MLlib: إما أن يستخدم PySpark نوع بيانات متخصص مع تحويلات من نوع Scala ، أو يتم إعادة تنفيذ الخوارزمية في Python. يمكن تجنب هذه المشاكل باستخدام حزمة Spark ML ، التي تستخدم واجهة DataFrame / Dataset ، والتي عادة ما تقوم بتخزين البيانات في JVM.
PySpark DataFrame ومجموعات البيانات
لا تحتوي مجموعات DataFrame و Dataset على العديد من مشكلات الأداء في واجهات برمجة تطبيقات Python RDD Set لأنها تخزن البيانات في JVM لأطول فترة ممكنة. يظهر اختبار الأداء نفسه الذي أجريناه لتوضيح تفوق مجموعات DataFrame على مجموعات RDD (انظر الشكل 3.1) اختلافات كبيرة عند التشغيل في Python (الشكل 7.2).
بالنسبة للعديد من العمليات باستخدام مجموعات DataFrame و Dataset ، قد لا تحتاج إلى نقل البيانات من JVM على الإطلاق ، على الرغم من أن استخدام تعبيرات UDF و UDAF و Python lambda المختلفة تتطلب بطبيعة الحال نقل بعض البيانات إلى JVM. هذا يؤدي إلى المخطط المبسط التالي للعديد من العمليات ، والتي تبدو مثل تلك الموضحة في الشكل. 7.3.
الوصول إلى كائنات Java الأساسية والرمز المختلط في Scala
إحدى النتائج المهمة لهندسة PySpark هي أن العديد من فئات إطار عمل Spark Python هي في الواقع محولات لترجمة المكالمات من كود Python إلى نموذج JVM مفهوم.
إذا كنت تعمل مع مطوري Scala / Java وترغب في التفاعل مع التعليمات البرمجية الخاصة بهم ، فلن يكون هناك أي محولات للوصول إلى التعليمات البرمجية الخاصة بك مسبقًا ، ولكن يمكنك تسجيل Java / Scala UDF واستخدامها من كود Python. بدءًا من Spark 2.1 ، يمكن القيام بذلك باستخدام طريقة registerJavaFunction للكائن sqlContext.
في بعض الأحيان لا تمتلك هذه المحولات جميع الآليات اللازمة ، وبما أن Python لا تتمتع بحماية قوية ضد استدعاء الطرق الخاصة ، يمكنك اللجوء على الفور إلى JVM. ستسمح لك نفس التقنية بالوصول إلى التعليمات البرمجية الخاصة بك في JVM ، وبجهود قليلة ، قم بتحويل النتائج مرة أخرى إلى كائنات Python.
في القسم الفرعي "خطط الاستعلام الكبيرة والخوارزميات التكرارية" على ص. 91 لاحظنا أهمية استخدام إصدار JVM من مجموعات DataFrame و RDD لتقليل خطة الاستعلام. هذا حل بديل ، لأنه عندما تصبح خطط الاستعلام كبيرة جدًا بحيث لا يمكن معالجتها بواسطة مُحسِّن Spark SQL ، فإن مُحسِّن SQL ، بسبب وضع مجموعة RDD في المنتصف ، يفقد القدرة على النظر بعد اللحظة التي تظهر فيها البيانات في RDD. يمكن تحقيق نفس الشيء بمساعدة واجهات برمجة تطبيقات Python العامة ، ومع ذلك ، في نفس الوقت ، ستفقد العديد من مزايا مجموعات DataFrame ، لأنه سيتعين على جميع البيانات الانتقال ذهابًا وإيابًا من خلال عقد عمل Python. بدلاً من ذلك ، يمكنك تقليل الرسم البياني الأصلي من خلال الاستمرار في تخزين البيانات في JVM (كما هو موضح في المثال 7.5).
مثال 7.5 قطع خطة استعلام كبيرة لإطار DataFrame باستخدام Python
def cutLineage(df): """ DataFrame — .. : >>> df = RDD.toDF() >>> cutDf = cutLineage(df) >>> cutDf.count() 3 """ jRDD = df._jdf.toJavaRDD() jSchema = df._jdf.schema() jRDD.cache() sqlCtx = df.sql_ctx try: javaSqlCtx = sqlCtx._jsqlContext except: javaSqlCtx = sqlCtx._ssql_ctx newJavaDF = javaSqlCtx.createDataFrame(jRDD, jSchema) newDF = DataFrame(newJavaDF, sqlCtx) return newDF
بشكل عام ، من خلال الاصطلاح ، يتم استخدام بناء الجملة _j [abbreviated_name] للوصول إلى إصدارات Java الداخلية لمعظم كائنات Python. لذا ، على سبيل المثال ، يحتوي كائن SparkContext على _jsc ، مما يسمح لك بالحصول على كائن SparkContext Java الداخلي. هذا ممكن فقط في برنامج التشغيل ، لذلك عندما ترسل كائنات PySpark إلى عقد العمل ، لن تتمكن من الوصول إلى مكون Java الداخلي ولن تعمل معظم واجهة برمجة التطبيقات.
للوصول إلى فئة Spark في JVM ، التي لا تحتوي على محول Python ، يمكنك استخدام بوابة Py4J على برنامج التشغيل. يحتوي الكائن SparkContext على ارتباط إلى العبّارة في خاصية _gateway. سيسمح بناء الجملة sc._gateway.jvm. [Full_class_name_in_JVM] بالوصول إلى أي كائن Java.
ستعمل تقنية مماثلة في صفوف سكالا الخاصة بك إذا تم ترتيبها وفقًا لمسار الفصل. يمكنك إضافة ملفات JAR إلى مسار الفصل الدراسي باستخدام الأمر spark-Submit مع المعلمة --jars أو عن طريق تعيين خصائص التكوين spark.driver.extraClassPath. المثال 7.6 ، الذي ساعد على توليد الأرز. 7.2 ، تم تصميمه عن قصد لإنشاء بيانات لاختبار الأداء باستخدام رمز Scala الحالي.
مثال 7.6 استدعاء فئات غير Spark-JVM باستخدام Py4J
sc = sqlCtx._sc # SQL Context, 2.1, 2.0 , # 2.0, — , :p try: try: javaSqlCtx = sqlCtx._jsqlContext except: javaSqlCtx = sqlCtx._ssql_ctx except: javaSqlCtx = sqlCtx._jwrapped jsc = sc._jsc scalasc = jsc.sc() gateway = sc._gateway # java-, RDD JVM- # Row (Int, Double). RDD Python # RDD Java ( Row), # , . # Java-RDD Row — # DataFrame, # RDD Row. java_rdd = (gateway.jvm.com.highperformancespark.examples. tools.GenerateScalingData. generateMiniScaleRows(scalasc, rows, numCols)) # JSON . # Python- Java-. schema = StructType([ StructField("zip", IntegerType()), StructField("fuzzyness", DoubleType())]) # 2.1 / 2.1 try: jschema = javaSqlCtx.parseDataType(schema.json()) except: jschema = sqlCtx._jsparkSession.parseDataType(schema.json()) # RDD (Java) DataFrame (Java) java_dataframe = javaSqlCtx.createDataFrame(java_rdd, jschema) # DataFrame (Java) DataFrame (Python) python_dataframe = DataFrame(java_dataframe, sqlCtx) # DataFrame (Python) RDD pairRDD = python_dataframe.rdd.map(lambda row: (row[0], row[1])) return (python_dataframe, pairRDD)
على الرغم من أن العديد من فئات Python هي ببساطة محولات لكائنات Java ، إلا أنه لا يمكن لف جميع كائنات Java في كائنات Python ثم استخدامها في Spark. على سبيل المثال ، يتم تمثيل الكائنات في مجموعات PySpark RDD كسلاسل متسلسلة ، والتي لا يمكن تحليلها إلا بسهولة في كود Python. لحسن الحظ ، يتم توحيد كائنات DataFrame بين لغات البرمجة المختلفة ، لذلك إذا كان بإمكانك تحويل بياناتك إلى مجموعات DataFrame ، فيمكنك بعد ذلك لفها في كائنات Python وإما استخدامها مباشرة ك Python DataFrame ، أو تحويل Python DataFrame إلى RDD لهذا نفس اللغة.
»يمكن العثور على مزيد من المعلومات حول الكتاب على
موقع الناشر على الويب»
المحتويات»
مقتطفاتخصم 20٪ على كوبونات الرشاشات -
سبارك