用C#实现Kotlin Flow

图片


大家好!


近年来,我一直在Kotlin上为Android开发。 不久前,由于Kotlin多平台上缺少RxJava,我们开始使用协程和流程-开箱即用的Kotlin冷流。 在使用Android之前,我在C#中度过了很多年,并且我的协程已经存在很长时间了,只是没有这样命名。 但是我没有听说过异步/等待流的模拟。 反应式编程的主要工具是Rx.Net(实际上,rx就是在这里诞生的)。 因此,我决定怀旧,并尝试看自行车。


进一步理解,读者了解上一段中讨论的内容。 如果不耐烦,请立即链接到存储库


免责声明:此代码不适用于生产。 这是一个概念,仅此而已。 某些功能可能无法完全按预期工作。


IFlow和IFlowCollector


好吧,让我们从用额头上的C#重写Flow和FlowCollector接口开始。
那是:


interface Flow<out T> { suspend fun collect(collector: FlowCollector<T>) } interface FlowCollector<in T> { suspend fun emit(value: T) } 

它变成了:


  public interface IFlow<out T> { Task Collect(IFlowCollector<T> collector); } public interface IFlowCollector<in T> { Task Emit(T item); } 

我相信这些差异可以通过异步的不同实现来理解和解释。


要使用这些接口,必须实现它们。 这是发生了什么:


  internal class Flow<T> : IFlow<T> { private readonly Func<IFlowCollector<T>, Task> _emitter; public Flow(Func<IFlowCollector<T>, Task> emitter) { _emitter = emitter; } public Task Collect(IFlowCollector<T> collector) { return _emitter(collector); } } internal class FlowCollector<T> : IFlowCollector<T> { private readonly Func<T, Task> _handler; public FlowCollector(Func<T, Task> handler) { _handler = handler; } public Task Emit(T item) { return _handler(item); } } 

在流程的构造函数中,我们传递了一个将发出值的函数。 对于收集器的构造函数,该函数将处理每个发出的值。


你可以这样使用


 var flow = new Flow<int>(async collector => { await collector.Emit(1); await Task.Delay(1000); await collector.Emit(2); await Task.Delay(1000); await collector.Emit(3); }); var collector = new FlowCollector<int>(async item => Console.WriteLine(item)); await flow.Collect(collector); 

我认为上面的代码中所有内容都很清楚。 首先,我们创建一个Flow,然后创建一个收集器(每个元素的处理程序)。 然后我们开始Flow,在其上“签名”一个收集器。 如果您添加一点糖(请参阅github),我们将得到如下内容:


 await Flow<int>(async collector => { await collector.Emit(1); await Task.Delay(1000); await collector.Emit(2); await Task.Delay(1000); await collector.Emit(3); }) .Collect(Console.WriteLine); 

在Kotlin上,它看起来像这样:


 scope.launch{ flow{ emit(1) delay(1000) … }.collect{ printl(it) } } 

就我个人而言,最重要的是,我不喜欢Sharpe上用于在创建流时明确指出元素类型的选项。 但这并不是说Kotlin中的类型推断要陡峭得多。 流函数如下所示:


 public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block) 

如我们所见,block参数标记有BuilderInference批注,该批注指示编译器应从该参数中获取类型。 有谁知道是否可以在罗斯林对C#进行类似的归档?


取消令牌


在rx中,您可以取消订阅。 在Kotlin Flow中,工作由Job负责,制造商将其退回,即协程范围。 我们当然也需要一个允许Flow尽早完成的工具。 在C#中,要取消异步操作,我不怕这个词,使用了Cancellation Token模式。 CancellationToken是一个类,其对象向异步操作提供已被取消的信息。 它在启动时将自身置于异步操作中,并且该操作本身会照顾其状态。 并且状态从外部改变。


简而言之,我们需要将CancellationToken放入Flow和FlowCollector中。


  public interface IFlow<out T> { Task Collect(IFlowCollector<T> collector, CancellationToken cancellationToken = default); } public interface IFlowCollector<in T> { Task Emit(T item, CancellationToken cancellationToken = default); } 

我不会在此处粘贴实现-参见github。
现在,测试将如下所示:


  var cts = new CancellationTokenSource(); var flowTask = Flow<int>(async (collector, cancellationToken) => { await collector.Emit(1); await Task.Delay(2000, cancellationToken); await collector.Emit(2); await Task.Delay(2000, cancellationToken); await collector.Emit(3); }) .Collect(item => Log(item), cts.Token); var cancelationTask = Task.Run(async () => { await Task.Delay(3000); cts.Cancel(); }); await flowTask; 

关键是这个。 与Flow并行,我们开始执行一项操作,该操作将在3秒钟后将其取消。 结果,Flow无法设法发出第三个元素,并以TaskCanceledException结尾,这是必需的行为。


一点练习


让我们尝试使用实际发生的事情。 例如,将一些事件包装在Flow中。 在Rx.Net中甚至为此提供了一个库方法FromEventPattern。


