Dart Streams基础知识

这是我关于Flutter Architecture的系列文章的第二部分:



流是RxVMS的主要构建块,对它们的理解对于使用此库是绝对必要的,因此我们将在本文中更详细地介绍它们。


原来,在这篇文章中包含Rx会使它过长,因此我将其分为两部分。


让它流动


我读了很多评论,他们说流程,尤其是Rx,太复杂了,难以理解,无法使用。


我想让您知道,我不认为自己是Rx专家。 利用他的全部能力并不容易,我承认我会继续学习。 但是,让我从一开始就纠正一个错误: 您不必成为Rx向导就可以开始从使用线程和这项技术中获得大量收益 。 我将尽一切可能以最便捷的方式向您解释流程。


什么是线程?


我认为,最类似于螺纹的是传送带。 您可以将某物放在其一端,该“物”将自动转移到另一物。 与物理管道不同,线程处理数据对象,从头开始自动传输它们-但在哪里? 就像在实际管道中一样,如果没有任何另一端可以捕获数据,它们将简单地“掉下”并消失(当然,对于Dart Streams而言并非如此,但是最好像这样处理流) 。



为了避免数据丢失,可以在流输出上设置“陷阱”。 这样,只要数据对象到达流的末尾,就可以捕获数据并对其执行必要的操作。



请记住:


  • 如果没有设置陷阱,则数据将永远消失,并且将无法再次获取(同样,使用Dart Streams也不完全是,但是最好假装是)
  • 将数据发送到流后,您无需暂停程序,而不必等到结束时,所有这些操作都在后台进行。
  • 陷阱可以随时接收数据,发送后不必立即发送(但不用担心,流实际上非常快)。 想象一下,您不知道传送带移动的速度或速度有多长。 这意味着在流中放置某些东西与另一端与元素的反应完全分开。 您的疏水阀将起作用并在物品到达那里时将其抓住。 (有些人可能已经意识到,这与Flutter更新其窗口小部件的反应方式非常吻合)
  • 您可以在工作开始很久之前设置陷阱,然后第一项出现
  • 该流程基于FIFO原理。 数据总是按照它们在流中的放置顺序排列。

什么是Rx?


Rx是Reactive Extensions的缩写,是类固醇流。 这是与Streams非常相似的概念,Streams是Microsoft团队为.Net框架发明的。 由于.Net已经具有用于文件I / O的Stream类型,因此他们将Rx流称为Observable,并创建了许多函数来处理通过它们的数据。 Dart在其语言规范中内置了Streams,该Streams已经提供了大多数功能,但不是全部。 这就是开发RxDart包的原因。 它基于Dart Streams,但扩展了其功能。 在本系列的下一部分中,我将介绍Rx和RxDart。


一些条款


Dart Streams和Rx使用了一些看起来很吓人的术语,所以这里是翻译。 首先是术语Dart,然后是Rx。


  • 流/可观察 。 这就是前面所述的“管道”。 可以将Stream转换为Observable,并且在需要Stream的任何地方都可以分配Observable。 因此,如果我在解释过程中混合使用这些术语,请不要感到困惑
  • 监听/订阅 -设置监听器陷阱
  • StreamController /主题 。 将数据放入Stream中的传送带的“左侧”。 它们的性质和特性略有不同,但用途相同。
  • 发出项目/数据 。 数据出现在“管道”出口的时刻

流创建


如果您打算继续研究该主题,请使用示例克隆此项目 。 我将使用Dart / Flutter测试系统。


要创建流,请创建一个StreamController


var controller = new StreamController<String>(); controller.add("Item1"); //      

创建StreamController时传递的模板类型(在本例中为String)确定了我们可以发送到流的对象的类型。 可以是任何类型! 如果需要,可以创建StreamController<List<MyObject>>() ,并且流将转移整个工作表而不是单个对象。


陷阱设定


如果您运行了指定的测试,那么您将看不到任何内容,因为在流的输出中没有任何内容抓住我们的行。 现在设置陷阱:


 var controller = new StreamController<String>(); controller.stream.listen((item) => print(item)); //  controller.add("Item1"); controller.add("Item2"); controller.add("Item3"); 

现在,使用.listen()方法设置陷阱。 记录看起来像controller.stream.listen ,但是如果向后滚动(例如60年代的某种专辑),那么所写内容的真实含义就会出现:“收听此控制器的流”


您需要将某个函数传递给.listen()方法,以便以某种方式处理传入的数据。 该函数必须接受创建StreamController时指定的类型的参数,在这种情况下为String。


如果运行上面的代码,您将看到


 Item1 Item2 Item3 

我认为,对于Streams的新手来说,最大的问题是,您可以在将第一个元素放入流中很久之前就确定所发出元素的反应,从而触发对该反应的调用。


结束聆听


上面的代码错过了很小但很重要的部分。 listen()返回一个StreamSubscription一个流订阅对象。 对其他的.cancel()方法的调用将终止订阅,释放资源,并防止在不必要的情况下调用您的侦听功能。


 var controller = new StreamController<String>(); StreamSubscription subscription = controller.stream.listen((item) => print(item)); // This is the Trap controller.add("Item1"); controller.add("Item2"); controller.add("Item3"); //    ,        //  ,     Stream   await Future.delayed(Duration(milliseconds: 500)); subscription.cancel; 

侦听器详细信息


