Paralelización de tareas con dependencias: ejemplo de .NET

Hola colegas

Esta semana, donamos el libro Concurrencia de Manning en .NET, un ambicioso libro de complejidad:



El autor publicó amablemente en el sitio web de Medium un extracto del capítulo 13, que proponemos evaluar mucho antes del estreno.
Que tengas una buena lectura!

Suponga que necesita escribir una herramienta que le permita realizar una serie de tareas asincrónicas, cada una de las cuales tiene su propio conjunto de dependencias que afectan el orden de las operaciones. Tales problemas se pueden resolver con una ejecución coherente e imperativa, pero si desea lograr el máximo rendimiento, las operaciones secuenciales no funcionarán para usted. En cambio, debe organizar la ejecución paralela de tareas. Muchos problemas competitivos pueden interpretarse como colecciones estáticas de operaciones atómicas con dependencias entre sus datos de entrada y salida. Al finalizar la operación, su salida se utiliza como entrada para otras operaciones dependientes. Para optimizar el rendimiento, las tareas deben asignarse en función de sus dependencias, y el algoritmo debe configurarse de modo que las tareas dependientes se realicen de la forma más secuencial necesaria y lo más paralela posible.

Desea crear un componente reutilizable que realice una serie de tareas en paralelo y asegurarse de que se tengan en cuenta todas las dependencias que puedan afectar el orden de las operaciones. ¿Cómo construir un modelo de programación que garantice el paralelismo básico de una colección de operaciones realizadas de manera eficiente, en paralelo o secuencialmente, dependiendo de las dependencias que surjan entre esta operación y otras?

Solución: implementamos el gráfico de dependencia utilizando la clase MailboxProcessor de F # y proporcionamos métodos como tareas estándar para que puedan ser consumidos desde C #

Esta solución se llama Gráfico Acíclico Orientado (DAG) y está diseñada para formar un gráfico dividiendo las operaciones en secuencias de problemas atómicos con dependencias bien definidas. En este caso, la esencia acíclica del gráfico es importante, ya que elimina la posibilidad de puntos muertos entre tareas, siempre que las tareas sean realmente completamente atómicas. Al establecer un gráfico, es importante comprender todas las dependencias entre tareas, especialmente las dependencias implícitas que pueden conducir a puntos muertos o condiciones de carrera. El siguiente es un ejemplo típico de una estructura de datos similar a un gráfico, con el que puede imaginar las limitaciones que surgen al planificar interacciones entre operaciones en un gráfico dado.

Un gráfico es una estructura de datos extremadamente poderosa, y uno puede escribir algoritmos poderosos sobre su base.



Fig. 1 Un gráfico es una colección de vértices conectados por aristas. En esta representación del gráfico dirigido, el nodo 1 depende de los nodos 4 y 5, el nodo 2 depende del nodo 5, el nodo 3 depende de los nodos 5 y 6, etc.

La estructura DAG es aplicable como estrategia para la ejecución paralela de tareas teniendo en cuenta el orden de las dependencias, lo que mejora la productividad. La estructura de dicho gráfico se puede determinar utilizando la clase MailboxProcessor de F #; en esta clase, se conserva el estado interno de las tareas registradas para su ejecución en forma de dependencias de bordes.

Validación de gráfico acíclico orientado

Al trabajar con cualquier estructura de datos de gráficos, como DAG, debe cuidar el registro correcto de los bordes. Por ejemplo, volviendo a la Figura 1: ¿qué sucede si hemos registrado el nodo 2 con dependencias del nodo 5, pero el nodo 5 no existe? También puede suceder que algunos bordes dependan unos de otros, lo que resulta en un ciclo orientado. En presencia de un ciclo orientado, es crítico realizar algunas tareas en paralelo; de lo contrario, algunas tareas pueden esperar para siempre que otras se completen y se producirá un punto muerto.

El problema se resuelve mediante la ordenación topológica: esto significa que podemos ordenar todos los vértices del gráfico para que cualquier borde conduzca desde el vértice con el número más bajo al vértice con el número más alto. Entonces, si la tarea A debe finalizar antes de la tarea B, y la tarea B - antes de la tarea C, que, a su vez, debe completarse antes de la tarea A, aparece un enlace circular, y el sistema le notificará este error, lanzando una excepción. Si surge un ciclo orientado en la gestión de pedidos, entonces no hay solución. Una comprobación de este tipo se llama "encontrar un ciclo en un gráfico dirigido". Si un gráfico dirigido satisface las reglas descritas, entonces es un gráfico acíclico dirigido que es perfectamente adecuado para ejecutar varias tareas en paralelo, entre las cuales hay dependencias.

La versión completa del Listado 2, que contiene el código de validación DAG, está en el código fuente en línea.

