Implementação do fluxo Kotlin em C #

imagem


Olá pessoal!


Nos últimos anos, desenvolvo o Android no Kotlin. Há pouco tempo, por falta de RxJava na multiplataforma Kotlin, começamos a usar corotinas e fluxos de fluxo frio para o Kotlin fora da caixa. Antes do Android, eu passei muitos anos com C #, e minhas corotinas existem há muito tempo, mas elas não são chamadas dessa maneira. Mas eu não ouvi sobre o análogo do fluxo em assíncrono / espera. A principal ferramenta para programação reativa é o Rx.Net (na verdade, o rx nasceu aqui). Então eu decidi nostálgico e tente ver uma bicicleta.


Entende-se ainda que o leitor está ciente das coisas discutidas no parágrafo anterior. Para os impacientes - conecte-se imediatamente ao repositório .


Isenção de responsabilidade: este código não se destina a ser usado na produção. Este é um conceito, nada mais. Algo pode não funcionar exatamente como pretendido.


IFlow e IFlowCollector


Bem, vamos começar reescrevendo as interfaces Flow e FlowCollector em C # na testa.
Foi:


interface Flow<out T> { suspend fun collect(collector: FlowCollector<T>) } interface FlowCollector<in T> { suspend fun emit(value: T) } 

Tornou-se:


  public interface IFlow<out T> { Task Collect(IFlowCollector<T> collector); } public interface IFlowCollector<in T> { Task Emit(T item); } 

Acredito que as diferenças sejam compreensíveis e explicadas pela implementação diferente da assincronia.


Para usar essas interfaces, elas devem ser implementadas. Aqui está o que aconteceu:


  internal class Flow<T> : IFlow<T> { private readonly Func<IFlowCollector<T>, Task> _emitter; public Flow(Func<IFlowCollector<T>, Task> emitter) { _emitter = emitter; } public Task Collect(IFlowCollector<T> collector) { return _emitter(collector); } } internal class FlowCollector<T> : IFlowCollector<T> { private readonly Func<T, Task> _handler; public FlowCollector(Func<T, Task> handler) { _handler = handler; } public Task Emit(T item) { return _handler(item); } } 

No construtor de fluxo, passamos uma função que emitirá valores. E para o construtor do coletor, uma função que processará cada valor emitido.


Você pode usá-lo assim


 var flow = new Flow<int>(async collector => { await collector.Emit(1); await Task.Delay(1000); await collector.Emit(2); await Task.Delay(1000); await collector.Emit(3); }); var collector = new FlowCollector<int>(async item => Console.WriteLine(item)); await flow.Collect(collector); 

Eu acho que tudo está claro no código acima. Primeiro, criamos um fluxo e, em seguida, criamos um coletor (manipulador para cada elemento). Então começamos o Flow, tendo "assinado" um coletor nele. Se você adicionar um pouco de açúcar (consulte o github), obteremos algo assim:


 await Flow<int>(async collector => { await collector.Emit(1); await Task.Delay(1000); await collector.Emit(2); await Task.Delay(1000); await collector.Emit(3); }) .Collect(Console.WriteLine); 

No Kotlin, fica assim:


 scope.launch{ flow{ emit(1) delay(1000) … }.collect{ printl(it) } } 

Pessoalmente, acima de tudo, não gosto da opção no Sharpe para indicar explicitamente o tipo de elemento ao criar um fluxo. Mas o ponto aqui não é que a inferência de tipo no Kotlin seja muito mais acentuada. A função de fluxo é assim:


 public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block) 

Como podemos ver, o parâmetro block é marcado com a anotação BuilderInference, que informa ao compilador que o tipo deve ser obtido desse parâmetro. Alguém sabe se é possível arquivar semelhante para c # em Roslyn?


CancelamentoToken


Na rx, há uma assinatura da qual você pode cancelar a inscrição. No Kotlin Flow, Job é responsável pelo cancelamento, que é retornado pelo construtor, ou Escopo da Coroutine. Definitivamente, também precisamos de uma ferramenta que permita que o Flow seja concluído mais cedo. Em C #, para cancelar operações assíncronas, não tenho medo dessa palavra, o padrão Token de cancelamento é usado. CancellationToken é uma classe cujo objeto fornece informações para a operação assíncrona que foi cancelada. Ele se lança em uma operação assíncrona na inicialização, e essa própria operação cuida de seu estado. E o estado muda do lado de fora.


