异步编程:数据流
目录内容
重要的是:
- 流提供异步数据序列。
- 数据序列包含用户事件和从文件读取的数据。
- 可以使用
Stream
API中的await for或listen()
处理Stream
。 - 流提供了一种响应错误的方法。
- 流有两种类型:
single subscription
流和广播。
Dart中的异步编程的特征是Future和Stream类。
Future
是递延计算。 如果普通函数返回结果,则异步函数返回最终将包含结果的Future
( future
)对象。 操作完成后, future
将返回结果。
流是一系列异步事件。 这类似于Iterable
异步对象,其中线程在准备就绪时报告事件,而不是在您请求时接收下一个事件。
接收流事件
流可以以不同的方式创建,这是另一篇文章的主题,但是它们都可以以相同的方式使用:异步的for循环(通常称为await for )迭代流事件,就像for循环遍历集合一样。 例如:
Future<int> sumStream(Stream<int> stream) async { var sum = 0; await for (var value in stream) { sum += value; } return sum; }
此代码仅从流中接收整数事件,将它们相加并返回( future
)金额。 当循环的主体结束时,函数将暂停直到下一个事件或线程终止。
该函数用async
标记,这在使用await for循环时是必需的。
以下示例(在DartPad上 )通过使用带有async*
( 注释生成器 )的函数创建简单的整数流来检查先前的代码:
错误事件
当线程中没有更多事件时,将执行线程,并且以与通知新事件到来的方式相同的方式来通知接收事件的代码。 当读取带有等待事件时,循环在流结束时结束。
在某些情况下,流完成之前会发生错误; 可能是从远程服务器检索文件时出现网络故障,或者生成事件的代码包含错误,应该有人知道。
流可以以与数据事件相同的方式报告错误事件。 大多数线程在第一个错误发生后停止,但是返回多个错误的线程和在发生错误事件后报告数据的线程都是可能的。 在本文档中,我们仅讨论返回不超过一个错误的线程。
当使用await for读取流时,循环运算符将引发错误。 这也完成了循环。 您可以使用try-catch捕获错误。 在以下示例中(在DartPad上 ),如果循环迭代器为4,则会发生错误:
使用流
Stream
类包含许多可对流执行常规操作的辅助方法,类似于Iterable方法。 例如,您可以使用Stream
API中的lastWhere()
找到流中最小的正整数。
Future<int> lastPositive(Stream<int> stream) => stream.lastWhere((x) => x >= 0);
流类型
订阅流
流的最常见类型包含一系列事件,这些事件是较大整体的一部分。 必须以正确的顺序交付事件,而不会丢失任何事件。 这是您在读取文件或接收Web请求时收到的流的类型。
这样的流只能被收听一次。 稍后收听可能意味着跳过初始事件,然后其余流没有意义。 当您开始收听时,数据将被提取并分段提供。
广播流
流的另一种类型是针对一次可以处理的单个消息。 这样的流可以用于例如浏览器中的鼠标事件。
您可以随时开始收听此类流,并且您将收到在侦听期间发生的事件。 流可以收听多个侦听器。 取消上一个订阅后,您可以再次开始收听流事件。
流处理方法
Stream <T>中的以下方法处理流并返回结果:
Future<T> get first; Future<bool> get isEmpty; Future<T> get last; Future<int> get length; Future<T> get single; Future<bool> any(bool Function(T element) test); Future<bool> contains(Object needle); Future<E> drain<E>([E futureValue]); Future<T> elementAt(int index); Future<bool> every(bool Function(T element) test); Future<T> firstWhere(bool Function(T element) test, {T Function() orElse}); Future<S> fold<S>(S initialValue, S Function(S previous, T element) combine); Future forEach(void Function(T element) action); Future<String> join([String separator = ""]); Future<T> lastWhere(bool Function(T element) test, {T Function() orElse}); Future pipe(StreamConsumer<T> streamConsumer); Future<T> reduce(T Function(T previous, T element) combine); Future<T> singleWhere(bool Function(T element) test, {T Function() orElse}); Future<List<T>> toList(); Future<Set<T>> toSet();
除了drain()
和pipe()
以外,所有这些功能都与Iterable中的类似功能相对应。 可以使用带有等待循环的异步函数轻松地编写每个脚本 (或者简单地使用其他方法之一)。 例如,一些实现可能如下:
除了排水()和管道()以外,所有这些功能都与Iterable上的类似功能相对应。 通过使用带有await for循环的异步函数(或仅使用其他方法之一),可以轻松编写每个脚本。 例如,一些实现可能是:
Future<bool> contains(Object needle) async { await for (var event in this) { if (event == needle) return true; } return false; } Future forEach(void Function(T element) action) async { await for (var event in this) { action(event); } } Future<List<T>> toList() async { final result = <T>[]; await this.forEach(result.add); return result; } Future<String> join([String separator = ""]) async => (await this.toList()).join(separator);
(实际实现稍微复杂一些,但主要是出于历史原因。)
流修改方法
Stream中的以下方法基于原始流返回新的流。 他们每个人都在等待某人收听新的流,然后再听原始的。
Stream<R> cast<R>(); Stream<S> expand<S>(Iterable<S> Function(T element) convert); Stream<S> map<S>(S Function(T event) convert); Stream<R> retype<R>(); Stream<T> skip(int count); Stream<T> skipWhile(bool Function(T element) test); Stream<T> take(int count); Stream<T> takeWhile(bool Function(T element) test); Stream<T> where(bool Function(T event) test);
上面的方法对应于Iterable中的类似方法,该方法将可迭代对象转换为另一个可迭代对象。 所有这些都可以使用带有等待循环的异步函数轻松编写。
Stream<E> asyncExpand<E>(Stream<E> Function(T event) convert); Stream<E> asyncMap<E>(FutureOr<E> Function(T event) convert); Stream<T> distinct([bool Function(T previous, T next) equals]);
asyncExpand()
和asyncMap()
函数类似于expand()
和map()
函数,但是允许该函数参数为异步函数。 distinct()
函数在Iterable中不存在,但可以实现。
Stream<T> handleError(Function onError, {bool test(error)}); Stream<T> timeout(Duration timeLimit, {void Function(EventSink<T> sink) onTimeout}); Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer);
最后三个功能更具体。 这些包括错误处理,因为第一个错误会终止循环并订阅流,因此await for循环无法执行。 没有什么可做的。 您可以handleError()
从流中删除错误,然后在等待循环中使用它。
transform()函数不仅用于错误处理; 它是流的更通用的“映射”。 正常映射需要为每个传入事件提供一个值。 但是,尤其是对于I / O流,可能需要多个入站事件才能创建输出事件。 StreamTransformer可以帮助您。 例如,像Utf8Decoder这样的解码器就是变压器。 转换器仅需要一个bind()函数,该函数可以通过异步函数轻松实现。
Stream<S> mapLogErrors<S, T>( Stream<T> stream, S Function(T event) convert, ) async* { var streamWithoutErrors = stream.handleError((e) => log(e)); await for (var event in streamWithoutErrors) { yield convert(event); } }
读取和解码文件
以下代码读取文件并在流中执行两次转换。 首先,它将转换来自UTF8的数据,然后将其通过LineSplitter传递。 打印所有行,除了以井号( #
)开头的行。
import 'dart:convert'; import 'dart:io'; Future<void> main(List<String> args) async { var file = File(args[0]); var lines = file .openRead() .transform(utf8.decoder) .transform(LineSplitter()); await for (var line in lines) { if (!line.startsWith('#')) print(line); } }
Listen()方法
listen()
方法是“低级”方法,该函数的所有其他功能都是通过listen()
定义的。
StreamSubscription<T> listen(void Function(T event) onData, {Function onError, void Function() onDone, bool cancelOnError});
要创建一种新型的流,您可以简单地继承Stream
类并实现listen()
方法,所有其他Stream
方法都可以调用listen()
来工作。
listen()方法使您可以开始侦听流。 在执行此操作之前,流是一个惰性对象,该对象描述您要侦听的事件。 侦听时,将返回一个StreamSubscription对象,该对象表示正在生成事件的活动流。 这类似于Iterable
只是对象的集合,而Iterator是执行实际迭代的对象。
您可以停止订阅信息流,在暂停后恢复它,然后完全取消它。 您可以指定将为每个数据事件或错误事件以及流关闭时调用的回调。
还有什么要读的?
Dart 2.异步编程:期货