Dart 2. Asynchrone Programmierung: Datenströme

Asynchrone Programmierung: Datenströme


Inhalt



Was ist wichtig:


  • Streams bieten eine asynchrone Datensequenz.
  • Datensequenzen enthalten Benutzerereignisse und aus Dateien gelesene Daten.
  • Der Stream kann mit await for oder listen() von der Stream API verarbeitet werden.
  • Streams bieten eine Möglichkeit, auf Fehler zu reagieren.
  • Es gibt zwei Arten von Streams: single subscription Streams und Broadcast.

Die asynchrone Programmierung in Dart ist durch die Klassen Future und Stream gekennzeichnet .


Future ist eine verzögerte Berechnung. Wenn eine normale Funktion ein Ergebnis zurückgibt, gibt die asynchrone Funktion ein Future ( future ) -Objekt zurück, das letztendlich das Ergebnis enthält. future gibt das Ergebnis zurück, wenn der Vorgang abgeschlossen ist.


Ein Stream ist eine Folge von asynchronen Ereignissen. Dies ähnelt einem Iterable asynchronen Objekt, bei dem der Thread das Ereignis nicht meldet, wenn Sie es anfordern, sondern das Ereignis meldet, wenn es bereit ist.


Stream-Ereignisse empfangen


Streams können auf verschiedene Arten erstellt werden. Dies ist das Thema eines anderen Artikels. Sie können jedoch alle auf dieselbe Weise verwendet werden: Eine asynchrone for-Schleife (normalerweise als Warten auf bezeichnet ) iteriert Stream-Ereignisse wie eine for- Schleife durch eine Sammlung. Zum Beispiel:


 Future<int> sumStream(Stream<int> stream) async { var sum = 0; await for (var value in stream) { sum += value; } return sum; } 

Dieser Code empfängt einfach das ganzzahlige Ereignis vom Stream, fügt sie hinzu und gibt den ( future ) Betrag zurück. Wenn der Hauptteil der Schleife endet, wird die Funktion angehalten, bis das nächste Ereignis oder der nächste Thread beendet wird.


Die Funktion ist mit dem async , das bei Verwendung der Warteschleife async erforderlich ist.


Das folgende Beispiel (auf DartPad ) überprüft den vorherigen Code, indem ein einfacher Strom von Ganzzahlen mithilfe einer Funktion mit async* ( Notengenerator ) erstellt wird:


Beispielcode
 // Copyright (c) 2015, the Dart project authors. // Please see the AUTHORS file for details. // All rights reserved. Use of this source code is governed // by a BSD-style license that can be found in the LICENSE file. import 'dart:async'; Future<int> sumStream(Stream<int> stream) async { var sum = 0; await for (var value in stream) { sum += value; } return sum; } Stream<int> countStream(int to) async* { for (int i = 1; i <= to; i++) { yield i; } } main() async { var stream = countStream(10); var sum = await sumStream(stream); print(sum); // 55 } 

Fehlerereignisse


Threads werden ausgeführt, wenn sie keine Ereignisse mehr enthalten, und der Code, der die Ereignisse empfängt, wird auf dieselbe Weise darüber informiert, wie er über das Eintreffen eines neuen Ereignisses informiert wird. Wenn Sie Ereignisse mit Wartezeit lesen , endet die Schleife, wenn der Stream endet.


In einigen Fällen tritt ein Fehler auf, bevor der Stream endet. Möglicherweise enthält ein Netzwerkfehler beim Abrufen einer Datei von einem Remoteserver oder der Code, der die Ereignisse generiert hat, einen Fehler. Jemand sollte darüber Bescheid wissen.


Streams können ein Fehlerereignis auf dieselbe Weise wie Datenereignisse melden. Die meisten Threads werden nach dem ersten Fehler gestoppt, aber Threads, die mehr als einen Fehler zurückgeben, und Threads, die Daten nach einem Fehlerereignis melden, sind möglich. In diesem Dokument werden nur Threads behandelt, die nicht mehr als einen Fehler zurückgeben.


Beim Lesen eines Streams mit " Warten auf" wird vom Schleifenoperator ein Fehler ausgegeben. Dies vervollständigt auch die Schleife. Sie können den Fehler mit try-catch abfangen. Im folgenden Beispiel (auf DartPad ) tritt ein Fehler auf, wenn der Schleifeniterator 4 ist:


