Bermigrasi ke infrastruktur menunggu async di Rust

migrasi burung
sumber img


Sebuah peristiwa besar terjadi untuk komunitas Rust minggu lalu - versi kompiler 1.39 dirilis , dan di tempat itu stabilisasi fitur-fitur async-waiting. Dalam posting ini, saya akan mencoba merangkum semua perubahan yang relevan dalam kompiler dan ekosistem, serta memberikan instruksi migrasi untuk paradigma menunggu-async. Saya tidak akan melakukan analisis terperinci dari asynchrony di Rust, masih ada artikel yang relevan tentang Habré yang akan membantu untuk masuk ke dalam topik:



Selain artikel-artikel ini, Anda juga dapat merujuk ke dokumentasi perpustakaan standar dan peti yang diperlukan, serta membaca buku-async (dalam bahasa Inggris).


Semua contoh yang dibahas dalam artikel ini berfungsi pada kompiler stabil 1.39 dan harus bekerja pada semua versi selanjutnya. Kode terakhir tersedia di github .


Untuk menerapkan kode asinkron, pustaka berjangka-0,1 digunakan. Ini memberikan sifat dasar futures::Future dan futures::Stream untuk bekerja dengan komputasi yang ditangguhkan. Mereka beroperasi pada tipe Result<..> dan menyediakan satu set kombinator. Selain itu, perpustakaan menyediakan saluran untuk komunikasi antara tugas (tugas), berbagai antarmuka untuk bekerja dengan pelaksana dan sistem tugasnya, dan banyak lagi.


Pertimbangkan contoh yang menghasilkan serangkaian angka dari 32 bit faktorial tertinggi dan mengirimkannya ke Sink :


 // futures 0.1.29 use futures::prelude::*; use futures::{stream, futures}; fn sink_fibb_series( sink: impl Sink<SinkItem = u32, SinkError = ()>, ) -> impl Future<Item = (), Error = ()> { stream::unfold((1u32, 1), |(mut fact, n)| { while fact.checked_mul(n).is_none() { fact >>= 1; } fact *= n; Some(future::ok((fact, (fact, n + 1)))) }) .forward(sink) .map(|_v| ()) } 

Catatan: Mempertimbangkan tugas-tugas yang terikat CPU pada coroutine bukanlah aplikasi terbaik, tetapi contohnya mandiri dan sederhana.


Seperti yang dapat Anda lihat, kode tersebut terlihat agak rumit: Anda harus menentukan nilai kembali, meskipun faktanya tidak ada nilai yang berguna di dalamnya. Dalam futures 0,3, kode menjadi sedikit lebih mudah:


 // futures 0.3.1 use futures::prelude::*; use futures::stream; async fn sink_fibb_series(sink: impl Sink<u32>) { stream::unfold((1u32, 1), |(mut fact, n)| { async move { while fact.checked_mul(n).is_none() { fact >>= 1; } fact *= n; Some((fact, (fact, n + 1))) } }) .map(Ok) .forward(sink) .map(|_v| ()) .await; } 

Di sini, kata kunci async ditambahkan ke fungsi, yang membungkus nilai pengembalian fungsi di Future . Karena dalam kasus kami ini adalah tuple dari ukuran nol, itu hanya dapat dihilangkan, seperti dalam fungsi biasa.


Kata kunci yang await digunakan di akhir rantai panggilan untuk menunggu eksekusi. Panggilan ini menjeda eksekusi dalam konteks async saat ini dan mentransfer kontrol ke scheduler hingga nilai Future diharapkan siap. Kemudian, eksekusi dilanjutkan dengan await terakhir (menghentikan fungsi dalam contoh kita), yaitu aliran kontrol menjadi non-linear dibandingkan dengan kode sinkron yang sama.


Perbedaan signifikan lainnya adalah adanya blok async di tubuh closure inside stream::unfold . Wrapper ini adalah analog lengkap untuk mendeklarasikan fungsi async baru dengan tubuh yang sama dan memanggil bukan blok async.


# [fitur (async_closure)