为了不干扰实际的UI,我编写了ClicksEmulator类,该类会随机产生条件的鼠标单击。


  public class ClicksEmulator { public enum Button { Left, Right } public class ClickEventArgs : EventArgs { //… public int X { get; } public int Y { get; } public Button Button { get; } } public event EventHandler<ClickEventArgs> ButtonClick; public async Task Start(CancellationToken cancellationToken = default) {… } } 

我省略了实现 她在这里不是很重要。 最主要的是事件ButtonClick,我们希望将其转换为Flow。 为此,我们编写了一个扩展方法


 public static IFlow<ClicksEmulator.ClickEventArgs> Clicks(this ClicksEmulator emulator) { return FlowFactory.Flow<ClicksEmulator.ClickEventArgs>(async (collector, cancellationToken) => { void clickHandler(object sender, ClicksEmulator.ClickEventArgs args) => collector.Emit(args); emulator.ButtonClick += clickHandler; cancellationToken.Register(() => { emulator.ButtonClick -= clickHandler; }); await Task.Delay(-1, cancellationToken); }); } 

首先,我们声明一个事件处理程序,该事件处理程序除了将事件的参数传递给收集器外什么也不做。 然后,我们订阅事件并在流程取消(完成)的情况下注册退订。 好吧,然后我们将无休止地等待并监听ButtonClick事件,直到cancelToken触发为止。


如果您在Kotlin中使用callbackFlow或channelFlow或在Rx中从侦听器创建了冷Observable,那么您会注意到在所有情况下代码结构都非常相似。 很好,但是问题来了-在这种情况下,有什么比Flow比粗暴的事件更好的了? 喷射流的全部力量在于操作员,他们可以对其进行各种转换:过滤,映射以及许多其他更复杂的转换。 但是我们还没有。 让我们尝试对此做些事情。


筛选,地图,OnNext


让我们从最简单的运算符之一开始-过滤器。 顾名思义,它将根据给定谓词过滤流元素。 这将是一种扩展方法,仅适用于原始流和返回流(仅包含过滤后的元素)。 事实证明,如果谓词返回true,则需要从原始流中获取每个元素,进行检查并进一步发出。 因此,让我们做吧:


  public static IFlow<T> Filter<T>(this IFlow<T> source, Func<T, bool> predicate) => FlowFactory.Flow<T>((collector, cancellationToken) => source.Collect(item => { if (predicate(item)) collector.Emit(item); }, cancellationToken) ); 

现在,如果我们只需要单击鼠标左键,我们可以这样写:


 emulator .Clicks() .Filter(click => click.Button == ClicksEmulator.Button.Left) .Collect(item => Log($"{item.Button} {item.X} {item.Y}"), cts.Token); 

以此类推,我们编写了运算符Map和OnNext。 第一个使用传递的映射器函数将原始流的每个元素转换为另一个。 第二个将返回具有与原始元素相同的元素的流,但是对每个元素执行一个操作(通常是日志记录)。

  public static IFlow<R> Map<T, R>(this IFlow<T> source, Func<T, R> mapper) => FlowFactory.Flow<R>((collector, cancellationToken) => source.Collect( item => collector.Emit(mapper(item)), cancellationToken ) ); public static IFlow<T> OnNext<T>(this IFlow<T> source, Action<T> action) => FlowFactory.Flow<T>((collector, cancellationToken) => source.Collect(item => { action(item); collector.Emit(item); }, cancellationToken) ); 

并举例说明:


 emulator .Clicks() .OnNext(click => Log($"{click.Button} {click.X} {click.Y}")) .Map(click => click.Button == ClicksEmulator.Button.Left ? 0 : 1) .Collect(item => Log($"{item}"), cts.Token); 

通常,发明了许多用于喷射流的操作器,例如, 在此处可以找到它们。


并没有阻止为IFlow实现它们的任何一个。


那些熟悉Rx.Net的人知道,除了使用IObservable的新的和特定的运算符外,还使用从Linq到对象的扩展方法,这使您可以将流视为“事件的集合”,并使用通常的Linq对其进行操作。方法。 为什么不亲自编写语句,而不尝试将IFlow放在Linq轨道上?


IAsyncEnumerable


在C#8中,引入了IEnumerable的异步版本-IAsyncEnumerable-可以异步迭代的集合接口。 IAsyncEnumerable流和响应流(IObservable和IFlow)之间的根本区别是。 IAsyncEnumerable与IEnumerable一样,是拉模型。 我们遍历集合的数量和时间,然后自己从中提取元素。 流被推。 我们订阅事件并在事件发生时对其进行“响应”-因为它们是被动的。 但是,可以通过拉模型实现类似推的行为。 这称为长轮询https://en.wikipedia.org/wiki/Push_technology#Long_polling 。 本质是这样的:我们遍历集合,请求它的下一个元素并等待我们喜欢的时间,直到集合将它返回给我们,即 直到下一个事件到来。 与IEnumerable不同,IAsyncEnumerable将允许我们异步等待。 简而言之,我们需要以某种方式在IFlow上拉IAsyncEnumerable。


如您所知,IAsyncEnumerator接口负责返回IAsyncEnumerable集合的当前元素,并移至下一个元素。 在这种情况下,我们需要从IFlow中获取元素,而IFlowCollector会这样做。 事实证明,这里是一个实现以下接口的对象:


 internal class FlowCollectorEnumerator<T> : IFlowCollector<T>, IAsyncEnumerator<T> { private readonly SemaphoreSlim _backpressureSemaphore = new SemaphoreSlim(0, 1); private readonly SemaphoreSlim _longPollingSemaphore = new SemaphoreSlim(0, 1); private bool _isFinished; public T Current { get; private set; } public async ValueTask DisposeAsync() { } public async Task Emit(T item, CancellationToken cancellationToken) { await _backpressureSemaphore.WaitAsync(cancellationToken); Current = item; _longPollingSemaphore.Release(); } public async Task Finish() { await _backpressureSemaphore.WaitAsync(); _isFinished = true; _longPollingSemaphore.Release(); } public async ValueTask<bool> MoveNextAsync() { _backpressureSemaphore.Release(); await _longPollingSemaphore.WaitAsync(); return !_isFinished; } } 

这里的主要方法是EmitFinishMoveNextAsync
开头的Emit正在等待从集合中请求下一个项目的那一刻。 即 直到需要时才发射物品。 这称为背压,因此称为信号量。 然后设置当前项目,并报告长时间的轮询请求可以得到结果。
从集合中提取另一个项目时,将调用MoveNextAsync 。 他释放_backpressureSemaphore并等待Flow触发下一个元素。 然后它返回一个迹象,表明收集已经结束。 该标志设置Finish方法。


Finish与Emit的原理相同,只是Finish代替了下一个元素,它设置了集合结束的符号。


现在我们需要使用此类。


 public static class AsyncEnumerableExtensions { public static IAsyncEnumerable<T> CollectEnumerable<T>(this IFlow<T> flow, CancellationToken cancellationToken = default) { var collector = new FlowCollectorEnumerator<T>(); flow .Collect(collector, cancellationToken) .ContinueWith(_ => collector.Finish(), cancellationToken); return new FlowEnumerableAdapter<T>(collector); } } internal class FlowEnumerableAdapter<T> : IAsyncEnumerable<T> { private readonly IAsyncEnumerator<T> _enumerator; public FlowEnumerableAdapter(IAsyncEnumerator<T> enumerator) { _enumerator = enumerator; } public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default) { return _enumerator; } } 

IFlow的ExtensionEnumerable扩展方法创建一个FlowCollectorEnumerator并在其上签名流,然后调用Finish()方法。 并使用FlowCollectorEnumerator作为IEnumerator,返回FlowEnumerableAdapter,这是IAsyncEnumerable的最简单实现。
我们尝试发生了什么。


 var clicks = emulator .Clicks() .OnNext(item => Log($"{item.Button} {item.X} {item.Y}")) .CollectEnumerable(cts.Token) .Where(click => click.Button == ClicksEmulator.Button.Right) .Select(click => click.Y < 540 ? "TOP" : "LEFT"); await foreach (var click in clicks) { Log($"Clicked at: {click}"); } 

在这里,我们得到Flow点击(),记录每次点击,然后将IFlow转换为IAsyncEnumerable。 然后,它应用了著名的Linq运算符:我们只保留右键单击,然后进入它们在屏幕的哪个部分。


接下来,考虑一个更复杂的示例。 我们将鼠标左键替换为鼠标左键。 即 我们将不需要将每个元素映射到其他元素,而是映射到集合。 或在Flow中,转换为集合。


 var clicks = emulator .Clicks() .OnNext(item => Log($"Original: {item.Button} {item.X} {item.Y}")) .CollectEnumerable(cts.Token) .Select(click => click.Button == ClicksEmulator.Button.Left ? Flow<ClicksEmulator.ClickEventArgs>(collector => collector.Emit(click)) : Flow<ClicksEmulator.ClickEventArgs>(async collector => { var changedClick = new ClicksEmulator.ClickEventArgs(click.X, click.Y, ClicksEmulator.Button.Left); await collector.Emit(changedClick); await Task.Delay(200); await collector.Emit(changedClick); }) ) .SelectMany(flow => flow.CollectEnumerable()); await foreach (var click in clicks) { Log($"Changed: {click.Button} {click.X} {click.Y}"); } 

为此,Linq中有一个SelectMany运算符。 它在喷射流中的对应物是FlatMap。 首先,映射IFlow中的每个点击:左键单击-带有此单击的流,右键-来自两个左键单击的流,它们之间有延迟。 然后在SelectMany中,我们将IFlow转换为IAyncEnumerable。


而且有效! 即 IFlow不必实现许多运算符-您可以使用Linq。


结论


Rx.Net-在C#中处理事件的异步序列时,过去并且一直是主要工具。 但是就代码大小而言,这是一个相当大的库。 正如我们所看到的,可以更简单地获得类似的功能-只需两个接口加上一些绑定。 这要归功于使用语言功能-异步/等待。 Rx诞生时,此功能尚未引入C#。


感谢您的关注!

Source: https://habr.com/ru/post/zh-CN473258/


All Articles