Programmation asynchrone: flux de données
Table des matières
Ce qui est important:
- Les flux fournissent une séquence de données asynchrone.
- Les séquences de données contiennent des événements utilisateur et des données lues dans des fichiers.
- Le flux peut être traité à l'aide de l' attente ou de l'
listen()
de l'API Stream
. - Les flux fournissent un moyen de répondre aux erreurs.
- Il existe deux types de flux: les flux
single subscription
et la diffusion.
La programmation asynchrone dans Dart est caractérisée par les classes Future et Stream .
Future
est un calcul différé. Si une fonction normale renvoie un résultat, la fonction asynchrone renvoie un objet Future
( future
) qui contiendra finalement le résultat. future
renverra le résultat une fois l'opération terminée.
Un flux est une séquence d'événements asynchrones. Cela est similaire à un objet asynchrone Iterable
, où au lieu de recevoir l'événement suivant lorsque vous le demandez, le thread signale l'événement lorsqu'il est prêt.
Réception d'événements de flux
Les flux peuvent être créés de différentes manières, ce qui est le sujet d'un autre article, mais ils peuvent tous être utilisés de la même manière: une boucle for asynchrone (généralement appelée en attente de ) itère les événements de flux, comme une boucle for itère à travers une collection. Par exemple:
Future<int> sumStream(Stream<int> stream) async { var sum = 0; await for (var value in stream) { sum += value; } return sum; }
Ce code reçoit simplement l'événement entier du flux, les ajoute et renvoie le montant ( future
). Lorsque le corps de la boucle se termine, la fonction se met en pause jusqu'à la fin de l'événement ou du thread suivant.
La fonction est marquée avec le async
- async
, qui est requis lors de l'utilisation de la boucle wait for .
L'exemple suivant (sur DartPad ) vérifie le code précédent en créant un simple flux d'entiers à l'aide d'une fonction avec async*
( générateur de notes ):
Événements d'erreur
Les threads sont exécutés lorsqu'il n'y a plus d'événements en eux, et le code qui reçoit les événements en est informé de la même manière qu'il est informé de l'arrivée d'un nouvel événement. Lors de la lecture des événements avec wait for, la boucle se termine à la fin du flux.
Dans certains cas, une erreur se produit avant la fin du flux; peut-être une défaillance du réseau lors de la récupération d'un fichier à partir d'un serveur distant ou le code qui a généré les événements contient une erreur, quelqu'un devrait le savoir.
Les flux peuvent signaler un événement d'erreur de la même manière que les événements de données. La plupart des threads s'arrêtent après la première erreur, mais les threads qui renvoient plus d'une erreur et les threads qui signalent des données après un événement d'erreur sont possibles. Dans ce document, nous discutons uniquement des threads qui ne retournent pas plus d'une erreur.
Lors de la lecture d'un flux à l'aide de l' attente de, une erreur est générée par l'opérateur de boucle. Cela termine également la boucle. Vous pouvez attraper l'erreur avec try-catch. Dans l'exemple suivant (sur DartPad ), une erreur se produit si l'itérateur de boucle est 4:
Travailler avec des flux
La classe Stream
contient un certain nombre de méthodes d'assistance qui peuvent effectuer des opérations générales sur un flux, similaires aux méthodes Iterable . Par exemple, vous pouvez trouver le plus petit entier positif dans un flux à l'aide de lastWhere()
de l'API Stream
.
Future<int> lastPositive(Stream<int> stream) => stream.lastWhere((x) => x >= 0);
Types de flux
Flux d'abonnement
Le type de flux le plus courant contient une séquence d'événements qui font partie d'un ensemble plus large. Les événements doivent être livrés dans le bon ordre sans en manquer aucun. Il s'agit du type de flux que vous recevez lors de la lecture d'un fichier ou de la réception d'une demande Web.
Un tel flux ne peut être écouté qu'une seule fois. Écouter plus tard peut signifier ignorer les événements initiaux, puis le reste du flux n'a pas de sens. Lorsque vous commencez à écouter, les données seront extraites et fournies en morceaux.
Flux de diffusion
Un autre type de flux concerne les messages individuels qui peuvent être traités un par un. Un tel flux peut être utilisé, par exemple, pour des événements de souris dans un navigateur.
Vous pouvez commencer à écouter un tel flux à tout moment et vous recevrez des événements survenus pendant l'écoute. Stream peut écouter plusieurs auditeurs. Vous pouvez recommencer à écouter les événements de streaming après avoir annulé un abonnement précédent.
Méthodes de traitement des flux
Les méthodes suivantes dans Stream <T> traitent le flux et renvoient le résultat:
Future<T> get first; Future<bool> get isEmpty; Future<T> get last; Future<int> get length; Future<T> get single; Future<bool> any(bool Function(T element) test); Future<bool> contains(Object needle); Future<E> drain<E>([E futureValue]); Future<T> elementAt(int index); Future<bool> every(bool Function(T element) test); Future<T> firstWhere(bool Function(T element) test, {T Function() orElse}); Future<S> fold<S>(S initialValue, S Function(S previous, T element) combine); Future forEach(void Function(T element) action); Future<String> join([String separator = ""]); Future<T> lastWhere(bool Function(T element) test, {T Function() orElse}); Future pipe(StreamConsumer<T> streamConsumer); Future<T> reduce(T Function(T previous, T element) combine); Future<T> singleWhere(bool Function(T element) test, {T Function() orElse}); Future<List<T>> toList(); Future<Set<T>> toSet();
Toutes ces fonctions, à l'exception de drain()
et pipe()
, correspondent à une fonction similaire dans Iterable . Chacun d'eux peut être facilement écrit en utilisant une fonction asynchrone avec une boucle en attente (ou simplement en utilisant l'une des autres méthodes). Par exemple, certaines implémentations peuvent être les suivantes:
Toutes ces fonctions, à l'exception de drain () et pipe (), correspondent à une fonction similaire sur Iterable. Chacun peut être écrit facilement en utilisant une fonction asynchrone avec une boucle d'attente (ou simplement en utilisant l'une des autres méthodes). Par exemple, certaines implémentations pourraient être:
Future<bool> contains(Object needle) async { await for (var event in this) { if (event == needle) return true; } return false; } Future forEach(void Function(T element) action) async { await for (var event in this) { action(event); } } Future<List<T>> toList() async { final result = <T>[]; await this.forEach(result.add); return result; } Future<String> join([String separator = ""]) async => (await this.toList()).join(separator);
(La mise en œuvre réelle est un peu plus compliquée, mais principalement pour des raisons historiques.)
Méthodes de modification du flux
Les méthodes suivantes dans Stream renvoient un nouveau flux basé sur le flux d'origine. Chacun attend que quelqu'un écoute le nouveau flux avant d'écouter l'original.
Stream<R> cast<R>(); Stream<S> expand<S>(Iterable<S> Function(T element) convert); Stream<S> map<S>(S Function(T event) convert); Stream<R> retype<R>(); Stream<T> skip(int count); Stream<T> skipWhile(bool Function(T element) test); Stream<T> take(int count); Stream<T> takeWhile(bool Function(T element) test); Stream<T> where(bool Function(T event) test);
Les méthodes ci-dessus correspondent à des méthodes similaires dans Iterable , qui convertissent l'objet itérable en un autre objet itérable. Tout cela peut être facilement écrit à l'aide d'une fonction asynchrone avec une boucle d'attente .
Stream<E> asyncExpand<E>(Stream<E> Function(T event) convert); Stream<E> asyncMap<E>(FutureOr<E> Function(T event) convert); Stream<T> distinct([bool Function(T previous, T next) equals]);
Les fonctions asyncExpand()
et asyncMap()
sont similaires aux fonctions expand()
et map()
, mais permettent à l'argument de fonction d'être une fonction asynchrone. distinct()
n'existent pas dans Iterable, mais elles peuvent être implémentées.
Stream<T> handleError(Function onError, {bool test(error)}); Stream<T> timeout(Duration timeLimit, {void Function(EventSink<T> sink) onTimeout}); Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer);
Les trois dernières fonctions sont plus spécifiques. Il s'agit notamment de la gestion des erreurs que la boucle d'attente ne peut pas effectuer, car la toute première erreur termine la boucle et s'abonne au flux. Il n'y a rien à faire. Vous pouvez handleError()
pour supprimer les erreurs du flux avant de l'utiliser dans la boucle d'attente .
La fonction transform () n'est pas seulement pour la gestion des erreurs; c'est une "carte" plus généralisée pour les flux. Une carte normale nécessite une valeur pour chaque événement entrant. Cependant, en particulier pour les flux d'E / S, plusieurs événements entrants peuvent être nécessaires pour créer un événement de sortie. StreamTransformer peut vous y aider. Par exemple, les décodeurs comme Utf8Decoder sont des transformateurs. Un transformateur ne nécessite qu'une seule fonction bind () , qui peut être facilement implémentée via une fonction asynchrone.
Stream<S> mapLogErrors<S, T>( Stream<T> stream, S Function(T event) convert, ) async* { var streamWithoutErrors = stream.handleError((e) => log(e)); await for (var event in streamWithoutErrors) { yield convert(event); } }
Lecture et décodage d'un fichier
Le code suivant lit le fichier et effectue deux conversions dans le flux. Tout d'abord, il convertit les données UTF8, puis les transmet via LineSplitter . Toutes les lignes sont imprimées, sauf celles commençant par un hashtag ( #
).
import 'dart:convert'; import 'dart:io'; Future<void> main(List<String> args) async { var file = File(args[0]); var lines = file .openRead() .transform(utf8.decoder) .transform(LineSplitter()); await for (var line in lines) { if (!line.startsWith('#')) print(line); } }
Méthode Listen ()
La méthode listen()
est une méthode de «bas niveau», toutes les autres fonctions de la fonction sont définies via listen()
.
StreamSubscription<T> listen(void Function(T event) onData, {Function onError, void Function() onDone, bool cancelOnError});
Pour créer un nouveau type de flux, vous pouvez simplement hériter de la classe Stream
et implémenter la méthode listen()
, toutes les autres méthodes Stream
appellent listen()
pour fonctionner.
La méthode listen () vous permet de commencer à écouter le flux. Tant que vous ne le faites pas, le flux est un objet inerte qui décrit les événements que vous souhaitez écouter. Lors de l'écoute, un objet StreamSubscription est renvoyé qui représente le flux actif qui génère des événements. Ceci est similaire à la façon dont Iterable
n'est qu'une collection d'objets, et un itérateur est celui qui effectue l'itération réelle.
Vous pouvez arrêter de vous abonner à un flux, le reprendre après une pause et l'annuler complètement. Vous pouvez spécifier des rappels qui seront appelés pour chaque événement de données ou événement d'erreur, ainsi que lorsque le flux est fermé.
Que lire d'autre?
Dart 2. Programmation asynchrone: futurs