Migration vers une infrastructure d'attente asynchrone à Rust

migration des oiseaux
source img


Un énorme événement s'est produit pour la communauté Rust la semaine dernière - la version 1.39 du compilateur a été publiée et, avec elle, les fonctionnalités d'attente asynchrone ont été stabilisées. Dans cet article, je vais essayer de résumer tous les changements pertinents dans le compilateur et l'écosystème, ainsi que de fournir des instructions pour la migration vers le paradigme asynchrone. Je ne ferai pas une analyse détaillée de l'asynchronie dans Rust, il y a encore des articles pertinents sur le Habré qui aideront à entrer dans le sujet:



En plus de ces articles, vous pouvez également consulter la documentation de la bibliothèque standard et des caisses nécessaires, ainsi que lire un livre asynchrone (en anglais).


Tous les exemples abordés dans cet article fonctionnent sur le compilateur stable 1.39 et devraient fonctionner sur toutes les versions ultérieures. Le code final est disponible sur github .


Pour implémenter du code asynchrone, la bibliothèque futures-0.1 a été utilisée. Il fournit les futures::Future traits de futures::Future et futures::Stream pour travailler avec l'informatique différée. Ils fonctionnent sur les types Result<..> et fournissent un ensemble de combinateurs. De plus, la bibliothèque fournit des canaux de communication entre les tâches (tâches), diverses interfaces pour travailler avec l'exécuteur et son système de tâches, et plus encore.


Prenons un exemple qui génère une série de nombres à partir des 32 bits de factorielles les plus élevés et les envoie à Sink :


 // futures 0.1.29 use futures::prelude::*; use futures::{stream, futures}; fn sink_fibb_series( sink: impl Sink<SinkItem = u32, SinkError = ()>, ) -> impl Future<Item = (), Error = ()> { stream::unfold((1u32, 1), |(mut fact, n)| { while fact.checked_mul(n).is_none() { fact >>= 1; } fact *= n; Some(future::ok((fact, (fact, n + 1)))) }) .forward(sink) .map(|_v| ()) } 

Remarque: la prise en compte des tâches liées au processeur sur les coroutines n'est pas la meilleure application, mais l'exemple est autosuffisant et simple.


Comme vous pouvez le voir, le code semble plutôt lourd: vous devez spécifier la valeur de retour, malgré le fait qu'il n'y ait pas de valeur utile. Dans futures 0.3, le code devient un peu plus facile:


 // futures 0.3.1 use futures::prelude::*; use futures::stream; async fn sink_fibb_series(sink: impl Sink<u32>) { stream::unfold((1u32, 1), |(mut fact, n)| { async move { while fact.checked_mul(n).is_none() { fact >>= 1; } fact *= n; Some((fact, (fact, n + 1))) } }) .map(Ok) .forward(sink) .map(|_v| ()) .await; } 

Ici, la fonction ajoute le async , qui encapsule la valeur de retour de la fonction dans Future . Comme dans notre cas, c'est un tuple de taille nulle, il peut simplement être omis, comme dans les fonctions ordinaires.


Le mot-clé await est utilisé à la fin de la chaîne d'appel pour attendre l'exécution. Cet appel suspend l'exécution dans le contexte asynchrone actuel et transfère le contrôle au planificateur jusqu'à ce que la valeur Future attendue soit prête. Ensuite, l'exécution reprend avec la dernière await (terminant la fonction dans notre exemple), c'est-à-dire le flux de contrôle devient non linéaire par rapport à un code synchrone similaire.


Une autre différence significative est la présence d'un bloc asynchrone dans le corps de la fermeture à l'intérieur du stream::unfold . Ce wrapper est un analogue complet pour déclarer une nouvelle fonction asynchrone avec le même corps et appeler à la place d'un bloc asynchrone.


# [fonctionnalité (async_closure)

Peut-être que cette fermeture peut bientôt être écrite en utilisant la fonction async_closure , mais hélas, elle n'a pas encore été implémentée:


 async |(mut fact, n)| { while fact.checked_mul(n).is_none() { fact >>= 1; } fact *= n; Some((fact, (fact, n + 1))) } 

Comme vous pouvez le voir, le nouveau type de Stream fonctionne non seulement avec les éléments du type Result<..> , comme il l'était auparavant. Des modifications similaires ont été apportées au trait Future , les définitions de version sont les suivantes:


 // futures 0.1 trait Future { type Item; type Error; fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error>; } enum Async<T> { Ready(T), NotReady } // futures 0.3 trait Future { type Output; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>; } enum Poll<T> { Ready(T), Pending } 

Outre le fait que le type de retour peut être arbitraire, les paramètres d'entrée de Future::poll ont également changé. Un nouveau paramètre de Context est apparu, qui fournit une interface explicite pour réveiller la tâche en cours. Auparavant, la même chose pouvait être obtenue grâce aux variables globales d'un exécuteur particulier (par exemple, en appelant tokio::prelude::task::current().notify() ).


La différence la plus fondamentale entre l'interface est que vous devez envelopper le lien vers vous-même dans Pin . Ce wrapper sur le pointeur garantit «l'immobilité» des données en mémoire (une description plus détaillée de Pin est dans la version 1.33 du compilateur sur le hub, ou en anglais, dans la documentation de la bibliothèque standard std :: pin ).


