Implementasi Kotlin Flow dalam C #

gambar


Halo semuanya!


Dalam beberapa tahun terakhir, saya telah mengembangkan untuk Android di Kotlin. Belum lama ini, karena kurangnya RxJava di multiplatform Kotlin, kami mulai menggunakan coroutine dan aliran - aliran dingin untuk Kotlin di luar kotak. Sebelum Android, saya menghabiskan bertahun-tahun dengan C #, dan sudah ada coroutine saya di sana untuk waktu yang sangat lama, hanya saja mereka tidak dipanggil seperti itu. Tapi saya tidak mendengar tentang analog aliran di async / menunggu. Alat utama untuk pemrograman reaktif adalah Rx.Net (sebenarnya, rx lahir di sini). Jadi saya memutuskan untuk nostalgia dan mencoba melihat sepeda.


Lebih lanjut dipahami bahwa pembaca mengetahui hal-hal yang telah dibahas pada paragraf sebelumnya. Untuk yang tidak sabar - segera tautkan ke repositori .


Penafian: kode ini tidak dimaksudkan untuk digunakan dalam produksi. Ini adalah konsep, tidak lebih. Sesuatu mungkin tidak berfungsi persis seperti yang dimaksudkan.


IFlow dan IFlowCollector


Baiklah, mari kita mulai dengan menulis ulang antarmuka Flow dan FlowCollector di C # pada dahi.
Itu:


interface Flow<out T> { suspend fun collect(collector: FlowCollector<T>) } interface FlowCollector<in T> { suspend fun emit(value: T) } 

Itu menjadi:


  public interface IFlow<out T> { Task Collect(IFlowCollector<T> collector); } public interface IFlowCollector<in T> { Task Emit(T item); } 

Saya percaya perbedaan dapat dimengerti dan dijelaskan oleh implementasi asynchrony yang berbeda.


Untuk menggunakan antarmuka ini, mereka harus diimplementasikan. Inilah yang terjadi:


  internal class Flow<T> : IFlow<T> { private readonly Func<IFlowCollector<T>, Task> _emitter; public Flow(Func<IFlowCollector<T>, Task> emitter) { _emitter = emitter; } public Task Collect(IFlowCollector<T> collector) { return _emitter(collector); } } internal class FlowCollector<T> : IFlowCollector<T> { private readonly Func<T, Task> _handler; public FlowCollector(Func<T, Task> handler) { _handler = handler; } public Task Emit(T item) { return _handler(item); } } 

Dalam konstruktor aliran, kami melewati fungsi yang akan memancarkan nilai. Dan untuk konstruktor kolektor, sebuah fungsi yang akan memproses setiap nilai yang diemisikan.


Anda bisa menggunakannya seperti ini


 var flow = new Flow<int>(async collector => { await collector.Emit(1); await Task.Delay(1000); await collector.Emit(2); await Task.Delay(1000); await collector.Emit(3); }); var collector = new FlowCollector<int>(async item => Console.WriteLine(item)); await flow.Collect(collector); 

Saya pikir semuanya jelas dalam kode di atas. Pertama kita buat Flow, lalu buat kolektor (handler untuk setiap elemen). Lalu kita mulai Flow, setelah "menandatangani" seorang kolektor di atasnya. Jika Anda menambahkan sedikit gula (lihat github), kami mendapatkan sesuatu seperti ini:


 await Flow<int>(async collector => { await collector.Emit(1); await Task.Delay(1000); await collector.Emit(2); await Task.Delay(1000); await collector.Emit(3); }) .Collect(Console.WriteLine); 

Di Kotlin terlihat seperti ini:


 scope.launch{ flow{ emit(1) delay(1000) … }.collect{ printl(it) } } 

Secara pribadi, saya kebanyakan tidak suka opsi pada Sharpe untuk secara eksplisit menunjukkan jenis elemen saat membuat aliran. Tapi intinya di sini bukanlah inferensi tipe di Kotlin yang jauh lebih curam. Fungsi aliran terlihat seperti ini:


 public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block) 

Seperti yang dapat kita lihat, parameter blok ditandai dengan anotasi BuilderInference, yang memberi tahu kompiler bahwa jenisnya harus diambil dari parameter ini. Apakah ada yang tahu jika dimungkinkan untuk mengajukan yang serupa untuk C # di Roslyn?


PembatalanToken


Di rx ada langganan tempat Anda dapat berhenti berlangganan. Di Kotlin Flow, Ayub bertanggung jawab atas pembatalan, yang dikembalikan oleh pembangun, atau Lingkup Coroutine. Kami juga pasti membutuhkan alat yang memungkinkan Flow menyelesaikan lebih awal. Di C #, untuk membatalkan operasi yang tidak sinkron, saya tidak takut dengan kata ini, pola Pembatalan Token digunakan. PembatalanToken adalah kelas yang objeknya memberikan informasi kepada operasi asinkron yang telah dibatalkan. Itu melemparkan dirinya ke dalam operasi asinkron saat startup, dan operasi ini sendiri menjaga kondisinya. Dan negara berubah dari luar.


