
Hallo allerseits!
In den letzten Jahren habe ich für Android auf Kotlin entwickelt. Vor nicht allzu langer Zeit haben wir mangels RxJava auf der Kotlin-Multiplattform begonnen, Coroutinen und Flow-Cold-Streams für Kotlin sofort zu verwenden. Vor Android habe ich viele Jahre mit C # verbracht, und es gab meine Coroutinen schon sehr lange, nur dass sie nicht so genannt werden. Aber ich habe nichts über das Analogon von Flow on Async / Warten gehört. Das Hauptwerkzeug für die reaktive Programmierung ist Rx.Net (tatsächlich wurde rx hier geboren). Also entschied ich mich für Nostalgie und versuchte ein Fahrrad zu sehen.
Es versteht sich ferner, dass der Leser sich der Dinge bewusst ist, die im vorherigen Absatz besprochen wurden. Für Ungeduldige - sofort mit dem Repository verknüpfen .
Haftungsausschluss: Dieser Code ist nicht für die Verwendung in der Produktion vorgesehen. Dies ist ein Konzept, nichts weiter. Möglicherweise funktioniert etwas nicht genau wie beabsichtigt.
IFlow und IFlowCollector
Beginnen wir damit, die Flow- und FlowCollector-Schnittstellen in C # auf der Stirn neu zu schreiben.
Es war:
interface Flow<out T> { suspend fun collect(collector: FlowCollector<T>) } interface FlowCollector<in T> { suspend fun emit(value: T) }
Es wurde:
public interface IFlow<out T> { Task Collect(IFlowCollector<T> collector); } public interface IFlowCollector<in T> { Task Emit(T item); }
Ich glaube, die Unterschiede sind verständlich und werden durch die unterschiedliche Implementierung der Asynchronität erklärt.
Um diese Schnittstellen nutzen zu können, müssen sie implementiert sein. Folgendes ist passiert:
internal class Flow<T> : IFlow<T> { private readonly Func<IFlowCollector<T>, Task> _emitter; public Flow(Func<IFlowCollector<T>, Task> emitter) { _emitter = emitter; } public Task Collect(IFlowCollector<T> collector) { return _emitter(collector); } } internal class FlowCollector<T> : IFlowCollector<T> { private readonly Func<T, Task> _handler; public FlowCollector(Func<T, Task> handler) { _handler = handler; } public Task Emit(T item) { return _handler(item); } }
Im Konstruktor von flow übergeben wir eine Funktion, die Werte ausgibt. Und für den Konstruktor des Kollektors eine Funktion, die jeden ausgegebenen Wert verarbeitet.
Sie können es so verwenden
var flow = new Flow<int>(async collector => { await collector.Emit(1); await Task.Delay(1000); await collector.Emit(2); await Task.Delay(1000); await collector.Emit(3); }); var collector = new FlowCollector<int>(async item => Console.WriteLine(item)); await flow.Collect(collector);
Ich denke, im obigen Code ist alles klar. Zuerst erstellen wir einen Flow, dann einen Collector (Handler für jedes Element). Dann starten wir Flow, nachdem wir einen Sammler darauf "signiert" haben. Wenn Sie etwas Zucker hinzufügen (siehe Github), erhalten wir ungefähr Folgendes:
await Flow<int>(async collector => { await collector.Emit(1); await Task.Delay(1000); await collector.Emit(2); await Task.Delay(1000); await collector.Emit(3); }) .Collect(Console.WriteLine);
Auf Kotlin sieht es so aus:
scope.launch{ flow{ emit(1) delay(1000) … }.collect{ printl(it) } }
Persönlich mag ich vor allem die Option auf Sharpe nicht, den Elementtyp beim Erstellen eines Flows explizit anzugeben. Aber der Punkt hier ist nicht, dass die Typinferenz in Kotlin viel steiler ist. Die Flow-Funktion sieht folgendermaßen aus:
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
Wie wir sehen können, ist der Blockparameter mit der BuilderInference-Annotation gekennzeichnet, die dem Compiler mitteilt, dass der Typ diesem Parameter entnommen werden soll. Weiß jemand, ob es möglich ist, ähnliche Dateien für C # auf Roslyn einzureichen?
CancellationToken
In rx gibt es ein Abonnement, von dem Sie sich abmelden können. In Kotlin Flow ist Job für die Stornierung verantwortlich, die vom Builder oder Coroutine Scope zurückgegeben wird. Wir brauchen auch definitiv ein Tool, mit dem Flow frühzeitig abgeschlossen werden kann. Um asynchrone Vorgänge abzubrechen, habe ich in C # keine Angst vor diesem Wort. Es wird das Muster "Stornierungstoken" verwendet. CancellationToken ist eine Klasse, deren Objekt Informationen zu der abgebrochenen asynchronen Operation bereitstellt. Es wirft sich beim Start in eine asynchrone Operation, und diese Operation selbst kümmert sich um ihren Zustand. Und der Zustand ändert sich von außen.
Kurz gesagt, wir müssen das CancellationToken in unseren Flow und FlowCollector werfen.
public interface IFlow<out T> { Task Collect(IFlowCollector<T> collector, CancellationToken cancellationToken = default); } public interface IFlowCollector<in T> { Task Emit(T item, CancellationToken cancellationToken = default); }
Ich werde die Implementierung hier nicht einfügen - siehe Github.
Der Test sieht nun folgendermaßen aus:
var cts = new CancellationTokenSource(); var flowTask = Flow<int>(async (collector, cancellationToken) => { await collector.Emit(1); await Task.Delay(2000, cancellationToken); await collector.Emit(2); await Task.Delay(2000, cancellationToken); await collector.Emit(3); }) .Collect(item => Log(item), cts.Token); var cancelationTask = Task.Run(async () => { await Task.Delay(3000); cts.Cancel(); }); await flowTask;
Der Punkt ist dies. Parallel zu Flow starten wir einen Vorgang, der nach 3 Sekunden abgebrochen wird. Infolgedessen kann Flow das dritte Element nicht ausgeben und endet mit einer TaskCanceledException, die das erforderliche Verhalten darstellt.
Ein bisschen Übung
Versuchen wir, das zu nutzen, was in der Praxis passiert ist. Wickeln Sie beispielsweise ein Ereignis in unseren Flow ein. In Rx.Net gibt es dafür sogar eine Bibliotheksmethode FromEventPattern.
Um mich nicht mit der realen Benutzeroberfläche herumzuschlagen, habe ich die ClicksEmulator-Klasse geschrieben, die in zufälligen Intervallen bedingte Mausklicks generiert.
public class ClicksEmulator { public enum Button { Left, Right } public class ClickEventArgs : EventArgs {
Ich habe die Implementierung als weggelassen Sie ist hier nicht sehr wichtig. Die Hauptsache ist das Ereignis ButtonClick, das wir in Flow verwandeln wollen. Dazu schreiben wir eine Erweiterungsmethode
public static IFlow<ClicksEmulator.ClickEventArgs> Clicks(this ClicksEmulator emulator) { return FlowFactory.Flow<ClicksEmulator.ClickEventArgs>(async (collector, cancellationToken) => { void clickHandler(object sender, ClicksEmulator.ClickEventArgs args) => collector.Emit(args); emulator.ButtonClick += clickHandler; cancellationToken.Register(() => { emulator.ButtonClick -= clickHandler; }); await Task.Delay(-1, cancellationToken); }); }
Zuerst deklarieren wir einen Ereignishandler, der nichts anderes tut, als das Argument des Ereignisses an den Kollektor zu übergeben. Dann abonnieren wir Ereignisse und registrieren eine Abmeldung im Falle einer Stornierung (Vervollständigung) des Flusses. Nun, dann warten wir endlos und hören ButtonClick-Ereignisse, bis das CancellationToken ausgelöst wird.
Wenn Sie callbackFlow oder channelFlow in Kotlin verwendet oder Cold Observable von Listenern in Rx erstellt haben, werden Sie feststellen, dass die Codestruktur in allen Fällen sehr ähnlich ist. Das ist in Ordnung, aber es stellt sich die Frage: Was ist in diesem Fall besser als Flow als ein grobes Ereignis? Die ganze Kraft von Jetstreams liegt in Operatoren, die verschiedene Transformationen an ihnen durchführen: Filtern, Mapping und viele andere, komplexere. Aber wir haben sie noch nicht. Versuchen wir etwas dagegen zu unternehmen.
Filter, Karte, OnNext
Beginnen wir mit einem der einfachsten Operatoren - Filter. Wie der Name schon sagt, werden die Flusselemente gemäß dem angegebenen Prädikat gefiltert. Dies ist eine Erweiterungsmethode, die nur mit gefilterten Elementen auf den ursprünglichen Fluss und den Rückfluss angewendet wird. Es stellt sich heraus, dass wir jedes Element aus dem ursprünglichen Fluss nehmen, prüfen und weiter ausgeben müssen, wenn das Prädikat true zurückgibt. Also lass es uns tun:
public static IFlow<T> Filter<T>(this IFlow<T> source, Func<T, bool> predicate) => FlowFactory.Flow<T>((collector, cancellationToken) => source.Collect(item => { if (predicate(item)) collector.Emit(item); }, cancellationToken) );
Wenn wir nur mit der linken Maustaste klicken müssen, können wir Folgendes schreiben:
emulator .Clicks() .Filter(click => click.Button == ClicksEmulator.Button.Left) .Collect(item => Log($"{item.Button} {item.X} {item.Y}"), cts.Token);
Analog schreiben wir die Operatoren Map und OnNext. Der erste konvertiert jedes Element des ursprünglichen Flusses mithilfe der übergebenen Mapper-Funktion in ein anderes. Der zweite gibt den Flow mit denselben Elementen wie das Original zurück, führt jedoch für jedes eine Aktion aus (normalerweise Protokollierung).
public static IFlow<R> Map<T, R>(this IFlow<T> source, Func<T, R> mapper) => FlowFactory.Flow<R>((collector, cancellationToken) => source.Collect( item => collector.Emit(mapper(item)), cancellationToken ) ); public static IFlow<T> OnNext<T>(this IFlow<T> source, Action<T> action) => FlowFactory.Flow<T>((collector, cancellationToken) => source.Collect(item => { action(item); collector.Emit(item); }, cancellationToken) );
Und ein Anwendungsbeispiel:
emulator .Clicks() .OnNext(click => Log($"{click.Button} {click.X} {click.Y}")) .Map(click => click.Button == ClicksEmulator.Button.Left ? 0 : 1) .Collect(item => Log($"{item}"), cts.Token);
Im Allgemeinen wurden viele Betreiber für Jetstreams erfunden, sie sind beispielsweise hier zu finden .
Und nichts hindert daran, eines davon für IFlow zu implementieren.
Diejenigen, die mit Rx.Net vertraut sind, wissen, dass dort neben neuen und spezifischen Operatoren für IObservable auch Erweiterungsmethoden von Linq-to-Objects verwendet werden. Auf diese Weise können Sie Streams als „Ereignissammlungen“ betrachten und mit dem üblichen Linq bearbeiten Methoden. Warum sollten Sie nicht versuchen, IFlow auf die Linq-Schienen zu setzen, anstatt die Aussagen selbst zu schreiben?
IAsyncEnumerable
In C # 8 wurde eine asynchrone Version von IEnumerable eingeführt - IAsyncEnumerable - eine Erfassungsschnittstelle, die asynchron iteriert werden kann. Der grundlegende Unterschied zwischen IAsyncEnumerable und reaktiven Streams (IObservable und IFlow) besteht darin. IAsyncEnumerable ist wie IEnumerable ein Pull-Modell. Wir durchlaufen die Sammlung, wie viel und wann wir sie benötigen, und ziehen selbst Elemente daraus. Streams sind Push. Wir abonnieren Ereignisse und „reagieren“ auf sie, wenn sie kommen - dafür sind sie reaktiv. Mit dem Pull-Modell kann jedoch ein Push-ähnliches Verhalten erzielt werden. Dies wird als langes Polling https://en.wikipedia.org/wiki/Push_technology#Long_polling bezeichnet . Das Wesentliche ist Folgendes: Wir durchlaufen die Sammlung, fordern ihr nächstes Element an und warten so lange wir möchten, bis die Sammlung es uns zurückgibt, d. H. bis die nächste Veranstaltung kommt. IAsyncEnumerable ermöglicht es uns im Gegensatz zu IEnumerable, asynchron zu warten. Kurz gesagt, wir müssen IAsyncEnumerable auf IFlow irgendwie ziehen.
Wie Sie wissen, ist die IAsyncEnumerator-Schnittstelle dafür verantwortlich, das aktuelle Element der IAsyncEnumerable-Auflistung zurückzugeben und zum nächsten Element zu wechseln. In diesem Fall müssen wir Elemente aus IFlow übernehmen, und IFlowCollector führt dies aus. Es stellt sich heraus, dass hier ein Objekt diese Schnittstellen implementiert:
internal class FlowCollectorEnumerator<T> : IFlowCollector<T>, IAsyncEnumerator<T> { private readonly SemaphoreSlim _backpressureSemaphore = new SemaphoreSlim(0, 1); private readonly SemaphoreSlim _longPollingSemaphore = new SemaphoreSlim(0, 1); private bool _isFinished; public T Current { get; private set; } public async ValueTask DisposeAsync() { } public async Task Emit(T item, CancellationToken cancellationToken) { await _backpressureSemaphore.WaitAsync(cancellationToken); Current = item; _longPollingSemaphore.Release(); } public async Task Finish() { await _backpressureSemaphore.WaitAsync(); _isFinished = true; _longPollingSemaphore.Release(); } public async ValueTask<bool> MoveNextAsync() { _backpressureSemaphore.Release(); await _longPollingSemaphore.WaitAsync(); return !_isFinished; } }
Die Hauptmethoden hier sind Emit , Finish und MoveNextAsync .
Emit am Anfang wartet auf den Moment, in dem der nächste Artikel aus der Sammlung angefordert wird. Das heißt, Gibt einen Artikel erst aus, wenn er benötigt wird. Dies nennt man Gegendruck, daher der Name des Semaphors. Dann wird das aktuelle Element festgelegt und es wird gemeldet, dass eine lange Abfrageanforderung das Ergebnis erhalten kann.
MoveNextAsync wird aufgerufen, wenn ein anderes Element aus der Sammlung abgerufen wird. Er gibt _backpressureSemaphore frei und wartet darauf, dass Flow das nächste Element auslöst. Dann wird ein Zeichen zurückgegeben, dass die Sammlung beendet wurde. Dieses Flag setzt die Finish-Methode.
Finish funktioniert nach dem gleichen Prinzip wie Emit, nur dass anstelle des nächsten Elements das Vorzeichen für das Ende der Sammlung gesetzt wird.
Jetzt müssen wir diese Klasse verwenden.
public static class AsyncEnumerableExtensions { public static IAsyncEnumerable<T> CollectEnumerable<T>(this IFlow<T> flow, CancellationToken cancellationToken = default) { var collector = new FlowCollectorEnumerator<T>(); flow .Collect(collector, cancellationToken) .ContinueWith(_ => collector.Finish(), cancellationToken); return new FlowEnumerableAdapter<T>(collector); } } internal class FlowEnumerableAdapter<T> : IAsyncEnumerable<T> { private readonly IAsyncEnumerator<T> _enumerator; public FlowEnumerableAdapter(IAsyncEnumerator<T> enumerator) { _enumerator = enumerator; } public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default) { return _enumerator; } }
Die ExtensionEnumerable-Erweiterungsmethode für IFlow erstellt einen FlowCollectorEnumerator und signiert einen Flow darauf. Anschließend wird die Finish () -Methode aufgerufen. Und es wird ein FlowEnumerableAdapter zurückgegeben, der die einfachste Implementierung von IAsyncEnumerable ist, wobei der FlowCollectorEnumerator als IEnumerator verwendet wird.
Wir versuchen was passiert ist.
var clicks = emulator .Clicks() .OnNext(item => Log($"{item.Button} {item.X} {item.Y}")) .CollectEnumerable(cts.Token) .Where(click => click.Button == ClicksEmulator.Button.Right) .Select(click => click.Y < 540 ? "TOP" : "LEFT"); await foreach (var click in clicks) { Log($"Clicked at: {click}"); }
Hier erhalten wir Flow-Klicks (), protokollieren jeden Klick und verwandeln IFlow in IAsyncEnumerable. Dann gelten die berühmten Linq-Operatoren: Wir lassen nur die rechten Klicks und bekommen, in welchem Teil des Bildschirms sie gemacht werden.
Betrachten Sie als nächstes ein komplizierteres Beispiel. Wir werden den rechten Klick durch einen doppelten linken Klick ersetzen. Das heißt, Wir müssen jedes Element nicht einem anderen, sondern der Sammlung zuordnen. Oder in Flow, konvertiert in eine Sammlung.
var clicks = emulator .Clicks() .OnNext(item => Log($"Original: {item.Button} {item.X} {item.Y}")) .CollectEnumerable(cts.Token) .Select(click => click.Button == ClicksEmulator.Button.Left ? Flow<ClicksEmulator.ClickEventArgs>(collector => collector.Emit(click)) : Flow<ClicksEmulator.ClickEventArgs>(async collector => { var changedClick = new ClicksEmulator.ClickEventArgs(click.X, click.Y, ClicksEmulator.Button.Left); await collector.Emit(changedClick); await Task.Delay(200); await collector.Emit(changedClick); }) ) .SelectMany(flow => flow.CollectEnumerable()); await foreach (var click in clicks) { Log($"Changed: {click.Button} {click.X} {click.Y}"); }
Zu diesem Zweck gibt es in Linq einen SelectMany-Operator. Das Gegenstück zu Jetstreams ist FlatMap. Ordnen Sie zunächst jeden Klick in IFlow zu: für den linken Klick - Fluss mit diesem einen Klick, für den rechten - Fluss von zwei linken Klicks mit einer Verzögerung zwischen ihnen. Und dann verwandeln wir in SelectMany IFlow in IAyncEnumerable.
Und es funktioniert! Das heißt, Viele Operatoren müssen für IFlow nicht implementiert werden - Sie können Linq verwenden.
Fazit
Rx.Net - war und ist das Hauptwerkzeug bei der Arbeit mit asynchronen Ereignissequenzen in C #. Dies ist jedoch eine ziemlich große Bibliothek in Bezug auf die Codegröße. Wie wir gesehen haben, können ähnliche Funktionen viel einfacher erhalten werden - nur zwei Schnittstellen plus etwas Bindung. Dies ist dank der Verwendung von Sprachfunktionen möglich - async / await. Als Rx geboren wurde, wurde diese Funktion noch nicht in C # übernommen.
Vielen Dank für Ihre Aufmerksamkeit!