RxDart: transformasi ajaib arus

Selamat datang - Ini adalah bagian ketiga dari seri artikel Flutter Architecture saya.



Kali ini kita akan terjun kecil ke dunia magis ekstensi reaktif (Rx). Saya akan fokus pada fungsi Rx yang paling sering digunakan dan menjelaskan aplikasi mereka. Jika Anda belum membaca posting sebelumnya, sekarang adalah waktunya untuk ini sebelum melanjutkan.


RxDart adalah implementasi konsep Rx untuk bahasa Dart, terima kasih kepada Frank Pepermans dan Brian Egan untuk itu . Jika sebelumnya Anda menggunakan Rx dalam bahasa lain, Anda mungkin akan melihat perbedaan dalam penamaan sejumlah fungsi, tetapi ini tidak akan menyebabkan Anda kesulitan.


Kode untuk pengujian ada di sini .


Sejauh ini, kami telah menggunakan stream sebagai cara untuk mentransfer data dari satu tempat ke tempat lain dalam aplikasi kami, tetapi mereka dapat melakukan lebih banyak lagi. Mari kita lihat beberapa fitur yang ditambahkan Rx ke Streams.


Membuat Observable


Seperti yang dinyatakan sebelumnya , Observables adalah versi Rx stream dengan fitur hebat. Ada beberapa cara menarik untuk membuatnya:


Di luar arus


Aliran apa pun dapat dikonversi ke Observable dengan meneruskannya ke konstruktor:


var controller = new StreamController<String>(); var streamObservable = new Observable(controller.stream); streamObservable.listen(print); 

Peristiwa berulang


 var timerObservable = Observable.periodic(Duration(seconds: 1), (x) => x.toString() ); timerObservable.listen(print); 

Dengan cara ini, Observable akan dibangun yang menampilkan nilai dengan periode tertentu. Jadi Anda bisa mengganti timer.


Dari satu nilai


Terkadang API mengharapkan Stream / Diobservasi di mana Anda hanya memiliki nilai. Untuk kasus seperti itu, Observable memiliki pabrik.


 var justObservable = Observable<int>.just(42); justObservable.listen(print); //   : 42 

