
في هذه المقالة ، سوف نلقي نظرة على كيف يمكن معالجة موضوع آخر في سلسلة رسائل واحدة ، والسبب في ذلك ، وكيف ستساعدنا عوامل تشغيل High Order Observables (المشار إليها فيما يلي باسم HOO) في هذا الأمر.
سلسلة من المقالات "أساسيات البرمجة التفاعلية باستخدام RxJS":
عند العمل مع سلاسل العمليات ، غالبًا ما ينشأ موقف عندما يكون من الضروري نقل نتائج شخص آخر إلى سلسلة رسائل كقيمة. على سبيل المثال ، نريد تنفيذ طلب ajax ومعالجة استجابته في سلسلة العمليات الحالية ، أو تشغيل العديد من الطلبات المتوازية ، وتنفيذ تجمع. أعتقد أن الكثير من الناس معتادون على حل هذه المشكلات باستخدام آلية مثل الوعد. ولكن هل من الممكن حلها باستخدام RxJS؟ بالطبع ، وكل شيء أسهل بكثير مما تعتقد!
ملاحظة : من أجل فهم الجزء النظري من المقال ، لا يلزمك قراءة المقالات السابقة ، بل تحتاج فقط إلى معرفة ماهية المشغلين والمشغلين والأنابيب. في الجزء العملي ، سنقوم بتحسين المثال من
المقال الثاني ، والذي يمكنك العثور عليه
هنا .
المشكلة
دعونا نتخيل المهمة التالية: نحتاج إلى معرفة كل ثانية ما إذا كان الخادم يمكن الوصول إليه. كيف يمكننا حلها؟
أولاً ، قم بإنشاء دفق باستخدام طريقة المؤقت:
timer(0, 1000).subscribe({ next: console.log });
طريقة
الموقت مشابهة جداً من حيث المبدأ إلى
الفاصل الزمني . ولكن على عكس ذلك ، فإنه يسمح لك بتعيين مهلة بدء سلسلة الرسائل ، والتي يتم إرسالها بواسطة المعلمة الأولى. تشير المعلمة الثانية إلى الفاصل الزمني الذي سيتم من خلاله إنشاء قيمة جديدة. إذا لم يتم تحديد المعلمة الثانية ، فإن الموقت سيولد قيمة واحدة فقط وينهي الدفق.
نظرًا لأنني وليس لدي خادم ، أقترح مجرد كتابة وظيفة تحاكي طلبًا على الخادم:
const makeRequest = () => { return timer(1000).pipe( mapTo('success') ) }
ماذا تفعل هذه الطريقة؟ تقوم بإرجاع دفق تم إنشاؤه باستخدام طريقة المؤقت ، والتي تنبعث منها قيمة بعد مرور ثانية واحدة وتنتهي. نظرًا لأن أسلوب المؤقت يولد فقط رقمًا ، فإننا نستخدم مشغل mapTo لاستبداله بسلسلة "نجاح".
هذا ما يبدو عليه الدفق الذي أنشأته طريقة makeRequest:

الآن لدينا خيار: لاستدعاء الأسلوب makeRequest داخل الدفق أو إسناد هذه المسؤولية إلى المراقب؟
النهج الأول هو الأفضل ، لأننا في هذه الحالة سنكون قادرين على استخدام الإمكانات الكاملة لـ RxJS مع مشغليها وتخفيف مراقبنا من الواجبات غير الضرورية. نستخدم طريقة المؤقت لتنفيذ الطلبات عن طريق الفاصل الزمني:
timer(0, 1000).pipe( map(() => makeRequest()) ).subscribe({ next: console.log });
عندما نقوم بتشغيل هذه الشفرة ، سنرى أنه في console.log لن نحصل على رسالة تحتوي على "نجاح" النص ، ولكن كائن من نوع ملاحظ:

