Migración a infraestructura asíncrona en espera en Rust

migración de aves
fuente img


La semana pasada ocurrió un gran evento para la comunidad de Rust: se lanzó la versión del compilador 1.39 y, en su lugar, se estabilizaron las funciones de espera asíncrona. En esta publicación intentaré resumir todos los cambios relevantes en el compilador y el ecosistema, así como proporcionar instrucciones para migrar al paradigma de espera asíncrona. No haré un análisis detallado de la asincronía en Rust, todavía hay artículos relevantes sobre el Habré que ayudarán a entrar en el tema:



Además de estos artículos, también puede consultar la documentación de la biblioteca estándar y las cajas necesarias, así como leer el libro asíncrono (en inglés).


Todos los ejemplos discutidos en este artículo funcionan en el compilador estable 1.39 y deberían funcionar en todas las versiones posteriores. El código final está disponible en github .


Para implementar el código asincrónico, se utilizó la biblioteca futures-0.1 . Proporciona los rasgos básicos futures::Future y futures::Stream para trabajar con computación diferida. Funcionan en los tipos Result<..> y proporcionan un conjunto de combinadores. Además, la biblioteca proporciona canales para la comunicación entre tareas (tareas), varias interfaces para trabajar con el ejecutor y su sistema de tareas, y más.


Considere un ejemplo que genera una serie de números a partir de los 32 bits más altos de factoriales y los envía a Sink :


 // futures 0.1.29 use futures::prelude::*; use futures::{stream, futures}; fn sink_fibb_series( sink: impl Sink<SinkItem = u32, SinkError = ()>, ) -> impl Future<Item = (), Error = ()> { stream::unfold((1u32, 1), |(mut fact, n)| { while fact.checked_mul(n).is_none() { fact >>= 1; } fact *= n; Some(future::ok((fact, (fact, n + 1)))) }) .forward(sink) .map(|_v| ()) } 

Nota: Tener en cuenta las tareas vinculadas a la CPU en las rutinas no es la mejor aplicación, pero el ejemplo es autosuficiente y simple.


Como puede ver, el código parece bastante engorroso: debe especificar el valor de retorno, a pesar del hecho de que no contiene ningún valor útil. En futuros 0.3, el código se vuelve un poco más fácil:


 // futures 0.3.1 use futures::prelude::*; use futures::stream; async fn sink_fibb_series(sink: impl Sink<u32>) { stream::unfold((1u32, 1), |(mut fact, n)| { async move { while fact.checked_mul(n).is_none() { fact >>= 1; } fact *= n; Some((fact, (fact, n + 1))) } }) .map(Ok) .forward(sink) .map(|_v| ()) .await; } 

Aquí, la palabra clave async se agrega a la función, que envuelve el valor de retorno de la función en Future . Como en nuestro caso se trata de una tupla de tamaño cero, simplemente se puede omitir, como en las funciones ordinarias.


La palabra clave await se usa al final de la cadena de llamadas para esperar la ejecución. Esta llamada detiene la ejecución en el contexto asíncrono actual y transfiere el control al planificador hasta que el valor Future esperado esté listo. Luego, la ejecución se reanuda con la última await (terminando la función en nuestro ejemplo), es decir el flujo de control se vuelve no lineal en comparación con un código síncrono similar.


Otra diferencia significativa es la presencia de un bloque asíncrono en el cuerpo del cierre dentro de stream::unfold . Este contenedor es un análogo completo para declarar una nueva función asíncrona con el mismo cuerpo y llamar en lugar de un bloque asíncrono.


# [característica (async_closure)

Quizás este cierre pueda escribirse pronto usando la función async_closure , pero async_closure , aún no se ha implementado:


 async |(mut fact, n)| { while fact.checked_mul(n).is_none() { fact >>= 1; } fact *= n; Some((fact, (fact, n + 1))) } 

Como puede ver, el nuevo tipo Stream no solo funciona con elementos del tipo Result<..> , como lo era antes. Se hicieron cambios similares al rasgo Future , las definiciones de versión son las siguientes:


 // futures 0.1 trait Future { type Item; type Error; fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error>; } enum Async<T> { Ready(T), NotReady } // futures 0.3 trait Future { type Output; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>; } enum Poll<T> { Ready(T), Pending } 

Además del hecho de que el tipo de retorno puede ser arbitrario, los parámetros de entrada para Future::poll también han cambiado. Ha aparecido un nuevo parámetro Context , que proporciona una interfaz explícita para activar la tarea actual. Anteriormente, se podía lograr lo mismo a través de variables globales de un ejecutor particular (por ejemplo, llamando a tokio::prelude::task::current().notify() ).


La diferencia más fundamental entre la interfaz es que debe envolver el enlace en Pin . Este contenedor sobre el puntero garantiza la "inmovilidad" de los datos en la memoria (una descripción más detallada de Pin encuentra en la versión 1.33 del compilador en el concentrador, o en inglés, en la documentación de la biblioteca estándar std :: pin ).


Intentemos ejecutar nuestro ejemplo ahora. Como Sink tomamos la mitad del canal de futuros y en el lado de salida imprimiremos el resultado con cierto retraso entre las iteraciones. En futuros-0.1, dicho código se puede escribir de la siguiente manera:


 use std::time::{Duration, Instant}; // futures 0.1.29 use futures::prelude::*; use futures::sync::mpsc; // tokio 0.1.22 use tokio::runtime::Runtime; use tokio::timer::Delay; fn main() { let mut rt = Runtime::new().unwrap(); let (tx, rx) = mpsc::channel(32); rt.spawn(Box::new(sink_fibb_series(tx.sink_map_err(|_e| ())))); let fut = rx.take(100).for_each(|val| { println!("{}", val); Delay::new(Instant::now() + Duration::from_millis(50)) .map(|_| ()) .map_err(|_| ()) }); rt.spawn(Box::new(fut)); rt.shutdown_on_idle().wait().unwrap(); } 

