.NET:用于处理多线程和异步的工具。 第一部分

我在Habr上发表了原始文章,其翻译发布在Codingsight博客上。
第二部分在这里可用

甚至在计算机问世之前,就需要异步执行某些操作,而不必等待此时此地的结果,或者需要在执行此操作的多个单元之间共享大量工作。 随着它们的出现,这种需求变得非常明显。 现在,在2019年,在配备8核Intel Core处理器的笔记本电脑上打字此文章,该笔记本电脑上不能同时运行100个进程,但可以同时运行更多线程。 它的旁边是一部略受打击的电话,该电话是几年前购买的,配备了8核处理器。 主题资源中充满了文章和视频,其作者欣赏了今年的旗舰智能手机,其中配备了16核处理器。 MS Azure以每小时不到20美元的价格提供了一个具有128个核心处理器和2 TB RAM的虚拟机。 不幸的是,如果无法控制流的交互作用,就不可能最大化和抑制这种能力。

术语学


进程 -一个OS对象,一个隔离的地址空间,包含线程。
线程(Thread) -OS对象,最小的执行单元,是进程的一部分,线程在进程内相互共享内存和其他资源。
多任务处理是一项操作系统功能,可以同时执行多个进程
多核 -处理器的属性,可以使用多个核进行数据处理
多重处理 -计算机的属性,能够同时在物理上与多个处理器一起工作
多线程是进程的属性,是在多个线程之间分配数据处理能力。
并行性 -每单位时间同时在物理上执行多个动作
异步 -在不等待该处理结束的情况下执行操作,可以稍后处理执行结果。

隐喻


并非所有的定义都是好的,有些还需要其他解释,因此我将在正式引入的术语中添加一个烹饪早餐的隐喻。 在这个比喻中煮早餐是一个过程。

