تنفيذ Kotlin Flow في C #

صورة


مرحبا بالجميع!


في السنوات الأخيرة ، كنت أطور لنظام Android على Kotlin. منذ وقت ليس ببعيد ، نظرًا لعدم وجود RxJava على منصة Kotlin المتعددة ، بدأنا في استخدام coroutines وتدفق بارد لتدفق Kotlin خارج الصندوق. قبل أندرويد ، قضيت عدة سنوات مع C # ، وكانت هناك خطوطي لوقت طويل للغاية ، فقط لا يطلق عليها بهذه الطريقة. لكنني لم أسمع عن التناظرية تدفق على المتزامن / تنتظر. الأداة الرئيسية للبرمجة التفاعلية هي Rx.Net (في الواقع ، ولدت آر إكس هنا). لذلك قررت الحنين ومحاولة رؤية دراجة.


من المفهوم كذلك أن القارئ يدرك الأشياء التي تمت مناقشتها في الفقرة السابقة. لفارغ الصبر - ربط على الفور إلى مستودع .


إخلاء المسئولية: هذا الكود غير مخصص للاستخدام في الإنتاج. هذا مفهوم ، لا شيء أكثر من ذلك. شيء قد لا يعمل بالضبط كما هو المقصود.


IFlow و IFlowCollector


حسنًا ، لنبدأ بإعادة كتابة واجهات Flow و FlowCollector في C # على الجبهة.
كان:


interface Flow<out T> { suspend fun collect(collector: FlowCollector<T>) } interface FlowCollector<in T> { suspend fun emit(value: T) } 

أصبح:


  public interface IFlow<out T> { Task Collect(IFlowCollector<T> collector); } public interface IFlowCollector<in T> { Task Emit(T item); } 

أعتقد أن الاختلافات مفهومة ومفسرة من خلال التنفيذ المختلف للتزامن.


لاستخدام هذه الواجهات ، يجب تنفيذها. إليك ما حدث:


  internal class Flow<T> : IFlow<T> { private readonly Func<IFlowCollector<T>, Task> _emitter; public Flow(Func<IFlowCollector<T>, Task> emitter) { _emitter = emitter; } public Task Collect(IFlowCollector<T> collector) { return _emitter(collector); } } internal class FlowCollector<T> : IFlowCollector<T> { private readonly Func<T, Task> _handler; public FlowCollector(Func<T, Task> handler) { _handler = handler; } public Task Emit(T item) { return _handler(item); } } 

في مُنشئ التدفق ، نقوم بتمرير دالة تنبعث منها القيم. وإلى مُنشئ المجمع ، وظيفة ستقوم بمعالجة كل قيمة منبعثة.


يمكنك استخدامه مثل هذا


 var flow = new Flow<int>(async collector => { await collector.Emit(1); await Task.Delay(1000); await collector.Emit(2); await Task.Delay(1000); await collector.Emit(3); }); var collector = new FlowCollector<int>(async item => Console.WriteLine(item)); await flow.Collect(collector); 

أعتقد أن كل شيء واضح في الرمز أعلاه. أولاً نقوم بإنشاء تدفق ، ثم نقوم بإنشاء أداة تجميع (معالج لكل عنصر). ثم نبدأ التدفق ، بعد أن "وقعنا" جامعًا عليه. إذا أضفت القليل من السكر (انظر جيثب) ، فسنحصل على شيء مثل هذا:


 await Flow<int>(async collector => { await collector.Emit(1); await Task.Delay(1000); await collector.Emit(2); await Task.Delay(1000); await collector.Emit(3); }) .Collect(Console.WriteLine); 

على Kotlin يبدو مثل هذا:


 scope.launch{ flow{ emit(1) delay(1000) … }.collect{ printl(it) } } 

أنا شخصياً ، لا أحب الخيار في Sharpe للإشارة بوضوح إلى نوع العنصر عند إنشاء التدفق. لكن النقطة هنا ليست أن الاستدلال النوعي في Kotlin أكثر حدة. وظيفة التدفق تبدو كالتالي:


 public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block) 

