Olá colegas!
Nesta semana, doamos o livro de Manning
Concurrency in .NET, um ambicioso livro de complexidade:

O autor publicou gentilmente no site Medium um trecho do capítulo 13, que propomos avaliar muito antes da estreia.
Boa leitura!
Suponha que você precise escrever uma ferramenta que permita executar várias tarefas assíncronas, cada uma com seu próprio conjunto de dependências que afetam a ordem das operações. Esses problemas podem ser resolvidos com uma execução consistente e imperativa, mas se você deseja obter o desempenho máximo, as operações seqüenciais não funcionarão para você. Em vez disso, você precisa organizar tarefas paralelas. Muitos problemas competitivos podem ser interpretados como coleções estáticas de operações atômicas com dependências entre os dados de entrada e saída. Após a conclusão da operação, sua saída é usada como entrada para outras operações dependentes. Para otimizar o desempenho, as tarefas devem ser atribuídas com base em suas dependências e o algoritmo deve ser configurado para que as tarefas dependentes sejam executadas o mais seqüencialmente possível e o mais paralelo possível.
Você deseja criar um componente reutilizável que execute uma série de tarefas em paralelo e garantir que todas as dependências que possam afetar a ordem das operações sejam levadas em consideração. Como construir um modelo de programação que garanta o paralelismo básico de uma coleção de operações executadas com eficiência, paralelamente ou sequencialmente, dependendo de quais dependências surgem entre essa operação e outras?
Solução: Implementamos o gráfico de dependência usando a classe MailboxProcessor do F # e fornecemos métodos como tarefas padrão para que possam ser consumidos no C #Essa solução é chamada de Gráfico Acíclico Orientado (DAG) e foi projetada para formar um gráfico dividindo operações em sequências de problemas atômicos com dependências bem definidas. Nesse caso, a essência acíclica do gráfico é importante, pois elimina a possibilidade de conflitos entre tarefas, desde que as tarefas sejam realmente completamente atômicas. Ao definir um gráfico, é importante entender todas as dependências entre tarefas, especialmente dependências implícitas que podem levar a conflitos ou condições de corrida. A seguir, é apresentado um exemplo típico de uma estrutura de dados do tipo gráfico, com a qual você pode imaginar as limitações que surgem ao planejar interações entre operações em um determinado gráfico.
Um gráfico é uma estrutura de dados extremamente poderosa e é possível escrever algoritmos poderosos com base em sua base.
Fig. 1 Um gráfico é uma coleção de vértices conectados por arestas. Nesta representação do gráfico direcionado, o nó 1 depende dos nós 4 e 5, o nó 2 depende do nó 5, o nó 3 depende dos nós 5 e 6, etc.A estrutura do DAG é aplicável como uma estratégia para execução paralela de tarefas, levando em consideração a ordem das dependências, o que melhora a produtividade. A estrutura desse gráfico pode ser determinada usando a classe
MailboxProcessor
de F #; nessa classe, o estado interno das tarefas registradas para execução na forma de dependências de arestas é preservado.
Validação de gráfico acíclico orientadoAo trabalhar com qualquer estrutura de dados gráficos, como o DAG, é necessário cuidar do registro correto das arestas. Por exemplo, voltando à Figura 1: o que acontece se tivermos registrado o nó 2 com dependências no nó 5, mas o nó 5 não existir? Também pode acontecer que algumas arestas dependam uma da outra, o que resulta em um ciclo orientado. Na presença de um ciclo orientado, é fundamental executar algumas tarefas em paralelo; caso contrário, algumas tarefas poderão esperar para sempre que outras pessoas sejam concluídas e ocorrerá um conflito.
O problema é resolvido pela classificação topológica: isso significa que podemos ordenar todos os vértices do gráfico para que qualquer aresta saia do vértice com o número mais baixo para o vértice com o número mais alto. Portanto, se a tarefa A for concluída antes da tarefa B e a tarefa B - antes da tarefa C, que, por sua vez, deve ser concluída antes da tarefa A, um link circular será exibido e o sistema notificará você sobre esse erro, lançando uma exceção. Se um ciclo orientado surgir no gerenciamento de pedidos, não haverá solução. Uma verificação desse tipo é chamada "encontrando um ciclo em um gráfico direcionado". Se um gráfico direcionado satisfaz as regras descritas, é um gráfico acíclico direcionado que é perfeitamente adequado para executar várias tarefas em paralelo, entre as quais existem dependências.
A versão completa da Listagem 2, contendo o código de validação do DAG, está no código-fonte online.
Na listagem a seguir, a classe F #
MailboxProccessor
é usada como candidato ideal para implementar um DAG que permite que operações relacionadas à dependência sejam executadas em paralelo. Primeiro, vamos definir uma união rotulada com a qual gerenciaremos tarefas e cumpriremos suas dependências.
Listagem 1 Tipo de mensagem e estrutura de dados para coordenar tarefas de acordo com suas dependências
type TaskMessage = // #A | AddTask of int * TaskInfo | QueueTask of TaskInfo | ExecuteTasks and TaskInfo = // #B { Context : System.Threading.ExecutionContext Edges : int array; Id : int; Task : Func<Task> EdgesLeft : int option; Start : DateTimeOffset option End : DateTimeOffset option }
#A envia o dagAgent base para o
ParallelTasksDAG
, responsável pela coordenação das tarefas
#B Agrupa os detalhes de cada tarefa para concluir
O tipo
TaskMessage
representa os wrappers de mensagens enviados para o agente base do tipo
ParallelTasksDAG
. Essas mensagens são usadas para coordenar tarefas e sincronizar dependências. O tipo
TaskInfo
contém e
TaskInfo
os detalhes de tarefas registradas durante a execução do DAG, incluindo bordas de dependência. O contexto de execução (https://msdn.microsoft.com/en-us/library/system.threading.executioncontext(v=vs.110).aspx) é capturado com a finalidade de acessar informações em execução atrasada, por exemplo, tais informações: current usuário, qualquer estado associado ao encadeamento lógico de execução, informações sobre acesso seguro ao código etc. Depois que o evento é disparado, os horários de início e término são publicados.
Listagem 2 Agente DAG em F # para paralelizar operações relacionadas à dependência
type ParallelTasksDAG() = let onTaskCompleted = new Event<TaskInfo>() // #A let dagAgent = new MailboxProcessor<TaskMessage>(fun inbox -> let rec loop (tasks : Dictionary<int, TaskInfo>) // #B (edges : Dictionary<int, int list>) = async { // #B let! msg = inbox.Receive() // #C match msg with | ExecuteTasks -> // #D let fromTo = new Dictionary<int, int list>() let ops = new Dictionary<int, TaskInfo>() // #E for KeyValue(key, value) in tasks do // #F let operation = { value with EdgesLeft = Some(value.Edges.Length) } for from in operation.Edges do let exists, lstDependencies = fromTo.TryGetValue(from) if not <| exists then fromTo.Add(from, [ operation.Id ]) else fromTo.[from] <- (operation.Id :: lstDependencies) ops.Add(key, operation) ops |> Seq.iter (fun kv -> // #F match kv.Value.EdgesLeft with | Some(n) when n = 0 -> inbox.Post(QueueTask(kv.Value)) | _ -> ()) return! loop ops fromTo | QueueTask(op) -> // #G Async.Start <| async { // #G let start = DateTimeOffset.Now match op.Context with // #H | null -> op.Task.Invoke() |> Async.AwaitATsk | ctx -> ExecutionContext.Run(ctx.CreateCopy(), // #I (fun op -> let opCtx = (op :?> TaskInfo) opCtx.Task.Invoke().ConfigureAwait(false)), taskInfo) let end' = DateTimeOffset.Now onTaskCompleted.Trigger { op with Start = Some(start) End = Some(end') } // #L let exists, deps = edges.TryGetValue(op.Id) if exists && deps.Length > 0 then let depOps = getDependentOperation deps tasks [] edges.Remove(op.Id) |> ignore depOps |> Seq.iter (fun nestedOp -> inbox.Post(QueueTask(nestedOp))) } return! loop tasks edges | AddTask(id, op) -> tasks.Add(id, op) // #M return! loop tasks edges } loop (new Dictionary<int, TaskInfo>(HashIdentity.Structural)) (new Dictionary<int, int list>(HashIdentity.Structural))) [<CLIEventAttribute>] member this.OnTaskCompleted = onTaskCompleted.Publish // #L member this.ExecuteTasks() = dagAgent.Post ExecuteTasks // #N member this.AddTask(id, task, [<ParamArray>] edges : int array) = let data = { Context = ExecutionContext.Capture() Edges = edges; Id = id; Task = task NumRemainingEdges = None; Start = None; End = None } dagAgent.Post(AddTask(id, data)) // #O
onTaskCompletedEvent
instância da classe
onTaskCompletedEvent
, usada para notificar a conclusão de uma tarefa
#B Estado interno do agente para rastrear registros de tarefas e suas dependências. As coleções são mutáveis porque o estado muda durante a execução do
ParallelTasksDAG
e porque herdaram a segurança do encadeamento porque estão localizadas no Agent
#C Aguardando execução de forma assíncrona
#D Shell de uma mensagem que inicia o
ParallelTasksDAG
Coleção #E mapeada em um índice incrementado monotonicamente com uma tarefa a ser executada
#F O processo repete a lista de tarefas, analisando dependências com outras tarefas para criar uma estrutura topológica na qual a ordem das tarefas é apresentada
#G Wrapper de mensagem para enfileirar uma tarefa, executá-la e, finalmente, para remover esta tarefa do estado do agente como uma dependência ativa após a conclusão da tarefa
#H Se o
ExecutionContext
escolhido for
null
, execute a tarefa no contexto atual; caso contrário, vá para #I
# Executo a tarefa no
ExecutionContext
interceptado
onTaskCompleted
e publique o evento
onTaskCompleted
para notificá-lo da conclusão da tarefa. O evento contém informações sobre a tarefa.
#M Wrapper de mensagem para adicionar uma tarefa a ser executada de acordo com suas dependências, se houver
#N Inicia a execução de tarefas registradas
#O Adicionando uma tarefa, suas dependências e o
ExecutionContext
atual para executar o DAG.
O objetivo da função
AddTask
é registrar uma tarefa com arestas de dependência arbitrárias. Essa função aceita um ID exclusivo, uma tarefa a ser executada e muitas arestas representando os IDs de todas as outras tarefas registradas que devem ser concluídas antes que esta tarefa possa ser concluída. Se a matriz estiver vazia, significa que não há dependências. Uma instância do
MailboxProcessor
chamada
dagAgent
armazena tarefas registradas no estado atual de “tarefas”, que é um dicionário (
tasks : Dictionary<int, TaskInfo>
) que correlaciona o ID de cada tarefa e seus detalhes. Além disso, o Agent também armazena o estado das dependências de borda pelo ID de cada tarefa (
edges : Dictionary<int, int list>
). Quando um agente recebe uma notificação sobre a necessidade de iniciar a execução, como parte desse processo, é verificado se todas as dependências das arestas estão registradas e se não há ciclos no gráfico. Esta etapa de verificação está disponível na implementação completa do ParallelTasksDAG fornecida no código online. A seguir, ofereço um exemplo em C #, onde me refiro à biblioteca que F # executa o
ParallelTasksDAG
(e o consome). As tarefas registradas refletem as dependências apresentadas acima na Fig. 1
Func<int, int, Func<Task>> action = (id, delay) => async () => { Console.WriteLine($”Starting operation{id} in Thread Id {Thread.CurrentThread.ManagedThreadId}…”); await Task.Delay(delay); }; var dagAsync = new DAG.ParallelTasksDAG(); dagAsync.OnTaskCompleted.Subscribe(op => Console.WriteLine($”Operation {op.Id} completed in Thread Id { Thread.CurrentThread.ManagedThreadId}”)); dagAsync.AddTask(1, action(1, 600), 4, 5); dagAsync.AddTask(2, action(2, 200), 5); dagAsync.AddTask(3, action(3, 800), 6, 5); dagAsync.AddTask(4, action(4, 500), 6); dagAsync.AddTask(5, action(5, 450), 7, 8); dagAsync.AddTask(6, action(6, 100), 7); dagAsync.AddTask(7, action(7, 900)); dagAsync.AddTask(8, action(8, 700)); dagAsync.ExecuteTasks();
O objetivo da função auxiliar é exibir uma mensagem que a tarefa foi iniciada, referindo-se ao
Id
encadeamento atual para confirmar o multithreading.
OnTaskCompleted
evento
OnTaskCompleted
, por outro lado, é registrado para fornecer uma notificação sobre a conclusão de cada tarefa com a saída do
ID
da tarefa e do
Id
encadeamento atual no console. Aqui está a saída que obtemos ao chamar o método
ExecuteTasks
.
Starting operation 8 in Thread Id 23… Starting operation 7 in Thread Id 24… Operation 8 Completed in Thread Id 23 Operation 7 Completed in Thread Id 24 Starting operation 5 in Thread Id 23… Starting operation 6 in Thread Id 25… Operation 6 Completed in Thread Id 25 Starting operation 4 in Thread Id 24… Operation 5 Completed in Thread Id 23 Starting operation 2 in Thread Id 27… Starting operation 3 in Thread Id 30… Operation 4 Completed in Thread Id 24 Starting operation 1 in Thread Id 28… Operation 2 Completed in Thread Id 27 Operation 1 Completed in Thread Id 28 Operation 3 Completed in Thread Id 30
Como você pode ver, as tarefas são executadas em paralelo em diferentes segmentos (os
ID
segmentos são diferentes para eles) e a ordem das dependências é preservada.
Em essência, é assim que as tarefas com dependências são paralelizadas. Leia mais no livro Concurrency in .NET.