الجزء 1: بيان المشكلة
مرحبا يا هبر! أنا مهندس حلول في CleverDATA. اليوم سأتحدث عن كيفية تصنيف كميات كبيرة من البيانات باستخدام نماذج تم إنشاؤها باستخدام أي مكتبة تعلُم آلي متاحة تقريبًا. في هذه السلسلة المكونة من جزأين ، سننظر في الأسئلة التالية.
- كيفية تقديم نموذج تعلُّم الآلة كخدمة (نموذج كخدمة)؟
- كيف تتم مهام المعالجة الموزعة لكميات كبيرة من البيانات فعليًا باستخدام Apache Spark؟
- ما هي المشاكل التي تظهر عندما يتفاعل Apache Spark مع الخدمات الخارجية؟
- كيف يمكن تنظيم تفاعل Apache Spark مع الخدمات الخارجية باستخدام مكتبات akka-Streams و akka-http ، بالإضافة إلى نهج Stream Reactive Streams؟
في البداية ، خططت لكتابة مقال واحد ، ولكن بما أن حجم المادة تبين أنه كبير جدًا ، فقد قررت تقسيمه إلى جزأين. سننظر اليوم في الجزء الأول في البيان العام للمشكلة ، وكذلك المشاكل الرئيسية التي يجب حلها أثناء التنفيذ. في
الجزء الثاني ، سوف نتحدث عن التنفيذ العملي لحل هذه المشكلة باستخدام نهج التدفقات التفاعلية.
تمتلك شركتنا
CleverDATA فريقًا من محللي البيانات الذين ، بمساعدة مجموعة واسعة من الأدوات (مثل scikit-learn و facebook fastText و xgboost و tensorFlow ، وما إلى ذلك) ، يقومون بتدريب نماذج التعلم الآلي. لغة البرمجة الأساسية الفعلية التي يستخدمها المحللون هي Python. تحتوي جميع مكتبات تعلُم الآلة تقريبًا ، حتى التي تم تنفيذها في الأصل بلغات أخرى ، على واجهة Python ومتكاملة مع مكتبات Python الرئيسية (بشكل أساسي مع NumPy).
من ناحية أخرى ، يستخدم النظام الإيكولوجي Hadoop على نطاق واسع لتخزين ومعالجة كميات كبيرة من البيانات غير المهيكلة. في ذلك ، يتم تخزين البيانات على نظام ملفات HDFS في شكل كتل موزعة منسوخة ذات حجم معين (عادة 128 ميجا بايت ، ولكن من الممكن تكوينها). تحاول خوارزميات معالجة البيانات الموزعة الأكثر كفاءة تقليل تفاعل الشبكة بين أجهزة المجموعة. للقيام بذلك ، يجب معالجة البيانات على نفس الأجهزة حيث يتم تخزينها.
بالطبع ، في كثير من الحالات لا يمكن تجنب تفاعل الشبكة تمامًا ، ولكن ، مع ذلك ، تحتاج إلى محاولة تنفيذ جميع المهام محليًا وتقليل كمية البيانات التي ستحتاج إلى نقلها عبر الشبكة.
يسمى هذا المبدأ في معالجة البيانات الموزعة "نقل الحسابات بالقرب من البيانات". تلتزم جميع الأطر الرئيسية ، خاصة Hadoop MapReduce و Apache Spark ، بهذا المبدأ. إنها تحدد تركيبة وتسلسل عمليات محددة والتي يجب تشغيلها على الأجهزة حيث يتم تخزين كتل البيانات المطلوبة.
الشكل 1. تتألف مجموعة HDFS من عدة أجهزة ، أحدها عقدة الاسم ، والباقي عقدة بيانات. يخزن Name Node معلومات حول الملفات التي تكون كتلها ، وعن الأجهزة التي توجد فيها فعليًا. يتم تخزين الكتل نفسها في عقدة البيانات ، والتي يتم نسخها على عدة أجهزة لزيادة الموثوقية. تدير عقدة البيانات أيضًا مهام معالجة البيانات. تتكون المهام من العملية الرئيسية (Master ، M) ، التي تنسق إطلاق عمليات العمل (العامل ، W) على الأجهزة حيث يتم تخزين كتل البيانات اللازمة.يتم إطلاق جميع مكونات النظام البيئي Hadoop تقريبًا باستخدام Java Virtual Machine (JVM) ويتم دمجها بشكل وثيق مع بعضها البعض. على سبيل المثال ، لتشغيل المهام المكتوبة باستخدام Apache Spark للعمل مع البيانات المخزنة على HDFS ، لا توجد حاجة تقريبًا إلى معالجة إضافية: يوفر الإطار هذه الوظيفة من خارج الصندوق.
لسوء الحظ ، تفترض معظم المكتبات المصممة للتعلم الآلي أنه يتم تخزين البيانات ومعالجتها محليًا. في الوقت نفسه ، هناك مكتبات متكاملة بإحكام مع نظام Hadoop البيئي ، على سبيل المثال ، Spark ML أو Apache Mahout. ومع ذلك ، لديهم عدد من السلبيات الكبيرة. أولاً ، تقدم تطبيقات أقل بكثير لخوارزميات التعلم الآلي. ثانيًا ، لا يمكن لجميع محللي البيانات العمل معهم. تشمل مزايا هذه المكتبات حقيقة أنه يمكن استخدامها لتدريب النماذج على كميات كبيرة من البيانات باستخدام الحوسبة الموزعة.
ومع ذلك ، غالبًا ما يستخدم محللو البيانات طرقًا بديلة لتدريب النماذج ، ولا سيما المكتبات التي تتيح استخدام وحدات معالجة الرسومات. لن أفكر في قضايا نماذج التدريب في هذه المقالة ، لأنني أريد التركيز على استخدام النماذج الجاهزة التي تم إنشاؤها باستخدام أي مكتبة متاحة للتعلم الآلي لتصنيف كميات كبيرة من البيانات.
لذا ، فإن المهمة الرئيسية التي نحاول حلها هنا هي تطبيق نماذج التعلم الآلي على كميات كبيرة من البيانات المخزنة على HDFS. إذا استطعنا استخدام وحدة SparkML من مكتبة Apache Spark ، التي تنفذ خوارزميات التعلم الآلي الأساسية ، فسيكون تصنيف كميات كبيرة من البيانات مهمة تافهة:
val model: LogisticRegressionModel = LogisticRegressionModel.load("/path/to/model") val dataset = spark.read.parquet("/path/to/data") val result = model.transform(dataset)
لسوء الحظ ، يعمل هذا النهج فقط مع الخوارزميات التي تم تنفيذها في وحدة SparkML (يمكن العثور على قائمة كاملة
هنا ). في حالة استخدام مكتبات أخرى ، علاوة على ذلك ، لا يتم تنفيذها على JVM ، يصبح كل شيء أكثر تعقيدًا.
لحل هذه المشكلة ، قررنا لف النموذج في خدمة REST. وفقًا لذلك ، عند بدء مهمة تصنيف البيانات المخزنة على HDFS ، من الضروري تنظيم التفاعل بين الأجهزة التي يتم تخزين البيانات عليها والجهاز (أو مجموعة الأجهزة) التي تعمل عليها خدمة التصنيف.
الشكل 2. مفهوم النموذج كخدمةوصف خدمة تصنيف Python
من أجل تقديم النموذج كخدمة ، من الضروري حل المهام التالية:
- تنفيذ الوصول الفعال إلى النموذج عبر HTTP ؛
- ضمان الاستخدام الأكثر كفاءة لموارد الجهاز (جميع نوى المعالج والذاكرة في المقام الأول) ؛
- توفر مقاومة للأحمال العالية ؛
- توفر القدرة على التوسع أفقيًا.
يعد الوصول إلى النموذج عبر HTTP أمرًا بسيطًا للغاية في التنفيذ: تم تطوير عدد كبير من المكتبات لبايثون تسمح لك بتنفيذ نقطة وصول REST باستخدام كمية صغيرة من التعليمات البرمجية. أحد هذه الإطارات الدقيقة هو
قارورة . تنفيذ خدمة التصنيف على القارورة كالتالي:
from flask import Flask, request, Response model = load_model() n_features = 100 app = Flask(__name__) @app.route("/score", methods=['PUT']) def score(): inp = np.frombuffer(request.data, dtype='float32').reshape(-1, n_features) result = model.predict(inp) return Response(result.tobytes(), mimetype='application/octet-stream') if __name__ == "__main__": app.run()
هنا ، عندما تبدأ الخدمة ، نقوم بتحميل النموذج في الذاكرة ، ثم نستخدمه عند استدعاء طريقة التصنيف. تقوم وظيفة load_model بتحميل النموذج من مصدر خارجي ، سواء كان نظام الملفات ، أو تخزين قيمة المفتاح ، إلخ.
النموذج هو كائن له طريقة تنبؤية. في حالة التصنيف ، فإنه يأخذ مدخلات إلى بعض متجه الميزة بحجم معين وينتج إما قيمة منطقية تشير إلى ما إذا كان المتجه المحدد مناسبًا لهذا النموذج ، أو بعض القيمة من 0 إلى 1 ، والتي يمكنك عندئذٍ تطبيق عتبة القطع: كل شيء فوق العتبة ، هي نتيجة إيجابية للتصنيف ، والباقي ليس كذلك.
يتم تمرير ناقل الميزة الذي نحتاج إلى تصنيفه في شكل ثنائي وإزالته إلى مصفوفة غير مرتبة. سيكون من العبء إجراء طلب HTTP لكل متجه. على سبيل المثال ، في حالة متجه 100 الأبعاد واستخدام قيم النوع float32 ، فإن طلب HTTP الكامل ، بما في ذلك الرؤوس ، سيبدو كالتالي:
PUT /score HTTP/1.1 Host: score-node-1:8099 User-Agent: curl/7.58.0 Accept: */* Content-Type: application/binary Content-Length: 400 [400 bytes of data]
كما ترون ، فإن كفاءة هذا الطلب منخفضة جدًا (400 بايت من الحمولة / (133 بايت رأس + 400 بايت)) = 75٪). لحسن الحظ ، في جميع المكتبات تقريبًا ، تتيح لك طريقة التنبؤ عدم استقبال المتجه [1 xn] ، ولكن المصفوفة [mxn] ، وبالتالي ، إخراج النتيجة فورًا لقيم الإدخال m.
بالإضافة إلى ذلك ، تم تحسين مكتبة numpy للعمل مع المصفوفات الكبيرة ، مما يتيح لك استخدام جميع موارد الجهاز المتاحة بشكل فعال. وبالتالي ، لا يمكننا إرسال واحد فقط بل عددًا كبيرًا من متجهات الميزات في طلب واحد ، وإلغاء تسلسلها إلى مصفوفة غير مرتبة بحجم [mxn] ، وتصنيف ، وإرجاع المتجه [mx 1] من القيم المنطقية أو العائمة 32. ونتيجة لذلك ، تصبح كفاءة تفاعل HTTP عند استخدام مصفوفة من 1000 صف تساوي 100٪ تقريبًا. يمكن تجاهل حجم رؤوس HTTP في هذه الحالة.
لاختبار خدمة Flask على الجهاز المحلي ، يمكنك تشغيلها من سطر الأوامر. ومع ذلك ، فإن هذه الطريقة غير مناسبة تمامًا للاستخدام الصناعي. والحقيقة هي أن Flask هو مؤشر ترابط واحد ، وإذا نظرنا إلى الرسم البياني لتحميل المعالج أثناء تشغيل الخدمة ، فسوف نرى أن نواة واحدة محملة بنسبة 100 ٪ والباقي غير نشط. لحسن الحظ ، هناك طرق لاستخدام جميع نواة الجهاز: لذلك ، يجب تشغيل Flask من خلال خادم تطبيقات الويب uwsgi. يتيح لك تكوين عدد العمليات والخيوط بشكل مثالي لضمان الحمل المنتظم على جميع نوى المعالج. يمكن العثور على مزيد من التفاصيل حول جميع خيارات تكوين uwsgi
هنا .
من الأفضل استخدام nginx كنقطة دخول HTTP ، لأن uwsgi يمكن أن يعمل بشكل غير مستقر في حالة الأحمال العالية. من ناحية أخرى ، تأخذ Nginx تدفق المدخلات بالكامل من الطلبات على نفسها ، وتصفية الطلبات غير الصالحة ، وجرعات الحمل على uwsgi. تتواصل Nginx مع uwsgi عبر مقابس لينكس باستخدام ملف معالجة. فيما يلي مثال لتهيئة nginx:
server { listen 80; server_name 127.0.0.1; location / { try_files $uri @score; } location @score { include uwsgi_params; uwsgi_pass unix:/tmp/score.sock; } }
كما نرى ، تبين أنه تكوين معقد إلى حد ما لجهاز واحد. إذا احتجنا إلى تصنيف كميات كبيرة من البيانات ، فسيأتي عدد كبير من الطلبات لهذه الخدمة ، ويمكن أن تصبح اختناقًا. الحل لهذه المشكلة هو التحجيم الأفقي.
من أجل الراحة ، نقوم بتغليف الخدمة في حاوية Docker ثم نشرها على العدد المطلوب من الأجهزة. إذا رغبت في ذلك ، يمكنك استخدام أدوات النشر الآلي مثل Kubernetes. فيما يلي مثال على هيكل Dockerfile لإنشاء حاوية مع خدمة.
FROM ubuntu #Installing required ubuntu and python modules RUN apt-get update RUN apt-get -y install python3 python3-pip nginx RUN update-alternatives
لذا فإن هيكلية خدمة التصنيف هي كما يلي:
الشكل 3. مخطط خدمة التصنيفملخص موجز لعمل Apache Spark في النظام البيئي Hadoop
الآن خذ بعين الاعتبار عملية معالجة البيانات المخزنة على HDFS. كما أشرت سابقًا ، يتم استخدام مبدأ نقل الحسابات إلى البيانات لهذا الغرض. لبدء مهام المعالجة ، تحتاج إلى معرفة الأجهزة التي يتم تخزين كتل البيانات التي نحتاجها من أجل تشغيل العمليات المعنية مباشرة بمعالجتها. من الضروري أيضًا تنسيق إطلاق هذه العمليات ، وإعادة تشغيلها في حالة الطوارئ ، إذا لزم الأمر ، تجميع نتائج المهام الفرعية المختلفة ، إلخ.
يتم إنجاز جميع هذه المهام من خلال مجموعة متنوعة من الأطر التي تعمل مع النظام الإيكولوجي Hadoop. واحدة من الأكثر شعبية ومريحة هي Apache Spark. المفهوم الرئيسي الذي تم بناء الإطار بأكمله حوله هو RDD (مجموعة البيانات الموزعة المرنة). بشكل عام ، يمكن اعتبار RDD كمجموعة موزعة مقاومة للسقوط. يمكن الحصول على RDD بطريقتين رئيسيتين:
- إنشاء من مصدر خارجي ، مثل مجموعة في الذاكرة أو ملف أو دليل على نظام الملفات ، وما إلى ذلك ؛
- التحويل من RDD آخر من خلال تطبيق عمليات التحويل. يدعم RDD جميع العمليات الأساسية للعمل مع المجموعات ، مثل الخريطة ، flatMap ، المرشح ، groupBy ، الانضمام ، إلخ.
من المهم أن نفهم أن RDD ، على عكس المجموعات ، ليست بيانات مباشرة ، بل سلسلة من العمليات التي يجب إجراؤها على البيانات. لذلك ، عندما يتم استدعاء عمليات التحويل ، لا يحدث أي عمل في الواقع ، ونحصل فقط على RDD جديد ، والذي سيحتوي على عملية واحدة أكثر من العملية السابقة. يبدأ العمل نفسه عندما يتم استدعاء ما يسمى العمليات أو الإجراءات الطرفية. وتشمل هذه الحفظ في ملف ، والحفظ في مجموعة في الذاكرة ، وحساب عدد العناصر ، وما إلى ذلك.
عند بدء عملية نهائية ، يقوم Spark ، على أساس RDD الناتج ، ببناء رسم بياني لعملية لا حلقية (DAG ، رسم بياني غير مباشر موجه) وتشغيلها بالتتابع على الكتلة وفقًا للرسم البياني المستلم. عند بناء DAG على أساس RDD ، يقوم Spark بعدد من التحسينات ، على سبيل المثال ، إذا أمكن ، يجمع بين عدة تحويلات متتالية في عملية واحدة.
كانت RDD الوحدة الرئيسية للتفاعل مع Spark API في إصدارات Spark 1.x. في الإصدار Spark 2.x ، قال المطورون إن المفهوم الرئيسي للتفاعل هو مجموعة البيانات. Dataset عبارة عن إضافة لـ RDD مع دعم للتفاعل الشبيه بـ SQL. عند استخدام Dataset API ، تتيح لك Spark استخدام مجموعة واسعة من التحسينات ، بما في ذلك التحسينات منخفضة المستوى إلى حد ما. ولكن بشكل عام ، تنطبق المبادئ الأساسية التي تنطبق على RDDs على مجموعة البيانات.
يمكن العثور على مزيد من التفاصيل حول عمل Spark في
الوثائق على الموقع الرسمي .
دعنا نفكر في مثال على أبسط تصنيف على Spark دون استخدام الخدمات الخارجية. يتم تنفيذ خوارزمية لا معنى لها إلى حد ما هنا ، والتي تأخذ في الاعتبار نسبة كل حرف لاتيني في النص ، ثم تأخذ في الاعتبار الانحراف المعياري. هنا ، أولاً وقبل كل شيء ، من المهم الانتباه مباشرة إلى الخطوات الأساسية المستخدمة عند العمل مع Spark.
case class Data(id: String, text: String) case class Features(id: String, vector: Array[Float]) case class Score(id: String, score: Float) //(1) def std(vector: Array[Float]): Float = ???
في هذا المثال ، نحن:
- نحدد هيكل المدخلات والبيانات الوسيطة والمخرجات (يتم تعريف بيانات الإدخال على أنها بعض النصوص التي يرتبط بها معرف معين ، وتتطابق البيانات المتوسطة مع المعرّف مع ناقل الميزة ، ويطابق الناتج المعرف مع بعض القيمة العددية) ؛
- نحدد دالة لحساب القيمة الناتجة بواسطة ناقل الميزة (على سبيل المثال ، الانحراف المعياري ، التنفيذ غير مبين) ؛
- تعريف مجموعة البيانات الأصلية على أنها بيانات مخزنة على HDFS بتنسيق الباركيه على طول المسار / المسار / إلى / البيانات ؛
- تحديد مجموعة بيانات وسيطة كخريطة نقطية من مجموعة البيانات الأصلية.
- وبالمثل ، نحدد مجموعة البيانات الناتجة من خلال تحويل أحادي المعامل من المتوسط ؛
- حفظ مجموعة البيانات الناتجة إلى HDFS بتنسيق الباركيه على طول المسار / المسار / إلى / النتيجة. نظرًا لأن الحفظ في ملف هو عملية نهائية ، يتم بدء الحسابات نفسها بدقة في هذه المرحلة.
يعمل Apache Spark على مبدأ العامل الرئيسي. عندما يبدأ التطبيق ، تبدأ العملية الرئيسية ، التي تسمى السائق. ينفذ الكود المسؤول عن توليد RDD ، على أساسها سيتم إجراء الحسابات.
عندما يتم استدعاء عملية طرفية ، يقوم السائق بإنشاء DAG بناءً على RDD الناتج. ثم يبدأ السائق في إطلاق تدفقات العمل تسمى التنفيذيين ، حيث سيتم معالجة البيانات مباشرة. بعد بدء سير العمل ، يمررها السائق الكتلة القابلة للتنفيذ التي يجب تنفيذها ، ويشير أيضًا إلى أي جزء من البيانات يحتاج إلى تطبيقه.
يوجد أدناه رمز المثال الخاص بنا ، حيث يتم تمييز أقسام التعليمات البرمجية التي يتم تنفيذها على المنفذ (بين بداية سطر المنفذ ونهاية جزء المنفذ). يتم تنفيذ ما تبقى من التعليمات البرمجية على السائق.
case class Data(id: String, text: String) case class Features(id: String, vector: Array[Float]) case class Score(id: String, score: Float) def std(vector: Array[Float]): Float = ??? val ds: Dataset[Data] = spark.read.parquet("/path/to/data").as[Data] val result: Dataset[Score] = ds.map {
في نظام Hadoop البيئي ، يتم تشغيل جميع التطبيقات في حاويات. الحاوية هي عملية يتم تشغيلها على أحد الأجهزة في مجموعة تم تخصيصها لكمية معينة من الموارد. يتم التعامل مع إطلاق الحاوية بواسطة مدير موارد YARN. يحدد أي من الأجهزة يحتوي على عدد كاف من نوى المعالج وذاكرة الوصول العشوائي ، وكذلك ما إذا كان يحتوي على كتل البيانات اللازمة للمعالجة.
عند تشغيل تطبيق Spark ، تقوم YARN بإنشاء الحاوية وتشغيلها على أحد آلات المجموعة التي يتم فيها تشغيل برنامج التشغيل. بعد ذلك ، عندما يقوم السائق بتحضير DAG من العمليات التي يجب تشغيلها على المنفذين ، تقوم YARN بتشغيل حاويات إضافية على الأجهزة المطلوبة.
كقاعدة ، يكفي للسائق أن يخصص نواة واحدة وكمية صغيرة من الذاكرة (إلا إذا ، بالطبع ، لن يتم تجميع نتيجة الحساب على السائق في الذاكرة). بالنسبة إلى المنفذين ، من أجل تحسين الموارد وتقليل العدد الإجمالي للعمليات في النظام ، يمكن تمييز أكثر من جوهر واحد: في هذه الحالة ، سيكون المنفذ قادرًا على أداء العديد من المهام في وقت واحد.
ولكن من المهم أن نفهم أنه في حالة فشل إحدى المهام قيد التشغيل في الحاوية أو في حالة عدم كفاية الموارد ، فقد تقرر YARN إيقاف الحاوية ، ثم يجب إعادة تشغيل جميع المهام التي تم تنفيذها فيها مرة أخرى على منفذ آخر. بالإضافة إلى ذلك ، إذا خصصنا عددًا كبيرًا بما فيه الكفاية من النوى لكل حاوية ، فمن المحتمل ألا يتمكن YARN من تشغيله. على سبيل المثال ، إذا كان لدينا جهازان يتم ترك نواتين عليهما بدون استخدام ، فيمكننا البدء في كل حاوية تتطلب نواتين ، ولكن لا يمكننا بدء حاوية واحدة تتطلب أربعة نوى.
الآن دعونا نرى كيف سيتم تنفيذ التعليمات البرمجية من مثالنا مباشرة على الكتلة. تخيل أن حجم البيانات المصدر هو 2 تيرابايت. وفقًا لذلك ، إذا كان حجم الكتلة على HDFS هو 128 ميغا بايت ، فسيكون هناك 16384 كتلة في المجموع. يتم نسخ كل كتلة إلى أجهزة متعددة لضمان الموثوقية. من أجل البساطة ، نأخذ معامل النسخ يساوي اثنين ، أي أنه سيكون هناك 32768 كتلة متاحة في المجموع. لنفترض أننا نستخدم مجموعة من 16 جهازًا للتخزين. وبناءً على ذلك ، في كل جهاز في حالة التوزيع المنتظم ، سيكون هناك حوالي 2048 كتلة ، أو 256 غيغابايت لكل جهاز. على كل جهاز ، لدينا 8 نوى للمعالج و 64 جيجا بايت من ذاكرة الوصول العشوائي.
لمهمتنا ، لا يحتاج برنامج التشغيل إلى الكثير من الموارد ، لذلك سنخصص 1 نواة و 1 غيغابايت من الذاكرة لذلك. سوف نعطي فناني الأداء 2 نواة و 4 غيغابايت من الذاكرة. افترض أننا نريد زيادة استخدام موارد المجموعة. وبالتالي ، نحصل على 64 حاوية: واحدة للسائق ، و 63 للفنانين.
الشكل 4. العمليات التي تعمل على عقدة البيانات والموارد التي يستخدمونها.نظرًا لأننا في حالتنا نستخدم عمليات الخريطة فقط ، فإن DAG الخاصة بنا ستتكون من عملية واحدة. وتتكون من الإجراءات التالية:
- خذ كتلة واحدة من البيانات من القرص الصلب المحلي ،
- تحويل البيانات
- حفظ النتيجة في كتلة جديدة على القرص المحلي الخاص بك.
في المجموع ، نحتاج إلى معالجة 16384 كتلة ، لذلك يجب على كل منفذ تنفيذ 16384 / (63 منفذًا * 2 نواة) = 130 عملية. وبالتالي ، فإن دورة حياة المنفذ كعملية منفصلة (في حالة حدوث كل شيء دون السقوط) ستبدو على النحو التالي.
- إطلاق الحاوية.
- تلقي مهمة من السائق يكون فيها معرف كتلة والعملية اللازمة. نظرًا لأننا خصصنا مركزين للحاوية ، فإن المنفذ يتلقى مهمتين في وقت واحد.
- أداء مهمة وإرسال النتيجة للسائق.
- الحصول على المهمة التالية من برنامج التشغيل وتكرار الخطوتين 2 و 3 حتى تتم معالجة جميع الكتل لهذا الجهاز المحلي.
- توقف الحاوية
ملاحظة : يتم الحصول على DAGs أكثر تعقيدًا إذا كان من الضروري إعادة توزيع البيانات الوسيطة بين الأجهزة ، عادةً لعمليات التجميع (groupBy ، و replaceByKey ، وما إلى ذلك) والاتصالات (الانضمام) ، والتي يعتبر النظر فيها خارج نطاق هذه المقالة.
المشاكل الرئيسية للتفاعل بين أباتشي سبارك والخدمات الخارجية
إذا احتجنا ، في إطار عملية الخريطة ، إلى الوصول إلى بعض الخدمات الخارجية ، تصبح المهمة أقل صعوبة. افترض أن كائن من فئة ExternalServiceClient مسؤول عن التفاعل مع خدمة خارجية. بشكل عام ، قبل بدء العمل ، نحتاج إلى تهيئته ، ثم نسميها حسب الضرورة:
val client = ExternalServiceClient.create()
عادة ، تستغرق تهيئة العميل بعض الوقت ، وبالتالي ، كقاعدة عامة ، تتم تهيئته عند بدء تشغيل التطبيق ، ثم يتم استخدامه للحصول على مثيل عميل من بعض السياق العام أو التجمع العام. لذلك ، عندما تتلقى حاوية مع منفذ Spark مهمة تتطلب التفاعل مع خدمة خارجية ، سيكون من الجيد الحصول على عميل تمت تهيئته بالفعل قبل بدء العمل على صفيف البيانات ، ثم إعادة استخدامه لكل عنصر.
هناك طريقتان للقيام بذلك في Spark. أولاً ، إذا كان العميل قابلاً للتسلسل (يجب على العميل نفسه وجميع حقوله توسيع واجهة java.io.Serializable) ، فيمكن تهيئته على برنامج التشغيل ثم
تمريره إلى المنفذين عبر آلية متغير البث .
val client = ExternalServiceClient.create() val clientBroadcast = sparkContext.broadcast(client) ds.map { f: Features => val score = clientBroadcast.value.score(f.vector) Score(f.id, score) }
في حالة عدم إمكانية إجراء تسلسل للعميل ، أو أن تهيئة العميل هي عملية تعتمد على إعدادات الجهاز المعين الذي يعمل عليه (على سبيل المثال ، لتحقيق التوازن ، يجب أن تذهب الطلبات من جزء من الأجهزة إلى جهاز الخدمة الأول ، وللآخر إلى الثاني) ، ثم يمكن تهيئة العميل مباشرة على المنفذ.
لهذا ، فإن RDD (ومجموعة البيانات) لديها عملية mapPartitions ، وهي نسخة معممة لعملية الخريطة (إذا نظرت إلى شفرة المصدر لفئة RDD ، يتم تنفيذ عملية الخريطة من خلال mapPartitions). يتم تشغيل الوظيفة التي تم تمريرها إلى عملية mapPartitions مرة واحدة لكل كتلة.
يتم توفير مكرر لإدخال هذه الوظيفة للبيانات التي سنقرأها من الكتلة ، وعند الإخراج يجب أن يعيد مكررًا لبيانات الإخراج المقابلة لكتلة الإدخال: ds.mapPartitions {fi: Iterator[Features] => val client = ExternalServiceClient.create() fi.map { f: Features => val score = client.score(f.vector) Score(f.id, score) } }
في هذا الرمز ، يتم إنشاء عميل لخدمة خارجية لكل كتلة من بيانات المصدر. هذا ، بالطبع ، أفضل من إنشاء عميل في كل مرة لمعالجة كل عنصر ، وفي كثير من الحالات يكون هذا حلًا مقبولًا تمامًا. ومع ذلك ، بعد ذلك قليلاً ، سأوضح كيف يمكنك إنشاء كائن سيتم تهيئته مرة واحدة في بداية الحاوية ثم استخدامه لبدء جميع المهام التي تأتي في هذه الحاوية.تكون عملية معالجة المكرر الناتج مترابطة. دعني أذكرك بأن النمط الرئيسي للوصول إلى بنية نوع المكرر هو استدعاء متسلسل لـ hasNext والأساليب التالية: while (i.hasNext()) { val item = i.next() … }
إذا كان لدينا مركزان مخصصان للمنفذ ، فسيكون لهما سير عمل رئيسيان فقط يشاركان في معالجة البيانات. دعني أذكرك أنه إذا كان لدينا 8 نوى على الجهاز ، فلن يسمح YARN لها بتشغيل أكثر من 4 عمليات لمنفذين من نوى على التوالي ، فلن يكون لدينا سوى 8 سلاسل لكل جهاز. بالنسبة للحوسبة المحلية ، يعد هذا هو الخيار الأفضل ، حيث سيوفر الحمل الأقصى لقوة الحوسبة مع الحد الأدنى من النفقات للتحكم في التدفق. ومع ذلك ، في حالة التفاعل مع الخدمات الخارجية ، تتغير الصورة.عند استخدام الخدمات الخارجية ، يعد الأداء أحد أهم المشكلات. إن أبسط طريقة لتنفيذ ذلك هي استخدام عميل متزامن ، حيث نصل إلى الخدمة لكل عنصر ، وبعد تلقي استجابة منه ، نشكل القيمة الناتجة. ومع ذلك ، فإن هذا النهج له عيب كبير: أثناء التفاعل المتزامن ، يتم حظر سلسلة رسائل تستدعي خدمة خارجية بشكل متزامن طوال مدة التفاعل مع هذه الخدمة. والحقيقة هي أنه عندما يتم استدعاء طريقة hasNext ، نتوقع الحصول على إجابة لا لبس فيها على السؤال عما إذا كان هناك المزيد من العناصر لمعالجتها. في حالة عدم اليقين (على سبيل المثال ، عندما أرسلنا طلبًا لخدمة خارجية ولا نعرف ما إذا كانت ستعيد إجابة فارغة أو غير فارغة) ليس لدينا خيار سوى انتظار إجابة ، وبالتالي حظر سلسلة الرسائل التي تسمى هذه الطريقة. لذلكالمكرر هو بنية بيانات حجب .الشكل 5. معالجة البتات للمكرر الذي تم الحصول عليه نتيجة استدعاء الوظيفة التي تم تمريرها إلى mapPartitions تحدث في خيط واحد. ونتيجة لذلك ، نحصل على استخدام غير فعال للغاية للموارد.كما تتذكر ، قمنا بتحسين خدمة التصنيف الخاصة بنا بحيث تتيح لنا معالجة العديد من الطلبات في نفس الوقت. وفقًا لذلك ، نحتاج إلى جمع العدد اللازم من الطلبات من المكرر الأصلي ، وإرسالها إلى الخدمة ، والحصول على رد وإصدارها للمكرر الناتج.الشكل 6. التفاعل المتزامن عند إرسال طلب تصنيف لمجموعة من العناصر فيالواقع ، في هذه الحالة ، لن يكون الأداء أفضل بكثير ، لأنه ، أولاً ، نحن مضطرون للحفاظ على الخيط الرئيسي في حالة محظورة أثناء التفاعل مع خدمة خارجية ، و ، ثانيًا ، الخدمة الخارجية خاملة أثناء عملنا على النتيجة.البيان الختامي للمشكلة
وبالتالي ، عند استخدام خدمة خارجية ، يجب علينا حل مشكلة الوصول المتزامن. من الناحية المثالية ، سيكون من الملائم نقل التفاعل مع الخدمات الخارجية إلى تجمع مؤشرات ترابط منفصل. في هذه الحالة ، سيتم تنفيذ الطلبات إلى الخدمات الخارجية في وقت واحد مع معالجة نتائج الطلبات السابقة ، وبالتالي سيكون من الممكن استخدام موارد الجهاز بكفاءة أكبر. للتفاعل بين الخيوط ، يمكن استخدام قائمة انتظار الحظر ، والتي ستكون بمثابة مخزن مؤقت للاتصالات. ستؤدي التدفقات المسؤولة عن التفاعل مع الخدمات الخارجية إلى وضع البيانات في قائمة الانتظار ، وسيأخذها الدفق الذي يعالج المكرر الناتج من هناك.ومع ذلك ، مثل هذه المعالجة غير المتزامنة تقدم عددًا من المشكلات الإضافية.- , , , .
- , , . , . , .
- لكي تقوم طريقة hasNext بإرجاع خطأ في المكرر الناتج ، تحتاج إلى التأكد من الرد على جميع الطلبات والإشارة إلى أنه لن يكون هناك المزيد من البيانات في المخزن المؤقت. مع المعالجة المتزامنة ، هذا أمر بسيط للغاية: إذا ، بعد معالجة الاستجابة التالية ، عاد المكرر الأصلي hasNext = false ، وبالتالي ، لن يكون هناك المزيد من العناصر. في حالة المعالجة غير المتزامنة ، خاصة إذا أرسلنا عدة طلبات في نفس الوقت ، فأنت بحاجة إلى تنسيق استلام الردود ، وبعد إرسال الرد الأخير فقط أرسل إشارة لإكمال المعالجة.
حول كيف تمكنا من حل هذه المشاكل بفعالية ، سأخبرنا في الجزء التالي . ترقبوا!في هذه الأثناء ، انظر إلى الوظائف الشاغرة في شركتنا ، ربما نحن نبحث عنك؟