具有依赖项的并行任务-.NET示例

大家好!

本周,我们捐赠了曼宁(Manning)的《 .NET中并发性》(Concurrency in .NET),这是一本雄心勃勃的复杂书籍:



作者在中型网站上张贴了第13章的摘录,我们建议在首映前就对其进行评估。
祝您阅读愉快!

假设您需要编写一个工具,使您可以执行许多异步任务,每个任务都有其自己的一组依赖关系,这些依赖关系会影响操作的顺序。 可以通过一致和强制执行来解决此类问题,但是如果要获得最佳性能,则顺序操作将不适合您。 相反,您需要组织并行任务。 许多竞争性问题可以解释为原子操作的静态集合,它们的输入和输出数据之间具有依赖性。 操作完成后,其输出将用作其他相关操作的输入。 为了优化性能,必须根据任务的依赖关系来分配任务,并且必须对算法进行配置,以使依赖的任务尽可能按需要顺序执行,并尽可能并行执行。

您想要使一个可重用的组件并行执行一系列任务,并确保考虑到所有可能影响操作顺序的依赖项。 如何建立这样的编程模型,以确保此操作与其他操作之间出现了哪些依赖关系,从而确保并行或顺序执行的一组有效操作的基本并行性?

解决方案:我们使用F#中的MailboxProcessor类实现依赖关系图,并提供方法作为标准任务,以便可以从C#中使用它们。

该解决方案称为有向无环图(DAG),旨在通过将操作分为具有明确定义的依存关系的原子问题序列来形成图。 在这种情况下,图的非循环本质很重要,因为它消除了任务之间出现死锁的可能性,只要任务实际上是完全原子的即可。 设置图形时,重要的是要了解任务之间的所有依赖关系,尤其是隐式依赖关系,这可能导致死锁或竞争条件。 以下是一个类似图的数据结构的典型示例,通过它可以想象在规划给定图中的操作之间的交互时出现的限制。

图是一种非常强大的数据结构,并且可以在此基础上编写强大的算法。



1图是由边连接的顶点的集合。 在此有向图表示中,节点1取决于节点4和5,节点2取决于节点5,节点3取决于节点5和6,依此类推。

DAG结构可作为并行执行任务的策略,同时考虑到依赖关系的顺序,从而提高了生产率。 可以使用F#中的MailboxProcessor类来确定这种图形的结构。 在此类中,保留以边依赖关系的形式注册执行的任务的内部状态。

定向非循环图验证

在使用任何图形数据结构(例如DAG)时,必须注意边缘的正确配准。 例如,回到图1:如果我们注册了依赖于节点5的节点2,但是节点5不存在,会发生什么呢? 还可能发生某些边缘彼此依赖的情况,这导致定向循环。 在有针对性的周期的情况下,并行执行某些任务至关重要。 否则,某些任务可能会永远等待其他任务完成,并且会发生死锁。

通过拓扑排序解决了这个问题:这意味着我们可以对图的所有顶点进行排序,以使任何边都从具有较低数字的顶点到具有较高数字的顶点相连。 因此,如果任务A在任务B之前完成,任务B在任务C之前完成,而任务C在任务A之前完成,那么将出现一个环形链接,系统将通知您此错误,并引发异常。 如果在订单管理中出现定向循环,那么就没有解决方案。 这种检查称为“在有向图中找到一个循环”。 如果有向图满足所描述的规则,则它是非常适合并行运行多个任务的有向无环图,在这些任务之间有依赖关系。

在线源代码中包含清单2的完整版本,其中包含DAG验证代码。

在下面的清单中,F# MailboxProccessor类用作实现DAG的理想候选者,该DAG允许并行执行与依赖项有关的操作。 首先,让我们定义一个带有标签的联合,我们将通过该联合管理任务并实现其依赖关系。

清单1:用于根据任务的依赖性来协调任务的消息类型和数据结构

 type TaskMessage = // #A | AddTask of int * TaskInfo | QueueTask of TaskInfo | ExecuteTasks and TaskInfo = // #B { Context : System.Threading.ExecutionContext Edges : int array; Id : int; Task : Func<Task> EdgesLeft : int option; Start : DateTimeOffset option End : DateTimeOffset option } 