الجواب متوقع تمامًا ، لأننا في الخريطة نعيد الدفق. لكي يعمل تيار ، تحتاج إلى الاشتراك فيه. حسنًا ، دعنا نرى كيف
لا نفعل ذلك :
timer(0, 1000).pipe( map(() => makeRequest()) ).subscribe({ next: observable => observable.subscribe({ next: console.log }); });
المشكلة في المثال أعلاه هي أننا نحصل على اشتراك في اشتراك. لكن ماذا لو أردنا تقديم أكثر من طلب في سلسلة؟ أو ماذا لو كنا في مرحلة ما نحتاج إلى إلغاء الاشتراك من التدفق في الداخل؟ في هذه الحالة ، سوف يشبه رمزنا "الشعرية" أكثر فأكثر. لحل هذه المشكلة ، لدى RxJS عوامل تشغيل خاصة تسمى HOO.
HOO
HOO هو نوع خاص من العبارات التي تقبل التدفقات كقيم. أحد هذه العوامل هو أسلوب الدمج.
عندما يصل دفق في mergeAll ، فإنه يشترك فيه. يسمى الدفق الذي اشترك فيه المشغل داخليًا. يسمى التدفق الذي يستقبل المشغل منه التدفقات الأخرى كقيمة خارجية.
عندما ينشئ مؤشر ترابط داخلي قيمة ، يدفع mergeAll تلك القيمة في مؤشر الترابط الخارجي. وبالتالي ، نتخلص من الحاجة إلى الاشتراك يدويًا. إذا ألغينا اشتراكك في التدفق الخارجي ، فستقوم ميزة دمج الكل تلقائيًا بإلغاء الاشتراك من التدفق الداخلي.
دعونا نرى كيف يمكننا إعادة كتابة مثالنا مع mergeAll:
timer(0, 1000).pipe( map(() => makeRequest()) mergeAll() ).subscribe({ next: console.log });
في المثال أعلاه ، تم إنشاء الدفق الخارجي بواسطة بيان المؤقت. والتدفقات التي يتم إنشاؤها في مشغل الخريطة داخلية. يقع كل مؤشر ترابط تم إنشاؤه في عبارة دمج.

يتم استخدام الخريطة المدمجة + mergeAll في كثير من الأحيان ، لذلك في RxJS هناك طريقة دمج:
timer(0, 1000).pipe( mergeMap(() => makeRequest()) ).subscribe({ next: console.log });
عندما ينشئ مؤشر ترابط خارجي قيمة ، يستدعي مشغل mergeMap وظيفة رد الاتصال التي تم تمريرها إليها ، والتي تنشئ سلسلة رسائل جديدة. ثم mergeMap تشترك في الدفق الذي تم إنشاؤه.

تتمثل خصوصية مشغل mergeAll / mergeMap في أنه إذا حدث تدفق آخر له ، فإنه يشترك فيه أيضًا. وبالتالي ، في الدفق الخارجي ، يمكننا الحصول على قيم من عدة قيم داخلية في آن واحد. لنرى المثال التالي:
timer(0, 1000)
هذه هي الطريقة التي سيبدو بها الدفق الخارجي بدون مشغل mergeMap:

وهكذا مع mergeMap:
timer(0, 1000).pipe( mergeMap(() => interval(1000)) )

كل ثانية ، نقوم بإنشاء مؤشر ترابط داخلي جديد وتشترك فيه mergeMap. وبالتالي ، لدينا العديد من الخيوط الداخلية تعمل في وقت واحد ، والقيم التي تندرج في الخارج:

