RxDart: transformations magiques des flux

Bienvenue - Ceci est la troisième partie de ma série d'articles Flutter Architecture.



Cette fois, nous allons faire une petite plongée dans le domaine magique des extensions réactives (Rx). Je vais me concentrer sur les fonctions Rx les plus utilisées et expliquer leur application. Si vous n'avez pas lu le post précédent, c'est le moment de le faire avant de continuer.


RxDart est une implémentation du concept Rx pour le langage Dart, merci à Frank Pepermans et Brian Egan pour cela . Si vous avez déjà utilisé Rx dans d'autres langues, vous remarquerez probablement une différence dans la dénomination d'un certain nombre de fonctions, mais il est peu probable que cela vous cause des difficultés.


Le code de test est ici .


Jusqu'à présent, nous avons utilisé les flux comme moyen de transférer des données d'un endroit à un autre dans notre application, mais ils peuvent faire beaucoup plus. Jetons un coup d'œil à certaines des fonctionnalités que Rx ajoute aux Streams.


Créer des observables


Comme indiqué précédemment , les observables sont des versions Rx de flux avec de grandes fonctionnalités. Il existe plusieurs façons intéressantes de les créer:


Hors flux


N'importe quel Stream peut être converti en Observable en le passant au constructeur:


var controller = new StreamController<String>(); var streamObservable = new Observable(controller.stream); streamObservable.listen(print); 

Evènements récurrents


 var timerObservable = Observable.periodic(Duration(seconds: 1), (x) => x.toString() ); timerObservable.listen(print); 

De cette façon, un observable sera construit qui affiche des valeurs avec une période spécifique. Vous pouvez donc remplacer la minuterie.


D'une seule valeur


Parfois, une API attend un flux / observable où vous avez juste une valeur. Pour de tels cas, l'Observable a une usine.


 var justObservable = Observable<int>.just(42); justObservable.listen(print); //   : 42 