Essayons d'exécuter notre exemple maintenant. En tant que Sink nous prenons la moitié du canal des futures et du côté de la sortie, nous imprimerons le résultat avec un certain retard entre les itérations. Sur futures-0.1, ce code peut être écrit comme suit:


 use std::time::{Duration, Instant}; // futures 0.1.29 use futures::prelude::*; use futures::sync::mpsc; // tokio 0.1.22 use tokio::runtime::Runtime; use tokio::timer::Delay; fn main() { let mut rt = Runtime::new().unwrap(); let (tx, rx) = mpsc::channel(32); rt.spawn(Box::new(sink_fibb_series(tx.sink_map_err(|_e| ())))); let fut = rx.take(100).for_each(|val| { println!("{}", val); Delay::new(Instant::now() + Duration::from_millis(50)) .map(|_| ()) .map_err(|_| ()) }); rt.spawn(Box::new(fut)); rt.shutdown_on_idle().wait().unwrap(); } 

Un code similaire avec le nouveau tokio (qui au moment de l'écriture est toujours alpha) et futures-0.3 pourrait ressembler à ceci:


 use std::time::Duration; // futures 0.3.1 use futures::channel::mpsc; use futures::prelude::*; // tokio 0.2.0-alpha.5 use tokio::timer; #[tokio::main] async fn main() { let (tx, rx) = mpsc::channel(32); tokio::spawn(sink_fibb_series(tx)); rx.take(100) .for_each(|val| { println!("{}", val); timer::delay_for(Duration::from_millis(50)) }) .await; } 

Comme vous pouvez le voir, le code avec de nouveaux futurs est devenu beaucoup plus court. Selon l'expérience de l'auteur, le nombre de lignes sort toujours beaucoup moins (parfois même lors de la réécriture de code synchrone). Mais il me semble qu'il y a une différence beaucoup plus significative de lisibilité et l'absence d'un mélange d' map_err map / map_err , qui étaient nécessaires en raison de la variabilité des erreurs dans les types standard dans Result<..> .


Les combinateurs sur les éléments du type Result<..> néanmoins restés et sont dans des types distincts, certains avec un nom légèrement mis à jour. Maintenant, ils sont divisés en deux types différents; ceux qui sont mis en œuvre pour:



La mise en œuvre des traits Future et Stream est un peu plus compliquée. Par exemple, essayons d'implémenter Stream pour une série de nombres déjà considérée. Le type commun pour les deux versions de futures sera le suivant:


 struct FactStream { fact: u32, n: u32, } impl FactStream { fn new() -> Self { Self { fact: 1, n: 1 } } } 

Pour futures-0.1, l'implémentation sera la suivante:


 impl Stream for FactStream { type Item = u32; type Error = (); fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { while self.fact.checked_mul(self.n).is_none() { self.fact >>= 1; } self.fact *= self.n; self.n += 1; Ok(Async::Ready(Some(self.fact))) } } 

Dans cet exemple, l'implémentation de Stream::poll fait une copie complète du stream::unfold fermeture stream::unfold . Dans le cas de futures-0.3, l'implémentation est équivalente:


 impl Stream for FactStream { type Item = u32; fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { while self.fact.checked_mul(self.n).is_none() { self.fact >>= 1; } self.fact *= self.n; self.n += 1; Poll::Ready(Some(self.fact)) } } 

Cependant, si le type d'un champ de structure Unpin pas Unpin , std::ops::DerefMut ne sera pas implémenté sur Pin<&mut T> et il n'y aura donc pas d'accès mutable à tous les champs:


 use std::marker::PhantomPinned; struct Fact { inner: u32, //    Unpin   _pin: PhantomPinned, } struct FactStream { fact: Fact, n: u32, } impl Stream for FactStream { type Item = u32; fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { while self.fact.inner.checked_mul(self.n).is_none() { self.fact.inner >>= 1; // <-   // trait `DerefMut` is required to modify // through a dereference, but it is not // implemented for `std::pin::Pin<&mut FactStream>` } self.fact.inner *= self.n; // <-   self.n += 1; // <- Poll::Ready(Some(self.fact.inner)) } } 

Dans ce cas, sous une forme ou une autre, vous devrez utiliser les fonctions non sécurisées Pin::get_unchecked_mut et Pin::map_unchecked_mut afin d'obtenir une "projection" !Unpin champs ( il y a une description plus détaillée dans la documentation ). Heureusement, pour de tels cas, il y a un wrapper sécurisé implémenté dans la caisse pin_project (les détails de l'implémentation peuvent être trouvés dans la documentation de la bibliothèque ).


 use pin_project::pin_project; #[pin_project] struct FactStream { fact: Fact, n: u32, } impl Stream for FactStream { type Item = u32; fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let mut this = self.project(); while this.fact.inner.checked_mul(*this.n).is_none() { this.fact.inner >>= 1; } this.fact.inner *= *this.n; *this.n += 1; Poll::Ready(Some(this.fact.inner)) } } 

Le dernier point que je voudrais souligner est l'inter-stabilité entre les types de versions différentes. Pour ce faire, il existe un module futures :: compat , qui vous permet de convertir d'anciens types en nouveaux et vice versa. Par exemple, vous pouvez itérer sur Stream partir de futures-0.1 en utilisant async-wait:


 use std::fmt::Display; // futures 0.3 use new_futures::compat::Compat01As03 as Compat; use new_futures::StreamExt as _; // futures 0.1 use old_futures::Stream as OldStream; async fn stream_iterate<E>( old_stream: impl OldStream<Item = impl Display, Error = E>, ) -> Result<(), E> { let stream = Compat::new(old_stream); let mut stream = Box::pin(stream); while let Some(item) = stream.as_mut().next().await.transpose()? { println!("{}", item); } Ok(()) } 

Remarque: seul l'exécuteur testamentaire de tokio est considéré dans l'article comme le plus durable et le plus répandu. Néanmoins, le monde ne s'arrête pas là, par exemple, il existe une alternative async-std , qui fournit en outre des wrappers futuristes pour les types de bibliothèque standard, ainsi que ThreadPool et LocalPool de la bibliothèque future-0.3 considérée.

Source: https://habr.com/ru/post/fr475272/


All Articles