Em suma, precisamos lançar o CancellationToken em nosso Flow e FlowCollector.


  public interface IFlow<out T> { Task Collect(IFlowCollector<T> collector, CancellationToken cancellationToken = default); } public interface IFlowCollector<in T> { Task Emit(T item, CancellationToken cancellationToken = default); } 

Não vou colar a implementação aqui - veja o github.
O teste agora terá a seguinte aparência:


  var cts = new CancellationTokenSource(); var flowTask = Flow<int>(async (collector, cancellationToken) => { await collector.Emit(1); await Task.Delay(2000, cancellationToken); await collector.Emit(2); await Task.Delay(2000, cancellationToken); await collector.Emit(3); }) .Collect(item => Log(item), cts.Token); var cancelationTask = Task.Run(async () => { await Task.Delay(3000); cts.Cancel(); }); await flowTask; 

O ponto é este. Paralelamente ao Flow, iniciamos uma operação que a cancelará após 3 segundos. Como resultado, o Flow não consegue emitir o terceiro elemento e termina com uma TaskCanceledException, que é o comportamento necessário.


Um pouco de prática


Vamos tentar usar o que aconteceu na prática. Por exemplo, envolva algum evento em nosso Flow. No Rx.Net, existe até um método de biblioteca FromEventPattern para isso.


