Implémentation de Kotlin Flow en C #

image


Bonjour à tous!


Ces dernières années, j'ai développé pour Android sur Kotlin. Il n'y a pas si longtemps, faute de RxJava sur la multiplateforme Kotlin, nous avons commencé à utiliser des coroutines et des flux froids pour Kotlin dès le départ. Avant Android, j'ai passé de nombreuses années avec C #, et il y a mes coroutines depuis très longtemps, mais elles ne sont pas appelées ainsi. Mais je n'ai pas entendu parler de l'analogue du flux sur async / wait. Le principal outil de programmation réactive est Rx.Net (en fait, rx est né ici). J'ai donc décidé de nostalgique et d'essayer de voir un vélo.


Il est en outre entendu que le lecteur est conscient des choses qui ont été discutées dans le paragraphe précédent. Pour les impatients - liez immédiatement le référentiel .


Avertissement: ce code n'est pas destiné à être utilisé en production. C'est un concept, rien de plus. Quelque chose peut ne pas fonctionner exactement comme prévu.


IFlow et IFlowCollector


Eh bien, commençons par réécrire les interfaces Flow et FlowCollector en C # sur le front.
C'était:


interface Flow<out T> { suspend fun collect(collector: FlowCollector<T>) } interface FlowCollector<in T> { suspend fun emit(value: T) } 

C'est devenu:


  public interface IFlow<out T> { Task Collect(IFlowCollector<T> collector); } public interface IFlowCollector<in T> { Task Emit(T item); } 

Je pense que les différences sont compréhensibles et s'expliquent par la mise en œuvre différente de l'asynchronie.


Pour utiliser ces interfaces, elles doivent être implémentées. Voici ce qui s'est passé:


  internal class Flow<T> : IFlow<T> { private readonly Func<IFlowCollector<T>, Task> _emitter; public Flow(Func<IFlowCollector<T>, Task> emitter) { _emitter = emitter; } public Task Collect(IFlowCollector<T> collector) { return _emitter(collector); } } internal class FlowCollector<T> : IFlowCollector<T> { private readonly Func<T, Task> _handler; public FlowCollector(Func<T, Task> handler) { _handler = handler; } public Task Emit(T item) { return _handler(item); } } 

Dans le constructeur de flux, nous passons une fonction qui émettra des valeurs. Et pour le constructeur du collecteur, une fonction qui traitera chaque valeur émise.


Vous pouvez l'utiliser comme ceci


 var flow = new Flow<int>(async collector => { await collector.Emit(1); await Task.Delay(1000); await collector.Emit(2); await Task.Delay(1000); await collector.Emit(3); }); var collector = new FlowCollector<int>(async item => Console.WriteLine(item)); await flow.Collect(collector); 

Je pense que tout est clair dans le code ci-dessus. Nous créons d'abord un Flow, puis créons un collecteur (gestionnaire pour chaque élément). Ensuite, nous démarrons Flow, après avoir "signé" un collecteur dessus. Si vous ajoutez un peu de sucre (voir github), nous obtenons quelque chose comme ceci:


 await Flow<int>(async collector => { await collector.Emit(1); await Task.Delay(1000); await collector.Emit(2); await Task.Delay(1000); await collector.Emit(3); }) .Collect(Console.WriteLine); 

Sur Kotlin, cela ressemble à ceci:


 scope.launch{ flow{ emit(1) delay(1000) … }.collect{ printl(it) } } 

Personnellement, je n'aime surtout pas l'option sur Sharpe pour indiquer explicitement le type d'élément lors de la création d'un flux. Mais le point ici n'est pas que l'inférence de type dans Kotlin est beaucoup plus abrupte. La fonction de flux ressemble à ceci:


 public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block) 

Comme nous pouvons le voir, le paramètre de bloc est marqué avec l'annotation BuilderInference, qui indique au compilateur que le type doit être pris de ce paramètre. Est-ce que quelqu'un sait s'il est possible de déposer des fichiers similaires pour C # sur Roslyn?


AnnulationToken


Dans rx, il existe un abonnement dont vous pouvez vous désinscrire. Dans Kotlin Flow, Job est responsable de l'annulation, qui est retournée par le constructeur ou Coroutine Scope. Nous avons également absolument besoin d'un outil qui permette à Flow de se terminer tôt. En C #, pour annuler les opérations asynchrones, je n'ai pas peur de ce mot, le modèle de jeton d'annulation est utilisé. CancellationToken est une classe dont l'objet fournit des informations à l'opération asynchrone indiquant qu'elle a été annulée. Il se lance dans une opération asynchrone au démarrage, et cette opération elle-même gère son état. Et l'état change de l'extérieur.