ملاحظة : كن حذرًا عند استخدام mergeMap ، سيعمل كل مؤشر ترابط داخلي جديد حتى تقوم بإلغاء الاشتراك من الخارج. في المثال أعلاه ، يتزايد عدد مؤشرات الترابط الداخلية كل ثانية ، في النهاية يمكن أن يكون هناك الكثير من مؤشرات الترابط التي لا يستطيع الكمبيوتر التعامل معها.
concatAll / concatMap
طريقة mergeMap رائعة عندما لا تهتم بترتيب تنفيذ مؤشرات الترابط الداخلية ، ولكن ماذا لو كنت في حاجة إليها؟ لنفترض أننا نريد تنفيذ طلب الخادم التالي فقط بعد تلقي رد من الطلب السابق؟
لهذه الأغراض ، فإن مشغل HOO concatAll / concatMap مناسب. ينتظر هذا المشغل ، بعد اشتراكه في الخيط الداخلي ، حتى ينتهي ، ثم يشترك في الخيط التالي.
إذا كان هناك مؤشر ترابط جديد أثناء تنفيذ مؤشر ترابط واحد ، فسيتم وضعه في قائمة الانتظار حتى يتم الانتهاء من سلسلة الرسائل السابقة.
في المثال أعلاه ، نقوم بإنشاء اثنين من المواضيع باستخدام طريقة الموقت. من أجل الوضوح ، استخدمت مشغل mapTo لعرض قيم مختلفة. سينشئ الخيط الأول 1 ، والثاني - 2. يتم إنشاء خيط خارجي باستخدام الطريقة ، والتي تأخذ اثنين من ما سبق ملاحظته كمدخلات.
تستقبل عبارة concatAll أولاً firstInnerObservable ، وتشترك فيها ، وتنتظر اكتمالها ، وفقط بعد إتمام الاشتراك الأول في secondInnerObservable. إليك ما سيبدو عليه الدفق الخارجي:

إذا استبدلنا موقع concatAll بـ mergeAll ، فسيظهر الملف كما يلي:
of( firstInnerObservable, secondInnerObservable ).pipe( mergeAll() ).subscribe({ next: console.log });

switchAll / switchMap
يختلف هذا المشغل عن المشغلين السابقين في أنه عندما يستقبل دفقًا جديدًا ، فإنه يقوم على الفور بإلغاء الاشتراك من المشغل السابق والاشتراك في المشغل الجديد.
خذ المثال أعلاه واستبدل concatAll بـ switchAll ، وانظر كيف يتصرف التدفق الخارجي:
of( firstInnerObservable, secondInnerObservable ).pipe( switchAll() ).subscribe({ next: console.log });

حصلت فقط القيمة من الدفق الداخلي الثاني في الدفق الخارجي. ذلك لأن switchMap ألغيت اشتراكه من الأول عندما تلقى سلسلة الرسائل الثانية.
متى تكون هذه ضرورية؟ على سبيل المثال ، عند تنفيذ بحث البيانات. إذا لم تصل الاستجابة من الخادم بعد ، وقد أرسلنا بالفعل طلبًا جديدًا ، فلن يكون من المنطقي انتظار الطلب السابق.
العادم / العادمخريطة
العادم هو عكس بيان switchAll تمامًا ، ويشبه سلوكه concatAll. هذه الطريقة ، بالاشتراك في الدفق ، تنتظر إكمالها. إذا حدث دفق جديد إليه ، فسيتم تجاهله ببساطة.
of( firstInnerObservable, secondInnerObservable ).pipe( exhaust() ).subscribe({ next: console.log });

