迁移到Rust中的异步等待基础架构

鸟类迁徙
img来源


上周,Rust社区发生了一件大事- 编译器版本1.39发行了 ,并且相应地稳定了async-await功能。 在本文中,我将尝试总结编译器和生态系统中的所有相关更改,并提供异步等待模式的迁移说明。 我将不对Rust中的异步进行详细的分析,在Habré上仍然有相关的文章可以帮助您进入该主题:



除了这些文章之外,您还可以参考标准库的文档和必要的包装箱,以及阅读异步书 (英语)。


本文讨论的所有示例都适用于稳定的编译器1.39,并且应适用于所有后续版本。 最终代码可在github上找到


为了实现异步代码,使用了futures-0.1库。 它提供了futures::Futurefutures::Stream的基本特征,用于处理延迟计算。 它们对Result<..>类型进行操作,并提供一组组合器。 此外,该库提供了任务(任务)之间通信的通道,与执行者及其任务系统一起使用的各种接口等。


考虑一个示例,该示例从阶乘的最高32位生成一个数字序列,并将其发送给Sink


 // futures 0.1.29 use futures::prelude::*; use futures::{stream, futures}; fn sink_fibb_series( sink: impl Sink<SinkItem = u32, SinkError = ()>, ) -> impl Future<Item = (), Error = ()> { stream::unfold((1u32, 1), |(mut fact, n)| { while fact.checked_mul(n).is_none() { fact >>= 1; } fact *= n; Some(future::ok((fact, (fact, n + 1)))) }) .forward(sink) .map(|_v| ()) } 

注意:考虑在协程上绑定CPU的任务不是最好的应用程序,但是该示例是自给自足且简单的。


如您所见,代码看起来很麻烦:尽管其中没有任何有用的值,但您必须指定返回值。 在期货0.3中,代码变得更加简单:


 // futures 0.3.1 use futures::prelude::*; use futures::stream; async fn sink_fibb_series(sink: impl Sink<u32>) { stream::unfold((1u32, 1), |(mut fact, n)| { async move { while fact.checked_mul(n).is_none() { fact >>= 1; } fact *= n; Some((fact, (fact, n + 1))) } }) .map(Ok) .forward(sink) .map(|_v| ()) .await; } 

在这里,将async关键字添加到该函数中,该关键字将函数的返回值包装在Future 。 由于在我们的例子中,这是一个零大小的元组,因此可以像普通函数中那样简单地将其省略。


调用链的末尾使用await关键字等待执行。 此调用将暂停当前异步上下文中的执行,并将控制权转移到调度程序,直到准备好预期的Future值。 然后,执行在最后await情况下恢复执行(在我们的示例中终止函数),即 与类似的同步代码相比,控制流程变得非线性。


另一个重要的区别是stream::unfold内的闭包主体中存在异步块。 该包装器完全类似于声明具有相同主体并调用而不是异步块的新异步函数。


#[功能(async_closure)

也许可以很快使用async_closure功能来编写此闭包,但是async_closure是,它尚未实现:


 async |(mut fact, n)| { while fact.checked_mul(n).is_none() { fact >>= 1; } fact *= n; Some((fact, (fact, n + 1))) } 

如您所见,新的Stream类型不仅像以前一样适用于Result<..>元素。 对Future trait进行了类似的更改,版本定义如下:


 // futures 0.1 trait Future { type Item; type Error; fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error>; } enum Async<T> { Ready(T), NotReady } // futures 0.3 trait Future { type Output; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>; } enum Poll<T> { Ready(T), Pending } 

除了返回类型可以是任意值这一事实之外, Future::poll的输入参数也已更改。 出现了一个新的Context参数,该参数提供了用于唤醒当前任务的显式接口。 以前,可以通过特定执行程序的全局变量来实现同一件事(例如,通过调用tokio::prelude::task::current().notify() )。


接口之间最根本的区别是您必须将链接包装到Pin 。 指针上的这个包装器保证了内存中数据的“固定性”(有关Pin的更详细说明,请参见集线器上的编译器1.33版本 ,或者使用英语,参见标准std :: pin库的文档)。


让我们现在尝试运行我们的示例。 作为Sink我们从期货获得一半的通道,而在输出端,我们将在迭代之间有一些延迟的情况下打印结果。 在futures-0.1上,此类代码可以编写如下:


 use std::time::{Duration, Instant}; // futures 0.1.29 use futures::prelude::*; use futures::sync::mpsc; // tokio 0.1.22 use tokio::runtime::Runtime; use tokio::timer::Delay; fn main() { let mut rt = Runtime::new().unwrap(); let (tx, rx) = mpsc::channel(32); rt.spawn(Box::new(sink_fibb_series(tx.sink_map_err(|_e| ())))); let fut = rx.take(100).for_each(|val| { println!("{}", val); Delay::new(Instant::now() + Duration::from_millis(50)) .map(|_| ()) .map_err(|_| ()) }); rt.spawn(Box::new(fut)); rt.shutdown_on_idle().wait().unwrap(); } 

