RxDart:流程的神奇转变

欢迎使用-这是我的Flutter Architecture文章系列的第三部分。



这次,我们将深入探讨反应式扩展(Rx)的神奇领域。 我将重点介绍最常用的Rx函数并解释其应用。 如果您还没有阅读上一篇文章,那么现在是进行下一步的时间。


RxDart是Dart语言Rx概念的实现,这要归功于Frank PepermansBrian Egan 。 如果您以前以其他语言使用过Rx,则可能会注意到许多功能的命名有所不同,但这不太可能给您带来任何困难。


测试代码在这里


到目前为止,我们已经使用流作为一种将数据从应用程序中的一个地方传输到另一个地方的方法,但是它们可以做得更多。 让我们看一下Rx添加到Streams的一些功能。


创建可观察物


如前所述 ,Observables是具有出色功能的流的Rx版本。 有几种有趣的创建方式:


流媒体播放


通过将任何Stream传递给构造函数,可以将其转换为Observable:


var controller = new StreamController<String>(); var streamObservable = new Observable(controller.stream); streamObservable.listen(print); 

周期性事件


 var timerObservable = Observable.periodic(Duration(seconds: 1), (x) => x.toString() ); timerObservable.listen(print); 

这样,将构造一个Observable,以显示具有特定时间段的值。 因此,您可以更换计时器。


从单个值


有时,API希望在您仅有值的地方使用Stream / Observable。 对于这种情况,Observable有一个工厂。


 var justObservable = Observable<int>.just(42); justObservable.listen(print); //   : 42 

