
Hola a todos!
En los últimos años, he estado desarrollando para Android en Kotlin. No hace mucho tiempo, por falta de RxJava en la multiplataforma de Kotlin, comenzamos a usar corutinas y corrientes de flujo frío para Kotlin fuera de la caja. Antes de Android, pasé muchos años con C #, y han estado mis rutinas durante mucho tiempo, solo que no se llaman así. Pero no escuché sobre el análogo de flujo en async / wait. La herramienta principal para la programación reactiva es Rx.Net (de hecho, rx nació aquí). Así que decidí nostálgico e intenté ver una bicicleta.
Se entiende además que el lector es consciente de las cosas que se discutieron en el párrafo anterior. Para los impacientes: enlace inmediato al repositorio .
Descargo de responsabilidad: este código no está destinado a ser utilizado en la producción. Este es un concepto, nada más. Algo puede no funcionar exactamente como se pretendía.
IFlow e IFlowCollector
Bueno, comencemos reescribiendo las interfaces Flow y FlowCollector en C # en la frente.
Fue:
interface Flow<out T> { suspend fun collect(collector: FlowCollector<T>) } interface FlowCollector<in T> { suspend fun emit(value: T) }
Se convirtió en:
public interface IFlow<out T> { Task Collect(IFlowCollector<T> collector); } public interface IFlowCollector<in T> { Task Emit(T item); }
Creo que las diferencias son comprensibles y explicadas por la implementación diferente de la asincronía.
Para usar estas interfaces, deben implementarse. Esto es lo que sucedió:
internal class Flow<T> : IFlow<T> { private readonly Func<IFlowCollector<T>, Task> _emitter; public Flow(Func<IFlowCollector<T>, Task> emitter) { _emitter = emitter; } public Task Collect(IFlowCollector<T> collector) { return _emitter(collector); } } internal class FlowCollector<T> : IFlowCollector<T> { private readonly Func<T, Task> _handler; public FlowCollector(Func<T, Task> handler) { _handler = handler; } public Task Emit(T item) { return _handler(item); } }
En el constructor de flujo, pasamos una función que emitirá valores. Y para el constructor del colector, una función que procesará cada valor emitido.
Puedes usarlo así
var flow = new Flow<int>(async collector => { await collector.Emit(1); await Task.Delay(1000); await collector.Emit(2); await Task.Delay(1000); await collector.Emit(3); }); var collector = new FlowCollector<int>(async item => Console.WriteLine(item)); await flow.Collect(collector);
Creo que todo está claro en el código anterior. Primero creamos un Flow, luego creamos un colector (manejador para cada elemento). Luego comenzamos Flow, habiendo "firmado" un colector en él. Si agrega un poco de azúcar (vea github), obtenemos algo como esto:
await Flow<int>(async collector => { await collector.Emit(1); await Task.Delay(1000); await collector.Emit(2); await Task.Delay(1000); await collector.Emit(3); }) .Collect(Console.WriteLine);
En Kotlin se ve así:
scope.launch{ flow{ emit(1) delay(1000) … }.collect{ printl(it) } }
Personalmente, sobre todo no me gusta la opción en Sharpe para indicar explícitamente el tipo de elemento al crear un flujo. Pero el punto aquí no es que la inferencia de tipos en Kotlin sea mucho más pronunciada. La función de flujo se ve así:
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
Como podemos ver, el parámetro de bloque está marcado con la anotación BuilderInference, que le dice al compilador que el tipo debe tomarse de este parámetro. ¿Alguien sabe si es posible presentar una solicitud similar para C # en Roslyn?
Cancelación Tomado
En rx hay una suscripción desde la que puede darse de baja. En Kotlin Flow, Job es responsable de la cancelación, la cual es devuelta por el constructor o Coroutine Scope. Definitivamente, también necesitamos una herramienta que permita que Flow se complete temprano. En C #, para cancelar operaciones asincrónicas, no tengo miedo de esta palabra, se utiliza el patrón de token de cancelación. CancellationToken es una clase cuyo objeto proporciona información a la operación asincrónica de que se ha cancelado. Se lanza a una operación asincrónica al inicio, y esta operación en sí misma cuida su estado. Y el estado cambia desde el exterior.
En resumen, necesitamos lanzar el CancellationToken en nuestro Flow y FlowCollector.
public interface IFlow<out T> { Task Collect(IFlowCollector<T> collector, CancellationToken cancellationToken = default); } public interface IFlowCollector<in T> { Task Emit(T item, CancellationToken cancellationToken = default); }
No pegaré la implementación aquí, vea github.
La prueba ahora se verá así:
var cts = new CancellationTokenSource(); var flowTask = Flow<int>(async (collector, cancellationToken) => { await collector.Emit(1); await Task.Delay(2000, cancellationToken); await collector.Emit(2); await Task.Delay(2000, cancellationToken); await collector.Emit(3); }) .Collect(item => Log(item), cts.Token); var cancelationTask = Task.Run(async () => { await Task.Delay(3000); cts.Cancel(); }); await flowTask;
El punto es este. En paralelo con Flow, comenzamos una operación que lo cancelará después de 3 segundos. Como resultado, Flow no logra emitir el tercer elemento y termina con una TaskCanceledException, que es el comportamiento requerido.
Un poco de practica
Tratemos de usar lo que sucedió en la práctica. Por ejemplo, envuelva algún evento en nuestro flujo. En Rx.Net hay incluso un método de biblioteca FromEventPattern para esto.
Para no meterse con la interfaz de usuario real, escribí la clase ClicksEmulator, que genera clics condicionales del mouse a intervalos aleatorios.
public class ClicksEmulator { public enum Button { Left, Right } public class ClickEventArgs : EventArgs {
Omití la implementación como ella no es muy importante aquí. Lo principal es el evento ButtonClick, que queremos convertir en Flow. Para esto escribimos un método de extensión
public static IFlow<ClicksEmulator.ClickEventArgs> Clicks(this ClicksEmulator emulator) { return FlowFactory.Flow<ClicksEmulator.ClickEventArgs>(async (collector, cancellationToken) => { void clickHandler(object sender, ClicksEmulator.ClickEventArgs args) => collector.Emit(args); emulator.ButtonClick += clickHandler; cancellationToken.Register(() => { emulator.ButtonClick -= clickHandler; }); await Task.Delay(-1, cancellationToken); }); }
Primero, declaramos un controlador de eventos que no hace nada más que pasar el argumento del evento al recopilador. Luego nos suscribimos a eventos y registramos una cancelación de suscripción en caso de cancelación (finalización) del flujo. Bueno, entonces esperamos sin parar y escuchamos los eventos de ButtonClick hasta que se cancele la cancelación de Tomken.
Si utilizó callbackFlow o channelFlow en Kotlin o creó Observable en frío de los oyentes en Rx, notará que la estructura del código es muy similar en todos los casos. Esto está bien, pero surge la pregunta: ¿qué es mejor en este caso que Flow que un evento crudo? Todo el poder de las corrientes en chorro reside en los operadores que realizan diversas transformaciones en ellas: filtrado, mapeo y muchos otros más complejos. Pero todavía no los tenemos. Intentemos hacer algo al respecto.
Filtro, Mapa, Siguiente
Comencemos con uno de los operadores más simples: Filtro. Como su nombre lo indica, filtrará los elementos de flujo de acuerdo con el predicado dado. Este será un método de extensión aplicado al flujo original y al flujo de retorno solo con elementos filtrados. Resulta que necesitamos tomar cada elemento del flujo original, verificar y emitir más si el predicado devuelve verdadero. Entonces hagámoslo:
public static IFlow<T> Filter<T>(this IFlow<T> source, Func<T, bool> predicate) => FlowFactory.Flow<T>((collector, cancellationToken) => source.Collect(item => { if (predicate(item)) collector.Emit(item); }, cancellationToken) );
Ahora, si solo necesitamos hacer clic en el botón izquierdo del mouse, podemos escribir esto:
emulator .Clicks() .Filter(click => click.Button == ClicksEmulator.Button.Left) .Collect(item => Log($"{item.Button} {item.X} {item.Y}"), cts.Token);
Por analogía, escribimos los operadores Map y OnNext. El primero convierte cada elemento del flujo original en otro utilizando la función de mapeador aprobada. El segundo devolverá el flujo con los mismos elementos que el original, pero realizando una acción en cada uno (generalmente registrando).
public static IFlow<R> Map<T, R>(this IFlow<T> source, Func<T, R> mapper) => FlowFactory.Flow<R>((collector, cancellationToken) => source.Collect( item => collector.Emit(mapper(item)), cancellationToken ) ); public static IFlow<T> OnNext<T>(this IFlow<T> source, Action<T> action) => FlowFactory.Flow<T>((collector, cancellationToken) => source.Collect(item => { action(item); collector.Emit(item); }, cancellationToken) );
Y un ejemplo de uso:
emulator .Clicks() .OnNext(click => Log($"{click.Button} {click.X} {click.Y}")) .Map(click => click.Button == ClicksEmulator.Button.Left ? 0 : 1) .Collect(item => Log($"{item}"), cts.Token);
En general, se inventaron muchos operadores para las corrientes en chorro, se pueden encontrar, por ejemplo, aquí .
Y nada impide implementar ninguno de ellos para IFlow.
Aquellos familiarizados con Rx.Net saben que, además de los operadores nuevos y específicos para IObservable, se utilizan métodos de extensión de Linq a objetos, y esto le permite considerar las transmisiones como "colecciones de eventos" y manipularlas con el Linq habitual métodos ¿Por qué, en lugar de escribir las declaraciones usted mismo, no intente poner IFlow en los rieles de Linq?
IAsyncEnumerable
En C # 8, se introdujo una versión asincrónica de IEnumerable, IAsyncEnumerable, una interfaz de recopilación que se puede iterar de forma asincrónica. La diferencia fundamental entre los flujos IAsyncEnumerable y reactivo (IObservable e IFlow) es esta. IAsyncEnumerable, como IEnumerable, es un modelo de extracción. Repetimos sobre la colección cuánto y cuándo necesitamos y extraemos elementos de ella. Las corrientes son push. Nos suscribimos a los eventos y "reaccionamos" a ellos cuando vienen, para eso son reactivos. Sin embargo, se puede lograr un comportamiento similar al del modelo pull. Esto se llama sondeo largo https://en.wikipedia.org/wiki/Push_technology#Long_polling . La esencia es esta: iteramos sobre la colección, solicitamos su siguiente elemento y esperamos todo el tiempo que queramos hasta que la colección nos lo devuelva, es decir. hasta que llegue el próximo evento. IAsyncEnumerable, a diferencia de IEnumerable, nos permitirá esperar de forma asincrónica. En resumen, necesitamos extraer de alguna manera IAsyncEnumerable en IFlow.
Como sabe, la interfaz IAsyncEnumerator es responsable de devolver el elemento actual de la colección IAsyncEnumerable y pasar al siguiente elemento. En este caso, necesitamos tomar elementos de IFlow, e IFlowCollector hace esto. Resulta que aquí hay un objeto que implementa estas interfaces:
internal class FlowCollectorEnumerator<T> : IFlowCollector<T>, IAsyncEnumerator<T> { private readonly SemaphoreSlim _backpressureSemaphore = new SemaphoreSlim(0, 1); private readonly SemaphoreSlim _longPollingSemaphore = new SemaphoreSlim(0, 1); private bool _isFinished; public T Current { get; private set; } public async ValueTask DisposeAsync() { } public async Task Emit(T item, CancellationToken cancellationToken) { await _backpressureSemaphore.WaitAsync(cancellationToken); Current = item; _longPollingSemaphore.Release(); } public async Task Finish() { await _backpressureSemaphore.WaitAsync(); _isFinished = true; _longPollingSemaphore.Release(); } public async ValueTask<bool> MoveNextAsync() { _backpressureSemaphore.Release(); await _longPollingSemaphore.WaitAsync(); return !_isFinished; } }
Los métodos principales aquí son Emit , Finish y MoveNextAsync .
Emitir al principio está esperando el momento en que se solicitará el próximo artículo de la colección. Es decir No emite un artículo hasta que se necesita. Esto se llama contrapresión, de ahí el nombre del semáforo. Luego se establece el elemento actual y se informa que una solicitud de sondeo larga puede obtener el resultado.
Se llama a MoveNextAsync cuando se extrae otro elemento de la colección. Libera _backpressureSemaphore y espera a que Flow active el siguiente elemento. Luego devuelve una señal de que la colección ha finalizado. Esta bandera establece el método Finalizar.
Finalizar funciona según el mismo principio que Emit, solo que en lugar del siguiente elemento establece el signo del final de la colección.
Ahora necesitamos usar esta clase.
public static class AsyncEnumerableExtensions { public static IAsyncEnumerable<T> CollectEnumerable<T>(this IFlow<T> flow, CancellationToken cancellationToken = default) { var collector = new FlowCollectorEnumerator<T>(); flow .Collect(collector, cancellationToken) .ContinueWith(_ => collector.Finish(), cancellationToken); return new FlowEnumerableAdapter<T>(collector); } } internal class FlowEnumerableAdapter<T> : IAsyncEnumerable<T> { private readonly IAsyncEnumerator<T> _enumerator; public FlowEnumerableAdapter(IAsyncEnumerator<T> enumerator) { _enumerator = enumerator; } public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default) { return _enumerator; } }
El método de extensión ExtensionEnumerable para IFlow crea un FlowCollectorEnumerator y firma un flujo en él, después de lo cual se llama al método Finish (). Y devuelve un FlowEnumerableAdapter, que es la implementación más simple de IAsyncEnumerable, utilizando FlowCollectorEnumerator como IEnumerator.
Intentamos lo que pasó.
var clicks = emulator .Clicks() .OnNext(item => Log($"{item.Button} {item.X} {item.Y}")) .CollectEnumerable(cts.Token) .Where(click => click.Button == ClicksEmulator.Button.Right) .Select(click => click.Y < 540 ? "TOP" : "LEFT"); await foreach (var click in clicks) { Log($"Clicked at: {click}"); }
Aquí obtenemos clics de Flow (), registramos cada clic y luego convertimos IFlow en IAsyncEnumerable. Luego aplica los famosos operadores Linq: dejamos solo los clics con el botón derecho y obtenemos en qué parte de la pantalla están hechos.
A continuación, considere un ejemplo más complicado. Reemplazaremos el clic derecho por uno doble izquierdo. Es decir necesitaremos asignar cada elemento no a otro, sino a la colección. O en Flow, convertido en una colección.
var clicks = emulator .Clicks() .OnNext(item => Log($"Original: {item.Button} {item.X} {item.Y}")) .CollectEnumerable(cts.Token) .Select(click => click.Button == ClicksEmulator.Button.Left ? Flow<ClicksEmulator.ClickEventArgs>(collector => collector.Emit(click)) : Flow<ClicksEmulator.ClickEventArgs>(async collector => { var changedClick = new ClicksEmulator.ClickEventArgs(click.X, click.Y, ClicksEmulator.Button.Left); await collector.Emit(changedClick); await Task.Delay(200); await collector.Emit(changedClick); }) ) .SelectMany(flow => flow.CollectEnumerable()); await foreach (var click in clicks) { Log($"Changed: {click.Button} {click.X} {click.Y}"); }
Para hacer esto, hay un operador SelectMany en Linq. Su contraparte en las corrientes en chorro es FlatMap. Primero, asigne cada clic en IFlow: para el clic izquierdo - Flujo con este clic, para el derecho - Flujo desde dos clics izquierdos con un retraso entre ellos. Y luego en SelectMany convertimos IFlow en IAyncEnumerable.
Y funciona! Es decir muchos operadores no tienen que implementarse para IFlow; puede usar Linq.
Conclusión
Rx.Net: fue y sigue siendo la herramienta principal cuando se trabaja con secuencias asíncronas de eventos en C #. Pero esta es una biblioteca bastante grande en términos de tamaño de código. Como vimos, se puede obtener una funcionalidad similar mucho más simple: solo dos interfaces más algún enlace. Esto es posible gracias al uso de funciones de lenguaje: asíncrono / espera. Cuando nació Rx, esta característica aún no se incorporó a C #.
Gracias por su atencion!