Para não mexer com a interface do usuário real, escrevi a classe ClicksEmulator, que gera cliques condicionais do mouse em intervalos aleatórios.


  public class ClicksEmulator { public enum Button { Left, Right } public class ClickEventArgs : EventArgs { //… public int X { get; } public int Y { get; } public Button Button { get; } } public event EventHandler<ClickEventArgs> ButtonClick; public async Task Start(CancellationToken cancellationToken = default) {… } } 

Omiti a implementação como ela não é muito importante aqui. O principal é o evento ButtonClick, que queremos transformar em Flow. Para isso, escrevemos um método de extensão


 public static IFlow<ClicksEmulator.ClickEventArgs> Clicks(this ClicksEmulator emulator) { return FlowFactory.Flow<ClicksEmulator.ClickEventArgs>(async (collector, cancellationToken) => { void clickHandler(object sender, ClicksEmulator.ClickEventArgs args) => collector.Emit(args); emulator.ButtonClick += clickHandler; cancellationToken.Register(() => { emulator.ButtonClick -= clickHandler; }); await Task.Delay(-1, cancellationToken); }); } 

Primeiro, declaramos um manipulador de eventos que não faz nada além de passar o argumento do evento para o coletor. Em seguida, assinamos eventos e registramos um cancelamento em caso de cancelamento (conclusão) do fluxo. Bem, então esperamos sem parar e ouvimos os eventos do ButtonClick até o cancelamento do Token disparar.


Se você usou callbackFlow ou channelFlow no Kotlin ou criou o Observable frio de ouvintes no Rx, notará que a estrutura do código é muito semelhante em todos os casos. Tudo bem, mas surge a pergunta - o que é melhor nesse caso do que o Flow do que um evento bruto? Todo o poder dos fluxos de jato está nos operadores que realizam várias transformações neles: filtragem, mapeamento e muitos outros mais complexos. Mas ainda não os temos. Vamos tentar fazer algo sobre isso.


Filtro, Mapa, Ao Avançar


Vamos começar com um dos operadores mais simples - Filtro. Como o nome indica, filtrará os elementos de fluxo de acordo com o predicado especificado. Este será um método de extensão aplicado ao fluxo original e retornando o fluxo apenas com elementos filtrados. Acontece que precisamos pegar cada elemento do fluxo original, verificar e emitir mais se o predicado retornar verdadeiro. Então vamos fazer:


  public static IFlow<T> Filter<T>(this IFlow<T> source, Func<T, bool> predicate) => FlowFactory.Flow<T>((collector, cancellationToken) => source.Collect(item => { if (predicate(item)) collector.Emit(item); }, cancellationToken) ); 

Agora, se precisarmos apenas clicar no botão esquerdo do mouse, podemos escrever o seguinte:


 emulator .Clicks() .Filter(click => click.Button == ClicksEmulator.Button.Left) .Collect(item => Log($"{item.Button} {item.X} {item.Y}"), cts.Token); 

Por analogia, escrevemos os operadores Map e OnNext. O primeiro converte cada elemento do fluxo original em outro usando a função do mapeador passado. O segundo retornará o fluxo com os mesmos elementos que o original, mas executando uma ação em cada um (geralmente registrando).

  public static IFlow<R> Map<T, R>(this IFlow<T> source, Func<T, R> mapper) => FlowFactory.Flow<R>((collector, cancellationToken) => source.Collect( item => collector.Emit(mapper(item)), cancellationToken ) ); public static IFlow<T> OnNext<T>(this IFlow<T> source, Action<T> action) => FlowFactory.Flow<T>((collector, cancellationToken) => source.Collect(item => { action(item); collector.Emit(item); }, cancellationToken) ); 

E um exemplo de uso:


 emulator .Clicks() .OnNext(click => Log($"{click.Button} {click.X} {click.Y}")) .Map(click => click.Button == ClicksEmulator.Button.Left ? 0 : 1) .Collect(item => Log($"{item}"), cts.Token); 

Em geral, muitos operadores foram inventados para fluxos de jato, eles podem ser encontrados, por exemplo, aqui .


E nada impede a implementação de nenhum deles para o IFlow.


Os familiarizados com o Rx.Net sabem que, além dos operadores novos e específicos para IObservable, são usados ​​métodos de extensão do Linq para objetos, e isso permite que você considere os fluxos como "coleções de eventos" e os manipule com o Linq usual métodos Por que, em vez de escrever as instruções você mesmo, não tente colocar o IFlow nos trilhos do Linq?


IAsyncEnumerable


No C # 8, uma versão assíncrona do IEnumerable foi introduzida - IAsyncEnumerable - uma interface de coleção que pode ser iterada de forma assíncrona. A diferença fundamental entre os fluxos IAsyncEnumerable e reativos (IObservable e IFlow) é essa. IAsyncEnumerable, como IEnumerable, é um modelo pull. Nós iteramos sobre a coleção quanto e quando precisamos, e nós mesmos extraímos elementos dela. Fluxos são push. Assinamos eventos e "reagimos" a eles quando eles aparecem - por isso são reativos. No entanto, o comportamento do tipo push pode ser alcançado a partir do modelo pull. Isso é chamado de pesquisa longa https://en.wikipedia.org/wiki/Push_technology#Long_polling . A essência é a seguinte: iteramos sobre a coleção, solicitamos seu próximo elemento e esperamos o tempo que quisermos até que a coleção nos devolva, ou seja, até o próximo evento chegar. IAsyncEnumerable, diferentemente de IEnumerable, nos permitirá aguardar assincronamente. Em suma, precisamos de alguma forma puxar IAsyncEnumerable no IFlow.


Como você sabe, a interface IAsyncEnumerator é responsável por retornar o elemento atual da coleção IAsyncEnumerable e passar para o próximo elemento. Nesse caso, precisamos pegar elementos do IFlow, e o IFlowCollector faz isso. Acontece que aqui é um objeto que implementa essas interfaces:


 internal class FlowCollectorEnumerator<T> : IFlowCollector<T>, IAsyncEnumerator<T> { private readonly SemaphoreSlim _backpressureSemaphore = new SemaphoreSlim(0, 1); private readonly SemaphoreSlim _longPollingSemaphore = new SemaphoreSlim(0, 1); private bool _isFinished; public T Current { get; private set; } public async ValueTask DisposeAsync() { } public async Task Emit(T item, CancellationToken cancellationToken) { await _backpressureSemaphore.WaitAsync(cancellationToken); Current = item; _longPollingSemaphore.Release(); } public async Task Finish() { await _backpressureSemaphore.WaitAsync(); _isFinished = true; _longPollingSemaphore.Release(); } public async ValueTask<bool> MoveNextAsync() { _backpressureSemaphore.Release(); await _longPollingSemaphore.WaitAsync(); return !_isFinished; } } 

Os principais métodos aqui são Emit , Finish e MoveNextAsync .
A emissão no início está aguardando o momento em que o próximo item da coleção será solicitado. I.e. Não emite um item até que seja necessário. Isso é chamado de contrapressão, daí o nome do semáforo. Em seguida, o item atual é definido e é relatado que uma solicitação de pesquisa longa pode obter o resultado.
MoveNextAsync é chamado quando outro item é extraído da coleção. Ele libera _backpressureSemaphore e espera que o Flow ative o próximo elemento. Em seguida, ele retorna um sinal de que a coleção foi encerrada. Este sinalizador define o método Concluir.


Concluir funciona com o mesmo princípio que Emit; somente, em vez do próximo elemento, ele define o sinal do final da coleção.


Agora precisamos usar esta classe.


 public static class AsyncEnumerableExtensions { public static IAsyncEnumerable<T> CollectEnumerable<T>(this IFlow<T> flow, CancellationToken cancellationToken = default) { var collector = new FlowCollectorEnumerator<T>(); flow .Collect(collector, cancellationToken) .ContinueWith(_ => collector.Finish(), cancellationToken); return new FlowEnumerableAdapter<T>(collector); } } internal class FlowEnumerableAdapter<T> : IAsyncEnumerable<T> { private readonly IAsyncEnumerator<T> _enumerator; public FlowEnumerableAdapter(IAsyncEnumerator<T> enumerator) { _enumerator = enumerator; } public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default) { return _enumerator; } } 

O método de extensão ExtensionEnumerable para IFlow cria um FlowCollectorEnumerator e assina um fluxo nele, após o qual o método Finish () é chamado. E ele retorna um FlowEnumerableAdapter, que é a implementação mais simples de IAsyncEnumerable, usando o FlowCollectorEnumerator como um IEnumerator.
Tentamos o que aconteceu.


 var clicks = emulator .Clicks() .OnNext(item => Log($"{item.Button} {item.X} {item.Y}")) .CollectEnumerable(cts.Token) .Where(click => click.Button == ClicksEmulator.Button.Right) .Select(click => click.Y < 540 ? "TOP" : "LEFT"); await foreach (var click in clicks) { Log($"Clicked at: {click}"); } 

Aqui obtemos Flow clicks (), registramos cada clique e depois transformamos IFlow em IAsyncEnumerable. Em seguida, aplicam-se os famosos operadores Linq: deixamos apenas os cliques com o botão direito e entramos em que parte da tela eles são feitos.


Em seguida, considere um exemplo mais complicado. Substituiremos o clique direito por um duplo esquerdo. I.e. precisaremos mapear cada elemento não para outro, mas para a coleção. Ou no Flow, convertido em uma coleção.


 var clicks = emulator .Clicks() .OnNext(item => Log($"Original: {item.Button} {item.X} {item.Y}")) .CollectEnumerable(cts.Token) .Select(click => click.Button == ClicksEmulator.Button.Left ? Flow<ClicksEmulator.ClickEventArgs>(collector => collector.Emit(click)) : Flow<ClicksEmulator.ClickEventArgs>(async collector => { var changedClick = new ClicksEmulator.ClickEventArgs(click.X, click.Y, ClicksEmulator.Button.Left); await collector.Emit(changedClick); await Task.Delay(200); await collector.Emit(changedClick); }) ) .SelectMany(flow => flow.CollectEnumerable()); await foreach (var click in clicks) { Log($"Changed: {click.Button} {click.X} {click.Y}"); } 

Para fazer isso, existe um operador SelectMany no Linq. Sua contrapartida nos fluxos de jato é o FlatMap. Primeiro, mapeie cada clique no IFlow: para o clique esquerdo - Flua com este clique, para o direito - Flua a partir de dois cliques esquerdos com um atraso entre eles. E então, no SelectMany, transformamos o IFlow em IAyncEnumerable.


E funciona! I.e. muitos operadores não precisam ser implementados para o IFlow - você pode usar o Linq.


Conclusão


Rx.Net - foi e continua sendo a ferramenta principal ao trabalhar com sequências assíncronas de eventos em C #. Mas esta é uma biblioteca bastante grande em termos de tamanho do código. Como vimos, funcionalidades semelhantes podem ser obtidas muito mais simples - apenas duas interfaces mais alguma ligação. Isso é possível graças ao uso de recursos de idioma - assíncrono / aguardado. Quando Rx nasceu, esse recurso ainda não foi trazido para o C #.


Obrigado pela atenção!

Source: https://habr.com/ru/post/pt473258/


All Articles