Programaci贸n asincr贸nica: flujos de datos
Contenido
Lo que es importante:
- Las secuencias proporcionan una secuencia de datos asincr贸nica.
- Las secuencias de datos contienen eventos de usuario y datos le铆dos de archivos.
- La transmisi贸n se puede procesar usando waitit o
listen()
desde la API Stream
. - Las transmisiones proporcionan una forma de responder a los errores.
- Hay dos tipos de transmisiones: transmisiones de
single subscription
y difusi贸n.
La programaci贸n asincr贸nica en Dart se caracteriza por las clases Future y Stream .
Future
es un c谩lculo diferido. Si una funci贸n normal devuelve un resultado, la funci贸n asincr贸nica devuelve un objeto Future
( future
) que finalmente contendr谩 el resultado. future
devolver谩 el resultado cuando se complete la operaci贸n.
Una secuencia es una secuencia de eventos asincr贸nicos. Esto es similar a un objeto asincr贸nico Iterable
, donde en lugar de recibir el pr贸ximo evento cuando lo solicita, el hilo informa el evento cuando est谩 listo.
Recepci贸n de eventos de transmisi贸n
Las secuencias se pueden crear de diferentes maneras, que es el tema de otro art铆culo, pero todas se pueden usar de la misma manera: un bucle for asincr贸nico (generalmente llamado wait for for ) itera eventos de secuencia, como un bucle for itera a trav茅s de una colecci贸n. Por ejemplo:
Future<int> sumStream(Stream<int> stream) async { var sum = 0; await for (var value in stream) { sum += value; } return sum; }
Este c贸digo simplemente recibe el evento entero de la secuencia, los agrega y devuelve la cantidad ( future
). Cuando finaliza el cuerpo del bucle, la funci贸n se detiene hasta que finaliza el siguiente evento o subproceso.
La funci贸n est谩 marcada con la async
, que se requiere cuando se utiliza el ciclo de espera .
El siguiente ejemplo (en DartPad ) verifica el c贸digo anterior creando una secuencia simple de enteros utilizando una funci贸n con async*
( generador de notas ):
Eventos de error
Los subprocesos se ejecutan cuando no hay m谩s eventos en ellos, y el c贸digo que recibe los eventos se notifica al respecto de la misma manera que se notifica la llegada de un nuevo evento. Al leer eventos con wait for, el ciclo finaliza cuando finaliza la transmisi贸n.
En algunos casos, se produce un error antes de que finalice la secuencia; quiz谩s una falla de red al recuperar un archivo de un servidor remoto o el c贸digo que gener贸 los eventos contiene un error, alguien deber铆a saberlo.
Las transmisiones pueden informar un evento de error de la misma manera que los eventos de datos. La mayor铆a de los hilos se detienen despu茅s del primer error, pero son posibles los hilos que devuelven m谩s de un error y los hilos que informan datos despu茅s de un evento de error. En este documento, discutimos solo hilos que no devuelven m谩s de un error.
Al leer una secuencia usando wait for, el operador de bucle emite un error. Esto tambi茅n completa el ciclo. Puede detectar el error con try-catch. En el siguiente ejemplo (en DartPad ), se produce un error si el iterador de bucle es 4:
Trabaja con streams
La clase Stream
contiene varios m茅todos auxiliares que pueden realizar operaciones generales en un flujo, similar a los m茅todos Iterable . Por ejemplo, puede encontrar el entero positivo m谩s peque帽o en una secuencia usando lastWhere()
de la API de Stream
.
Future<int> lastPositive(Stream<int> stream) => stream.lastWhere((x) => x >= 0);
Tipos de flujo
Flujos de suscripci贸n
El tipo de secuencia m谩s com煤n contiene una secuencia de eventos que son partes de un todo m谩s grande. Los eventos deben entregarse en el orden correcto sin perder ninguno de ellos. Este es el tipo de transmisi贸n que recibe cuando lee un archivo o recibe una solicitud web.
Dicha transmisi贸n solo se puede escuchar una vez. Escuchar m谩s tarde puede significar omitir los eventos iniciales, y luego el resto de la transmisi贸n no tiene sentido. Cuando comience a escuchar, los datos se extraer谩n y se proporcionar谩n en partes.
Transmitir transmisiones
Otro tipo de transmisi贸n es para mensajes individuales que pueden procesarse uno a la vez. Dicha secuencia se puede utilizar, por ejemplo, para eventos del mouse en un navegador.
Puede comenzar a escuchar dicha transmisi贸n en cualquier momento, y recibir谩 eventos que ocurrieron durante la escucha. Stream puede escuchar a varios oyentes. Puede comenzar a escuchar eventos de transmisi贸n nuevamente despu茅s de cancelar una suscripci贸n anterior.
M茅todos de procesamiento de flujo
Los siguientes m茅todos en la secuencia <T> procesan la secuencia y devuelven el resultado:
Future<T> get first; Future<bool> get isEmpty; Future<T> get last; Future<int> get length; Future<T> get single; Future<bool> any(bool Function(T element) test); Future<bool> contains(Object needle); Future<E> drain<E>([E futureValue]); Future<T> elementAt(int index); Future<bool> every(bool Function(T element) test); Future<T> firstWhere(bool Function(T element) test, {T Function() orElse}); Future<S> fold<S>(S initialValue, S Function(S previous, T element) combine); Future forEach(void Function(T element) action); Future<String> join([String separator = ""]); Future<T> lastWhere(bool Function(T element) test, {T Function() orElse}); Future pipe(StreamConsumer<T> streamConsumer); Future<T> reduce(T Function(T previous, T element) combine); Future<T> singleWhere(bool Function(T element) test, {T Function() orElse}); Future<List<T>> toList(); Future<Set<T>> toSet();
Todas estas funciones, excepto drain()
y pipe()
, corresponden a una funci贸n similar en Iterable . Cada uno de ellos se puede escribir f谩cilmente usando una funci贸n asincr贸nica con un ciclo de espera (o simplemente usando uno de los otros m茅todos). Por ejemplo, algunas implementaciones pueden ser las siguientes:
Todas estas funciones, excepto drenaje () y tuber铆a (), corresponden a una funci贸n similar en Iterable. Cada uno se puede escribir f谩cilmente usando una funci贸n as铆ncrona con un bucle de espera (o simplemente usando uno de los otros m茅todos). Por ejemplo, algunas implementaciones podr铆an ser:
Future<bool> contains(Object needle) async { await for (var event in this) { if (event == needle) return true; } return false; } Future forEach(void Function(T element) action) async { await for (var event in this) { action(event); } } Future<List<T>> toList() async { final result = <T>[]; await this.forEach(result.add); return result; } Future<String> join([String separator = ""]) async => (await this.toList()).join(separator);
(La implementaci贸n real es un poco m谩s complicada, pero principalmente por razones hist贸ricas).
M茅todos de modificaci贸n de flujo
Los siguientes m茅todos en Stream devuelven una nueva secuencia basada en la secuencia original. Cada uno de ellos espera a que alguien escuche la nueva transmisi贸n antes de escuchar la original.
Stream<R> cast<R>(); Stream<S> expand<S>(Iterable<S> Function(T element) convert); Stream<S> map<S>(S Function(T event) convert); Stream<R> retype<R>(); Stream<T> skip(int count); Stream<T> skipWhile(bool Function(T element) test); Stream<T> take(int count); Stream<T> takeWhile(bool Function(T element) test); Stream<T> where(bool Function(T event) test);
Los m茅todos anteriores corresponden a m茅todos similares en Iterable , que convierten el objeto iterable en otro objeto iterable. Todo esto se puede escribir f谩cilmente utilizando una funci贸n asincr贸nica con un bucle de espera .
Stream<E> asyncExpand<E>(Stream<E> Function(T event) convert); Stream<E> asyncMap<E>(FutureOr<E> Function(T event) convert); Stream<T> distinct([bool Function(T previous, T next) equals]);
Las asyncExpand()
y asyncMap()
son similares a las funciones expand()
y map()
, pero permiten que el argumento de la funci贸n sea una funci贸n asincr贸nica. distinct()
funciones distinct()
no existen en Iterable, pero se pueden implementar.
Stream<T> handleError(Function onError, {bool test(error)}); Stream<T> timeout(Duration timeLimit, {void Function(EventSink<T> sink) onTimeout}); Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer);
Las 煤ltimas tres funciones son m谩s espec铆ficas. Estos incluyen el manejo de errores que el bucle de espera no puede realizar, ya que el primer error termina el bucle y se suscribe a la secuencia. No hay nada que hacer al respecto. Puede handleError()
para eliminar errores de la secuencia antes de usarlo en el ciclo de espera .
La funci贸n transform () no es solo para el manejo de errores; Es un "mapa" m谩s generalizado para las transmisiones. Un mapa normal requiere un valor para cada evento entrante. Sin embargo, especialmente para las secuencias de E / S, se pueden requerir m煤ltiples eventos entrantes para crear un evento de salida. StreamTransformer puede ayudar con esto. Por ejemplo, los decodificadores como Utf8Decoder son transformadores. Un transformador requiere solo una funci贸n bind () , que puede implementarse f谩cilmente a trav茅s de una funci贸n asincr贸nica.
Stream<S> mapLogErrors<S, T>( Stream<T> stream, S Function(T event) convert, ) async* { var streamWithoutErrors = stream.handleError((e) => log(e)); await for (var event in streamWithoutErrors) { yield convert(event); } }
Leer y decodificar un archivo
El siguiente c贸digo lee el archivo y realiza dos conversiones en la secuencia. Primero, convierte los datos de UTF8 y luego los pasa a trav茅s de LineSplitter . Todas las l铆neas se imprimen, excepto las que comienzan con un hashtag ( #
).
import 'dart:convert'; import 'dart:io'; Future<void> main(List<String> args) async { var file = File(args[0]); var lines = file .openRead() .transform(utf8.decoder) .transform(LineSplitter()); await for (var line in lines) { if (!line.startsWith('#')) print(line); } }
M茅todo Listen ()
El m茅todo listen()
es un m茅todo de "bajo nivel", todas las dem谩s funciones de la funci贸n se definen a trav茅s de listen()
.
StreamSubscription<T> listen(void Function(T event) onData, {Function onError, void Function() onDone, bool cancelOnError});
Para crear un nuevo tipo de transmisi贸n, simplemente puede heredar la clase Stream
e implementar el m茅todo listen()
, todos los dem谩s m茅todos Stream
llaman a listen()
para que funcione.
El m茅todo listen () le permite comenzar a escuchar la transmisi贸n. Hasta que haga esto, la secuencia es un objeto inerte que describe qu茅 eventos desea escuchar. Al escuchar, se devuelve un objeto StreamSubscription que representa la secuencia activa que genera eventos. Esto es similar a c贸mo Iterable
es solo una colecci贸n de objetos, y un iterador es el que realiza la iteraci贸n real.
Puede dejar de suscribirse a una transmisi贸n, reanudarla despu茅s de una pausa y cancelarla por completo. Puede especificar devoluciones de llamada que se llamar谩n para cada evento de datos o evento de error, as铆 como cuando se cierra la secuencia.
驴Qu茅 m谩s leer?
Dart 2. Programaci贸n asincr贸nica: futuros