مرحبًا - هذا هو الجزء الثالث من سلسلة مقالات Flutter Architecture الخاصة بي.
هذه المرة سنجعل الغوص صغيرًا في عالم التمديدات التفاعلية (Rx). سوف أركز على أكثر وظائف Rx استخدامًا وشرح تطبيقها. إذا لم تكن قد قرأت المنشور السابق ، فقد حان الوقت لهذا قبل الانتقال.
RxDart هو تطبيق لمفهوم Rx للغة Dart ، وذلك بفضل فرانك Pepermans و Brian Egan على ذلك . إذا سبق لك استخدام Rx بلغات أخرى ، فمن المحتمل أن تلاحظ اختلافًا في تسمية عدد من الوظائف ، لكن هذا لن يسبب لك أي صعوبات.
رمز الاختبار هنا .
حتى الآن ، استخدمنا التدفقات كوسيلة لنقل البيانات من مكان إلى آخر في تطبيقنا ، لكن يمكنهم القيام بالكثير. دعنا نلقي نظرة على بعض الميزات التي يضيفها Rx إلى Streams.
خلق ملاحظات
كما ذكرنا سابقًا ، تمثل Observables إصدارات Rx من التدفقات ذات الميزات الرائعة. هناك العديد من الطرق المثيرة للاهتمام لإنشائها:
خارج الدفق
يمكن تحويل أي تيار إلى ملاحظ عن طريق تمريره إلى المنشئ:
var controller = new StreamController<String>(); var streamObservable = new Observable(controller.stream); streamObservable.listen(print);
الأحداث المتكررة
var timerObservable = Observable.periodic(Duration(seconds: 1), (x) => x.toString() ); timerObservable.listen(print);
وبهذه الطريقة ، سيتم إنشاء الملاحظة التي تعرض القيم مع فترة محددة. حتى تتمكن من استبدال الموقت.
من قيمة واحدة
في بعض الأحيان تتوقع واجهة برمجة التطبيقات (API) تدفق / يمكن ملاحظته حيث يكون لديك قيمة فقط. لمثل هذه الحالات ، فإن الملاحظة يمكن أن يكون لها مصنع.
var justObservable = Observable<int>.just(42); justObservable.listen(print);
من المستقبل
Future<String> asyncFunction() async { return Future.delayed(const Duration(seconds: 1), () => "AsyncRsult"); } test('Create Observable from Future', () async { print('start'); var fromFutureObservable = Observable.fromFuture(asyncFunction()); fromFutureObservable.listen(print);
سينتظر إنشاء " Observable
من المستقبل" حتى يكتمل " المستقبل" Observable
قيمة Observable
أو null
إذا لم يتم إرجاع القيمة. هناك طريقة أخرى لإنشاء دفق من Future وهي استدعاء toStream()
لأي مستقبل.
قد تتساءل عن الهدف من تحويل المستقبل إلى ملاحظة / دفق بدلاً من مجرد الانتظار. كن مطمئنًا ، سيصبح ذلك واضحًا عندما ندرس الوظائف المتاحة لمعالجة البيانات أثناء وجودها في البث المباشر.
الموضوعات
Subjects
هي بديل عن StreamController
في RxDart ، وهذا هو كيف يتم تنفيذها في مكان ما في أحشاء المكتبة.
لكن سلوكهم يختلف قليلاً عن StreamControllers الأساسية:
- يمكنك تطبيق
listen()
مباشرة على الموضوع ، دون الوصول إلى خاصية Stream - يتوفر أي عدد من الاشتراكات ، ويتلقى جميع المستمعين نفس البيانات في نفس الوقت
- هناك ثلاثة أنواع من الموضوعات ، موضحة أدناه مع أمثلة:
PublishSubjects
تتصرف PublishSubjects
مثل StreamControllers
، باستثناء احتمال وجود العديد من المستمعين:
var subject = new PublishSubject<String>(); subject.listen((item) => print(item)); subject.add("Item1");
قم بتشغيل هذا الرمز وستحصل على:
Item1 ITEM2 Item2 ITEM3 Item3
من الواضح أن المستمع الثاني الذي تأخر عن الحفلة (سوف نسميها المشتركين المتأخرين) أخطأ النقطة الأولى. لتجنب هذا ، يمكنك استخدام BehaviourSubject
BehaviourSubject
مع BehaviourSubject
كل مشترك جديد أولاً على آخر قيمة مقبولة:
var subject = new BehaviorSubject<String>(); subject.listen((item) => print(item)); subject.add("Item1"); subject.add("Item2"); subject.listen((item) => print(item.toUpperCase())); subject.add("Item3");
عند الخروج
Item1 ITEM2 ITEM3 Item2 Item3
يمكنك أن ترى أن Item1
فقد للمشترك الثاني ، لكنه يتلقى Item2
. قد تفاجأ أن المشترك الثاني يتلقى Item3
قبل أن يحصل المشترك الأول على Item2
. وذلك لأن تسلسل المشتركين في الخدمة غير مضمون ، على الرغم من أن جميع المشتركين يتلقون البيانات بالترتيب الصحيح. BehaviourSubject
فقط بتخزين آخر عنصر تم استلامه للمشتركين المتأخرين. إذا كنت بحاجة إلى تخزين المزيد من العناصر ، يمكنك استخدام ReplaySubject . في معظم الحالات ، هذا ليس ضروريًا.
معالجة البيانات على الطاير

تكمن القوة الحقيقية لـ Rx في حقيقة أنها تسمح لك بمعالجة البيانات أثناء النقل عبر الدفق. تقوم كل طريقة من طرق Rx بإرجاع دفق جديد مع البيانات الناتجة (كما في الرسم التوضيحي) ، مما يعني أنه يمكنك ربطها معًا في خط أنابيب معالجة واحد ، وهذا يجعل Rx أداة قوية للغاية.
خريطة
إذا كان هناك أي عملية تشغيل لا أريد تفويتها ، فهذه هي map()
. ما تقوم به map()
هو أنها تحتاج إلى نقل كل عنصر من عناصر البيانات وتطبيق وظيفة معينة عليه ، وبعد ذلك تضع النتيجة في الدفق الناتج. مثال بسيط:

var subject = new PublishSubject<String>(); subject.map((item) => item.toUpperCase()).listen(print); subject.add("Item1"); subject.add("Item2"); subject.add("Item3");
النتيجة:
ITEM1 ITEM2 ITEM3
لكن map
مطلوبة لإرجاع نفس نوع البيانات الذي تتلقاه كمدخلات. سيأخذ المثال التالي أعدادًا صحيحة بدلاً من السلاسل. بالإضافة إلى ذلك ، سوف نربط بين تحويلين:
var subject = new PublishSubject<int>(); subject.map((intValue) => intValue.toString()) .map((item) => item.toUpperCase()) .listen(print); subject.add(1); subject.add(2); subject.add(3);
أو شيء من هذا القبيل:

class DataClass{} class WrapperClass { final DataClass wrapped; WrapperClass(this.wrapped); } var subject = new PublishSubject<WrapperClass>(); subject.map<WrapperClass>((a) => new WrapperClass(a));
أحد أكثر الاستخدامات المفيدة لـ .map
هو عندما تحصل على بيانات بتنسيق من بعض واجهة برمجة تطبيقات REST أو من قاعدة بيانات وتريد تحويلها إلى كائنات خاصة بك:
class User { final String name; final String adress; final String phoneNumber; final int age;
ألاحظ أنه ليس فقط Stream ، ولكن أيضًا أي Iterable يوفر وظيفة map
يمكنك استخدامها للتحولات في القوائم.
حيث
إذا كنت مهتمًا فقط بقيم معينة تحدث في الدفق ، فيمكنك استخدام الدالة .where()
بدلاً من استخدام if
في المستمع ، وهذا أكثر تعبيرًا وأسهل في القراءة:
var subject = new PublishSubject<int>(); subject.where((val) => val.isOdd) .listen( (val) => print('This only prints odd numbers: $val')); subject.where((val) => val.isEven) .listen( (val) => print('This only prints even numbers: $val')); subject.add(1); subject.add(2); subject.add(3);
Debounce
هذا هو واحد من اللؤلؤ الصغيرة من آر إكس! تخيل أن لديك حقل بحث يستدعي REST API إذا تم تغيير نصه. إجراء مكالمة API لكل ضغطة مفتاح أمر مكلف. وبالتالي ، لا ترغب في إجراء مكالمة إلا إذا توقف المستخدم للحظة. لهذا الغرض ، يتم استخدام الدالة debounce()
، والتي تبتلع كل الأحداث الواردة إذا لم يتم إتباعها مؤقتًا.
var subject = new PublishSubject<String>(); subject.debounce(new Duration(milliseconds: 500)).listen((s) => print(s)); subject.add('A'); subject.add('AB'); await Future.delayed(Duration(milliseconds: 200)); subject.add("ABC");
لذلك ، إذا قمت بتحويل معالج TextField.onChanged
إلى Observable
، فستحصل على حل أنيق.
وسع
إذا كان المصدر الخاص بك ينبعث منه صفائف من الكائنات ، وتريد معالجة كل كائن بنفسك ، فيمكنك استخدام .expand
، والذي سيفعل ذلك:

سترى تطبيق هذه الطريقة أدناه في مثال FireStore.
دمج
إذا كان لديك عدة مؤشرات .mergeWith
مختلفة ، لكنك تريد معالجة الكائنات الخاصة بهم معًا ، يمكنك استخدام .mergeWith
(في تطبيقات Rx الأخرى فقط merge
) ، والتي تأخذ مجموعة من مؤشرات الترابط وإرجاع مؤشر ترابط مدمج واحد.

.mergeWith
لا يضمن دمج أي ترتيب في التدفقات. ينبعث البيانات في ترتيب المدخلات.
على سبيل المثال ، إذا كان لديك مكونان يقومان بالإبلاغ عن الأخطاء من خلال دفق ، وترغب في عرضها معًا في مربع حوار ، يمكنك القيام بذلك على النحو التالي (الرمز الزائف):
@override initState() { super.initState(); component1.errors.mergeWith([component2.errors]) .listen( (error) async => await showDialog(error.message)); }
أو إذا كنت تريد عرضًا مشتركًا للرسائل من العديد من الشبكات الاجتماعية ، فقد يبدو هذا (الرمز الزائف):
final observableTwitter = getTwitterStream().map((data) => new MyAppPost.fromTwitter(data)); final observableFacebook = getFacebookStream().map((data) => new MyAppPost.fromFaceBook(data)); final postStream = observableTwitter.mergeWith([observableFacebook]);
ZipWith
zipWith
أيضا يدمج دفق واحد مع آخر. ولكن ، على عكس .mergeWith
، لا يرسل البيانات بمجرد تلقي عنصر من أحد تدفقات مصدره. ينتظر حتى وصول العناصر من كلا تدفقات المصدر ، ثم يجمعها باستخدام وظيفة zipper
المقدمة:

يبدو توقيع zipWith
مخيفًا ، لكننا ننظر إليه الآن:
مثال مبسط للغاية:
new Observable.just(1)
هناك تطبيق أكثر عملية إذا كنت بحاجة إلى انتظار وظيفتين غير متزامنتين ترجع Future
، وتريد معالجة البيانات بمجرد أن يتم إرجاع كل النتائج. في هذا المثال المفترض بعض الشيء ، نقدم اثنين من واجهات برمجة التطبيقات (API) لـ REST: أحدهما يعرض User
، والآخر يعرض Product
كسلاسل JSON ، ونريد انتظار كلا المكالمات قبل إرجاع كائن Invoice
.
class Invoice { final User user; final Product product; Invoice(this.user, this.product); printInvoice() { print(user.toString()); print(product.toString()); } }
بالنظر إلى الإخراج ، يمكنك أن ترى كيف يتم ذلك بشكل غير متزامن
Started getting User Started getting product Start listening for invoices Finished getting product Finished getting User Jon Doe - New York - 424242 - 42 Flux compensator - 99999.99
CombineLatest
يدمج combineLatest
أيضًا قيم الدفق ، ولكن بطريقة مختلفة قليلاً عن merge
zip
. يستمع لمزيد من مؤشرات الترابط ويصدر قيمة مشتركة كلما وصلت قيمة جديدة من أحد سلاسل العمليات. من المثير للاهتمام أنه لا يولد القيمة المتغيرة فحسب ، ولكن أيضًا آخر قيم تم الحصول عليها لجميع تدفقات المصدر الأخرى. انظر بعناية إلى هذه الرسوم المتحركة:

قبل combineLates
قيمته الأولى ، يجب أن تتلقى كافة مؤشرات الترابط المصدر إدخالًا واحدًا على الأقل.
على عكس الأساليب التي تم استخدامها سابقًا ، فإن combineLatest
ثابت. بالإضافة إلى ذلك ، نظرًا لأن Dart لا يسمح combLastest
الزائد للمشغل ، فهناك إصدارات من combLastest
بناءً على عدد تدفقات المصدر: combineLatest2 ... combineLatest9
combineLatest
استخدامًا جيدًا ، على سبيل المثال ، إذا كان لديك Observable<bool>
combineLatest
أن بعض أجزاء التطبيق لديك مشغولة ، وتريد عرض مشغول الدوار إذا كان أحدهما مشغولًا. قد يبدو مثل هذا (الرمز الزائف):
class Model { Observable<bool> get isBusy => Observable.combineLatest2(isBusyOne,isBusyTwo, (b1, b2) => b1 || b2); PublishSubject<bool> isBusyOne; PublishSubject<bool> isBusyTwo; }
في واجهة المستخدم الخاصة بك ، يمكنك استخدام isBusy
مع StreamBuilder
لعرض Spinner
إذا كانت القيمة الناتجة صحيحة.
combineLatest
ميزة مناسبة جدًا مع تيارات لقطات FireStore .
تخيل أنك تريد إنشاء تطبيق يعرض موجزًا للأخبار جنبًا إلى جنب مع توقعات الطقس. يتم تخزين رسائل شريط وبيانات الطقس في مجموعتين مختلفتين من FireStore. يتم تحديث كلاهما بشكل مستقل. تريد عرض تحديثات البيانات باستخدام StreamBuilder. مع combineLatest
فإنه من السهل:
class WeatherForecast { final String forecastText; final GeoPoint location; factory WeatherForecast.fromMap(Map<String, dynamic> map) { return WeatherForecast(map['forecastText'], map['location']); } WeatherForecast(this.forecastText, this.location); } class NewsMessage { final String newsText; final GeoPoint location; factory NewsMessage.fromMap(Map<String, dynamic> map) { return NewsMessage(map['newsText'], map['location']); } NewsMessage(this.newsText, this.location); } class CombinedMessage { final WeatherForecast forecast; final NewsMessage newsMessage; CombinedMessage(this.forecast, this.newsMessage); } class Model { CollectionReference weatherCollection; CollectionReference newsCollection; Model() { weatherCollection = Firestore.instance.collection('weather'); newsCollection = Firestore.instance.collection('news'); } Observable<CombinedMessage> getCombinedMessages() { Observable<WeatherForecast> weatherForecasts = weatherCollection .snapshots() .expand((snapShot) => snapShot.documents) .map<WeatherForecast>((document) => WeatherForecast.fromMap(document.data)); Observable<NewsMessage> news = newsCollection .snapshots() .expand((snapShot) => snapShot.documents) .map<NewsMessage>((document) => NewsMessage.fromMap(document.data)); return Observable.combineLatest2( weatherForecasts, news, (weather, news) => CombinedMessage(weather, news)); } }
في واجهة المستخدم الخاصة بك ، ستبدو مثل هذا: StreamBuilder<CombinedMessage>(stream: model.getCombinedMessages(),...).
متميز
في السيناريو الموضح أعلاه ، قد يحدث أن يقوم كل من isBusyOne و isBusyTwo بإعطاء القيمة نفسها ، مما سيؤدي إلى تحديث واجهة المستخدم بنفس البيانات. لمنع هذا ، يمكننا استخدام .distinct()
. إنه يضمن عدم تدفق البيانات إلا إذا كانت قيمة العنصر الجديد مختلفة عن الأخيرة. وبالتالي ، نغير الرمز إلى:
Observable<bool> isBusy => isBusyOne.mergeWith([isBusyTwo]).distinct();
وهذا يوضح أيضًا أنه يمكننا الجمع بين وظائفنا في سلاسل مختلفة حسب الرغبة.
AsyncMap
بالإضافة إلى map()
هناك أيضًا وظيفة asyncMap
، والتي تتيح لك استخدام دالة غير متزامنة كوظيفة مخطط. دعنا نقدم إعدادًا مختلفًا قليلاً لمثال FireStore. الآن يعتمد WeatherForecast الضروري على موقع NewsMessage ويجب تحديثه فقط عند تلقي NewsMessage جديد:
Observable<CombinedMessage> getDependendMessages() { Observable<NewsMessage> news = newsCollection.snapshots().expand((snapShot) { return snapShot.documents; }).map<NewsMessage>((document) { return NewsMessage.fromMap(document.data); }); return news.asyncMap((newsEntry) async { var weatherDocuments = await weatherCollection.where('location', isEqualTo: newsEntry.location).getDocuments(); return new CombinedMessage( WeatherForecast.fromMap(weatherDocuments.documents.first.data), newsEntry); }); }
سيتم إنشاء الملاحظة التي يتم إرجاعها بواسطة getDependendMessages في إنشاء CombinedMessage جديد في كل مرة يتغير newsCollection.
Debug Observables
بالنظر إلى سلاسل نداء Rx الأنيقة ، يبدو من المستحيل تقريبًا تصحيح تعبير مثل هذا:
Observable<NewsMessage> news = newsCollection .snapshots() .expand((snapShot) => snapShot.documents) .map<NewsMessage>((document) => NewsMessage.fromMap(document.data));
لكن ضع في اعتبارك أن =>
مجرد نموذج قصير لوظيفة مجهولة. باستخدام تحويل لمنع الجسم ، سوف تحصل على:
Observable<NewsMessage> news = newsCollection .snapshots() .expand((snapShot) { return snapShot.documents; }) .map<NewsMessage>((document) { return NewsMessage.fromMap(document.data); });
والآن يمكننا تعيين نقطة توقف أو إضافة بيانات مطبوعة في كل خطوة من خط أنابيبنا.
احذر من الآثار الجانبية
إذا كنت ترغب في الاستفادة من Rx لجعل شفرتك أكثر قوة ، فضع في اعتبارك دائمًا أن Rx هو تحويل البيانات عندما يتحرك "على طول حزام ناقل". لذلك ، لا تستدعي أبدًا الوظائف التي تغير أي متغيرات / حالات خارج خط أنابيب المعالجة حتى تصل إلى وظيفة .listen.
بدلاً من القيام بذلك:
Observable.fromFuture(getProduct()) .map<Product>((jsonString) { var product = Product.fromJson(jsonString); database.save(product); setState((){ _product = product }); return product; }).listen();
افعل هذا:
Observable.fromFuture(getProduct()) .map<Product>((jsonString) => Product.fromJson(jsonString)) .listen( (product) { database.save(product); setState((){ _product = product }); });
واجب map()
هو تحويل البيانات في الدفق ، وليس أكثر! إذا كانت وظيفة العرض التي تم تمريرها تؤدي شيئًا آخر ، فسيتم اعتبارها من الآثار الجانبية ، مما يؤدي إلى حدوث أخطاء محتملة يصعب اكتشافها عند قراءة الرمز.
بعض الأفكار حول تحرير الموارد
لتجنب تسرب الذاكرة ، اتصل دائمًا cancel()
للاشتراكات ، dispose()
أجل StreamControllers ، close()
للموضوعات ، بمجرد عدم حاجتك إليها.
استنتاج
مبروك لو بقيت معي حتى هذه اللحظة. الآن لا يمكنك استخدام Rx فقط لجعل حياتك أسهل ، ولكن أيضًا الاستعداد للمشاركات التالية التي سنبحث فيها في تفاصيل RxVMS .