Singkatnya, kita perlu membuang CancuteToken ke dalam Flow dan FlowCollector kita.


  public interface IFlow<out T> { Task Collect(IFlowCollector<T> collector, CancellationToken cancellationToken = default); } public interface IFlowCollector<in T> { Task Emit(T item, CancellationToken cancellationToken = default); } 

Saya tidak akan menempelkan implementasinya di sini - lihat github.
Tes sekarang akan terlihat seperti ini:


  var cts = new CancellationTokenSource(); var flowTask = Flow<int>(async (collector, cancellationToken) => { await collector.Emit(1); await Task.Delay(2000, cancellationToken); await collector.Emit(2); await Task.Delay(2000, cancellationToken); await collector.Emit(3); }) .Collect(item => Log(item), cts.Token); var cancelationTask = Task.Run(async () => { await Task.Delay(3000); cts.Cancel(); }); await flowTask; 

Intinya adalah ini. Sejalan dengan Flow, kami memulai operasi yang akan membatalkannya setelah 3 detik. Akibatnya, Flow tidak berhasil memancarkan elemen ketiga dan berakhir dengan TaskCanceledException, yang merupakan perilaku yang diperlukan.


Sedikit latihan


Mari kita coba gunakan apa yang terjadi dalam latihan. Misalnya, bungkus beberapa acara di Alur kami. Di Rx.Net bahkan ada metode pustaka FromEventPattern untuk ini.


