المهام الموازية مع التبعيات - مثال .NET

مرحبا زملائي!

لقد تبرعنا هذا الأسبوع بكتاب Manning Concurrency in .NET ، وهو كتاب طموح للتعقيد:



يرجى من المؤلف نشر مقتطفات من الفصل الثالث عشر على موقع الوسيط ، والتي نقترح تقييمها قبل فترة طويلة من العرض الأول.
هل لديك قراءة لطيفة!

افترض أنك تحتاج إلى كتابة أداة تسمح لك بتنفيذ عدد من المهام غير المتزامنة ، لكل منها مجموعة من التبعيات الخاصة بها والتي تؤثر على ترتيب العمليات. يمكن حل هذه المشاكل من خلال التنفيذ المتسق والضروري ، ولكن إذا كنت ترغب في تحقيق أقصى أداء ، فلن تنجح العمليات المتسلسلة بالنسبة لك. بدلاً من ذلك ، تحتاج إلى تنظيم التنفيذ الموازي للمهام. يمكن تفسير العديد من المشاكل التنافسية كمجموعات ثابتة للعمليات الذرية مع التبعيات بين بيانات المدخلات والمخرجات. عند الانتهاء من العملية ، يتم استخدام ناتجها كمدخل لعمليات أخرى تابعة. لتحسين الأداء ، يجب تعيين المهام بناءً على تبعياتها ، ويجب تكوين الخوارزمية بحيث يتم تنفيذ المهام التابعة بالتسلسل حسب الضرورة ، وبالتوازي قدر الإمكان.

تريد إنشاء مكون قابل لإعادة الاستخدام يقوم بسلسلة من المهام بالتوازي ، والتأكد من أن جميع التبعيات التي يمكن أن تؤثر على ترتيب العمليات تؤخذ في الاعتبار. كيفية بناء نموذج برمجة كهذا يضمن التوازي الأساسي لمجموعة من العمليات التي تتم بكفاءة ، سواء بالتوازي أو بالتتابع ، اعتمادًا على ما هي التبعيات التي تنشأ بين هذه العملية وغيرها؟

الحل: ننفذ الرسم البياني للتبعية باستخدام فئة MailboxProcessor من F # ونقدم طرقًا كمهام قياسية بحيث يمكن استهلاكها من C #

يُسمى هذا الحل الرسم البياني الحلقي الموجه (DAG) وهو مصمم لتشكيل رسم بياني عن طريق تقسيم العمليات إلى سلاسل من المشاكل الذرية مع تبعيات محددة جيدًا. في هذه الحالة ، يعد الجوهر غير الدوري للرسم البياني مهمًا ، لأنه يلغي إمكانية حدوث جمود بين المهام ، شريطة أن تكون المهام في الواقع ذرية تمامًا. عند تعيين رسم بياني ، من المهم فهم جميع التبعيات بين المهام ، خاصة التبعيات الضمنية التي يمكن أن تؤدي إلى طريق مسدود أو ظروف السباق. فيما يلي مثال نموذجي على بنية بيانات تشبه الرسم البياني ، والتي يمكنك من خلالها تخيل القيود التي تنشأ عند تخطيط التفاعلات بين العمليات في رسم بياني معين.

الرسم البياني هو بنية بيانات قوية للغاية ، ويمكن للمرء كتابة خوارزميات قوية على أساسها.



التين. 1 الرسم البياني عبارة عن مجموعة من القمم المتصلة بالحواف. في هذا التمثيل للرسم البياني الموجه ، تعتمد العقدة 1 على العقدتين 4 و 5 ، العقدة 2 تعتمد على العقدة 5 ، العقدة 3 تعتمد على العقد 5 و 6 ، إلخ.

هيكل DAG قابل للتطبيق كاستراتيجية للتنفيذ الموازي للمهام مع مراعاة ترتيب التبعيات ، مما يحسن الإنتاجية. يمكن تحديد بنية مثل هذا الرسم البياني باستخدام فئة MailboxProcessor من F # ؛ في هذه الفئة ، يتم الحفاظ على الحالة الداخلية للمهام المسجلة للتنفيذ في شكل تبعيات الحواف.

التحقق من الرسم البياني الحلقية الموجه

