经验丰富的哲学家或有竞争力的.NET编程


让我们以餐饮哲学家的问题为例,看看.Net中的并行和并行编程如何工作。 这样的计划,从线程/进程的同步到参与者的模型(在以下部分中)。 这篇文章可能对于初次接触或对您的知识有所帮助。


为什么能够做到这一点? 晶体管达到其最小尺寸,摩尔定律取决于光速的限制,因此观察到数量的增长,可以制造更多的晶体管。 同时,数据量在增长,用户期望系统立即做出反应。 在这种情况下,当我们只有一个执行线程时,“常规”编程将不再有效。 必须以某种方式解决同时执行或竞争执行的问题。 此外,此问题存在于不同级别:流级别,进程级别,网络(分布式系统)中的机器级别。 .NET具有高质量,经过时间检验的技术,可以快速有效地解决此类问题。



挑战赛


Edsger Dijkstra早在1965年就向他的学生提出了这个问题。 有一些哲学家(通常是五个)和许多分支。 他们坐在圆桌旁,叉子之间。 哲学家可以从盘子里吃不完的食物,思考或等待。 要吃这个哲学家,您需要拿两个叉子(后者与第一个叉子共享)。 采取和把叉子-两个单独的动作。 所有的哲学家都保持沉默。 他们的任务是找到一种算法,让他们都思考,甚至​​在54年后都会厌倦。


首先,让我们尝试通过使用共享空间来解决此问题。 叉子在桌子上,而哲学家只是在它们被拿走时放回去。 何时出现插头问题,同步有问题? 如果没有插头怎么办? 等,但首先,让我们启动哲学家。


