Menyejajarkan tugas dengan dependensi - .NET contoh

Halo rekan!

Minggu ini, kami menyumbangkan buku Manning, Concurrency di .NET, sebuah buku kompleksitas yang ambisius:



Penulis dengan hormat memposting di situs Medium sebuah kutipan dari bab ke-13, yang kami usulkan untuk dievaluasi jauh sebelum premier.
Selamat membaca!

Misalkan Anda perlu menulis alat yang memungkinkan Anda untuk melakukan sejumlah tugas asinkron, yang masing-masing memiliki serangkaian dependensi yang memengaruhi urutan operasi. Masalah seperti itu dapat diselesaikan dengan eksekusi yang konsisten dan penting, tetapi jika Anda ingin mencapai kinerja maksimum, maka operasi sekuensial tidak akan bekerja untuk Anda. Sebaliknya, Anda perlu mengatur pelaksanaan tugas secara paralel. Banyak masalah kompetitif dapat diartikan sebagai koleksi statis operasi atom dengan ketergantungan antara input dan output data mereka. Setelah operasi selesai, outputnya digunakan sebagai input untuk operasi lain yang bergantung. Untuk mengoptimalkan kinerja, tugas harus ditetapkan berdasarkan dependensinya, dan algoritme harus dikonfigurasikan agar tugas dependen dilakukan secara berurutan sesuai kebutuhan, dan sejajar mungkin.

Anda ingin membuat komponen yang dapat digunakan kembali yang melakukan serangkaian tugas secara paralel, dan memastikan bahwa semua dependensi yang dapat mempengaruhi urutan operasi diperhitungkan. Bagaimana membangun model pemrograman seperti itu yang akan memastikan paralelisme dasar dari kumpulan operasi yang dilakukan secara efisien, baik secara paralel maupun berurutan, tergantung pada ketergantungan apa yang muncul antara operasi ini dan yang lainnya?

Solusi: Kami menerapkan grafik dependensi menggunakan kelas MailboxProcessor dari F # dan menyediakan metode sebagai tugas standar sehingga mereka dapat dikonsumsi dari C #

Solusi ini disebut Oriented Acyclic Graph (DAG) dan dirancang untuk membentuk grafik dengan memisahkan operasi menjadi urutan masalah atom dengan dependensi yang terdefinisi dengan baik. Dalam hal ini, esensi asiklik dari grafik adalah penting, karena menghilangkan kemungkinan deadlock di antara tugas-tugas, asalkan tugas-tugas tersebut sebenarnya sepenuhnya atom. Saat mengatur grafik, penting untuk memahami semua dependensi antara tugas, terutama dependensi implisit yang dapat menyebabkan kebuntuan atau kondisi balapan. Berikut ini adalah contoh khas dari struktur data seperti grafik, yang dengannya Anda dapat membayangkan batasan yang muncul saat merencanakan interaksi antara operasi dalam grafik yang diberikan.

Grafik adalah struktur data yang sangat kuat, dan seseorang dapat menulis algoritma yang kuat atas dasar itu.



Fig. 1 Grafik adalah kumpulan simpul yang dihubungkan oleh tepian. Dalam representasi grafik yang diarahkan ini, simpul 1 bergantung pada simpul 4 dan 5, simpul 2 tergantung pada simpul 5, simpul 3 tergantung pada simpul 5 dan 6, dll.

Struktur DAG berlaku sebagai strategi untuk pelaksanaan tugas paralel dengan mempertimbangkan urutan ketergantungan, yang meningkatkan produktivitas. Struktur grafik seperti itu dapat ditentukan menggunakan kelas MailboxProcessor dari F #; di kelas ini, keadaan internal untuk tugas-tugas yang terdaftar untuk dieksekusi dalam bentuk dependensi edge dipertahankan.

Validasi Grafik Asiklik Berorientasi