كما نرى ، يتم وضع علامة على المعلمة block مع تعليق توضيحي BuilderInference ، والذي يخبر المترجم أنه يجب أخذ النوع من هذه المعلمة. هل يعرف أحد ما إذا كان من الممكن تقديم ملف مماثل لـ C # على Roslyn؟


CancellationToken


في rx هناك اشتراك يمكنك من خلاله إلغاء الاشتراك. في Kotlin Flow ، يكون Job مسؤولاً عن الإلغاء ، والذي يعود إليه المنشئ ، أو Coroutine Scope. نحتاج أيضًا إلى أداة تسمح بإكمال Flow مبكرًا. في C # ، لإلغاء العمليات غير المتزامنة ، لا أخاف من هذه الكلمة ، يتم استخدام نمط إلغاء الرمز المميز. CancellationToken هي فئة يقدم كائنها معلومات إلى العملية غير المتزامنة التي تم إلغاؤها. إنه يلقي نفسه في عملية غير متزامنة عند بدء التشغيل ، وهذه العملية نفسها تهتم بحالتها. والدولة تتغير من الخارج.


باختصار ، نحن بحاجة إلى رمي CancellationToken في Flow and FlowCollector.


  public interface IFlow<out T> { Task Collect(IFlowCollector<T> collector, CancellationToken cancellationToken = default); } public interface IFlowCollector<in T> { Task Emit(T item, CancellationToken cancellationToken = default); } 

لن ألصق التنفيذ هنا - انظر جيثب.
سيبدو الاختبار الآن كالتالي:


  var cts = new CancellationTokenSource(); var flowTask = Flow<int>(async (collector, cancellationToken) => { await collector.Emit(1); await Task.Delay(2000, cancellationToken); await collector.Emit(2); await Task.Delay(2000, cancellationToken); await collector.Emit(3); }) .Collect(item => Log(item), cts.Token); var cancelationTask = Task.Run(async () => { await Task.Delay(3000); cts.Cancel(); }); await flowTask; 

النقطة هي هذا. بالتوازي مع Flow ، نبدأ عملية ستلغيها بعد 3 ثوان. نتيجة لذلك ، لا يتمكن Flow من إرسال العنصر الثالث وينتهي بـ TaskCanceledException ، وهو السلوك المطلوب.


قليلا من الممارسة


دعونا نحاول استخدام ما حدث في الممارسة. على سبيل المثال ، قم بلف بعض الأحداث في تدفقنا. في Rx.Net ، هناك طريقة مكتبة FromEventPattern لهذا الغرض.