从未来


  Future<String> asyncFunction() async { return Future.delayed(const Duration(seconds: 1), () => "AsyncRsult"); } test('Create Observable from Future', () async { print('start'); var fromFutureObservable = Observable.fromFuture(asyncFunction()); fromFutureObservable.listen(print); 

Future创建Observable将等待Future完成并为其结果返回一个值;如果未返回该值,则返回null 。 从Future创建流的另一种方法是为任何Future调用toStream()


您可能想知道将Future转换为Observable / Stream而不是仅仅等待它的意义何在。 请放心,当我们检查可用于处理数据流中的可用函数时,这一点将变得很清楚。


科目


Subjects替代了RxDart中的StreamController ,这就是在库的某个地方实现这些Subjects方式。


但是它们的行为与基本的StreamControllers略有不同:


  • 您可以直接将listen()直接应用于主题,而无需访问Stream属性
  • 任意数量的订阅可用,并且所有侦听器同时接收相同的数据
  • 共有三种主题,下面通过示例进行说明:

发布主题


PublishSubjects行为类似于StreamControllers ,但可能存在许多侦听器:


 var subject = new PublishSubject<String>(); subject.listen((item) => print(item)); subject.add("Item1"); //    subject.listen((item) => print(item.toUpperCase())); subject.add("Item2"); subject.add("Item3"); //        await Future.delayed(Duration(seconds: 5)); //    subject.close; 

运行此代码,您将获得:


 Item1 ITEM2 Item2 ITEM3 Item3 

显然,参加聚会迟到的第二位听众(我们称其为“晚订户”)错过了第一点。 为了避免这种情况,您可以使用BehaviourSubject


行为对象


使用BehaviourSubject每个新订户将首先收到最后接受的值:


 var subject = new BehaviorSubject<String>(); subject.listen((item) => print(item)); subject.add("Item1"); subject.add("Item2"); subject.listen((item) => print(item.toUpperCase())); subject.add("Item3"); 

在出口处


 Item1 ITEM2 ITEM3 Item2 Item3 

您可以看到第二个订户丢失了Item1 ,但是它收到了Item2 。 您可能会惊讶于第二个订阅者在第一个订阅者接收到Item2之前接收到Item3 。 这是因为尽管所有用户都以正确的顺序接收数据,但是不能保证服务用户的顺序。 BehaviourSubject仅缓存为后期订阅者收到的最后一个项目。 如果需要缓存更多元素,则可以使用ReplaySubject 。 在大多数情况下,这不是必需的。


即时处理数据



Rx的真正优势在于,它允许您在流传输期间处理数据。 每个Rx方法都返回一个带有结果数据的新流(如图所示),这意味着您可以在一个处理管道中将它们绑定在一起,这使Rx成为功能非常强大的工具。


地图


如果我不想错过任何Stream操作,那就是map()map()所做的是,它接收要传输的每个数据项并对其应用特定的功能,然后将结果放入结果流中。 一个简单的例子:



 var subject = new PublishSubject<String>(); subject.map((item) => item.toUpperCase()).listen(print); subject.add("Item1"); subject.add("Item2"); subject.add("Item3"); 

结果:


 ITEM1 ITEM2 ITEM3 

但是,不需要map返回与输入相同的数据类型。 以下示例将使用整数而不是字符串。 此外,我们将链接两个转换:


 var subject = new PublishSubject<int>(); subject.map((intValue) => intValue.toString()) .map((item) => item.toUpperCase()) .listen(print); subject.add(1); subject.add(2); subject.add(3); 

或类似这样的东西:



 class DataClass{} class WrapperClass { final DataClass wrapped; WrapperClass(this.wrapped); } var subject = new PublishSubject<WrapperClass>(); subject.map<WrapperClass>((a) => new WrapperClass(a)); 

.map的最有用的用途之一是,当您从某种REST API或数据库中获取某种格式的数据并希望将其转换为自己的对象时:


 class User { final String name; final String adress; final String phoneNumber; final int age; //       - //   factory User.fromJson(String jsonString) { var jsonMap = json.decode(jsonString); return User( jsonMap['name'], jsonMap['adress'], jsonMap['phoneNumber'], jsonMap['age'], ); } User(this.name, this.adress, this.phoneNumber, this.age); @override String toString() { return '$name - $adress - $phoneNumber - $age'; } } void main() { test('Map', () { // -  var jsonStrings = [ '{"name": "Jon Doe", "adress": "New York", "phoneNumber":"424242","age": 42 }', '{"name": "Stephen King", "adress": "Castle Rock", "phoneNumber":"123456","age": 71 }', '{"name": "Jon F. Kennedy", "adress": "Washington", "phoneNumber":"111111","age": 66 }', ]; //   json-,    API/DB. var dataStreamFromAPI = new PublishSubject<String>(); dataStreamFromAPI .map<User>((jsonString) => User.fromJson(jsonString)) // json -> User .listen((user) => print(user.toString())); //    dataStreamFromAPI.add(jsonStrings[0]); dataStreamFromAPI.add(jsonStrings[1]); dataStreamFromAPI.add(jsonStrings[2]); }); 

我注意到,不仅Streams,而且任何Iterable都提供了可用于列表转换的map函数。


哪里


如果您仅对流中遇到的某些值感兴趣,则可以使用.where()函数,而不是在侦听器中使用if ,这样做更具表现力且更易于阅读:


 var subject = new PublishSubject<int>(); subject.where((val) => val.isOdd) .listen( (val) => print('This only prints odd numbers: $val')); subject.where((val) => val.isEven) .listen( (val) => print('This only prints even numbers: $val')); subject.add(1); subject.add(2); subject.add(3); //: This only prints odd numbers: 1 This only prints even numbers: 2 This only prints odd numbers: 3 

去抖


这是Rx的小明珠之一! 假设您有一个搜索字段,如果其文本发生更改,它将调用REST API。 对每个按键进行API调用都非常昂贵。 因此,仅当用户暂时暂停时,您才想拨打电话。 为此,使用了debounce()函数,该函数吞下所有传入事件,如果它们之后没有暂停。


 var subject = new PublishSubject<String>(); subject.debounce(new Duration(milliseconds: 500)).listen((s) => print(s)); subject.add('A'); subject.add('AB'); await Future.delayed(Duration(milliseconds: 200)); subject.add("ABC"); //    await Future.delayed(Duration(milliseconds: 700)); //       : 'ABC' 

因此,如果将TextField.onChanged处理程序转换为Observable ,则会得到一个优雅的解决方案。


展开


如果您的源Stream发出对象数组,并且您想自己处理每个对象,则可以使用.expand ,它可以做到这一点:


图片


您将在FireStore示例中看到此方法的应用程序。


合并


如果您有多个不同的线程,但是想要一起处理它们的对象,则可以使用.mergeWith (在其他Rx实现中,只需merge ),该方法接受一组线程并返回一个合并的线程。


图片


.mergeWith不保证流中的任何顺序都被合并。 数据按输入顺序发出。


例如,如果您有两个通过流报告错误的组件,并且希望将它们一起显示在对话框中,则可以执行以下操作(伪代码):


 @override initState() { super.initState(); component1.errors.mergeWith([component2.errors]) .listen( (error) async => await showDialog(error.message)); } 

或者,如果您想同时显示来自多个社交网络的消息,则可能看起来像这样(伪代码):


 final observableTwitter = getTwitterStream().map((data) => new MyAppPost.fromTwitter(data)); final observableFacebook = getFacebookStream().map((data) => new MyAppPost.fromFaceBook(data)); final postStream = observableTwitter.mergeWith([observableFacebook]); 

邮编


zipWith还可以将一个流与另一个流合并。 但是,与.mergeWith不同,它不会在从其源流之一接收到元素后立即发送数据。 他等到两个源流中的元素到达,然后使用提供的zipper功能将它们组合:


图片


zipWith签名看起来很吓人,但现在我们来看一下:


 // R :   Stream/Observable // S :   Stream/Observable // zipper: - Observable<R> zipWith<S, R>(Stream<S> other, R zipper(T t, S s)) 

一个非常简化的示例:


 new Observable.just(1) // .just()  Observable,    .zipWith(new Observable.just(2), (one, two) => one + two) .listen(print); //  3 

一个更实际的应用程序是,如果您需要等待两个返回Future异步函数,并且想要在两个结果都返回时立即处理数据。 在这个稍作设计的示例中,我们提供了两个REST API:一个返回User ,另一个返回Product作为JSON字符串,并且我们希望在返回Invoice对象之前等待两次调用。


 class Invoice { final User user; final Product product; Invoice(this.user, this.product); printInvoice() { print(user.toString()); print(product.toString()); } } //  HTTP ,  Product,  JSON Future<String> getProduct() async { print("Started getting product"); await Future.delayed(Duration(seconds: 2)); print("Finished getting product"); return '{"name": "Flux compensator", "price": 99999.99}'; } //  HTTP ,  User,  JSON Future<String> getUser() async { print("Started getting User"); await Future.delayed(Duration(seconds: 4)); print("Finished getting User"); return '{"name": "Jon Doe", "adress": "New York", "phoneNumber":"424242","age": 42 }'; } void main() { test('zipWith', () async { var userObservable = Observable.fromFuture(getUser()).map<User>((jsonString) => User.fromJson(jsonString)); var productObservable = Observable.fromFuture(getProduct()) .map<Product>((jsonString) => Product.fromJson(jsonString)); Observable<Invoice> invoiceObservable = userObservable.zipWith<Product, Invoice>( productObservable, (user, product) => Invoice(user, product)); print("Start listening for invoices"); invoiceObservable.listen((invoice) => invoice.printInvoice()); //        await Future.delayed(Duration(seconds: 5)); }); } 

查看输出,可以看到它是如何异步完成的


 Started getting User Started getting product Start listening for invoices Finished getting product Finished getting User Jon Doe - New York - 424242 - 42 Flux compensator - 99999.99 

组合最新


combineLatest还可以合并流值,但是mergezip方式略有不同。 每当一个线程中有新值到达时,它就会侦听更多线程并发出组合值。 有趣的是,它不仅生成更改后的值,而且还生成所有其他源流的最后获得的值。 仔细看一下这个动画:


图片


combineLates其第一个值之前,所有源线程必须至少接收一个条目。


与之前使用的方法不同, combineLatest是静态的。 此外,由于Dart不允许运算符重载,因此有一些版本的combLastest取决于源线程的数量: CombineLatest2 ... CombineLatest9


combineLatest很好的用法,例如,如果您有两个Observable<bool>表示您的应用程序的某些部分处于繁忙状态,并且您想在其中一个繁忙时显示“繁忙”微调器。 它可能看起来像这样(伪代码):


 class Model { Observable<bool> get isBusy => Observable.combineLatest2(isBusyOne,isBusyTwo, (b1, b2) => b1 || b2); PublishSubject<bool> isBusyOne; PublishSubject<bool> isBusyTwo; } 

在用户界面中,如果结果值为true,则可以将isBusyStreamBuilder一起使用以显示Spinner


combineLatestFireStore快照流结合使用combineLatest非常合适的功能。


想象一下,您想创建一个显示新闻订阅源和天气预报的应用程序。 交易消息和天气数据存储在两个不同的FireStore集合中。 两者均独立更新。 您想使用StreamBuilder显示数据更新。 使用combineLatest很容易:


 class WeatherForecast { final String forecastText; final GeoPoint location; factory WeatherForecast.fromMap(Map<String, dynamic> map) { return WeatherForecast(map['forecastText'], map['location']); } WeatherForecast(this.forecastText, this.location); } class NewsMessage { final String newsText; final GeoPoint location; factory NewsMessage.fromMap(Map<String, dynamic> map) { return NewsMessage(map['newsText'], map['location']); } NewsMessage(this.newsText, this.location); } class CombinedMessage { final WeatherForecast forecast; final NewsMessage newsMessage; CombinedMessage(this.forecast, this.newsMessage); } class Model { CollectionReference weatherCollection; CollectionReference newsCollection; Model() { weatherCollection = Firestore.instance.collection('weather'); newsCollection = Firestore.instance.collection('news'); } Observable<CombinedMessage> getCombinedMessages() { Observable<WeatherForecast> weatherForecasts = weatherCollection .snapshots() .expand((snapShot) => snapShot.documents) .map<WeatherForecast>((document) => WeatherForecast.fromMap(document.data)); Observable<NewsMessage> news = newsCollection .snapshots() .expand((snapShot) => snapShot.documents) .map<NewsMessage>((document) => NewsMessage.fromMap(document.data)); return Observable.combineLatest2( weatherForecasts, news, (weather, news) => CombinedMessage(weather, news)); } } 

在您的UI中,它看起来像这样: StreamBuilder<CombinedMessage>(stream: model.getCombinedMessages(),...).


独特的


在上述情况下,可能会发生isBusyOneisBusyTwo提供相同的值,这将导致使用相同数据的用户界面更新。 为了防止这种情况,我们可以使用.distinct() 。 它确保仅在新元素的值与最后一个元素的值不同时才传输数据。 因此,我们将代码更改为:


  Observable<bool> isBusy => isBusyOne.mergeWith([isBusyTwo]).distinct(); 

它也表明我们可以随意将功能组合到不同的链中。


异步映射


除了map()还有asyncMap函数,它允许您将异步函数用作地图函数。 让我们为我们的FireStore示例介绍一个稍有不同的设置。 现在,必要的WeatherForecast取决于NewsMessage的位置,并且仅应在收到新的NewsMessage时进行更新:


 Observable<CombinedMessage> getDependendMessages() { Observable<NewsMessage> news = newsCollection.snapshots().expand((snapShot) { return snapShot.documents; }).map<NewsMessage>((document) { return NewsMessage.fromMap(document.data); }); return news.asyncMap((newsEntry) async { var weatherDocuments = await weatherCollection.where('location', isEqualTo: newsEntry.location).getDocuments(); return new CombinedMessage( WeatherForecast.fromMap(weatherDocuments.documents.first.data), newsEntry); }); } 

每次newsCollection更改时,getDependendMessages返回的Observable都会生成一个新的CombinedMessage。


调试可观察对象


查看优雅的Rx调用链,似乎几乎不可能调试这样的表达式:


 Observable<NewsMessage> news = newsCollection .snapshots() .expand((snapShot) => snapShot.documents) .map<NewsMessage>((document) => NewsMessage.fromMap(document.data)); 

但是请记住, =>只是匿名函数的缩写。 使用“ 转换”块主体 ,您将获得:


 Observable<NewsMessage> news = newsCollection .snapshots() .expand((snapShot) { return snapShot.documents; }) .map<NewsMessage>((document) { return NewsMessage.fromMap(document.data); }); 

现在,我们可以在管道的每个步骤中设置一个断点或添加打印语句。


当心副作用


如果要利用Rx来使代码更健壮,请始终记住Rx是“沿着传送带”移动时的数据转换。 因此,在到达.listen函数之前,切勿调用更改处理管道之外的任何变量/状态的函数。
而不是这样做:


 Observable.fromFuture(getProduct()) .map<Product>((jsonString) { var product = Product.fromJson(jsonString); database.save(product); setState((){ _product = product }); return product; }).listen(); 

这样做:


 Observable.fromFuture(getProduct()) .map<Product>((jsonString) => Product.fromJson(jsonString)) .listen( (product) { database.save(product); setState((){ _product = product }); }); 

map()的职责是转换流中的数据,并且什么都不做! 如果传递的显示功能还有其他作用,则将其视为副作用,从而生成在读取代码时很难检测到的潜在错误。


关于释放资源的一些想法


为避免内存泄漏,请在不再需要订阅时始终调用cancel()进行订阅,对StreamControllers进行dispose() ,对Subjects进行close()调用。


结论


恭喜您一直陪在我身边。 现在,您不仅可以使用Rx来简化生活,还可以为下一篇文章做准备,在这些文章中我们将深入研究RxVMS的细节。

Source: https://habr.com/ru/post/zh-CN451292/


All Articles