Halo pembaca. Sudah cukup banyak waktu berlalu sejak rilis .NET Core 2.1. Dan inovasi keren seperti Span dan Memori sudah banyak dikenal, Anda dapat membaca, melihat, dan mendengar banyak tentangnya. Namun, sayangnya, perpustakaan bernama System.IO. Pipeslines tidak menerima perhatian yang sama. Hampir semua yang ada pada topik ini adalah
satu-satunya posting yang telah diterjemahkan dan disalin pada banyak sumber. Seharusnya ada lebih banyak informasi tentang teknologi itu untuk melihatnya dari sudut yang berbeda.

Pendahuluan
Jadi, perpustakaan ini bertujuan untuk mempercepat pemrosesan data streaming. Awalnya dibuat dan digunakan oleh tim pengembangan Kestrel (server web lintas-platform untuk ASP.NET Core), tetapi saat ini tersedia untuk manusia melalui
paket nuget .
Sebelum kita mempelajari topik ini, kita dapat membayangkan mekanisme perpustakaan sebagai analog MemoryStream yang ditingkatkan. Masalah dengan MemoryStream asli adalah jumlah salinan yang berlebihan, yang jelas jika Anda ingat bahwa array byte pribadi digunakan di dalam MemoryStream sebagai buffer. Misalnya, dalam metode
Baca dan
Tulis, Anda dapat dengan jelas melihat penyalinan data. Jadi, untuk objek yang ingin kita tulis ke aliran, salinan akan dibuat di buffer internal, dan selama membaca, salinan salinan internal akan dikembalikan ke konsumen. Sepertinya ini bukan penggunaan memori yang paling rasional.
System.IO.Pipelines tidak bertujuan untuk mengganti semua stream, ini adalah alat tambahan dalam gudang pengembang yang menulis kode kinerja tinggi. Saya menyarankan agar Anda membiasakan diri dengan metode dan kelas dasar, melihat detail implementasi mereka dan menganalisis contoh-contoh dasar.
Mari kita mulai dengan detail internal dan implementasi, pada saat yang sama melihat fragmen kode sederhana. Setelah itu, akan menjadi jelas bagaimana cara kerjanya dan bagaimana seharusnya digunakan. Ketika bekerja dengan System.IO.Pipelines, perlu diingat bahwa konsep dasarnya adalah bahwa semua operasi baca-tulis harus dilakukan tanpa alokasi tambahan. Tetapi beberapa metode yang menarik pada pandangan pertama bertentangan dengan aturan ini. Dengan demikian, kode yang Anda coba percepat dengan susah payah mulai mengalokasikan memori untuk data baru dan baru, memuat pengumpul sampah.
Internal perpustakaan menggunakan kemungkinan terluas dari versi terbaru bahasa dan runtime - Rentang, Memori, kumpulan objek, ValueTask, dan sebagainya. Layak untuk melihat setidaknya di sana untuk contoh yang bagus menggunakan fitur-fitur ini dalam produksi.
Pada suatu waktu, beberapa pengembang tidak puas dengan implementasi stream di C #, karena satu kelas digunakan untuk membaca dan menulis. Tetapi seperti yang mereka katakan, Anda tidak dapat membuang metode dari kelas. Bahkan jika streaming tidak mendukung membaca / menulis / mencari, properti CanRead, CanWrite dan CanSeek digunakan. Itu terlihat seperti tongkat kecil. Tetapi sekarang segalanya menjadi berbeda.
Untuk bekerja dengan jalur pipa, 2 kelas digunakan:
PipeWriter dan
PipeReader . Kelas-kelas ini berisi sekitar 50 baris kode dan fasad pseudo (bukan yang paling klasik dari inkarnasinya, karena mereka menyembunyikan satu kelas, tidak banyak) untuk
pipa kelas, yang berisi semua logika dasar untuk bekerja dengan data. Kelas ini berisi 5 anggota publik: 2 konstruktor, 2 properti get-only - Reader dan Writer, metode Reset (), yang me-reset bidang internal ke keadaan awal mereka sehingga kelas dapat digunakan kembali. Metode yang tersisa untuk pekerjaan internal dan disebut menggunakan pseudo-fasad.
Mari kita mulai dengan kelas pipa
Instance class menempati 320 byte, yang cukup banyak (hampir sepertiga dari satu kilobyte, 2 objek seperti itu tidak bisa muat dalam memori Manchester Mark I). Jadi mengalokasikan sejumlah besar instance itu adalah ide yang buruk. Selain itu, objek ini dimaksudkan untuk penggunaan jangka panjang. Menggunakan kumpulan juga membuat argumen untuk pernyataan ini. Objek yang digunakan di pool akan hidup selamanya (untuk implementasi pool standar).
Perhatikan bahwa kelas ditandai sebagai tersegel dan aman bagi thread - banyak bagian kode merupakan bagian penting dan dibungkus dengan kunci.
Untuk mulai menggunakan kelas ini, Anda harus membuat turunan dari kelas Pipa dan mendapatkan objek PipeReader dan PipeWriter menggunakan properti yang disebutkan.
Inisialisasi sederhanavar pipe = new Pipe(); PipeWriter pipeWriter = pipe.Writer; PipeReader pipeReader = pipe.Reader;
Pertimbangkan metode untuk bekerja dengan pipa:
Menulis dengan PipeWriter - WriteAsync, GetMemory / GetSpan, Advance, FlushAsync, Lengkap, CancelPendingFlush, OnReaderCompleted.
Membaca dengan PipeReader - AdvanceTo, ReadAsync, TryRead, Lengkap, CancelPendingRead, OnWriterCompleted.
Sebagaimana dinyatakan dalam
posting yang disebutkan , kelas menggunakan daftar buffer yang terhubung secara tunggal. Tapi, jelas, mereka tidak ditransfer antara PipeReader dan PipeWriter - semua logika ada dalam satu kelas. Daftar ini digunakan untuk membaca dan menulis. Selain itu, data yang dikembalikan disimpan dalam daftar ini (jadi tidak ada penyalinan yang dilakukan).
Juga, ada objek yang menunjukkan awal data untuk dibaca (ReadHead dan indeks), akhir data untuk membaca (ReadTail dan indeks) dan awal ruang untuk menulis (WriteHead dan jumlah byte buffered yang ditulis). Di sini, ReadHead, ReadTail, dan WriteHead adalah anggota khusus (segmen) dari daftar internal segmen, dan indeks menunjukkan posisi tertentu dalam segmen tersebut. Dengan demikian, rekaman dapat dimulai dari tengah segmen, menangkap satu segmen berikutnya secara keseluruhan dan berakhir di tengah segmen ketiga. Pointer ini dipindahkan dengan berbagai metode.
Memulai dengan metode PipeWriter
Itu disebutkan menarik pada metode pandangan pertama. Ini memiliki tanda tangan yang sangat cocok dan modis - menerima ReadOnlyMemory, asinkron. Dan banyak yang mungkin tergoda, terutama mengingat bahwa Rentang dan Memori begitu cepat dan keren. Tapi jangan menyanjung diri sendiri. Semua yang dilakukan metode ini adalah menyalin ReadOnlyMemory yang diteruskan ke daftar internal. Dan dengan "menyalin" berarti panggilan ke metode CopyTo (), dan tidak hanya menyalin objek itu sendiri. Semua data yang ingin kami rekam akan disalin, sehingga memuat memori. Metode ini harus disebutkan hanya untuk memastikan bahwa lebih baik tidak menggunakannya. Ya, dan mungkin untuk beberapa situasi yang jarang, perilaku ini sesuai.
Tubuh metode adalah bagian penting, akses ke sana disinkronkan melalui monitor.
Maka timbul pertanyaan, bagaimana menulis sesuatu, jika tidak melalui metode yang paling jelas dan hanya cocok
Metode ini mengambil satu parameter dari tipe integer. Di dalamnya, kita harus menunjukkan berapa banyak byte yang ingin kita tulis ke pipeline (berapa ukuran buffer yang kita inginkan). Metode ini memeriksa apakah ada cukup ruang untuk menulis dalam fragmen memori saat ini yang disimpan dalam _writingHeadMemory. Jika cukup, _writingHeadMemory dikembalikan sebagai Memori. Kalau tidak, untuk data yang ditulis ke buffer, tetapi metode FlushAsync tidak dipanggil, dipanggil dan BufferSegment lain dialokasikan, yang terhubung ke yang sebelumnya (berikut adalah daftar internal kami). Jika _writingHeadMemory adalah nol, ini diinisialisasi dengan BufferSegment baru. Dan alokasi buffer adalah bagian kritis dan dilakukan di bawah kunci.
Saya sarankan untuk melihat contoh seperti itu. Pada pandangan pertama, sepertinya kompiler (atau runtime) telah memperdaya iblis.
Iblis var pipeNoOptions = new Pipe(); Memory<byte> memoryOne = pipeNoOptions.Writer.GetMemory(2); Console.WriteLine(memoryOne.Length);
Tetapi segala sesuatu dalam contoh ini dapat dimengerti dan sederhana.
Saat membuat turunan Pipa, kita bisa meneruskan objek
PipeOptions ke dalam konstruktor dengan opsi untuk membuat.
PipeOptions memiliki bidang ukuran segmen minimum default. Belum lama ini, itu adalah 2048, tetapi
komit ini telah memperbarui nilai ini ke 4096. Pada saat penulisan artikel ini, versi 4096 berada dalam paket prugelease nuget, versi rilis terakhir memiliki nilai 2048. Ini menjelaskan contoh perilaku pertama. Jika Anda kritis dalam menggunakan ukuran yang lebih kecil untuk buffer default, Anda dapat menentukannya dalam contoh tipe PipeOptions.
Tetapi dalam contoh kedua, di mana ukuran minimum ditentukan, panjangnya tidak cocok. Dan ini terjadi karena pembuatan BufferSegment baru terjadi menggunakan kumpulan. Salah satu opsi di PipeOptions adalah kumpulan memori. Setelah itu, kumpulan yang ditentukan akan digunakan untuk membuat segmen baru. Jika Anda tidak menentukan kumpulan memori, ArrayPool default akan digunakan, yang, seperti yang Anda ketahui, memiliki beberapa ember untuk berbagai ukuran array (masing-masing berikutnya 2 kali lebih besar dari yang sebelumnya) dan ketika diminta untuk tertentu ukuran, itu mencari ember dengan array ukuran yang sesuai (yaitu, yang terdekat lebih besar atau sama). Dengan demikian, buffer baru hampir pasti akan lebih besar dari yang Anda minta. Ukuran array minimum di ArrayPool default (System.Buffers.TlsOverPerCoreLockedStacksArrayPool) adalah 16. Tapi jangan khawatir, ini adalah kumpulan array. Karenanya, dalam sebagian besar kasus, array tidak memberikan tekanan pada pengumpul sampah dan akan digunakan kembali nanti.
Ini bekerja sama, memberikan Rentang dari Memori.
Jadi GetMemory () atau GetSpan () adalah metode utama untuk menulis. Mereka memberi kita objek yang bisa kita tulis. Untuk melakukan ini, kita tidak perlu mengalokasikan memori untuk array nilai baru, kita dapat menulis langsung ke dalam pipa. Yang mana yang akan digunakan terutama tergantung pada API yang Anda gunakan dan metode asinkron. Namun, mengingat hal di atas, muncul pertanyaan. Bagaimana pembaca tahu berapa banyak yang kami tulis? Jika kami selalu menggunakan implementasi spesifik pool, yang memberikan array dengan ukuran yang persis sama seperti yang diminta, maka pembaca dapat membaca seluruh buffer sekaligus. Namun, seperti yang telah kami katakan, kami dialokasikan buffer dengan probabilitas tinggi dari ukuran yang lebih besar. Ini mengarah ke metode berikut yang diperlukan untuk operasi.
Metode yang sangat sederhana. Dibutuhkan jumlah byte yang ditulis sebagai argumen. Mereka menambah penghitung internal - _unflushedBytes dan _writingHeadBytesBuffered, yang namanya berbicara sendiri. Ini juga memotong (irisan) _writingHeadMemory persis ke jumlah byte yang ditulis (menggunakan metode Slice). Oleh karena itu, setelah memanggil metode ini, Anda perlu meminta blok memori baru dalam bentuk Memori atau Rentang, Anda tidak dapat menulis ke yang sebelumnya. Dan seluruh tubuh metode adalah bagian penting dan berjalan di bawah kunci.
Tampaknya setelah ini pembaca dapat menerima data. Tapi satu langkah lagi diperlukan.
Metode ini dipanggil setelah kami menulis data yang diperlukan ke Memori yang diterima (GetMemory) dan menunjukkan seberapa banyak kami menulis di sana (Lanjutan). Metode mengembalikan ValueTask, namun tidak asinkron (tidak seperti StreamPipeWriter turunannya). ValueTask adalah tipe khusus (readonly struct) yang digunakan dalam kasus ketika sebagian besar panggilan tidak akan sinkron, itu semua data yang diperlukan akan tersedia pada saat panggilannya dan metode ini akan berakhir secara sinkron. Di dalamnya sendiri berisi data atau Tugas (kalau-kalau tidak bekerja secara serempak). Itu tergantung pada properti _writerAwaitable.IsCompleted. Jika kita mencari apa yang mengubah keadaan _writerAwaitable ini, kita akan melihat bahwa ini terjadi jika jumlah data yang tidak dikonsumsi (ini tidak persis sama dengan data yang tidak diperiksa akan dijelaskan nanti) melebihi ambang tertentu (_pauseWriterThreshold). Nilai default adalah 16 ukuran segmen. Jika diinginkan, nilainya dapat diubah di PipeOptions. Selain itu, metode ini memulai kelanjutan dari metode ReadAsync, jika ada yang diblokir.
Mengembalikan FlushResult yang mengandung 2 properti - IsCanceled dan IsCompleted. IsCanceled menunjukkan apakah Flush telah dibatalkan (CancelPendingFlush () panggilan). IsCompleted menunjukkan apakah PipeReader telah menyelesaikan (dengan memanggil metode Complete () atau CompleteAsync ()).
Bagian utama dari metode ini dilakukan di bawah kunci.
Metode lain PipeWriter tidak menarik dari sudut pandang implementasi dan digunakan lebih jarang, oleh karena itu hanya deskripsi singkat yang akan diberikan.
# 5 batal Selesai (Pengecualian pengecualian = null) atau ValueTask CompleteAsync (Pengecualian pengecualian = null)
Pipa tanda ditutup untuk penulisan. Pengecualian akan dilemparkan ketika mencoba menggunakan metode tulis setelah selesai. Jika PipeReader telah selesai, seluruh instance Pipe juga selesai. Sebagian besar pekerjaan dilakukan di bawah kunci.
# 6 void CancelPendingFlush ()
Seperti namanya, ini membatalkan operasi FlushAsync () saat ini. Ada kunci.
# 7 void OnReaderCompleted (Tindakan <Pengecualian, objek> panggilan balik, status objek)
Menjalankan delegasi yang lulus ketika pembaca selesai. Ada juga kunci.
Dalam
dokumentasi saat ini tertulis bahwa metode ini mungkin tidak dipanggil pada beberapa implementasi PipeWriter dan akan dihapus di masa depan. Karena itu, Anda tidak boleh mengikat logika dengan metode ini.
Sudah waktunya untuk PipeReader
Di sini, seperti di FlushAsync (), ValueTask dikembalikan, yang mengisyaratkan bahwa metode ini kebanyakan sinkron, tetapi tidak selalu. Tergantung pada kondisi _readerAwaitable. Seperti halnya FlushAsync, Anda perlu mengetahui kapan _readerAwaitable disetel menjadi tidak lengkap. Ini terjadi ketika PipeReader telah membaca semuanya dari daftar internal (atau itu berisi data yang ditandai sebagai diperiksa dan Anda memerlukan lebih banyak data untuk melanjutkan). Yang, sebenarnya, sudah jelas. Dengan demikian, kita dapat menyimpulkan bahwa diinginkan untuk menyempurnakan Pipa untuk pekerjaan Anda, untuk mengatur semua pilihannya secara bijaksana, berdasarkan statistik yang diidentifikasi secara empiris. Konfigurasi yang tepat akan mengurangi kemungkinan cabang eksekusi asinkron dan akan memungkinkan pemrosesan data yang lebih efisien. Hampir semua kode di seluruh metode dikelilingi oleh kunci.
Mengembalikan
ReadResult yang misterius. Bahkan, itu hanya buffer + bendera yang menunjukkan status operasi (IsCanceled - apakah ReadAsync dibatalkan dan IsCompleted menunjukkan apakah PipeWriter ditutup). IsCompleted adalah nilai yang menunjukkan apakah metode PipeWriter Complete () atau CompleteAsync () dipanggil. Jika metode ini dipanggil dengan pengecualian yang dilewati, maka itu akan dibuang ketika mencoba membaca.
Dan lagi, buffer memiliki tipe misterius -
ReadOnlySequence . Ini, pada gilirannya, adalah objek untuk isi segmen
(ReadOnlySequenceSegment) dari awal dan akhir + awal dan akhir indeks di dalam segmen yang sesuai. Yang sebenarnya menyerupai struktur kelas Pipe itu sendiri. Omong-omong, BufferSegment diwarisi dari ReadOnlySequenceSegment, yang mengisyaratkan bahwa BufferSegment digunakan dalam urutan ini. Berkat ini, Anda bisa menyingkirkan alokasi memori yang tidak perlu untuk transfer data dari penulis ke pembaca.
ReadOnlySpan dapat diperoleh dari buffer untuk diproses lebih lanjut. Untuk melengkapi gambar, Anda dapat memeriksa apakah buffer berisi satu ReadOnlySpan. Jika mengandung, kita tidak perlu mengulangi koleksi dari satu elemen dan kita bisa mendapatkannya menggunakan properti First. Jika tidak, perlu untuk membahas semua segmen di buffer dan memproses ReadOnlySpan masing-masing.
Topik diskusi - di kelas ReadOnlySequence, tipe referensi nullable secara aktif digunakan dan ada goto (bukan untuk deep loop nesting dan bukan dalam kode yang dihasilkan) - khususnya, di
sini .
Setelah diproses, Anda perlu memberi sinyal instance Pipa yang kami baca datanya.
Versi sinkron. Memungkinkan Anda mendapatkan hasilnya jika itu ada. Jika tidak, tidak seperti ReadAsync, ia tidak memblokir dan mengembalikan false. Kode metode ini juga ada di kunci.
Dalam metode ini, Anda dapat menentukan berapa banyak byte yang kami periksa dan konsumsi. Data yang telah diperiksa tetapi tidak dikonsumsi akan dikembalikan saat berikutnya dibaca. Fitur ini mungkin tampak aneh pada pandangan pertama, tetapi ketika memproses aliran byte jarang diperlukan untuk memproses setiap byte secara individual. Biasanya data dipertukarkan menggunakan pesan. Suatu situasi dapat muncul bahwa pembaca, ketika membaca, menerima satu pesan utuh dan bagian dari pesan kedua. Keseluruhan harus diproses, dan bagian yang kedua harus dibiarkan untuk masa depan sehingga ia datang bersama dengan bagian yang tersisa. Metode AdvanceTo mengambil SequencePosition, yang sebenarnya merupakan indeks segmen +. Saat memproses semua yang telah dibaca ReadAsync, Anda dapat menentukan buffer.End. Jika tidak, Anda harus secara eksplisit membuat posisi, menunjukkan segmen dan indeks tempat pemrosesan dihentikan. Kunci ada di bawah kap.
Juga, jika jumlah informasi yang tidak dikonsumsi kurang dari ambang yang ditentukan (_resumeWriterThreshold), itu mulai kelanjutan PipeWriter jika diblokir. Secara default, ambang ini adalah 8 volume segmen (setengah dari ambang batas pemblokiran).
Kekosongan # 4 Lengkap (Pengecualian pengecualian = nol)
Menyelesaikan PipeReader. Jika PipeWriter selesai pada titik ini, maka seluruh instance Pipe selesai. Kunci di dalam.
# 5 batal CancelPendingRead ()
Memungkinkan Anda membatalkan pembacaan yang saat ini dalam status tertunda. Kunci
# 6 void OnWriterCompleted (Tindakan <Pengecualian, objek> panggilan balik, status objek)
Memungkinkan Anda menentukan delegasi yang akan dieksekusi setelah menyelesaikan PipeWriter.
Seperti metode serupa PipeWriter, dalam
dokumentasi ada tag yang sama yang akan dihapus. Kunci ada di bawah kap.
Contoh
Daftar di bawah ini menunjukkan contoh bekerja dengan pipa.
Sejak diperkenalkannya .NET Core Span dan Memory, banyak kelas untuk bekerja dengan data telah ditambah dengan kelebihan menggunakan jenis ini. Jadi skema interaksi umum akan kurang lebih sama. Dalam contoh saya, saya menggunakan pipa untuk bekerja dengan pipa (saya suka kata-kata serupa) - objek OS untuk komunikasi antarproses. API pipa baru saja diperluas untuk membaca data dalam Rentang dan Memori. Versi asinkron menggunakan Memori, karena metode asinkron akan dikonversikan ke metode templat menggunakan mesin keadaan terhingga yang dihasilkan secara otomatis, di mana semua variabel lokal dan parameter metode disimpan, dan karena Span adalah ref readonly struct, itu tidak dapat ditempatkan di heap, masing-masing, menggunakan Span dalam metode asinkron tidak mungkin. Tetapi ada juga versi sinkron dari metode yang memungkinkan Anda untuk menggunakan Span. Dalam contoh saya, saya mencoba keduanya dan ternyata versi sinkron dalam situasi ini menunjukkan dirinya lebih baik. Saat menggunakannya, pengumpulan sampah lebih sedikit, dan pemrosesan data lebih cepat. Tapi ini hanya karena ada banyak data di dalam pipa (data selalu tersedia). Dalam situasi di mana agaknya tidak akan ada data pada saat mengajukan aplikasi untuk batch berikutnya, Anda harus menggunakan versi asinkron agar tidak membebani prosesor idle.
Contohnya memiliki komentar yang menjelaskan beberapa poin. Saya menarik perhatian Anda pada fakta bahwa terlepas dari kenyataan bahwa fragmen program yang bertanggung jawab untuk membaca dari pipa dan pemrosesan dipisahkan, ketika menulis ke file, data tersebut dibaca persis dari tempat di mana itu ditulis ketika membaca dari pipa.
Evolusi bertahun-tahun demi fitur yang kuat - asinkron utama class Program { static async Task Main(string args) { var pipe = new Pipe(); var dataWriter = new PipeDataWriter(pipe.Writer, "testpipe"); var dataProcessor = new DataProcessor(new ConsoleBytesProcessor(), pipe.Reader); var cts = new CancellationTokenSource(); await Task.WhenAll(dataWriter.ReadFromPipeAsync(cts.Token), dataProcessor.StartProcessingDataAsync(cts.Token)); } }
Penulis pipepata public class PipeDataWriter { private readonly NamedPipeClientStream _namedPipe; private readonly PipeWriter _pipeWriter; private const string Servername = "."; public PipeDataWriter(PipeWriter pipeWriter, string pipeName) { _pipeWriter = pipeWriter ?? throw new ArgumentNullException(nameof(pipeWriter)); _namedPipe = new NamedPipeClientStream(Servername, pipeName, PipeDirection.In); } public async Task ReadFromPipeAsync(CancellationToken token) { await _namedPipe.ConnectAsync(token); while (true) { token.ThrowIfCancellationRequested();
Dataprocessor public class DataProcessor { private readonly IBytesProcessor _bytesProcessor; private readonly PipeReader _pipeReader; public DataProcessor(IBytesProcessor bytesProcessor, PipeReader pipeReader) { _bytesProcessor = bytesProcessor ?? throw new ArgumentNullException(nameof(bytesProcessor)); _pipeReader = pipeReader ?? throw new ArgumentNullException(nameof(pipeReader)); } public async Task StartProcessingDataAsync(CancellationToken token) { while (true) { token.ThrowIfCancellationRequested();
Prosesor byte public interface IBytesProcessor { Task ProcessBytesAsync(ReadOnlySequence<byte> bytesSequence, CancellationToken token); } public class ConsoleBytesProcessor : IBytesProcessor {