RxDart: التحولات السحرية للتدفقات

مرحبًا - هذا هو الجزء الثالث من سلسلة مقالات Flutter Architecture الخاصة بي.



هذه المرة سنجعل الغوص صغيرًا في عالم التمديدات التفاعلية (Rx). سوف أركز على أكثر وظائف Rx استخدامًا وشرح تطبيقها. إذا لم تكن قد قرأت المنشور السابق ، فقد حان الوقت لهذا قبل الانتقال.


RxDart هو تطبيق لمفهوم Rx للغة Dart ، وذلك بفضل فرانك Pepermans و Brian Egan على ذلك . إذا سبق لك استخدام Rx بلغات أخرى ، فمن المحتمل أن تلاحظ اختلافًا في تسمية عدد من الوظائف ، لكن هذا لن يسبب لك أي صعوبات.


رمز الاختبار هنا .


حتى الآن ، استخدمنا التدفقات كوسيلة لنقل البيانات من مكان إلى آخر في تطبيقنا ، لكن يمكنهم القيام بالكثير. دعنا نلقي نظرة على بعض الميزات التي يضيفها Rx إلى Streams.


خلق ملاحظات


كما ذكرنا سابقًا ، تمثل Observables إصدارات Rx من التدفقات ذات الميزات الرائعة. هناك العديد من الطرق المثيرة للاهتمام لإنشائها:


خارج الدفق


يمكن تحويل أي تيار إلى ملاحظ عن طريق تمريره إلى المنشئ:


var controller = new StreamController<String>(); var streamObservable = new Observable(controller.stream); streamObservable.listen(print); 

الأحداث المتكررة


 var timerObservable = Observable.periodic(Duration(seconds: 1), (x) => x.toString() ); timerObservable.listen(print); 

وبهذه الطريقة ، سيتم إنشاء الملاحظة التي تعرض القيم مع فترة محددة. حتى تتمكن من استبدال الموقت.


من قيمة واحدة


في بعض الأحيان تتوقع واجهة برمجة التطبيقات (API) تدفق / يمكن ملاحظته حيث يكون لديك قيمة فقط. لمثل هذه الحالات ، فإن الملاحظة يمكن أن يكون لها مصنع.


 var justObservable = Observable<int>.just(42); justObservable.listen(print); //   : 42 

