我最初将这篇文章发布在CodingSight博客中。
这里也有俄语版本。本文包含我在多线程聚会上的演讲的第二部分。 您可以在
这里和
这里看看第一部分。 在第一部分中,我重点介绍了用于启动线程或Task的基本工具集,跟踪其状态的方法以及PLinq等其他实用工具。 在这一部分中,我将解决在多线程环境中可能遇到的问题以及解决这些问题的一些方法。
目录内容
关于共享资源
没有共享资源,您将无法编写基于多个线程工作的程序。 即使它在您当前的抽象级别上起作用,您也会发现,只要您降低一个或多个抽象级别,它实际上就具有共享资源。 以下是一些示例:
范例1:为避免可能出现的问题,可以使线程使用不同的文件,每个线程使用一个文件。 在您看来,该程序没有任何共享资源。
向下移动几级,您将知道只有一个硬盘驱动器,这取决于驱动程序或操作系统来找到解决硬盘驱动器访问问题的解决方案。
范例2:阅读
示例1之后 ,您决定将文件放在物理上不同的硬件和操作系统的两台不同的远程计算机上。 您还维护两个不同的FTP或NFS连接。
再下降几级,您会了解到什么都没有真正改变,竞争性访问问题现在委派给了网卡驱动程序或运行该程序的计算机的OS。
范例3:经过尝试证明可以编写多线程程序之后,您决定完全放弃文件并将计算移至两个不同的对象,并且指向每个对象的链接仅适用于它们的特定对象。线程。
要想弄清楚这个想法的最后一钉钉子:一个运行时和一个垃圾回收器,一个线程调度程序,一个物理上的统一RAM和一个处理器仍然被视为共享资源。
因此,我们了解到不可能在所有抽象级别和整个技术堆栈范围内编写没有共享资源的多线程程序。 幸运的是,每个抽象级别(作为一般规则)部分或完全解决竞争性访问的问题,或者只是立即拒绝竞争性访问(例如:任何UI框架均不允许使用来自不同线程的元素)。 因此,通常情况下,共享资源的问题会出现在您当前的抽象级别。 为了照顾它们,引入了同步的概念。
多线程环境中的可能问题
我们可以将软件错误分为以下几类:
- 该程序不会产生结果-它会崩溃或冻结。
- 程序给出不正确的结果。
- 该程序产生正确的结果,但不满足某些与功能无关的要求-花费太多时间或资源。
在多线程环境中,导致错误#1和#2的主要问题是
死锁和
竞争条件 。
死锁
死锁是一个共同的障碍。 死锁有很多变体。 以下是最常见的一种:

在
线程1做某事时,
线程2阻止了资源
B。 稍后,
线程1阻塞了资源
A ,并试图阻塞资源B。不幸的是,这永远不会发生,因为
线程2仅在阻塞资源
A之后才释放资源
B。比赛条件
竞争条件是一种情况,计算的行为和结果都取决于执行环境的线程调度程序
问题在于您的程序可能无法正常运行一百次甚至上百万次。
当问题三分之三时,情况可能会变得更糟。 例如,线程调度程序的特定行为可能导致相互死锁。
除了这两个导致显式错误的问题外,还有一些问题,如果不导致错误的计算结果,可能仍使程序花费更多的时间或资源来产生所需的结果。 其中两个问题是“
忙等待”和“
线程饥饿” 。
忙碌中
当程序将处理器资源花费在等待而不是计算上时,就会出现“忙等待”问题。
通常,此问题如下所示:
while(!hasSomethingHappened) ;
这是一个非常糟糕的代码示例,因为它完全占用了处理器的一个核心,而实际上根本没有做任何有生产力的工作。 仅当在不同线程中快速处理值的更改至关重要时,才能证明此类代码是合理的。 “快速”是指您甚至不能等待几纳秒。 在所有其他情况下,也就是在所有情况下都可以提出合理的建议,使用ResetEvent及其Slim版本的变体要方便得多。 我们稍后再讨论。
可能有些读者建议通过在循环中添加Thread.Sleep(1)(或类似的东西)来解决一个核心完全被等待占用的问题。 虽然可以解决此问题,但会创建一个新的-响应更改所需的时间现在平均为0.5毫秒。 一方面,这个值不算多,但另一方面,该值比使用ResetEvent系列的同步原语所能达到的值高得多。
线程饥饿
线程饥饿是程序具有太多并行操作线程的问题。 在这里,我们专门讨论的是计算所占用的线程,而不是等待某些IO的答案。 由于这个问题,我们失去了线程带来的任何可能的性能优势,因为处理器在切换上下文上花费了大量时间。
您可以使用各种分析器找到此类问题。 以下是在时间轴模式下工作的
dotTrace分析器的屏幕截图
(点击放大)。通常,没有遭受线程饥饿的程序在表示线程的图表上没有任何粉红色部分。 此外,在“子系统”类别中,我们可以看到程序等待CPU的时间为30.6%。
诊断出此类问题后,您可以轻松地解决问题:您一次启动了太多线程,因此只启动了更少的线程。
同步方式
联锁
这可能是最轻量的同步方法。 互锁是一组简单的原子操作。 当执行原子操作时,什么也不会发生。 在.NET中,“互锁”由具有相同名称的静态类表示,并带有一系列方法,每个方法都实现一个原子操作。
要实现非原子操作的最终恐怖,请尝试编写一个程序,该程序可以启动10个线程,每个线程将相同的变量增加一百万倍。 完成工作后,输出此变量的值。 不幸的是,这将与一千万有很大的不同。 此外,每次您运行该程序时,它都会有所不同。 发生这种情况的原因是,即使像增量这样的简单操作也不是原子操作,它包括从内存中提取值,计算新值并再次将其写入内存。 因此,两个线程可以执行任何这些操作,在这种情况下,增量将丢失。
Interlocked类提供了Increment / Decrement方法,不难猜测它们应该做什么。 如果您在多个线程中处理数据并进行计算,它们将非常方便。 这样的代码将比经典锁更快地工作。 如果在上一段所述的情况下使用Interlocked,则该程序在任何情况下都将可靠地产生1000万的价值。
CompareExchange方法的功能不是很明显。 但是,它的存在使得可以实现许多有趣的算法。 最重要的是,那些来自无锁家族。
public static int CompareExchange (ref int location1, int value, int comparand);
此方法采用三个值。 第一个通过参考传递,并且当执行比较时,如果location1等于comparand,它将是第二个值。 将返回location1的原始值。 这听起来很复杂,所以编写一段代码执行与CompareExchange相同的操作会更容易:
var original = location1; if (location1 == comparand) location1 = value; return original;
唯一的区别是Interlocked类以原子方式实现了这一点。 因此,如果我们自己编写此代码,则可能会遇到满足location1 == comparand条件的情况。 但是,当执行语句location1 = value时,另一个线程已经更改了location1值,因此它将丢失。
我们可以找到一个很好的示例,说明如何在编译器为任何C#事件生成的代码中使用此方法。
让我们用一个名为MyEvent的事件编写一个简单的类:
class MyClass { public event EventHandler MyEvent; }
现在,让我们在Release配置中构建项目,并通过
启用了“ Show Compiler Generated Code”选项的
dotPeek打开构建:
[CompilerGenerated] private EventHandler MyEvent; public event EventHandler MyEvent { [CompilerGenerated] add { EventHandler eventHandler = this.MyEvent; EventHandler comparand; do { comparand = eventHandler; eventHandler = Interlocked.CompareExchange<EventHandler>(ref this.MyEvent, (EventHandler) Delegate.Combine((Delegate) comparand, (Delegate) value), comparand); } while (eventHandler != comparand); } [CompilerGenerated] remove {
在这里,我们可以看到编译器在后台生成了一个相当复杂的算法。 此算法可防止我们丢失对该事件的订阅,在该事件中,有几个线程同时订阅了该事件。 让我们详细说明add方法,同时牢记CompareExchange方法在后台执行的操作:
EventHandler eventHandler = this.MyEvent; EventHandler comparand; do { comparand = eventHandler;
这是更易于管理的,但可能仍需要说明。 这就是我描述算法的方式:
如果MyEvent与我们开始执行Delegate.Combine时的状态相同,则将其设置为Delegate.Combine返回的值。 如果不是这种情况,请重试直到它起作用。这样,订阅将永远不会丢失。 如果要实现动态的,线程安全的,无锁的数组,则必须解决类似的问题。 如果多个线程突然开始向该数组添加元素,则必须成功添加所有这些元素,这一点很重要。
Monitor.Enter,Monitor.Exit,锁定
这些结构最常用于线程同步。 它们实现了关键部分的概念:即,在Monitor.Enter和Monitor.Exit的调用之间编写的代码只能在一个时间点上由一个线程在一个资源上执行。 锁定运算符用作try-finally中包装的Enter / Exit调用的语法糖。 .NET中关键部分的令人愉快的品质是它支持重入。 这意味着可以执行以下代码而没有任何实际问题:
lock(a) { lock (a) { ... } }
不太可能有人会以这种确切的方式编写代码,但是如果您在整个调用堆栈的深度中的几种方法之间传播此代码,则此功能可以为您节省一些IF。 为了使此技巧起作用,.NET的开发人员必须添加一个限制-您只能将引用类型的实例用作同步对象,并且将几个字节添加到将写入线程标识符的每个对象中。
C#中关键部分工作过程的这种特殊性对锁运算符施加了一个有趣的限制:您不能在锁运算符内部使用await运算符。 起初,这让我感到惊讶,因为可以编译类似的Monitor-Enter / Exit结构。 怎么了 重读上一段并应用一些有关异步/等待的工作原理的知识很重要:等待后的代码不会与等待前的代码在同一线程上执行。 这取决于同步上下文以及是否调用ConfigureAwait方法。 由此,可以在与Monitor.Enter不同的线程上执行Monitor.Exit,这将导致引发SynchronizationLockException。 如果您不相信我,请尝试在控制台应用程序中运行以下代码-它会生成
SynchronizationLockException :
var syncObject = new Object(); Monitor.Enter(syncObject); Console.WriteLine(Thread.CurrentThread.ManagedThreadId); await Task.Delay(1000); Monitor.Exit(syncObject); Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
值得注意的是,在WinForms或WPF应用程序中,如果您从主线程调用此代码,则该代码将正确运行,因为将存在一个同步上下文,该上下文会在调用await之后实现返回UI-Thread的功能。 无论如何,最好不要在包含await运算符的代码上下文中处理关键部分。 在此类示例中,最好使用同步原语,我们将在稍后介绍。
当我们讨论.NET中的关键部分时,重要的是要提到它们如何实现的更多特殊性。 .NET中的关键部分以两种模式工作:旋转等待和核心等待。 我们可以像下面的伪代码一样表示自旋等待算法:
while(!TryEnter(syncObject)) ;
此优化旨在基于即使即使资源当前被占用,也将很快释放它的基础上,在短时间内尽快捕获关键部分。 如果在短时间内没有发生这种情况,线程将切换到核心模式下的等待状态,这会花费一些时间-就像从等待中返回一样。 .NET的开发人员已尽可能优化了短块的方案。 不幸的是,如果许多线程开始拉动它们之间的关键部分,则可能导致CPU突然负载过大。
SpinLock,SpinWait
已经提到了循环等待算法(spin-wait),值得一提的是BCL的SpinLock和SpinWait结构。 如果有理由认为总是有可能非常迅速地获得阻止,则应使用它们。 另一方面,在分析结果表明程序的瓶颈是由使用其他同步原语引起的之前,您不应该真正考虑它们。
Monitor.Wait,Monitor.Pulse [全部]
我们应该同时看这两种方法。 在他们的帮助下,您可以实施各种生产者-消费者方案。
Producer-Consumer是一种多进程/多线程设计模式,表示一个或多个产生数据的线程/进程以及一个或多个处理该数据的进程/线程。 通常,使用共享集合。
这两种方法只能由当前具有块的线程调用。 Wait方法将释放该块并冻结,直到另一个线程调用Pulse。
为了说明这一点,我写了一个小例子:
object syncObject = new object(); Thread t1 = new Thread(T1); t1.Start(); Thread.Sleep(100); Thread t2 = new Thread(T2); t2.Start();
(我在这里使用图像而不是文本来准确显示指令执行顺序)说明:我在启动第二个线程时设置了100毫秒的延迟,以专门保证它将在以后执行。
-T1:第2行线程已启动
-T1:第3行线程进入临界区
-T1:第6行,线程进入睡眠状态
-T2:#3行启动了线程
-T2:#4行冻结并等待关键部分
-T1:7号线,它在等待Pulse发出时让关键部分进入并冻结
-T2:#8行进入临界区
-T2:11号线在脉冲的帮助下向T1发出信号
-T2:第14行来自关键部分。 T1无法在此之前继续执行。
-T1:15号线是从等待中出来的
-T1:16号线从关键部分出来
MSDN中有一个关于使用Pulse / Wait方法的重要说明:Monitor不存储状态信息,这意味着在Wait方法之前调用Pulse方法可能导致死锁。 如果可能的话,最好使用ResetEvent系列中的类之一。前面的示例清楚地显示了Monitor类的Wait / Pulse方法如何工作,但是仍然对应使用它们的情况提出了一些疑问。 一个很好的例子是BlockingQueue <T>的
这种 实现 。 另一方面,System.Collections.Concurrent中的BlockingCollection <T>的实现使用SemaphoreSlim进行同步。
ReaderWriterLockSlim
我非常喜欢这个同步原语,它由System.Threading命名空间中同名的类表示。 我认为,如果开发人员使用此类而不是标准锁,则许多程序会更好地工作。
想法:很多线程可以读取,只有一个线程可以写入。 当线程要写入时,无法开始新的读取-它们将等待写入结束。 还有可升级的读锁概念。 当您在阅读过程中了解需要写东西时,可以使用它-这样的锁将在一个原子操作中转换为写锁。
在System.Threading命名空间中,还存在ReadWriteLock类,但强烈建议不要将其用于新开发。 Slim版本将有助于避免导致死锁的情况,并允许快速捕获块,因为它支持在进入核心模式之前在自旋等待模式下进行同步。
如果您在阅读本文之前不了解此类,那么我想现在您已经记住了最近编写的代码中的许多示例,其中使用这种块方法可以使程序有效地工作。
ReaderWriterLockSlim类的接口简单易懂,但使用起来并不舒服:
var @lock = new ReaderWriterLockSlim(); @lock.EnterReadLock(); try {
我通常喜欢将其包装在一个类中-这使它更加方便。
想法:创建Read / WriteLock方法,该方法将对象与Dispose方法一起返回。 然后,您可以在“使用中”中访问它们,并且在行数方面,它与标准锁的差别可能不会太大。 class RWLock : IDisposable { public struct WriteLockToken : IDisposable { private readonly ReaderWriterLockSlim @lock; public WriteLockToken(ReaderWriterLockSlim @lock) { this.@lock = @lock; @lock.EnterWriteLock(); } public void Dispose() => @lock.ExitWriteLock(); } public struct ReadLockToken : IDisposable { private readonly ReaderWriterLockSlim @lock; public ReadLockToken(ReaderWriterLockSlim @lock) { this.@lock = @lock; @lock.EnterReadLock(); } public void Dispose() => @lock.ExitReadLock(); } private readonly ReaderWriterLockSlim @lock = new ReaderWriterLockSlim(); public ReadLockToken ReadLock() => new ReadLockToken(@lock); public WriteLockToken WriteLock() => new WriteLockToken(@lock); public void Dispose() => @lock.Dispose(); }
这使我们稍后可以在代码中编写以下内容:
var rwLock = new RWLock();
ResetEvent系列
我在该系列中包括以下类:ManualResetEvent,ManualResetEventSlim和AutoResetEvent。
ManualResetEvent类,其Slim版本和AutoResetEvent类可以以两种状态存在:
-非信号状态-在此状态下,所有调用WaitOne的线程都将冻结,直到事件切换到信号状态为止。
-发出信号-在此状态下,先前冻结在WaitOne调用中的所有线程均被释放。 信号事件中的所有新的WaitOne调用都相对立即执行。
AutoResetEvent与ManualResetEvent的不同之处在于,它仅释放
一个线程后会自动切换到非信号状态。 如果在等待AutoResetEvent时冻结了几个线程,则调用Set将只释放一个随机线程,而ManualResetEvent则释放所有线程。
让我们看一下AutoResetEvent如何工作的示例:
AutoResetEvent evt = new AutoResetEvent(false); Thread t1 = new Thread(T1); t1.Start(); Thread.Sleep(100); Thread t2 = new Thread(T2); t2.Start();

在这些示例中,我们可以看到仅在释放在WaitOne调用中冻结的线程之后,事件才会自动切换到非信号状态。
与ReaderWriterLock不同,即使出现了Slim版本,ManualResetEvent也不会被视为过时的。 该类的Slim版本可以在Spin-Wait模式下发生短暂等待时有效; 标准版本适合长时间等待。
除了ManualResetEvent和AutoResetEvent类之外,还有CountdownEvent类。 此类对于实现在并行部分之后将结果合并在一起的算法非常有用。 这种方法称为
fork-join 。 有一篇很棒的
文章专门针对此类,因此在这里我将不对其进行详细描述。
结论
- 使用线程时,有两个问题可能导致错误的结果,甚至导致结果的缺失-竞争状况和死锁。
- 可能导致程序花费更多时间或资源的问题是线程匮乏和繁忙的等待。
- .NET提供了许多同步线程的方法。
- 块等待有两种模式-旋转等待和核心等待。 .NET中的Som.e线程同步原语都使用它们。
- 互锁是一组原子操作,可用于实现无锁算法。 这是最快的同步原语。
- lock和Monitor。Enter / Exit运算符实现了关键部分的概念-一段代码只能在一个时间点由一个线程执行。
- Monitor.Pulse / Wait方法对于实现Producer-Consumer方案很有用。
- 当期望并行读取时,ReaderWriterLockSlim比标准锁定情况更有用。
- ResetEvent类家族对于线程同步很有用。