要启动线程,请通过Task.Run方法使用线程池:


 var cancelTokenSource = new CancellationTokenSource(); Action<int> create = (i) => RunPhilosopher(i, cancelTokenSource.Token); for (int i = 0; i < philosophersAmount; i++) { int icopy = i; //      .  RunDeadlock   // ,    .  . philosophers[i] = Task.Run(() => create(icopy), cancelTokenSource.Token); } 

创建线程池以优化线程的创建和删除。 该池有一个任务队列,CLR根据这些任务的数量创建或删除线程。 所有AppDomain的一个池。 几乎应始终使用此池,因为 您不必费心创建,删除线程,它们的队列等。没有池是可能的,但是随后您必须直接使用Thread ,在需要更改线程优先级,执行较长的操作,线程的前景等情况下,建议使用此方法。


而且System.Threading.Tasks.Task类使使用此线程池变得很容易(甚至不使用它)。 这是一个异步操作。 粗略地说,这是相同的Thread ,但具有各种便利:能够在一系列其他任务之后启动任务,从函数返回它们,方便地中断它们等功能。 需要它们来支持异步/等待结构(基于任务的异步模式,用于等待IO操作的语法糖)。 我们将再次讨论。


这里需要CancelationTokenSource ,以便可以通过调用线程的信号终止线程本身。


同步问题


哲学家受阻


好的,我们可以创建线程,让我们尝试吃午饭:


 //    .  : 1 1 3 3 - 1  3    . private int[] forks = Enumerable.Repeat(0, philosophersAmount).ToArray(); //  ,  RunPhilosopher() private void RunDeadlock(int i, CancellationToken token) { //  ,  . : // while(true) // if forks[fork] == 0 // forks[fork] = i+1 // break // Thread.Sleep()  Yield()  SpinWait() void TakeFork(int fork) => SpinWait.SpinUntil(() => Interlocked.CompareExchange(ref forks[fork], i+1, 0) == 0); //  ,    Interlocked.Exchange: void PutFork(int fork) => forks[fork] = 0; while (true) { TakeFork(Left(i)); TakeFork(Right(i)); eatenFood[i] = (eatenFood[i] + 1) % (int.MaxValue - 1); PutFork(Left(i)); PutFork(Right(i)); Think(i); //   -. token.ThrowIfCancellationRequested(); } } 

在这里,我们首先尝试取左叉,然后取右叉,如果可行,那么我们进食并将它们放回原处。 一把叉子是原子的,即 两个流不能同时取一个(不正确:第一个流读取插头是免费的,第二个也读取,第一个取,第二个取)。 为此,必须使用处理器指令( TSLXCHG )实现Interlocked.CompareExchange ,该指令会阻塞一块内存以进行原子顺序读取和写入。 SpinWait等效于仅带有一点“魔术”的while(true)构造-线程占用了处理器( Thread.SpinWait ),但有时将控制权转移到另一个线程( Thread.Yeild )或Thread.Yeild睡眠状态( Thread.Sleep )。


但是此解决方案不起作用,因为 流量将很快被阻塞(对我而言在一秒钟之内):所有哲学家都拿左叉,而不是右叉。 forks数组的值将为:1 2 3 4 5。


活锁


在图中,阻塞线程(死锁)。 绿色表示执行,红色表示同步,灰色表示睡眠。 菱形表示任务的开始时间。


哲学家的饥饿


尽管没有必要考虑太多食物,但是您必须强迫任何人放弃哲学。 让我们尝试模拟问题中禁食的情况。 饥饿是指溪流正常运转,但没有大量工作的情况,换句话说,这是同样的僵局,只是现在溪流不睡觉,而是在积极寻找食物,却没有食物。 为了避免频繁阻塞,如果我们不能再接插头,我们将把插头放回原处。


 //      RunDeadlock,         . private void RunStarvation(int i, CancellationToken token) { while (true) { bool hasTwoForks = false; var waitTime = TimeSpan.FromMilliseconds(50); //      : bool hasLeft = forks[Left(i)] == i + 1; if (hasLeft || TakeFork(Left(i), i + 1, waitTime)) { if (TakeFork(Right(i), i + 1, TimeSpan.Zero)) hasTwoForks = true; else PutFork(Left(i)); //      . } if (!hasTwoForks) { if (token.IsCancellationRequested) break; continue; } eatenFood[i] = (eatenFood[i] + 1) % (int.MaxValue - 1); bool goodPhilosopher = i % 2 == 0; //        : if (goodPhilosopher) PutFork(Left(i)); //      ,      . PutFork(Right(i)); Think(i); if (token.IsCancellationRequested) break; } } //     . bool TakeFork(int fork, int philosopher, TimeSpan? waitTime = null) { return SpinWait.SpinUntil( () => Interlocked.CompareExchange(ref forks[fork], philosopher, 0) == 0, waitTime ?? TimeSpan.FromMilliseconds(-1) ); } 

在此代码中,重要的是四分之一的哲学家忘记放左叉。 事实证明,尽管人流优先顺序相同,但他们却吃更多的食物,而其他人则开始挨饿。 他们在这里并没有真正挨饿,因为 坏的哲学家有时会退缩。 事实证明,好人的饮食比坏人的饮食少约5倍。 因此,代码中的一个小错误会导致性能下降。 在这里值得注意的是,当所有哲学家都拿左叉,没有右边,他们放左,等待,再拿左,等等时,这种情况极有可能发生。 这种情况也很饥饿,更像是僵局。 我无法重复。 以下是两个坏哲学家双刀叉牙而两个好哲学家挨饿的情况的图片。


饿死


在这里,您可以看到线程有时会唤醒并尝试获取资源。 四个核心中的两个没有执行任何操作(顶部的绿色图形)。


哲学家之死


好吧,另一个可能打断哲学家光荣晚宴的问题是,如果其中一个人突然用手叉死了(他们会像这样把他埋葬)。 然后邻居们将不吃午餐。 您可以自己为这种情况NullReferenceException示例代码,例如,在哲学家进行分叉之后NullReferenceException引发NullReferenceException 。 而且,顺便说一句,该异常将不会被处理,并且调用代码将不会捕获该异常(为此,例如AppDomain.CurrentDomain.UnhandledException等)。 因此,错误处理程序对于线程本身以及正确的终止是必需的。


服务员


那么,我们如何解决僵局,饥饿和死亡呢? 我们将只允许一个哲学家分叉,并为该位置添加流量互斥。 怎么做? 假设一位服务员站在哲学家旁边,允许一位哲学家拿叉。 我们如何做这个服务生,哲学家如何问他,有趣的问题。


最简单的方法是,当哲学家不断地向服务员索要叉子时。 即 现在,哲学家不会等待附近的插头,而是会等待或询问服务员。 首先,我们为此仅使用用户空间,在该空间中,我们不使用中断来调用内核中的任何过程(以下有关它们)。


用户空间解决方案


在这里,我们将像以前使用一个叉子和两个哲学家一样进行操作,我们将循环旋转并等待。 但是现在这将是所有的哲学家,好像只有一把叉子,即 我们可以说只有那个哲学家从服务生那里拿走了这把“金叉”。 为此,我们使用SpinLock。


 private static SpinLock spinLock = new SpinLock(); //  "" private void RunSpinLock(int i, CancellationToken token) { while (true) { //    busy waiting.   try,  //        SpinLock. bool hasLock = false; spinLock.Enter(ref hasLock); try { //       (mutual exclusion). forks[Left(i)] = i + 1; //   ,  . forks[Right(i)] = i + 1; eatenFood[i] = (eatenFood[i] + 1) % (int.MaxValue - 1); forks[Left(i)] = 0; forks[Right(i)] = 0; } finally { if(hasLock) spinLock.Exit(); //     . } Think(i); if (token.IsCancellationRequested) break; } } 

SpinLock是一个阻止程序,大致来说, while(true) { if (!lock) break; } while(true) { if (!lock) break; } ,但比SpinWait (在SpinWait使用)具有更多的“魔力”。 现在,他知道如何计算正在等待的人数,让他们多入睡。 等。一般而言,会尽一切可能进行优化。 但是我们必须记住,这仍然是一个活动周期,它消耗处理器资源并保留一个线程,如果其中一个哲学家比其他哲学家具有优先权,但又没有黄金叉(Priority Inversion问题),则会导致饥饿。 因此,我们仅将其用于共享内存中非常短的更改,而不会引起任何第三方调用,嵌套锁等意外情况。


自旋锁


图为SpinLock 。 溪流一直在为“金叉”而战。 发生故障-在图中,所选区域。 核心没有完全使用:这四个线程中只有大约2/3。


这里的另一种解决方案是仅使用具有相同积极期望的Interlocked.CompareExchange ,如上面的代码所示(在饥饿的哲学家中),但是如上所述,从理论上讲,这可能会导致阻塞。


关于Interlocked ,值得一提的是,不仅有CompareExchange ,而且还有其他用于原子读写的方法。 并且通过在另一线程设法进行更改的情况下重复更改(读1,读2,写2,写1为错误),可以将其用于一个值的复杂更改(互锁的任何模式)。


内核模式解决方案


为了避免循环丢失资源,让我们看看如何阻止流。 换句话说,继续我们的示例,我们将看到服务员如何使哲学家入睡并仅在必要时将其唤醒。 首先,让我们看看如何通过操作系统的内核模式来执行此操作。 事实证明,那里的所有结构通常都比用户空间中的结构慢。 慢几倍,例如AutoResetEvent可能比SpinLock [Richter]慢53倍。 但是在他们的帮助下,您可以在整个系统中同步或不同步管理流程。


这里的主要结构是Dijkstroy在半个多世纪前提出的信号量。 简单来说,信号量是一个由系统控制的正整数,对其进行两个操作-增加和减少。 如果缩减不起作用,则为零,则调用线程被阻塞。 当其他活动线程/进程增加该数目时,将跳过该线程,并且信号量再次减少所传递的线程数。 您可以想象火车在信号量的瓶颈中。 .NET提供了几种具有类似功能的设计: AutoResetEventManualResetEventMutexSemaphore本身。 我们将使用AutoResetEvent ,这是这些构造中最简单的构造:只有两个值是0和1(false,true)。 如果值是0,则它的WaitOne()方法将阻止调用线程;如果值是1,则降低到0并跳过它。 然后Set()方法增加到1,并跳过一个等待,然后再次降低到0。它的作用就像地铁中的旋转门。


我们使解决方案复杂化,并且将锁用于每个哲学家,而不是一次用于每个人。 即 现在一次可以有几位哲学家,而没有一个。 但是,我们再次阻止对表的访问以正确获取分叉,从而避免比赛(比赛条件)。


 //    . // : new AutoResetEvent(true)  . private AutoResetEvent[] philosopherEvents; //     /   . private AutoResetEvent tableEvent = new AutoResetEvent(true); //  . public void Run(int i, CancellationToken token) { while (true) { TakeForks(i); //  . // .    . eatenFood[i] = (eatenFood[i] + 1) % (int.MaxValue - 1); PutForks(i); //     . Think(i); if (token.IsCancellationRequested) break; } } //    . void TakeForks(int i) { bool hasForks = false; while (!hasForks) //    (  ). { //    ,    . tableEvent.WaitOne(); if (forks[Left(i)] == 0 && forks[Right(i)] == 0) forks[Left(i)] = forks[Right(i)] = i + 1; hasForks = forks[Left(i)] == i + 1 && forks[Right(i)] == i + 1; if (hasForks) //   ,   .  Set //  ,   true. philosopherEvents[i].Set(); //   .    tableEvent  false. tableEvent.Set(); //   true,  ,   false,    Set  . philosopherEvents[i].WaitOne(); } } //     . void PutForks(int i) { tableEvent.WaitOne(); //    . forks[Left(i)] = 0; //  ,     ,  AutoResetEvent  true. philosopherEvents[LeftPhilosopher(i)].Set(); forks[Right(i)] = 0; philosopherEvents[RightPhilosopher(i)].Set(); tableEvent.Set(); } 

要了解这里发生的情况,请考虑当哲学家未能分叉时的情况,那么他的行为将是这样。 他正在等待进入桌子。 收到它后,他尝试拿起叉子。 无法解决。 他可以访问该表(互斥)。 它通过其“旋转门”( AutoResetEvent )(首先它们是打开的)。 它再次进入循环,因为 他没有叉子。 试图抓住他们,停在他的旋转门处。 右边或左边的一些更幸运的邻居在吃完饭后解开了我们的哲学家的门,“打开他的旋转门”。 我们的哲学家第二次通过了它(他紧随其后)。 第三次尝试拿叉。 祝你好运 并通过他的旋转门吃饭。


例如,当此类代码中存在随机错误(它们始终存在)时,例如,错误地指定了邻居或为每个人AutoResetEvent了相同的AutoResetEvent对象( Enumerable.Repeat ),那么哲学家将等待开发人员,因为 在此类代码中发现错误是一项相当艰巨的任务。 该解决方案的另一个问题是,它不能保证任何哲学家都不会挨饿。


混合解决方案


当我们停留在用户模式并旋转时,以及当我们阻塞内核中的线程时,我们研究了两种同步方法。 第一种方法适用于短锁,第二种方法适用于长锁。 通常,您首先需要短暂地等待变量在循环中更改,然后在等待时间较长时阻塞线程。 这种方法是在所谓的 混合设计。 这里有与内核模式相同的构造,但现在在用户模式下有一个循环: SemaphorSlimManualResetEventSlim等。这里最受欢迎的构造是Monitor ,因为 C#具有众所周知的lock语法。 Monitor是相同的信号量,最大值为1(互斥量),但支持循环等待,递归,条件变量模式(在下面进行介绍)等。让我们看一下它的解决方案。


 //      ,   . private readonly object _lock = new object(); //   . private DateTime?[] _waitTimes = new DateTime?[philosophersAmount]; public void Run(int i, CancellationToken token) { while (true) { TakeForks(i); eatenFood[i] = (eatenFood[i] + 1) % (int.MaxValue - 1); PutForks(i); Think(i); if (token.IsCancellationRequested) break; } } //     Condition Variable . bool CanIEat(int i) { //   : if (forks[Left(i)] != 0 && forks[Right(i)] != 0) return false; var now = DateTime.Now; // ,     ,  . foreach(var p in new int[] {LeftPhilosopher(i), RightPhilosopher(i)}) if (_waitTimes[p] != null && now - _waitTimes[p] > now - _waitTimes[i]) return false; return true; } void TakeForks(int i) { //   .   : lock(_lock) {..}. //   try,     . bool lockTaken = false; Monitor.Enter(_lock, ref lockTaken); try { _waitTimes[i] = DateTime.Now; // Condition Variable .  ,    //  .    -  Pulse / PulseAll. while (!CanIEat(i)) Monitor.Wait(_lock); forks[Left(i)] = i + 1; forks[Right(i)] = i + 1; _waitTimes[i] = null; } finally { if (lockTaken) Monitor.Exit(_lock); } } void PutForks(int i) { //   : lock (_lock) {..}. bool lockTaken = false; Monitor.Enter(_lock, ref lockTaken); try { forks[Left(i)] = 0; forks[Right(i)] = 0; //        Monitor.Exit. Monitor.PulseAll(_lock); } finally { if (lockTaken) Monitor.Exit(_lock); } } 

在这里,我们再次锁定整个表以访问分叉,但是现在我们一次解锁所有流,而不是在某人吃完东西后立即解锁邻居。 即 首先,有人吃东西并挡住了邻居,当某人吃完东西,但又想立即吃东西时,他进入了锁并唤醒了邻居,因为 他的等待时间较短。


因此,我们避免了某些哲学家的僵局和饥饿。 我们使用一个循环等待一小会儿,然后将流阻塞一会儿。 与仅解锁邻居相比,一次解锁全部的工作要慢得多,就像使用AutoResetEvent的解决方案一样,但是差别不应该太大,因为 线程应首先保持用户模式。


语法lock有令人不愉快的惊喜。 他们建议直接使用Monitor [Richter] [Eric Lippert]。 其中之一是,即使发生异常, lock始终退出Monitor ,然后另一个线程可以更改共享内存的状态。 在这种情况下,通常最好进入僵局或以某种方式安全地完成程序。 Monitor另一个令人惊讶的是,它使用所有对象中的同步块( SyncBlock )。 因此,如果您选择了错误的对象,则很容易导致死锁(例如,如果您对插入的字符串进行了锁定)。 为此,我们始终使用隐藏的对象。


条件变量模式使您可以更简洁地实现对复杂条件的期望。 我认为,在.NET中,它是不完整的,因为 从理论上讲,应该在多个变量上有多个队列(例如在Posix Threads中),而不是一个锁。 这样一来,所有哲学家都可以使用它们。 但是即使采用这种形式,它也可以减少代码。


许多哲学家或async / await


好的,现在我们可以有效地阻止线程了。 但是,如果我们得到很多哲学家怎么办? 100? 10000? 例如,我们收到了到Web服务器的100,000个请求。 为每个请求创建流将是开销,因为 如此多的线程将不会并行执行。 将执行尽可能多的逻辑核心(我有4个)。 而其他所有人只会占用资源。 解决此问题的一种方法是异步/等待模式。 其思想是,如果您需要等待流继续运行,则该函数将不保存流。 并且当她执行此操作时,她会恢复执行(但不一定在同一线程中!)。 在我们的情况下,我们将等待插头。


SemaphoreSlim具有一个WaitAsync()方法。 这是使用此模式的实现。


 //   ,  . -  : Task.Run(() => Run(i, cancelTokenSource.Token)); //  . //   async --      . public async Task Run(int i, CancellationToken token) { while (true) { // await --   - . await TakeForks(i); //  await,     . eatenFood[i] = (eatenFood[i] + 1) % (int.MaxValue - 1); //      . await PutForks(i); Think(i); if (token.IsCancellationRequested) break; } } async Task TakeForks(int i) { bool hasForks = false; while (!hasForks) { //    : await _tableSemaphore.WaitAsync(); if (forks[Left(i)] == 0 && forks[Right(i)] == 0) { forks[Left(i)] = i+1; forks[Right(i)] = i+1; hasForks = true; } _tableSemaphore.Release(); //  ,    : if (!hasForks) await _philosopherSemaphores[i].WaitAsync(); } } //       . async Task PutForks(int i) { await _tableSemaphore.WaitAsync(); forks[Left(i)] = 0; // "" ,   "". _philosopherSemaphores[LeftPhilosopher(i)].Release(); forks[Right(i)] = 0; _philosopherSemaphores[RightPhilosopher(i)].Release(); _tableSemaphore.Release(); } 

async / await , Task . , , Task. , , . , , , , . . async / await .


. 100 4 , 8 . Monitor 4 , . 4 2. async / await 100, 6.8 . , 6 . Monitor .


结论


, .NET . , , , . . , , , TPL Dataflow, Reactive , Software Transaction .


资料来源


Source: https://habr.com/ru/post/zh-CN447898/


All Articles