listen()的函数可以是lambda或简单函数。


 void myPrint(String message) { print(message); } StreamSubscription subscription = controller.stream.listen((item) => print(item)); //  - StreamSubscription subscription2 = controller.stream.listen(myPrint); //    StreamSubscription subscription3 = controller.stream.listen((item) { print(item); print(item.toUpperCase); }); // - 

重要说明:大多数Dart流仅允许一次性订阅,也就是说,订阅完成后就无法重新订阅-这将引发异常。 这是它们与其他Rx实现的区别。


listen()的完整签名如下所示:


  /* excerpt from the API doc * The [onError] callback must be of type `void onError(error)` or * `void onError(error, StackTrace stackTrace)`. If [onError] accepts * two arguments it is called with the error object and the stack trace * (which could be `null` if the stream itself received an error without * stack trace). * Otherwise it is called with just the error object. * If [onError] is omitted, any errors on the stream are considered unhandled, * and will be passed to the current [Zone]'s error handler. * By default unhandled async errors are treated * as if they were uncaught top-level errors. * * If this stream closes and sends a done event, the [onDone] handler is * called. If [onDone] is `null`, nothing happens. * * If [cancelOnError] is true, the subscription is automatically canceled * when the first error event is delivered. The default is `false`. */ StreamSubscription<T> listen(void onData(T event), {Function onError, void onDone(), bool cancelOnError}); 

这意味着您不仅可以为传递的数据传递一个处理程序,还可以做更多的事情。 您还可以使用一个错误处理程序,另一个用于在控制器端关闭流( onDone )。 如果您提供它,则从Stream内部onError()onError()否则它们将被吞没,并且您将永远不知道出了什么问题。


颤振线程示例


为了便于理解以下各章,我创建了一个单独的存储库分支。
请克隆她


作为第一个示例,我采用了在创建新的Flutter项目时获得的著名计数器应用程序,并对它进行了一些重新组织。 我添加了一个模型类来保存应用程序的状态,这基本上是一个计数器值:


 class Model { int _counter = 0; StreamController _streamController = new StreamController<int>(); Stream<int> get counterUpdates => _streamController.stream; void incrementCounter() { _counter++; _streamController.add(_counter); } } 

在这里,您可以看到一个非常典型的模板:我们无需发布整个StreamController,而是简单地发布其Stream属性。


为了使模型可用于UI,我将其设置为App对象中的静态字段,因为我不想输入InheritedWidget或ServiceLocator。 举一个简单的例子,这可以解决,但在此应用程序中我不会这样做!


添加到main.dart


 class _MyHomePageState extends State<MyHomePage> { int _counter = 0; StreamSubscription streamSubscription; @override void initState() { streamSubscription = MyApp.model.counterUpdates.listen((newVal) => setState(() { _counter = newVal; })); super.initState(); } //   State   ,   , //       @override void dispose() { streamSubscription?.cancel(); super.dispose(); } 

initState()设置侦听initState()好地方,作为Darts的好公民,我们总是在dispose()释放订阅,对吗?


在小部件树中,我们只需要调整FAB按钮(带有浮动动作的按钮)的onPressed处理程序即可。


 floatingActionButton: new FloatingActionButton( onPressed: MyApp.model.incrementCounter, tooltip: 'Increment', child: new Icon(Icons.add), ), 

这样,我们使用Stream在View和Model之间创建了清晰的分隔。


应用StreamBuilder


来源


Flutter提供了一个方便的StreamBuilder小部件,而不是使用initState()setState()来满足我们的需求。 您可能已经猜到了,它需要Stream函数和一个构造函数方法,只要Stream返回新值,该方法就会被调用。 现在我们不需要显式的初始化和释放:


 body: new Center( child: new Column( mainAxisAlignment: MainAxisAlignment.center, children: <Widget>[ new Text( 'You have pushed the button this many times:', ), StreamBuilder<int>( initialData: 0, stream: MyApp.model.counterUpdates, builder: (context, snappShot) { String valueAsString = 'NoData'; if (snappShot != null && snappShot.hasData) { valueAsString = snappShot.data.toString(); } return Text( valueAsString, style: Theme.of(context).textTheme.display1, ); }), ], ), ), 

我保证,我们快完成了。 这是您应该知道的三件事:


  • 与第一种解决方案相比,使用StreamBuilder的最大优势在于,在listen()中调用setState() listen()总是会重新排列整个页面,而StreamBuilder只会调用其生成builder
  • snapShot变量包含从Stream接收的最新数据。 使用前,请务必先验证其中是否包含有效数据。
  • 根据期间的初始化原理,StreamBuilder在第一帧期间无法获取值。 为了解决这个问题,我们传递了initialData的值,该值用于第一个程序集,即屏幕的第一帧。 如果我们不传递initialData ,则将第一次使用无效数据调用我们的构建器。 另一种使用initialData的替代方法是,如果snapShot无效,则返回一个占位符小部件,该小部件将显示直到我们获得有效数据为止,例如:


     // ,           StreamBuilder<int>( stream: MyApp.model.databaseUpdates, builder: (context, snappShot) { if (snappShot != null && snappShot.hasData) { return Text( snappShot.data.toString(), style: Theme.of(context).textTheme.display1, ); } //      ,   Spinner return CircularProgressIndicator (); }) 


在下一篇文章中,我们将研究如何在流中转换数据并进行实时处理。 非常感谢Scott Stoll阅读了证明和重要反馈。



Source: https://habr.com/ru/post/zh-CN450950/


All Articles