Hallo Kollegen!
Diese Woche haben wir Mannings Buch
Concurrency in .NET gespendet
, ein ehrgeiziges Buch der Komplexität:

Der Autor hat freundlicherweise auf der Medium-Website einen Auszug aus dem 13. Kapitel veröffentlicht, den wir lange vor der Premiere auswerten möchten.
Viel Spaß beim Lesen!
Angenommen, Sie müssen ein Tool schreiben, mit dem Sie eine Reihe von asynchronen Aufgaben ausführen können, von denen jede ihre eigenen Abhängigkeiten hat, die sich auf die Reihenfolge der Operationen auswirken. Solche Probleme können mit konsistenter und zwingender Ausführung gelöst werden. Wenn Sie jedoch maximale Leistung erzielen möchten, funktionieren sequentielle Vorgänge für Sie nicht. Stattdessen müssen Sie die parallele Ausführung von Aufgaben organisieren. Viele Wettbewerbsprobleme können als statische Sammlungen atomarer Operationen mit Abhängigkeiten zwischen ihren Eingabe- und Ausgabedaten interpretiert werden. Nach Abschluss der Operation wird ihre Ausgabe als Eingabe für andere abhängige Operationen verwendet. Um die Leistung zu optimieren, müssen Aufgaben basierend auf ihren Abhängigkeiten zugewiesen und der Algorithmus so konfiguriert werden, dass abhängige Aufgaben so sequentiell wie nötig und so parallel wie möglich ausgeführt werden.
Sie möchten eine wiederverwendbare Komponente erstellen, die eine Reihe von Aufgaben parallel ausführt, und sicherstellen, dass alle Abhängigkeiten berücksichtigt werden, die sich auf die Reihenfolge der Vorgänge auswirken können. Wie kann ein solches Programmiermodell erstellt werden, das die grundlegende Parallelität einer Sammlung von Operationen sicherstellt, die entweder parallel oder nacheinander effizient ausgeführt werden, je nachdem, welche Abhängigkeiten zwischen dieser Operation und anderen auftreten?
Lösung: Wir implementieren das Abhängigkeitsdiagramm mithilfe der MailboxProcessor-Klasse aus F # und stellen Methoden als Standardaufgaben bereit, damit sie aus C # verwendet werden könnenDiese Lösung wird als Oriented Acyclic Graph (DAG) bezeichnet und dient zur Bildung eines Graphen durch Aufteilen von Operationen in Sequenzen atomarer Probleme mit genau definierten Abhängigkeiten. In diesem Fall ist die azyklische Essenz des Graphen wichtig, da dadurch die Möglichkeit von Deadlocks zwischen Aufgaben ausgeschlossen wird, vorausgesetzt, die Aufgaben sind tatsächlich vollständig atomar. Beim Festlegen eines Diagramms ist es wichtig, alle Abhängigkeiten zwischen Aufgaben zu verstehen, insbesondere implizite Abhängigkeiten, die zu Deadlocks oder Race-Bedingungen führen können. Das Folgende ist ein typisches Beispiel für eine grafische Datenstruktur, mit der Sie sich die Einschränkungen vorstellen können, die bei der Planung von Interaktionen zwischen Operationen in einem bestimmten Diagramm auftreten.
Ein Graph ist eine extrem leistungsfähige Datenstruktur, auf deren Basis leistungsfähige Algorithmen geschrieben werden können.
Abb. 1 Ein Diagramm ist eine Sammlung von Scheitelpunkten, die durch Kanten verbunden sind. In dieser Darstellung des gerichteten Graphen hängt Knoten 1 von Knoten 4 und 5 ab, Knoten 2 von Knoten 5, Knoten 3 von Knoten 5 und 6 usw.Die DAG-Struktur ist als Strategie zur parallelen Ausführung von Aufgaben unter Berücksichtigung der Reihenfolge der Abhängigkeiten anwendbar, wodurch die Produktivität verbessert wird. Die Struktur eines solchen Graphen kann mit der
MailboxProcessor
Klasse aus F # bestimmt werden. In dieser Klasse bleibt der interne Status für Aufgaben erhalten, die zur Ausführung in Form von Kantenabhängigkeiten registriert sind.
Orientierte azyklische GraphvalidierungWenn Sie mit einer Diagrammdatenstruktur wie DAG arbeiten, müssen Sie auf die korrekte Registrierung der Kanten achten. Zurück zu Abbildung 1: Was passiert, wenn wir Knoten 2 mit Abhängigkeiten von Knoten 5 registriert haben, Knoten 5 jedoch nicht existiert? Es kann auch vorkommen, dass einige Kanten voneinander abhängen, was zu einem orientierten Zyklus führt. Bei einem orientierten Zyklus ist es wichtig, einige Aufgaben parallel auszuführen. Andernfalls warten einige Aufgaben möglicherweise ewig, bis andere abgeschlossen sind, und es kommt zu einem Deadlock.
Das Problem wird durch topologische Sortierung gelöst: Dies bedeutet, dass wir alle Scheitelpunkte des Graphen so anordnen können, dass jede Kante vom Scheitelpunkt mit der niedrigeren Zahl zum Scheitelpunkt mit der höheren Zahl führt. Wenn also Aufgabe A vor Aufgabe B und Aufgabe B - vor Aufgabe C beendet werden soll, die wiederum vor Aufgabe A abgeschlossen werden soll, wird eine Zirkelverknüpfung angezeigt, und das System benachrichtigt Sie über diesen Fehler und löst eine Ausnahme aus. Wenn sich im Auftragsmanagement ein orientierter Zyklus ergibt, gibt es keine Lösung. Eine Überprüfung dieser Art wird als "Finden eines Zyklus in einem gerichteten Graphen" bezeichnet. Wenn ein gerichteter Graph die beschriebenen Regeln erfüllt, ist er ein gerichteter azyklischer Graph, der perfekt geeignet ist, um mehrere Aufgaben parallel auszuführen, zwischen denen Abhängigkeiten bestehen.
Die Vollversion von Listing 2, die den DAG-Validierungscode enthält, befindet sich im Online-Quellcode.
In der folgenden Auflistung wird die F #
MailboxProccessor
Klasse als idealer Kandidat für die Implementierung einer DAG verwendet, mit der abhängigkeitsbezogene Operationen parallel ausgeführt werden können. Definieren wir zunächst eine beschriftete Union, mit der wir Aufgaben verwalten und ihre Abhängigkeiten erfüllen.
Listing 1 Nachrichtentyp und Datenstruktur zur Koordinierung von Aufgaben nach ihren Abhängigkeiten
type TaskMessage = // #A | AddTask of int * TaskInfo | QueueTask of TaskInfo | ExecuteTasks and TaskInfo = // #B { Context : System.Threading.ExecutionContext Edges : int array; Id : int; Task : Func<Task> EdgesLeft : int option; Start : DateTimeOffset option End : DateTimeOffset option }
#A sendet den Basis-dagAgent an
ParallelTasksDAG
, das für die Koordination der Aufgaben verantwortlich ist
#B Fasst die Details jeder zu erledigenden Aufgabe zusammen
Der
TaskMessage
Typ repräsentiert Nachrichten-Wrapper, die an den
TaskMessage
Typs
ParallelTasksDAG
gesendet werden. Diese Nachrichten werden verwendet, um Aufgaben zu koordinieren und Abhängigkeiten zu synchronisieren. Der
TaskInfo
Typ enthält und verfolgt die Details registrierter Aufgaben während der Ausführung der DAG, einschließlich der Abhängigkeitskanten. Der Ausführungskontext (https://msdn.microsoft.com/en-us/library/system.threading.executioncontext(v=vs.110).aspx) wird zum Zweck des Zugriffs auf Informationen bei verzögerter Ausführung erfasst, z. B. solche Informationen: aktuell Benutzer, jeder Status, der mit dem logischen Ausführungsthread verknüpft ist, Informationen zum sicheren Zugriff auf Code usw. Nach dem Auslösen des Ereignisses werden die Start- und Endzeiten veröffentlicht.
Listing 2: DAG-Agent auf F # zum Parallelisieren von abhängigkeitsbezogenen Operationen
type ParallelTasksDAG() = let onTaskCompleted = new Event<TaskInfo>() // #A let dagAgent = new MailboxProcessor<TaskMessage>(fun inbox -> let rec loop (tasks : Dictionary<int, TaskInfo>) // #B (edges : Dictionary<int, int list>) = async { // #B let! msg = inbox.Receive() // #C match msg with | ExecuteTasks -> // #D let fromTo = new Dictionary<int, int list>() let ops = new Dictionary<int, TaskInfo>() // #E for KeyValue(key, value) in tasks do // #F let operation = { value with EdgesLeft = Some(value.Edges.Length) } for from in operation.Edges do let exists, lstDependencies = fromTo.TryGetValue(from) if not <| exists then fromTo.Add(from, [ operation.Id ]) else fromTo.[from] <- (operation.Id :: lstDependencies) ops.Add(key, operation) ops |> Seq.iter (fun kv -> // #F match kv.Value.EdgesLeft with | Some(n) when n = 0 -> inbox.Post(QueueTask(kv.Value)) | _ -> ()) return! loop ops fromTo | QueueTask(op) -> // #G Async.Start <| async { // #G let start = DateTimeOffset.Now match op.Context with // #H | null -> op.Task.Invoke() |> Async.AwaitATsk | ctx -> ExecutionContext.Run(ctx.CreateCopy(), // #I (fun op -> let opCtx = (op :?> TaskInfo) opCtx.Task.Invoke().ConfigureAwait(false)), taskInfo) let end' = DateTimeOffset.Now onTaskCompleted.Trigger { op with Start = Some(start) End = Some(end') } // #L let exists, deps = edges.TryGetValue(op.Id) if exists && deps.Length > 0 then let depOps = getDependentOperation deps tasks [] edges.Remove(op.Id) |> ignore depOps |> Seq.iter (fun nestedOp -> inbox.Post(QueueTask(nestedOp))) } return! loop tasks edges | AddTask(id, op) -> tasks.Add(id, op) // #M return! loop tasks edges } loop (new Dictionary<int, TaskInfo>(HashIdentity.Structural)) (new Dictionary<int, int list>(HashIdentity.Structural))) [<CLIEventAttribute>] member this.OnTaskCompleted = onTaskCompleted.Publish // #L member this.ExecuteTasks() = dagAgent.Post ExecuteTasks // #N member this.AddTask(id, task, [<ParamArray>] edges : int array) = let data = { Context = ExecutionContext.Capture() Edges = edges; Id = id; Task = task NumRemainingEdges = None; Start = None; End = None } dagAgent.Post(AddTask(id, data)) // #O
#Eine Instanz der Klasse
onTaskCompletedEvent
, mit der der Abschluss einer Aufgabe benachrichtigt wird
#B Interner Status des Agenten zum Verfolgen von Aufgabenregistern und deren Abhängigkeiten. Sammlungen sind veränderbar, da sich der Status während der Ausführung von
ParallelTasksDAG
ändert und weil sie die Thread-Sicherheit geerbt haben, weil sie sich in Agent befinden
#C Asynchron auf Ausführung warten
#D Shell einer Nachricht, die
ParallelTasksDAG
startet
#E Collection wird einem monoton inkrementierten Index mit einer auszuführenden Aufgabe zugeordnet
#F Der Prozess durchläuft die Liste der Aufgaben und analysiert Abhängigkeiten mit anderen Aufgaben, um eine topologische Struktur zu erstellen, in der die Reihenfolge der Aufgaben dargestellt wird
#G Message Wrapper zum Einreihen, Ausführen und letztendlich zum Entfernen dieser Aufgabe aus dem Agentenstatus als aktive Abhängigkeit nach Abschluss der Aufgabe
#H Wenn der aufgenommene
ExecutionContext
null
, führen Sie die Aufgabe im aktuellen Kontext aus, andernfalls gehen Sie zu #I
#Ich führe die Aufgabe im abgefangenen
ExecutionContext
onTaskCompleted
und veröffentlichen Sie das Ereignis
onTaskCompleted
, um Sie über den Abschluss der Aufgabe zu informieren. Das Ereignis enthält Informationen zur Aufgabe.
# M Message Wrapper zum Hinzufügen einer Aufgabe, die gegebenenfalls gemäß ihren Abhängigkeiten ausgeführt werden soll
#N Startet die Ausführung registrierter Aufgaben
#O Hinzufügen einer Aufgabe, ihrer Abhängigkeiten und des aktuellen
ExecutionContext
zur Ausführung der DAG.
Der Zweck der
AddTask
Funktion besteht darin, eine Aufgabe mit beliebigen Abhängigkeitskanten zu registrieren. Diese Funktion akzeptiert eine eindeutige ID, eine auszuführende Aufgabe und viele Kanten, die die IDs aller anderen registrierten Aufgaben darstellen, die ausgeführt werden müssen, bevor diese Aufgabe ausgeführt werden kann. Wenn das Array leer ist, bedeutet dies, dass keine Abhängigkeiten bestehen. Eine
MailboxProcessor
Instanz namens
dagAgent
speichert registrierte Aufgaben im aktuellen Status von "Aufgaben".
dagAgent
handelt es sich um ein Wörterbuch (
tasks : Dictionary<int, TaskInfo>
), das die ID jeder Aufgabe und ihre Details korreliert. Darüber hinaus speichert der Agent den Status der Kantenabhängigkeiten anhand der ID jeder Aufgabe (
edges : Dictionary<int, int list>
). Wenn ein Agent im Rahmen dieses Prozesses eine Benachrichtigung über die Notwendigkeit erhält, die Ausführung zu starten, wird überprüft, ob alle Abhängigkeiten der Kanten registriert sind und ob das Diagramm keine Zyklen enthält. Dieser Überprüfungsschritt ist in der vollständigen Implementierung von ParallelTasksDAG verfügbar, die im Online-Code bereitgestellt wird. Als nächstes biete ich ein Beispiel in C # an, in dem ich auf die Bibliothek verweise, in der F #
ParallelTasksDAG
(und verwendet). Registrierte Aufgaben spiegeln die oben in Abb. 1 dargestellten Abhängigkeiten wider. 1.
Func<int, int, Func<Task>> action = (id, delay) => async () => { Console.WriteLine($”Starting operation{id} in Thread Id {Thread.CurrentThread.ManagedThreadId}…”); await Task.Delay(delay); }; var dagAsync = new DAG.ParallelTasksDAG(); dagAsync.OnTaskCompleted.Subscribe(op => Console.WriteLine($”Operation {op.Id} completed in Thread Id { Thread.CurrentThread.ManagedThreadId}”)); dagAsync.AddTask(1, action(1, 600), 4, 5); dagAsync.AddTask(2, action(2, 200), 5); dagAsync.AddTask(3, action(3, 800), 6, 5); dagAsync.AddTask(4, action(4, 500), 6); dagAsync.AddTask(5, action(5, 450), 7, 8); dagAsync.AddTask(6, action(6, 100), 7); dagAsync.AddTask(7, action(7, 900)); dagAsync.AddTask(8, action(8, 700)); dagAsync.ExecuteTasks();
Der Zweck der Hilfsfunktion besteht darin, eine Meldung anzuzeigen, dass die Aufgabe begonnen hat, und auf die
Id
aktuellen Threads zu verweisen, um das Multithreading zu bestätigen.
OnTaskCompleted
Ereignis wird dagegen protokolliert, um eine Benachrichtigung über den Abschluss jeder Aufgabe mit der Ausgabe der Aufgaben-
ID
und der
ID
Id
aktuellen Threads an die Konsole zu geben. Hier ist die Ausgabe, die wir beim Aufrufen der
ExecuteTasks
Methode erhalten.
Starting operation 8 in Thread Id 23… Starting operation 7 in Thread Id 24… Operation 8 Completed in Thread Id 23 Operation 7 Completed in Thread Id 24 Starting operation 5 in Thread Id 23… Starting operation 6 in Thread Id 25… Operation 6 Completed in Thread Id 25 Starting operation 4 in Thread Id 24… Operation 5 Completed in Thread Id 23 Starting operation 2 in Thread Id 27… Starting operation 3 in Thread Id 30… Operation 4 Completed in Thread Id 24 Starting operation 1 in Thread Id 28… Operation 2 Completed in Thread Id 27 Operation 1 Completed in Thread Id 28 Operation 3 Completed in Thread Id 30
Wie Sie sehen können, werden Aufgaben in verschiedenen Threads parallel ausgeführt (die Thread-
ID
sind für sie unterschiedlich), und die Reihenfolge der Abhängigkeiten bleibt erhalten.
Im Wesentlichen werden auf diese Weise Aufgaben mit Abhängigkeiten parallelisiert. Weitere Informationen finden Sie im Buch Parallelität in .NET.