早上,我( CPU )来做早餐来到厨房( 计算机 )。 我有2手( 核心 )。 厨房中有许多设备( IO ):烤箱,水壶,烤面包机,冰箱。 我打开煤气,在上面放一个煎锅,然后在里面倒油,而不必等到它变暖为止( 异步地,非阻塞IO等待 ),我将鸡蛋从冰箱中取出,分成一个盘子,然后用一只手将它们打碎( 线程1) ),第二个( 线程#2 )我拿着盘子(共享资源)。 现在我仍然可以打开水壶,但是手不够( 螺纹饥饿 ),在此期间,将煎锅加热(处理结果),然后倒入搅打的东西。 我伸手去拿水壶,然后笨拙地看着水壶里的水如何沸腾( Blocking-IO-Wait ),尽管这段时间我可以洗盘子,打败煎蛋卷。

我只用两只手煮了一个煎蛋卷,但是我没有更多,但是在搅打煎蛋卷的同时执行了3次操作:搅打煎蛋卷,握住盘子,加热煎锅CPU是计算机中最快的部分,IO经常出现减慢一切速度,因此一种有效的解决方案通常是在从IO接收数据的同时占用CPU。

继续比喻:

  • 如果在准备煎蛋卷的过程中,我还尝试换衣服,这将是多任务处理的一个示例。 一个重要的细微差别:具有此功能的计算机比人的性能要好得多。
  • 一个有多个厨师的厨房(例如,在餐馆里)是多核计算机。
  • 购物中心中的许多美食广场餐厅-数据中心

.NET工具


与许多其他事情一样,在使用线程时,.NET很好。 对于每个新版本,他都提供了越来越多的与之配合使用的新工具,以及OS线程上新的抽象层。 在进行抽象的构造时,框架开发人员使用的方法在使用高级抽象时会留下可能性,它将在下面下降一个或几个级别。 多数情况下这不是必需的,此外,这可能会导致a弹枪被脚踩开,但是有时,在极少数情况下,这可能是解决在当前抽象水平上无法解决的问题的唯一方法。

所谓工具,是指框架和第三方程序包提供的程序接口(API),以及指简化与多线程代码相关的任何问题的搜索的整个软件解决方案。

流开始


Thread类是.NET中用于处理线程的最基本的类。 构造函数接受两个委托之一:

  • ThreadStart-没有参数
  • ParametrizedThreadStart-具有对象类型的一个参数。

该委托将在调用Start方法后在新创建的线程中执行,如果将ParametrizedThreadStart类型的委托传递给构造函数,则必须将一个对象传递给Start方法。 需要使用此机制将任何本地信息传输到流。 值得注意的是,创建线程是一项昂贵的操作,并且线程本身是一个沉重的对象,至少因为至少1MB的内存分配给了堆栈,并且需要与OS API进行交互。

new Thread(...).Start(...); 

ThreadPool类表示池的概念。 在.NET中,线程池是一件艺术品,Microsoft的开发人员已投入大量精力使其在各种情况下均能最佳工作。

一般概念:

从一开始,后台的应用程序就会在预留空间中创建多个线程,并提供了将其投入使用的机会。 如果频繁且大量使用线程,则池会扩展以满足调用代码的需求。 如果在适当的时间池中没有可用的流,它将等待流之一返回或创建一个新的流。 随之而来的是,线程池对于某些短时间操作非常有用,而对于整个应用程序中作为服务进行操作的操作则不太适合。

要使用池中的线程,有一个QueueUserWorkItem方法可以接受WaitCallback委托,该委托与ParametrizedThreadStart的签名相同,并且传递给它的参数执行相同的功能。

 ThreadPool.QueueUserWorkItem(...); 

鲜为人知的线程池方法RegisterWaitForSingleObject用于组织非阻塞IO操作。 当传递给该方法的WaitHandle为“已释放”时,将调用传递给该方法的委托。

 ThreadPool.RegisterWaitForSingleObject(...) 

.NET有一个流计时器,它与WinForms / WPF计时器的不同之处在于,它将在从池中获取的流中调用其处理程序。

 System.Threading.Timer 

还有一种从池中将委托发送到线程的颇为奇特的方法-BeginInvoke方法。

 DelegateInstance.BeginInvoke 

我还想谈谈调用上述许多方法的函数-Kernel32.dll Win32 API中的CreateThread。 借助于extern方法的机制,有一种方法可以调用此函数。 在一个糟糕的遗留代码示例中,我只看到过一次这样的挑战,而作者这样做的动力对我来说仍然是个谜。

 Kernel32.dll CreateThread 

查看和调试线程


您可以在“线程Visual Studio”窗口中查看由所有第三方组件和.NET池个人创建的线程。 仅当应用程序处于调试状态且处于中断模式(中断模式)时,此窗口才会显示有关流的信息。 在这里,您可以方便地查看每个线程的堆栈名称和优先级,将调试切换到特定线程。 Thread类的Priority属性允许您设置线程的优先级,当在线程之间划分CPU时间时,OC和CLR会认为这是建议。



任务并行库


任务并行库(TPL)出现在.NET 4.0中。 现在,它是处理异步的标准工具和主要工具。 任何使用旧方法的代码都被认为是遗留的。 TPL的基本单元是System.Threading.Tasks命名空间中的Task类。 任务是对线程的抽象。 使用新版本的C#,我们有了一种使用Task的优雅方法-异步/等待运算符。 这些概念使编写异步代码变得简单和同步成为了可能,即使对于不太了解线程内部知识的人们也可以编写使用它们的应用程序,这些应用程序在长时间运行时不会冻结。 使用async / await是一个或什至几篇文章的主题,但是我将尽力弄清楚几句话:

  • 异步是返回Task或void的方法的修饰符
  • 而await是Task非阻塞等待语句。

再一次:在一般情况下(有例外),await运算符将进一步释放当前执行线程,并且当Task完成执行时,该线程(实际上说上下文是正确的,但稍后会更多)可以自由地继续执行该方法。 在.NET内部,当一种书面方法变成一个完整的类(即状态机)时,可以用与收益率返回相同的方式实现该机制,并且可以根据这些状态将它们分开执行。 任何有兴趣的人都可以使用asyn/ await编写任何简单的代码,使用启用了“编译器生成的代码”的JetBrains dotPeek进行编译和查看程序集。

考虑启动和使用Task的选项。 使用下面的代码示例,我们创建了一个无用的新任务( Thread.Sleep(10000) ),但在现实生活中,这应该是某种复杂的,涉及CPU的工作。

 using TCO = System.Threading.Tasks.TaskCreationOptions; public static async void VoidAsyncMethod() { var cancellationSource = new CancellationTokenSource(); await Task.Factory.StartNew( // Code of action will be executed on other context () => Thread.Sleep(10000), cancellationSource.Token, TCO.LongRunning | TCO.AttachedToParent | TCO.PreferFairness, scheduler ); // Code after await will be executed on captured context } 

任务具有许多选项:

  • LongRunning提示该任务不会很快完成,这意味着可能值得考虑不要从池中获取线程,而是为此任务创建一个单独的线程,以免损害其他任务。
  • AttachedToParent-任务可以按层次结构排列。 如果使用了此选项,则任务可能会处于完成状态并正在等待子项完成的状态。
  • PreferFairness-意味着最好先执行较早发送的任务,然后再执行较晚发送的任务。 但这只是一个建议,无法保证结果。

该方法的第二个参数传递了CancellationToken。 为了在启动操作后正确处理取消操作,必须在执行的代码中填充CancellationToken的状态检查。 如果没有检查,则在CancellationTokenSource对象上调用的Cancel方法将只能在Task启动之前停止执行。

最后一个参数传递了TaskScheduler类型的调度程序对象。 此类及其子代旨在控制按线程分配Task'ov的策略,默认情况下,Task将在池中的随机线程上执行。

等待操作符应用于创建的任务,这意味着在其之后编写的代码(如果有)将与等待之前的代码在相同的上下文中执行(通常意味着它在同一线程上)。

该方法被标记为异步无效,这意味着您可以在其中使用await运算符,但是调用代码无法等待执行。 如果必须使用此功能,则该方法应返回Task。 标记为异步无效的方法非常普遍:通常,它们是事件处理程序或其他根据即发即弃原理工作的方法。 如果您不仅需要有机会等待执行完成,还需要返回结果,则必须使用Task。

但是,在StartNew方法返回的Task上,就像在其他任何方法上一样,您可以使用false参数调用ConfigureAwait方法,然后await之后的执行将不会在捕获的上下文上继续,而是在任意上下文上继续。 当等待后执行上下文对代码不重要时,应始终执行此操作。 MS在编写代码时也建议将其打包成库形式。

让我们详细介绍如何等待任务完成。 下面是带注释的示例代码,其中在条件良好的情况下和在条件不好的情况下进行等待的时间。

 public static async void AnotherMethod() { int result = await AsyncMethod(); // good result = AsyncMethod().Result; // bad AsyncMethod().Wait(); // bad IEnumerable<Task> tasks = new Task[] { AsyncMethod(), OtherAsyncMethod() }; await Task.WhenAll(tasks); // good await Task.WhenAny(tasks); // good Task.WaitAll(tasks.ToArray()); // bad } 

在第一个示例中,我们等待Task完成并且不阻塞调用线程,仅当结果已经存在时,我们才返回处理结果,直到调用线程留给自己为止。

在第二个选项中,我们阻塞调用线程,直到计算出方法的结果为止。 这很不好,不仅因为我们通过简单的空闲来获取了线程,这是程序的宝贵资源,而且还因为如果我们调用的方法代码已经处于等待状态,并且同步上下文涉及到等待之后返回到调用线程,那么我们将陷入僵局。 :调用线程等待直到计算出异步方法的结果,异步方法徒劳地尝试继续在调用线程中执行它。

这种方法的另一个缺点是复杂的错误处理。 事实是,使用async / await时异步代码中的错误很容易处理-它们的行为就像代码是同步的一样。 同时,如果我们对任务应用驱魔,同步期望,则原始异常会变成AggregateException,即 要处理异常,您将必须检查InnerException类型并在一个catch块中编写if链,或者在构造时使用catch而不是C#中更熟悉的catch块链。

由于相同的原因,第三个和最后一个示例也被标记为不良,并且包含所有相同的问题。

当Any和WhenAll方法在等待一组Task'ov时非常方便时,它们将一组Task'ov包装在一个中,这将在该组中Task'a的第一个操作或每个人完成执行时起作用。

止流


由于各种原因,可能有必要在流启动后停止流。 有很多方法可以做到这一点。 Thread类有两个具有适当名称的方法-AbortInterrupt 。 不建议使用第一个,因为 在任何随机时刻调用它之后,在处理任何指令的过程中,都会引发ThreadAbortedException 。 您不希望在增加整数变量时会发生此类异常,对吗? 使用这种方法时,这是一个非常现实的情况。 如果要防止CLR在代码的特定部分引发此类异常,可以将其包装在对Thread.BeginCriticalRegionThread.EndCriticalRegion的调用中。 在finally块中编写的任何代码都将被此类调用包装。 因此,在框架代码的开头,您可以找到带有空尝试的块,但最终找不到空的块。 Microsoft不建议您使用此方法,因为他们没有将其包括在.net core中。

中断方法更可预测。 仅当线程处于空闲状态时,它才能中断线程,但ThreadInterruptedException除外。 在这种状态下,它在等待WaitHandle,锁定或调用Thread.Sleep之后进入暂停状态。

上述两个选项都具有不可预测性。 解决方案是使用CancellationToken结构和CancellationTokenSource类。 最重要的是:创建了CancellationTokenSource类的实例,只有拥有它的人才能通过调用Cancel方法停止操作。 仅CancellationToken传递给操作本身。 CancellationToken的所有者不能自己取消该操作,而只能检查该操作是否已取消。 为此,有一个布尔属性IsCancellationRequestedThrowIfCancelRequested方法。 如果在CancellationTokenSource的已取消CancellationToken实例上调用Cancel方法,则后者将引发TaskCancelledException 。 我建议使用这种方法。 通过完全控制可以中断异常操作的点,这比以前的选项要好。

停止线程最残酷的选择是调用Win32 API TerminateThread函数。 调用此函数后CLR的行为可能无法预测。 在MSDN上,有关此功能的内容如下: “ TerminateThread是一个危险的功能,仅应在最极端的情况下使用。

使用FromAsync方法将legacy-API转换为基于任务


如果您有幸从事引入任务后开始的项目,并且不再对大多数开发人员造成恐惧,那么您​​将不必处理很多旧的API,无论是第三方还是您的团队过去都曾遭受过折磨。 幸运的是,.NET Framework开发团队照顾了我们,尽管目标可能是照顾好自己。 尽管如此,.NET拥有许多工具,可以轻松地将用旧的异步编程方法编写的代码转换为新的方法。 其中之一是TaskFactory的FromAsync方法。 使用下面的代码示例,我使用该方法将WebRequest类的旧异步方法包装在Task中。

 object state = null; WebRequest wr = WebRequest.CreateHttp("http://github.com"); await Task.Factory.FromAsync( wr.BeginGetResponse, we.EndGetResponse ); 

这只是一个示例,您不太可能使用内置类型来执行此操作,但是任何旧项目都只能使用返回IAsyncResult的BeginDoSomething方法和接受它的EndDoSomething方法。

使用TaskCompletionSource类将legacy-API转换为基于任务


要考虑的另一个重要工具是TaskCompletionSource类。 在功能,目的和操作原理方面,它可以以某种方式提醒我上面编写的ThreadPool类的RegisterWaitForSingleObject方法。 使用此类,您可以轻松方便地在Task中包装旧的异步API。

您会说我已经谈到了用于这些目的的TaskFactory类的FromAsync方法。 在这里,我们将不得不回顾Microsoft在过去15年中一直提供的.net中异步模型开发的全部历史:在基于任务的异步模式(TAP)之前,有异步编程模式(APP),它涉及返回IAsyncResult的 Begin DoSomething方法和接受IAsyncResult的 End DoSomething方法。而且FromAsync方法对于这些年的历史来说还不错,但是随着时间的流逝,它被基于事件的异步模式( EAP )取代,该方法假定事件将在异步操作完成后被调用。

TaskCompletionSource非常适合用于封装围绕事件模型构建的Task和legacy-API。 他的工作实质如下:此类的对象具有Task类型的公共属性,其状态可以通过TaskCompletionSource类的SetResult,SetException等方法来控制。 在将await运算符应用于此Task的地方,它将执行或崩溃,但有例外,具体取决于应用于TaskCompletionSource的方法。 如果一切仍然不清楚,那么让我们看一下此代码示例,其中使用TaskCompletionSource在Task中包装了一些旧的EAP API:触发事件后,Task将被转移到Completed状态,将await运算符应用于此Task的方法将恢复执行获取结果对象。

 public static Task<Result> DoAsync(this SomeApiInstance someApiObj) { var completionSource = new TaskCompletionSource<Result>(); someApiObj.Done += result => completionSource.SetResult(result); someApiObj.Do(); result completionSource.Task; } 

TaskCompletionSource技巧与窍门


使用TaskCompletionSource可以包装所有较旧的API。 使用此类为在不占用线程的任务上设计各种API带来了一种有趣的可能性。 我们记得,流是一种昂贵的资源,并且其数量受到限制(主要受RAM限制)。 通过开发例如具有复杂业务逻辑的已加载Web应用程序,可以轻松实现此限制。 考虑一下我所说的实施“长轮询”这一技巧的可能性。

简而言之,该技巧的本质是这样的:您需要从API接收有关其侧面发生的某些事件的信息,而API由于某种原因不能报告事件,而只能返回状态。 这样的一个例子是在WebSocket时代之前或由于某种原因无法使用该技术时,所有基于HTTP构建的API。 客户端可以询问HTTP服务器。 HTTP服务器本身不能引发与客户端的通信。 一种简单的解决方案是通过计时器询问服务器,但这会在服务器上增加额外的负载,并在平均TimerInterval / 2上造成额外的延迟。为解决此问题,发明了一种称为“长轮询”的技巧,该技巧涉及将服务器的响应延迟到超时到期或一个事件将会发生。 如果发生了事件,则将对其进行处理;否则,将再次发送请求。

 while(!eventOccures && !timeoutExceeded) { CheckTimout(); CheckEvent(); Thread.Sleep(1); } 

但是,一旦等待事件的客户数量增加,这种解决方案就会表现出惊人的效果,因为 每个此类客户端在事件发生时都占据了整个流。 是的,在事件触发时我们会额外获得1ms的延迟,通常这并不重要,但是为什么使软件变得更糟呢? 如果删除Thread.Sleep(1),那么我们将以100%的空闲负载加载一个处理器内核,这是徒劳的,徒劳无功。 使用TaskCompletionSource,您可以轻松地重做此代码并解决上述所有问题:

 class LongPollingApi { private Dictionary<int, TaskCompletionSource<Msg>> tasks; public async Task<Msg> AcceptMessageAsync(int userId, int duration) { var cs = new TaskCompletionSource<Msg>(); tasks[userId] = cs; await Task.WhenAny(Task.Delay(duration), cs.Task); return cs.Task.IsCompleted ? cs.Task.Result : null; } public void SendMessage(int userId, Msg m) { if (tasks.TryGetValue(userId, out var completionSource)) completionSource.SetResult(m); } } 

该代码不支持生产,而只是一个演示。 要在实际情况下使用它,您至少需要处理在没有人期望的时候消息到达的情况:在这种情况下,AsseptMessageAsync方法应该返回一个已经完成的Task。 如果这种情况最常见,则可以考虑使用ValueTask。

收到消息请求后,我们创建TaskCompletionSource并将其放置在字典中,然后我们首先等待发生的事情:指定的时间间隔到期或收到消息。

ValueTask:原因和方式


异步/等待运算符(与yield return运算符一样)从该方法生成状态机,该状态机正在创建一个新对象,这几乎总是不重要的,但在极少数情况下会产生问题。 这种情况可能是一种经常被调用的方法,每秒谈论数以万计的呼叫。 如果编写了这样的方法,以便在大多数情况下绕过所有等待方法,则它返回结果,则.NET提供了一种优化此方法的工具-ValueTask结构。 为了清楚起见,请考虑一个使用示例:我们经常使用一个缓存。 其中包含一些值,然后我们只返回它们,如果没有,则转到它们后面的一些慢速IO。 我要异步执行后者,这意味着整个方法都是异步的。 因此,编写方法的明显方法如下:

 public async Task<string> GetById(int id) { if (cache.TryGetValue(id, out string val)) return val; return await RequestById(id); } 

- , - Roslyn , :

 public Task<string> GetById(int id) { if (cache.TryGetValue(id, out string val)) return Task.FromResult(val); return RequestById(id); } 

hot-path, GC, , IO / :

 public ValueTask<string> GetById(int id) { if (cache.TryGetValue(id, out string val)) return new ValueTask<string>(val); return new ValueTask<string>(RequestById(id)); } 

: , . : ValueTask C# Task .

TaskScheduler': Task'


API, TaskScheduler . , TPL Task' . TaskScheduler. , ParallelExtensionsExtras , microsoft, .NET, Nuget . :

  • CurrentThreadTaskScheduler — Task'
  • LimitedConcurrencyLevelTaskScheduler — Task' N,
  • OrderedTaskScheduler — LimitedConcurrencyLevelTaskScheduler(1), .
  • WorkStealingTaskSchedulerwork-stealing . ThreadPool. , .NET ThreadPool , , . . .. WorkStealingTaskScheduler' , ThreadPool .
  • QueuedTaskScheduler-允许您根据具有优先级的队列规则执行任务
  • ThreadPerTaskScheduler-为在其上运行的每个Task创建一个单独的线程。这对于运行时间异常长的任务很有用。

在Microsoft博客上有关于TaskSchedulers的详细文章

为了方便调试与Visual Studio中与Task相关的所有内容,有一个Tasks窗口。在此窗口中,您可以查看任务的当前状态并转到当前正在执行的代码行。



PLinq和Parallel类


Task' .NET PLinq(Linq2Parallel) Parallel. Linq . - WithDegreeOfParallelism. , PLinq , , : AsParallel Linq . PLinq Partitions. .

Parallel Foreach, For Invoke. . ParallelOptions . TaskScheduler CancellationToken.

结论


, , . , , 15 , . , API, .

:

  • , , .
  • .NET
  • , legacy, API .
  • .NET Thread ThreadPool
  • Thread.Abort, Thread.Interrupt, Win32 API TerminateThread . CancellationToken'
  • — , . , . TaskCompletionSource
  • .NET Task'.
  • c# async/await
  • Task' TaskScheduler'
  • ValueTask hot-paths memory-traffic
  • Tasks Threads Visual Studio
  • PLinq , , partitioning
  • ...

Source: https://habr.com/ru/post/zh-CN452094/


All Articles