في المثال أعلاه ، لم نحصل على شذوذ ، لأنه في تلك اللحظة كان المشغل ينتظر الانتهاء من الخيط الأول ، وأسقط ببساطة الخيط الثاني.
أعتقد أن الكثيرين لديهم سؤال ، متى يمكن أن تكون هناك حاجة لهذا السلوك؟ مثال جيد هو نموذج تسجيل الدخول. لا معنى لإرسال عدة طلبات إلى الخادم حتى يتم استكمال الطلب الحالي.
نحن بصدد الانتهاء من التطبيق
نذكر
مثال من
المادة الثانية . في ذلك ، قمنا بتنفيذ عملية بحث على GitHub واستخدمنا مشغل mergeMap لإرسال الطلبات إلى الخادم. الآن نحن نعرف ميزات هذا المشغل ، هل هو مناسب حقًا في حالتنا؟
fromEvent(input, 'keyup').pipe( debounceTime(700), map(event => event.target.value), filter(val => val.length > 2), distinctUntilChanged(), mergeMap(value => { return from(getUsersRepsFromAPI(value)).pipe( catchError(err => of([])) ) }) ).subscribe({ next: reps => recordRepsToList(reps) })
دعنا نفترض أن خادم GitHub سيكون مثقلًا جدًا ، ثم معالجة ردنا سيستغرق الكثير من الوقت. ما الذي يمكن أن يحدث خطأ في هذه الحالة؟
لنفترض أن المستخدم أدخل بعض البيانات ، ولم ينتظر إجابة ، وأدخل بيانات جديدة. في هذه الحالة ، سوف نرسل الطلب الثاني إلى الخادم. ومع ذلك ، لا أحد يضمن أن الإجابة على الطلب الأول سوف تأتي في وقت سابق.
نظرًا لأن مشغل mergeMap لا يهتم بترتيب معالجة مؤشرات الترابط الداخلية ، في حالة تنفيذ الطلب الأول في وقت لاحق عن الثاني ، سنقوم بمسح البيانات الفعلية. لذلك ، أقترح استبدال أسلوب mergeMap مع switchMap:
fromEvent(input, 'keyup').pipe( debounceTime(700), map(event => event.target.value), filter(val => val.length > 2), distinctUntilChanged(), switchMap(value => { return from(getUsersRepsFromAPI(value)).pipe( catchError(err => of([])) ) }) ).subscribe({ next: reps => recordRepsToList(reps) })
الآن ، إذا قام المستخدم بإدخال بيانات جديدة ، فسوف تقوم switchMap بإلغاء الاشتراك من الدفق السابق والاشتراك في الجديد.
تجدر الإشارة إلى أن طلب http الخاص بنا سيستمر في تعليقه حتى يعطي الخادم إجابة عليه. ولكن نظرًا لأننا ألغينا اشتراكنا في الدفق الداخلي ، فلن تقع الإجابة في الدفق الخارجي.
ملاحظة : إذا كنت تعمل مع Angular وتستخدم HttpClient للعمل مع http ، فلا تقلق بشأن إلغاء الطلب نفسه. يمكن لـ HttpClient القيام بذلك نيابة عنك عند إلغاء الاشتراك.
إلغاء HTTP
يحتوي إحضار api على القدرة على إلغاء طلب http باستخدام
AbortController . عند دمجها مع مشغل switchMap ، ستوفر هذه الوظيفة حركة مرور المستخدم.
دعنا نعيد كتابة مثالنا قليلاً. وإنشاء طريقة من شأنها أن تلغي دعوة الجلب في الملاحظة:
const createCancellableRequest = (url) => {
قم أيضًا بتغيير طريقة getUsersRepsFromApi:
const getUsersRepsFromAPI = (username) => { const url = `https://api.github.com/users/${ username }/repos`; return createCancellableRequest(url); }
الآن طريقة إرجاع ليس وعد ، ولكن يمكن ملاحظتها. لذلك ، نقوم بإزالة من المجمع في switchMap:
switchMap(value => { return getUsersRepsFromAPI(value).pipe( catchError(err => of([]) ) )
ملاحظة : في الإصدار 6.5 من
RxJS ،
أضافوا عبارة fromFetch ، والتي تطلق على نفسها طريقة الإجهاض تحت الغطاء ، لذلك لم تعد بحاجة لكتابة "دراجتك".
هذا كل شئ! يمكن العثور على جميع نماذج التعليمات البرمجية
هنا .
استنتاج
اليوم نظرنا إلى ما هو HOO وبعض المشغلين مفيدة للغاية من هذه الفئة. بالطبع ، هذه كانت بعيدة كل البعد عنهم. لمزيد من المعلومات التفصيلية والتفصيلية ، أوصي بزيارة
وثائق RxJS.
في المقالة التالية أخطط للنظر في الفرق بين الملاحظات الساخنة والباردة.
أخيرًا: لا تستخدم الاشتراك في الاشتراك ، لأنه يوجد HOO!