Dari Masa Depan


  Future<String> asyncFunction() async { return Future.delayed(const Duration(seconds: 1), () => "AsyncRsult"); } test('Create Observable from Future', () async { print('start'); var fromFutureObservable = Observable.fromFuture(asyncFunction()); fromFutureObservable.listen(print); 

Membuat Observable from Future akan menunggu Future untuk menyelesaikan dan mengembalikan nilai untuk hasilnya atau null jika nilainya tidak dikembalikan. Cara lain untuk membuat aliran dari Masa Depan adalah memanggil toStream() untuk Masa Depan apa pun.


Anda mungkin bertanya-tanya apa gunanya mengubah Future menjadi Observable / Stream alih-alih hanya menunggu saja. Yakinlah, ini akan menjadi jelas ketika kita memeriksa fungsi yang tersedia untuk memanipulasi data saat mereka berada di aliran.


Subjek


Subjects adalah pengganti StreamController di RxDart, dan itulah bagaimana mereka diimplementasikan di suatu tempat di perut perpustakaan.


Tetapi perilaku mereka sedikit berbeda dari StreamControllers dasar:


  • Anda dapat menerapkan listen() langsung ke Subjek, tanpa mengakses properti Stream
  • sejumlah langganan tersedia, dan semua pendengar menerima data yang sama pada saat yang sama
  • Ada tiga jenis Subjek, yang dijelaskan di bawah ini dengan contoh:

Publikasikan Subjek


PublishSubjects berperilaku seperti StreamControllers , kecuali untuk kemungkinan banyak pendengar:


 var subject = new PublishSubject<String>(); subject.listen((item) => print(item)); subject.add("Item1"); //    subject.listen((item) => print(item.toUpperCase())); subject.add("Item2"); subject.add("Item3"); //        await Future.delayed(Duration(seconds: 5)); //    subject.close; 

Jalankan kode ini dan Anda akan mendapatkan:


 Item1 ITEM2 Item2 ITEM3 Item3 

Jelas bahwa pendengar kedua yang terlambat menghadiri pesta (kami akan menyebutnya pelanggan terlambat) melewatkan poin pertama. Untuk menghindari ini, Anda dapat menggunakan BehaviourSubject


Subjek perilaku


Dengan BehaviourSubject setiap pelanggan baru akan menerima nilai yang terakhir diterima:


 var subject = new BehaviorSubject<String>(); subject.listen((item) => print(item)); subject.add("Item1"); subject.add("Item2"); subject.listen((item) => print(item.toUpperCase())); subject.add("Item3"); 

Di pintu keluar


 Item1 ITEM2 ITEM3 Item2 Item3 

Anda dapat melihat bahwa Item1 hilang untuk pelanggan kedua, tetapi menerima Item2 . Anda mungkin terkejut bahwa pelanggan kedua menerima Item3 sebelum pelanggan pertama menerima Item2 . Ini karena urutan layanan pelanggan tidak dijamin, meskipun semua pelanggan menerima data dalam urutan yang benar. BehaviourSubject hanya menyimpan cache item terakhir yang diterima untuk pelanggan yang terlambat. Jika Anda perlu men- cache lebih banyak elemen, Anda dapat menggunakan ReplaySubject . Dalam kebanyakan kasus ini tidak perlu.


Memanipulasi data dengan cepat



Kekuatan sebenarnya dari Rx terletak pada kenyataan bahwa Rx memungkinkan Anda untuk memproses data selama transmisi melalui aliran. Setiap metode Rx mengembalikan aliran baru dengan data yang dihasilkan (seperti dalam ilustrasi), yang berarti Anda dapat mengikat mereka bersama dalam satu pipa pemrosesan, dan ini membuat Rx alat yang sangat kuat.


Peta


Jika ada operasi Stream yang tidak ingin saya lewatkan, maka ini adalah map() . Apa yang dilakukan map() adalah bahwa setiap item data harus ditransfer dan menerapkan fungsi tertentu padanya, setelah itu menempatkan hasilnya dalam aliran yang dihasilkan. Contoh sederhana:



 var subject = new PublishSubject<String>(); subject.map((item) => item.toUpperCase()).listen(print); subject.add("Item1"); subject.add("Item2"); subject.add("Item3"); 

Hasil:


 ITEM1 ITEM2 ITEM3 

Tetapi map tidak diperlukan untuk mengembalikan tipe data yang sama dengan yang diterimanya sebagai input. Contoh berikut akan mengambil bilangan bulat sebagai ganti string. Selain itu, kami akan menautkan dua transformasi:


 var subject = new PublishSubject<int>(); subject.map((intValue) => intValue.toString()) .map((item) => item.toUpperCase()) .listen(print); subject.add(1); subject.add(2); subject.add(3); 

atau sesuatu seperti ini:



 class DataClass{} class WrapperClass { final DataClass wrapped; WrapperClass(this.wrapped); } var subject = new PublishSubject<WrapperClass>(); subject.map<WrapperClass>((a) => new WrapperClass(a)); 

Salah satu penggunaan .map paling berguna adalah ketika Anda mendapatkan data dalam format dari beberapa REST API atau dari database dan ingin data tersebut dikonversi ke objek Anda sendiri:


 class User { final String name; final String adress; final String phoneNumber; final int age; //       - //   factory User.fromJson(String jsonString) { var jsonMap = json.decode(jsonString); return User( jsonMap['name'], jsonMap['adress'], jsonMap['phoneNumber'], jsonMap['age'], ); } User(this.name, this.adress, this.phoneNumber, this.age); @override String toString() { return '$name - $adress - $phoneNumber - $age'; } } void main() { test('Map', () { // -  var jsonStrings = [ '{"name": "Jon Doe", "adress": "New York", "phoneNumber":"424242","age": 42 }', '{"name": "Stephen King", "adress": "Castle Rock", "phoneNumber":"123456","age": 71 }', '{"name": "Jon F. Kennedy", "adress": "Washington", "phoneNumber":"111111","age": 66 }', ]; //   json-,    API/DB. var dataStreamFromAPI = new PublishSubject<String>(); dataStreamFromAPI .map<User>((jsonString) => User.fromJson(jsonString)) // json -> User .listen((user) => print(user.toString())); //    dataStreamFromAPI.add(jsonStrings[0]); dataStreamFromAPI.add(jsonStrings[1]); dataStreamFromAPI.add(jsonStrings[2]); }); 

Saya perhatikan bahwa tidak hanya Streams, tetapi juga setiap Iterable menawarkan fungsi map yang dapat Anda gunakan untuk transformasi dalam daftar.


Dimana


Jika Anda hanya tertarik pada nilai-nilai tertentu yang terjadi dalam aliran, Anda dapat menggunakan fungsi .where() alih-alih menggunakan if di pendengar Anda, ini lebih ekspresif dan lebih mudah dibaca:


 var subject = new PublishSubject<int>(); subject.where((val) => val.isOdd) .listen( (val) => print('This only prints odd numbers: $val')); subject.where((val) => val.isEven) .listen( (val) => print('This only prints even numbers: $val')); subject.add(1); subject.add(2); subject.add(3); //: This only prints odd numbers: 1 This only prints even numbers: 2 This only prints odd numbers: 3 

Debounce


Ini adalah salah satu mutiara kecil Rx! Bayangkan Anda memiliki bidang pencarian yang memanggil REST API jika teksnya diubah. Membuat panggilan API untuk setiap penekanan tombol itu mahal. Dengan demikian, Anda hanya ingin melakukan panggilan jika pengguna berhenti sejenak. Untuk ini, fungsi debounce() digunakan, yang menelan semua peristiwa yang masuk jika tidak diikuti oleh jeda.


 var subject = new PublishSubject<String>(); subject.debounce(new Duration(milliseconds: 500)).listen((s) => print(s)); subject.add('A'); subject.add('AB'); await Future.delayed(Duration(milliseconds: 200)); subject.add("ABC"); //    await Future.delayed(Duration(milliseconds: 700)); //       : 'ABC' 

Karena itu, jika Anda mengonversi penangan TextField.onChanged ke Observable , Anda akan mendapatkan solusi yang elegan.


Perluas


Jika Stream sumber Anda memancarkan array objek, dan Anda ingin memproses masing-masing objek sendiri, Anda dapat menggunakan. .expand , yang akan melakukan hal itu:


gambar


Anda akan melihat penerapan metode ini di bawah dalam contoh FireStore.


Gabungkan


Jika Anda memiliki beberapa utas berbeda, tetapi Anda ingin memproses objek mereka bersama-sama, Anda dapat menggunakan .mergeWith (dalam implementasi Rx lainnya hanya merge ), yang mengambil larik utas dan mengembalikan satu utas yang digabungkan.


gambar


.mergeWith tidak menjamin bahwa pesanan apa pun dalam aliran digabungkan. Data dipancarkan dalam urutan input.


Misalnya, jika Anda memiliki dua komponen yang melaporkan kesalahan melalui aliran, dan Anda ingin mereka ditampilkan bersama dalam dialog, Anda bisa melakukan ini sebagai berikut (pseudo-code):


 @override initState() { super.initState(); component1.errors.mergeWith([component2.errors]) .listen( (error) async => await showDialog(error.message)); } 

atau jika Anda ingin tampilan pesan gabungan dari beberapa jejaring sosial, mungkin akan terlihat seperti ini (pseudo-code):


 final observableTwitter = getTwitterStream().map((data) => new MyAppPost.fromTwitter(data)); final observableFacebook = getFacebookStream().map((data) => new MyAppPost.fromFaceBook(data)); final postStream = observableTwitter.mergeWith([observableFacebook]); 

Zipwith


zipWith juga menggabungkan satu aliran dengan yang lain. Tapi, tidak seperti .mergeWith , ia tidak mengirim data segera setelah menerima elemen dari salah satu stream sumbernya. Dia menunggu sampai elemen dari kedua aliran sumber tiba, dan kemudian menggabungkannya menggunakan fungsi zipper disediakan:


gambar


zipWith tangan zipWith tampak menakutkan, tetapi sekarang kita melihatnya:


 // R :   Stream/Observable // S :   Stream/Observable // zipper: - Observable<R> zipWith<S, R>(Stream<S> other, R zipper(T t, S s)) 

Contoh yang sangat sederhana:


 new Observable.just(1) // .just()  Observable,    .zipWith(new Observable.just(2), (one, two) => one + two) .listen(print); //  3 

Aplikasi yang lebih praktis adalah jika Anda harus menunggu dua fungsi tidak sinkron yang mengembalikan Future , dan Anda ingin memproses data segera setelah kedua hasil dikembalikan. Dalam contoh yang sedikit dibuat-buat ini, kami menyajikan dua API REST: satu mengembalikan User , yang lainnya mengembalikan Product sebagai string JSON, dan kami ingin menunggu kedua panggilan sebelum mengembalikan objek Invoice .


 class Invoice { final User user; final Product product; Invoice(this.user, this.product); printInvoice() { print(user.toString()); print(product.toString()); } } //  HTTP ,  Product,  JSON Future<String> getProduct() async { print("Started getting product"); await Future.delayed(Duration(seconds: 2)); print("Finished getting product"); return '{"name": "Flux compensator", "price": 99999.99}'; } //  HTTP ,  User,  JSON Future<String> getUser() async { print("Started getting User"); await Future.delayed(Duration(seconds: 4)); print("Finished getting User"); return '{"name": "Jon Doe", "adress": "New York", "phoneNumber":"424242","age": 42 }'; } void main() { test('zipWith', () async { var userObservable = Observable.fromFuture(getUser()).map<User>((jsonString) => User.fromJson(jsonString)); var productObservable = Observable.fromFuture(getProduct()) .map<Product>((jsonString) => Product.fromJson(jsonString)); Observable<Invoice> invoiceObservable = userObservable.zipWith<Product, Invoice>( productObservable, (user, product) => Invoice(user, product)); print("Start listening for invoices"); invoiceObservable.listen((invoice) => invoice.printInvoice()); //        await Future.delayed(Duration(seconds: 5)); }); } 

Melihat output, Anda dapat melihat bagaimana hal ini dilakukan secara serempak


 Started getting User Started getting product Start listening for invoices Finished getting product Finished getting User Jon Doe - New York - 424242 - 42 Flux compensator - 99999.99 

Combinelatest


combineLatest juga menggabungkan nilai aliran, tetapi dengan cara yang sedikit berbeda dari merge dan zip . Ini mendengarkan lebih banyak utas dan mengeluarkan nilai gabungan setiap kali nilai baru datang dari salah satu utas. Sangat menarik bahwa ia menghasilkan tidak hanya nilai yang diubah, tetapi juga nilai yang diperoleh terakhir dari semua aliran sumber lainnya. Perhatikan baik-baik animasi ini:


gambar


Sebelum combineLates nilai pertama, semua utas sumber harus menerima setidaknya satu entri.


Berbeda dengan metode yang digunakan sebelumnya, combineLatest adalah statis. Selain itu, karena Dart tidak memungkinkan overloading operator, ada versi combLastest tergantung pada jumlah stream sumber: kombinasikanLatest2 ... kombinasikanLatest9


combineLatest penggunaan yang baik, misalnya, jika Anda memiliki dua Observable<bool> yang menandakan bahwa beberapa bagian dari aplikasi Anda sedang sibuk, dan Anda ingin menampilkan pemintal Sibuk jika salah satunya sibuk. Mungkin terlihat seperti ini (pseudo-code):


 class Model { Observable<bool> get isBusy => Observable.combineLatest2(isBusyOne,isBusyTwo, (b1, b2) => b1 || b2); PublishSubject<bool> isBusyOne; PublishSubject<bool> isBusyTwo; } 

Di UI Anda, Anda bisa menggunakan isBusy dengan StreamBuilder untuk menampilkan Spinner jika nilai yang dihasilkan benar.


combineLatest sangat cocok dikombinasikan dengan stream snapshots FireStore .


Bayangkan Anda ingin membuat aplikasi yang menampilkan feed berita beserta ramalan cuaca. Pesan ticker dan data cuaca disimpan dalam dua koleksi FireStore yang berbeda. Keduanya diperbarui secara independen. Anda ingin menampilkan pembaruan data menggunakan StreamBuilder. Dengan combineLatest mudah:


 class WeatherForecast { final String forecastText; final GeoPoint location; factory WeatherForecast.fromMap(Map<String, dynamic> map) { return WeatherForecast(map['forecastText'], map['location']); } WeatherForecast(this.forecastText, this.location); } class NewsMessage { final String newsText; final GeoPoint location; factory NewsMessage.fromMap(Map<String, dynamic> map) { return NewsMessage(map['newsText'], map['location']); } NewsMessage(this.newsText, this.location); } class CombinedMessage { final WeatherForecast forecast; final NewsMessage newsMessage; CombinedMessage(this.forecast, this.newsMessage); } class Model { CollectionReference weatherCollection; CollectionReference newsCollection; Model() { weatherCollection = Firestore.instance.collection('weather'); newsCollection = Firestore.instance.collection('news'); } Observable<CombinedMessage> getCombinedMessages() { Observable<WeatherForecast> weatherForecasts = weatherCollection .snapshots() .expand((snapShot) => snapShot.documents) .map<WeatherForecast>((document) => WeatherForecast.fromMap(document.data)); Observable<NewsMessage> news = newsCollection .snapshots() .expand((snapShot) => snapShot.documents) .map<NewsMessage>((document) => NewsMessage.fromMap(document.data)); return Observable.combineLatest2( weatherForecasts, news, (weather, news) => CombinedMessage(weather, news)); } } 

Di UI Anda, akan terlihat seperti ini: StreamBuilder<CombinedMessage>(stream: model.getCombinedMessages(),...).


Berbeda


Dalam skenario yang dijelaskan di atas, mungkin saja isBusyOne dan isBusyTwo memberikan nilai yang sama, yang akan mengarah pada pembaruan antarmuka pengguna dengan data yang sama. Untuk mencegah ini, kita bisa menggunakan .distinct() . Ini memastikan bahwa data hanya dialirkan jika nilai elemen baru berbeda dari yang terakhir. Dengan demikian, kami akan mengubah kode menjadi:


  Observable<bool> isBusy => isBusyOne.mergeWith([isBusyTwo]).distinct(); 

dan itu juga menunjukkan bahwa kita dapat menggabungkan fungsi kita dalam rantai yang berbeda sesuka hati.


Asyncmap


Selain map() ada juga fungsi asyncMap , yang memungkinkan Anda untuk menggunakan fungsi asinkron sebagai fungsi peta. Mari kita perkenalkan pengaturan yang sedikit berbeda untuk contoh FireStore kami. Sekarang, WeatherForecast yang diperlukan tergantung pada lokasi NewsMessage dan hanya akan diperbarui ketika NewsMessage baru diterima :


 Observable<CombinedMessage> getDependendMessages() { Observable<NewsMessage> news = newsCollection.snapshots().expand((snapShot) { return snapShot.documents; }).map<NewsMessage>((document) { return NewsMessage.fromMap(document.data); }); return news.asyncMap((newsEntry) async { var weatherDocuments = await weatherCollection.where('location', isEqualTo: newsEntry.location).getDocuments(); return new CombinedMessage( WeatherForecast.fromMap(weatherDocuments.documents.first.data), newsEntry); }); } 

Observable yang dikembalikan oleh getDependendMessages akan menghasilkan CombinedMessage baru setiap kali newsCollection berubah.


Debug Dapat Diobservasi


Melihat rantai panggilan Rx yang elegan, tampaknya hampir mustahil untuk men-debug ekspresi seperti ini:


 Observable<NewsMessage> news = newsCollection .snapshots() .expand((snapShot) => snapShot.documents) .map<NewsMessage>((document) => NewsMessage.fromMap(document.data)); 

Tetapi perlu diingat bahwa => hanyalah bentuk singkat untuk fungsi anonim. Menggunakan Konversi untuk memblokir tubuh , Anda akan mendapatkan:


 Observable<NewsMessage> news = newsCollection .snapshots() .expand((snapShot) { return snapShot.documents; }) .map<NewsMessage>((document) { return NewsMessage.fromMap(document.data); }); 

Dan sekarang kita dapat mengatur breakpoint atau menambahkan pernyataan cetak pada setiap langkah dari pipeline kita.


Waspadai efek samping


Jika Anda ingin memanfaatkan Rx untuk membuat kode Anda lebih kuat, selalu ingat bahwa Rx adalah konversi data saat memindahkannya "di sepanjang sabuk konveyor". Oleh karena itu, jangan pernah panggil fungsi yang mengubah variabel / status di luar pipa pemrosesan hingga Anda mencapai fungsi .listen.
Alih-alih melakukannya:


 Observable.fromFuture(getProduct()) .map<Product>((jsonString) { var product = Product.fromJson(jsonString); database.save(product); setState((){ _product = product }); return product; }).listen(); 

lakukan ini:


 Observable.fromFuture(getProduct()) .map<Product>((jsonString) => Product.fromJson(jsonString)) .listen( (product) { database.save(product); setState((){ _product = product }); }); 

Tugas map() adalah untuk mengubah data dalam aliran, DAN TIDAK ADA LEBIH BANYAK! Jika fungsi tampilan yang disahkan melakukan sesuatu yang lain, itu akan dianggap sebagai efek samping, menghasilkan potensi kesalahan yang sulit dideteksi ketika membaca kode.


Beberapa pemikiran tentang pembebasan sumber daya


Untuk menghindari kebocoran memori, selalu panggil cancel() untuk langganan, dispose() untuk StreamControllers, close() untuk Subjek, segera setelah Anda tidak lagi membutuhkannya.


Kesimpulan


Selamat jika kamu tetap bersamaku sampai saat ini. Sekarang Anda tidak hanya dapat menggunakan Rx untuk membuat hidup Anda lebih mudah, tetapi juga mempersiapkan posting berikutnya di mana kita akan mempelajari rincian RxVMS .

Source: https://habr.com/ru/post/id451292/


All Articles