Pemrograman Asinkron: Aliran Data
Isi
Yang penting:
- Streaming menyediakan urutan data yang tidak sinkron.
- Urutan data berisi peristiwa pengguna dan data dibaca dari file.
- Streaming dapat diproses menggunakan menunggu atau
listen()
dari Stream
API. - Streaming menyediakan cara untuk merespons kesalahan.
- Ada dua jenis aliran: aliran
single subscription
dan siaran.
Pemrograman asinkron di Dart ditandai oleh kelas Future dan Stream .
Future
adalah perhitungan yang ditangguhkan. Jika fungsi normal mengembalikan hasil, fungsi asinkron mengembalikan objek Future
( future
) yang pada akhirnya akan berisi hasil. future
akan mengembalikan hasilnya ketika operasi selesai.
Aliran adalah urutan peristiwa asinkron. Ini mirip dengan objek asinkron Iterable
, di mana alih-alih menerima acara berikutnya saat Anda memintanya, utas melaporkan peristiwa saat siap.
Menerima Acara Aliran
Streaming dapat dibuat dengan cara yang berbeda, yang merupakan topik dari artikel lain, tetapi semuanya dapat digunakan dengan cara yang sama: sebuah asinkron untuk loop (biasanya disebut menunggu ) mengulangi stream event, seperti untuk loop yang berulang melalui koleksi. Sebagai contoh:
Future<int> sumStream(Stream<int> stream) async { var sum = 0; await for (var value in stream) { sum += value; } return sum; }
Kode ini hanya menerima acara integer dari aliran, menambahkannya dan mengembalikan jumlah ( future
). Ketika badan loop berakhir, fungsi berhenti sampai acara berikutnya atau utas berakhir.
Fungsi ditandai dengan async
, yang diperlukan saat menggunakan menunggu untuk loop.
Contoh berikut (pada DartPad ) memeriksa kode sebelumnya dengan membuat aliran sederhana bilangan bulat menggunakan fungsi dengan async*
( generator catatan ):
Acara Galat
Utas dieksekusi ketika tidak ada lagi peristiwa di dalamnya, dan kode yang menerima peristiwa tersebut diberitahu tentang ini dengan cara yang sama seperti diberitahukan tentang kedatangan acara baru. Saat membaca acara dengan menunggu, loop berakhir ketika aliran berakhir.
Dalam beberapa kasus, kesalahan terjadi sebelum aliran selesai; mungkin kegagalan jaringan saat mengambil file dari server jauh atau kode yang menghasilkan peristiwa mengandung kesalahan, seseorang harus mengetahuinya.
Streaming dapat melaporkan peristiwa kesalahan dengan cara yang sama seperti peristiwa data. Kebanyakan utas berhenti setelah kesalahan pertama, tetapi utas yang mengembalikan lebih dari satu kesalahan dan utas yang melaporkan data setelah peristiwa kesalahan dimungkinkan. Dalam dokumen ini, kami hanya membahas utas yang mengembalikan tidak lebih dari satu kesalahan.
Saat membaca aliran menggunakan menunggu, kesalahan dilemparkan oleh operator loop. Ini juga melengkapi loop. Anda dapat menangkap kesalahan dengan try-catch. Dalam contoh berikut (pada DartPad ), kesalahan terjadi jika loop iterator adalah 4:
Bekerja dengan stream
Kelas Stream
berisi sejumlah metode penolong yang dapat melakukan operasi umum pada aliran, mirip dengan metode Iterable . Misalnya, Anda dapat menemukan bilangan bulat positif terkecil dalam aliran menggunakan lastWhere()
dari Stream
API.
Future<int> lastPositive(Stream<int> stream) => stream.lastWhere((x) => x >= 0);
Jenis aliran
Aliran Berlangganan
Jenis aliran yang paling umum berisi urutan peristiwa yang merupakan bagian dari keseluruhan yang lebih besar. Acara harus disampaikan dalam urutan yang benar tanpa melewatkannya. Ini adalah jenis aliran yang Anda terima saat membaca file atau menerima permintaan web.
Aliran seperti itu hanya dapat didengarkan sekali. Mendengarkan nanti bisa berarti melewatkan acara awal, dan kemudian aliran yang lain tidak masuk akal. Ketika Anda mulai mendengarkan, data akan diekstraksi dan disediakan dalam beberapa bagian.
Aliran siaran
Jenis aliran lainnya adalah untuk pesan individual yang dapat diproses satu per satu. Aliran semacam itu dapat digunakan, misalnya, untuk acara mouse di browser.
Anda dapat mulai mendengarkan aliran seperti itu kapan saja, dan Anda akan menerima acara yang terjadi selama mendengarkan. Stream dapat mendengarkan beberapa pendengar. Anda dapat mulai mendengarkan acara streaming lagi setelah membatalkan langganan sebelumnya.
Metode pemrosesan aliran
Metode berikut dalam Streaming <T> memproses aliran dan mengembalikan hasilnya:
Future<T> get first; Future<bool> get isEmpty; Future<T> get last; Future<int> get length; Future<T> get single; Future<bool> any(bool Function(T element) test); Future<bool> contains(Object needle); Future<E> drain<E>([E futureValue]); Future<T> elementAt(int index); Future<bool> every(bool Function(T element) test); Future<T> firstWhere(bool Function(T element) test, {T Function() orElse}); Future<S> fold<S>(S initialValue, S Function(S previous, T element) combine); Future forEach(void Function(T element) action); Future<String> join([String separator = ""]); Future<T> lastWhere(bool Function(T element) test, {T Function() orElse}); Future pipe(StreamConsumer<T> streamConsumer); Future<T> reduce(T Function(T previous, T element) combine); Future<T> singleWhere(bool Function(T element) test, {T Function() orElse}); Future<List<T>> toList(); Future<Set<T>> toSet();
Semua fungsi ini, kecuali drain()
dan pipe()
, sesuai dengan fungsi serupa di Iterable . Masing-masing dapat dengan mudah ditulis menggunakan fungsi asinkron dengan menunggu loop (atau hanya menggunakan salah satu metode lain). Misalnya, beberapa implementasi mungkin sebagai berikut:
Semua fungsi ini, kecuali tiriskan () dan pipa (), sesuai dengan fungsi serupa pada Iterable. Masing-masing dapat ditulis dengan mudah dengan menggunakan fungsi async dengan menunggu loop (atau hanya menggunakan salah satu metode lain). Misalnya, beberapa implementasi dapat berupa:
Future<bool> contains(Object needle) async { await for (var event in this) { if (event == needle) return true; } return false; } Future forEach(void Function(T element) action) async { await for (var event in this) { action(event); } } Future<List<T>> toList() async { final result = <T>[]; await this.forEach(result.add); return result; } Future<String> join([String separator = ""]) async => (await this.toList()).join(separator);
(Implementasi aktual sedikit lebih rumit, tetapi terutama karena alasan historis.)
Metode modifikasi aliran
Metode berikut dalam Streaming mengembalikan aliran baru berdasarkan aliran asli. Masing-masing dari mereka menunggu seseorang untuk mendengarkan aliran baru sebelum mendengarkan yang asli.
Stream<R> cast<R>(); Stream<S> expand<S>(Iterable<S> Function(T element) convert); Stream<S> map<S>(S Function(T event) convert); Stream<R> retype<R>(); Stream<T> skip(int count); Stream<T> skipWhile(bool Function(T element) test); Stream<T> take(int count); Stream<T> takeWhile(bool Function(T element) test); Stream<T> where(bool Function(T event) test);
Metode di atas sesuai dengan metode serupa di Iterable , yang mengubah objek iterable menjadi objek iterable lainnya. Semua ini dapat dengan mudah ditulis menggunakan fungsi asinkron dengan menunggu loop.
Stream<E> asyncExpand<E>(Stream<E> Function(T event) convert); Stream<E> asyncMap<E>(FutureOr<E> Function(T event) convert); Stream<T> distinct([bool Function(T previous, T next) equals]);
Fungsi asyncExpand()
dan asyncMap()
mirip dengan fungsi expand()
dan map()
, tetapi memungkinkan argumen fungsi menjadi fungsi asinkron. fungsi-fungsi yang distinct()
tidak ada di Iterable, tetapi dapat diimplementasikan.
Stream<T> handleError(Function onError, {bool test(error)}); Stream<T> timeout(Duration timeLimit, {void Function(EventSink<T> sink) onTimeout}); Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer);
Tiga fungsi terakhir lebih spesifik. Ini termasuk penanganan kesalahan yang menunggu loop tidak dapat dilakukan, karena kesalahan pertama mengakhiri loop dan berlangganan aliran. Tidak ada yang bisa dilakukan tentang itu. Anda dapat handleError()
untuk menghapus kesalahan dari aliran sebelum menggunakannya dalam menunggu loop.
Fungsi transform () tidak hanya untuk penanganan kesalahan; ini adalah "peta" yang lebih umum untuk stream. Peta normal membutuhkan satu nilai untuk setiap peristiwa yang masuk. Namun, terutama untuk aliran I / O, beberapa peristiwa masuk mungkin diperlukan untuk membuat acara keluaran. StreamTransformer dapat membantu dengan ini. Misalnya, decoder seperti Utf8Decoder adalah transformer. Sebuah transformator hanya membutuhkan satu fungsi bind () , yang dapat dengan mudah diimplementasikan melalui fungsi asinkron.
Stream<S> mapLogErrors<S, T>( Stream<T> stream, S Function(T event) convert, ) async* { var streamWithoutErrors = stream.handleError((e) => log(e)); await for (var event in streamWithoutErrors) { yield convert(event); } }
Membaca dan mendekode file
Kode berikut membaca file dan melakukan dua konversi dalam aliran. Pertama, ia mengonversi data dari UTF8, dan kemudian meneruskannya melalui LineSplitter . Semua baris dicetak, kecuali yang dimulai dengan tagar ( #
).
import 'dart:convert'; import 'dart:io'; Future<void> main(List<String> args) async { var file = File(args[0]); var lines = file .openRead() .transform(utf8.decoder) .transform(LineSplitter()); await for (var line in lines) { if (!line.startsWith('#')) print(line); } }
Dengarkan () metode
Metode listen()
adalah metode "level rendah", semua fungsi lain dari fungsi tersebut didefinisikan melalui listen()
.
StreamSubscription<T> listen(void Function(T event) onData, {Function onError, void Function() onDone, bool cancelOnError});
Untuk membuat jenis aliran baru, Anda cukup mewarisi kelas Stream
dan menerapkan metode listen()
, semua metode Stream
lainnya memanggil listen()
untuk bekerja.
Metode Listen () memungkinkan Anda untuk mulai mendengarkan streaming. Sampai Anda melakukan ini, aliran adalah objek lembam yang menjelaskan acara apa yang ingin Anda dengarkan. Saat mendengarkan, objek StreamSubscription dikembalikan yang mewakili aliran aktif yang menghasilkan acara. Ini mirip dengan bagaimana Iterable
hanya kumpulan objek, dan iterator adalah orang yang melakukan iterasi yang sebenarnya.
Anda dapat berhenti berlangganan aliran, melanjutkannya setelah jeda, dan membatalkannya sepenuhnya. Anda dapat menentukan panggilan balik yang akan dipanggil untuk setiap peristiwa data atau peristiwa kesalahan, serta saat aliran ditutup.
Apa lagi yang harus dibaca?
Dart 2. Pemrograman asinkron: masa depan