لكي لا تعبث مع واجهة المستخدم الحقيقية ، كتبت فئة ClicksEmulator ، والتي تولد نقرات الماوس الشرطية على فترات عشوائية.


  public class ClicksEmulator { public enum Button { Left, Right } public class ClickEventArgs : EventArgs { //… public int X { get; } public int Y { get; } public Button Button { get; } } public event EventHandler<ClickEventArgs> ButtonClick; public async Task Start(CancellationToken cancellationToken = default) {… } } 

لقد حذفت التنفيذ كما انها ليست مهمة جدا هنا. الشيء الرئيسي هو الحدث ButtonClick ، ​​الذي نريد أن يتحول إلى Flow. لهذا نكتب طريقة التمديد


 public static IFlow<ClicksEmulator.ClickEventArgs> Clicks(this ClicksEmulator emulator) { return FlowFactory.Flow<ClicksEmulator.ClickEventArgs>(async (collector, cancellationToken) => { void clickHandler(object sender, ClicksEmulator.ClickEventArgs args) => collector.Emit(args); emulator.ButtonClick += clickHandler; cancellationToken.Register(() => { emulator.ButtonClick -= clickHandler; }); await Task.Delay(-1, cancellationToken); }); } 

أولاً ، نعلن أن معالج الأحداث لا يفعل شيئًا سوى تمرير وسيطة الحدث إلى المجمع. ثم نشترك في الأحداث وتسجيل إلغاء الاشتراك في حالة إلغاء (اكتمال) التدفق. حسنًا ، ننتظر إلى ما لا نهاية ونستمع إلى أحداث ButtonClick حتى يتم إلغاء الحذف.


إذا كنت تستخدم callbackFlow أو channelFlow في Kotlin أو قمت بإنشاء cold Observable من المستمعين في Rx ، فستلاحظ أن بنية الكود مشابه جدًا في جميع الحالات. هذا جيد ، لكن السؤال الذي يطرح نفسه - ما هو أفضل في هذه الحالة من Flow من حدث خام؟ تكمن كل قوة التدفقات النفاثة في المشغلين الذين يقومون بإجراء تحويلات مختلفة عليها: التصفية ، ورسم الخرائط ، والعديد من المحولات الأخرى الأكثر تعقيدًا. لكن ليس لدينا بعد. دعونا نحاول القيام بشيء حيال ذلك.


تصفية ، خريطة ، OnNext


لنبدأ مع أحد أبسط المشغلين - تصفية. كما يوحي الاسم ، سيقوم بتصفية عناصر التدفق وفقًا للمسند المعطى. ستكون هذه طريقة تمديد يتم تطبيقها على التدفق الأصلي وإعادة التدفق مع العناصر التي تمت تصفيتها فقط. اتضح أننا نحتاج إلى أخذ كل عنصر من التدفق الأصلي ، والتحقق منه ، والإفصاح عنه أكثر إذا عاد المسند إلى حقيقة. لذلك دعونا نفعل ذلك:


  public static IFlow<T> Filter<T>(this IFlow<T> source, Func<T, bool> predicate) => FlowFactory.Flow<T>((collector, cancellationToken) => source.Collect(item => { if (predicate(item)) collector.Emit(item); }, cancellationToken) ); 

الآن ، إذا كنا بحاجة فقط إلى النقر على زر الماوس الأيسر ، يمكننا كتابة هذا:


 emulator .Clicks() .Filter(click => click.Button == ClicksEmulator.Button.Left) .Collect(item => Log($"{item.Button} {item.X} {item.Y}"), cts.Token); 

عن طريق القياس ، نكتب مشغلي الخريطة و OnNext. الأول يحول كل عنصر من عناصر التدفق الأصلي إلى عنصر آخر باستخدام وظيفة معين معين. سيعود الثاني بالتدفق مع نفس العناصر مثل الأصل ، ولكن تنفيذ إجراء على كل واحد (عادة تسجيل).

  public static IFlow<R> Map<T, R>(this IFlow<T> source, Func<T, R> mapper) => FlowFactory.Flow<R>((collector, cancellationToken) => source.Collect( item => collector.Emit(mapper(item)), cancellationToken ) ); public static IFlow<T> OnNext<T>(this IFlow<T> source, Action<T> action) => FlowFactory.Flow<T>((collector, cancellationToken) => source.Collect(item => { action(item); collector.Emit(item); }, cancellationToken) ); 

ومثال على الاستخدام:


 emulator .Clicks() .OnNext(click => Log($"{click.Button} {click.X} {click.Y}")) .Map(click => click.Button == ClicksEmulator.Button.Left ? 0 : 1) .Collect(item => Log($"{item}"), cts.Token); 

بشكل عام ، تم اختراع الكثير من المشغلين لتدفقات الطائرات النفاثة ، ويمكن العثور عليها ، على سبيل المثال ، هنا .


ولا شيء يمنع تنفيذ أي منها لـ IFlow.


يعرف الأشخاص المطلعون على Rx.Net أنه ، بالإضافة إلى عوامل التشغيل الجديدة والمحددة لـ IObservable ، يتم استخدام طرق التمديد من Linq-to-Objects ، وهذا يتيح لك النظر في التدفقات على أنها "مجموعات من الأحداث" ومعالجتها باستخدام Linq المعتاد -methods. لماذا ، بدلاً من كتابة البيانات بنفسك ، لا تحاول وضع IFlow على سكك Linq؟


IAsyncEnumerable


في C # 8 ، تم تقديم إصدار غير متزامن من IEnumerable - IAsyncEnumerable - واجهة المجموعة ، والتي يمكن تكرارها بشكل غير متزامن. الفرق الأساسي بين تيارات IAsyncEnable و التفاعلية (IObservable و IFlow) هو هذا. IAsyncEnableable ، مثل IEnumerable ، هو نموذج سحب. نقوم بالتكرار على المجموعة كم ومتى نحتاج ونستخلص عناصر منها. تيارات دفع. نحن نشترك في الأحداث و "نرد عليها" عندما تأتي - لأنها تفاعلية. ومع ذلك ، يمكن تحقيق سلوك يشبه الضغط من نموذج السحب. وهذا ما يسمى الاستقصاء الطويل https://en.wikipedia.org/wiki/Push_technology#Long_polling . الجوهر هو هذا: نحن نكرر المجموعة ونطلب عنصرها التالي وننتظر طالما نرغب حتى تعيدها المجموعة إلينا ، أي حتى يأتي الحدث التالي. IAsyncEnableable ، على عكس IEnumerable ، سيسمح لنا بالانتظار بشكل غير متزامن. باختصار ، نحتاج إلى سحب IAsyncEnableable بطريقة ما على IFlow.


كما تعلم ، واجهة IAsyncEnumerator مسؤولة عن إرجاع العنصر الحالي لمجموعة IAsyncEnumerable والانتقال إلى العنصر التالي. في هذه الحالة ، نحتاج إلى أخذ عناصر من IFlow ، ويفعل ذلك IFlowCollector. والنتيجة هي كائن يقوم بتنفيذ هذه الواجهات:


 internal class FlowCollectorEnumerator<T> : IFlowCollector<T>, IAsyncEnumerator<T> { private readonly SemaphoreSlim _backpressureSemaphore = new SemaphoreSlim(0, 1); private readonly SemaphoreSlim _longPollingSemaphore = new SemaphoreSlim(0, 1); private bool _isFinished; public T Current { get; private set; } public async ValueTask DisposeAsync() { } public async Task Emit(T item, CancellationToken cancellationToken) { await _backpressureSemaphore.WaitAsync(cancellationToken); Current = item; _longPollingSemaphore.Release(); } public async Task Finish() { await _backpressureSemaphore.WaitAsync(); _isFinished = true; _longPollingSemaphore.Release(); } public async ValueTask<bool> MoveNextAsync() { _backpressureSemaphore.Release(); await _longPollingSemaphore.WaitAsync(); return !_isFinished; } } 

الطرق الرئيسية هنا هي Emit و Finish و MoveNextAsync .
ينتظر Emit في البداية اللحظة التي سيطلب فيها العنصر التالي من المجموعة. أي لا ينبعث منها عنصر حتى يتم الحاجة إليه. وهذا ما يسمى الضغط الخلفي ، وبالتالي اسم الإشارة. ثم يتم تعيين العنصر الحالي ويبلغ أن طلب الاستقصاء الطويل يمكن أن يحصل على النتيجة.
يتم استدعاء MoveNextAsync عندما يتم سحب عنصر آخر من المجموعة. يقوم بإطلاق _backpressureSemaphore وينتظر Flow لتشغيل العنصر التالي. ثم تقوم بإرجاع علامة على انتهاء المجموعة. تحدد هذه العلامة طريقة الإنهاء.


يعمل Finish وفقًا لمبدأ Emit ، فقط بدلاً من العنصر التالي ، يقوم بتعيين علامة نهاية المجموعة.


الآن نحن بحاجة إلى استخدام هذه الفئة.


 public static class AsyncEnumerableExtensions { public static IAsyncEnumerable<T> CollectEnumerable<T>(this IFlow<T> flow, CancellationToken cancellationToken = default) { var collector = new FlowCollectorEnumerator<T>(); flow .Collect(collector, cancellationToken) .ContinueWith(_ => collector.Finish(), cancellationToken); return new FlowEnumerableAdapter<T>(collector); } } internal class FlowEnumerableAdapter<T> : IAsyncEnumerable<T> { private readonly IAsyncEnumerator<T> _enumerator; public FlowEnumerableAdapter(IAsyncEnumerator<T> enumerator) { _enumerator = enumerator; } public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default) { return _enumerator; } } 

تنشئ طريقة الامتداد ExtensionEnumerable لـ IFlow FlowCollectorEnumerator وتوقع تدفقًا عليها ، ثم تسمى طريقة Finish (). وتقوم بإرجاع FlowEnumerableAdapter ، والذي يعد أبسط تطبيق لـ IAsyncEnumerable ، باستخدام FlowCollectorEnumerator باعتباره IEnumerator.
نحن نحاول ما حدث.


 var clicks = emulator .Clicks() .OnNext(item => Log($"{item.Button} {item.X} {item.Y}")) .CollectEnumerable(cts.Token) .Where(click => click.Button == ClicksEmulator.Button.Right) .Select(click => click.Y < 540 ? "TOP" : "LEFT"); await foreach (var click in clicks) { Log($"Clicked at: {click}"); } 

هنا نحصل على نقرات التدفق () ، ثم نسجل كل نقرة ، ثم نحول IFlow إلى IAsyncEnumerable. ثم ينطبق الأمر على مشغلي Linq المشهورين: نترك فقط النقرات الصحيحة ونحصل على أي جزء من الشاشة مصنوع.


التالي ، النظر في مثال أكثر تعقيدا. سنستبدل النقرة اليمنى بنقرة واحدة على اليسار. أي سنحتاج إلى تعيين كل عنصر ليس لبعض العناصر الأخرى ، ولكن إلى المجموعة. أو في تدفق ، وتحويلها إلى مجموعة.


 var clicks = emulator .Clicks() .OnNext(item => Log($"Original: {item.Button} {item.X} {item.Y}")) .CollectEnumerable(cts.Token) .Select(click => click.Button == ClicksEmulator.Button.Left ? Flow<ClicksEmulator.ClickEventArgs>(collector => collector.Emit(click)) : Flow<ClicksEmulator.ClickEventArgs>(async collector => { var changedClick = new ClicksEmulator.ClickEventArgs(click.X, click.Y, ClicksEmulator.Button.Left); await collector.Emit(changedClick); await Task.Delay(200); await collector.Emit(changedClick); }) ) .SelectMany(flow => flow.CollectEnumerable()); await foreach (var click in clicks) { Log($"Changed: {click.Button} {click.X} {click.Y}"); } 

للقيام بذلك ، يحتوي Linq على عبارة SelectMany. نظيره في تيارات طائرة هو FlatMap. أولاً ، قم بتخطيط كل نقرة في IFlow: للنقرة اليسرى - التدفق بنقرة واحدة ، لليمين - التدفق من نقرتين إلى اليسار مع تأخير بينهما. ثم في SelectMany نحول IFlow إلى IAyncEnumerable.


وهي تعمل! أي لا يتعين تنفيذ العديد من المشغلين لـ IFlow - يمكنك استخدام Linq.


استنتاج


Rx.Net - كانت ولا تزال الأداة الرئيسية عند العمل مع تسلسل الأحداث غير المتزامن في C #. ولكن هذه مكتبة كبيرة إلى حد ما من حيث حجم الرمز. كما رأينا ، يمكن الحصول على وظائف مماثلة أبسط بكثير - فقط واجهات اثنين بالإضافة إلى بعض الربط. هذا ممكن بفضل استخدام ميزات اللغة - المزامنة / الانتظار. عندما ولدت Rx ، لم يتم جلب هذه الميزة إلى C # حتى الآن.


شكرا لاهتمامكم!

Source: https://habr.com/ru/post/ar473258/


All Articles