Parallélisation des tâches avec les dépendances - exemple .NET

Bonjour collègues!

Cette semaine, nous avons fait don du livre de Manning Concurrency in .NET, un livre ambitieux de complexité:



L'auteur a aimablement posté sur le site Medium un extrait du 13ème chapitre, que nous proposons d'évaluer bien avant la première.
Bonne lecture!

Supposons que vous deviez écrire un outil qui vous permet d'effectuer un certain nombre de tâches asynchrones, chacune ayant son propre ensemble de dépendances qui affectent l'ordre des opérations. Ces problèmes peuvent être résolus avec une exécution cohérente et impérative, mais si vous souhaitez obtenir des performances maximales, les opérations séquentielles ne fonctionneront pas pour vous. Au lieu de cela, vous devez organiser l'exécution parallèle des tâches. De nombreux problèmes de concurrence peuvent être interprétés comme des collections statiques d'opérations atomiques avec des dépendances entre leurs données d'entrée et de sortie. Une fois l'opération terminée, sa sortie est utilisée comme entrée pour d'autres opérations dépendantes. Pour optimiser les performances, les tâches doivent être affectées en fonction de leurs dépendances et l'algorithme doit être configuré de sorte que les tâches dépendantes soient effectuées de manière séquentielle autant que nécessaire et aussi parallèle que possible.

Vous souhaitez créer un composant réutilisable qui exécute une série de tâches en parallèle et vous assurer que toutes les dépendances susceptibles d'affecter l'ordre des opérations sont prises en compte. Comment construire un tel modèle de programmation qui assurerait le parallélisme de base d'une collection d'opérations effectuées efficacement, en parallèle ou séquentiellement, selon les dépendances qui surgissent entre cette opération et les autres?

Solution: Nous implémentons le graphe de dépendances à l'aide de la classe MailboxProcessor de F # et fournissons des méthodes en tant que tâches standard afin qu'elles puissent être consommées à partir de C #

Cette solution est appelée le graphique acyclique orienté (DAG) et est conçue pour former un graphique en divisant les opérations en séquences de problèmes atomiques avec des dépendances bien définies. Dans ce cas, l'essence acyclique du graphique est importante, car elle élimine la possibilité de blocages entre les tâches, à condition que les tâches soient réellement complètement atomiques. Lors de la définition d'un graphique, il est important de comprendre toutes les dépendances entre les tâches, en particulier les dépendances implicites qui peuvent entraîner des blocages ou des conditions de concurrence. Voici un exemple typique d'une structure de données de type graphique, avec laquelle vous pouvez imaginer les limitations qui surviennent lors de la planification des interactions entre les opérations dans un graphique donné.

Un graphique est une structure de données extrêmement puissante, et on peut écrire des algorithmes puissants sur sa base.



Fig. 1 Un graphique est un ensemble de sommets reliés par des arêtes. Dans cette représentation du graphe orienté, le nœud 1 dépend des nœuds 4 et 5, le nœud 2 dépend du nœud 5, le nœud 3 dépend des nœuds 5 et 6, etc.

La structure DAG est applicable comme stratégie pour l'exécution parallèle de tâches en tenant compte de l'ordre des dépendances, ce qui améliore la productivité. La structure d'un tel graphique peut être déterminée à l'aide de la classe MailboxProcessor de F #; dans cette classe, l'état interne des tâches enregistrées pour exécution sous forme de dépendances de bords est conservé.

Validation du graphique acyclique orienté

Lorsque vous travaillez avec n'importe quelle structure de données de graphique, telle que DAG, vous devez prendre soin de l'enregistrement correct des bords. Par exemple, revenons à la figure 1: que se passe-t-il si nous avons enregistré le nœud 2 avec des dépendances sur le nœud 5, mais que le nœud 5 n'existe pas? Il peut également arriver que certaines arêtes dépendent les unes des autres, ce qui entraîne un cycle orienté. En présence d'un cycle orienté, il est essentiel d'effectuer certaines tâches en parallèle; sinon, certaines tâches peuvent attendre pour toujours que d'autres se terminent et un blocage se produira.

Le problème est résolu par le tri topologique: cela signifie que nous pouvons ordonner tous les sommets du graphe de sorte que toute arête mène du sommet avec le nombre inférieur au sommet avec le nombre supérieur. Ainsi, si la tâche A doit se terminer avant la tâche B et la tâche B - avant la tâche C, qui, à son tour, doit se terminer avant la tâche A, un lien circulaire apparaît et le système vous informe de cette erreur, lançant une exception. Si un cycle orienté apparaît dans la gestion des commandes, alors il n'y a pas de solution. Une vérification de ce type est appelée "recherche d'un cycle dans un graphe orienté". Si un graphe orienté satisfait aux règles décrites, alors c'est un graphe acyclique orienté qui convient parfaitement pour exécuter plusieurs tâches en parallèle, entre lesquelles il existe des dépendances.