عند العمل مع أي هيكل بيانات الرسم البياني ، مثل DAG ، عليك الاهتمام بالتسجيل الصحيح للحواف. على سبيل المثال ، بالعودة إلى الشكل 1: ماذا يحدث إذا قمنا بتسجيل العقدة 2 مع تبعيات للعقدة 5 ، ولكن العقدة 5 غير موجودة؟ قد يحدث أيضًا أن بعض الحواف تعتمد على بعضها البعض ، مما يؤدي إلى دورة موجهة. في وجود دورة موجهة ، من المهم القيام ببعض المهام بالتوازي ؛ خلاف ذلك ، قد تنتظر بعض المهام إلى الأبد حتى يكتمل الآخرون وسيحدث طريق مسدود.

يتم حل المشكلة عن طريق الفرز الطوبولوجي: وهذا يعني أنه يمكننا طلب جميع رؤوس الرسم البياني بحيث تؤدي أي حافة من الرأس برقم أقل إلى قمة برقم أعلى. لذلك ، إذا كانت المهمة A يجب أن تنتهي قبل المهمة B ، والمهمة B - قبل المهمة C ، والتي ، بدورها ، يجب أن تكتمل قبل المهمة A ، يظهر رابط دائري ، وسوف يخطرك النظام بهذا الخطأ ، مما يؤدي إلى استثناء. إذا نشأت دورة موجهة في إدارة الطلبات ، فلا يوجد حل. يسمى هذا النوع من "البحث عن دورة في رسم بياني موجه". إذا كان الرسم البياني الموجه يفي بالقواعد الموصوفة ، فهو رسم بياني غير دوري موجه مناسب تمامًا لتشغيل العديد من المهام بالتوازي ، والتي يوجد بينها تبعيات.

النسخة الكاملة من القائمة 2 ، التي تحتوي على رمز التحقق DAG ، موجودة في شفرة المصدر عبر الإنترنت.

في القائمة التالية ، يتم استخدام فئة F # MailboxProccessor كمرشح مثالي لتطبيق DAG التي تسمح بتنفيذ العمليات المتعلقة بالتبعية بالتوازي. أولاً ، دعنا نحدد اتحادًا موصوفًا سندير به المهام ونحقق تبعياتها.

سرد 1 نوع الرسالة وهيكل البيانات لتنسيق المهام وفقا تبعياتهم

 type TaskMessage = // #A | AddTask of int * TaskInfo | QueueTask of TaskInfo | ExecuteTasks and TaskInfo = // #B { Context : System.Threading.ExecutionContext Edges : int array; Id : int; Task : Func<Task> EdgesLeft : int option; Start : DateTimeOffset option End : DateTimeOffset option } 


#A يرسل dagAgent الأساسي إلى ParallelTasksDAG ، المسؤول عن تنسيق المهام

#B يلتف تفاصيل كل مهمة لإكمالها