新tokio的类似代码(在撰写本文时仍为alpha),并且futures-0.3可能看起来像这样:


 use std::time::Duration; // futures 0.3.1 use futures::channel::mpsc; use futures::prelude::*; // tokio 0.2.0-alpha.5 use tokio::timer; #[tokio::main] async fn main() { let (tx, rx) = mpsc::channel(32); tokio::spawn(sink_fibb_series(tx)); rx.take(100) .for_each(|val| { println!("{}", val); timer::delay_for(Duration::from_millis(50)) }) .await; } 

如您所见,带有新期货的代码变得更短了。 根据作者的经验,行数总是少得多(有时甚至重写同步代码时)。 但是在我看来,可读性上存在更大得多的差异,并且缺少map / map_err的混合,这是必需的,这是由于Result<..>中标准类型中的错误具有可变性。


尽管如此,仍保留了Result<..>类型的元素上的组合,并且处于单独的类型中,其中一些名称略有更新。 现在,它们分为两种不同的类型: 为以下目的而实施的:



FutureStream特性的实现稍微复杂一些。 例如,让我们尝试为已经考虑过的数字系列实现Stream 。 两种版本的期货的通用类型如下:


 struct FactStream { fact: u32, n: u32, } impl FactStream { fn new() -> Self { Self { fact: 1, n: 1 } } } 

对于futures-0.1,实现如下:


 impl Stream for FactStream { type Item = u32; type Error = (); fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { while self.fact.checked_mul(self.n).is_none() { self.fact >>= 1; } self.fact *= self.n; self.n += 1; Ok(Async::Ready(Some(self.fact))) } } 

在此示例中, Stream::poll的实现实际上是闭包stream::unfold的完整副本。 对于Futures-0.3,实现等效:


 impl Stream for FactStream { type Item = u32; fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { while self.fact.checked_mul(self.n).is_none() { self.fact >>= 1; } self.fact *= self.n; self.n += 1; Poll::Ready(Some(self.fact)) } } 

但是,如果结构字段的类型未实现std::ops::DerefMut则不会在Pin<&mut T>上实现std::ops::DerefMut ,因此将无法对所有字段进行可变访问:


 use std::marker::PhantomPinned; struct Fact { inner: u32, //    Unpin   _pin: PhantomPinned, } struct FactStream { fact: Fact, n: u32, } impl Stream for FactStream { type Item = u32; fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { while self.fact.inner.checked_mul(self.n).is_none() { self.fact.inner >>= 1; // <-   // trait `DerefMut` is required to modify // through a dereference, but it is not // implemented for `std::pin::Pin<&mut FactStream>` } self.fact.inner *= self.n; // <-   self.n += 1; // <- Poll::Ready(Some(self.fact.inner)) } } 

在这种情况下,您将不得不使用不安全的功能Pin::get_unchecked_mutPin::map_unchecked_mut才能获得“投影” !Unpin字段( 文档中有更详细的说明)。 幸运的是,在这种情况下,pin_project板条箱中实现了一个安全包装器(有关实现的详细信息,请参见库文档 )。


 use pin_project::pin_project; #[pin_project] struct FactStream { fact: Fact, n: u32, } impl Stream for FactStream { type Item = u32; fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let mut this = self.project(); while this.fact.inner.checked_mul(*this.n).is_none() { this.fact.inner >>= 1; } this.fact.inner *= *this.n; *this.n += 1; Poll::Ready(Some(this.fact.inner)) } } 

我要强调的最后一点是不同版本类型之间的相互稳定性。 为此,有一个模块futures :: compat ,它允许您从旧类型转换为新类型,反之亦然。 例如,您可以使用async-await从Futures-0.1遍历Stream


 use std::fmt::Display; // futures 0.3 use new_futures::compat::Compat01As03 as Compat; use new_futures::StreamExt as _; // futures 0.1 use old_futures::Stream as OldStream; async fn stream_iterate<E>( old_stream: impl OldStream<Item = impl Display, Error = E>, ) -> Result<(), E> { let stream = Compat::new(old_stream); let mut stream = Box::pin(stream); while let Some(item) = stream.as_mut().next().await.transpose()? { println!("{}", item); } Ok(()) } 

注意:本文中仅将Tokio执行器视为寿命最长且使用最广泛的执行器。 尽管如此,世界还不止于此,例如,还有一个替代的async-std ,它还为标准库的类型以及考虑中的futures-0.3库的ThreadPoolLocalPool提供了未来派的包装。

Source: https://habr.com/ru/post/zh-CN475272/


All Articles