Código similar con el nuevo tokio (que en el momento de la escritura aún es alfa) y futuros-0.3 podría verse así:


 use std::time::Duration; // futures 0.3.1 use futures::channel::mpsc; use futures::prelude::*; // tokio 0.2.0-alpha.5 use tokio::timer; #[tokio::main] async fn main() { let (tx, rx) = mpsc::channel(32); tokio::spawn(sink_fibb_series(tx)); rx.take(100) .for_each(|val| { println!("{}", val); timer::delay_for(Duration::from_millis(50)) }) .await; } 

Como puede ver, el código con los nuevos futuros se ha vuelto mucho más corto. Según la experiencia del autor, el número de líneas siempre sale significativamente menos (a veces incluso cuando se reescribe el código síncrono). Pero me parece que hay una diferencia mucho más significativa en la legibilidad y la falta de una combinación de map_err map / map_err , que fueron necesarias debido a la variabilidad de los errores en los tipos estándar en Result<..> .


Sin embargo, los combinadores sobre elementos del tipo Result<..> permanecieron y están en tipos separados, algunos con un nombre ligeramente actualizado. Ahora se dividen en dos tipos diferentes; aquellos que están implementados para:



La implementación de los rasgos Future y Stream es un poco más complicada. Como ejemplo, intentemos implementar Stream para una serie de números ya considerada. El tipo común para ambas versiones de futuros será el siguiente:


 struct FactStream { fact: u32, n: u32, } impl FactStream { fn new() -> Self { Self { fact: 1, n: 1 } } } 

Para futuros-0.1, la implementación será la siguiente:


 impl Stream for FactStream { type Item = u32; type Error = (); fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { while self.fact.checked_mul(self.n).is_none() { self.fact >>= 1; } self.fact *= self.n; self.n += 1; Ok(Async::Ready(Some(self.fact))) } } 

En este ejemplo, la implementación de Stream::poll realidad una copia completa del stream::unfold cierre stream::unfold . En el caso de futuros-0.3, la implementación es equivalente:


 impl Stream for FactStream { type Item = u32; fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { while self.fact.checked_mul(self.n).is_none() { self.fact >>= 1; } self.fact *= self.n; self.n += 1; Poll::Ready(Some(self.fact)) } } 

Sin embargo, si el tipo de campo de estructura no implementa Unpin , entonces std::ops::DerefMut no se implementará en Pin<&mut T> y, por lo tanto, no habrá acceso mutable a todos los campos:


 use std::marker::PhantomPinned; struct Fact { inner: u32, //    Unpin   _pin: PhantomPinned, } struct FactStream { fact: Fact, n: u32, } impl Stream for FactStream { type Item = u32; fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { while self.fact.inner.checked_mul(self.n).is_none() { self.fact.inner >>= 1; // <-   // trait `DerefMut` is required to modify // through a dereference, but it is not // implemented for `std::pin::Pin<&mut FactStream>` } self.fact.inner *= self.n; // <-   self.n += 1; // <- Poll::Ready(Some(self.fact.inner)) } } 

En este caso, de una forma u otra, tendrá que usar las funciones inseguras Pin::get_unchecked_mut y Pin::map_unchecked_mut para obtener una "proyección" !Unpin campos ( hay una descripción más detallada en la documentación ). Afortunadamente, para tales casos, hay un contenedor seguro implementado en la caja pin_project (los detalles de la implementación se pueden encontrar en la documentación de la biblioteca ).


 use pin_project::pin_project; #[pin_project] struct FactStream { fact: Fact, n: u32, } impl Stream for FactStream { type Item = u32; fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let mut this = self.project(); while this.fact.inner.checked_mul(*this.n).is_none() { this.fact.inner >>= 1; } this.fact.inner *= *this.n; *this.n += 1; Poll::Ready(Some(this.fact.inner)) } } 

El último punto que me gustaría destacar es la interestabilidad entre los tipos de diferentes versiones. Para hacer esto, hay un módulo futures :: compat , que le permite convertir de tipos antiguos a nuevos y viceversa. Por ejemplo, puede iterar sobre Stream desde futuros-0.1 usando async-await:


 use std::fmt::Display; // futures 0.3 use new_futures::compat::Compat01As03 as Compat; use new_futures::StreamExt as _; // futures 0.1 use old_futures::Stream as OldStream; async fn stream_iterate<E>( old_stream: impl OldStream<Item = impl Display, Error = E>, ) -> Result<(), E> { let stream = Compat::new(old_stream); let mut stream = Box::pin(stream); while let Some(item) = stream.as_mut().next().await.transpose()? { println!("{}", item); } Ok(()) } 

Nota: solo el ejecutor tokio se considera en el artículo como el más longevo y extendido. Sin embargo, el mundo no termina allí, por ejemplo, hay una alternativa async-std , que además proporciona envoltorios futuristas para tipos de la biblioteca estándar, así como ThreadPool y LocalPool de la biblioteca considerada futures-0.3.

Source: https://habr.com/ru/post/475272/


All Articles