#A将基本的dagAgent发送到ParallelTasksDAG ,后者负责协调任务

#B包装每个任务的详细信息以完成

TaskMessage类型表示发送到ParallelTasksDAG类型的基本代理的消息包装器。 这些消息用于协调任务和同步依赖关系。 TaskInfo类型包含并跟踪DAG执行期间已注册任务的详细信息,包括依赖项边。 捕获执行上下文(https://msdn.microsoft.com/zh-cn/library/system.threading.executioncontext(v=vs.110).aspx)的目的是为了访问延迟执行中的信息,例如,此类信息:current用户,与执行逻辑线程关联的任何状态,有关安全访问代码的信息等。 触发事件后,将发布开始时间和结束时间。

清单2 F#上的DAG代理,用于并行化依赖项相关的操作

 type ParallelTasksDAG() = let onTaskCompleted = new Event<TaskInfo>() // #A let dagAgent = new MailboxProcessor<TaskMessage>(fun inbox -> let rec loop (tasks : Dictionary<int, TaskInfo>) // #B (edges : Dictionary<int, int list>) = async { // #B let! msg = inbox.Receive() // #C match msg with | ExecuteTasks -> // #D let fromTo = new Dictionary<int, int list>() let ops = new Dictionary<int, TaskInfo>() // #E for KeyValue(key, value) in tasks do // #F let operation = { value with EdgesLeft = Some(value.Edges.Length) } for from in operation.Edges do let exists, lstDependencies = fromTo.TryGetValue(from) if not <| exists then fromTo.Add(from, [ operation.Id ]) else fromTo.[from] <- (operation.Id :: lstDependencies) ops.Add(key, operation) ops |> Seq.iter (fun kv -> // #F match kv.Value.EdgesLeft with | Some(n) when n = 0 -> inbox.Post(QueueTask(kv.Value)) | _ -> ()) return! loop ops fromTo | QueueTask(op) -> // #G Async.Start <| async { // #G let start = DateTimeOffset.Now match op.Context with // #H | null -> op.Task.Invoke() |> Async.AwaitATsk | ctx -> ExecutionContext.Run(ctx.CreateCopy(), // #I (fun op -> let opCtx = (op :?> TaskInfo) opCtx.Task.Invoke().ConfigureAwait(false)), taskInfo) let end' = DateTimeOffset.Now onTaskCompleted.Trigger { op with Start = Some(start) End = Some(end') } // #L let exists, deps = edges.TryGetValue(op.Id) if exists && deps.Length > 0 then let depOps = getDependentOperation deps tasks [] edges.Remove(op.Id) |> ignore depOps |> Seq.iter (fun nestedOp -> inbox.Post(QueueTask(nestedOp))) } return! loop tasks edges | AddTask(id, op) -> tasks.Add(id, op) // #M return! loop tasks edges } loop (new Dictionary<int, TaskInfo>(HashIdentity.Structural)) (new Dictionary<int, int list>(HashIdentity.Structural))) [<CLIEventAttribute>] member this.OnTaskCompleted = onTaskCompleted.Publish // #L member this.ExecuteTasks() = dagAgent.Post ExecuteTasks // #N member this.AddTask(id, task, [<ParamArray>] edges : int array) = let data = { Context = ExecutionContext.Capture() Edges = edges; Id = id; Task = task NumRemainingEdges = None; Start = None; End = None } dagAgent.Post(AddTask(id, data)) // #O 


#A onTaskCompletedEvent类的实例,用于通知任务完成

#B代理的内部状态,用于跟踪任务寄存器及其相关性。 集合是可变的,因为状态在执行ParallelTasksDAG期间会更改,并且因为它们位于代理中,所以它们继承了线程安全性

#C异步等待执行

#D启动ParallelTasksDAG的消息的外壳

#E集合映射到具有要运行任务的单调递增索引

#F该过程遍历任务列表,分析与其他任务的依存关系以创建拓扑结构,在拓扑结构中按任务顺序显示

#G消息包装器,用于对任务进行排队,执行任务,以及最终在任务完成后从活动状态从代理状态删除此任务

#H如果拾取的ExecutionContextnull ,则在当前上下文中运行任务,否则转到#I

#I在截获的ExecutionContext运行任务

onTaskCompleted并发布onTaskCompleted事件,以通知您任务已完成。 该事件包含有关任务的信息。

#M消息包装器,用于添加要根据其依赖关系执行的任务

#N开始执行已注册的任务

#O添加任务,其依赖项和当前的ExecutionContext以执行DAG。

AddTask函数的目的是注册具有任意依赖关系边的任务。 此功能接受唯一的ID,要执行的任务以及代表该任务可以完成之前必须完成的所有其他注册任务的ID的许多边。 如果数组为空,则表示没有依赖关系。 一个名为dagAgentMailboxProcessor实例以“任务”的当前状态存储已注册的任务,该状态是一个字典( tasks : Dictionary<int, TaskInfo> ),该tasks : Dictionary<int, TaskInfo>将每个任务的ID及其详细信息相关联。 此外,Agent还通过每个任务的ID( edges : Dictionary<int, int list> )存储边缘依赖项的状态。 当代理收到有关需要开始执行的通知时(作为此过程的一部分),将检查边缘的所有依赖项是否都已注册,并且图中没有周期。 在线代码中提供的ParallelTask​​sDAG的完整实现中可以使用此验证步骤。 接下来,我在C#中提供一个示例,其中引用了F#运行ParallelTasksDAG (并使用它)的库。 已注册的任务反映了上面图5中显示的依赖性。 1。

 Func<int, int, Func<Task>> action = (id, delay) => async () => { Console.WriteLine($”Starting operation{id} in Thread Id {Thread.CurrentThread.ManagedThreadId}…”); await Task.Delay(delay); }; var dagAsync = new DAG.ParallelTasksDAG(); dagAsync.OnTaskCompleted.Subscribe(op => Console.WriteLine($”Operation {op.Id} completed in Thread Id { Thread.CurrentThread.ManagedThreadId}”)); dagAsync.AddTask(1, action(1, 600), 4, 5); dagAsync.AddTask(2, action(2, 200), 5); dagAsync.AddTask(3, action(3, 800), 6, 5); dagAsync.AddTask(4, action(4, 500), 6); dagAsync.AddTask(5, action(5, 450), 7, 8); dagAsync.AddTask(6, action(6, 100), 7); dagAsync.AddTask(7, action(7, 900)); dagAsync.AddTask(8, action(8, 700)); dagAsync.ExecuteTasks(); 

辅助功能的目的是显示一条消息,说明任务已经开始,请参考当前线程的Id确认多线程。 另一方面,将记录OnTaskCompleted事件,以向控制台发出有关每个任务完成的通知,并将任务ID和当前线程的ID输出。 这是调用ExecuteTasks方法时得到的输出。

 Starting operation 8 in Thread Id 23… Starting operation 7 in Thread Id 24… Operation 8 Completed in Thread Id 23 Operation 7 Completed in Thread Id 24 Starting operation 5 in Thread Id 23… Starting operation 6 in Thread Id 25… Operation 6 Completed in Thread Id 25 Starting operation 4 in Thread Id 24… Operation 5 Completed in Thread Id 23 Starting operation 2 in Thread Id 27… Starting operation 3 in Thread Id 30… Operation 4 Completed in Thread Id 24 Starting operation 1 in Thread Id 28… Operation 2 Completed in Thread Id 27 Operation 1 Completed in Thread Id 28 Operation 3 Completed in Thread Id 30 

如您所见,任务在不同的线程中并行执行(它们的线程ID不同),并且保留了依存关系的顺序。

本质上,这就是具有依赖项的任务并行化的方式。 在《 .NET中的并发性》一书中了解更多信息。

Source: https://habr.com/ru/post/zh-CN422571/


All Articles