Agar tidak mengacaukan dengan UI yang sebenarnya, saya menulis kelas ClicksEmulator, yang menghasilkan klik mouse bersyarat secara berkala.


  public class ClicksEmulator { public enum Button { Left, Right } public class ClickEventArgs : EventArgs { //… public int X { get; } public int Y { get; } public Button Button { get; } } public event EventHandler<ClickEventArgs> ButtonClick; public async Task Start(CancellationToken cancellationToken = default) {… } } 

Saya mengabaikan implementasi sebagai dia tidak terlalu penting di sini. Yang utama adalah event ButtonClick, yang ingin kita ubah menjadi Flow. Untuk ini kami menulis metode ekstensi


 public static IFlow<ClicksEmulator.ClickEventArgs> Clicks(this ClicksEmulator emulator) { return FlowFactory.Flow<ClicksEmulator.ClickEventArgs>(async (collector, cancellationToken) => { void clickHandler(object sender, ClicksEmulator.ClickEventArgs args) => collector.Emit(args); emulator.ButtonClick += clickHandler; cancellationToken.Register(() => { emulator.ButtonClick -= clickHandler; }); await Task.Delay(-1, cancellationToken); }); } 

Pertama, kami mendeklarasikan event handler yang tidak melakukan apa-apa selain meneruskan argumen acara kepada kolektor. Kemudian kami berlangganan acara dan mendaftarkan berhenti berlangganan jika terjadi pembatalan (penyelesaian) aliran. Nah, maka kita menunggu tanpa henti dan mendengarkan acara ButtonClick sampai cancellationToken menyala.


Jika Anda menggunakan callbackFlow atau channelFlow di Kotlin atau membuat cold Observable dari pendengar di Rx, maka Anda akan melihat bahwa struktur kode sangat mirip dalam semua kasus. Ini baik-baik saja, tetapi muncul pertanyaan - apa yang lebih baik dalam hal ini daripada Flow daripada peristiwa kasar? Semua kekuatan aliran jet terletak pada operator yang melakukan berbagai transformasi pada mereka: penyaringan, pemetaan, dan banyak lainnya, yang lebih kompleks. Tapi kami belum memilikinya. Mari kita coba melakukan sesuatu.


Saring, Peta, OnNext


Mari kita mulai dengan salah satu operator paling sederhana - Filter. Itu, seperti namanya, akan menyaring elemen aliran sesuai dengan predikat yang diberikan. Ini akan menjadi metode ekstensi yang diterapkan pada aliran asli dan aliran balik dengan elemen yang difilter saja. Ternyata kita perlu mengambil setiap elemen dari aliran asli, memeriksa, dan memancarkan lebih jauh jika predikat mengembalikan true. Jadi mari kita lakukan:


  public static IFlow<T> Filter<T>(this IFlow<T> source, Func<T, bool> predicate) => FlowFactory.Flow<T>((collector, cancellationToken) => source.Collect(item => { if (predicate(item)) collector.Emit(item); }, cancellationToken) ); 

Sekarang, jika kita hanya perlu mengklik tombol kiri mouse, kita dapat menulis ini:


 emulator .Clicks() .Filter(click => click.Button == ClicksEmulator.Button.Left) .Collect(item => Log($"{item.Button} {item.X} {item.Y}"), cts.Token); 

Dengan analogi, kami menulis Peta operator dan OnNext. Yang pertama mengkonversi setiap elemen dari aliran asli ke yang lain menggunakan fungsi mapper yang dilewati. Yang kedua akan mengembalikan aliran dengan elemen yang sama dengan yang asli, tetapi melakukan tindakan pada masing-masing (biasanya logging).

  public static IFlow<R> Map<T, R>(this IFlow<T> source, Func<T, R> mapper) => FlowFactory.Flow<R>((collector, cancellationToken) => source.Collect( item => collector.Emit(mapper(item)), cancellationToken ) ); public static IFlow<T> OnNext<T>(this IFlow<T> source, Action<T> action) => FlowFactory.Flow<T>((collector, cancellationToken) => source.Collect(item => { action(item); collector.Emit(item); }, cancellationToken) ); 

Dan contoh penggunaan:


 emulator .Clicks() .OnNext(click => Log($"{click.Button} {click.X} {click.Y}")) .Map(click => click.Button == ClicksEmulator.Button.Left ? 0 : 1) .Collect(item => Log($"{item}"), cts.Token); 

Secara umum, banyak operator diciptakan untuk aliran jet, mereka dapat ditemukan, misalnya di sini .


Dan tidak ada yang mencegah untuk menerapkan salah satu dari mereka untuk IFlow.


Mereka yang akrab dengan Rx.Net tahu bahwa di sana, selain operator baru dan spesifik untuk IObservable, metode ekstensi dari Linq-to-object digunakan, dan ini memungkinkan Anda untuk mempertimbangkan stream sebagai "kumpulan acara" dan memanipulasi mereka dengan Linq yang biasa metode. Mengapa, alih-alih menulis pernyataan sendiri, jangan mencoba menempatkan IFlow di rel Linq?


IAsyncEnumerable


Dalam C # 8, versi asynchronous dari IEnumerable diperkenalkan - IAsyncEnumerable - antarmuka koleksi, yang dapat diulang secara tidak sinkron. Perbedaan mendasar antara aliran IAsyncEnumerable dan reaktif (IObservable dan IFlow) adalah ini. IAsyncEnumerable, seperti IEnumerable, adalah model penarik. Kami mengulangi koleksi berapa banyak dan kapan kami membutuhkan dan kami mengambil elemen dari itu sendiri. Streaming didorong. Kami berlangganan acara dan "bereaksi" terhadap mereka ketika mereka datang - untuk itu mereka reaktif. Namun, perilaku seperti mendorong dapat dicapai dari model tarikan. Ini disebut polling panjang https://en.wikipedia.org/wiki/Push_technology#Long_polling . Intinya adalah ini: kita beralih ke koleksi, meminta elemen berikutnya dan menunggu selama yang kita inginkan sampai koleksi mengembalikannya kepada kita, yaitu sampai acara berikutnya tiba. IAsyncEnumerable, tidak seperti IEnumerable, akan memungkinkan kita untuk menunggu secara tidak sinkron. Singkatnya, kita perlu entah bagaimana menarik IAsyncEnumerable di IFlow.


Seperti yang Anda ketahui, antarmuka IAsyncEnumerator bertanggung jawab untuk mengembalikan elemen saat ini dari koleksi IAsyncEnumerable dan pindah ke elemen berikutnya. Dalam hal ini, kita perlu mengambil elemen dari IFlow, dan IFlowCollector melakukan ini. Ternyata di sini adalah objek yang mengimplementasikan antarmuka ini:


 internal class FlowCollectorEnumerator<T> : IFlowCollector<T>, IAsyncEnumerator<T> { private readonly SemaphoreSlim _backpressureSemaphore = new SemaphoreSlim(0, 1); private readonly SemaphoreSlim _longPollingSemaphore = new SemaphoreSlim(0, 1); private bool _isFinished; public T Current { get; private set; } public async ValueTask DisposeAsync() { } public async Task Emit(T item, CancellationToken cancellationToken) { await _backpressureSemaphore.WaitAsync(cancellationToken); Current = item; _longPollingSemaphore.Release(); } public async Task Finish() { await _backpressureSemaphore.WaitAsync(); _isFinished = true; _longPollingSemaphore.Release(); } public async ValueTask<bool> MoveNextAsync() { _backpressureSemaphore.Release(); await _longPollingSemaphore.WaitAsync(); return !_isFinished; } } 

Metode utama di sini adalah Emit , Finish, dan MoveNextAsync .
Emit di awal sedang menunggu saat item berikutnya dari koleksi akan diminta. Yaitu Jangan memancarkan item sampai dibutuhkan. Ini disebut tekanan balik, karenanya nama semafor. Kemudian item saat ini diatur dan dilaporkan bahwa permintaan pemungutan suara yang panjang bisa mendapatkan hasilnya.
MoveNextAsync dipanggil saat item lain ditarik dari koleksi. Dia merilis _backpressureSemaphore dan menunggu Flow untuk memicu elemen berikutnya. Kemudian mengembalikan tanda bahwa koleksi telah berakhir. Bendera ini menetapkan metode Selesai.


Selesai bekerja berdasarkan prinsip yang sama dengan Emit, hanya sebagai ganti elemen berikutnya ia menetapkan tanda akhir koleksi.


Sekarang kita perlu menggunakan kelas ini.


 public static class AsyncEnumerableExtensions { public static IAsyncEnumerable<T> CollectEnumerable<T>(this IFlow<T> flow, CancellationToken cancellationToken = default) { var collector = new FlowCollectorEnumerator<T>(); flow .Collect(collector, cancellationToken) .ContinueWith(_ => collector.Finish(), cancellationToken); return new FlowEnumerableAdapter<T>(collector); } } internal class FlowEnumerableAdapter<T> : IAsyncEnumerable<T> { private readonly IAsyncEnumerator<T> _enumerator; public FlowEnumerableAdapter(IAsyncEnumerator<T> enumerator) { _enumerator = enumerator; } public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default) { return _enumerator; } } 

Metode ekstensi ExtensionEnumerable untuk IFlow membuat FlowCollectorEnumerator dan menandatangani aliran di atasnya, setelah itu metode Finish () dipanggil. Dan ia mengembalikan FlowEnumerableAdapter, yang merupakan implementasi paling sederhana dari IAsyncEnumerable, menggunakan FlowCollectorEnumerator sebagai IEnumerator.
Kami mencoba apa yang terjadi.


 var clicks = emulator .Clicks() .OnNext(item => Log($"{item.Button} {item.X} {item.Y}")) .CollectEnumerable(cts.Token) .Where(click => click.Button == ClicksEmulator.Button.Right) .Select(click => click.Y < 540 ? "TOP" : "LEFT"); await foreach (var click in clicks) { Log($"Clicked at: {click}"); } 

Di sini kita mendapatkan klik Arus (), catat setiap klik, lalu ubah IFlow menjadi IAsyncEnumerable. Kemudian itu berlaku operator Linq terkenal: kita hanya menyisakan klik kanan dan kita mendapatkan di bagian mana dari layar mereka dibuat.


Selanjutnya, pertimbangkan contoh yang lebih rumit. Kami akan mengganti klik kanan dengan yang kiri ganda. Yaitu kita perlu memetakan setiap elemen bukan untuk yang lain, tetapi untuk koleksi. Atau di Flow, dikonversi ke koleksi.


 var clicks = emulator .Clicks() .OnNext(item => Log($"Original: {item.Button} {item.X} {item.Y}")) .CollectEnumerable(cts.Token) .Select(click => click.Button == ClicksEmulator.Button.Left ? Flow<ClicksEmulator.ClickEventArgs>(collector => collector.Emit(click)) : Flow<ClicksEmulator.ClickEventArgs>(async collector => { var changedClick = new ClicksEmulator.ClickEventArgs(click.X, click.Y, ClicksEmulator.Button.Left); await collector.Emit(changedClick); await Task.Delay(200); await collector.Emit(changedClick); }) ) .SelectMany(flow => flow.CollectEnumerable()); await foreach (var click in clicks) { Log($"Changed: {click.Button} {click.X} {click.Y}"); } 

Untuk melakukan ini, ada operator SelectMany di Linq. Mitranya dalam stream jet adalah FlatMap. Pertama, petakan setiap klik dalam IFlow: untuk klik kiri - Alirkan dengan satu klik ini, untuk kanan - Alirkan dari dua klik kiri dengan penundaan di antara mereka. Dan kemudian di SelectMany kita mengubah IFlow menjadi IAyncEnumerable.


Dan itu berhasil! Yaitu banyak operator tidak harus diimplementasikan untuk IFlow - Anda dapat menggunakan Linq.


Kesimpulan


Rx.Net - adalah dan tetap menjadi alat utama ketika bekerja dengan urutan peristiwa yang tidak sinkron dalam C #. Tapi ini adalah perpustakaan yang agak besar dalam hal ukuran kode. Seperti yang kita lihat, fungsionalitas yang sama dapat diperoleh lebih sederhana - hanya dua antarmuka ditambah beberapa yang mengikat. Ini dimungkinkan berkat penggunaan fitur bahasa - async / menunggu. Ketika Rx lahir, fitur ini belum dibawa ke C #.


Terima kasih atas perhatian anda!

Source: https://habr.com/ru/post/id473258/


All Articles