RxDart: magische Transformationen von Flüssen

Willkommen - Dies ist der dritte Teil meiner Artikelserie Flutter Architecture.



Dieses Mal werden wir einen kleinen Sprung in das magische Reich der reaktiven Erweiterungen (Rx) machen. Ich werde mich auf die am häufigsten verwendeten Rx-Funktionen konzentrieren und deren Anwendung erläutern. Wenn Sie den vorherigen Beitrag nicht gelesen haben, ist jetzt die Zeit dafür, bevor Sie fortfahren.


RxDart ist eine Implementierung des Rx-Konzepts für die Dart-Sprache, dank Frank Pepermans und Brian Egan dafür . Wenn Sie Rx zuvor in anderen Sprachen verwendet haben, werden Sie wahrscheinlich einen Unterschied in der Benennung einer Reihe von Funktionen bemerken, aber dies wird Ihnen wahrscheinlich keine Schwierigkeiten bereiten.


Der Code zum Testen ist hier .


Bisher haben wir Streams verwendet, um Daten in unserer Anwendung von einem Ort zum anderen zu übertragen, aber sie können noch viel mehr. Werfen wir einen Blick auf einige der Funktionen, die Rx Streams hinzufügt.


Observables erstellen


Wie bereits erwähnt , handelt es sich bei Observables um Rx-Versionen von Streams mit hervorragenden Funktionen. Es gibt verschiedene interessante Möglichkeiten, sie zu erstellen:


Aus dem Strom


Jeder Stream kann in Observable konvertiert werden, indem er an den Konstruktor übergeben wird:


var controller = new StreamController<String>(); var streamObservable = new Observable(controller.stream); streamObservable.listen(print); 

Wiederkehrende Ereignisse


 var timerObservable = Observable.periodic(Duration(seconds: 1), (x) => x.toString() ); timerObservable.listen(print); 

Auf diese Weise wird ein Observable erstellt, das Werte mit einem bestimmten Zeitraum anzeigt. So können Sie den Timer ersetzen.


Aus einem einzigen Wert


Manchmal erwartet eine API einen Stream / Observable, in dem Sie nur einen Wert haben. In solchen Fällen verfügt das Observable über eine Fabrik.


 var justObservable = Observable<int>.just(42); justObservable.listen(print); //   : 42 