En la siguiente lista, la clase F # MailboxProccessor se utiliza como un candidato ideal para implementar un DAG que permita que las operaciones relacionadas con la dependencia se realicen en paralelo. Primero, definamos una unión etiquetada con la que manejaremos las tareas y cumpliremos sus dependencias.

Listado 1 Tipo de mensaje y estructura de datos para coordinar tareas de acuerdo con sus dependencias

 type TaskMessage = // #A | AddTask of int * TaskInfo | QueueTask of TaskInfo | ExecuteTasks and TaskInfo = // #B { Context : System.Threading.ExecutionContext Edges : int array; Id : int; Task : Func<Task> EdgesLeft : int option; Start : DateTimeOffset option End : DateTimeOffset option } 


#A envía el dagAgent base a ParallelTasksDAG , que es responsable de coordinar las tareas

#B Envuelve los detalles de cada tarea para completar

El tipo TaskMessage representa contenedores de mensajes enviados al agente base del tipo ParallelTasksDAG . Estos mensajes se utilizan para coordinar tareas y sincronizar dependencias. El tipo TaskInfo contiene y rastrea los detalles de las tareas registradas durante la ejecución del DAG, incluidos los bordes de dependencia. El contexto de ejecución (https://msdn.microsoft.com/en-us/library/system.threading.executioncontext(v=vs.110).aspx) se captura con el fin de acceder a la información en ejecución retrasada, por ejemplo, dicha información: actual usuario, cualquier estado asociado con el hilo lógico de ejecución, información sobre el acceso seguro al código, etc. Una vez que se activa el evento, se publican las horas de inicio y finalización.

Listado del agente DAG 2 en F # para paralelizar operaciones relacionadas con dependencias

 type ParallelTasksDAG() = let onTaskCompleted = new Event<TaskInfo>() // #A let dagAgent = new MailboxProcessor<TaskMessage>(fun inbox -> let rec loop (tasks : Dictionary<int, TaskInfo>) // #B (edges : Dictionary<int, int list>) = async { // #B let! msg = inbox.Receive() // #C match msg with | ExecuteTasks -> // #D let fromTo = new Dictionary<int, int list>() let ops = new Dictionary<int, TaskInfo>() // #E for KeyValue(key, value) in tasks do // #F let operation = { value with EdgesLeft = Some(value.Edges.Length) } for from in operation.Edges do let exists, lstDependencies = fromTo.TryGetValue(from) if not <| exists then fromTo.Add(from, [ operation.Id ]) else fromTo.[from] <- (operation.Id :: lstDependencies) ops.Add(key, operation) ops |> Seq.iter (fun kv -> // #F match kv.Value.EdgesLeft with | Some(n) when n = 0 -> inbox.Post(QueueTask(kv.Value)) | _ -> ()) return! loop ops fromTo | QueueTask(op) -> // #G Async.Start <| async { // #G let start = DateTimeOffset.Now match op.Context with // #H | null -> op.Task.Invoke() |> Async.AwaitATsk | ctx -> ExecutionContext.Run(ctx.CreateCopy(), // #I (fun op -> let opCtx = (op :?> TaskInfo) opCtx.Task.Invoke().ConfigureAwait(false)), taskInfo) let end' = DateTimeOffset.Now onTaskCompleted.Trigger { op with Start = Some(start) End = Some(end') } // #L let exists, deps = edges.TryGetValue(op.Id) if exists && deps.Length > 0 then let depOps = getDependentOperation deps tasks [] edges.Remove(op.Id) |> ignore depOps |> Seq.iter (fun nestedOp -> inbox.Post(QueueTask(nestedOp))) } return! loop tasks edges | AddTask(id, op) -> tasks.Add(id, op) // #M return! loop tasks edges } loop (new Dictionary<int, TaskInfo>(HashIdentity.Structural)) (new Dictionary<int, int list>(HashIdentity.Structural))) [<CLIEventAttribute>] member this.OnTaskCompleted = onTaskCompleted.Publish // #L member this.ExecuteTasks() = dagAgent.Post ExecuteTasks // #N member this.AddTask(id, task, [<ParamArray>] edges : int array) = let data = { Context = ExecutionContext.Capture() Edges = edges; Id = id; Task = task NumRemainingEdges = None; Start = None; End = None } dagAgent.Post(AddTask(id, data)) // #O 


# Una instancia de la clase onTaskCompletedEvent , utilizada para notificar la finalización de una tarea

#B Estado interno del agente para el seguimiento de registros de tareas y sus dependencias. Las colecciones son mutables porque el estado cambia durante la ejecución de ParallelTasksDAG , y dado que heredan la seguridad de subprocesos porque se encuentran en el Agente

#C Asincrónicamente en espera de ejecución

#D Shell de un mensaje que inicia ParallelTasksDAG

#E Colección asignada a un índice incrementalmente monotónico con una tarea para ejecutar

#F El proceso itera sobre la lista de tareas, analizando dependencias con otras tareas para crear una estructura topológica en la que se presenta el orden de las tareas

#G Contenedor de mensajes para poner en cola una tarea, ejecutarla y, en última instancia, para eliminar esta tarea del estado del agente como una dependencia activa después de que la tarea se complete

#H Si el ExecutionContext recogido es null , ejecute la tarea en el contexto actual; de lo contrario, vaya a #I

# Ejecuto la tarea en el ExecutionContext interceptado

onTaskCompleted y publique el evento onTaskCompleted para notificarle sobre la finalización de la tarea. El evento contiene información sobre la tarea.

#M Contenedor de mensajes para agregar una tarea a ejecutar de acuerdo con sus dependencias, si corresponde

#N Inicia la ejecución de tareas registradas

#O Agregar una tarea, sus dependencias y el ExecutionContext actual para ejecutar el DAG.

El propósito de la función AddTask es registrar una tarea con bordes de dependencia arbitrarios. Esta función acepta una ID única, una tarea a realizar y muchos bordes que representan las ID de todas las demás tareas registradas que deben completarse antes de que esta tarea pueda completarse. Si la matriz está vacía, significa que no hay dependencias. Una instancia de MailboxProcessor llamada dagAgent almacena las tareas registradas en el estado actual de "tareas", que es un diccionario ( tasks : Dictionary<int, TaskInfo> ) que correlaciona el ID de cada tarea y sus detalles. Además, el Agente también almacena el estado de las dependencias de borde por la ID de cada tarea ( edges : Dictionary<int, int list> ). Cuando un agente recibe una notificación sobre la necesidad de iniciar la ejecución, como parte de este proceso, se verifica que todas las dependencias de los bordes estén registradas y que no haya ciclos en el gráfico. Este paso de verificación está disponible en la implementación completa de ParallelTasksDAG que se proporciona en el código en línea. A continuación, ofrezco un ejemplo en C #, donde me refiero a la biblioteca que F # para ejecutar ParallelTasksDAG (y consumirlo). Las tareas registradas reflejan las dependencias presentadas anteriormente en la Fig. 1)

 Func<int, int, Func<Task>> action = (id, delay) => async () => { Console.WriteLine($”Starting operation{id} in Thread Id {Thread.CurrentThread.ManagedThreadId}…”); await Task.Delay(delay); }; var dagAsync = new DAG.ParallelTasksDAG(); dagAsync.OnTaskCompleted.Subscribe(op => Console.WriteLine($”Operation {op.Id} completed in Thread Id { Thread.CurrentThread.ManagedThreadId}”)); dagAsync.AddTask(1, action(1, 600), 4, 5); dagAsync.AddTask(2, action(2, 200), 5); dagAsync.AddTask(3, action(3, 800), 6, 5); dagAsync.AddTask(4, action(4, 500), 6); dagAsync.AddTask(5, action(5, 450), 7, 8); dagAsync.AddTask(6, action(6, 100), 7); dagAsync.AddTask(7, action(7, 900)); dagAsync.AddTask(8, action(8, 700)); dagAsync.ExecuteTasks(); 

El propósito de la función auxiliar es mostrar un mensaje de que la tarea ha comenzado, refiriéndose al Id hilo actual para confirmar el subproceso múltiple. OnTaskCompleted evento OnTaskCompleted , por otro lado, se registra para dar una notificación sobre la finalización de cada tarea con la salida de la ID de la tarea y la Id hilo actual a la consola. Aquí está el resultado que obtenemos cuando ExecuteTasks método ExecuteTasks .

 Starting operation 8 in Thread Id 23… Starting operation 7 in Thread Id 24… Operation 8 Completed in Thread Id 23 Operation 7 Completed in Thread Id 24 Starting operation 5 in Thread Id 23… Starting operation 6 in Thread Id 25… Operation 6 Completed in Thread Id 25 Starting operation 4 in Thread Id 24… Operation 5 Completed in Thread Id 23 Starting operation 2 in Thread Id 27… Starting operation 3 in Thread Id 30… Operation 4 Completed in Thread Id 24 Starting operation 1 in Thread Id 28… Operation 2 Completed in Thread Id 27 Operation 1 Completed in Thread Id 28 Operation 3 Completed in Thread Id 30 

Como puede ver, las tareas se ejecutan en paralelo en diferentes subprocesos (los ID subproceso son diferentes para ellos) y se preserva el orden de las dependencias.

En esencia, así es como se paralelizan las tareas con dependencias. Lea más en el libro Concurrencia en .NET.

Source: https://habr.com/ru/post/es422571/


All Articles