C#5中引入了异步/等待功能,以改善用户界面的响应能力和对资源的Web访问。 换句话说,异步方法可以帮助开发人员执行异步操作,这些操作不会阻塞线程并返回单个标量结果。 在Microsoft多次尝试简化异步操作之后,异步/等待模板由于一种简单的方法而在开发人员中赢得了良好的声誉。
现有异步方法受到很大限制,因为它们只能返回一个值。 让我们看一下这种语法常见的async Task<int> DoAnythingAsync()
方法。 他工作的结果是某种意义。 由于此限制,您不能将此函数与yield
关键字和异步IEnumerable<int>
接口一起使用(以返回异步枚举的结果)。

如果将async/await
函数和yield
结合使用,则可以使用功能强大的编程模型,称为异步数据拉取 ,或者基于拉取的枚举枚举或异步异步序列 (在F#中称为)。
在C#8中使用异步线程的新功能消除了与返回单个结果相关的限制,并允许异步方法返回多个值。 这些更改将为异步模板提供更大的灵活性,并且用户将能够使用延迟的异步序列从某个地方(例如,从数据库)检索数据,或者从可用部分中的异步序列中接收数据。
一个例子:
foreach await (var streamChunck in asyncStreams) { Console.WriteLine($“Received data count = {streamChunck.Count}”); }
解决与异步编程有关的问题的另一种方法是使用反应性扩展(Rx)。 Rx
在开发人员中越来越重要,这种方法已在许多编程语言中使用,例如Java(RxJava)和JavaScript(RxJS)。
Rx基于推推模型(“不要问”原理),也称为反应式编程。 即 与IEnumerable不同,当使用者请求下一个元素时,在Rx模型中,数据提供者会向使用者发出信号,告知新元素出现在序列中。 数据以异步模式推送到队列中,并且使用者在接收时使用它。
在本文中,我将比较基于推送数据的模型(例如Rx)和基于提取数据的模型(例如IEnumerable),并展示哪种方案最适合哪种模型。 通过各种示例和演示代码来检查整个概念和好处。 最后,我将展示该应用程序并通过代码示例进行演示。
基于推入数据的模型与基于拉取数据的模型(拉式)的比较

图 -1-基于数据提取的模型与基于数据推送的模型的比较
这些示例基于数据提供者和使用者之间的关系,如图2所示。 -1-。 基于拉的模型很容易理解。 在其中,消费者向供应商请求并接收数据。 另一种方法是推推模型。 在这里,提供者将数据发布到队列中,而使用者必须订阅该数据才能接收它。
数据提取模型适用于提供者生成数据的速度快于消费者使用数据的速度的情况。 因此,消费者仅接收必要的数据,这避免了溢出问题。 如果消费者使用数据的速度比供应商产生数据的速度快,则基于推送数据的模型是合适的。 在这种情况下,供应商可以将更多数据发送给消费者,从而避免不必要的延迟。
Rx和Akka Streams (基于流的编程模型)使用背压方法来控制流。 为了解决上述供应商和接收者的问题,该方法同时使用推和拉数据。
在下面的示例中,缓慢的使用者从较快的提供程序中提取数据。 消费者处理完当前元素后,他将要求供应商提供下一个元素,依此类推,直到序列结束。
要了解异步线程的全部需求,请考虑以下代码。
// (count) static int SumFromOneToCount(int count) { ConsoleExt.WriteLine("SumFromOneToCount called!"); var sum = 0; for (var i = 0; i <= count; i++) { sum = sum + i; } return sum; } // : const int count = 5; ConsoleExt.WriteLine($"Starting the application with count: {count}!"); ConsoleExt.WriteLine("Classic sum starting."); ConsoleExt.WriteLine($"Classic sum result: {SumFromOneToCount(count)}"); ConsoleExt.WriteLine("Classic sum completed."); ConsoleExt.WriteLine("################################################"); ConsoleExt.WriteLine(Environment.NewLine);
结论:

我们可以使用yield语句使该方法推迟,如下所示。
static IEnumerable<int> SumFromOneToCountYield(int count) { ConsoleExt.WriteLine("SumFromOneToCountYield called!"); var sum = 0; for (var i = 0; i <= count; i++) { sum = sum + i; yield return sum; } }
方法调用
const int count = 5; ConsoleExt.WriteLine("Sum with yield starting."); foreach (var i in SumFromOneToCountYield(count)) { ConsoleExt.WriteLine($"Yield sum: {i}"); } ConsoleExt.WriteLine("Sum with yield completed."); ConsoleExt.WriteLine("################################################"); ConsoleExt.WriteLine(Environment.NewLine);
结论:

如上面的输出窗口所示,结果将分部分返回,而不是单个值。 上面显示的摘要结果称为延迟列表。 但是,问题仍然没有解决:求和方法会阻塞代码。 如果查看线程,您会看到一切都在主线程中运行。
让我们将异步魔术词应用于第一个SumFromOneToCount方法(不使用yield)。
static async Task<int> SumFromOneToCountAsync(int count) { ConsoleExt.WriteLine("SumFromOneToCountAsync called!"); var result = await Task.Run(() => { var sum = 0; for (var i = 0; i <= count; i++) { sum = sum + i; } return sum; }); return result; }
方法调用
const int count = 5; ConsoleExt.WriteLine("async example starting."); // . , . , . var result = await SumFromOneToCountAsync(count); ConsoleExt.WriteLine("async Result: " + result); ConsoleExt.WriteLine("async completed."); ConsoleExt.WriteLine("################################################"); ConsoleExt.WriteLine(Environment.NewLine);
结论:

太好了 现在,计算是在另一个线程中完成的,但是结果仍然存在问题。 系统以单个值返回结果。
想象一下,我们可以以命令式编程方式结合使用递归枚举(yield语句)和异步方法。 这种组合称为异步流,这是C#8中的新功能。它非常适合解决与基于数据提取的编程模型相关的问题,例如,以现代方式从站点下载数据或读取文件或数据库中的记录。
让我们尝试在当前版本的C#中执行此操作。 我将async关键字添加到SumFromOneToCountYield方法中,如下所示:

图 -2-同时使用yield和async关键字时出错。
当我们尝试将异步添加到SumFromOneToCountYield时,会发生错误,如上所示。
让我们尝试不同的方法。 我们可以删除yield关键字并在任务中应用IEnumerable,如下所示:
static async Task<IEnumerable<int>> SumFromOneToCountTaskIEnumerable(int count) { ConsoleExt.WriteLine("SumFromOneToCountAsyncIEnumerable called!"); var collection = new Collection<int>(); var result = await Task.Run(() => { var sum = 0; for (var i = 0; i <= count; i++) { sum = sum + i; collection.Add(sum); } return collection; }); return result; }
方法调用
const int count = 5; ConsoleExt.WriteLine("SumFromOneToCountAsyncIEnumerable started!"); var scs = await SumFromOneToCountTaskIEnumerable(count); ConsoleExt.WriteLine("SumFromOneToCountAsyncIEnumerable done!"); foreach (var sc in scs) { // , . . ConsoleExt.WriteLine($"AsyncIEnumerable Result: {sc}"); } ConsoleExt.WriteLine("################################################"); ConsoleExt.WriteLine(Environment.NewLine);
结论:

从示例中可以看到,所有内容都是在异步模式下计算的,但问题仍然存在。 结果(所有结果收集在一个集合中)作为一个单独的块返回。 这不是我们所需要的。 如果您还记得,我们的目标是将异步计算模式与延迟的可能性结合起来。
为此,您需要使用外部库,例如C#中提供的Ix(Rx的一部分)或异步线程。
让我们回到我们的代码。 为了演示异步行为,我使用了一个外部库 。
static async Task ConsumeAsyncSumSeqeunc(IAsyncEnumerable<int> sequence) { ConsoleExt.WriteLineAsync("ConsumeAsyncSumSeqeunc Called"); await sequence.ForEachAsync(value => { ConsoleExt.WriteLineAsync($"Consuming the value: {value}"); // Task.Delay(TimeSpan.FromSeconds(1)).Wait(); }); } static IEnumerable<int> ProduceAsyncSumSeqeunc(int count) { ConsoleExt.WriteLineAsync("ProduceAsyncSumSeqeunc Called"); var sum = 0; for (var i = 0; i <= count; i++) { sum = sum + i; // Task.Delay(TimeSpan.FromSeconds(0,5)).Wait(); yield return sum; } }
方法调用
const int count = 5; ConsoleExt.WriteLine("Starting Async Streams Demo!"); // . . IAsyncEnumerable<int> pullBasedAsyncSequence = ProduceAsyncSumSeqeunc(count).ToAsyncEnumerable(); ConsoleExt.WriteLineAsync("X#X#X#X#X#X#X#X#X#X# Doing some other work X#X#X#X#X#X#X#X#X#X#"); // ; . var consumingTask = Task.Run(() => ConsumeAsyncSumSeqeunc(pullBasedAsyncSequence)); // . , . consumingTask.Wait(); ConsoleExt.WriteLineAsync("Async Streams Demo Done!");
结论:

最后,我们看到了所需的行为。 您可以在异步模式下运行枚举循环。
在此处查看源代码 。
以客户端-服务器架构为例的异步数据提取
让我们通过一个更实际的例子来看看这个概念。 在客户端-服务器体系结构的上下文中可以最好地看到此功能的所有优点。
客户端-服务器体系结构中的同步调用
当向服务器发送请求时,客户端被迫等待(即被阻止),直到响应到达为止,如图2所示。 -3-。

图 -3-同步数据提取,在此期间客户端等待直到请求处理完成
异步数据提取
在这种情况下,客户端请求数据并继续执行其他任务。 收到数据后,客户端将继续进行工作。

图 -4-异步数据提取,在此期间客户端可以在请求数据时执行其他任务
异步提取数据
在这种情况下,客户端请求一部分数据并继续执行其他任务。 然后,客户端在接收到数据后对其进行处理并请求下一部分,依此类推,直到接收到所有数据为止。 正是从这种情况下,产生了异步线程的想法。 在图。 -5-显示了客户端如何处理接收到的数据或执行其他任务。

图 -5-将数据作为异步序列(异步流)提取。 客户端未被阻止。
异步线程
与IEnumerable<T>
和IEnumerator<T>
有两个新的IAsyncEnumerable<T>
和IAsyncEnumerator<T>
接口,其定义如下所示:
public interface IAsyncEnumerable<out T> { IAsyncEnumerator<T> GetAsyncEnumerator(); } public interface IAsyncEnumerator<out T> : IAsyncDisposable { Task<bool> MoveNextAsync(); T Current { get; } } // public interface IAsyncDisposable { Task DiskposeAsync(); }
在InfoQ中,乔纳森·艾伦(Jonathan Allen)提出了正确的话题。 在这里我将不做详细介绍,因此我建议阅读他的文章 。
重点是Task<bool> MoveNextAsync()
的返回值(从bool更改为Task<bool>
,即bool IEnumerator.MoveNext()
)。 多亏了他,所有计算及其迭代都将异步进行。 消费者决定何时获得下一个值。 尽管它是异步模型,但仍使用数据提取。 对于异步清理资源,可以使用IAsyncDisposable
接口。 有关异步线程的更多信息,请参见此处 。
句法
最终语法应类似于以下内容:
foreach await (var dataChunk in asyncStreams) { // yield . }
从上面的示例可以看出,从理论上讲,我们可以在等待其他异步操作的同时,依次计算一组值,而不是计算单个值。
重新设计的Microsoft示例
我重写了Microsoft的演示代码。 可以完全从我的GitHub存储库下载它。
该示例基于以下想法:在内存(20,000字节的数组)中创建大型流,并以异步模式从中顺序提取元素。 在每次迭代期间,将从阵列中提取8 KB。


在步骤(1),创建一个大数据数组,其中填充了伪值。 然后,在步骤(2)中,定义了一个称为校验和的变量。 此包含校验和的变量旨在验证计算总和的正确性。 在内存中创建一个数组和一个校验和,并在步骤(3)中作为元素序列返回。
步骤(4)涉及应用AsEnumarble
扩展AsEnumarble
(更合适的名称为AsAsyncEnumarble),该方法有助于模拟8 KB的异步流(BufferSize = 8000个元素(6))。
通常不需要从IAsyncEnumerable继承,但是在上面显示的示例中,执行此操作是为了简化演示代码,如步骤(5)所示。
步骤(7)涉及使用foreach
关键字,该关键字从内存中的异步流中提取8 KB的数据块。 拉取过程按顺序进行:当使用者(包含foreach
的代码的一部分)准备好接收下一条数据时,他将其从提供者(包含在内存中的数组中)中拉出。 最后,当循环完成时,程序将根据步骤(8)检查校验和的'c'值,如果它们匹配,则会显示消息“校验和匹配!”。
Microsoft演示输出窗口:

结论
我们研究了异步线程,这些线程非常适合异步提取数据和编写在异步模式下生成多个值的代码。
使用此模型,您可以查询序列中的下一个数据元素并获得响应。 它与IObservable<T>
数据推送模型不同,在IObservable<T>
数据推送模型中,无论使用者的状态如何,都会生成值。 当消费者自己确定接受下一个数据的意愿时,异步流使您可以完美地表示由消费者控制的异步数据源。 示例包括使用Web应用程序或读取数据库中的记录。
我演示了如何在异步模式下创建枚举,以及如何使用带有异步序列的外部库来使用它。 我还展示了从Internet下载内容时此功能提供的好处。 最后,我们研究了异步线程的新语法以及基于Microsoft Build Demo Code( 2018年5月7日至9日,// WA,Seattle )的完整用法示例
