Willkommen - Dies ist der dritte Teil meiner Artikelserie Flutter Architecture.
Dieses Mal werden wir einen kleinen Sprung in das magische Reich der reaktiven Erweiterungen (Rx) machen. Ich werde mich auf die am häufigsten verwendeten Rx-Funktionen konzentrieren und deren Anwendung erläutern. Wenn Sie den vorherigen Beitrag nicht gelesen haben, ist jetzt die Zeit dafür, bevor Sie fortfahren.
RxDart ist eine Implementierung des Rx-Konzepts für die Dart-Sprache, dank Frank Pepermans und Brian Egan dafür . Wenn Sie Rx zuvor in anderen Sprachen verwendet haben, werden Sie wahrscheinlich einen Unterschied in der Benennung einer Reihe von Funktionen bemerken, aber dies wird Ihnen wahrscheinlich keine Schwierigkeiten bereiten.
Der Code zum Testen ist hier .
Bisher haben wir Streams verwendet, um Daten in unserer Anwendung von einem Ort zum anderen zu übertragen, aber sie können noch viel mehr. Werfen wir einen Blick auf einige der Funktionen, die Rx Streams hinzufügt.
Observables erstellen
Wie bereits erwähnt , handelt es sich bei Observables um Rx-Versionen von Streams mit hervorragenden Funktionen. Es gibt verschiedene interessante Möglichkeiten, sie zu erstellen:
Aus dem Strom
Jeder Stream kann in Observable konvertiert werden, indem er an den Konstruktor übergeben wird:
var controller = new StreamController<String>(); var streamObservable = new Observable(controller.stream); streamObservable.listen(print);
Wiederkehrende Ereignisse
var timerObservable = Observable.periodic(Duration(seconds: 1), (x) => x.toString() ); timerObservable.listen(print);
Auf diese Weise wird ein Observable erstellt, das Werte mit einem bestimmten Zeitraum anzeigt. So können Sie den Timer ersetzen.
Aus einem einzigen Wert
Manchmal erwartet eine API einen Stream / Observable, in dem Sie nur einen Wert haben. In solchen Fällen verfügt das Observable über eine Fabrik.
var justObservable = Observable<int>.just(42); justObservable.listen(print);
Aus der Zukunft
Future<String> asyncFunction() async { return Future.delayed(const Duration(seconds: 1), () => "AsyncRsult"); } test('Create Observable from Future', () async { print('start'); var fromFutureObservable = Observable.fromFuture(asyncFunction()); fromFutureObservable.listen(print);
Das Erstellen eines Observable
from Future wartet darauf, dass Future abgeschlossen ist, und gibt einen Wert für das Ergebnis oder null
wenn der Wert nicht zurückgegeben wird. Eine andere Möglichkeit, einen Stream aus Future zu erstellen, besteht darin, toStream()
für jede Zukunft toStream()
.
Sie fragen sich vielleicht, wozu Future in ein Observable / Stream konvertiert werden soll, anstatt nur darauf zu warten. Seien Sie versichert, dies wird deutlich, wenn wir die verfügbaren Funktionen zum Bearbeiten von Daten untersuchen, während sie sich im Stream befinden.
Themen
Subjects
sind ein Ersatz für StreamController
in RxDart, und so werden sie irgendwo im Darm der Bibliothek implementiert.
Ihr Verhalten unterscheidet sich jedoch geringfügig von den grundlegenden StreamControllern:
- Sie können
listen()
direkt auf einen Betreff anwenden, ohne auf die Stream-Eigenschaft zuzugreifen - Es stehen beliebig viele Abonnements zur Verfügung, und alle Listener erhalten gleichzeitig dieselben Daten
- Es gibt drei Arten von Themen, die im Folgenden anhand von Beispielen erläutert werden:
PublishSubjects
PublishSubjects
verhalten sich wie StreamControllers
, mit Ausnahme der Möglichkeit vieler Listener:
var subject = new PublishSubject<String>(); subject.listen((item) => print(item)); subject.add("Item1");
Führen Sie diesen Code aus und Sie erhalten:
Item1 ITEM2 Item2 ITEM3 Item3
Es ist klar, dass der zweite Zuhörer, der zu spät zur Party kam (wir werden sie als späte Abonnenten bezeichnen), den ersten Punkt verpasst hat. Um dies zu vermeiden, können Sie BehaviourSubject
Verhaltensgegenstand
Mit BehaviourSubject
erhält jeder neue Abonnent zuerst den zuletzt akzeptierten Wert:
var subject = new BehaviorSubject<String>(); subject.listen((item) => print(item)); subject.add("Item1"); subject.add("Item2"); subject.listen((item) => print(item.toUpperCase())); subject.add("Item3");
Am Ausgang
Item1 ITEM2 ITEM3 Item2 Item3
Sie können sehen, dass Item1
für den zweiten Abonnenten verloren geht, aber Item2
empfängt. Sie werden überrascht sein, dass der zweite Teilnehmer Item3
erhält, bevor der erste Teilnehmer Item2
erhält. Dies liegt daran, dass die Reihenfolge der Serviceteilnehmer nicht garantiert ist, obwohl alle Teilnehmer Daten in der richtigen Reihenfolge empfangen. BehaviourSubject
nur das zuletzt für verspätete Abonnenten empfangene Element zwischen. Wenn Sie mehr Elemente zwischenspeichern müssen, können Sie ReplaySubject verwenden . In den meisten Fällen ist dies nicht erforderlich.
Daten im laufenden Betrieb bearbeiten

Die wahre Stärke von Rx liegt in der Tatsache, dass Sie Daten während der Übertragung über den Stream verarbeiten können. Jede der Rx-Methoden gibt einen neuen Stream mit den resultierenden Daten zurück (wie in der Abbildung). Dies bedeutet, dass Sie sie in einer Verarbeitungspipeline zusammenbinden können. Dies macht Rx zu einem äußerst leistungsstarken Tool.
Karte
Wenn es einen Stream-Vorgang gibt, den ich am meisten nicht missen möchte, dann ist dies map()
. map()
verwendet jedes zu übertragende Datenelement und wendet eine bestimmte Funktion darauf an. Anschließend wird das Ergebnis in den resultierenden Stream eingefügt. Ein einfaches Beispiel:

var subject = new PublishSubject<String>(); subject.map((item) => item.toUpperCase()).listen(print); subject.add("Item1"); subject.add("Item2"); subject.add("Item3");
Ergebnis:
ITEM1 ITEM2 ITEM3
Die map
nicht denselben Datentyp zurückgeben, den sie als Eingabe empfängt. Im folgenden Beispiel werden Ganzzahlen anstelle von Zeichenfolgen verwendet. Zusätzlich werden wir zwei Transformationen verknüpfen:
var subject = new PublishSubject<int>(); subject.map((intValue) => intValue.toString()) .map((item) => item.toUpperCase()) .listen(print); subject.add(1); subject.add(2); subject.add(3);
oder so ähnlich:

class DataClass{} class WrapperClass { final DataClass wrapped; WrapperClass(this.wrapped); } var subject = new PublishSubject<WrapperClass>(); subject.map<WrapperClass>((a) => new WrapperClass(a));
Eine der nützlichsten Anwendungen von .map
ist, wenn Sie Daten in einem Format von einer REST-API oder aus einer Datenbank .map
und möchten, dass sie in Ihre eigenen Objekte konvertiert werden:
class User { final String name; final String adress; final String phoneNumber; final int age;
Ich stelle fest, dass nicht nur Streams, sondern auch jedes Iterable eine Kartenfunktion bietet, die Sie für Transformationen in Listen verwenden können.
Wo
Wenn Sie nur an bestimmten Werten interessiert sind, die im Stream vorkommen, können Sie die Funktion .where()
anstelle der if
in Ihrem Listener verwenden. Dies ist aussagekräftiger und leichter zu lesen:
var subject = new PublishSubject<int>(); subject.where((val) => val.isOdd) .listen( (val) => print('This only prints odd numbers: $val')); subject.where((val) => val.isEven) .listen( (val) => print('This only prints even numbers: $val')); subject.add(1); subject.add(2); subject.add(3);
Entprellen
Dies ist eine der kleinen Perlen von Rx! Stellen Sie sich vor, Sie haben ein Suchfeld, das die REST-API aufruft, wenn ihr Text geändert wird. Ein API-Aufruf für jeden Tastendruck ist teuer. Daher möchten Sie nur dann einen Anruf tätigen, wenn der Benutzer einen Moment innehält. Hierzu wird die Funktion debounce()
verwendet, die alle eingehenden Ereignisse verschluckt, wenn ihnen keine Pause folgt.
var subject = new PublishSubject<String>(); subject.debounce(new Duration(milliseconds: 500)).listen((s) => print(s)); subject.add('A'); subject.add('AB'); await Future.delayed(Duration(milliseconds: 200)); subject.add("ABC");
Wenn Sie den TextField.onChanged
Handler in Observable
konvertieren, erhalten Sie daher eine elegante Lösung.
Erweitern
Wenn Ihr Quell-Stream Arrays von Objekten .expand
und Sie jedes Objekt selbst verarbeiten möchten, können Sie .expand
, um genau das zu tun:

Die Anwendung dieser Methode sehen Sie unten im FireStore-Beispiel.
Zusammenführen
Wenn Sie mehrere verschiedene Threads haben, aber deren Objekte zusammen verarbeiten möchten, können Sie .mergeWith
(in anderen Rx-Implementierungen nur merge
) verwenden, das ein Array von Threads verwendet und einen zusammengeführten Thread zurückgibt.

.mergeWith
garantiert nicht, dass eine Bestellung in den Streams zusammengeführt wird. Die Daten werden in der Eingabereihenfolge ausgegeben.
Wenn Sie beispielsweise zwei Komponenten haben, die Fehler über den Stream melden, und möchten, dass sie zusammen in einem Dialogfeld angezeigt werden, können Sie dies wie folgt tun (Pseudocode):
@override initState() { super.initState(); component1.errors.mergeWith([component2.errors]) .listen( (error) async => await showDialog(error.message)); }
oder wenn Sie eine kombinierte Anzeige von Nachrichten aus mehreren sozialen Netzwerken wünschen, kann dies folgendermaßen aussehen (Pseudocode):
final observableTwitter = getTwitterStream().map((data) => new MyAppPost.fromTwitter(data)); final observableFacebook = getFacebookStream().map((data) => new MyAppPost.fromFaceBook(data)); final postStream = observableTwitter.mergeWith([observableFacebook]);
Reißverschluss
zipWith
auch einen Stream mit einem anderen zusammen. Im Gegensatz zu .mergeWith
es jedoch keine Daten, sobald es ein Element aus einem seiner Quelldatenströme empfängt. Er wartet, bis die Elemente aus beiden Quelldatenströmen eintreffen, und kombiniert sie dann mithilfe der bereitgestellten zipper
:

Die zipWith
Signatur sieht beängstigend aus, aber jetzt sehen wir sie uns an:
Ein sehr vereinfachtes Beispiel:
new Observable.just(1)
Eine praktischere Anwendung ist, wenn Sie auf zwei asynchrone Funktionen warten müssen, die Future
, und die Daten verarbeiten möchten, sobald beide Ergebnisse zurückgegeben werden. In diesem leicht erfundenen Beispiel stellen wir zwei REST-APIs vor: eine gibt User
, die andere gibt Product
als JSON-Zeichenfolgen zurück und wir möchten auf beide Aufrufe warten, bevor wir das Invoice
Objekt zurückgeben.
class Invoice { final User user; final Product product; Invoice(this.user, this.product); printInvoice() { print(user.toString()); print(product.toString()); } }
Wenn Sie sich die Ausgabe ansehen, können Sie sehen, wie dies asynchron erfolgt
Started getting User Started getting product Start listening for invoices Finished getting product Finished getting User Jon Doe - New York - 424242 - 42 Flux compensator - 99999.99
Kombiniertest
combineLatest
auch Stream-Werte zusammen, jedoch auf etwas andere Weise als merge
und zip
. Es wartet auf weitere Threads und gibt einen kombinierten Wert aus, wenn ein neuer Wert von einem der Threads eingeht. Es ist interessant, dass nicht nur der geänderte Wert, sondern auch die zuletzt erhaltenen Werte aller anderen Quelldatenströme generiert werden. Schauen Sie sich diese Animation genau an:

Bevor combineLates
seinen ersten Wert combineLates
, müssen alle Quell-Threads mindestens einen Eintrag erhalten.
Im Gegensatz zu den zuvor verwendeten Methoden ist combineLatest
statisch. Da Dart kein Überladen von Operatoren zulässt, gibt es außerdem Versionen von combLastest
von der Anzahl der Quelldatenströme abhängen : combinLatest2 ... kombinierenLatest9
combineLatest
gute Verwendung, wenn Sie beispielsweise zwei Observable<bool>
, die signalisieren, dass einige Teile Ihrer Anwendung ausgelastet sind, und Sie den Busy-Spinner anzeigen möchten, wenn einer von ihnen beschäftigt ist. Es könnte so aussehen (Pseudocode):
class Model { Observable<bool> get isBusy => Observable.combineLatest2(isBusyOne,isBusyTwo, (b1, b2) => b1 || b2); PublishSubject<bool> isBusyOne; PublishSubject<bool> isBusyTwo; }
In Ihrer Benutzeroberfläche können Sie isBusy
mit StreamBuilder
, um Spinner
anzuzeigen, wenn der resultierende Wert wahr ist.
combineLatest
sehr geeignete Funktion in Kombination mit FireStore-Snapshots- Streams.
Stellen Sie sich vor, Sie möchten eine Anwendung erstellen, die einen Newsfeed zusammen mit einer Wettervorhersage anzeigt. Tickernachrichten und Wetterdaten werden in zwei verschiedenen FireStore-Sammlungen gespeichert. Beide werden unabhängig voneinander aktualisiert. Sie möchten Datenaktualisierungen mit StreamBuilder anzeigen. Mit combineLatest
es ganz einfach:
class WeatherForecast { final String forecastText; final GeoPoint location; factory WeatherForecast.fromMap(Map<String, dynamic> map) { return WeatherForecast(map['forecastText'], map['location']); } WeatherForecast(this.forecastText, this.location); } class NewsMessage { final String newsText; final GeoPoint location; factory NewsMessage.fromMap(Map<String, dynamic> map) { return NewsMessage(map['newsText'], map['location']); } NewsMessage(this.newsText, this.location); } class CombinedMessage { final WeatherForecast forecast; final NewsMessage newsMessage; CombinedMessage(this.forecast, this.newsMessage); } class Model { CollectionReference weatherCollection; CollectionReference newsCollection; Model() { weatherCollection = Firestore.instance.collection('weather'); newsCollection = Firestore.instance.collection('news'); } Observable<CombinedMessage> getCombinedMessages() { Observable<WeatherForecast> weatherForecasts = weatherCollection .snapshots() .expand((snapShot) => snapShot.documents) .map<WeatherForecast>((document) => WeatherForecast.fromMap(document.data)); Observable<NewsMessage> news = newsCollection .snapshots() .expand((snapShot) => snapShot.documents) .map<NewsMessage>((document) => NewsMessage.fromMap(document.data)); return Observable.combineLatest2( weatherForecasts, news, (weather, news) => CombinedMessage(weather, news)); } }
In Ihrer Benutzeroberfläche würde es StreamBuilder<CombinedMessage>(stream: model.getCombinedMessages(),...).
so aussehen: StreamBuilder<CombinedMessage>(stream: model.getCombinedMessages(),...).
Deutlich
In dem oben beschriebenen Szenario kann es vorkommen, dass isBusyOne und isBusyTwo denselben Wert angeben , was zu einer Aktualisierung der Benutzeroberfläche mit denselben Daten führt. Um dies zu verhindern, können wir .distinct()
. Es stellt sicher, dass Daten nur gestreamt werden, wenn sich der Wert des neuen Elements vom letzten unterscheidet. Daher würden wir den Code ändern in:
Observable<bool> isBusy => isBusyOne.mergeWith([isBusyTwo]).distinct();
und es zeigt auch, dass wir unsere Funktionen nach Belieben in verschiedenen Ketten kombinieren können.
Asyncmap
Neben map()
gibt es auch eine asyncMap
Funktion, mit der Sie eine asynchrone Funktion als Map-Funktion verwenden können. Lassen Sie uns eine etwas andere Einstellung für unser FireStore-Beispiel einführen. Jetzt hängt der erforderliche WeatherForecast vom Standort von NewsMessage ab und sollte nur aktualisiert werden, wenn eine neue NewsMessage empfangen wird :
Observable<CombinedMessage> getDependendMessages() { Observable<NewsMessage> news = newsCollection.snapshots().expand((snapShot) { return snapShot.documents; }).map<NewsMessage>((document) { return NewsMessage.fromMap(document.data); }); return news.asyncMap((newsEntry) async { var weatherDocuments = await weatherCollection.where('location', isEqualTo: newsEntry.location).getDocuments(); return new CombinedMessage( WeatherForecast.fromMap(weatherDocuments.documents.first.data), newsEntry); }); }
Das von getDependendMessages zurückgegebene Observable generiert bei jeder Änderung von newsCollection eine neue CombinedMessage.
Debuggen von Observables
Wenn man sich die eleganten Rx-Aufrufketten ansieht, scheint es fast unmöglich, einen Ausdruck wie diesen zu debuggen:
Observable<NewsMessage> news = newsCollection .snapshots() .expand((snapShot) => snapShot.documents) .map<NewsMessage>((document) => NewsMessage.fromMap(document.data));
Beachten Sie jedoch, dass =>
nur eine Kurzform für eine anonyme Funktion ist. Wenn Sie Konvertieren verwenden, um den Körper zu blockieren , erhalten Sie:
Observable<NewsMessage> news = newsCollection .snapshots() .expand((snapShot) { return snapShot.documents; }) .map<NewsMessage>((document) { return NewsMessage.fromMap(document.data); });
Und jetzt können wir in jedem Schritt unserer Pipeline einen Haltepunkt setzen oder Druckanweisungen hinzufügen.
Vorsicht vor Nebenwirkungen
Wenn Sie Rx nutzen möchten, um Ihren Code robuster zu machen, denken Sie immer daran, dass Rx eine Datenkonvertierung ist, wenn Sie es "entlang des Förderbandes" bewegen. Rufen Sie daher niemals Funktionen auf, die Variablen / Zustände außerhalb der Verarbeitungspipeline ändern, bis Sie die Funktion .listen erreichen.
Anstatt dies zu tun:
Observable.fromFuture(getProduct()) .map<Product>((jsonString) { var product = Product.fromJson(jsonString); database.save(product); setState((){ _product = product }); return product; }).listen();
mach das:
Observable.fromFuture(getProduct()) .map<Product>((jsonString) => Product.fromJson(jsonString)) .listen( (product) { database.save(product); setState((){ _product = product }); });
Die Aufgabe von map()
ist es, die Daten im Stream zu transformieren und NICHTS MEHR! Wenn die übergebene Anzeigefunktion etwas anderes bewirkt, wird dies als Nebeneffekt betrachtet, der potenzielle Fehler erzeugt, die beim Lesen des Codes schwer zu erkennen sind.
Einige Gedanken zur Ressourcenfreigabe
Um Speicherverluste zu vermeiden, rufen Sie bei Abonnements immer cancel()
auf, dispose dispose()
für StreamController, close()
für Subjects, sobald Sie diese nicht mehr benötigen.
Fazit
Herzlichen Glückwunsch, wenn Sie bis zu diesem Moment bei mir geblieben sind. Jetzt können Sie Rx nicht nur verwenden, um Ihr Leben zu erleichtern, sondern sich auch auf die nächsten Beiträge vorbereiten, in denen wir uns mit den Details von RxVMS befassen .