Saat bekerja dengan struktur data grafik apa pun, seperti DAG, Anda harus mengurus pendaftaran tepi yang benar. Sebagai contoh, kembali ke Gambar 1: apa yang terjadi jika kita telah mendaftarkan simpul 2 dengan dependensi pada simpul 5, tetapi simpul 5 tidak ada? Mungkin juga terjadi bahwa beberapa sisi saling bergantung, yang menghasilkan siklus berorientasi. Di hadapan siklus yang berorientasi, sangat penting untuk melakukan beberapa tugas secara paralel; jika tidak, beberapa tugas mungkin selamanya menunggu yang lain selesai dan jalan buntu akan terjadi.

Masalahnya diselesaikan dengan pengurutan topologis: ini berarti bahwa kita dapat memesan semua simpul grafik sehingga setiap tepi mengarah dari titik dengan angka yang lebih rendah ke titik dengan angka yang lebih tinggi. Jadi, jika tugas A harus selesai sebelum tugas B, dan tugas B - sebelum tugas C, yang, pada gilirannya, harus selesai sebelum tugas A, tautan melingkar muncul, dan sistem akan memberi tahu Anda tentang kesalahan ini, dengan melemparkan pengecualian. Jika siklus berorientasi muncul dalam manajemen pesanan, maka tidak ada solusi. Pemeriksaan jenis ini disebut "menemukan siklus dalam grafik yang diarahkan." Jika grafik berarah memenuhi aturan yang dijelaskan, maka itu adalah grafik asiklik terarah yang sangat cocok untuk menjalankan beberapa tugas secara paralel, di antaranya ada dependensi.

Versi lengkap Listing 2, yang berisi kode validasi DAG, ada dalam kode sumber online.

Dalam daftar berikut, kelas F # MailboxProccessor digunakan sebagai kandidat ideal untuk menerapkan DAG yang memungkinkan operasi terkait ketergantungan dilakukan secara paralel. Pertama, mari kita tentukan serikat berlabel yang dengannya kita akan mengelola tugas dan memenuhi ketergantungan mereka.

Daftar 1 Jenis pesan dan struktur data untuk mengoordinasikan tugas sesuai dengan dependensinya

 type TaskMessage = // #A | AddTask of int * TaskInfo | QueueTask of TaskInfo | ExecuteTasks and TaskInfo = // #B { Context : System.Threading.ExecutionContext Edges : int array; Id : int; Task : Func<Task> EdgesLeft : int option; Start : DateTimeOffset option End : DateTimeOffset option } 


#A mengirimkan basis dagAgent ke ParallelTasksDAG , yang bertanggung jawab untuk mengoordinasikan tugas

#B Membungkus detail setiap tugas yang harus diselesaikan