Du futur


  Future<String> asyncFunction() async { return Future.delayed(const Duration(seconds: 1), () => "AsyncRsult"); } test('Create Observable from Future', () async { print('start'); var fromFutureObservable = Observable.fromFuture(asyncFunction()); fromFutureObservable.listen(print); 

La création d'un Observable partir du futur attendra la fin du futur et renverra une valeur pour son résultat ou null si la valeur n'est pas renvoyée. Une autre façon de créer un flux à partir de Future est d'appeler toStream() pour n'importe quel Future.


Vous vous demandez peut-être quel est l'intérêt de convertir Future en Observable / Stream au lieu de simplement l'attendre. Rassurez-vous, cela deviendra clair lorsque nous examinerons les fonctions disponibles pour manipuler les données pendant qu'elles sont dans le flux.


Sujets


Subjects remplacent StreamController dans RxDart, et c'est ainsi qu'ils sont implémentés quelque part dans les entrailles de la bibliothèque.


Mais leur comportement est légèrement différent des StreamControllers de base:


  • vous pouvez appliquer listen() directement à un sujet, sans accéder à la propriété Stream
  • n'importe quel nombre d'abonnements est disponible, et tous les auditeurs reçoivent les mêmes données en même temps
  • Il existe trois variétés de sujets, qui sont expliqués ci-dessous avec des exemples:

PublishSubjects


PublishSubjects se comportent comme StreamControllers , sauf pour la possibilité de nombreux écouteurs:


 var subject = new PublishSubject<String>(); subject.listen((item) => print(item)); subject.add("Item1"); //    subject.listen((item) => print(item.toUpperCase())); subject.add("Item2"); subject.add("Item3"); //        await Future.delayed(Duration(seconds: 5)); //    subject.close; 

Exécutez ce code et vous obtiendrez:


 Item1 ITEM2 Item2 ITEM3 Item3 

Il est clair que le deuxième auditeur qui était en retard pour la fête (nous les appellerons abonnés tardifs) a raté le premier point. Pour éviter cela, vous pouvez utiliser BehaviourSubject


Sujet de comportement


Avec BehaviourSubject chaque nouvel abonné recevra d'abord la dernière valeur acceptée:


 var subject = new BehaviorSubject<String>(); subject.listen((item) => print(item)); subject.add("Item1"); subject.add("Item2"); subject.listen((item) => print(item.toUpperCase())); subject.add("Item3"); 

A la sortie


 Item1 ITEM2 ITEM3 Item2 Item3 

Vous pouvez voir que Item1 perdu pour le deuxième abonné, mais il reçoit Item2 . Vous pourriez être surpris que le deuxième abonné reçoive Item3 avant que le premier abonné ne reçoive Item2 . En effet, la séquence de maintenance des abonnés n'est pas garantie, bien que tous les abonnés reçoivent les données dans le bon ordre. BehaviourSubject ne met en cache que le dernier élément reçu pour les abonnés en retard. Si vous devez mettre en cache plus d'éléments, vous pouvez utiliser ReplaySubject . Dans la plupart des cas, cela n'est pas nécessaire.


Manipulation des données à la volée



La véritable force de Rx réside dans le fait qu'il vous permet de traiter les données lors de la transmission sur le flux. Chacune des méthodes Rx renvoie un nouveau flux avec les données résultantes (comme dans l'illustration), ce qui signifie que vous pouvez les lier ensemble dans un pipeline de traitement, ce qui fait de Rx un outil extrêmement puissant.


La carte


S'il y a une opération Stream que je ne veux surtout pas manquer, alors c'est map() . Ce que fait map() c'est qu'il prend chaque élément de données à transférer et lui applique une certaine fonction, après quoi il met le résultat dans le flux résultant. Un exemple simple:



 var subject = new PublishSubject<String>(); subject.map((item) => item.toUpperCase()).listen(print); subject.add("Item1"); subject.add("Item2"); subject.add("Item3"); 

Résultat:


 ITEM1 ITEM2 ITEM3 

Mais map pas requis pour renvoyer le même type de données qu'il reçoit en entrée. L'exemple suivant prendra des entiers au lieu de chaînes. De plus, nous lierons deux transformations:


 var subject = new PublishSubject<int>(); subject.map((intValue) => intValue.toString()) .map((item) => item.toUpperCase()) .listen(print); subject.add(1); subject.add(2); subject.add(3); 

ou quelque chose comme ça:



 class DataClass{} class WrapperClass { final DataClass wrapped; WrapperClass(this.wrapped); } var subject = new PublishSubject<WrapperClass>(); subject.map<WrapperClass>((a) => new WrapperClass(a)); 

L'une des utilisations les plus utiles de .map est lorsque vous obtenez des données dans un format à partir d'une API REST ou d'une base de données et que vous souhaitez les convertir en vos propres objets:


 class User { final String name; final String adress; final String phoneNumber; final int age; //       - //   factory User.fromJson(String jsonString) { var jsonMap = json.decode(jsonString); return User( jsonMap['name'], jsonMap['adress'], jsonMap['phoneNumber'], jsonMap['age'], ); } User(this.name, this.adress, this.phoneNumber, this.age); @override String toString() { return '$name - $adress - $phoneNumber - $age'; } } void main() { test('Map', () { // -  var jsonStrings = [ '{"name": "Jon Doe", "adress": "New York", "phoneNumber":"424242","age": 42 }', '{"name": "Stephen King", "adress": "Castle Rock", "phoneNumber":"123456","age": 71 }', '{"name": "Jon F. Kennedy", "adress": "Washington", "phoneNumber":"111111","age": 66 }', ]; //   json-,    API/DB. var dataStreamFromAPI = new PublishSubject<String>(); dataStreamFromAPI .map<User>((jsonString) => User.fromJson(jsonString)) // json -> User .listen((user) => print(user.toString())); //    dataStreamFromAPI.add(jsonStrings[0]); dataStreamFromAPI.add(jsonStrings[1]); dataStreamFromAPI.add(jsonStrings[2]); }); 

Je note que non seulement Streams, mais aussi tout Iterable offre une fonction de map que vous pouvez utiliser pour les transformations dans les listes.


O Where


Si vous êtes uniquement intéressé par certaines valeurs qui se produisent dans le flux, vous pouvez utiliser la fonction .where() au lieu d'utiliser l' if dans votre écouteur, c'est plus expressif et plus facile à lire:


 var subject = new PublishSubject<int>(); subject.where((val) => val.isOdd) .listen( (val) => print('This only prints odd numbers: $val')); subject.where((val) => val.isEven) .listen( (val) => print('This only prints even numbers: $val')); subject.add(1); subject.add(2); subject.add(3); //: This only prints odd numbers: 1 This only prints even numbers: 2 This only prints odd numbers: 3 

Debounce


C'est l'une des petites perles de Rx! Imaginez que vous ayez un champ de recherche qui appelle l'API REST si son texte est modifié. Faire un appel API pour chaque frappe coûte cher. Ainsi, vous souhaitez effectuer un appel uniquement si l'utilisateur fait une pause pendant un moment. Pour cela, la fonction debounce() est utilisée, qui avale tous les événements entrants s'ils ne sont pas suivis d'une pause.


 var subject = new PublishSubject<String>(); subject.debounce(new Duration(milliseconds: 500)).listen((s) => print(s)); subject.add('A'); subject.add('AB'); await Future.delayed(Duration(milliseconds: 200)); subject.add("ABC"); //    await Future.delayed(Duration(milliseconds: 700)); //       : 'ABC' 

Par conséquent, si vous convertissez le gestionnaire TextField.onChanged en Observable , vous obtiendrez une solution élégante.


Développer


Si votre flux source émet des tableaux d'objets et que vous souhaitez traiter chaque objet vous-même, vous pouvez utiliser .expand , qui fera exactement cela:


image


Vous verrez l'application de cette méthode ci-dessous dans l'exemple FireStore.


Fusionner


Si vous avez plusieurs threads différents, mais que vous souhaitez traiter leurs objets ensemble, vous pouvez utiliser .mergeWith (dans d'autres implémentations Rx, simplement merge ), qui prend un tableau de threads et renvoie un thread fusionné.


image


.mergeWith ne garantit pas que tout ordre des flux soit fusionné. Les données sont émises dans l'ordre d'entrée.


Par exemple, si vous avez deux composants qui signalent des erreurs via un flux et que vous souhaitez les afficher ensemble dans une boîte de dialogue, vous pouvez procéder comme suit (pseudo-code):


 @override initState() { super.initState(); component1.errors.mergeWith([component2.errors]) .listen( (error) async => await showDialog(error.message)); } 

ou si vous souhaitez un affichage combiné des messages de plusieurs réseaux sociaux, cela peut ressembler à ceci (pseudocode):


 final observableTwitter = getTwitterStream().map((data) => new MyAppPost.fromTwitter(data)); final observableFacebook = getFacebookStream().map((data) => new MyAppPost.fromFaceBook(data)); final postStream = observableTwitter.mergeWith([observableFacebook]); 

Zipwith


zipWith fusionne également un flux avec un autre. Mais, contrairement à .mergeWith , il n'envoie pas de données dès qu'il reçoit un élément de l'un de ses flux source. Il attend que les éléments des deux flux sources arrivent, puis les combine à l'aide de la fonction de zipper fournie:


image


La signature zipWith semble effrayante, mais maintenant nous la regardons:


 // R :   Stream/Observable // S :   Stream/Observable // zipper: - Observable<R> zipWith<S, R>(Stream<S> other, R zipper(T t, S s)) 

Un exemple très simplifié:


 new Observable.just(1) // .just()  Observable,    .zipWith(new Observable.just(2), (one, two) => one + two) .listen(print); //  3 

Une application plus pratique consiste à attendre deux fonctions asynchrones qui renvoient Future et à traiter les données dès que les deux résultats sont renvoyés. Dans cet exemple légèrement artificiel, nous présentons deux API REST: l'une renvoie User , l'autre renvoie Product sous forme de chaînes JSON, et nous voulons attendre les deux appels avant de renvoyer l'objet Invoice .


 class Invoice { final User user; final Product product; Invoice(this.user, this.product); printInvoice() { print(user.toString()); print(product.toString()); } } //  HTTP ,  Product,  JSON Future<String> getProduct() async { print("Started getting product"); await Future.delayed(Duration(seconds: 2)); print("Finished getting product"); return '{"name": "Flux compensator", "price": 99999.99}'; } //  HTTP ,  User,  JSON Future<String> getUser() async { print("Started getting User"); await Future.delayed(Duration(seconds: 4)); print("Finished getting User"); return '{"name": "Jon Doe", "adress": "New York", "phoneNumber":"424242","age": 42 }'; } void main() { test('zipWith', () async { var userObservable = Observable.fromFuture(getUser()).map<User>((jsonString) => User.fromJson(jsonString)); var productObservable = Observable.fromFuture(getProduct()) .map<Product>((jsonString) => Product.fromJson(jsonString)); Observable<Invoice> invoiceObservable = userObservable.zipWith<Product, Invoice>( productObservable, (user, product) => Invoice(user, product)); print("Start listening for invoices"); invoiceObservable.listen((invoice) => invoice.printInvoice()); //        await Future.delayed(Duration(seconds: 5)); }); } 

En regardant la sortie, vous pouvez voir comment cela se fait de manière asynchrone


 Started getting User Started getting product Start listening for invoices Finished getting product Finished getting User Jon Doe - New York - 424242 - 42 Flux compensator - 99999.99 

Combinelatest


combineLatest fusionne également les valeurs de flux, mais d'une manière légèrement différente de la merge et du zip . Il écoute plus de threads et émet une valeur combinée chaque fois qu'une nouvelle valeur arrive de l'un des threads. Il est intéressant de noter qu'il génère non seulement la valeur modifiée, mais également les dernières valeurs obtenues de tous les autres flux source. Regardez attentivement cette animation:


image


Avant que combineLates sa première valeur, tous les threads source doivent recevoir au moins une entrée.


Contrairement aux méthodes utilisées précédemment, combineLatest est statique. De plus, étant donné que Dart ne permet pas la surcharge des opérateurs, il existe des versions de combLastest fonction du nombre de flux source: combineLatest2 ... combineLatest9


combineLatest bonne utilisation, par exemple, si vous avez deux Observable<bool> qui signalent que certaines parties de votre application sont occupées et que vous souhaitez afficher le spinner Occupé si l'une d'entre elles est occupée. Cela pourrait ressembler à ceci (pseudo-code):


 class Model { Observable<bool> get isBusy => Observable.combineLatest2(isBusyOne,isBusyTwo, (b1, b2) => b1 || b2); PublishSubject<bool> isBusyOne; PublishSubject<bool> isBusyTwo; } 

Dans votre interface utilisateur, vous pouvez utiliser isBusy avec StreamBuilder pour afficher Spinner si la valeur résultante est vraie.


combineLatest très appropriée en combinaison avec les flux d'instantanés FireStore .


Imaginez que vous souhaitiez créer une application qui affiche un fil d'actualités avec des prévisions météorologiques. Les messages de téléscripteur et les données météorologiques sont stockés dans deux collections FireStore différentes. Les deux sont mis à jour indépendamment. Vous souhaitez afficher les mises à jour de données à l'aide de StreamBuilder. Avec combineLatest c'est facile:


 class WeatherForecast { final String forecastText; final GeoPoint location; factory WeatherForecast.fromMap(Map<String, dynamic> map) { return WeatherForecast(map['forecastText'], map['location']); } WeatherForecast(this.forecastText, this.location); } class NewsMessage { final String newsText; final GeoPoint location; factory NewsMessage.fromMap(Map<String, dynamic> map) { return NewsMessage(map['newsText'], map['location']); } NewsMessage(this.newsText, this.location); } class CombinedMessage { final WeatherForecast forecast; final NewsMessage newsMessage; CombinedMessage(this.forecast, this.newsMessage); } class Model { CollectionReference weatherCollection; CollectionReference newsCollection; Model() { weatherCollection = Firestore.instance.collection('weather'); newsCollection = Firestore.instance.collection('news'); } Observable<CombinedMessage> getCombinedMessages() { Observable<WeatherForecast> weatherForecasts = weatherCollection .snapshots() .expand((snapShot) => snapShot.documents) .map<WeatherForecast>((document) => WeatherForecast.fromMap(document.data)); Observable<NewsMessage> news = newsCollection .snapshots() .expand((snapShot) => snapShot.documents) .map<NewsMessage>((document) => NewsMessage.fromMap(document.data)); return Observable.combineLatest2( weatherForecasts, news, (weather, news) => CombinedMessage(weather, news)); } } 

Dans votre interface utilisateur, cela ressemblerait à ceci: StreamBuilder<CombinedMessage>(stream: model.getCombinedMessages(),...).


Distinct


Dans le scénario décrit ci-dessus, il peut arriver que isBusyOne et isBusyTwo donnent la même valeur, ce qui entraînera une mise à jour de l'interface utilisateur avec les mêmes données. Pour éviter cela, nous pouvons utiliser .distinct() . Il garantit que les données ne sont diffusées que si la valeur du nouvel élément est différente de la dernière. Ainsi, nous changerions le code en:


  Observable<bool> isBusy => isBusyOne.mergeWith([isBusyTwo]).distinct(); 

et cela démontre également que nous pouvons combiner nos fonctions dans différentes chaînes à volonté.


Asyncmap


En plus de map() il existe également une fonction asyncMap , qui vous permet d'utiliser une fonction asynchrone comme fonction de carte. Introduisons un paramètre légèrement différent pour notre exemple FireStore. Désormais, le WeatherForecast nécessaire dépend de l'emplacement de NewsMessage et ne doit être mis à jour que lorsqu'un nouveau NewsMessage est reçu :


 Observable<CombinedMessage> getDependendMessages() { Observable<NewsMessage> news = newsCollection.snapshots().expand((snapShot) { return snapShot.documents; }).map<NewsMessage>((document) { return NewsMessage.fromMap(document.data); }); return news.asyncMap((newsEntry) async { var weatherDocuments = await weatherCollection.where('location', isEqualTo: newsEntry.location).getDocuments(); return new CombinedMessage( WeatherForecast.fromMap(weatherDocuments.documents.first.data), newsEntry); }); } 

L'observable renvoyé par getDependendMessages générera un nouveau CombinedMessage chaque fois que newsCollection change.


Debug Observables


En regardant les élégantes chaînes d'appel Rx, il semble presque impossible de déboguer une expression comme celle-ci:


 Observable<NewsMessage> news = newsCollection .snapshots() .expand((snapShot) => snapShot.documents) .map<NewsMessage>((document) => NewsMessage.fromMap(document.data)); 

Mais gardez à l'esprit que => n'est qu'une forme abrégée pour une fonction anonyme. En utilisant Convertir pour bloquer le corps , vous obtiendrez:


 Observable<NewsMessage> news = newsCollection .snapshots() .expand((snapShot) { return snapShot.documents; }) .map<NewsMessage>((document) { return NewsMessage.fromMap(document.data); }); 

Et maintenant, nous pouvons définir un point d'arrêt ou ajouter des instructions d'impression à chaque étape de notre pipeline.


Méfiez-vous des effets secondaires


Si vous souhaitez capitaliser sur Rx pour rendre votre code plus robuste, gardez toujours à l'esprit que Rx est une transformation de données lorsqu'il se déplace "le long de la bande transporteuse". Par conséquent, n'appelez jamais de fonctions qui modifient des variables / états en dehors du pipeline de traitement jusqu'à ce que vous atteigniez la fonction .listen.
Au lieu de le faire:


 Observable.fromFuture(getProduct()) .map<Product>((jsonString) { var product = Product.fromJson(jsonString); database.save(product); setState((){ _product = product }); return product; }).listen(); 

faites ceci:


 Observable.fromFuture(getProduct()) .map<Product>((jsonString) => Product.fromJson(jsonString)) .listen( (product) { database.save(product); setState((){ _product = product }); }); 

Le devoir de map() est de transformer les données dans le flux, ET RIEN DE PLUS! Si la fonction d'affichage passée fait autre chose, elle sera considérée comme un effet secondaire, générant des erreurs potentielles difficiles à détecter lors de la lecture du code.


Quelques réflexions sur la libération des ressources


Pour éviter les fuites de mémoire, appelez toujours cancel() pour les abonnements, dispose() pour StreamControllers, close() pour Subjects, dès que vous n'en avez plus besoin.


Conclusion


Félicitations si vous êtes resté avec moi jusqu'à ce moment. Maintenant, vous pouvez non seulement utiliser Rx pour vous simplifier la vie, mais aussi préparer les prochains articles dans lesquels nous nous pencherons sur les détails de RxVMS .

Source: https://habr.com/ru/post/fr451292/


All Articles