يمثل نوع TaskMessage تغليف الرسائل المرسلة إلى العامل الأساسي لنوع ParallelTasksDAG . يتم استخدام هذه الرسائل لتنسيق المهام ومزامنة التبعيات. يحتوي نوع TaskInfo على تفاصيل المهام المسجلة ويتتبعها أثناء تنفيذ DAG ، بما في ذلك حواف التبعية. يتم التقاط سياق التنفيذ (https://msdn.microsoft.com/en-us/library/system.threading.executioncontext(v=vs.110).aspx) لغرض الوصول إلى المعلومات في تأخر التنفيذ ، على سبيل المثال ، مثل هذه المعلومات: الحالية المستخدم ، أي حالة مرتبطة بسلسلة التنفيذ المنطقية ، معلومات حول الوصول الآمن إلى التعليمات البرمجية ، إلخ. بعد تشغيل الحدث ، يتم نشر أوقات البدء والانتهاء.

إدراج وكيل DAG رقم 2 في F # لموازنة العمليات المتعلقة بالتبعية

 type ParallelTasksDAG() = let onTaskCompleted = new Event<TaskInfo>() // #A let dagAgent = new MailboxProcessor<TaskMessage>(fun inbox -> let rec loop (tasks : Dictionary<int, TaskInfo>) // #B (edges : Dictionary<int, int list>) = async { // #B let! msg = inbox.Receive() // #C match msg with | ExecuteTasks -> // #D let fromTo = new Dictionary<int, int list>() let ops = new Dictionary<int, TaskInfo>() // #E for KeyValue(key, value) in tasks do // #F let operation = { value with EdgesLeft = Some(value.Edges.Length) } for from in operation.Edges do let exists, lstDependencies = fromTo.TryGetValue(from) if not <| exists then fromTo.Add(from, [ operation.Id ]) else fromTo.[from] <- (operation.Id :: lstDependencies) ops.Add(key, operation) ops |> Seq.iter (fun kv -> // #F match kv.Value.EdgesLeft with | Some(n) when n = 0 -> inbox.Post(QueueTask(kv.Value)) | _ -> ()) return! loop ops fromTo | QueueTask(op) -> // #G Async.Start <| async { // #G let start = DateTimeOffset.Now match op.Context with // #H | null -> op.Task.Invoke() |> Async.AwaitATsk | ctx -> ExecutionContext.Run(ctx.CreateCopy(), // #I (fun op -> let opCtx = (op :?> TaskInfo) opCtx.Task.Invoke().ConfigureAwait(false)), taskInfo) let end' = DateTimeOffset.Now onTaskCompleted.Trigger { op with Start = Some(start) End = Some(end') } // #L let exists, deps = edges.TryGetValue(op.Id) if exists && deps.Length > 0 then let depOps = getDependentOperation deps tasks [] edges.Remove(op.Id) |> ignore depOps |> Seq.iter (fun nestedOp -> inbox.Post(QueueTask(nestedOp))) } return! loop tasks edges | AddTask(id, op) -> tasks.Add(id, op) // #M return! loop tasks edges } loop (new Dictionary<int, TaskInfo>(HashIdentity.Structural)) (new Dictionary<int, int list>(HashIdentity.Structural))) [<CLIEventAttribute>] member this.OnTaskCompleted = onTaskCompleted.Publish // #L member this.ExecuteTasks() = dagAgent.Post ExecuteTasks // #N member this.AddTask(id, task, [<ParamArray>] edges : int array) = let data = { Context = ExecutionContext.Capture() Edges = edges; Id = id; Task = task NumRemainingEdges = None; Start = None; End = None } dagAgent.Post(AddTask(id, data)) // #O 


onTaskCompletedEvent مثيل لفئة onTaskCompletedEvent ، يُستخدم onTaskCompletedEvent إكمال المهمة

#B الحالة الداخلية للوكيل لتتبع سجلات المهام وتبعياتها. المجموعات قابلة للتغيير لأن الحالة تتغير أثناء تنفيذ ParallelTasksDAG ، ولأنها ورثت سلامة الخيط لأنها تقع في Agent

#C في انتظار التنفيذ بشكل غير متزامن

#D Shell لرسالة تقوم بتشغيل ParallelTasksDAG

تم تعيين مجموعة #E على فهرس متزايد رتيبيًا مع مهمة للتشغيل

#F تتكرر العملية عبر قائمة المهام ، وتحليل التبعيات مع المهام الأخرى لإنشاء هيكل طوبولوجي يتم فيه تقديم ترتيب المهام

#G Message المجمّع لقائمة انتظار المهمة وتنفيذها ، وفي النهاية ، لإزالة هذه المهمة من حالة الوكيل باعتبارها تبعية نشطة بعد اكتمال المهمة

#H إذا كان ExecutionContext تم اختياره null ، فقم بتشغيل المهمة في السياق الحالي ، وإلا انتقل إلى #I

#I تشغيل المهمة في ExecutionContext اعتراضها

onTaskCompleted ونشر حدث onTaskCompleted لإعلامك بإكمال المهمة. يحتوي الحدث على معلومات حول المهمة.

#M Message المجمّع لإضافة مهمة لتنفيذها وفقًا لتبعياتها ، إن وجدت

#N يبدأ تنفيذ المهام المسجلة

#O إضافة مهمة ، وتبعياتها ، ونوع ExecutionContext الحالي لتنفيذ DAG.

الغرض من وظيفة AddTask هو تسجيل مهمة بحواف تبعية عشوائية. تقبل هذه الوظيفة معرفًا فريدًا ومهمة يتم تنفيذها والعديد من الحواف التي تمثل معرفات جميع المهام المسجلة الأخرى التي يجب إكمالها قبل إكمال هذه المهمة. إذا كانت المصفوفة فارغة ، فهذا يعني أنه لا توجد تبعيات. يقوم مثيل MailboxProcessor المسمى dagAgent بتخزين المهام المسجلة في الحالة الحالية لـ "المهام" ، وهو القاموس ( tasks : Dictionary<int, TaskInfo> ) الذي يربط معرف كل مهمة وتفاصيلها. علاوة على ذلك ، يقوم الوكيل أيضًا بتخزين حالة تبعيات الحافة حسب معرف كل مهمة ( edges : Dictionary<int, int list> ). عندما يتلقى الوكيل إشعارًا حول الحاجة إلى بدء التنفيذ ، كجزء من هذه العملية ، يتم التحقق من أن جميع تبعيات الحواف مسجلة ، وأنه لا توجد دورات في الرسم البياني. تتوفر خطوة التحقق هذه في التنفيذ الكامل لـ ParallelTasksDAG المتوفرة في التعليمات البرمجية عبر الإنترنت. بعد ذلك ، أقدم مثالًا في C # ، حيث أشير إلى المكتبة التي F # لتشغيل ParallelTasksDAG (واستهلاكها). تعكس المهام المسجلة التبعيات الواردة أعلاه في الشكل. 1.

 Func<int, int, Func<Task>> action = (id, delay) => async () => { Console.WriteLine($”Starting operation{id} in Thread Id {Thread.CurrentThread.ManagedThreadId}…”); await Task.Delay(delay); }; var dagAsync = new DAG.ParallelTasksDAG(); dagAsync.OnTaskCompleted.Subscribe(op => Console.WriteLine($”Operation {op.Id} completed in Thread Id { Thread.CurrentThread.ManagedThreadId}”)); dagAsync.AddTask(1, action(1, 600), 4, 5); dagAsync.AddTask(2, action(2, 200), 5); dagAsync.AddTask(3, action(3, 800), 6, 5); dagAsync.AddTask(4, action(4, 500), 6); dagAsync.AddTask(5, action(5, 450), 7, 8); dagAsync.AddTask(6, action(6, 100), 7); dagAsync.AddTask(7, action(7, 900)); dagAsync.AddTask(8, action(8, 700)); dagAsync.ExecuteTasks(); 

الغرض من الوظيفة المساعدة هو عرض رسالة أن المهمة قد بدأت ، تشير إلى Id سلسلة الرسائل الحالية لتأكيد تعدد مؤشرات الترابط. من ناحية أخرى ، يتم تسجيل حدث OnTaskCompleted لإعطاء إشعار حول اكتمال كل مهمة مع إخراج ID المهمة ID مؤشر الترابط الحالي إلى وحدة التحكم. هذا هو الناتج الذي نحصل عليه عند استدعاء طريقة ExecuteTasks .

 Starting operation 8 in Thread Id 23… Starting operation 7 in Thread Id 24… Operation 8 Completed in Thread Id 23 Operation 7 Completed in Thread Id 24 Starting operation 5 in Thread Id 23… Starting operation 6 in Thread Id 25… Operation 6 Completed in Thread Id 25 Starting operation 4 in Thread Id 24… Operation 5 Completed in Thread Id 23 Starting operation 2 in Thread Id 27… Starting operation 3 in Thread Id 30… Operation 4 Completed in Thread Id 24 Starting operation 1 in Thread Id 28… Operation 2 Completed in Thread Id 27 Operation 1 Completed in Thread Id 28 Operation 3 Completed in Thread Id 30 

كما ترى ، يتم تنفيذ المهام بالتوازي في سلاسل ID مختلفة (تختلف ID سلسلة ID بالنسبة لهم) ، ويتم الحفاظ على ترتيب التبعيات.

في الجوهر ، هذه هي الطريقة التي تتوازى بها المهام مع التبعيات. اقرأ المزيد في كتاب Concurrency in .NET.

Source: https://habr.com/ru/post/ar422571/


All Articles