Programação assíncrona: fluxos de dados
Conteúdo
O que é importante:
- Os fluxos fornecem sequência de dados assíncrona.
- Sequências de dados contêm eventos do usuário e dados lidos de arquivos.
- O fluxo pode ser processado usando aguardar ou
listen()
da API de Stream
. - Os fluxos fornecem uma maneira de responder a erros.
- Existem dois tipos de fluxos: fluxos de
single subscription
e transmissão.
A programação assíncrona no Dart é caracterizada pelas classes Future e Stream .
Future
é um cálculo adiado. Se uma função normal retornar um resultado, a função assíncrona retornará um objeto Future
( future
) que acabará por conter o resultado. future
retornará o resultado quando a operação for concluída.
Um fluxo é uma sequência de eventos assíncronos. Isso é semelhante a um objeto assíncrono Iterable
, onde, em vez de receber o próximo evento quando você o solicita, o encadeamento relata o evento quando está pronto.
Recebendo eventos de fluxo
Os fluxos podem ser criados de maneiras diferentes, que é o tópico de outro artigo, mas todos podem ser usados da mesma maneira: um loop for assíncrono (geralmente chamado de espera ) itera eventos de fluxo, como um loop for itera através de uma coleção. Por exemplo:
Future<int> sumStream(Stream<int> stream) async { var sum = 0; await for (var value in stream) { sum += value; } return sum; }
Esse código simplesmente recebe o evento inteiro do fluxo, os adiciona e retorna a quantia ( future
). Quando o corpo do loop termina, a função pausa até o próximo evento ou thread terminar.
A função é marcada com a async
, necessária ao usar o loop de espera .
O exemplo a seguir (no DartPad ) verifica o código anterior, criando um fluxo simples de números inteiros usando uma função com async*
( gerador de notas ):
Eventos de erro
Os encadeamentos são executados quando não há mais eventos neles, e o código que recebe os eventos é notificado sobre isso da mesma maneira que é notificado sobre a chegada de um novo evento. Ao ler eventos com aguardar, o loop termina quando o fluxo termina.
Em alguns casos, ocorre um erro antes que o fluxo termine; talvez uma falha na rede ao recuperar um arquivo de um servidor remoto ou o código que gerou os eventos contenha um erro, alguém deve saber sobre isso.
Os fluxos podem relatar um evento de erro da mesma maneira que os eventos de dados. A maioria dos threads é interrompida após o primeiro erro, mas os threads que retornam mais de um erro e os threads que relatam dados após um evento de erro são possíveis. Neste documento, discutimos apenas os threads que retornam não mais que um erro.
Ao ler um fluxo usando aguardar, um erro é gerado pelo operador de loop. Isso também completa o loop. Você pode capturar o erro com try-catch. No exemplo a seguir (no DartPad ), ocorrerá um erro se o iterador de loop for 4:
Trabalhar com fluxos
A classe Stream
contém vários métodos auxiliares que podem executar operações gerais em um fluxo, semelhantes aos métodos Iterable . Por exemplo, você pode encontrar o menor número inteiro positivo em um fluxo usando lastWhere()
da API de Stream
.
Future<int> lastPositive(Stream<int> stream) => stream.lastWhere((x) => x >= 0);
Tipos de stream
Fluxos de Assinaturas
O tipo mais comum de fluxo contém uma sequência de eventos que fazem parte de um todo maior. Os eventos devem ser entregues na ordem correta, sem perder nenhum deles. Este é o tipo de fluxo que você recebe ao ler um arquivo ou ao receber uma solicitação da web.
Esse fluxo só pode ser ouvido uma vez. Ouvir mais tarde pode significar pular os eventos iniciais e, em seguida, o restante do fluxo não faz sentido. Quando você começa a ouvir, os dados serão extraídos e fornecidos em partes.
Fluxos de transmissão
Outro tipo de fluxo é para mensagens individuais que podem ser processadas uma por vez. Esse fluxo pode ser usado, por exemplo, para eventos de mouse em um navegador.
Você pode começar a ouvir esse fluxo a qualquer momento e receberá eventos que ocorreram durante a audição. O fluxo pode ouvir vários ouvintes. Você pode começar a ouvir eventos de streaming novamente depois de cancelar uma assinatura anterior.
Métodos de processamento de fluxo
Os seguintes métodos no fluxo <T> processam o fluxo e retornam o resultado:
Future<T> get first; Future<bool> get isEmpty; Future<T> get last; Future<int> get length; Future<T> get single; Future<bool> any(bool Function(T element) test); Future<bool> contains(Object needle); Future<E> drain<E>([E futureValue]); Future<T> elementAt(int index); Future<bool> every(bool Function(T element) test); Future<T> firstWhere(bool Function(T element) test, {T Function() orElse}); Future<S> fold<S>(S initialValue, S Function(S previous, T element) combine); Future forEach(void Function(T element) action); Future<String> join([String separator = ""]); Future<T> lastWhere(bool Function(T element) test, {T Function() orElse}); Future pipe(StreamConsumer<T> streamConsumer); Future<T> reduce(T Function(T previous, T element) combine); Future<T> singleWhere(bool Function(T element) test, {T Function() orElse}); Future<List<T>> toList(); Future<Set<T>> toSet();
Todas essas funções, exceto drain()
e pipe()
, correspondem a uma função semelhante no Iterable . Cada um deles pode ser facilmente escrito usando uma função assíncrona com um loop de espera (ou simplesmente usando um dos outros métodos). Por exemplo, algumas implementações podem ser as seguintes:
Todas essas funções, exceto dreno () e tubo (), correspondem a uma função semelhante no Iterable. Cada um pode ser escrito facilmente usando uma função assíncrona com um loop de espera (ou apenas usando um dos outros métodos). Por exemplo, algumas implementações podem ser:
Future<bool> contains(Object needle) async { await for (var event in this) { if (event == needle) return true; } return false; } Future forEach(void Function(T element) action) async { await for (var event in this) { action(event); } } Future<List<T>> toList() async { final result = <T>[]; await this.forEach(result.add); return result; } Future<String> join([String separator = ""]) async => (await this.toList()).join(separator);
(A implementação real é um pouco mais complicada, mas principalmente por razões históricas.)
Métodos de modificação de fluxo
Os métodos a seguir no Stream retornam um novo fluxo com base no fluxo original. Cada um deles espera que alguém escute o novo fluxo antes de ouvir o original.
Stream<R> cast<R>(); Stream<S> expand<S>(Iterable<S> Function(T element) convert); Stream<S> map<S>(S Function(T event) convert); Stream<R> retype<R>(); Stream<T> skip(int count); Stream<T> skipWhile(bool Function(T element) test); Stream<T> take(int count); Stream<T> takeWhile(bool Function(T element) test); Stream<T> where(bool Function(T event) test);
Os métodos acima correspondem a métodos semelhantes no Iterable , que convertem o objeto iterável em outro objeto iterável. Tudo isso pode ser facilmente escrito usando uma função assíncrona com um loop de espera .
Stream<E> asyncExpand<E>(Stream<E> Function(T event) convert); Stream<E> asyncMap<E>(FutureOr<E> Function(T event) convert); Stream<T> distinct([bool Function(T previous, T next) equals]);
As asyncExpand()
e asyncMap()
são semelhantes às funções expand()
e map()
, mas permitem que o argumento da função seja uma função assíncrona. funções distinct()
não existem no Iterable, mas podem ser implementadas.
Stream<T> handleError(Function onError, {bool test(error)}); Stream<T> timeout(Duration timeLimit, {void Function(EventSink<T> sink) onTimeout}); Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer);
As últimas três funções são mais específicas. Isso inclui a manipulação de erros que o loop de espera não pode executar, pois o primeiro erro finaliza o loop e assina o fluxo. Não há nada a ser feito sobre isso. Você pode handleError()
para remover erros do fluxo antes de usá-lo no loop de espera .
A função transform () não é apenas para manipulação de erros; é um "mapa" mais generalizado para fluxos. Um mapa normal requer um valor para cada evento recebido. No entanto, especialmente para fluxos de E / S, vários eventos de entrada podem ser necessários para criar um evento de saída. StreamTransformer pode ajudar com isso. Por exemplo, decodificadores como Utf8Decoder são transformadores. Um transformador requer apenas uma função bind () , que pode ser facilmente implementada por meio de uma função assíncrona.
Stream<S> mapLogErrors<S, T>( Stream<T> stream, S Function(T event) convert, ) async* { var streamWithoutErrors = stream.handleError((e) => log(e)); await for (var event in streamWithoutErrors) { yield convert(event); } }
Lendo e decodificando um arquivo
O código a seguir lê o arquivo e realiza duas conversões no fluxo. Primeiro, ele converte os dados do UTF8 e depois os passa pelo LineSplitter . Todas as linhas são impressas, exceto aquelas que começam com uma hashtag ( #
).
import 'dart:convert'; import 'dart:io'; Future<void> main(List<String> args) async { var file = File(args[0]); var lines = file .openRead() .transform(utf8.decoder) .transform(LineSplitter()); await for (var line in lines) { if (!line.startsWith('#')) print(line); } }
Método Listen ()
O método listen()
é um método de "baixo nível", todas as outras funções da função são definidas por listen()
.
StreamSubscription<T> listen(void Function(T event) onData, {Function onError, void Function() onDone, bool cancelOnError});
Para criar um novo tipo de fluxo, você pode simplesmente herdar a classe Stream
e implementar o método listen()
, todos os outros métodos de Stream
chamam listen()
para funcionar.
O método listen () permite que você comece a ouvir o fluxo. Até você fazer isso, o fluxo é um objeto inerte que descreve quais eventos você deseja ouvir. Ao ouvir, é retornado um objeto StreamSubscription que representa o fluxo ativo que está gerando eventos. Isso é semelhante a como Iterable
é apenas uma coleção de objetos e um iterador é quem realiza a iteração real.
Você pode parar de se inscrever em um fluxo, retomar depois de uma pausa e cancelá-lo completamente. Você pode especificar retornos de chamada que serão chamados para cada evento de dados ou erro, bem como quando o fluxo for fechado.
O que mais ler?
Dardo 2. Programação assíncrona: futuros