La version complète du Listing 2, contenant le code de validation DAG, se trouve dans le code source en ligne.

Dans la liste suivante, la classe F # MailboxProccessor est utilisée comme candidat idéal pour implémenter un DAG qui permet d'effectuer des opérations liées aux dépendances en parallèle. Définissons d'abord une union étiquetée avec laquelle nous gérerons les tâches et remplirons leurs dépendances.

Listing 1 Type de message et structure de données pour coordonner les tâches en fonction de leurs dépendances

 type TaskMessage = // #A | AddTask of int * TaskInfo | QueueTask of TaskInfo | ExecuteTasks and TaskInfo = // #B { Context : System.Threading.ExecutionContext Edges : int array; Id : int; Task : Func<Task> EdgesLeft : int option; Start : DateTimeOffset option End : DateTimeOffset option } 


#A envoie le dagAgent de base à ParallelTasksDAG , qui est responsable de la coordination des tâches

#B Enveloppe les détails de chaque tâche pour terminer

Le type TaskMessage représente les enveloppes de message envoyées à l'agent de base du type ParallelTasksDAG . Ces messages sont utilisés pour coordonner les tâches et synchroniser les dépendances. Le type TaskInfo contient et suit les détails des tâches enregistrées lors de l'exécution du DAG, y compris les fronts de dépendance. Le contexte d'exécution (https://msdn.microsoft.com/en-us/library/system.threading.executioncontext(v=vs.110).aspx) est capturé dans le but d'accéder à des informations en exécution différée, par exemple, ces informations: en cours utilisateur, tout état associé au fil d'exécution logique, informations sur l'accès sécurisé au code, etc. Une fois l'événement déclenché, les heures de début et de fin sont publiées.

Liste de 2 agents DAG sur F # pour la parallélisation des opérations liées aux dépendances

 type ParallelTasksDAG() = let onTaskCompleted = new Event<TaskInfo>() // #A let dagAgent = new MailboxProcessor<TaskMessage>(fun inbox -> let rec loop (tasks : Dictionary<int, TaskInfo>) // #B (edges : Dictionary<int, int list>) = async { // #B let! msg = inbox.Receive() // #C match msg with | ExecuteTasks -> // #D let fromTo = new Dictionary<int, int list>() let ops = new Dictionary<int, TaskInfo>() // #E for KeyValue(key, value) in tasks do // #F let operation = { value with EdgesLeft = Some(value.Edges.Length) } for from in operation.Edges do let exists, lstDependencies = fromTo.TryGetValue(from) if not <| exists then fromTo.Add(from, [ operation.Id ]) else fromTo.[from] <- (operation.Id :: lstDependencies) ops.Add(key, operation) ops |> Seq.iter (fun kv -> // #F match kv.Value.EdgesLeft with | Some(n) when n = 0 -> inbox.Post(QueueTask(kv.Value)) | _ -> ()) return! loop ops fromTo | QueueTask(op) -> // #G Async.Start <| async { // #G let start = DateTimeOffset.Now match op.Context with // #H | null -> op.Task.Invoke() |> Async.AwaitATsk | ctx -> ExecutionContext.Run(ctx.CreateCopy(), // #I (fun op -> let opCtx = (op :?> TaskInfo) opCtx.Task.Invoke().ConfigureAwait(false)), taskInfo) let end' = DateTimeOffset.Now onTaskCompleted.Trigger { op with Start = Some(start) End = Some(end') } // #L let exists, deps = edges.TryGetValue(op.Id) if exists && deps.Length > 0 then let depOps = getDependentOperation deps tasks [] edges.Remove(op.Id) |> ignore depOps |> Seq.iter (fun nestedOp -> inbox.Post(QueueTask(nestedOp))) } return! loop tasks edges | AddTask(id, op) -> tasks.Add(id, op) // #M return! loop tasks edges } loop (new Dictionary<int, TaskInfo>(HashIdentity.Structural)) (new Dictionary<int, int list>(HashIdentity.Structural))) [<CLIEventAttribute>] member this.OnTaskCompleted = onTaskCompleted.Publish // #L member this.ExecuteTasks() = dagAgent.Post ExecuteTasks // #N member this.AddTask(id, task, [<ParamArray>] edges : int array) = let data = { Context = ExecutionContext.Capture() Edges = edges; Id = id; Task = task NumRemainingEdges = None; Start = None; End = None } dagAgent.Post(AddTask(id, data)) // #O 


#A Instance de la classe onTaskCompletedEvent , utilisée pour notifier l'achèvement d'une tâche

#B État interne de l'agent pour le suivi des registres de tâches et de leurs dépendances. Les collections sont modifiables car l'état change pendant l'exécution de ParallelTasksDAG , et puisqu'elles ont hérité de la sécurité des threads car elles se trouvent dans l'agent

#C En attente d'exécution asynchrone

#D Shell d'un message qui lance ParallelTasksDAG