Beispielcode
 // Copyright (c) 2015, the Dart project authors. // Please see the AUTHORS file for details. // All rights reserved. Use of this source code is governed // by a BSD-style license that can be found in the LICENSE file. import 'dart:async'; Future<int> sumStream(Stream<int> stream) async { var sum = 0; try { await for (var value in stream) { sum += value; } } catch (e) { return -1; } return sum; } Stream<int> countStream(int to) async* { for (int i = 1; i <= to; i++) { if (i == 4) { throw new Exception('Intentional exception'); } else { yield i; } } } main() async { var stream = countStream(10); var sum = await sumStream(stream); print(sum); // -1 } 

Arbeite mit Streams


Die Stream Klasse enthält eine Reihe von Hilfsmethoden , die allgemeine Operationen an einem Stream ausführen können, ähnlich wie Iterable- Methoden. Mit lastWhere() aus der Stream API können Sie beispielsweise die kleinste positive Ganzzahl in einem Stream .


 Future<int> lastPositive(Stream<int> stream) => stream.lastWhere((x) => x >= 0); 

Stream-Typen


Abonnement-Streams


Der häufigste Stream-Typ enthält eine Folge von Ereignissen, die Teile eines größeren Ganzen sind. Ereignisse müssen in der richtigen Reihenfolge geliefert werden, ohne dass eines davon fehlt. Dies ist die Art von Stream, die Sie erhalten, wenn Sie eine Datei lesen oder eine Webanforderung erhalten.


Ein solcher Stream kann nur einmal angehört werden. Späteres Abhören kann bedeuten, dass die ersten Ereignisse übersprungen werden, und dann macht der Rest des Streams keinen Sinn. Wenn Sie mit dem Hören beginnen, werden die Daten extrahiert und in Teilen bereitgestellt.


Broadcast-Streams


Eine andere Art von Stream ist für einzelne Nachrichten, die einzeln verarbeitet werden können. Ein solcher Stream kann beispielsweise für Mausereignisse in einem Browser verwendet werden.


Sie können jederzeit mit dem Abhören eines solchen Streams beginnen und erhalten Ereignisse, die während des Abhörens aufgetreten sind. Stream kann mehrere Listener anhören. Sie können Streaming-Ereignisse erneut abhören, nachdem Sie ein vorheriges Abonnement gekündigt haben.


Flow-Verarbeitungsmethoden


Die folgenden Methoden in Stream <T> verarbeiten den Stream und geben das Ergebnis zurück:


 Future<T> get first; Future<bool> get isEmpty; Future<T> get last; Future<int> get length; Future<T> get single; Future<bool> any(bool Function(T element) test); Future<bool> contains(Object needle); Future<E> drain<E>([E futureValue]); Future<T> elementAt(int index); Future<bool> every(bool Function(T element) test); Future<T> firstWhere(bool Function(T element) test, {T Function() orElse}); Future<S> fold<S>(S initialValue, S Function(S previous, T element) combine); Future forEach(void Function(T element) action); Future<String> join([String separator = ""]); Future<T> lastWhere(bool Function(T element) test, {T Function() orElse}); Future pipe(StreamConsumer<T> streamConsumer); Future<T> reduce(T Function(T previous, T element) combine); Future<T> singleWhere(bool Function(T element) test, {T Function() orElse}); Future<List<T>> toList(); Future<Set<T>> toSet(); 

Alle diese Funktionen außer drain() und pipe() entsprechen einer ähnlichen Funktion in Iterable . Jeder von ihnen kann einfach mit einer asynchronen Funktion mit einer Warteschleife (oder einfach mit einer der anderen Methoden) geschrieben werden. Einige Implementierungen können beispielsweise wie folgt aussehen:
Alle diese Funktionen außer Drain () und Pipe () entsprechen einer ähnlichen Funktion in Iterable. Jedes kann einfach geschrieben werden, indem eine asynchrone Funktion mit einer Wait-for-Schleife verwendet wird (oder einfach eine der anderen Methoden verwendet wird). Einige Implementierungen könnten beispielsweise sein:


 Future<bool> contains(Object needle) async { await for (var event in this) { if (event == needle) return true; } return false; } Future forEach(void Function(T element) action) async { await for (var event in this) { action(event); } } Future<List<T>> toList() async { final result = <T>[]; await this.forEach(result.add); return result; } Future<String> join([String separator = ""]) async => (await this.toList()).join(separator); 

(Die tatsächliche Implementierung ist etwas komplizierter, jedoch hauptsächlich aus historischen Gründen.)


Methoden zur Flussänderung