Aus der Zukunft


  Future<String> asyncFunction() async { return Future.delayed(const Duration(seconds: 1), () => "AsyncRsult"); } test('Create Observable from Future', () async { print('start'); var fromFutureObservable = Observable.fromFuture(asyncFunction()); fromFutureObservable.listen(print); 

Das Erstellen eines Observable from Future wartet darauf, dass Future abgeschlossen ist, und gibt einen Wert für das Ergebnis oder null wenn der Wert nicht zurückgegeben wird. Eine andere Möglichkeit, einen Stream aus Future zu erstellen, besteht darin, toStream() für jede Zukunft toStream() .


Sie fragen sich vielleicht, wozu Future in ein Observable / Stream konvertiert werden soll, anstatt nur darauf zu warten. Seien Sie versichert, dies wird deutlich, wenn wir die verfügbaren Funktionen zum Bearbeiten von Daten untersuchen, während sie sich im Stream befinden.


Themen


Subjects sind ein Ersatz für StreamController in RxDart, und so werden sie irgendwo im Darm der Bibliothek implementiert.


Ihr Verhalten unterscheidet sich jedoch geringfügig von den grundlegenden StreamControllern:


  • Sie können listen() direkt auf einen Betreff anwenden, ohne auf die Stream-Eigenschaft zuzugreifen
  • Es stehen beliebig viele Abonnements zur Verfügung, und alle Listener erhalten gleichzeitig dieselben Daten
  • Es gibt drei Arten von Themen, die im Folgenden anhand von Beispielen erläutert werden:

PublishSubjects


PublishSubjects verhalten sich wie StreamControllers , mit Ausnahme der Möglichkeit vieler Listener:


 var subject = new PublishSubject<String>(); subject.listen((item) => print(item)); subject.add("Item1"); //    subject.listen((item) => print(item.toUpperCase())); subject.add("Item2"); subject.add("Item3"); //        await Future.delayed(Duration(seconds: 5)); //    subject.close; 

Führen Sie diesen Code aus und Sie erhalten:


 Item1 ITEM2 Item2 ITEM3 Item3 

Es ist klar, dass der zweite Zuhörer, der zu spät zur Party kam (wir werden sie als späte Abonnenten bezeichnen), den ersten Punkt verpasst hat. Um dies zu vermeiden, können Sie BehaviourSubject


Verhaltensgegenstand


Mit BehaviourSubject erhält jeder neue Abonnent zuerst den zuletzt akzeptierten Wert:


 var subject = new BehaviorSubject<String>(); subject.listen((item) => print(item)); subject.add("Item1"); subject.add("Item2"); subject.listen((item) => print(item.toUpperCase())); subject.add("Item3"); 

Am Ausgang


 Item1 ITEM2 ITEM3 Item2 Item3 

Sie können sehen, dass Item1 für den zweiten Abonnenten verloren geht, aber Item2 empfängt. Sie werden überrascht sein, dass der zweite Teilnehmer Item3 erhält, bevor der erste Teilnehmer Item2 erhält. Dies liegt daran, dass die Reihenfolge der Serviceteilnehmer nicht garantiert ist, obwohl alle Teilnehmer Daten in der richtigen Reihenfolge empfangen. BehaviourSubject nur das zuletzt für verspätete Abonnenten empfangene Element zwischen. Wenn Sie mehr Elemente zwischenspeichern müssen, können Sie ReplaySubject verwenden . In den meisten Fällen ist dies nicht erforderlich.


Daten im laufenden Betrieb bearbeiten



Die wahre Stärke von Rx liegt in der Tatsache, dass Sie Daten während der Übertragung über den Stream verarbeiten können. Jede der Rx-Methoden gibt einen neuen Stream mit den resultierenden Daten zurück (wie in der Abbildung). Dies bedeutet, dass Sie sie in einer Verarbeitungspipeline zusammenbinden können. Dies macht Rx zu einem äußerst leistungsstarken Tool.


Karte


Wenn es einen Stream-Vorgang gibt, den ich am meisten nicht missen möchte, dann ist dies map() . map() verwendet jedes zu übertragende Datenelement und wendet eine bestimmte Funktion darauf an. Anschließend wird das Ergebnis in den resultierenden Stream eingefügt. Ein einfaches Beispiel:



 var subject = new PublishSubject<String>(); subject.map((item) => item.toUpperCase()).listen(print); subject.add("Item1"); subject.add("Item2"); subject.add("Item3"); 

Ergebnis:


 ITEM1 ITEM2 ITEM3 

Die map nicht denselben Datentyp zurückgeben, den sie als Eingabe empfängt. Im folgenden Beispiel werden Ganzzahlen anstelle von Zeichenfolgen verwendet. Zusätzlich werden wir zwei Transformationen verknüpfen:


 var subject = new PublishSubject<int>(); subject.map((intValue) => intValue.toString()) .map((item) => item.toUpperCase()) .listen(print); subject.add(1); subject.add(2); subject.add(3); 

oder so ähnlich:



 class DataClass{} class WrapperClass { final DataClass wrapped; WrapperClass(this.wrapped); } var subject = new PublishSubject<WrapperClass>(); subject.map<WrapperClass>((a) => new WrapperClass(a)); 

Eine der nützlichsten Anwendungen von .map ist, wenn Sie Daten in einem Format von einer REST-API oder aus einer Datenbank .map und möchten, dass sie in Ihre eigenen Objekte konvertiert werden:


 class User { final String name; final String adress; final String phoneNumber; final int age; //       - //   factory User.fromJson(String jsonString) { var jsonMap = json.decode(jsonString); return User( jsonMap['name'], jsonMap['adress'], jsonMap['phoneNumber'], jsonMap['age'], ); } User(this.name, this.adress, this.phoneNumber, this.age); @override String toString() { return '$name - $adress - $phoneNumber - $age'; } } void main() { test('Map', () { // -  var jsonStrings = [ '{"name": "Jon Doe", "adress": "New York", "phoneNumber":"424242","age": 42 }', '{"name": "Stephen King", "adress": "Castle Rock", "phoneNumber":"123456","age": 71 }', '{"name": "Jon F. Kennedy", "adress": "Washington", "phoneNumber":"111111","age": 66 }', ]; //   json-,    API/DB. var dataStreamFromAPI = new PublishSubject<String>(); dataStreamFromAPI .map<User>((jsonString) => User.fromJson(jsonString)) // json -> User .listen((user) => print(user.toString())); //    dataStreamFromAPI.add(jsonStrings[0]); dataStreamFromAPI.add(jsonStrings[1]); dataStreamFromAPI.add(jsonStrings[2]); }); 

Ich stelle fest, dass nicht nur Streams, sondern auch jedes Iterable eine Kartenfunktion bietet, die Sie für Transformationen in Listen verwenden können.


Wo


Wenn Sie nur an bestimmten Werten interessiert sind, die im Stream vorkommen, können Sie die Funktion .where() anstelle der if in Ihrem Listener verwenden. Dies ist aussagekräftiger und leichter zu lesen:


 var subject = new PublishSubject<int>(); subject.where((val) => val.isOdd) .listen( (val) => print('This only prints odd numbers: $val')); subject.where((val) => val.isEven) .listen( (val) => print('This only prints even numbers: $val')); subject.add(1); subject.add(2); subject.add(3); //: This only prints odd numbers: 1 This only prints even numbers: 2 This only prints odd numbers: 3 

Entprellen


Dies ist eine der kleinen Perlen von Rx! Stellen Sie sich vor, Sie haben ein Suchfeld, das die REST-API aufruft, wenn ihr Text geändert wird. Ein API-Aufruf für jeden Tastendruck ist teuer. Daher möchten Sie nur dann einen Anruf tätigen, wenn der Benutzer einen Moment innehält. Hierzu wird die Funktion debounce() verwendet, die alle eingehenden Ereignisse verschluckt, wenn ihnen keine Pause folgt.


 var subject = new PublishSubject<String>(); subject.debounce(new Duration(milliseconds: 500)).listen((s) => print(s)); subject.add('A'); subject.add('AB'); await Future.delayed(Duration(milliseconds: 200)); subject.add("ABC"); //    await Future.delayed(Duration(milliseconds: 700)); //       : 'ABC' 

Wenn Sie den TextField.onChanged Handler in Observable konvertieren, erhalten Sie daher eine elegante Lösung.


Erweitern


Wenn Ihr Quell-Stream Arrays von Objekten .expand und Sie jedes Objekt selbst verarbeiten möchten, können Sie .expand , um genau das zu tun:


Bild


Die Anwendung dieser Methode sehen Sie unten im FireStore-Beispiel.


Zusammenführen


Wenn Sie mehrere verschiedene Threads haben, aber deren Objekte zusammen verarbeiten möchten, können Sie .mergeWith (in anderen Rx-Implementierungen nur merge ) verwenden, das ein Array von Threads verwendet und einen zusammengeführten Thread zurückgibt.


Bild


.mergeWith garantiert nicht, dass eine Bestellung in den Streams zusammengeführt wird. Die Daten werden in der Eingabereihenfolge ausgegeben.


Wenn Sie beispielsweise zwei Komponenten haben, die Fehler über den Stream melden, und möchten, dass sie zusammen in einem Dialogfeld angezeigt werden, können Sie dies wie folgt tun (Pseudocode):


 @override initState() { super.initState(); component1.errors.mergeWith([component2.errors]) .listen( (error) async => await showDialog(error.message)); } 

oder wenn Sie eine kombinierte Anzeige von Nachrichten aus mehreren sozialen Netzwerken wünschen, kann dies folgendermaßen aussehen (Pseudocode):


 final observableTwitter = getTwitterStream().map((data) => new MyAppPost.fromTwitter(data)); final observableFacebook = getFacebookStream().map((data) => new MyAppPost.fromFaceBook(data)); final postStream = observableTwitter.mergeWith([observableFacebook]); 

Reißverschluss


zipWith auch einen Stream mit einem anderen zusammen. Im Gegensatz zu .mergeWith es jedoch keine Daten, sobald es ein Element aus einem seiner Quelldatenströme empfängt. Er wartet, bis die Elemente aus beiden Quelldatenströmen eintreffen, und kombiniert sie dann mithilfe der bereitgestellten zipper :


Bild


Die zipWith Signatur sieht beängstigend aus, aber jetzt sehen wir sie uns an:


 // R :   Stream/Observable // S :   Stream/Observable // zipper: - Observable<R> zipWith<S, R>(Stream<S> other, R zipper(T t, S s)) 

Ein sehr vereinfachtes Beispiel:


 new Observable.just(1) // .just()  Observable,    .zipWith(new Observable.just(2), (one, two) => one + two) .listen(print); //  3 

Eine praktischere Anwendung ist, wenn Sie auf zwei asynchrone Funktionen warten müssen, die Future , und die Daten verarbeiten möchten, sobald beide Ergebnisse zurückgegeben werden. In diesem leicht erfundenen Beispiel stellen wir zwei REST-APIs vor: eine gibt User , die andere gibt Product als JSON-Zeichenfolgen zurück und wir möchten auf beide Aufrufe warten, bevor wir das Invoice Objekt zurückgeben.


 class Invoice { final User user; final Product product; Invoice(this.user, this.product); printInvoice() { print(user.toString()); print(product.toString()); } } //  HTTP ,  Product,  JSON Future<String> getProduct() async { print("Started getting product"); await Future.delayed(Duration(seconds: 2)); print("Finished getting product"); return '{"name": "Flux compensator", "price": 99999.99}'; } //  HTTP ,  User,  JSON Future<String> getUser() async { print("Started getting User"); await Future.delayed(Duration(seconds: 4)); print("Finished getting User"); return '{"name": "Jon Doe", "adress": "New York", "phoneNumber":"424242","age": 42 }'; } void main() { test('zipWith', () async { var userObservable = Observable.fromFuture(getUser()).map<User>((jsonString) => User.fromJson(jsonString)); var productObservable = Observable.fromFuture(getProduct()) .map<Product>((jsonString) => Product.fromJson(jsonString)); Observable<Invoice> invoiceObservable = userObservable.zipWith<Product, Invoice>( productObservable, (user, product) => Invoice(user, product)); print("Start listening for invoices"); invoiceObservable.listen((invoice) => invoice.printInvoice()); //        await Future.delayed(Duration(seconds: 5)); }); } 

Wenn Sie sich die Ausgabe ansehen, können Sie sehen, wie dies asynchron erfolgt


 Started getting User Started getting product Start listening for invoices Finished getting product Finished getting User Jon Doe - New York - 424242 - 42 Flux compensator - 99999.99 

Kombiniertest


combineLatest auch Stream-Werte zusammen, jedoch auf etwas andere Weise als merge und zip . Es wartet auf weitere Threads und gibt einen kombinierten Wert aus, wenn ein neuer Wert von einem der Threads eingeht. Es ist interessant, dass nicht nur der geänderte Wert, sondern auch die zuletzt erhaltenen Werte aller anderen Quelldatenströme generiert werden. Schauen Sie sich diese Animation genau an:


Bild


Bevor combineLates seinen ersten Wert combineLates , müssen alle Quell-Threads mindestens einen Eintrag erhalten.


Im Gegensatz zu den zuvor verwendeten Methoden ist combineLatest statisch. Da Dart kein Überladen von Operatoren zulässt, gibt es außerdem Versionen von combLastest von der Anzahl der Quelldatenströme abhängen : combinLatest2 ... kombinierenLatest9


combineLatest gute Verwendung, wenn Sie beispielsweise zwei Observable<bool> , die signalisieren, dass einige Teile Ihrer Anwendung ausgelastet sind, und Sie den Busy-Spinner anzeigen möchten, wenn einer von ihnen beschäftigt ist. Es könnte so aussehen (Pseudocode):


 class Model { Observable<bool> get isBusy => Observable.combineLatest2(isBusyOne,isBusyTwo, (b1, b2) => b1 || b2); PublishSubject<bool> isBusyOne; PublishSubject<bool> isBusyTwo; } 

In Ihrer Benutzeroberfläche können Sie isBusy mit StreamBuilder , um Spinner anzuzeigen, wenn der resultierende Wert wahr ist.


combineLatest sehr geeignete Funktion in Kombination mit FireStore-Snapshots- Streams.


Stellen Sie sich vor, Sie möchten eine Anwendung erstellen, die einen Newsfeed zusammen mit einer Wettervorhersage anzeigt. Tickernachrichten und Wetterdaten werden in zwei verschiedenen FireStore-Sammlungen gespeichert. Beide werden unabhängig voneinander aktualisiert. Sie möchten Datenaktualisierungen mit StreamBuilder anzeigen. Mit combineLatest es ganz einfach:


 class WeatherForecast { final String forecastText; final GeoPoint location; factory WeatherForecast.fromMap(Map<String, dynamic> map) { return WeatherForecast(map['forecastText'], map['location']); } WeatherForecast(this.forecastText, this.location); } class NewsMessage { final String newsText; final GeoPoint location; factory NewsMessage.fromMap(Map<String, dynamic> map) { return NewsMessage(map['newsText'], map['location']); } NewsMessage(this.newsText, this.location); } class CombinedMessage { final WeatherForecast forecast; final NewsMessage newsMessage; CombinedMessage(this.forecast, this.newsMessage); } class Model { CollectionReference weatherCollection; CollectionReference newsCollection; Model() { weatherCollection = Firestore.instance.collection('weather'); newsCollection = Firestore.instance.collection('news'); } Observable<CombinedMessage> getCombinedMessages() { Observable<WeatherForecast> weatherForecasts = weatherCollection .snapshots() .expand((snapShot) => snapShot.documents) .map<WeatherForecast>((document) => WeatherForecast.fromMap(document.data)); Observable<NewsMessage> news = newsCollection .snapshots() .expand((snapShot) => snapShot.documents) .map<NewsMessage>((document) => NewsMessage.fromMap(document.data)); return Observable.combineLatest2( weatherForecasts, news, (weather, news) => CombinedMessage(weather, news)); } } 

In Ihrer Benutzeroberfläche würde es StreamBuilder<CombinedMessage>(stream: model.getCombinedMessages(),...). so aussehen: StreamBuilder<CombinedMessage>(stream: model.getCombinedMessages(),...).


Deutlich


In dem oben beschriebenen Szenario kann es vorkommen, dass isBusyOne und isBusyTwo denselben Wert angeben , was zu einer Aktualisierung der Benutzeroberfläche mit denselben Daten führt. Um dies zu verhindern, können wir .distinct() . Es stellt sicher, dass Daten nur gestreamt werden, wenn sich der Wert des neuen Elements vom letzten unterscheidet. Daher würden wir den Code ändern in:


  Observable<bool> isBusy => isBusyOne.mergeWith([isBusyTwo]).distinct(); 

und es zeigt auch, dass wir unsere Funktionen nach Belieben in verschiedenen Ketten kombinieren können.


Asyncmap


Neben map() gibt es auch eine asyncMap Funktion, mit der Sie eine asynchrone Funktion als Map-Funktion verwenden können. Lassen Sie uns eine etwas andere Einstellung für unser FireStore-Beispiel einführen. Jetzt hängt der erforderliche WeatherForecast vom Standort von NewsMessage ab und sollte nur aktualisiert werden, wenn eine neue NewsMessage empfangen wird :


 Observable<CombinedMessage> getDependendMessages() { Observable<NewsMessage> news = newsCollection.snapshots().expand((snapShot) { return snapShot.documents; }).map<NewsMessage>((document) { return NewsMessage.fromMap(document.data); }); return news.asyncMap((newsEntry) async { var weatherDocuments = await weatherCollection.where('location', isEqualTo: newsEntry.location).getDocuments(); return new CombinedMessage( WeatherForecast.fromMap(weatherDocuments.documents.first.data), newsEntry); }); } 

Das von getDependendMessages zurückgegebene Observable generiert bei jeder Änderung von newsCollection eine neue CombinedMessage.


Debuggen von Observables


Wenn man sich die eleganten Rx-Aufrufketten ansieht, scheint es fast unmöglich, einen Ausdruck wie diesen zu debuggen:


 Observable<NewsMessage> news = newsCollection .snapshots() .expand((snapShot) => snapShot.documents) .map<NewsMessage>((document) => NewsMessage.fromMap(document.data)); 

Beachten Sie jedoch, dass => nur eine Kurzform für eine anonyme Funktion ist. Wenn Sie Konvertieren verwenden, um den Körper zu blockieren , erhalten Sie:


 Observable<NewsMessage> news = newsCollection .snapshots() .expand((snapShot) { return snapShot.documents; }) .map<NewsMessage>((document) { return NewsMessage.fromMap(document.data); }); 

Und jetzt können wir in jedem Schritt unserer Pipeline einen Haltepunkt setzen oder Druckanweisungen hinzufügen.


Vorsicht vor Nebenwirkungen


Wenn Sie Rx nutzen möchten, um Ihren Code robuster zu machen, denken Sie immer daran, dass Rx eine Datenkonvertierung ist, wenn Sie es "entlang des Förderbandes" bewegen. Rufen Sie daher niemals Funktionen auf, die Variablen / Zustände außerhalb der Verarbeitungspipeline ändern, bis Sie die Funktion .listen erreichen.
Anstatt dies zu tun:


 Observable.fromFuture(getProduct()) .map<Product>((jsonString) { var product = Product.fromJson(jsonString); database.save(product); setState((){ _product = product }); return product; }).listen(); 

mach das:


 Observable.fromFuture(getProduct()) .map<Product>((jsonString) => Product.fromJson(jsonString)) .listen( (product) { database.save(product); setState((){ _product = product }); }); 

Die Aufgabe von map() ist es, die Daten im Stream zu transformieren und NICHTS MEHR! Wenn die übergebene Anzeigefunktion etwas anderes bewirkt, wird dies als Nebeneffekt betrachtet, der potenzielle Fehler erzeugt, die beim Lesen des Codes schwer zu erkennen sind.


Einige Gedanken zur Ressourcenfreigabe


Um Speicherverluste zu vermeiden, rufen Sie bei Abonnements immer cancel() auf, dispose dispose() für StreamController, close() für Subjects, sobald Sie diese nicht mehr benötigen.


Fazit


Herzlichen Glückwunsch, wenn Sie bis zu diesem Moment bei mir geblieben sind. Jetzt können Sie Rx nicht nur verwenden, um Ihr Leben zu erleichtern, sondern sich auch auf die nächsten Beiträge vorbereiten, in denen wir uns mit den Details von RxVMS befassen .

Source: https://habr.com/ru/post/de451292/


All Articles