
Olá pessoal!
Nos últimos anos, desenvolvo o Android no Kotlin. Há pouco tempo, por falta de RxJava na multiplataforma Kotlin, começamos a usar corotinas e fluxos de fluxo frio para o Kotlin fora da caixa. Antes do Android, eu passei muitos anos com C #, e minhas corotinas existem há muito tempo, mas elas não são chamadas dessa maneira. Mas eu não ouvi sobre o análogo do fluxo em assíncrono / espera. A principal ferramenta para programação reativa é o Rx.Net (na verdade, o rx nasceu aqui). Então eu decidi nostálgico e tente ver uma bicicleta.
Entende-se ainda que o leitor está ciente das coisas discutidas no parágrafo anterior. Para os impacientes - conecte-se imediatamente ao repositório .
Isenção de responsabilidade: este código não se destina a ser usado na produção. Este é um conceito, nada mais. Algo pode não funcionar exatamente como pretendido.
IFlow e IFlowCollector
Bem, vamos começar reescrevendo as interfaces Flow e FlowCollector em C # na testa.
Foi:
interface Flow<out T> { suspend fun collect(collector: FlowCollector<T>) } interface FlowCollector<in T> { suspend fun emit(value: T) }
Tornou-se:
public interface IFlow<out T> { Task Collect(IFlowCollector<T> collector); } public interface IFlowCollector<in T> { Task Emit(T item); }
Acredito que as diferenças sejam compreensíveis e explicadas pela implementação diferente da assincronia.
Para usar essas interfaces, elas devem ser implementadas. Aqui está o que aconteceu:
internal class Flow<T> : IFlow<T> { private readonly Func<IFlowCollector<T>, Task> _emitter; public Flow(Func<IFlowCollector<T>, Task> emitter) { _emitter = emitter; } public Task Collect(IFlowCollector<T> collector) { return _emitter(collector); } } internal class FlowCollector<T> : IFlowCollector<T> { private readonly Func<T, Task> _handler; public FlowCollector(Func<T, Task> handler) { _handler = handler; } public Task Emit(T item) { return _handler(item); } }
No construtor de fluxo, passamos uma função que emitirá valores. E para o construtor do coletor, uma função que processará cada valor emitido.
Você pode usá-lo assim
var flow = new Flow<int>(async collector => { await collector.Emit(1); await Task.Delay(1000); await collector.Emit(2); await Task.Delay(1000); await collector.Emit(3); }); var collector = new FlowCollector<int>(async item => Console.WriteLine(item)); await flow.Collect(collector);
Eu acho que tudo está claro no código acima. Primeiro, criamos um fluxo e, em seguida, criamos um coletor (manipulador para cada elemento). Então começamos o Flow, tendo "assinado" um coletor nele. Se você adicionar um pouco de açúcar (consulte o github), obteremos algo assim:
await Flow<int>(async collector => { await collector.Emit(1); await Task.Delay(1000); await collector.Emit(2); await Task.Delay(1000); await collector.Emit(3); }) .Collect(Console.WriteLine);
No Kotlin, fica assim:
scope.launch{ flow{ emit(1) delay(1000) … }.collect{ printl(it) } }
Pessoalmente, acima de tudo, não gosto da opção no Sharpe para indicar explicitamente o tipo de elemento ao criar um fluxo. Mas o ponto aqui não é que a inferência de tipo no Kotlin seja muito mais acentuada. A função de fluxo é assim:
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
Como podemos ver, o parâmetro block é marcado com a anotação BuilderInference, que informa ao compilador que o tipo deve ser obtido desse parâmetro. Alguém sabe se é possível arquivar semelhante para c # em Roslyn?
CancelamentoToken
Na rx, há uma assinatura da qual você pode cancelar a inscrição. No Kotlin Flow, Job é responsável pelo cancelamento, que é retornado pelo construtor, ou Escopo da Coroutine. Definitivamente, também precisamos de uma ferramenta que permita que o Flow seja concluído mais cedo. Em C #, para cancelar operações assíncronas, não tenho medo dessa palavra, o padrão Token de cancelamento é usado. CancellationToken é uma classe cujo objeto fornece informações para a operação assíncrona que foi cancelada. Ele se lança em uma operação assíncrona na inicialização, e essa própria operação cuida de seu estado. E o estado muda do lado de fora.
Em suma, precisamos lançar o CancellationToken em nosso Flow e FlowCollector.
public interface IFlow<out T> { Task Collect(IFlowCollector<T> collector, CancellationToken cancellationToken = default); } public interface IFlowCollector<in T> { Task Emit(T item, CancellationToken cancellationToken = default); }
Não vou colar a implementação aqui - veja o github.
O teste agora terá a seguinte aparência:
var cts = new CancellationTokenSource(); var flowTask = Flow<int>(async (collector, cancellationToken) => { await collector.Emit(1); await Task.Delay(2000, cancellationToken); await collector.Emit(2); await Task.Delay(2000, cancellationToken); await collector.Emit(3); }) .Collect(item => Log(item), cts.Token); var cancelationTask = Task.Run(async () => { await Task.Delay(3000); cts.Cancel(); }); await flowTask;
O ponto é este. Paralelamente ao Flow, iniciamos uma operação que a cancelará após 3 segundos. Como resultado, o Flow não consegue emitir o terceiro elemento e termina com uma TaskCanceledException, que é o comportamento necessário.
Um pouco de prática
Vamos tentar usar o que aconteceu na prática. Por exemplo, envolva algum evento em nosso Flow. No Rx.Net, existe até um método de biblioteca FromEventPattern para isso.
Para não mexer com a interface do usuário real, escrevi a classe ClicksEmulator, que gera cliques condicionais do mouse em intervalos aleatórios.
public class ClicksEmulator { public enum Button { Left, Right } public class ClickEventArgs : EventArgs {
Omiti a implementação como ela não é muito importante aqui. O principal é o evento ButtonClick, que queremos transformar em Flow. Para isso, escrevemos um método de extensão
public static IFlow<ClicksEmulator.ClickEventArgs> Clicks(this ClicksEmulator emulator) { return FlowFactory.Flow<ClicksEmulator.ClickEventArgs>(async (collector, cancellationToken) => { void clickHandler(object sender, ClicksEmulator.ClickEventArgs args) => collector.Emit(args); emulator.ButtonClick += clickHandler; cancellationToken.Register(() => { emulator.ButtonClick -= clickHandler; }); await Task.Delay(-1, cancellationToken); }); }
Primeiro, declaramos um manipulador de eventos que não faz nada além de passar o argumento do evento para o coletor. Em seguida, assinamos eventos e registramos um cancelamento em caso de cancelamento (conclusão) do fluxo. Bem, então esperamos sem parar e ouvimos os eventos do ButtonClick até o cancelamento do Token disparar.
Se você usou callbackFlow ou channelFlow no Kotlin ou criou o Observable frio de ouvintes no Rx, notará que a estrutura do código é muito semelhante em todos os casos. Tudo bem, mas surge a pergunta - o que é melhor nesse caso do que o Flow do que um evento bruto? Todo o poder dos fluxos de jato está nos operadores que realizam várias transformações neles: filtragem, mapeamento e muitos outros mais complexos. Mas ainda não os temos. Vamos tentar fazer algo sobre isso.
Filtro, Mapa, Ao Avançar
Vamos começar com um dos operadores mais simples - Filtro. Como o nome indica, filtrará os elementos de fluxo de acordo com o predicado especificado. Este será um método de extensão aplicado ao fluxo original e retornando o fluxo apenas com elementos filtrados. Acontece que precisamos pegar cada elemento do fluxo original, verificar e emitir mais se o predicado retornar verdadeiro. Então vamos fazer:
public static IFlow<T> Filter<T>(this IFlow<T> source, Func<T, bool> predicate) => FlowFactory.Flow<T>((collector, cancellationToken) => source.Collect(item => { if (predicate(item)) collector.Emit(item); }, cancellationToken) );
Agora, se precisarmos apenas clicar no botão esquerdo do mouse, podemos escrever o seguinte:
emulator .Clicks() .Filter(click => click.Button == ClicksEmulator.Button.Left) .Collect(item => Log($"{item.Button} {item.X} {item.Y}"), cts.Token);
Por analogia, escrevemos os operadores Map e OnNext. O primeiro converte cada elemento do fluxo original em outro usando a função do mapeador passado. O segundo retornará o fluxo com os mesmos elementos que o original, mas executando uma ação em cada um (geralmente registrando).
public static IFlow<R> Map<T, R>(this IFlow<T> source, Func<T, R> mapper) => FlowFactory.Flow<R>((collector, cancellationToken) => source.Collect( item => collector.Emit(mapper(item)), cancellationToken ) ); public static IFlow<T> OnNext<T>(this IFlow<T> source, Action<T> action) => FlowFactory.Flow<T>((collector, cancellationToken) => source.Collect(item => { action(item); collector.Emit(item); }, cancellationToken) );
E um exemplo de uso:
emulator .Clicks() .OnNext(click => Log($"{click.Button} {click.X} {click.Y}")) .Map(click => click.Button == ClicksEmulator.Button.Left ? 0 : 1) .Collect(item => Log($"{item}"), cts.Token);
Em geral, muitos operadores foram inventados para fluxos de jato, eles podem ser encontrados, por exemplo, aqui .
E nada impede a implementação de nenhum deles para o IFlow.
Os familiarizados com o Rx.Net sabem que, além dos operadores novos e específicos para IObservable, são usados métodos de extensão do Linq para objetos, e isso permite que você considere os fluxos como "coleções de eventos" e os manipule com o Linq usual métodos Por que, em vez de escrever as instruções você mesmo, não tente colocar o IFlow nos trilhos do Linq?
IAsyncEnumerable
No C # 8, uma versão assíncrona do IEnumerable foi introduzida - IAsyncEnumerable - uma interface de coleção que pode ser iterada de forma assíncrona. A diferença fundamental entre os fluxos IAsyncEnumerable e reativos (IObservable e IFlow) é essa. IAsyncEnumerable, como IEnumerable, é um modelo pull. Nós iteramos sobre a coleção quanto e quando precisamos, e nós mesmos extraímos elementos dela. Fluxos são push. Assinamos eventos e "reagimos" a eles quando eles aparecem - por isso são reativos. No entanto, o comportamento do tipo push pode ser alcançado a partir do modelo pull. Isso é chamado de pesquisa longa https://en.wikipedia.org/wiki/Push_technology#Long_polling . A essência é a seguinte: iteramos sobre a coleção, solicitamos seu próximo elemento e esperamos o tempo que quisermos até que a coleção nos devolva, ou seja, até o próximo evento chegar. IAsyncEnumerable, diferentemente de IEnumerable, nos permitirá aguardar assincronamente. Em suma, precisamos de alguma forma puxar IAsyncEnumerable no IFlow.
Como você sabe, a interface IAsyncEnumerator é responsável por retornar o elemento atual da coleção IAsyncEnumerable e passar para o próximo elemento. Nesse caso, precisamos pegar elementos do IFlow, e o IFlowCollector faz isso. Acontece que aqui é um objeto que implementa essas interfaces:
internal class FlowCollectorEnumerator<T> : IFlowCollector<T>, IAsyncEnumerator<T> { private readonly SemaphoreSlim _backpressureSemaphore = new SemaphoreSlim(0, 1); private readonly SemaphoreSlim _longPollingSemaphore = new SemaphoreSlim(0, 1); private bool _isFinished; public T Current { get; private set; } public async ValueTask DisposeAsync() { } public async Task Emit(T item, CancellationToken cancellationToken) { await _backpressureSemaphore.WaitAsync(cancellationToken); Current = item; _longPollingSemaphore.Release(); } public async Task Finish() { await _backpressureSemaphore.WaitAsync(); _isFinished = true; _longPollingSemaphore.Release(); } public async ValueTask<bool> MoveNextAsync() { _backpressureSemaphore.Release(); await _longPollingSemaphore.WaitAsync(); return !_isFinished; } }
Os principais métodos aqui são Emit , Finish e MoveNextAsync .
A emissão no início está aguardando o momento em que o próximo item da coleção será solicitado. I.e. Não emite um item até que seja necessário. Isso é chamado de contrapressão, daí o nome do semáforo. Em seguida, o item atual é definido e é relatado que uma solicitação de pesquisa longa pode obter o resultado.
MoveNextAsync é chamado quando outro item é extraído da coleção. Ele libera _backpressureSemaphore e espera que o Flow ative o próximo elemento. Em seguida, ele retorna um sinal de que a coleção foi encerrada. Este sinalizador define o método Concluir.
Concluir funciona com o mesmo princípio que Emit; somente, em vez do próximo elemento, ele define o sinal do final da coleção.
Agora precisamos usar esta classe.
public static class AsyncEnumerableExtensions { public static IAsyncEnumerable<T> CollectEnumerable<T>(this IFlow<T> flow, CancellationToken cancellationToken = default) { var collector = new FlowCollectorEnumerator<T>(); flow .Collect(collector, cancellationToken) .ContinueWith(_ => collector.Finish(), cancellationToken); return new FlowEnumerableAdapter<T>(collector); } } internal class FlowEnumerableAdapter<T> : IAsyncEnumerable<T> { private readonly IAsyncEnumerator<T> _enumerator; public FlowEnumerableAdapter(IAsyncEnumerator<T> enumerator) { _enumerator = enumerator; } public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default) { return _enumerator; } }
O método de extensão ExtensionEnumerable para IFlow cria um FlowCollectorEnumerator e assina um fluxo nele, após o qual o método Finish () é chamado. E ele retorna um FlowEnumerableAdapter, que é a implementação mais simples de IAsyncEnumerable, usando o FlowCollectorEnumerator como um IEnumerator.
Tentamos o que aconteceu.
var clicks = emulator .Clicks() .OnNext(item => Log($"{item.Button} {item.X} {item.Y}")) .CollectEnumerable(cts.Token) .Where(click => click.Button == ClicksEmulator.Button.Right) .Select(click => click.Y < 540 ? "TOP" : "LEFT"); await foreach (var click in clicks) { Log($"Clicked at: {click}"); }
Aqui obtemos Flow clicks (), registramos cada clique e depois transformamos IFlow em IAsyncEnumerable. Em seguida, aplicam-se os famosos operadores Linq: deixamos apenas os cliques com o botão direito e entramos em que parte da tela eles são feitos.
Em seguida, considere um exemplo mais complicado. Substituiremos o clique direito por um duplo esquerdo. I.e. precisaremos mapear cada elemento não para outro, mas para a coleção. Ou no Flow, convertido em uma coleção.
var clicks = emulator .Clicks() .OnNext(item => Log($"Original: {item.Button} {item.X} {item.Y}")) .CollectEnumerable(cts.Token) .Select(click => click.Button == ClicksEmulator.Button.Left ? Flow<ClicksEmulator.ClickEventArgs>(collector => collector.Emit(click)) : Flow<ClicksEmulator.ClickEventArgs>(async collector => { var changedClick = new ClicksEmulator.ClickEventArgs(click.X, click.Y, ClicksEmulator.Button.Left); await collector.Emit(changedClick); await Task.Delay(200); await collector.Emit(changedClick); }) ) .SelectMany(flow => flow.CollectEnumerable()); await foreach (var click in clicks) { Log($"Changed: {click.Button} {click.X} {click.Y}"); }
Para fazer isso, existe um operador SelectMany no Linq. Sua contrapartida nos fluxos de jato é o FlatMap. Primeiro, mapeie cada clique no IFlow: para o clique esquerdo - Flua com este clique, para o direito - Flua a partir de dois cliques esquerdos com um atraso entre eles. E então, no SelectMany, transformamos o IFlow em IAyncEnumerable.
E funciona! I.e. muitos operadores não precisam ser implementados para o IFlow - você pode usar o Linq.
Conclusão
Rx.Net - foi e continua sendo a ferramenta principal ao trabalhar com sequências assíncronas de eventos em C #. Mas esta é uma biblioteca bastante grande em termos de tamanho do código. Como vimos, funcionalidades semelhantes podem ser obtidas muito mais simples - apenas duas interfaces mais alguma ligação. Isso é possível graças ao uso de recursos de idioma - assíncrono / aguardado. Quando Rx nasceu, esse recurso ainda não foi trazido para o C #.
Obrigado pela atenção!