Tipe TaskMessage mewakili pembungkus pesan yang dikirim ke agen dasar dari tipe ParallelTasksDAG . Pesan-pesan ini digunakan untuk mengoordinasikan tugas dan menyinkronkan dependensi. Jenis TaskInfo berisi dan melacak detail tugas yang terdaftar selama pelaksanaan DAG, termasuk tepi ketergantungan. Konteks eksekusi (https://msdn.microsoft.com/en-us/library/system.threading.executioncontext(v=vs.110).aspx) ditangkap untuk tujuan mengakses informasi dalam eksekusi yang tertunda, misalnya, informasi tersebut: saat ini pengguna, keadaan apa pun yang terkait dengan utas eksekusi logis, informasi tentang akses aman ke kode, dll. Setelah acara dipicu, waktu mulai dan berakhir diterbitkan.

Daftar 2 agen DAG pada F # untuk memparalelkan operasi terkait ketergantungan

 type ParallelTasksDAG() = let onTaskCompleted = new Event<TaskInfo>() // #A let dagAgent = new MailboxProcessor<TaskMessage>(fun inbox -> let rec loop (tasks : Dictionary<int, TaskInfo>) // #B (edges : Dictionary<int, int list>) = async { // #B let! msg = inbox.Receive() // #C match msg with | ExecuteTasks -> // #D let fromTo = new Dictionary<int, int list>() let ops = new Dictionary<int, TaskInfo>() // #E for KeyValue(key, value) in tasks do // #F let operation = { value with EdgesLeft = Some(value.Edges.Length) } for from in operation.Edges do let exists, lstDependencies = fromTo.TryGetValue(from) if not <| exists then fromTo.Add(from, [ operation.Id ]) else fromTo.[from] <- (operation.Id :: lstDependencies) ops.Add(key, operation) ops |> Seq.iter (fun kv -> // #F match kv.Value.EdgesLeft with | Some(n) when n = 0 -> inbox.Post(QueueTask(kv.Value)) | _ -> ()) return! loop ops fromTo | QueueTask(op) -> // #G Async.Start <| async { // #G let start = DateTimeOffset.Now match op.Context with // #H | null -> op.Task.Invoke() |> Async.AwaitATsk | ctx -> ExecutionContext.Run(ctx.CreateCopy(), // #I (fun op -> let opCtx = (op :?> TaskInfo) opCtx.Task.Invoke().ConfigureAwait(false)), taskInfo) let end' = DateTimeOffset.Now onTaskCompleted.Trigger { op with Start = Some(start) End = Some(end') } // #L let exists, deps = edges.TryGetValue(op.Id) if exists && deps.Length > 0 then let depOps = getDependentOperation deps tasks [] edges.Remove(op.Id) |> ignore depOps |> Seq.iter (fun nestedOp -> inbox.Post(QueueTask(nestedOp))) } return! loop tasks edges | AddTask(id, op) -> tasks.Add(id, op) // #M return! loop tasks edges } loop (new Dictionary<int, TaskInfo>(HashIdentity.Structural)) (new Dictionary<int, int list>(HashIdentity.Structural))) [<CLIEventAttribute>] member this.OnTaskCompleted = onTaskCompleted.Publish // #L member this.ExecuteTasks() = dagAgent.Post ExecuteTasks // #N member this.AddTask(id, task, [<ParamArray>] edges : int array) = let data = { Context = ExecutionContext.Capture() Edges = edges; Id = id; Task = task NumRemainingEdges = None; Start = None; End = None } dagAgent.Post(AddTask(id, data)) // #O 


# onTaskCompletedEvent dari kelas onTaskCompletedEvent , yang digunakan untuk memberi tahu penyelesaian suatu tugas

#B Keadaan internal agen untuk melacak register tugas dan ketergantungannya. Koleksi bisa berubah karena keadaan berubah selama pelaksanaan ParallelTasksDAG , dan karena mereka mewarisi keamanan thread karena mereka berada di Agent

#C Asynchronously menunggu eksekusi

#D Shell pesan yang meluncurkan ParallelTasksDAG

Koleksi #E dipetakan ke indeks yang ditambahkan secara monoton dengan tugas untuk dijalankan

#F Proses ini berulang pada daftar tugas, menganalisis dependensi dengan tugas lain untuk membuat struktur topologi di mana urutan tugas disajikan

Pembungkus pesan #G untuk mengantri tugas, menjalankannya, dan, akhirnya, untuk menghapus tugas ini dari status agen sebagai ketergantungan aktif setelah tugas selesai

#H Jika mengambil ExecutionContext adalah null , maka jalankan tugas dalam konteks saat ini, jika tidak pergi ke #I

# Saya Menjalankan tugas dalam ExecutionContext dicegat

onTaskCompleted dan publikasikan acara onTaskCompleted untuk memberi tahu Anda tentang penyelesaian tugas. Acara tersebut berisi informasi tentang tugas tersebut.

Pembungkus pesan #M untuk menambahkan tugas untuk dieksekusi sesuai dengan dependensinya, jika ada

#N Mulai pelaksanaan tugas terdaftar

#O Menambahkan tugas, ketergantungannya, dan Konteks ExecutionContext saat ini untuk menjalankan DAG.

Tujuan dari fungsi AddTask adalah untuk mendaftarkan tugas dengan tepi ketergantungan yang berubah-ubah. Fungsi ini menerima ID unik, tugas yang harus dilakukan, dan banyak sisi yang mewakili ID dari semua tugas terdaftar lainnya yang harus diselesaikan sebelum tugas ini dapat diselesaikan. Jika array kosong, artinya tidak ada dependensi. Sebuah instance dari MailboxProcessor disebut dagAgent menyimpan tugas yang terdaftar dalam keadaan "tugas" saat ini, yang merupakan kamus ( tasks : Dictionary<int, TaskInfo> ) yang mengkorelasikan ID setiap tugas dan perinciannya. Selain itu, Agen juga menyimpan status dependensi edge oleh ID dari setiap tugas ( edges : Dictionary<int, int list> ). Ketika agen menerima pemberitahuan tentang perlunya memulai eksekusi, sebagai bagian dari proses ini, diperiksa bahwa semua dependensi tepi terdaftar, dan bahwa tidak ada siklus dalam grafik. Langkah verifikasi ini tersedia dalam implementasi penuh ParallelTasksDAG yang disediakan dalam kode online. Selanjutnya, saya menawarkan contoh dalam C #, di mana saya merujuk ke perpustakaan yang F # untuk menjalankan ParallelTasksDAG (dan mengkonsumsinya). Tugas terdaftar mencerminkan dependensi yang disajikan di atas pada Gambar. 1.

 Func<int, int, Func<Task>> action = (id, delay) => async () => { Console.WriteLine($”Starting operation{id} in Thread Id {Thread.CurrentThread.ManagedThreadId}…”); await Task.Delay(delay); }; var dagAsync = new DAG.ParallelTasksDAG(); dagAsync.OnTaskCompleted.Subscribe(op => Console.WriteLine($”Operation {op.Id} completed in Thread Id { Thread.CurrentThread.ManagedThreadId}”)); dagAsync.AddTask(1, action(1, 600), 4, 5); dagAsync.AddTask(2, action(2, 200), 5); dagAsync.AddTask(3, action(3, 800), 6, 5); dagAsync.AddTask(4, action(4, 500), 6); dagAsync.AddTask(5, action(5, 450), 7, 8); dagAsync.AddTask(6, action(6, 100), 7); dagAsync.AddTask(7, action(7, 900)); dagAsync.AddTask(8, action(8, 700)); dagAsync.ExecuteTasks(); 

Tujuan dari fungsi tambahan adalah untuk menampilkan pesan bahwa tugas telah dimulai, merujuk pada Id utas saat ini untuk mengonfirmasi multithreading. Acara OnTaskCompleted , di sisi lain, dicatat untuk memberikan pemberitahuan tentang penyelesaian setiap tugas dengan output dari ID tugas dan Id utas saat ini ke konsol. Ini adalah output yang kita dapatkan saat memanggil metode ExecuteTasks .

 Starting operation 8 in Thread Id 23… Starting operation 7 in Thread Id 24… Operation 8 Completed in Thread Id 23 Operation 7 Completed in Thread Id 24 Starting operation 5 in Thread Id 23… Starting operation 6 in Thread Id 25… Operation 6 Completed in Thread Id 25 Starting operation 4 in Thread Id 24… Operation 5 Completed in Thread Id 23 Starting operation 2 in Thread Id 27… Starting operation 3 in Thread Id 30… Operation 4 Completed in Thread Id 24 Starting operation 1 in Thread Id 28… Operation 2 Completed in Thread Id 27 Operation 1 Completed in Thread Id 28 Operation 3 Completed in Thread Id 30 

Seperti yang Anda lihat, tugas dieksekusi secara paralel di utas berbeda ( ID utas berbeda untuk mereka), dan urutan dependensi dipertahankan.

Intinya, ini adalah bagaimana tugas dengan dependensi diparalelkan. Baca lebih lanjut di buku Concurrency di .NET.

Source: https://habr.com/ru/post/id422571/


All Articles