Die folgenden Methoden in Stream geben einen neuen Stream zurück, der auf dem ursprünglichen Stream basiert. Jeder von ihnen wartet darauf, dass jemand den neuen Stream hört, bevor er den ursprünglichen hört.


 Stream<R> cast<R>(); Stream<S> expand<S>(Iterable<S> Function(T element) convert); Stream<S> map<S>(S Function(T event) convert); Stream<R> retype<R>(); Stream<T> skip(int count); Stream<T> skipWhile(bool Function(T element) test); Stream<T> take(int count); Stream<T> takeWhile(bool Function(T element) test); Stream<T> where(bool Function(T event) test); 

Die obigen Methoden entsprechen ähnlichen Methoden in Iterable , die das iterierbare Objekt in ein anderes iterierbares Objekt konvertieren. All dies kann einfach mit einer asynchronen Funktion mit einer Warteschleife geschrieben werden.


 Stream<E> asyncExpand<E>(Stream<E> Function(T event) convert); Stream<E> asyncMap<E>(FutureOr<E> Function(T event) convert); Stream<T> distinct([bool Function(T previous, T next) equals]); 

Die Funktionen asyncExpand() und asyncMap() ähneln den Funktionen expand() und map() , ermöglichen jedoch, dass das Funktionsargument eine asynchrone Funktion ist. Verschiedene distinct() Funktionen existieren in Iterable nicht, können aber implementiert werden.


 Stream<T> handleError(Function onError, {bool test(error)}); Stream<T> timeout(Duration timeLimit, {void Function(EventSink<T> sink) onTimeout}); Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer); 

Die letzten drei Funktionen sind spezifischer. Dazu gehört die Fehlerbehandlung, die die Warteschleife nicht ausführen kann, da der allererste Fehler die Schleife beendet und den Stream abonniert. Es gibt nichts zu tun. Sie können handleError() , um Fehler aus dem Stream zu entfernen, bevor Sie ihn in der Wait-for- Schleife verwenden.


Transform () Funktion


Die transform () -Funktion dient nicht nur zur Fehlerbehandlung. Es ist eine allgemeinere "Karte" für Streams. Eine normale Karte erfordert einen Wert für jedes eingehende Ereignis. Insbesondere für E / A-Streams können jedoch mehrere eingehende Ereignisse erforderlich sein, um ein Ausgabeereignis zu erstellen. StreamTransformer kann dabei helfen. Decoder wie Utf8Decoder sind beispielsweise Transformatoren. Ein Transformator benötigt nur eine bind () -Funktion, die einfach über eine asynchrone Funktion implementiert werden kann.


 Stream<S> mapLogErrors<S, T>( Stream<T> stream, S Function(T event) convert, ) async* { var streamWithoutErrors = stream.handleError((e) => log(e)); await for (var event in streamWithoutErrors) { yield convert(event); } } 

Lesen und Dekodieren einer Datei


Der folgende Code liest die Datei und führt zwei Konvertierungen im Stream durch. Zuerst werden Daten aus UTF8 konvertiert und dann über LineSplitter weitergeleitet . Alle Zeilen werden gedruckt, mit Ausnahme derjenigen, die mit einem Hashtag ( # ) beginnen.


 import 'dart:convert'; import 'dart:io'; Future<void> main(List<String> args) async { var file = File(args[0]); var lines = file .openRead() .transform(utf8.decoder) .transform(LineSplitter()); await for (var line in lines) { if (!line.startsWith('#')) print(line); } } 

Listen () -Methode


Die listen() -Methode ist eine "Low-Level" -Methode, alle anderen Funktionen der Funktion werden durch listen() .


 StreamSubscription<T> listen(void Function(T event) onData, {Function onError, void Function() onDone, bool cancelOnError}); 

Um einen neuen Stream-Typ zu erstellen, können Sie einfach die Stream Klasse erben und die listen() -Methode implementieren. Alle anderen Stream Methoden rufen listen() auf, um zu funktionieren.


Mit der listen () -Methode können Sie den Stream abhören. Bis Sie dies tun, ist der Stream ein inertes Objekt, das beschreibt, welche Ereignisse Sie abhören möchten. Beim Abhören wird ein StreamSubscription- Objekt zurückgegeben , das den aktiven Stream darstellt, der Ereignisse generiert. Dies ähnelt der Iterable , dass Iterable nur eine Sammlung von Objekten ist und ein Iterator die eigentliche Iteration Iterable .


Sie können das Abonnieren eines Streams beenden, ihn nach einer Pause fortsetzen und ihn vollständig kündigen. Sie können Rückrufe angeben, die für jedes Datenereignis oder Fehlerereignis sowie beim Schließen des Streams aufgerufen werden.




Was noch zu lesen?


Dart 2. Asynchrone Programmierung: Futures

Source: https://habr.com/ru/post/de442274/


All Articles