من المستقبل


  Future<String> asyncFunction() async { return Future.delayed(const Duration(seconds: 1), () => "AsyncRsult"); } test('Create Observable from Future', () async { print('start'); var fromFutureObservable = Observable.fromFuture(asyncFunction()); fromFutureObservable.listen(print); 

سينتظر إنشاء " Observable من المستقبل" حتى يكتمل " المستقبل" Observable قيمة Observable أو null إذا لم يتم إرجاع القيمة. هناك طريقة أخرى لإنشاء دفق من Future وهي استدعاء toStream() لأي مستقبل.


قد تتساءل عن الهدف من تحويل المستقبل إلى ملاحظة / دفق بدلاً من مجرد الانتظار. كن مطمئنًا ، سيصبح ذلك واضحًا عندما ندرس الوظائف المتاحة لمعالجة البيانات أثناء وجودها في البث المباشر.


الموضوعات


Subjects هي بديل عن StreamController في RxDart ، وهذا هو كيف يتم تنفيذها في مكان ما في أحشاء المكتبة.


لكن سلوكهم يختلف قليلاً عن StreamControllers الأساسية:


  • يمكنك تطبيق listen() مباشرة على الموضوع ، دون الوصول إلى خاصية Stream
  • يتوفر أي عدد من الاشتراكات ، ويتلقى جميع المستمعين نفس البيانات في نفس الوقت
  • هناك ثلاثة أنواع من الموضوعات ، موضحة أدناه مع أمثلة:

PublishSubjects


تتصرف PublishSubjects مثل StreamControllers ، باستثناء احتمال وجود العديد من المستمعين:


 var subject = new PublishSubject<String>(); subject.listen((item) => print(item)); subject.add("Item1"); //    subject.listen((item) => print(item.toUpperCase())); subject.add("Item2"); subject.add("Item3"); //        await Future.delayed(Duration(seconds: 5)); //    subject.close; 

قم بتشغيل هذا الرمز وستحصل على:


 Item1 ITEM2 Item2 ITEM3 Item3 

من الواضح أن المستمع الثاني الذي تأخر عن الحفلة (سوف نسميها المشتركين المتأخرين) أخطأ النقطة الأولى. لتجنب هذا ، يمكنك استخدام BehaviourSubject


BehaviourSubject


مع BehaviourSubject كل مشترك جديد أولاً على آخر قيمة مقبولة:


 var subject = new BehaviorSubject<String>(); subject.listen((item) => print(item)); subject.add("Item1"); subject.add("Item2"); subject.listen((item) => print(item.toUpperCase())); subject.add("Item3"); 

عند الخروج


 Item1 ITEM2 ITEM3 Item2 Item3 

يمكنك أن ترى أن Item1 فقد للمشترك الثاني ، لكنه يتلقى Item2 . قد تفاجأ أن المشترك الثاني يتلقى Item3 قبل أن يحصل المشترك الأول على Item2 . وذلك لأن تسلسل المشتركين في الخدمة غير مضمون ، على الرغم من أن جميع المشتركين يتلقون البيانات بالترتيب الصحيح. BehaviourSubject فقط بتخزين آخر عنصر تم استلامه للمشتركين المتأخرين. إذا كنت بحاجة إلى تخزين المزيد من العناصر ، يمكنك استخدام ReplaySubject . في معظم الحالات ، هذا ليس ضروريًا.


معالجة البيانات على الطاير



تكمن القوة الحقيقية لـ Rx في حقيقة أنها تسمح لك بمعالجة البيانات أثناء النقل عبر الدفق. تقوم كل طريقة من طرق Rx بإرجاع دفق جديد مع البيانات الناتجة (كما في الرسم التوضيحي) ، مما يعني أنه يمكنك ربطها معًا في خط أنابيب معالجة واحد ، وهذا يجعل Rx أداة قوية للغاية.


خريطة


إذا كان هناك أي عملية تشغيل لا أريد تفويتها ، فهذه هي map() . ما تقوم به map() هو أنها تحتاج إلى نقل كل عنصر من عناصر البيانات وتطبيق وظيفة معينة عليه ، وبعد ذلك تضع النتيجة في الدفق الناتج. مثال بسيط:



 var subject = new PublishSubject<String>(); subject.map((item) => item.toUpperCase()).listen(print); subject.add("Item1"); subject.add("Item2"); subject.add("Item3"); 

النتيجة:


 ITEM1 ITEM2 ITEM3 

لكن map مطلوبة لإرجاع نفس نوع البيانات الذي تتلقاه كمدخلات. سيأخذ المثال التالي أعدادًا صحيحة بدلاً من السلاسل. بالإضافة إلى ذلك ، سوف نربط بين تحويلين:


 var subject = new PublishSubject<int>(); subject.map((intValue) => intValue.toString()) .map((item) => item.toUpperCase()) .listen(print); subject.add(1); subject.add(2); subject.add(3); 

أو شيء من هذا القبيل:



 class DataClass{} class WrapperClass { final DataClass wrapped; WrapperClass(this.wrapped); } var subject = new PublishSubject<WrapperClass>(); subject.map<WrapperClass>((a) => new WrapperClass(a)); 

أحد أكثر الاستخدامات المفيدة لـ .map هو عندما تحصل على بيانات بتنسيق من بعض واجهة برمجة تطبيقات REST أو من قاعدة بيانات وتريد تحويلها إلى كائنات خاصة بك:


 class User { final String name; final String adress; final String phoneNumber; final int age; //       - //   factory User.fromJson(String jsonString) { var jsonMap = json.decode(jsonString); return User( jsonMap['name'], jsonMap['adress'], jsonMap['phoneNumber'], jsonMap['age'], ); } User(this.name, this.adress, this.phoneNumber, this.age); @override String toString() { return '$name - $adress - $phoneNumber - $age'; } } void main() { test('Map', () { // -  var jsonStrings = [ '{"name": "Jon Doe", "adress": "New York", "phoneNumber":"424242","age": 42 }', '{"name": "Stephen King", "adress": "Castle Rock", "phoneNumber":"123456","age": 71 }', '{"name": "Jon F. Kennedy", "adress": "Washington", "phoneNumber":"111111","age": 66 }', ]; //   json-,    API/DB. var dataStreamFromAPI = new PublishSubject<String>(); dataStreamFromAPI .map<User>((jsonString) => User.fromJson(jsonString)) // json -> User .listen((user) => print(user.toString())); //    dataStreamFromAPI.add(jsonStrings[0]); dataStreamFromAPI.add(jsonStrings[1]); dataStreamFromAPI.add(jsonStrings[2]); }); 

ألاحظ أنه ليس فقط Stream ، ولكن أيضًا أي Iterable يوفر وظيفة map يمكنك استخدامها للتحولات في القوائم.


حيث


إذا كنت مهتمًا فقط بقيم معينة تحدث في الدفق ، فيمكنك استخدام الدالة .where() بدلاً من استخدام if في المستمع ، وهذا أكثر تعبيرًا وأسهل في القراءة:


 var subject = new PublishSubject<int>(); subject.where((val) => val.isOdd) .listen( (val) => print('This only prints odd numbers: $val')); subject.where((val) => val.isEven) .listen( (val) => print('This only prints even numbers: $val')); subject.add(1); subject.add(2); subject.add(3); //: This only prints odd numbers: 1 This only prints even numbers: 2 This only prints odd numbers: 3 

Debounce


هذا هو واحد من اللؤلؤ الصغيرة من آر إكس! تخيل أن لديك حقل بحث يستدعي REST API إذا تم تغيير نصه. إجراء مكالمة API لكل ضغطة مفتاح أمر مكلف. وبالتالي ، لا ترغب في إجراء مكالمة إلا إذا توقف المستخدم للحظة. لهذا الغرض ، يتم استخدام الدالة debounce() ، والتي تبتلع كل الأحداث الواردة إذا لم يتم إتباعها مؤقتًا.


 var subject = new PublishSubject<String>(); subject.debounce(new Duration(milliseconds: 500)).listen((s) => print(s)); subject.add('A'); subject.add('AB'); await Future.delayed(Duration(milliseconds: 200)); subject.add("ABC"); //    await Future.delayed(Duration(milliseconds: 700)); //       : 'ABC' 

لذلك ، إذا قمت بتحويل معالج TextField.onChanged إلى Observable ، فستحصل على حل أنيق.


وسع


إذا كان المصدر الخاص بك ينبعث منه صفائف من الكائنات ، وتريد معالجة كل كائن بنفسك ، فيمكنك استخدام .expand ، والذي سيفعل ذلك:


صورة


سترى تطبيق هذه الطريقة أدناه في مثال FireStore.


دمج


إذا كان لديك عدة مؤشرات .mergeWith مختلفة ، لكنك تريد معالجة الكائنات الخاصة بهم معًا ، يمكنك استخدام .mergeWith (في تطبيقات Rx الأخرى فقط merge ) ، والتي تأخذ مجموعة من مؤشرات الترابط وإرجاع مؤشر ترابط مدمج واحد.


صورة


.mergeWith لا يضمن دمج أي ترتيب في التدفقات. ينبعث البيانات في ترتيب المدخلات.


على سبيل المثال ، إذا كان لديك مكونان يقومان بالإبلاغ عن الأخطاء من خلال دفق ، وترغب في عرضها معًا في مربع حوار ، يمكنك القيام بذلك على النحو التالي (الرمز الزائف):


 @override initState() { super.initState(); component1.errors.mergeWith([component2.errors]) .listen( (error) async => await showDialog(error.message)); } 

أو إذا كنت تريد عرضًا مشتركًا للرسائل من العديد من الشبكات الاجتماعية ، فقد يبدو هذا (الرمز الزائف):


 final observableTwitter = getTwitterStream().map((data) => new MyAppPost.fromTwitter(data)); final observableFacebook = getFacebookStream().map((data) => new MyAppPost.fromFaceBook(data)); final postStream = observableTwitter.mergeWith([observableFacebook]); 

ZipWith


zipWith أيضا يدمج دفق واحد مع آخر. ولكن ، على عكس .mergeWith ، لا يرسل البيانات بمجرد تلقي عنصر من أحد تدفقات مصدره. ينتظر حتى وصول العناصر من كلا تدفقات المصدر ، ثم يجمعها باستخدام وظيفة zipper المقدمة:


صورة


يبدو توقيع zipWith مخيفًا ، لكننا ننظر إليه الآن:


 // R :   Stream/Observable // S :   Stream/Observable // zipper: - Observable<R> zipWith<S, R>(Stream<S> other, R zipper(T t, S s)) 

مثال مبسط للغاية:


 new Observable.just(1) // .just()  Observable,    .zipWith(new Observable.just(2), (one, two) => one + two) .listen(print); //  3 

هناك تطبيق أكثر عملية إذا كنت بحاجة إلى انتظار وظيفتين غير متزامنتين ترجع Future ، وتريد معالجة البيانات بمجرد أن يتم إرجاع كل النتائج. في هذا المثال المفترض بعض الشيء ، نقدم اثنين من واجهات برمجة التطبيقات (API) لـ REST: أحدهما يعرض User ، والآخر يعرض Product كسلاسل JSON ، ونريد انتظار كلا المكالمات قبل إرجاع كائن Invoice .


 class Invoice { final User user; final Product product; Invoice(this.user, this.product); printInvoice() { print(user.toString()); print(product.toString()); } } //  HTTP ,  Product,  JSON Future<String> getProduct() async { print("Started getting product"); await Future.delayed(Duration(seconds: 2)); print("Finished getting product"); return '{"name": "Flux compensator", "price": 99999.99}'; } //  HTTP ,  User,  JSON Future<String> getUser() async { print("Started getting User"); await Future.delayed(Duration(seconds: 4)); print("Finished getting User"); return '{"name": "Jon Doe", "adress": "New York", "phoneNumber":"424242","age": 42 }'; } void main() { test('zipWith', () async { var userObservable = Observable.fromFuture(getUser()).map<User>((jsonString) => User.fromJson(jsonString)); var productObservable = Observable.fromFuture(getProduct()) .map<Product>((jsonString) => Product.fromJson(jsonString)); Observable<Invoice> invoiceObservable = userObservable.zipWith<Product, Invoice>( productObservable, (user, product) => Invoice(user, product)); print("Start listening for invoices"); invoiceObservable.listen((invoice) => invoice.printInvoice()); //        await Future.delayed(Duration(seconds: 5)); }); } 

بالنظر إلى الإخراج ، يمكنك أن ترى كيف يتم ذلك بشكل غير متزامن


 Started getting User Started getting product Start listening for invoices Finished getting product Finished getting User Jon Doe - New York - 424242 - 42 Flux compensator - 99999.99 

CombineLatest


يدمج combineLatest أيضًا قيم الدفق ، ولكن بطريقة مختلفة قليلاً عن merge zip . يستمع لمزيد من مؤشرات الترابط ويصدر قيمة مشتركة كلما وصلت قيمة جديدة من أحد سلاسل العمليات. من المثير للاهتمام أنه لا يولد القيمة المتغيرة فحسب ، ولكن أيضًا آخر قيم تم الحصول عليها لجميع تدفقات المصدر الأخرى. انظر بعناية إلى هذه الرسوم المتحركة:


صورة


قبل combineLates قيمته الأولى ، يجب أن تتلقى كافة مؤشرات الترابط المصدر إدخالًا واحدًا على الأقل.


على عكس الأساليب التي تم استخدامها سابقًا ، فإن combineLatest ثابت. بالإضافة إلى ذلك ، نظرًا لأن Dart لا يسمح combLastest الزائد للمشغل ، فهناك إصدارات من combLastest بناءً على عدد تدفقات المصدر: combineLatest2 ... combineLatest9


combineLatest استخدامًا جيدًا ، على سبيل المثال ، إذا كان لديك Observable<bool> combineLatest أن بعض أجزاء التطبيق لديك مشغولة ، وتريد عرض مشغول الدوار إذا كان أحدهما مشغولًا. قد يبدو مثل هذا (الرمز الزائف):


 class Model { Observable<bool> get isBusy => Observable.combineLatest2(isBusyOne,isBusyTwo, (b1, b2) => b1 || b2); PublishSubject<bool> isBusyOne; PublishSubject<bool> isBusyTwo; } 

في واجهة المستخدم الخاصة بك ، يمكنك استخدام isBusy مع StreamBuilder لعرض Spinner إذا كانت القيمة الناتجة صحيحة.


combineLatest ميزة مناسبة جدًا مع تيارات لقطات FireStore .


تخيل أنك تريد إنشاء تطبيق يعرض موجزًا ​​للأخبار جنبًا إلى جنب مع توقعات الطقس. يتم تخزين رسائل شريط وبيانات الطقس في مجموعتين مختلفتين من FireStore. يتم تحديث كلاهما بشكل مستقل. تريد عرض تحديثات البيانات باستخدام StreamBuilder. مع combineLatest فإنه من السهل:


 class WeatherForecast { final String forecastText; final GeoPoint location; factory WeatherForecast.fromMap(Map<String, dynamic> map) { return WeatherForecast(map['forecastText'], map['location']); } WeatherForecast(this.forecastText, this.location); } class NewsMessage { final String newsText; final GeoPoint location; factory NewsMessage.fromMap(Map<String, dynamic> map) { return NewsMessage(map['newsText'], map['location']); } NewsMessage(this.newsText, this.location); } class CombinedMessage { final WeatherForecast forecast; final NewsMessage newsMessage; CombinedMessage(this.forecast, this.newsMessage); } class Model { CollectionReference weatherCollection; CollectionReference newsCollection; Model() { weatherCollection = Firestore.instance.collection('weather'); newsCollection = Firestore.instance.collection('news'); } Observable<CombinedMessage> getCombinedMessages() { Observable<WeatherForecast> weatherForecasts = weatherCollection .snapshots() .expand((snapShot) => snapShot.documents) .map<WeatherForecast>((document) => WeatherForecast.fromMap(document.data)); Observable<NewsMessage> news = newsCollection .snapshots() .expand((snapShot) => snapShot.documents) .map<NewsMessage>((document) => NewsMessage.fromMap(document.data)); return Observable.combineLatest2( weatherForecasts, news, (weather, news) => CombinedMessage(weather, news)); } } 

في واجهة المستخدم الخاصة بك ، ستبدو مثل هذا: StreamBuilder<CombinedMessage>(stream: model.getCombinedMessages(),...).


متميز


في السيناريو الموضح أعلاه ، قد يحدث أن يقوم كل من isBusyOne و isBusyTwo بإعطاء القيمة نفسها ، مما سيؤدي إلى تحديث واجهة المستخدم بنفس البيانات. لمنع هذا ، يمكننا استخدام .distinct() . إنه يضمن عدم تدفق البيانات إلا إذا كانت قيمة العنصر الجديد مختلفة عن الأخيرة. وبالتالي ، نغير الرمز إلى:


  Observable<bool> isBusy => isBusyOne.mergeWith([isBusyTwo]).distinct(); 

وهذا يوضح أيضًا أنه يمكننا الجمع بين وظائفنا في سلاسل مختلفة حسب الرغبة.


AsyncMap


بالإضافة إلى map() هناك أيضًا وظيفة asyncMap ، والتي تتيح لك استخدام دالة غير متزامنة كوظيفة مخطط. دعنا نقدم إعدادًا مختلفًا قليلاً لمثال FireStore. الآن يعتمد WeatherForecast الضروري على موقع NewsMessage ويجب تحديثه فقط عند تلقي NewsMessage جديد:


 Observable<CombinedMessage> getDependendMessages() { Observable<NewsMessage> news = newsCollection.snapshots().expand((snapShot) { return snapShot.documents; }).map<NewsMessage>((document) { return NewsMessage.fromMap(document.data); }); return news.asyncMap((newsEntry) async { var weatherDocuments = await weatherCollection.where('location', isEqualTo: newsEntry.location).getDocuments(); return new CombinedMessage( WeatherForecast.fromMap(weatherDocuments.documents.first.data), newsEntry); }); } 

سيتم إنشاء الملاحظة التي يتم إرجاعها بواسطة getDependendMessages في إنشاء CombinedMessage جديد في كل مرة يتغير newsCollection.


Debug Observables


بالنظر إلى سلاسل نداء Rx الأنيقة ، يبدو من المستحيل تقريبًا تصحيح تعبير مثل هذا:


 Observable<NewsMessage> news = newsCollection .snapshots() .expand((snapShot) => snapShot.documents) .map<NewsMessage>((document) => NewsMessage.fromMap(document.data)); 

لكن ضع في اعتبارك أن => مجرد نموذج قصير لوظيفة مجهولة. باستخدام تحويل لمنع الجسم ، سوف تحصل على:


 Observable<NewsMessage> news = newsCollection .snapshots() .expand((snapShot) { return snapShot.documents; }) .map<NewsMessage>((document) { return NewsMessage.fromMap(document.data); }); 

والآن يمكننا تعيين نقطة توقف أو إضافة بيانات مطبوعة في كل خطوة من خط أنابيبنا.


احذر من الآثار الجانبية


إذا كنت ترغب في الاستفادة من Rx لجعل شفرتك أكثر قوة ، فضع في اعتبارك دائمًا أن Rx هو تحويل البيانات عندما يتحرك "على طول حزام ناقل". لذلك ، لا تستدعي أبدًا الوظائف التي تغير أي متغيرات / حالات خارج خط أنابيب المعالجة حتى تصل إلى وظيفة .listen.
بدلاً من القيام بذلك:


 Observable.fromFuture(getProduct()) .map<Product>((jsonString) { var product = Product.fromJson(jsonString); database.save(product); setState((){ _product = product }); return product; }).listen(); 

افعل هذا:


 Observable.fromFuture(getProduct()) .map<Product>((jsonString) => Product.fromJson(jsonString)) .listen( (product) { database.save(product); setState((){ _product = product }); }); 

واجب map() هو تحويل البيانات في الدفق ، وليس أكثر! إذا كانت وظيفة العرض التي تم تمريرها تؤدي شيئًا آخر ، فسيتم اعتبارها من الآثار الجانبية ، مما يؤدي إلى حدوث أخطاء محتملة يصعب اكتشافها عند قراءة الرمز.


بعض الأفكار حول تحرير الموارد


لتجنب تسرب الذاكرة ، اتصل دائمًا cancel() للاشتراكات ، dispose() أجل StreamControllers ، close() للموضوعات ، بمجرد عدم حاجتك إليها.


استنتاج


مبروك لو بقيت معي حتى هذه اللحظة. الآن لا يمكنك استخدام Rx فقط لجعل حياتك أسهل ، ولكن أيضًا الاستعداد للمشاركات التالية التي سنبحث فيها في تفاصيل RxVMS .

Source: https://habr.com/ru/post/ar451292/


All Articles