En bref, nous devons jeter le CancellationToken dans notre Flow et FlowCollector.


  public interface IFlow<out T> { Task Collect(IFlowCollector<T> collector, CancellationToken cancellationToken = default); } public interface IFlowCollector<in T> { Task Emit(T item, CancellationToken cancellationToken = default); } 

Je ne collerai pas l'implémentation ici - voir github.
Le test ressemblera maintenant à ceci:


  var cts = new CancellationTokenSource(); var flowTask = Flow<int>(async (collector, cancellationToken) => { await collector.Emit(1); await Task.Delay(2000, cancellationToken); await collector.Emit(2); await Task.Delay(2000, cancellationToken); await collector.Emit(3); }) .Collect(item => Log(item), cts.Token); var cancelationTask = Task.Run(async () => { await Task.Delay(3000); cts.Cancel(); }); await flowTask; 

Le point est le suivant. En parallèle avec Flow, nous démarrons une opération qui l'annulera après 3 secondes. Par conséquent, Flow ne parvient pas à émettre le troisième élément et se termine par une TaskCanceledException, qui est le comportement requis.


Un peu de pratique


Essayons d'utiliser ce qui s'est passé dans la pratique. Par exemple, encapsulez un événement dans notre Flow. Dans Rx.Net, il existe même une méthode de bibliothèque FromEventPattern pour cela.