#E Collection mappée sur un index incrémenté monotone avec une tâche à exécuter

#F Le processus parcourt la liste des tâches, analyse les dépendances avec d'autres tâches pour créer une structure topologique dans laquelle l'ordre des tâches est présenté

#G Encapsuleur de messages pour mettre en file d'attente une tâche, l'exécuter et, finalement, pour supprimer cette tâche de l'état d'agent en tant que dépendance active une fois la tâche terminée

#H Si l' ExecutionContext ramassé est null , exécutez la tâche dans le contexte actuel, sinon passez à #I

#I Exécute la tâche dans le ExecutionContext intercepté

onTaskCompleted et publiez l'événement onTaskCompleted pour vous informer de la fin de la tâche. L'événement contient des informations sur la tâche.

#M Encapsuleur de message pour ajouter une tâche à exécuter en fonction de ses dépendances, le cas échéant

#N Lance l'exécution des tâches enregistrées

#O Ajout d'une tâche, de ses dépendances et du contexte d' ExecutionContext actuel pour exécuter le DAG.

Le but de la fonction AddTask est d'enregistrer une tâche avec des bords de dépendance arbitraires. Cette fonction accepte un ID unique, une tâche à effectuer et de nombreux bords représentant les ID de toutes les autres tâches enregistrées qui doivent être terminées avant que cette tâche puisse être terminée. Si le tableau est vide, cela signifie qu'il n'y a pas de dépendances. Une instance de MailboxProcessor appelée dagAgent stocke les tâches enregistrées dans l'état actuel de «tâches», qui est un dictionnaire ( tasks : Dictionary<int, TaskInfo> ) qui corrèle l'ID de chaque tâche et ses détails. De plus, l'agent stocke également l'état des dépendances de périphérie par l'ID de chaque tâche ( edges : Dictionary<int, int list> ). Lorsqu'un agent reçoit une notification sur la nécessité de démarrer l'exécution, dans le cadre de ce processus, il est vérifié que toutes les dépendances des bords sont enregistrées et qu'il n'y a pas de cycles dans le graphique. Cette étape de vérification est disponible dans l'implémentation complète de ParallelTasksDAG fournie dans le code en ligne. Ensuite, je propose un exemple en C #, où je fais référence à la bibliothèque F # pour exécuter ParallelTasksDAG (et la consommer). Les tâches enregistrées reflètent les dépendances présentées ci-dessus dans la Fig. 1.

 Func<int, int, Func<Task>> action = (id, delay) => async () => { Console.WriteLine($”Starting operation{id} in Thread Id {Thread.CurrentThread.ManagedThreadId}…”); await Task.Delay(delay); }; var dagAsync = new DAG.ParallelTasksDAG(); dagAsync.OnTaskCompleted.Subscribe(op => Console.WriteLine($”Operation {op.Id} completed in Thread Id { Thread.CurrentThread.ManagedThreadId}”)); dagAsync.AddTask(1, action(1, 600), 4, 5); dagAsync.AddTask(2, action(2, 200), 5); dagAsync.AddTask(3, action(3, 800), 6, 5); dagAsync.AddTask(4, action(4, 500), 6); dagAsync.AddTask(5, action(5, 450), 7, 8); dagAsync.AddTask(6, action(6, 100), 7); dagAsync.AddTask(7, action(7, 900)); dagAsync.AddTask(8, action(8, 700)); dagAsync.ExecuteTasks(); 

Le but de la fonction auxiliaire est d'afficher un message indiquant que la tâche a commencé, se référant à l' Id thread actuel pour confirmer le multithreading. OnTaskCompleted événement OnTaskCompleted , d'autre part, est enregistré pour donner une notification sur l'achèvement de chaque tâche avec la sortie de l' ID tâche et de l' Id thread actuel vers la console. Voici la sortie que nous obtenons lors de l'appel de la méthode ExecuteTasks .

 Starting operation 8 in Thread Id 23… Starting operation 7 in Thread Id 24… Operation 8 Completed in Thread Id 23 Operation 7 Completed in Thread Id 24 Starting operation 5 in Thread Id 23… Starting operation 6 in Thread Id 25… Operation 6 Completed in Thread Id 25 Starting operation 4 in Thread Id 24… Operation 5 Completed in Thread Id 23 Starting operation 2 in Thread Id 27… Starting operation 3 in Thread Id 30… Operation 4 Completed in Thread Id 24 Starting operation 1 in Thread Id 28… Operation 2 Completed in Thread Id 27 Operation 1 Completed in Thread Id 28 Operation 3 Completed in Thread Id 30 

Comme vous pouvez le voir, les tâches sont exécutées en parallèle dans différents threads (les ID threads sont différents pour eux) et l'ordre des dépendances est préservé.

C'est essentiellement ainsi que les tâches avec dépendances sont parallélisées. En savoir plus dans le livre Concurrency in .NET.

Source: https://habr.com/ru/post/fr422571/


All Articles