البرمجة غير المتزامنة: تدفقات البيانات
المحتويات
ما هو المهم:
- توفر التدفقات تسلسل بيانات غير متزامن.
- تحتوي تسلسل البيانات على أحداث المستخدم والبيانات المقروءة من الملفات.
- يمكن معالجة الدفق باستخدام انتظار أو
listen()
من Stream
API. - توفر التدفقات طريقة للرد على الأخطاء.
- هناك نوعان من التدفقات: دفق
single subscription
والبث.
تتميز البرمجة غير المتزامنة في دارت بفئات المستقبل والدفق .
Future
هو حساب مؤجل. في حالة إرجاع دالة عادية لنتيجة ، تقوم الدالة غير المتزامنة بإرجاع كائن Future
( future
) الذي سيحتوي في النهاية على النتيجة. future
سيعود النتيجة عند اكتمال العملية.
الدفق هو سلسلة من الأحداث غير المتزامنة. يشبه هذا كائن غير متزامن Iterable
، حيث يقوم مؤشر الترابط بالإبلاغ عن الحدث عندما يكون جاهزًا بدلاً من تلقي الحدث التالي عند طلبه.
تلقي الأحداث تيار
يمكن إنشاء التدفقات بطرق مختلفة ، وهو موضوع مقال آخر ، لكن يمكن استخدامها جميعًا بالطريقة نفسها: متزامن للحلقة (تسمى عادةً تنتظر ) تتكرر أحداث الدفق ، مثل تكرار الحلقات من خلال مجموعة. على سبيل المثال:
Future<int> sumStream(Stream<int> stream) async { var sum = 0; await for (var value in stream) { sum += value; } return sum; }
يتلقى هذا الرمز ببساطة الحدث الصحيح من الدفق ، ويضيفهم ويعيد المبلغ ( future
). عندما ينتهي نص الحلقة ، تتوقف الوظيفة مؤقتًا حتى ينتهي الحدث أو الخيط التالي.
يتم تمييز الوظيفة async
، والتي تكون مطلوبة عند استخدام انتظار الحلقة.
المثال التالي (على DartPad ) يتحقق من الكود السابق عن طريق إنشاء دفق بسيط من الأعداد الصحيحة باستخدام دالة مع async*
( note generator ):
أحداث خطأ
يتم تنفيذ مؤشرات الترابط عند عدم وجود المزيد من الأحداث فيها ، ويتم إخطار الكود الذي يستقبل الأحداث بذلك بنفس طريقة إخطاره بوصول حدث جديد. عند قراءة الأحداث بانتظار ، تنتهي الحلقة عندما ينتهي الدفق.
في بعض الحالات ، يحدث خطأ قبل انتهاء الدفق ؛ ربما كان هناك فشل في الشبكة أثناء استرداد ملف من خادم بعيد أو أن الكود الذي أنشأ الأحداث يحتوي على خطأ ، يجب أن يعرف شخص ما عن ذلك.
يمكن للتدفقات الإبلاغ عن حدث خطأ بنفس طريقة أحداث البيانات. تتوقف معظم مؤشرات الترابط بعد الخطأ الأول ، ولكن مؤشرات الترابط التي تُرجع أكثر من خطأ واحد ومؤشرات الترابط التي تُبلغ عن بيانات بعد حدث خطأ ممكنة. في هذا المستند ، نناقش فقط مؤشرات الترابط التي لا تُرجع أكثر من خطأ واحد.
عند قراءة دفق باستخدام تنتظر ، يتم إلقاء خطأ من قبل مشغل الحلقة. هذا يكمل أيضا الحلقة. يمكنك التقاط الخطأ باستخدام try-catch. في المثال التالي (على DartPad ) ، يحدث خطأ إذا كان تكرار حلقة 4:
العمل مع تيارات
تحتوي فئة Stream
على عدد من طرق المساعدة التي يمكن أن تؤدي عمليات عامة على دفق ، على غرار طرق Iterable . على سبيل المثال ، يمكنك العثور على أصغر عدد صحيح موجب في دفق باستخدام lastWhere()
من Stream
API.
Future<int> lastPositive(Stream<int> stream) => stream.lastWhere((x) => x >= 0);
أنواع الدفق
تيارات الاشتراك
يحتوي النوع الأكثر شيوعًا من الدفق على سلسلة من الأحداث التي تشكل أجزاء من الكل أكبر. يجب تسليم الأحداث بالترتيب الصحيح دون فقد أي منها. هذا هو نوع التدفق الذي تتلقاه عند قراءة ملف أو تلقي طلب ويب.
لا يمكن الاستماع إلى مثل هذا الدفق مرة واحدة. قد يعني الاستماع لاحقًا تخطي الأحداث الأولية ، ثم ما تبقى من البث غير منطقي. عندما تبدأ الاستماع ، سيتم استخراج البيانات وتقديمها في قطع.
تيارات البث
نوع آخر من الدفق هو للرسائل الفردية التي يمكن معالجتها واحدة في وقت واحد. يمكن استخدام هذا الدفق ، على سبيل المثال ، لأحداث الماوس في المستعرض.
يمكنك البدء في الاستماع إلى هذا الدفق في أي وقت ، وسوف تتلقى الأحداث التي وقعت أثناء الاستماع. تيار يمكن الاستماع إلى العديد من المستمعين. يمكنك البدء في الاستماع إلى بث الأحداث مرة أخرى بعد إلغاء اشتراك سابق.
طرق معالجة التدفق
الطرق التالية في الدفق <T> معالجة الدفق وإرجاع النتيجة:
Future<T> get first; Future<bool> get isEmpty; Future<T> get last; Future<int> get length; Future<T> get single; Future<bool> any(bool Function(T element) test); Future<bool> contains(Object needle); Future<E> drain<E>([E futureValue]); Future<T> elementAt(int index); Future<bool> every(bool Function(T element) test); Future<T> firstWhere(bool Function(T element) test, {T Function() orElse}); Future<S> fold<S>(S initialValue, S Function(S previous, T element) combine); Future forEach(void Function(T element) action); Future<String> join([String separator = ""]); Future<T> lastWhere(bool Function(T element) test, {T Function() orElse}); Future pipe(StreamConsumer<T> streamConsumer); Future<T> reduce(T Function(T previous, T element) combine); Future<T> singleWhere(bool Function(T element) test, {T Function() orElse}); Future<List<T>> toList(); Future<Set<T>> toSet();
جميع هذه الوظائف ، باستثناء drain()
pipe()
، تتوافق مع وظيفة مماثلة في Iterable . يمكن كتابة كل منها بسهولة باستخدام وظيفة غير متزامنة مع انتظار حلقة (أو ببساطة باستخدام إحدى الطرق الأخرى). على سبيل المثال ، قد تكون بعض التطبيقات كما يلي:
جميع هذه الوظائف ، باستثناء الصرف () والأنابيب () ، تتوافق مع وظيفة مماثلة على Iterable. يمكن كتابة كل واحدة بسهولة باستخدام وظيفة غير متزامنة مع انتظار حلقة (أو فقط باستخدام إحدى الطرق الأخرى). على سبيل المثال ، يمكن أن تكون بعض التطبيقات:
Future<bool> contains(Object needle) async { await for (var event in this) { if (event == needle) return true; } return false; } Future forEach(void Function(T element) action) async { await for (var event in this) { action(event); } } Future<List<T>> toList() async { final result = <T>[]; await this.forEach(result.add); return result; } Future<String> join([String separator = ""]) async => (await this.toList()).join(separator);
(التنفيذ الفعلي أكثر تعقيدًا قليلاً ، ولكن لأسباب تاريخية بشكل أساسي.)
طرق تعديل التدفق
الأساليب التالية في دفق إرجاع دفق جديد استناداً إلى الدفق الأصلي. ينتظر كل منهم أن يستمع شخص ما إلى الدفق الجديد قبل الاستماع إلى النص الأصلي.
Stream<R> cast<R>(); Stream<S> expand<S>(Iterable<S> Function(T element) convert); Stream<S> map<S>(S Function(T event) convert); Stream<R> retype<R>(); Stream<T> skip(int count); Stream<T> skipWhile(bool Function(T element) test); Stream<T> take(int count); Stream<T> takeWhile(bool Function(T element) test); Stream<T> where(bool Function(T event) test);
الأساليب المذكورة أعلاه تتوافق مع الأساليب المماثلة في Iterable ، والتي تحول الكائن القابل للتكرار إلى كائن آخر قابل للتكرار. كل هذا يمكن كتابته بسهولة باستخدام وظيفة غير متزامنة مع انتظار حلقة.
Stream<E> asyncExpand<E>(Stream<E> Function(T event) convert); Stream<E> asyncMap<E>(FutureOr<E> Function(T event) convert); Stream<T> distinct([bool Function(T previous, T next) equals]);
asyncExpand()
و asyncMap()
الدالتين expand()
و map()
، لكن تسمحان للوسيطة الدالة بأن تكون دالة غير متزامنة. لا توجد وظائف distinct()
في Iterable ، لكن يمكن تنفيذها.
Stream<T> handleError(Function onError, {bool test(error)}); Stream<T> timeout(Duration timeLimit, {void Function(EventSink<T> sink) onTimeout}); Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer);
آخر ثلاث وظائف هي أكثر تحديدا. يتضمن ذلك معالجة الأخطاء التي يتعذر على انتظار الحلقة تنفيذها ، لأن الخطأ الأول ينهي الحلقة ويشترك في الدفق. لا يوجد شيء يجب القيام به حيال ذلك. يمكنك handleError()
لإزالة الأخطاء من الدفق قبل استخدامها في انتظار الحلقة.
وظيفة convert () ليست فقط لمعالجة الأخطاء ؛ إنها "خريطة" أكثر تعميماً للتيارات. تتطلب الخريطة العادية قيمة واحدة لكل حدث وارد. ومع ذلك ، خاصة بالنسبة لتدفقات الإدخال / الإخراج ، قد تكون هناك حاجة إلى أحداث واردة متعددة لإنشاء حدث إخراج. StreamTransformer يمكن أن تساعد في هذا. على سبيل المثال ، وحدات فك الترميز مثل Utf8Decoder هي محولات. يتطلب المحول وظيفة bind () واحدة فقط ، والتي يمكن تنفيذها بسهولة من خلال وظيفة غير متزامنة.
Stream<S> mapLogErrors<S, T>( Stream<T> stream, S Function(T event) convert, ) async* { var streamWithoutErrors = stream.handleError((e) => log(e)); await for (var event in streamWithoutErrors) { yield convert(event); } }
قراءة وفك تشفير الملف
تقوم التعليمة البرمجية التالية بقراءة الملف وتنفيذ تحويلين في الدفق. أولاً ، يحول البيانات من UTF8 ، ثم يمر عبر LineSplitter . تتم طباعة جميع الخطوط ، باستثناء تلك التي تبدأ بعلامة هاشتاج ( #
).
import 'dart:convert'; import 'dart:io'; Future<void> main(List<String> args) async { var file = File(args[0]); var lines = file .openRead() .transform(utf8.decoder) .transform(LineSplitter()); await for (var line in lines) { if (!line.startsWith('#')) print(line); } }
طريقة الاستماع ()
طريقة listen()
هي طريقة "منخفضة المستوى" ، ويتم تحديد جميع وظائف الوظيفة الأخرى من خلال listen()
.
StreamSubscription<T> listen(void Function(T event) onData, {Function onError, void Function() onDone, bool cancelOnError});
لإنشاء نوع جديد من الدفق ، يمكنك ببساطة ترث فئة Stream
وتنفيذ طريقة listen()
، وتدعو جميع أساليب Stream
الأخرى listen()
إلى العمل.
تسمح لك طريقة الاستماع () ببدء الاستماع إلى البث. حتى تقوم بذلك ، يعد التدفق كائنًا خاملًا يصف الأحداث التي ترغب في الاستماع إليها. عند الاستماع ، يتم إرجاع كائن StreamSubscription يمثل الدفق النشط الذي يقوم بإنشاء الأحداث. هذا يشبه كيف Iterable
هو مجرد مجموعة من الأشياء ، والتكرار هو الذي يقوم بالتكرار الفعلي.
يمكنك إيقاف الاشتراك في دفق ، واستئنافه بعد توقف مؤقت ، وإلغائه تمامًا. يمكنك تحديد عمليات رد الاتصال التي سيتم استدعاؤها لكل حدث بيانات أو حدث خطأ ، وكذلك عند إغلاق الدفق.
ماذا تقرأ؟
دارت 2. البرمجة غير المتزامنة: العقود الآجلة