الجزء 2: الحل
مرحبًا مرة أخرى! اليوم سأواصل قصتي حول كيفية تصنيف كميات كبيرة من البيانات على Apache Spark باستخدام نماذج التعلم الآلي التعسفي. في
الجزء الأول من المقالة ، قمنا بفحص بيان المشكلة نفسها ، بالإضافة إلى المشكلات الرئيسية التي تنشأ عند تنظيم التفاعل بين المجموعة التي يتم تخزين البيانات الأولية ومعالجتها ، وخدمة التصنيف الخارجية. في الجزء الثاني ، سننظر في أحد الخيارات لحل هذه المشكلة باستخدام نهج التدفقات التفاعلية وتنفيذها باستخدام مكتبة akka-streams.
مفهوم تيارات رد الفعل
لحل المشكلات الموضحة في الجزء الأول ، يمكنك استخدام النهج المسمى
تيارات تفاعلية . يسمح لك بالتحكم في عملية نقل دفق البيانات بين مراحل المعالجة ، والعمل بسرعات مختلفة وبشكل مستقل عن بعضها البعض دون الحاجة إلى التخزين المؤقت. إذا كانت إحدى مراحل المعالجة أبطأ من المرحلة السابقة ، فمن الضروري الإشارة إلى المرحلة الأسرع حول مقدار بيانات الإدخال التي تكون جاهزة للمعالجة في الوقت الحالي. هذا التفاعل يسمى الضغط الخلفي. وهو يتألف من حقيقة أن المراحل الأسرع تعالج بالضبط العديد من العناصر المطلوبة للمرحلة البطيئة ، وليس أكثر ، ثم تحرر موارد الحوسبة.
بشكل عام ، تعد Stream Reactive Streams مواصفة لتطبيق نموذج
الناشر-المشترك . تحدد هذه المواصفة مجموعة من أربع واجهات (الناشر ، المشترك ، المعالج ، والاشتراك) وعقد لأساليبها.
دعونا ننظر في هذه الواجهات بمزيد من التفصيل:
public interface Publisher<T> { public void subscribe(Subscriber<? super T> s); } public interface Subscriber<T> { public void onSubscribe(Subscription s); public void onNext(T t); public void onError(Throwable t); public void onComplete(); } public interface Subscription { public void request(long n); public void cancel(); } public interface Processor<T, R> extends Subscriber<T>, Publisher<R> { }
هناك جانبان لنموذج المشترك-الناشر: الإرسال والاستلام. عند تنفيذ تيارات تفاعلية ، تكون الفئة التي تطبق واجهة الناشر مسؤولة عن نقل البيانات ، والمشترك مسؤول عن الاستلام. لإنشاء اتصال بينهما ، يجب تسجيل المشترك لدى الناشر عن طريق استدعاء طريقة الاشتراك الخاصة به. وفقًا للمواصفات ، بعد تسجيل المشترك ، يجب على الناشر استدعاء أساليبه بالترتيب التالي:
- onSubscribe. يتم استدعاء هذه الطريقة فور تسجيل المشترك لدى الناشر. كمعلمة ، يتم تمرير كائن اشتراك إليه يطلب من خلاله المشترك بيانات من الناشر. يجب تخزين هذا الكائن واستدعاؤه فقط في سياق هذا المشترك.
- بعد أن يطلب المشترك بيانات من Publisher عن طريق استدعاء أسلوب الطلب على كائن الاشتراك المقابل ، يمكن لـ Publisher استدعاء المشترك onNext ، بتمرير العنصر التالي.
- يمكن للمشترك بعد ذلك استدعاء طريقة الطلب بشكل دوري على الاشتراك ، ولكن لا يمكن لـ Publisher استدعاء طريقة onNext أكثر من الإجمالي المطلوب من خلال طريقة الطلب.
- إذا كان دفق البيانات محدودًا ، بعد تمرير جميع العناصر من خلال طريقة onNext ، يجب على الناشر استدعاء طريقة onComplete.
- إذا حدث خطأ في Publisher ولم يكن من الممكن معالجة العناصر بشكل إضافي ، فيجب استدعاء طريقة onError
- بعد استدعاء طرق onComplete أو onError ، يجب استبعاد تفاعل الناشر الإضافي مع المشترك.
يمكن اعتبار استدعاءات الطريقة بمثابة إرسال إشارات بين الناشر والمشترك. يشير المشترك إلى الناشر بعدد العناصر التي يكون جاهزًا للمعالجة ، ويشير الناشر بدوره إلى أنه يوجد إما العنصر التالي ، أو أنه لا يوجد المزيد من العناصر ، أو حدث خطأ ما.
لاستبعاد تأثير آخر للناشر والمشترك على بعضهما البعض ، يجب ألا تكون الاستدعاءات لجميع الطرق التي تطبق واجهات التدفقات التفاعلية غير قابلة للحظر. في هذه الحالة ، سيكون التفاعل بينهما غير متزامن تمامًا.
يمكن العثور على مزيد من التفاصيل حول مواصفات واجهات التدفقات التفاعلية
هنا .
وبالتالي ، من خلال ربط التكرارات الأصلية والنتائج الناتجة من خلال تحويلها إلى الناشر والمشترك ، على التوالي ، يمكننا حل المشكلات المحددة في الجزء السابق من المقالة. يتم حل مشكلة تجاوز سعة المخزن المؤقت بين المراحل عن طريق طلب عدد معين من العناصر من قبل المشترك. يتم حل مشكلة الإكمال الناجح أو غير الناجح عن طريق إرسال إشارات إلى المشترك من خلال طرق onComplete أو onError ، على التوالي. يصبح الناشر مسؤولاً عن إرسال هذه الإشارات ، والتي يجب في حالتنا أن تتحكم في عدد طلبات HTTP التي تم إرسالها وعددها الذي تلقى ردودًا. بعد تلقي الاستجابة الأخيرة ومعالجة جميع النتائج الواردة ، يجب أن ترسل إشارة onComplete. في حالة فشل أحد الطلبات ، يجب أن يرسل إشارة onError ، ويتوقف عن إرسال المزيد من العناصر إلى المشترك ، وكذلك طرح العناصر من المكرر الأصلي.
يجب تنفيذ المكرر الناتج كمشترك. في هذه الحالة ، لا يمكننا الاستغناء عن المخزن المؤقت الذي ستتم فيه كتابة العناصر عند استدعاء طريقة onNext من واجهة المشترك وطرحها باستخدام hasNext والطرق التالية من واجهة Iterator. كتطبيق مؤقت ، يمكنك استخدام قائمة انتظار الحظر ، على سبيل المثال ، LinkedBlockedQueue.
سوف يسأل القارئ اليقظ على الفور السؤال: لماذا قائمة الانتظار المحظورة ، لأنه وفقًا لمواصفات تيارات التفاعل ، يجب أن يكون تنفيذ جميع الطرق غير محجوب؟ ولكن هذا كل شيء هنا: نظرًا لأننا نطلب من Publisher عددًا محددًا بدقة من العناصر ، فلن يتم استدعاء أسلوب onNext أكثر من هذا العدد من المرات ، ويمكن لقائمة الانتظار دائمًا إضافة عنصر جديد دون حظر.
من ناحية أخرى ، قد يحدث الحظر عندما يتم استدعاء الأسلوب hasNext في حالة وجود قائمة انتظار فارغة. ومع ذلك ، هذا صحيح: طريقة hasNext ليست جزءًا من عقد واجهة المشترك ، فهي محددة في واجهة Iterator ، والتي ، كما أوضحنا سابقًا ، هي بنية بيانات مانعة. عند استدعاء الطريقة التالية ، نطرح العنصر التالي من قائمة الانتظار ، وعندما يصبح حجمه أقل من حد معين ، سنحتاج إلى طلب الجزء التالي من العناصر من خلال استدعاء لطريقة الطلب.
الشكل 7. التفاعل غير المتزامن مع خدمة خارجية باستخدام نهج التدفقات التفاعليةبالطبع ، في هذه الحالة لن نتخلص تمامًا من حظر المكالمات. يحدث هذا بسبب عدم تطابق النماذج بين التدفقات التفاعلية ، التي تفترض تفاعلًا غير متزامن تمامًا ، ومكرر ، والذي يجب أن يطلق عليه trueN أو خطأ عند استدعاء طريقة hasNext. ومع ذلك ، على عكس التفاعل المتزامن مع خدمة خارجية ، يمكن تقليل وقت التعطل بسبب الأقفال بشكل كبير عن طريق زيادة الحمل الكلي لنوى المعالج.
سيكون من الملائم إذا قام مطورو Apache Spark في الإصدارات المستقبلية بتطبيق طريقة تناظرية لطريقة mapPartitions ، والتي تعمل مع الناشر والمشترك. هذا سيسمح بتفاعل غير متزامن تمامًا ، وبالتالي القضاء على إمكانية حظر الخيوط.
Akka-streams و akka-http كتطبيق لمواصفات الجداول التفاعلية
حاليًا ، هناك بالفعل أكثر من اثنتي عشرة تطبيق لمواصفات التدفقات التفاعلية. أحد هذه التطبيقات هو وحدة akka-streams من مكتبة
akka . في عالم JVM عكا أثبتت نفسها كواحدة من أكثر الوسائل فعالية لكتابة أنظمة متوازية وموزعة. يتم تحقيق ذلك بسبب حقيقة أن المبدأ الأساسي المنصوص عليه في أساسها هو
نموذج الممثل ، والذي يسمح لك بكتابة تطبيقات تنافسية للغاية دون التحكم المباشر في الخيوط ومجمعاتها.
تمت كتابة الكثير من المؤلفات حول تطبيق مفهوم الممثلين في عكا ، لذلك لن نتوقف هنا (
الموقع الرسمي عكا هو مصدر جيد جدًا للمعلومات ، أوصي أيضًا بـ
عكا في كتاب
العمل ). سنلقي هنا نظرة فاحصة على الجانب التكنولوجي للتنفيذ في إطار آلية الرصد المشتركة.
بشكل عام ، لا يوجد الممثلون بمفردهم ، ولكنهم يشكلون نظامًا هرميًا. لإنشاء نظام ممثل ، تحتاج إلى تخصيص موارد له ، لذا فإن الخطوة الأولى عند العمل مع akka هي إنشاء مثيل لكائن ActorSystem. عندما يبدأ ActorSystem ، يتم إنشاء مجموعة منفصلة من سلاسل العمليات ، تسمى المرسل ، حيث يتم تنفيذ جميع التعليمات البرمجية المحددة في الممثلين. عادة ، ينفذ مؤشر ترابط واحد رمز العديد من الممثلين ، ومع ذلك ، إذا لزم الأمر ، يمكنك تكوين مرسل منفصل لمجموعة معينة من الممثلين (على سبيل المثال ، للممثلين الذين يتفاعلون مباشرة مع واجهة برمجة تطبيقات للحظر).
واحدة من أكثر المهام الشائعة التي يتم حلها باستخدام الممثلين هي المعالجة المتتابعة لتدفقات البيانات. سابقًا ، لهذا ، كان من الضروري إنشاء سلاسل من الممثلين يدويًا والتأكد من عدم وجود اختناقات بينهم (على سبيل المثال ، إذا قام أحد الفاعلين بمعالجة الرسائل بشكل أسرع من التالي ، فقد يكون لديه تجاوز في قائمة انتظار الرسائل الواردة ، مما يؤدي إلى خطأ OutOfMemoryError).
بدءًا من الإصدار 2.4 ، تمت إضافة وحدة akka-streams النمطية إلى akka ، مما يسمح لك بتعريف عملية معالجة البيانات بشكل تعريفي ، ثم إنشاء الممثلين الضروريين لتنفيذها. ينفذ Akka-streams أيضًا مبدأ الضغط الخلفي ، الذي يلغي إمكانية تجاوز في قائمة انتظار الرسائل الواردة لجميع الجهات الفاعلة المشاركة في المعالجة.
العناصر الرئيسية لتحديد مخطط معالجة تدفق البيانات في تدفقات akka هي المصدر والتدفق والحوض. من خلال دمجها مع بعضها البعض ، نحصل على رسم بياني قابل للتشغيل. لبدء عملية المعالجة ، يتم استخدام مادة مادية ، والتي تعمل على إنشاء جهات فاعلة تعمل وفقًا للرسم البياني المحدد من قبلنا (واجهة Materializer وتنفيذها ActorMaterializer).
دعونا ننظر في مراحل المصدر والتدفق والحوض بمزيد من التفصيل. يحدد المصدر مصدر البيانات. يدعم Akka-streams أكثر من اثني عشر طريقة مختلفة لإنشاء المصادر ، بما في ذلك من المكرر:
val featuresSource: Source[Array[Float], NotUsed] = Source.fromIterator { () => featuresIterator }
يمكن أيضًا الحصول على المصدر عن طريق تحويل مصدر موجود:
val newSource: Source[String, NotUsed] = source.map(item => transform(item))
إذا كان التحويل عملية غير بسيطة ، فيمكن تمثيله ككيان تدفق. يدعم تيارات Akka العديد من الطرق المختلفة لإنشاء التدفق. أسهل طريقة هي الإنشاء من دالة:
val someFlow: Flow[String, Int, NotUsed] = Flow.fromFunction((x: String) => x.length)
من خلال الجمع بين المصدر والتدفق ، نحصل على مصدر جديد.
val newSource: Source[Int, NotUsed] = oldSource.via(someFlow)
يتم استخدام الحوض كمرحلة أخيرة من معالجة البيانات. كما هو الحال في Source ، يوفر akka-streams أكثر من اثني عشر خيارًا مختلفًا من Sink ، على سبيل المثال ، يقوم Sink.foreach بإجراء عملية محددة لكل عنصر ، يجمع Sink.seq جميع العناصر في مجموعة ، إلخ.
val printSink: Sink[Any, Future[Done]] = Sink.foreach(println)
يتم تحديد معلمات المصدر والتدفق والحوض من خلال أنواع عناصر الإدخال و / أو الإخراج ، على التوالي. بالإضافة إلى ذلك ، قد يكون لكل مرحلة معالجة بعض نتائج عملها. لهذا ، يتم أيضًا تعيين المعلمات Source و Flow و Sink بواسطة نوع إضافي يحدد نتيجة العملية. هذا النوع يسمى نوع القيمة المادية. إذا كانت العملية لا تعني وجود نتيجة إضافية لعملها ، على سبيل المثال ، عندما نحدد التدفق من خلال وظيفة ، فسيتم استخدام نوع NotUsed كقيمة مادية.
من خلال الجمع بين المصدر والتدفق والحوض الضروري ، نحصل على RunnableGraph. يتم تحديده بواسطة نوع واحد ، والذي يحدد نوع القيمة التي تم الحصول عليها نتيجة لتنفيذ هذا الرسم البياني. إذا لزم الأمر ، عند دمج المراحل ، يمكنك تحديد نتيجة أي المراحل ستكون نتيجة الرسم البياني للعمليات بالكامل. بشكل افتراضي ، يتم أخذ نتيجة المرحلة المصدر:
val graph: RunnableGraph[NotUsed] = someSource.to(Sink.foreach(println))
ومع ذلك ، إذا كانت نتيجة مرحلة المغسلة أكثر أهمية بالنسبة لنا ، فيجب علينا الإشارة صراحةً إلى ما يلي:
val graph: RunnableGraph[Future[Done]] = someSource.toMat(Sink.foreach(println))(Keep.right)
بعد تحديد الرسم البياني للعمليات ، يجب علينا تشغيله. للقيام بذلك ، يحتاج runnableGraph إلى استدعاء أسلوب التشغيل. كمعلمة ، تأخذ هذه الطريقة كائن ActorMaterializer (والذي يمكن أن يكون أيضًا في نطاق ضمني) ، وهو المسؤول عن إنشاء الجهات الفاعلة التي ستؤدي العمليات. عادة ، يتم إنشاء ActorMaterializer فورًا بعد إنشاء نظام ActorSystem ، مرتبطًا بدورة حياته ، ويستخدمه لإنشاء ممثلين. فكر في مثال:
في حالة التركيبات البسيطة ، يمكنك الاستغناء عن إنشاء RunnableGraph منفصل ، ولكن ببساطة قم بتوصيل Source بـ Sink وبدء تشغيلها عن طريق استدعاء الأسلوب runWith على Source. تفترض هذه الطريقة أيضًا وجود كائن ActorMaterializer في النطاق الضمني. بالإضافة إلى ذلك ، في هذه الحالة ، سيتم استخدام القيمة المادية المحددة في الحوض. على سبيل المثال ، باستخدام الكود التالي ، يمكننا تحويل Source إلى Publisher من مواصفات Stream Reactive Streams:
val source: Source[Score, NotUsed] = Source.fromIterator(() => sourceIterator).map(item => transform(item)) val publisher: Publisher[Score] = source.runWith(Sink.asPublisher(false))
لذلك ، لقد أوضحنا الآن كيف يمكنك الحصول على ناشر التدفقات التفاعلية من خلال إنشاء مصدر من مكرر المصدر وإجراء بعض التحويلات على عناصره. الآن يمكننا ربطه بمشترك يقوم بتزويد البيانات للمكرر الناتج. يبقى النظر في السؤال الأخير: كيفية تنظيم تفاعل HTTP مع خدمة خارجية.
يتضمن هيكل akka وحدة
akka-http ، التي تسمح لك بتنظيم الاتصال غير المتزامن غير المحظور عبر HTTP. بالإضافة إلى ذلك ، تم بناء هذه الوحدة على أساس تدفقات akka ، مما يسمح لك بإضافة تفاعل HTTP كخطوة إضافية في الرسم البياني لعمليات معالجة تدفق البيانات.
للاتصال بالخدمات الخارجية ، يوفر akka-http ثلاث واجهات مختلفة.
- API على مستوى الطلب - هو أبسط خيار في حالة الطلبات الفردية إلى جهاز تعسفي. في هذا المستوى ، تتم إدارة اتصالات HTTP تلقائيًا تمامًا ، وفي كل طلب ، من الضروري نقل العنوان الكامل للجهاز الذي يتم إرسال الطلب إليه.
- واجهة برمجة التطبيقات على مستوى المضيف - مناسبة عندما نعرف أي منفذ سنصل إليه على الجهاز. في هذه الحالة ، يتحكم akka-http في مجموعة اتصالات HTTP ، وفي الطلبات يكفي تحديد المسار النسبي للمورد المطلوب.
- واجهة برمجة تطبيقات Connection-Level - تسمح لك بالتحكم الكامل في إدارة اتصالات HTTP ، أي فتح الطلبات وإغلاقها وتوزيعها عبر الاتصالات.
في حالتنا ، فإن عنوان خدمة التصنيف معروف لنا مسبقًا ، لذلك ، من الضروري تنظيم تفاعل HTTP فقط مع هذا الجهاز المعين. لذلك ، فإن واجهة برمجة تطبيقات Host-Level هي الأفضل بالنسبة لنا. الآن ، دعنا نرى كيف يتم إنشاء تجمع اتصالات HTTP عند استخدامه:
val httpFlow: Flow[(HttpRequest,Id), (Try[HttpResponse],Id), Http.HostConnectionPool] = Http().cachedHostConnectionPool[Id](hostAddress, portNumber)
عند استدعاء Http (). CachedHostConnectionPool [T] (hostAddress، portNumber) في ActorSystem ، ضمن نطاق ضمني ، يتم تخصيص الموارد لإنشاء تجمع اتصال ، ولكن لم يتم تأسيس الاتصالات نفسها. نتيجة لهذه المكالمة ، يتم إرجاع Flow ، الذي يتلقى زوجًا من طلب HTTP وبعض كائن تعريف المعرف كمدخل. هناك حاجة إلى كائن التعريف لمطابقة الطلب مع الاستجابة المقابلة نظرًا لأن مكالمة HTTP في akka-http هي عملية غير متزامنة ، ولا يتوافق الترتيب الذي يتم فيه تلقي الردود بالضرورة مع الترتيب الذي تم إرسال الطلبات به. لذلك ، عند التدفق ، يعطي التدفق بضع نتائج الاستعلام وكائن التعريف المقابل.
مباشرة ، يتم إنشاء اتصالات HTTP عند إطلاق رسم بياني (بما في ذلك هذا التدفق) (يتحقق). يتم تنفيذ Akka-http بطريقة تجعل بغض النظر عن عدد المرات التي تم فيها تجسيد الرسوم البيانية التي تحتوي على httpFlow ، في نفس نظام ActorSystem سيكون هناك دائمًا مجموعة مشتركة واحدة من اتصالات HTTP التي سيتم استخدامها من قبل جميع عمليات التجسيد. يتيح لك ذلك التحكم بشكل أفضل في استخدام موارد الشبكة وتجنب التحميل الزائد عليها.
وبالتالي ، ترتبط دورة حياة تجمع اتصال HTTP بنظام ActorSystem. كما ذكرنا من قبل ، يتم أيضًا إرفاق دورة حياة تجمع مؤشر الترابط بها ، حيث يتم تنفيذ العمليات المحددة في الممثلين (أو في حالتنا ، التي يتم تعريفها على أنها مراحل akka-streams و akka-http). لذلك ، لتحقيق أقصى قدر من الكفاءة ، يجب علينا إعادة استخدام مثيل واحد من ActorSystem في نفس عملية JVM.
تجميع كل هذا: مثال على تنفيذ التفاعل مع خدمة التصنيف
لذا ، يمكننا الآن الانتقال إلى عملية تصنيف كميات كبيرة من البيانات الموزعة على Apache Spark باستخدام التفاعل غير المتزامن مع الخدمات الخارجية. تم عرض المخطط العام لهذا التفاعل بالفعل في الشكل 7.
لنفترض أن لدينا بعض مجموعة البيانات الأولية [الميزات] المحددة. بتطبيق عملية mapPartitions عليها ، يجب أن نحصل على مجموعة بيانات ، حيث يتم ختم كل معرف من مجموعة المصدر بقيمة معينة تم الحصول عليها نتيجة التصنيف (مجموعة البيانات [النتيجة]). لتنظيم المعالجة غير المتزامنة على المنفذين ، يجب أن نلف المصدر والمكرر الناتج في الناشر والمشترك ، على التوالي ، من مواصفات التدفقات التفاعلية وربطها معًا.
case class Features(id: String, vector: Array[Float]) case class Score(id: String, score: Float) //(1) val batchesRequestCount = config.getInt(“scoreService. batchesRequestCount”)
في هذا التطبيق ، يؤخذ في الاعتبار أن خدمة التصنيف لمكالمة واحدة يمكن أن تعالج مجموعة من ناقلات الميزات في وقت واحد ، وبالتالي ، فإن نتيجة التصنيف بعد المكالمة إليها ستكون متاحة على الفور للمجموعة بأكملها. لذلك ، كنوع من المعلمات للناشر ، لم نحصل فقط على النتيجة ، كما قد تتوقع ، ولكن على [نقاط] متكررة. وبالتالي ، نرسل نتائج التصنيف لهذه المجموعة إلى المكرر الناتج (وهو أيضًا مشترك) عن طريق مكالمة واحدة إلى طريقة onNext. هذا أكثر كفاءة من استدعاء onNext لكل عنصر. الآن سنقوم بتحليل هذا الرمز بمزيد من التفاصيل.
- نحدد هيكل بيانات المدخلات والمخرجات. كمدخلات ، سيكون لدينا مجموعة من بعض معرفات الهوية مع متجه الميزة ، وكمخرج ، سيكون لدينا مجموعة من المعرّفات بقيمة رقمية يتم الحصول عليها نتيجة التصنيف.
- نحدد عدد المجموعات التي سيطلبها المشترك من الناشر في كل مرة. نظرًا لافتراض أن هذه القيم ستكمن في المخزن المؤقت وتنتظر حتى تتم قراءتها من المكرر الناتج ، فإن هذه القيمة تعتمد على مقدار الذاكرة المخصصة للمنفذ.
- إنشاء الناشر من مكرر المصدر. سيكون مسؤولاً عن التفاعل مع خدمة التصنيف. تتم مناقشة وظيفة createPublisher أدناه.
- قم بإنشاء مشترك ، والذي سيكون المكرر الناتج. ويرد أيضًا رمز فئة IteratorSubscriber أدناه.
- تسجيل مشترك مع الناشر.
- إرجاع IteratorSubscriber كنتيجة لعملية mapPartitions.
الآن فكر في تنفيذ وظيفة createPublisher.
type Ids = Seq[String]
- - , . httpFlow, .
- : , (batchSize) (parallelismLevel).
- implicit scope ActorSystem, ActorMaterializer httpFlow. Spark-. ActorSystemHolder .
- akka-streams . Source[Features] .
- batchSize .
- HttpRequest . HttpRequest createHttpRequest. createPublisher. feature-, , ( predict). , HTTP-. , HTTP-, HTTP-, URI .
- httpFlow.
- , . flatMapMerge, akka-http Source[ByteString], , . . parallelismLevel , ( ). HTTP-: , , , .
- : . akka ByteString. , ByteString O(1), ByteString . , , . , .
- HTTP- , Stream . , discardEntityBytes , , .
- . akka-http , .
- , Publisher, . , . false Sink.asPublisher , Publisher Subscriber-.
كما ذكرنا في القسم السابق ، للعمل مع akka ، تحتاج إلى نظام ActorSystem ، والذي يجب إنشاؤه مرة واحدة ثم إعادة استخدامه. لسوء الحظ ، ليس لدينا طريقة لاستدعاء البيئة العالمية لمنفذ Spark ، ولكن يمكننا اللجوء إلى الأساليب القياسية لإنشاء كائنات عالمية. نظرًا لأن منفذ Spark هو عملية JVM منفصلة ، وبالتالي ، في داخله ، يمكننا إنشاء كائن عام نقوم فيه بتخزين ActorSystem و ActorMatrializer و httpFlow باستخدامه. object ActorSystemHolder { implicit lazy val actorSystem: ActorSystem = {
- نقوم بإنشاء جميع المتغيرات العالمية باستخدام التهيئة البطيئة ، أي في الواقع أنها سيتم إنشاؤها عندما تكون في الطلب الأول.
- يؤدي هذا إلى إنشاء ActorSystem جديد باسم محدد.
- من أجل إنهاء جميع العمليات المنفذة في إطار نظام ActorSystem بشكل صحيح ، يجب أن نطلق على طريقة الإنهاء عليه ، والتي بدورها ستوقف جميع الجهات الفاعلة باستخدام آلية التوقف القياسية الخاصة بهم. للقيام بذلك ، يجب تسجيل الخطاف الذي يتم استدعاؤه عند انتهاء عملية JVM.
- بعد ذلك ، نقوم بإنشاء ActorMaterializer الذي سيبدأ تنفيذ عمليات تيارات akka باستخدام نظام ActorSystem الخاص بنا.
- أخيرًا ، نقوم بإنشاء httpFlow للتفاعل مع خدمة خارجية. كما ذكرنا في القسم السابق ، نخصص هنا موارد لمجموعة اتصالات HTTP في إطار ActorSystem.
ضع في اعتبارك الآن تنفيذ المكرر الناتج كمشترك في عملية تفاعل HTTP الخاصة بنا. sealed trait QueueItem[+T] case class Item[+T](item: T) extends QueueItem[T] case object Done extends QueueItem[Nothing] case class Failure(cause: Throwable) extends QueueItem[Nothing] //(1) class StreamErrorCompletionException(cause: Throwable) extends Exception(cause) //(2) class IteratorSubscriber[T](requestSize: Int) extends Subscriber[Iterable[T]] with Iterator[T] {
إن فئة IteratorSubscriber هي تنفيذ لنموذج Producer-Consumer. الجزء الذي ينفذ واجهة المشترك هو Producer ، والجزء الذي ينفذ Iterator هو المستهلك. يتم استخدام المخزن المؤقت الذي يتم تنفيذه كقائمة حظر كوسيلة للاتصال. يتم استدعاء الأساليب من واجهة Iterator في دفق من تجمع منفذي Apache Spark ، ويتم استدعاء طرق واجهة المشترك من التجمع المملوك من قبل ActorSystem.الآن ، دعونا نلقي نظرة على رمز تنفيذ IteratorSubscriber المحدد بمزيد من التفاصيل.- أولاً ، نحدد نوع البيانات الجبرية لعناصر المخزن المؤقت. في المخزن المؤقت ، يمكن أن يكون لدينا مجموعة العناصر التالية ، أو علامة على الانتهاء بنجاح من Done ، أو علامة على الفشل ، تحتوي على Throwable ، والتي تسببت في الخطأ.
- , hasNext .
- , , Publisher-.
- , . LinkedBlockingQueue, . , .
- , . , , Publisher-. , , Publisher- . hasNext next ( requestNextBatches hasNext), , .
- subscriptionPromise subscription Subscription, Publisher onSubscribe. , Reactive Streams Subscriber- Publisher- , , hasNext , onSubscribe. , subscription, Publisher-. lazy subscription, Promise.
- . hasNext next, , .
- , , hasNext false . hasNext, .
- onSubscribe Publisher- Subscription Promise, subscription.
- onNext Publisher-, . .
- Publisher onComplete, Done.
- Publisher onError. .
- hasNext , . , true, . , .
- , false.
- , , requestSize, Publisher. , , , Publisher- , HTTP- .
- . , , , . , , ( , , subscription), , , , .
- , currentIterator. , . , hasNext , ( , ), .
- , false hasNext. , isDone, , . - , hasNext , false. , hasNext , false , . , .
- , , , .
- تُرجع الطريقة التالية العنصر التالي من المكرر الحالي. وفقًا لدلالات المكالمة ، قبل ذلك ، يجب على المتصل استدعاء طريقة hasNext ، لذلك عند الاتصال التالي ، يجب أن يكون العنصر التالي دائمًا العنصر التالي.
- نرسل هنا إشارة إلى Publisher بأننا مستعدون لمعالجة المجموعة التالية من النتائج باستخدام كائن الاشتراك الذي تلقيناه عند التسجيل في Publisher. يتم تحديد عدد المجموعات حسب قيمة requestSize. كما نزيد عدد العناصر المتوقعة بهذه القيمة.
وبالتالي ، يبدو المخطط العام للمعالجة الكاملة لكتلة البيانات ، التي تم إطلاقها على المنفذ ، كما يلي:الشكل 8. تفاعل الجهات الفاعلة مع المصدر والمكرر الناتج.في الختام: مزايا وعيوب هذا الحل
تتمثل الميزة الرئيسية لهذا المخطط في أنه يتيح لك العمل مع نماذج التعلم الآلي المنفذة باستخدام أي وسيلة متاحة. يتم تحقيق ذلك بسبب حقيقة أن بروتوكول HTTP يستخدم للوصول إلى النموذج ، وهو وسيلة قياسية للاتصال بين التطبيقات. ونتيجة لذلك ، لا يرتبط تنفيذ النموذج بواجهته.ميزة أخرى - يتيح لك هذا المخطط تنفيذ التحجيم الأفقي لجميع عناصره. اعتمادًا على الجزء الأكثر تحميلًا ، يمكننا إضافة آلات إما إلى مجموعة Hadoop ، أو لتشغيل مثيلات إضافية للنموذج. نتيجة لذلك ، هذا النظام يتحمل الأخطاء ، لأنه في حالة وجود مشاكل مع أي من الآلات ، يمكننا استبداله بسهولة. يتم تحقيق ذلك بسبب حقيقة أن البيانات المخزنة على hdfs يتم نسخها ، ولا تعتمد خدمة التصنيف على بعض الحالات العامة القابلة للتغيير ، وبالتالي ، يمكن نشرها في عدة حالات.بالإضافة إلى ذلك ، يوفر هذا المخطط فرصًا كبيرة للضبط والتحسين. على سبيل المثال ، نظرًا لأن akka-http يستخدم مجموعة واحدة من الاتصالات في عملية واحدة طوال دورة حياتها ، فمن السهل التحكم في عدد الاتصالات بخدمة خارجية. أو إذا تم توزيع الكتلة عبر عدة مراكز بيانات ، فيمكننا رفع مثيل نموذجي في كل مركز من مراكز البيانات وتكوين عمليات Apache Spark بحيث تشير فقط إلى مثيلها ، وبالتالي القضاء على المكالمات بين مراكز البيانات.أخيرًا ، باستخدام الإعدادات ، يسمح لك هذا المخطط بتجنب التوقف عن العمل تمامًا تقريبًا. من خلال تحديد حجم المجموعة المرسلة للتصنيف ، وعدد مثيلات خدمة التصنيف وحجم تجمع اتصال http ، من الممكن تحقيق أقصى حمل لقوة الحوسبة سواء من جانب الخدمة أو من جانب المجموعة.أحد العيوب الرئيسية لهذا المخطط هو تعقيده النسبي الناجم عن فصل المكونات والحاجة إلى تنظيم التفاعل بينها. بالإضافة إلى ذلك ، سيتم استخدام جزء من الطاقة الحاسوبية لضمان الاتصال ، مما يقلل بشكل كبير من الكفاءة. قد تحدث أخطاء اتصال إضافية أيضًا. ونتيجة لذلك ، هناك حاجة لإعدادات إضافية لزيادة فعالية التفاعل.لاستبعاد تفاعلات الشبكة ، يمكن للمرء التفكير في نشر مثيلات الخدمة على نفس الأجهزة مثل البيانات. ولكن ، كقاعدة عامة ، يكون عدد الأجهزة في مجموعة Hadoop كبيرًا جدًا ، لذلك سيكون من غير المربح نشر مثيل نموذجي على كل منها ، خاصة في حالة النماذج الكبيرة.بالإضافة إلى ذلك ، كقاعدة عامة ، على مجموعة Hadoop ، يتم تنفيذ العديد من المهام في وقت واحد تتنافس على مواردها ، وبالتالي فإن إطلاق خدمة إضافية سيقلل من الأداء العام للكتلة.باختصار ، أود أن أشير إلى أن هذا الحل تم تنفيذه بنجاح في شركتنا CleverDATA . بفضله ، يمكن لفرق محللي البيانات ومطوري التطبيقات استخدام أي وسيلة مناسبة لعملهم دون تقييد اختيار بعضهم البعض. في الواقع ، المكان الوحيد الذي يتطلب مناقشة وتنسيقًا مشتركًا هو الواجهة التي يوفرها نموذج التعلم الآلي ، ويمكن حل جميع المشكلات الأخرى داخل أحد الفرق. وهكذا ، تمكنت الفرق من العمل بشكل مستقل عن بعضها البعض في حل المشاكل المشتركة.