Halo pembaca. Sudah cukup banyak waktu berlalu sejak rilis .NET Core 2.1. Dan inovasi keren seperti Span dan Memori telah dipertimbangkan secara luas, Anda dapat membaca, melihat, dan mendengar banyak tentangnya. Namun, sayangnya, perpustakaan bernama System.IO.Pipelines tidak menerima perhatian yang sama. Hampir semua yang ada di topik ini adalah
satu -
satunya posting yang banyak diterjemahkan dan diposting di rumah. Pasti harus ada lebih banyak informasi sehingga mereka yang tertarik dapat melihat teknologi dari sudut yang berbeda.

Pendahuluan
Jadi, perpustakaan ini bertujuan untuk mempercepat pekerjaan dengan pemrosesan data streaming. Awalnya dibuat dan digunakan oleh tim pengembangan Kestrel (server web lintas platform untuk ASP.NET Core), tetapi saat ini dikirim melalui
paket nuget terpisah.
Sebelum kita mempelajari topik ini, kita dapat membayangkan mekanisme perpustakaan sebagai analog MemoryStream yang ditingkatkan. Masalah dengan MemoryStream asli adalah jumlah salinan yang berlebihan, yang jelas jika Anda ingat bahwa array byte pribadi digunakan di dalam sebagai buffer. Misalnya, dalam metode
Baca dan
Tulis , penyalinan terlihat jelas. Jadi, untuk objek yang ingin kita tulis ke aliran, salinan akan dibuat di buffer internal, dan selama membaca, salinan salinan internal akan dikirimkan ke konsumen. Sepertinya ini bukan penggunaan ruang yang paling rasional.
System.IO.Pipelines tidak bertujuan untuk mengganti semua stream, ini adalah alat tambahan dalam gudang pengembang yang menulis kode kinerja tinggi. Saya menyarankan agar Anda membiasakan diri dengan metode dan kelas dasar, melihat bagaimana mereka diatur di dalam, dan menganalisis contoh-contoh dasar.
Mari kita mulai dengan perangkat internal, pada saat yang sama memeriksa fragmen kode sederhana. Setelah itu, akan menjadi jelas apa dan bagaimana cara kerjanya, dan bagaimana itu harus digunakan. Ketika bekerja dengan System.IO.Pipelines, perlu diingat bahwa konsep dasarnya adalah bahwa semua operasi baca-tulis harus dilakukan tanpa alokasi tambahan. Tetapi beberapa metode yang menarik pada pandangan pertama bertentangan dengan aturan ini. Karenanya, kode yang Anda coba susah-susah mempercepat mulai mengalokasikan memori untuk data baru dan baru, memuat pengumpul sampah.
Perpustakaan internal perpustakaan menggunakan kemungkinan terluas dari versi terbaru bahasa dan rentang waktu, Rentang, Memori, kumpulan objek, ValueTask, dll. Layak untuk dilihat, setidaknya untuk contoh hebat menggunakan fitur-fitur ini dalam produksi.
Pada suatu waktu, beberapa tidak senang dengan implementasi stream di C #, karena satu kelas digunakan untuk membaca dan menulis. Tetapi, seperti yang mereka katakan, Anda tidak bisa membuang metode dari kelas. Bahkan jika aliran tidak mendukung membaca / menulis / memindahkan pointer, properti CanRead, CanWrite dan CanSeek mulai berlaku, yang tampak seperti penopang kecil. Di sini segalanya berbeda.
Untuk bekerja dengan pipa, 2 kelas digunakan:
PipeWriter dan
PipeReader . Kelas-kelas ini masing-masing berisi sekitar 50 baris dan pseudo-fasad (bukan inkarnasinya yang paling klasik, karena ada kelas tunggal yang tersembunyi di belakangnya, dan tidak banyak) untuk kelas
Pipa , yang berisi semua logika dasar untuk bekerja dengan data. Anggota publik - 2 konstruktor, 2 properti get-only - Reader dan Writer, metode Reset (), yang me-reset bidang internal ke keadaan awal mereka sehingga kelas dapat digunakan kembali. Metode lain untuk pekerjaan disebut menggunakan pseudo-fasad.
Untuk memulai kelas Pipa
Instance class menempati 320 byte, yang cukup banyak (hampir sepertiga dari satu kilobyte, 2 objek seperti itu tidak bisa muat dalam memori Manchester Mark I). Jadi mengalokasikannya dalam jumlah besar adalah ide yang buruk. Selain itu, makna objek dimaksudkan untuk penggunaan jangka panjang. Menggunakan kumpulan juga membuat argumen untuk pernyataan ini. Bagaimanapun, objek yang digunakan di kolam akan hidup selamanya (dalam hal apapun, dalam standar).
Perhatikan bahwa kelas ditandai sebagai tersegel dan aman bagi thread - banyak bagian kode merupakan bagian penting dan dibungkus dengan kunci.
Untuk memulai, buat turunan dari kelas Pipa dan dapatkan objek PipeReader dan PipeWriter menggunakan properti yang disebutkan.
Inisialisasi mudahvar pipe = new Pipe(); PipeWriter pipeWriter = pipe.Writer; PipeReader pipeReader = pipe.Reader;
Pertimbangkan metode untuk bekerja dengan pipa:
Untuk merekam melalui PipeWriter - WriteAsync, GetMemory / GetSpan, Advance, FlushAsync, Lengkap, CancelPendingFlush, OnReaderCompleted.
Untuk membaca melalui PipeReader - AdvanceTo, ReadAsync, TryRead, Lengkap, CancelPendingRead, OnWriterCompleted.
Seperti yang dinyatakan dalam
posting , kelas menggunakan daftar buffer yang terhubung secara tunggal. Tapi, jelas, mereka tidak lewat antara PipeReader dan PipeWriter - semua logika ada dalam satu kelas. Daftar ini digunakan untuk membaca dan menulis. Selain itu, data yang dikembalikan disimpan dalam daftar ini.
Ada juga objek yang menunjukkan awal data untuk dibaca (ReadHead dan indeks), akhir data untuk membaca (ReadTail dan indeks) dan awal tempat untuk menulis (WriteHead dan jumlah byte buffered yang ditulis). Di sini, ReadHead, ReadTail, dan WriteHead adalah segmen tertentu dari daftar, dan indeks menunjukkan posisi tertentu di dalam segmen tersebut. Dengan demikian, perekaman dapat dimulai dari tengah segmen, menangkap seluruh segmen berikutnya dan berakhir di tengah ketiga. Pointer ini bergerak dalam berbagai metode.
Memulai dengan Metode PipeWriter
Hanya metode menggoda itu. Memiliki tanda tangan yang sangat cocok dan trendi - menerima ReadOnlyMemory, asinkron. Dan banyak yang mungkin tergoda, terutama mengingat bahwa Rentang dan Memori begitu cepat dan keren. Tapi jangan menyanjung diri sendiri. Semua yang dilakukan metode ini adalah menyalin ReadOnlyMemory yang diteruskan ke daftar internal. Dan "salin" berarti panggilan ke metode CopyTo, dan tidak menyalin objek itu sendiri. Artinya, semua data yang ingin kita rekam akan disalin, sehingga memuat memori. Metode ini harus dipelajari hanya untuk memastikan bahwa lebih baik tidak menggunakannya. Ya, dan mungkin untuk beberapa situasi yang jarang, perilaku ini sesuai.
Tubuh metode adalah bagian penting, akses ke sana disinkronkan melalui monitor.
Maka timbul pertanyaan, bagaimana menulis sesuatu, jika tidak melalui metode yang paling jelas dan hanya cocok.
Metode ini mengambil satu parameter dari tipe integer. Di dalamnya kita harus menunjukkan berapa banyak byte yang ingin kita tulis (atau lebih, tetapi tidak kurang). Metode ini memeriksa apakah ada cukup ruang untuk menulis dalam fragmen memori saat ini yang disimpan dalam _writingHeadMemory. Jika cukup, _writingHeadMemory dikembalikan sebagai Memori. Jika tidak, maka untuk data yang ditulis ke buffer, tetapi metode FlushAsync tidak dipanggil, dipanggil dan BufferSegment lain dipilih, yang terhubung ke yang sebelumnya (di sini adalah daftar). Dengan tidak adanya _writingHeadMemory, ini diinisialisasi dengan BufferSegment baru. Dan alokasi buffer berikutnya adalah bagian penting dan dilakukan di bawah kunci.
Saya sarankan lihat contoh seperti itu. Pada pandangan pertama, sepertinya kompiler (atau runtime) telah memperdaya iblis.
Iblis var pipeNoOptions = new Pipe(); Memory<byte> memoryOne = pipeNoOptions.Writer.GetMemory(2); Console.WriteLine(memoryOne.Length);
Tetapi segala sesuatu dalam contoh ini dapat dimengerti dan sederhana.
Saat membuat turunan Pipa, kita bisa melewatkan objek
PipeOptions dengan opsi untuk membuatnya ke konstruktor.
PipeOptions memiliki bidang ukuran segmen minimum default. Belum lama berselang 2048, tetapi
komit ini mengubah segalanya, sekarang 4096. Pada saat penulisan, versi dengan 4096 adalah paket pra-rilis, dalam versi rilis terbaru adalah 2048. Ini menjelaskan perilaku contoh pertama. Jika Anda kritis dalam menggunakan ukuran yang lebih kecil untuk buffer standar, Anda dapat menentukannya dalam contoh tipe PipeOptions.
Tetapi dalam contoh kedua, di mana ukuran minimum ditunjukkan, panjangnya tidak cocok. Dan ini sudah terjadi karena penciptaan BufferSegment baru terjadi menggunakan kumpulan. Salah satu opsi di PipeOptions adalah kumpulan memori. Setelah itu, kumpulan yang ditentukan akan digunakan untuk membuat segmen baru. Jika Anda tidak menentukan kumpulan memori Anda, ArrayPool standar akan digunakan, yang, seperti yang Anda ketahui, memiliki beberapa ember untuk berbagai ukuran array (masing-masing berikutnya 2 kali lebih besar dari sebelumnya) dan, ketika ditanya untuk ukuran tertentu, itu mencari ember dengan array ukuran yang sesuai (kemudian ada yang terdekat lebih besar atau sama). Dengan demikian, buffer baru hampir pasti akan lebih besar dari yang Anda minta. Ukuran minimum array di ArrayPool standar (System.Buffers.TlsOverPerCoreLockedStacksArrayPool) adalah 16. Tapi jangan khawatir, ini adalah kumpulan array. Oleh karena itu, dalam sebagian besar kasus, array tidak memberikan tekanan pada pemulung dan akan digunakan kembali.
Ini bekerja sama, memberikan Rentang dari Memori.
Jadi GetMemory () atau GetSpan () adalah metode utama untuk menulis. Mereka memberi kita objek yang bisa kita tulis. Untuk melakukan ini, kita tidak perlu mengalokasikan memori untuk array nilai baru, kita dapat menulis langsung ke struktur internal. Yang mana yang akan digunakan terutama tergantung pada API yang Anda gunakan dan metode asinkron. Namun, mengingat hal di atas, muncul pertanyaan. Bagaimana pembaca tahu berapa banyak yang kami tulis? Jika kami selalu menggunakan implementasi spesifik pool, yang memberikan array dengan ukuran yang persis sama seperti yang diminta, maka pembaca dapat membaca seluruh buffer sekaligus. Namun, seperti yang telah kami katakan, kami dialokasikan buffer dengan probabilitas tinggi dari ukuran yang lebih besar. Ini mengarah ke metode berikut yang diperlukan untuk operasi.
Metode sederhana yang mengerikan. Dibutuhkan jumlah byte yang ditulis sebagai argumen. Mereka menambah penghitung internal - _unflushedBytes dan _writingHeadBytesBuffered, yang namanya berbicara sendiri. Ini juga memotong _writingHeadMemory persis ke jumlah byte yang ditulis (menggunakan metode Slice). Oleh karena itu, setelah memanggil metode ini, Anda perlu meminta blok memori baru dalam bentuk Memori atau Rentang, Anda tidak dapat menulis ke yang sebelumnya. Dan seluruh tubuh metode adalah bagian penting dan berjalan di bawah kunci.
Tampaknya setelah ini pembaca dapat menerima data. Tapi satu langkah lagi diperlukan.
Metode ini dipanggil setelah kami menulis data yang diperlukan ke Memori yang diterima dan menunjukkan berapa banyak kami menulis di sana. Metode mengembalikan ValueTask, namun tidak asinkron (tidak seperti StreamPipeWriter turunannya). ValueTask adalah tipe khusus (readonly struct) yang digunakan dalam kasus ketika sebagian besar panggilan tidak akan menggunakan asinkron, yaitu, semua data yang diperlukan akan tersedia pada saat panggilannya dan metode ini akan berakhir secara sinkron. Di dalam, itu berisi data atau Tugas (kalau-kalau itu tidak berhasil secara serempak). Itu tergantung pada keadaan properti _writerAwaitable.IsCompleted. Jika kita mencari apa yang mengubah keadaan objek tunggu ini, kita akan melihat bahwa ini terjadi dengan syarat bahwa jumlah data yang belum diproses (tidak dikonsumsi) (ini tidak persis sama dengan yang belum dibaca (tidak diperiksa), akan dijelaskan kemudian) melebihi ambang tertentu (_pauseWriterThreshold). Standarnya adalah 16 ukuran segmen. Jika diinginkan, nilainya dapat diubah di PipeOptions. Selain itu, metode ini memulai kelanjutan dari metode ReadAsync, jika ada yang diblokir.
Mengembalikan FlushResult yang mengandung 2 properti - IsCanceled dan IsCompleted. IsCanceled menunjukkan apakah Flush telah dibatalkan (panggilan CancelPendingFlush). IsCompleted menunjukkan apakah PipeReader selesai (dengan memanggil metode Complete () atau CompleteAsync ()).
Bagian utama dari metode ini dilakukan di bawah Locke Skywalker.
Metode lain PipeWriter tidak menarik dari sudut pandang implementasi dan digunakan lebih jarang, oleh karena itu hanya deskripsi singkat yang akan diberikan.
# 5 batal Selesai (Pengecualian pengecualian = null) atau ValueTask CompleteAsync (Pengecualian pengecualian = null)
Pipa tanda ditutup untuk penulisan. Setelah selesai, pengecualian akan dilemparkan ketika mencoba menggunakan metode untuk menulis. Jika PipeReader telah selesai, seluruh instance Pipe juga selesai. Sebagian besar pekerjaan dilakukan di bawah kunci.
# 6 void CancelPendingFlush ()
Seperti namanya, ini melengkapi operasi FlushAsync () saat ini. Ada lok.
# 7 void OnReaderCompleted (Tindakan <Pengecualian, objek> panggilan balik, status objek)
Menjalankan delegasi yang didelegasikan ketika pembaca selesai. Ada juga kunci.
Dokumentasi saat ini mengatakan bahwa metode ini mungkin tidak dipanggil pada beberapa keturunan PipeWriter dan akan dihapus di masa depan. Karena itu, Anda tidak boleh mengikat logika dengan metode ini.
Pergi ke PipeReader
Di sini, seperti FlushAsync, ValueTask dikembalikan, yang mengisyaratkan bahwa metode ini kebanyakan sinkron, tetapi tidak selalu. Tergantung pada kondisi _readerAwaitable. Seperti halnya FlushAsync, Anda perlu mengetahui kapan _readerAwaitable disetel menjadi tidak lengkap. Ini terjadi ketika PipeReader membaca semuanya dari daftar (atau berisi data yang ditandai sebagai diperiksa dan membutuhkan lebih banyak data untuk melanjutkan). Padahal, itu logis. Dengan demikian, kita dapat menyimpulkan bahwa diinginkan untuk menyempurnakan Pipa untuk pekerjaan Anda, untuk mengatur semua pilihannya secara bijaksana, berdasarkan statistik yang diidentifikasi secara empiris. Konfigurasi yang tepat akan mengurangi kemungkinan cabang eksekusi asinkron dan akan memungkinkan pemrosesan data yang lebih efisien. Hampir seluruh metode dikelilingi oleh kunci.
Mengembalikan
ReadResult yang misterius. Faktanya, ini hanya buffer + flag yang menunjukkan status operasi (IsCanceled - apakah ReadAsync dibatalkan dan IsCompleted menunjukkan apakah PipeWriter ditutup). Dalam kasus ini, IsCompleted adalah nilai yang menunjukkan apakah metode PipeWriter Complete () atau CompleteAsync () dipanggil. Jika metode ini dipanggil dengan pengecualian, maka itu akan dibuang ketika mencoba membaca.
Buffer lagi memiliki tipe misterius -
ReadOnlySequence . Ini, pada gilirannya, adalah objek untuk mengandung
segmen (ReadOnlySequenceSegment) dari awal dan akhir + awal dan akhir indeks di dalam segmen yang sesuai. Yang sebenarnya menyerupai struktur kelas Pipe itu sendiri. Omong-omong, BufferSegment adalah penerus dari ReadOnlySequenceSegment, yang menunjukkan bahwa itu digunakan di sana. Berkat ini, Anda bisa menyingkirkan alokasi memori yang tidak perlu untuk transfer data dari penulis ke pembaca.
ReadOnlySpan dapat diperoleh dari buffer untuk diproses lebih lanjut. Untuk melengkapi gambar, Anda dapat memeriksa apakah buffer berisi satu ReadOnlySpan. Jika mengandung, kita tidak perlu mengulangi koleksi dari satu elemen dan kita bisa mendapatkannya menggunakan properti First. Jika tidak, Anda perlu memeriksa semua segmen di buffer dan memproses masing-masing ReadOnlySpan.
Topik diskusi - di kelas ReadOnlySequence, tipe referensi nullable secara aktif digunakan dan ada goto (bukan untuk keluar dari bersarang dan tidak dalam kode yang dihasilkan) - khususnya,
di siniSetelah diproses, Anda harus menjelaskan kepada instance Pipe bahwa kami telah membaca data.
Versi sinkron. Memungkinkan Anda mendapatkan hasilnya jika ya. Jika belum ada, tidak seperti ReadAsync, ia tidak memblokir, tetapi mengembalikan false. Juga di kunci.
Dalam metode ini, Anda dapat menentukan berapa banyak byte yang kami baca dan berapa banyak yang diproses. Data yang telah dibaca tetapi tidak diproses akan dikembalikan saat berikutnya dibaca. Fitur ini mungkin tampak aneh pada pandangan pertama, tetapi ketika memproses aliran byte jarang diperlukan untuk memproses setiap byte secara individual. Biasanya, data dipertukarkan menggunakan pesan. Suatu situasi dapat muncul bahwa pembaca, ketika membaca, menerima satu pesan utuh dan bagian dari pesan kedua. Seluruhnya harus diproses, dan bagian yang kedua harus dibiarkan lain kali sehingga ia datang bersama dengan bagian yang tersisa. Metode AdvanceTo menerima SequencePosition, yang sebenarnya merupakan indeks segmen + di dalamnya. Saat memproses semua yang telah dibaca ReadAsync, Anda dapat menentukan buffer.End. Jika tidak, Anda harus secara eksplisit membuat posisi, menunjukkan segmen dan indeks tempat pemrosesan dihentikan. Di bawah kap lok.
Juga, jika jumlah informasi mentah kurang dari cacat yang diinstal (_resumeWriterThreshold), itu memulai kelanjutan PipeWriter jika diblokir. Secara default, ambang ini adalah 8 volume segmen (setengah dari ambang batas pemblokiran).
Kekosongan # 4 Lengkap (Pengecualian pengecualian = nol)
Menyelesaikan PipeReader. Jika PipeWriter selesai pada titik ini, maka seluruh instance Pipe berakhir. Kunci di dalam.
# 5 batal CancelPendingRead ()
Memungkinkan Anda membatalkan pembacaan yang saat ini diharapkan. Locke.
# 6 void OnWriterCompleted (Tindakan <Pengecualian, objek> panggilan balik, status objek)
Memungkinkan Anda menentukan delegasi yang akan dieksekusi setelah menyelesaikan PipeWriter.
Seperti metode serupa untuk PipeWriter,
dokumentasi memiliki catatan yang sama yang akan dihapus. Kunci di bawah tenda.
Contoh
Daftar di bawah ini menunjukkan contoh bekerja dengan pipa.
Sejak diperkenalkannya .NET Core Span dan Memory, banyak kelas untuk bekerja dengan data telah ditambah dengan kelebihan menggunakan jenis ini. Jadi skema interaksi umum akan kurang lebih sama. Dalam contoh saya, saya menggunakan pipa untuk bekerja dengan pipa (saya suka kata-kata root), yaitu saluran - objek OS untuk komunikasi antarproses. API saluran baru saja diperluas untuk membaca data dalam Rentang dan Memori. Versi asinkron menggunakan Memori, karena metode asinkron akan dikonversikan ke metode templat menggunakan mesin keadaan terhingga yang dihasilkan secara otomatis, di mana semua variabel lokal dan parameter metode disimpan, dan karena Span adalah ref readonly struct, masing-masing tidak dapat berada di heap, masing-masing, menggunakan Span dalam metode asinkron tidak dimungkinkan. Tetapi ada juga versi sinkron dari metode yang memungkinkan Anda untuk menggunakan Span. Dalam contoh saya, saya mencoba keduanya dan ternyata versi sinkron dalam situasi ini menunjukkan dirinya lebih baik. Saat menggunakannya, pengumpulan sampah lebih sedikit, dan pemrosesan data lebih cepat. Tapi ini hanya karena ada banyak data. Jika terjadi situasi di mana tidak akan ada data pada saat mengajukan aplikasi untuk batch berikutnya, Anda harus menggunakan versi asinkron agar tidak membebani prosesor idle.
Contohnya memiliki komentar yang menjelaskan beberapa poin. Saya menarik perhatian Anda pada fakta bahwa terlepas dari kenyataan bahwa fragmen program yang bertanggung jawab untuk membaca dari pipa dan pemrosesan dipisahkan, ketika menulis ke file, data tersebut dibaca persis dari tempat di mana mereka ditulis ketika membaca dari pipa.
Evolusi bertahun-tahun demi fitur yang kuat - asinkron maine class Program { static async Task Main(string args) { var pipe = new Pipe(); var dataWriter = new PipeDataWriter(pipe.Writer, "testpipe"); var dataProcessor = new DataProcessor(new ConsoleBytesProcessor(), pipe.Reader); var cts = new CancellationTokenSource(); await Task.WhenAll(dataWriter.ReadFromPipeAsync(cts.Token), dataProcessor.StartProcessingDataAsync(cts.Token)); } }
Penulis pipepata public class PipeDataWriter { private readonly NamedPipeClientStream _namedPipe; private readonly PipeWriter _pipeWriter; private const string Servername = "."; public PipeDataWriter(PipeWriter pipeWriter, string pipeName) { _pipeWriter = pipeWriter ?? throw new ArgumentNullException(nameof(pipeWriter)); _namedPipe = new NamedPipeClientStream(Servername, pipeName, PipeDirection.In); } public async Task ReadFromPipeAsync(CancellationToken token) { await _namedPipe.ConnectAsync(token); while (true) { token.ThrowIfCancellationRequested();
Dataprocessor public class DataProcessor { private readonly IBytesProcessor _bytesProcessor; private readonly PipeReader _pipeReader; public DataProcessor(IBytesProcessor bytesProcessor, PipeReader pipeReader) { _bytesProcessor = bytesProcessor ?? throw new ArgumentNullException(nameof(bytesProcessor)); _pipeReader = pipeReader ?? throw new ArgumentNullException(nameof(pipeReader)); } public async Task StartProcessingDataAsync(CancellationToken token) { while (true) { token.ThrowIfCancellationRequested();
Prosesor byte public interface IBytesProcessor { Task ProcessBytesAsync(ReadOnlySequence<byte> bytesSequence, CancellationToken token); } public class ConsoleBytesProcessor : IBytesProcessor {