Afin de ne pas jouer avec la vraie interface utilisateur, j'ai écrit la classe ClicksEmulator, qui génère des clics de souris conditionnels à des intervalles aléatoires.


  public class ClicksEmulator { public enum Button { Left, Right } public class ClickEventArgs : EventArgs { //… public int X { get; } public int Y { get; } public Button Button { get; } } public event EventHandler<ClickEventArgs> ButtonClick; public async Task Start(CancellationToken cancellationToken = default) {… } } 

J'ai omis la mise en œuvre comme elle n'est pas très importante ici. L'essentiel est l'événement ButtonClick, que nous voulons transformer en Flow. Pour cela nous écrivons une méthode d'extension


 public static IFlow<ClicksEmulator.ClickEventArgs> Clicks(this ClicksEmulator emulator) { return FlowFactory.Flow<ClicksEmulator.ClickEventArgs>(async (collector, cancellationToken) => { void clickHandler(object sender, ClicksEmulator.ClickEventArgs args) => collector.Emit(args); emulator.ButtonClick += clickHandler; cancellationToken.Register(() => { emulator.ButtonClick -= clickHandler; }); await Task.Delay(-1, cancellationToken); }); } 

Tout d'abord, nous déclarons un gestionnaire d'événements qui ne fait que transmettre l'argument de l'événement au collecteur. Ensuite, nous nous abonnons aux événements et enregistrons une désinscription en cas d'annulation (achèvement) du flux. Eh bien, nous attendons sans cesse et écoutons les événements ButtonClick jusqu'à ce que le CancellationToken se déclenche.


Si vous avez utilisé callbackFlow ou channelFlow dans Kotlin ou créé Cold Observable à partir d'écouteurs dans Rx, vous remarquerez que la structure du code est très similaire dans tous les cas. C'est bien, mais la question se pose - quoi de mieux dans ce cas que Flow qu'un événement brut? Toute la puissance des jet streams réside dans les opérateurs qui y effectuent diverses transformations: filtrage, mapping, et bien d'autres encore plus complexes. Mais nous ne les avons pas encore. Essayons de faire quelque chose.


Filtre, carte, OnNext


Commençons par l'un des opérateurs les plus simples - Filtre. Comme son nom l'indique, il filtrera les éléments de flux en fonction du prédicat donné. Ce sera une méthode d'extension appliquée au flux d'origine et au flux de retour avec des éléments filtrés uniquement. Il s'avère que nous devons prendre chaque élément du flux d'origine, vérifier et émettre plus loin si le prédicat retourne vrai. Alors faisons-le:


  public static IFlow<T> Filter<T>(this IFlow<T> source, Func<T, bool> predicate) => FlowFactory.Flow<T>((collector, cancellationToken) => source.Collect(item => { if (predicate(item)) collector.Emit(item); }, cancellationToken) ); 

Maintenant, si nous avons seulement besoin de cliquer sur le bouton gauche de la souris, nous pouvons écrire ceci:


 emulator .Clicks() .Filter(click => click.Button == ClicksEmulator.Button.Left) .Collect(item => Log($"{item.Button} {item.X} {item.Y}"), cts.Token); 

Par analogie, nous écrivons les opérateurs Map et OnNext. Le premier convertit chaque élément du flux d'origine en un autre à l'aide de la fonction de mappage passée. Le second retourne le flux avec les mêmes éléments que l'original, mais en effectuant une action sur chacun (généralement la journalisation).

  public static IFlow<R> Map<T, R>(this IFlow<T> source, Func<T, R> mapper) => FlowFactory.Flow<R>((collector, cancellationToken) => source.Collect( item => collector.Emit(mapper(item)), cancellationToken ) ); public static IFlow<T> OnNext<T>(this IFlow<T> source, Action<T> action) => FlowFactory.Flow<T>((collector, cancellationToken) => source.Collect(item => { action(item); collector.Emit(item); }, cancellationToken) ); 

Et un exemple d'utilisation:


 emulator .Clicks() .OnNext(click => Log($"{click.Button} {click.X} {click.Y}")) .Map(click => click.Button == ClicksEmulator.Button.Left ? 0 : 1) .Collect(item => Log($"{item}"), cts.Token); 

En général, de nombreux opérateurs ont été inventés pour les jets, ils peuvent être trouvés, par exemple, ici .


Et rien n'empêche de les implémenter pour IFlow.


Ceux qui connaissent Rx.Net savent que là, en plus des opérateurs nouveaux et spécifiques pour IObservable, des méthodes d'extension de Linq-to-objects sont utilisées, ce qui vous permet de considérer les flux comme des «collections d'événements» et de les manipuler avec le Linq habituel méthodes. Pourquoi, au lieu d'écrire les déclarations vous-même, n'essayez pas de mettre IFlow sur les rails Linq?


IAsyncEnumerable


En C # 8, une version asynchrone d'IEnumerable a été introduite - IAsyncEnumerable - l'interface de collecte, qui peut être itérée de manière asynchrone. La différence fondamentale entre les flux IAsyncEnumerable et réactifs (IObservable et IFlow) est la suivante. IAsyncEnumerable, comme IEnumerable, est un modèle pull. Nous itérons sur la collection combien et quand nous en avons besoin et nous en tirons nous-mêmes des éléments. Les flux sont push. Nous souscrivons aux événements et nous «réagissons» à leur arrivée - pour cela ils sont réactifs. Cependant, un comportement de type push peut être obtenu à partir du modèle pull. Cela s'appelle un long sondage https://en.wikipedia.org/wiki/Push_technology#Long_polling . L'essence est la suivante: nous parcourons la collection, demandons son prochain élément et attendons aussi longtemps que nous le souhaitons jusqu'à ce que la collection nous le retourne, c'est-à-dire jusqu'au prochain événement. IAsyncEnumerable, contrairement à IEnumerable, nous permettra d'attendre de manière asynchrone. En bref, nous devons en quelque sorte extraire IAsyncEnumerable sur IFlow.


Comme vous le savez, l'interface IAsyncEnumerator est chargée de renvoyer l'élément actuel de la collection IAsyncEnumerable et de passer à l'élément suivant. Dans ce cas, nous devons prendre des éléments d'IFlow, et IFlowCollector le fait. Il s'avère que voici un objet qui implémente ces interfaces:


 internal class FlowCollectorEnumerator<T> : IFlowCollector<T>, IAsyncEnumerator<T> { private readonly SemaphoreSlim _backpressureSemaphore = new SemaphoreSlim(0, 1); private readonly SemaphoreSlim _longPollingSemaphore = new SemaphoreSlim(0, 1); private bool _isFinished; public T Current { get; private set; } public async ValueTask DisposeAsync() { } public async Task Emit(T item, CancellationToken cancellationToken) { await _backpressureSemaphore.WaitAsync(cancellationToken); Current = item; _longPollingSemaphore.Release(); } public async Task Finish() { await _backpressureSemaphore.WaitAsync(); _isFinished = true; _longPollingSemaphore.Release(); } public async ValueTask<bool> MoveNextAsync() { _backpressureSemaphore.Release(); await _longPollingSemaphore.WaitAsync(); return !_isFinished; } } 

Les principales méthodes ici sont Emit , Finish et MoveNextAsync .
Emit au début attend le moment où le prochain article de la collection sera demandé. C'est-à-dire N'émet pas d'élément tant qu'il n'est pas nécessaire. C'est ce qu'on appelle la contre-pression, d'où le nom du sémaphore. Ensuite, l'élément en cours est défini et il est signalé qu'une longue requête d'interrogation peut obtenir le résultat.
MoveNextAsync est appelé lorsqu'un autre élément est extrait de la collection. Il libère _backpressureSemaphore et attend que Flow déclenche l'élément suivant. Ensuite, il renvoie un signe que la collection est terminée. Cet indicateur définit la méthode Finish.


La finition fonctionne sur le même principe que Emit, mais au lieu de l'élément suivant, elle définit le signe de la fin de la collection.


Maintenant, nous devons utiliser cette classe.


 public static class AsyncEnumerableExtensions { public static IAsyncEnumerable<T> CollectEnumerable<T>(this IFlow<T> flow, CancellationToken cancellationToken = default) { var collector = new FlowCollectorEnumerator<T>(); flow .Collect(collector, cancellationToken) .ContinueWith(_ => collector.Finish(), cancellationToken); return new FlowEnumerableAdapter<T>(collector); } } internal class FlowEnumerableAdapter<T> : IAsyncEnumerable<T> { private readonly IAsyncEnumerator<T> _enumerator; public FlowEnumerableAdapter(IAsyncEnumerator<T> enumerator) { _enumerator = enumerator; } public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default) { return _enumerator; } } 

La méthode d'extension ExtensionEnumerable pour IFlow crée un FlowCollectorEnumerator et signe un flux dessus, après quoi la méthode Finish () est appelée. Et il renvoie un FlowEnumerableAdapter, qui est l'implémentation la plus simple de IAsyncEnumerable, en utilisant le FlowCollectorEnumerator comme un IEnumerator.
Nous essayons ce qui s'est passé.


 var clicks = emulator .Clicks() .OnNext(item => Log($"{item.Button} {item.X} {item.Y}")) .CollectEnumerable(cts.Token) .Where(click => click.Button == ClicksEmulator.Button.Right) .Select(click => click.Y < 540 ? "TOP" : "LEFT"); await foreach (var click in clicks) { Log($"Clicked at: {click}"); } 

Ici, nous obtenons des clics sur Flow (), enregistrez chaque clic, puis transformez IFlow en IAsyncEnumerable. Ensuite, il applique les fameux opérateurs Linq: nous ne laissons que les clics-droit et nous obtenons dans quelle partie de l'écran ils sont réalisés.


Ensuite, considérons un exemple plus compliqué. Nous remplacerons le clic droit par un double gauche. C'est-à-dire nous devrons mapper chaque élément non pas à un autre, mais à la collection. Ou dans Flow, converti en collection.


 var clicks = emulator .Clicks() .OnNext(item => Log($"Original: {item.Button} {item.X} {item.Y}")) .CollectEnumerable(cts.Token) .Select(click => click.Button == ClicksEmulator.Button.Left ? Flow<ClicksEmulator.ClickEventArgs>(collector => collector.Emit(click)) : Flow<ClicksEmulator.ClickEventArgs>(async collector => { var changedClick = new ClicksEmulator.ClickEventArgs(click.X, click.Y, ClicksEmulator.Button.Left); await collector.Emit(changedClick); await Task.Delay(200); await collector.Emit(changedClick); }) ) .SelectMany(flow => flow.CollectEnumerable()); await foreach (var click in clicks) { Log($"Changed: {click.Button} {click.X} {click.Y}"); } 

Pour ce faire, il existe un opérateur SelectMany dans Linq. Son homologue dans les courants-jets est FlatMap. Tout d'abord, mappez chaque clic dans IFlow: pour le clic gauche - Débit avec celui-ci un clic, pour la droite - Débit de deux clics gauches avec un délai entre eux. Et puis, dans SelectMany, nous transformons IFlow en IAyncEnumerable.


Et ça marche! C'est-à-dire de nombreux opérateurs ne doivent pas être implémentés pour IFlow - vous pouvez utiliser Linq.


Conclusion


Rx.Net - était et reste l'outil principal lorsque vous travaillez avec des séquences d'événements asynchrones en C #. Mais c'est une bibliothèque assez grande en termes de taille de code. Comme nous l'avons vu, des fonctionnalités similaires peuvent être obtenues beaucoup plus simple - seulement deux interfaces plus une certaine liaison. Ceci est possible grâce à l'utilisation des fonctionnalités du langage - async / wait. À la naissance de Rx, cette fonctionnalité n'était pas encore entrée en C #.


Merci de votre attention!

Source: https://habr.com/ru/post/fr473258/


All Articles