Mungkin penutupan ini dapat segera ditulis menggunakan fitur async_closure , tetapi sayangnya, belum diimplementasikan:


 async |(mut fact, n)| { while fact.checked_mul(n).is_none() { fact >>= 1; } fact *= n; Some((fact, (fact, n + 1))) } 

Seperti yang Anda lihat, tipe Stream baru berfungsi tidak hanya dengan elemen tipe Result<..> , seperti sebelumnya. Perubahan serupa dibuat pada sifat Future , definisi versi adalah sebagai berikut:


 // futures 0.1 trait Future { type Item; type Error; fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error>; } enum Async<T> { Ready(T), NotReady } // futures 0.3 trait Future { type Output; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>; } enum Poll<T> { Ready(T), Pending } 

Selain fakta bahwa tipe pengembalian dapat arbitrer, parameter input untuk Future::poll juga telah berubah. Parameter Context baru telah muncul, yang menyediakan antarmuka eksplisit untuk membangunkan tugas saat ini. Sebelumnya, hal yang sama dapat dicapai melalui variabel global pelaksana tertentu (misalnya, dengan memanggil tokio::prelude::task::current().notify() ).


Perbedaan yang lebih mendasar antara antarmuka adalah Anda harus membungkus tautan itu sendiri di Pin . Pembungkus di atas penunjuk ini menjamin "imobilitas" data dalam memori (deskripsi Pin lebih terperinci ada pada rilis 1.33 kompiler pada hub, atau dalam bahasa Inggris, dalam dokumentasi standar std :: pin library).


Mari kita coba jalankan contoh kita sekarang. Sebagai Sink kami mengambil setengah saluran dari masa depan dan di sisi keluaran kami akan mencetak hasilnya dengan penundaan antar iterasi. Pada futures-0.1, kode tersebut dapat ditulis sebagai berikut:


 use std::time::{Duration, Instant}; // futures 0.1.29 use futures::prelude::*; use futures::sync::mpsc; // tokio 0.1.22 use tokio::runtime::Runtime; use tokio::timer::Delay; fn main() { let mut rt = Runtime::new().unwrap(); let (tx, rx) = mpsc::channel(32); rt.spawn(Box::new(sink_fibb_series(tx.sink_map_err(|_e| ())))); let fut = rx.take(100).for_each(|val| { println!("{}", val); Delay::new(Instant::now() + Duration::from_millis(50)) .map(|_| ()) .map_err(|_| ()) }); rt.spawn(Box::new(fut)); rt.shutdown_on_idle().wait().unwrap(); } 

Kode serupa dengan tokio baru (yang pada saat penulisan ini masih alpha) dan futures-0.3 mungkin terlihat seperti ini:


 use std::time::Duration; // futures 0.3.1 use futures::channel::mpsc; use futures::prelude::*; // tokio 0.2.0-alpha.5 use tokio::timer; #[tokio::main] async fn main() { let (tx, rx) = mpsc::channel(32); tokio::spawn(sink_fibb_series(tx)); rx.take(100) .for_each(|val| { println!("{}", val); timer::delay_for(Duration::from_millis(50)) }) .await; } 

Seperti yang Anda lihat, kode dengan futures baru telah menjadi jauh lebih pendek. Menurut pengalaman penulis, jumlah baris selalu keluar secara signifikan lebih sedikit (kadang-kadang bahkan ketika menulis ulang kode sinkron). Tapi menurut saya ada perbedaan yang jauh lebih signifikan dalam keterbacaan dan kurangnya campuran map_err map / map_err , yang diperlukan karena variabilitas kesalahan dalam tipe standar dalam Result<..> .


Penggabung atas elemen tipe Result<..> tetap ada dan terpisah, beberapa dengan nama yang sedikit diperbarui. Sekarang mereka dibagi menjadi dua jenis; yang diterapkan untuk:



