Bienvenue - Ceci est la troisième partie de ma série d'articles Flutter Architecture.
Cette fois, nous allons faire une petite plongée dans le domaine magique des extensions réactives (Rx). Je vais me concentrer sur les fonctions Rx les plus utilisées et expliquer leur application. Si vous n'avez pas lu le post précédent, c'est le moment de le faire avant de continuer.
RxDart est une implémentation du concept Rx pour le langage Dart, merci à Frank Pepermans et Brian Egan pour cela . Si vous avez déjà utilisé Rx dans d'autres langues, vous remarquerez probablement une différence dans la dénomination d'un certain nombre de fonctions, mais il est peu probable que cela vous cause des difficultés.
Le code de test est ici .
Jusqu'à présent, nous avons utilisé les flux comme moyen de transférer des données d'un endroit à un autre dans notre application, mais ils peuvent faire beaucoup plus. Jetons un coup d'œil à certaines des fonctionnalités que Rx ajoute aux Streams.
Créer des observables
Comme indiqué précédemment , les observables sont des versions Rx de flux avec de grandes fonctionnalités. Il existe plusieurs façons intéressantes de les créer:
Hors flux
N'importe quel Stream peut être converti en Observable en le passant au constructeur:
var controller = new StreamController<String>(); var streamObservable = new Observable(controller.stream); streamObservable.listen(print);
Evènements récurrents
var timerObservable = Observable.periodic(Duration(seconds: 1), (x) => x.toString() ); timerObservable.listen(print);
De cette façon, un observable sera construit qui affiche des valeurs avec une période spécifique. Vous pouvez donc remplacer la minuterie.
D'une seule valeur
Parfois, une API attend un flux / observable où vous avez juste une valeur. Pour de tels cas, l'Observable a une usine.
var justObservable = Observable<int>.just(42); justObservable.listen(print);
Du futur
Future<String> asyncFunction() async { return Future.delayed(const Duration(seconds: 1), () => "AsyncRsult"); } test('Create Observable from Future', () async { print('start'); var fromFutureObservable = Observable.fromFuture(asyncFunction()); fromFutureObservable.listen(print);
La création d'un Observable
partir du futur attendra la fin du futur et renverra une valeur pour son résultat ou null
si la valeur n'est pas renvoyée. Une autre façon de créer un flux à partir de Future est d'appeler toStream()
pour n'importe quel Future.
Vous vous demandez peut-être quel est l'intérêt de convertir Future en Observable / Stream au lieu de simplement l'attendre. Rassurez-vous, cela deviendra clair lorsque nous examinerons les fonctions disponibles pour manipuler les données pendant qu'elles sont dans le flux.
Sujets
Subjects
remplacent StreamController
dans RxDart, et c'est ainsi qu'ils sont implémentés quelque part dans les entrailles de la bibliothèque.
Mais leur comportement est légèrement différent des StreamControllers de base:
- vous pouvez appliquer
listen()
directement à un sujet, sans accéder à la propriété Stream - n'importe quel nombre d'abonnements est disponible, et tous les auditeurs reçoivent les mêmes données en même temps
- Il existe trois variétés de sujets, qui sont expliqués ci-dessous avec des exemples:
PublishSubjects
PublishSubjects
se comportent comme StreamControllers
, sauf pour la possibilité de nombreux écouteurs:
var subject = new PublishSubject<String>(); subject.listen((item) => print(item)); subject.add("Item1");
Exécutez ce code et vous obtiendrez:
Item1 ITEM2 Item2 ITEM3 Item3
Il est clair que le deuxième auditeur qui était en retard pour la fête (nous les appellerons abonnés tardifs) a raté le premier point. Pour éviter cela, vous pouvez utiliser BehaviourSubject
Sujet de comportement
Avec BehaviourSubject
chaque nouvel abonné recevra d'abord la dernière valeur acceptée:
var subject = new BehaviorSubject<String>(); subject.listen((item) => print(item)); subject.add("Item1"); subject.add("Item2"); subject.listen((item) => print(item.toUpperCase())); subject.add("Item3");
A la sortie
Item1 ITEM2 ITEM3 Item2 Item3
Vous pouvez voir que Item1
perdu pour le deuxième abonné, mais il reçoit Item2
. Vous pourriez être surpris que le deuxième abonné reçoive Item3
avant que le premier abonné ne reçoive Item2
. En effet, la séquence de maintenance des abonnés n'est pas garantie, bien que tous les abonnés reçoivent les données dans le bon ordre. BehaviourSubject
ne met en cache que le dernier élément reçu pour les abonnés en retard. Si vous devez mettre en cache plus d'éléments, vous pouvez utiliser ReplaySubject . Dans la plupart des cas, cela n'est pas nécessaire.
Manipulation des données à la volée

La véritable force de Rx réside dans le fait qu'il vous permet de traiter les données lors de la transmission sur le flux. Chacune des méthodes Rx renvoie un nouveau flux avec les données résultantes (comme dans l'illustration), ce qui signifie que vous pouvez les lier ensemble dans un pipeline de traitement, ce qui fait de Rx un outil extrêmement puissant.
La carte
S'il y a une opération Stream que je ne veux surtout pas manquer, alors c'est map()
. Ce que fait map()
c'est qu'il prend chaque élément de données à transférer et lui applique une certaine fonction, après quoi il met le résultat dans le flux résultant. Un exemple simple:

var subject = new PublishSubject<String>(); subject.map((item) => item.toUpperCase()).listen(print); subject.add("Item1"); subject.add("Item2"); subject.add("Item3");
Résultat:
ITEM1 ITEM2 ITEM3
Mais map
pas requis pour renvoyer le même type de données qu'il reçoit en entrée. L'exemple suivant prendra des entiers au lieu de chaînes. De plus, nous lierons deux transformations:
var subject = new PublishSubject<int>(); subject.map((intValue) => intValue.toString()) .map((item) => item.toUpperCase()) .listen(print); subject.add(1); subject.add(2); subject.add(3);
ou quelque chose comme ça:

class DataClass{} class WrapperClass { final DataClass wrapped; WrapperClass(this.wrapped); } var subject = new PublishSubject<WrapperClass>(); subject.map<WrapperClass>((a) => new WrapperClass(a));
L'une des utilisations les plus utiles de .map
est lorsque vous obtenez des données dans un format à partir d'une API REST ou d'une base de données et que vous souhaitez les convertir en vos propres objets:
class User { final String name; final String adress; final String phoneNumber; final int age;
Je note que non seulement Streams, mais aussi tout Iterable offre une fonction de map
que vous pouvez utiliser pour les transformations dans les listes.
O Where
Si vous êtes uniquement intéressé par certaines valeurs qui se produisent dans le flux, vous pouvez utiliser la fonction .where()
au lieu d'utiliser l' if
dans votre écouteur, c'est plus expressif et plus facile à lire:
var subject = new PublishSubject<int>(); subject.where((val) => val.isOdd) .listen( (val) => print('This only prints odd numbers: $val')); subject.where((val) => val.isEven) .listen( (val) => print('This only prints even numbers: $val')); subject.add(1); subject.add(2); subject.add(3);
Debounce
C'est l'une des petites perles de Rx! Imaginez que vous ayez un champ de recherche qui appelle l'API REST si son texte est modifié. Faire un appel API pour chaque frappe coûte cher. Ainsi, vous souhaitez effectuer un appel uniquement si l'utilisateur fait une pause pendant un moment. Pour cela, la fonction debounce()
est utilisée, qui avale tous les événements entrants s'ils ne sont pas suivis d'une pause.
var subject = new PublishSubject<String>(); subject.debounce(new Duration(milliseconds: 500)).listen((s) => print(s)); subject.add('A'); subject.add('AB'); await Future.delayed(Duration(milliseconds: 200)); subject.add("ABC");
Par conséquent, si vous convertissez le gestionnaire TextField.onChanged
en Observable
, vous obtiendrez une solution élégante.
Développer
Si votre flux source émet des tableaux d'objets et que vous souhaitez traiter chaque objet vous-même, vous pouvez utiliser .expand
, qui fera exactement cela:

Vous verrez l'application de cette méthode ci-dessous dans l'exemple FireStore.
Fusionner
Si vous avez plusieurs threads différents, mais que vous souhaitez traiter leurs objets ensemble, vous pouvez utiliser .mergeWith
(dans d'autres implémentations Rx, simplement merge
), qui prend un tableau de threads et renvoie un thread fusionné.

.mergeWith
ne garantit pas que tout ordre des flux soit fusionné. Les données sont émises dans l'ordre d'entrée.
Par exemple, si vous avez deux composants qui signalent des erreurs via un flux et que vous souhaitez les afficher ensemble dans une boîte de dialogue, vous pouvez procéder comme suit (pseudo-code):
@override initState() { super.initState(); component1.errors.mergeWith([component2.errors]) .listen( (error) async => await showDialog(error.message)); }
ou si vous souhaitez un affichage combiné des messages de plusieurs réseaux sociaux, cela peut ressembler à ceci (pseudocode):
final observableTwitter = getTwitterStream().map((data) => new MyAppPost.fromTwitter(data)); final observableFacebook = getFacebookStream().map((data) => new MyAppPost.fromFaceBook(data)); final postStream = observableTwitter.mergeWith([observableFacebook]);
Zipwith
zipWith
fusionne également un flux avec un autre. Mais, contrairement à .mergeWith
, il n'envoie pas de données dès qu'il reçoit un élément de l'un de ses flux source. Il attend que les éléments des deux flux sources arrivent, puis les combine à l'aide de la fonction de zipper
fournie:

La signature zipWith
semble effrayante, mais maintenant nous la regardons:
Un exemple très simplifié:
new Observable.just(1)
Une application plus pratique consiste à attendre deux fonctions asynchrones qui renvoient Future
et à traiter les données dès que les deux résultats sont renvoyés. Dans cet exemple légèrement artificiel, nous présentons deux API REST: l'une renvoie User
, l'autre renvoie Product
sous forme de chaînes JSON, et nous voulons attendre les deux appels avant de renvoyer l'objet Invoice
.
class Invoice { final User user; final Product product; Invoice(this.user, this.product); printInvoice() { print(user.toString()); print(product.toString()); } }
En regardant la sortie, vous pouvez voir comment cela se fait de manière asynchrone
Started getting User Started getting product Start listening for invoices Finished getting product Finished getting User Jon Doe - New York - 424242 - 42 Flux compensator - 99999.99
Combinelatest
combineLatest
fusionne également les valeurs de flux, mais d'une manière légèrement différente de la merge
et du zip
. Il écoute plus de threads et émet une valeur combinée chaque fois qu'une nouvelle valeur arrive de l'un des threads. Il est intéressant de noter qu'il génère non seulement la valeur modifiée, mais également les dernières valeurs obtenues de tous les autres flux source. Regardez attentivement cette animation:

Avant que combineLates
sa première valeur, tous les threads source doivent recevoir au moins une entrée.
Contrairement aux méthodes utilisées précédemment, combineLatest
est statique. De plus, étant donné que Dart ne permet pas la surcharge des opérateurs, il existe des versions de combLastest
fonction du nombre de flux source: combineLatest2 ... combineLatest9
combineLatest
bonne utilisation, par exemple, si vous avez deux Observable<bool>
qui signalent que certaines parties de votre application sont occupées et que vous souhaitez afficher le spinner Occupé si l'une d'entre elles est occupée. Cela pourrait ressembler à ceci (pseudo-code):
class Model { Observable<bool> get isBusy => Observable.combineLatest2(isBusyOne,isBusyTwo, (b1, b2) => b1 || b2); PublishSubject<bool> isBusyOne; PublishSubject<bool> isBusyTwo; }
Dans votre interface utilisateur, vous pouvez utiliser isBusy
avec StreamBuilder
pour afficher Spinner
si la valeur résultante est vraie.
combineLatest
très appropriée en combinaison avec les flux d'instantanés FireStore .
Imaginez que vous souhaitiez créer une application qui affiche un fil d'actualités avec des prévisions météorologiques. Les messages de téléscripteur et les données météorologiques sont stockés dans deux collections FireStore différentes. Les deux sont mis à jour indépendamment. Vous souhaitez afficher les mises à jour de données à l'aide de StreamBuilder. Avec combineLatest
c'est facile:
class WeatherForecast { final String forecastText; final GeoPoint location; factory WeatherForecast.fromMap(Map<String, dynamic> map) { return WeatherForecast(map['forecastText'], map['location']); } WeatherForecast(this.forecastText, this.location); } class NewsMessage { final String newsText; final GeoPoint location; factory NewsMessage.fromMap(Map<String, dynamic> map) { return NewsMessage(map['newsText'], map['location']); } NewsMessage(this.newsText, this.location); } class CombinedMessage { final WeatherForecast forecast; final NewsMessage newsMessage; CombinedMessage(this.forecast, this.newsMessage); } class Model { CollectionReference weatherCollection; CollectionReference newsCollection; Model() { weatherCollection = Firestore.instance.collection('weather'); newsCollection = Firestore.instance.collection('news'); } Observable<CombinedMessage> getCombinedMessages() { Observable<WeatherForecast> weatherForecasts = weatherCollection .snapshots() .expand((snapShot) => snapShot.documents) .map<WeatherForecast>((document) => WeatherForecast.fromMap(document.data)); Observable<NewsMessage> news = newsCollection .snapshots() .expand((snapShot) => snapShot.documents) .map<NewsMessage>((document) => NewsMessage.fromMap(document.data)); return Observable.combineLatest2( weatherForecasts, news, (weather, news) => CombinedMessage(weather, news)); } }
Dans votre interface utilisateur, cela ressemblerait à ceci: StreamBuilder<CombinedMessage>(stream: model.getCombinedMessages(),...).
Distinct
Dans le scénario décrit ci-dessus, il peut arriver que isBusyOne et isBusyTwo donnent la même valeur, ce qui entraînera une mise à jour de l'interface utilisateur avec les mêmes données. Pour éviter cela, nous pouvons utiliser .distinct()
. Il garantit que les données ne sont diffusées que si la valeur du nouvel élément est différente de la dernière. Ainsi, nous changerions le code en:
Observable<bool> isBusy => isBusyOne.mergeWith([isBusyTwo]).distinct();
et cela démontre également que nous pouvons combiner nos fonctions dans différentes chaînes à volonté.
Asyncmap
En plus de map()
il existe également une fonction asyncMap
, qui vous permet d'utiliser une fonction asynchrone comme fonction de carte. Introduisons un paramètre légèrement différent pour notre exemple FireStore. Désormais, le WeatherForecast nécessaire dépend de l'emplacement de NewsMessage et ne doit être mis à jour que lorsqu'un nouveau NewsMessage est reçu :
Observable<CombinedMessage> getDependendMessages() { Observable<NewsMessage> news = newsCollection.snapshots().expand((snapShot) { return snapShot.documents; }).map<NewsMessage>((document) { return NewsMessage.fromMap(document.data); }); return news.asyncMap((newsEntry) async { var weatherDocuments = await weatherCollection.where('location', isEqualTo: newsEntry.location).getDocuments(); return new CombinedMessage( WeatherForecast.fromMap(weatherDocuments.documents.first.data), newsEntry); }); }
L'observable renvoyé par getDependendMessages générera un nouveau CombinedMessage chaque fois que newsCollection change.
Debug Observables
En regardant les élégantes chaînes d'appel Rx, il semble presque impossible de déboguer une expression comme celle-ci:
Observable<NewsMessage> news = newsCollection .snapshots() .expand((snapShot) => snapShot.documents) .map<NewsMessage>((document) => NewsMessage.fromMap(document.data));
Mais gardez à l'esprit que =>
n'est qu'une forme abrégée pour une fonction anonyme. En utilisant Convertir pour bloquer le corps , vous obtiendrez:
Observable<NewsMessage> news = newsCollection .snapshots() .expand((snapShot) { return snapShot.documents; }) .map<NewsMessage>((document) { return NewsMessage.fromMap(document.data); });
Et maintenant, nous pouvons définir un point d'arrêt ou ajouter des instructions d'impression à chaque étape de notre pipeline.
Méfiez-vous des effets secondaires
Si vous souhaitez capitaliser sur Rx pour rendre votre code plus robuste, gardez toujours à l'esprit que Rx est une transformation de données lorsqu'il se déplace "le long de la bande transporteuse". Par conséquent, n'appelez jamais de fonctions qui modifient des variables / états en dehors du pipeline de traitement jusqu'à ce que vous atteigniez la fonction .listen.
Au lieu de le faire:
Observable.fromFuture(getProduct()) .map<Product>((jsonString) { var product = Product.fromJson(jsonString); database.save(product); setState((){ _product = product }); return product; }).listen();
faites ceci:
Observable.fromFuture(getProduct()) .map<Product>((jsonString) => Product.fromJson(jsonString)) .listen( (product) { database.save(product); setState((){ _product = product }); });
Le devoir de map()
est de transformer les données dans le flux, ET RIEN DE PLUS! Si la fonction d'affichage passée fait autre chose, elle sera considérée comme un effet secondaire, générant des erreurs potentielles difficiles à détecter lors de la lecture du code.
Quelques réflexions sur la libération des ressources
Pour éviter les fuites de mémoire, appelez toujours cancel()
pour les abonnements, dispose()
pour StreamControllers, close()
pour Subjects, dès que vous n'en avez plus besoin.
Conclusion
Félicitations si vous êtes resté avec moi jusqu'à ce moment. Maintenant, vous pouvez non seulement utiliser Rx pour vous simplifier la vie, mais aussi préparer les prochains articles dans lesquels nous nous pencherons sur les détails de RxVMS .