Penerapan sifat-sifat Future dan Stream sedikit lebih rumit. Sebagai contoh, mari kita coba menerapkan Stream untuk seri nomor yang sudah dipertimbangkan. Jenis umum untuk kedua versi berjangka adalah sebagai berikut:


 struct FactStream { fact: u32, n: u32, } impl FactStream { fn new() -> Self { Self { fact: 1, n: 1 } } } 

Untuk futures-0.1, implementasinya adalah sebagai berikut:


 impl Stream for FactStream { type Item = u32; type Error = (); fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { while self.fact.checked_mul(self.n).is_none() { self.fact >>= 1; } self.fact *= self.n; self.n += 1; Ok(Async::Ready(Some(self.fact))) } } 

Dalam contoh ini, implementasi Stream::poll sebenarnya adalah salinan lengkap dari stream::unfold closure stream::unfold . Dalam hal futures-0.3, implementasinya setara:


 impl Stream for FactStream { type Item = u32; fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { while self.fact.checked_mul(self.n).is_none() { self.fact >>= 1; } self.fact *= self.n; self.n += 1; Poll::Ready(Some(self.fact)) } } 

Namun, jika tipe bidang struktur tidak mengimplementasikan Unpin , maka std::ops::DerefMut tidak akan diimplementasikan pada Pin<&mut T> dan dengan demikian tidak akan ada akses yang dapat diubah ke semua bidang:


 use std::marker::PhantomPinned; struct Fact { inner: u32, //    Unpin   _pin: PhantomPinned, } struct FactStream { fact: Fact, n: u32, } impl Stream for FactStream { type Item = u32; fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { while self.fact.inner.checked_mul(self.n).is_none() { self.fact.inner >>= 1; // <-   // trait `DerefMut` is required to modify // through a dereference, but it is not // implemented for `std::pin::Pin<&mut FactStream>` } self.fact.inner *= self.n; // <-   self.n += 1; // <- Poll::Ready(Some(self.fact.inner)) } } 

Dalam hal ini, dalam satu atau lain bentuk, Anda harus menggunakan fungsi yang tidak aman Pin::get_unchecked_mut dan Pin::map_unchecked_mut untuk mendapatkan "proyeksi" !Unpin bidang ( ada uraian lebih rinci dalam dokumentasi ). Untungnya, untuk kasus-kasus seperti itu, ada pembungkus aman yang diimplementasikan pada peti pin_project (detail implementasi dapat ditemukan dalam dokumentasi perpustakaan ).


 use pin_project::pin_project; #[pin_project] struct FactStream { fact: Fact, n: u32, } impl Stream for FactStream { type Item = u32; fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let mut this = self.project(); while this.fact.inner.checked_mul(*this.n).is_none() { this.fact.inner >>= 1; } this.fact.inner *= *this.n; *this.n += 1; Poll::Ready(Some(this.fact.inner)) } } 

Poin terakhir yang ingin saya soroti adalah inter-stabilitas antara jenis versi yang berbeda. Untuk ini, ada modul futures :: compat , yang memungkinkan Anda mengkonversi dari tipe lama ke yang baru dan sebaliknya. Misalnya, Anda dapat beralih dari Stream dari futures-0.1 menggunakan async-wait:


 use std::fmt::Display; // futures 0.3 use new_futures::compat::Compat01As03 as Compat; use new_futures::StreamExt as _; // futures 0.1 use old_futures::Stream as OldStream; async fn stream_iterate<E>( old_stream: impl OldStream<Item = impl Display, Error = E>, ) -> Result<(), E> { let stream = Compat::new(old_stream); let mut stream = Box::pin(stream); while let Some(item) = stream.as_mut().next().await.transpose()? { println!("{}", item); } Ok(()) } 

Catatan: hanya pelaksana tokio yang dianggap dalam artikel sebagai yang paling berumur panjang dan tersebar luas. Namun demikian, dunia tidak berakhir di sana, misalnya, ada alternatif async-std , yang selain menyediakan pembungkus futuristik untuk jenis perpustakaan standar, serta ThreadPool dan LocalPool dari perpustakaan futures-0.3 yang dianggap.

Source: https://habr.com/ru/post/id475272/


All Articles