Migrando para a infraestrutura de espera assíncrona no Rust

migração de aves
fonte img


Um grande evento aconteceu para a comunidade Rust na semana passada - a versão 1.39 do compilador foi lançada e, junto com ele, os recursos de espera assíncrona foram estabilizados. Neste post, tentarei resumir todas as alterações relevantes no compilador e no ecossistema, além de fornecer instruções para migrar para o paradigma de espera assíncrona. Não farei uma análise detalhada da assincronia no Rust, ainda existem artigos relevantes sobre o Habré que ajudarão a entrar no tópico:



Além desses artigos, você também pode consultar a documentação da biblioteca padrão e as caixas necessárias, além de ler o livro assíncrono (em inglês).


Todos os exemplos discutidos neste artigo funcionam no compilador estável 1.39 e devem funcionar em todas as versões subseqüentes. O código final está disponível no github .


Para implementar código assíncrono, foi utilizada a biblioteca de futuros-0,1 . Ele fornece os traços básicos futures::Future e futures::Stream para trabalhar com computação adiada. Eles operam nos tipos de Result<..> e fornecem um conjunto de combinadores. Além disso, a biblioteca fornece canais para comunicação entre tarefas (tarefas), várias interfaces para trabalhar com o executor e seu sistema de tarefas e muito mais.


Considere um exemplo que gera uma série numérica a partir dos 32 bits de fatoriais mais altos e os envia ao Sink :


 // futures 0.1.29 use futures::prelude::*; use futures::{stream, futures}; fn sink_fibb_series( sink: impl Sink<SinkItem = u32, SinkError = ()>, ) -> impl Future<Item = (), Error = ()> { stream::unfold((1u32, 1), |(mut fact, n)| { while fact.checked_mul(n).is_none() { fact >>= 1; } fact *= n; Some(future::ok((fact, (fact, n + 1)))) }) .forward(sink) .map(|_v| ()) } 

Nota: Considerar tarefas ligadas à CPU em corotinas não é a melhor aplicação, mas o exemplo é auto-suficiente e simples.


Como você pode ver, o código parece bastante complicado: você deve especificar o valor de retorno, apesar do fato de não haver um valor útil nele. Nos futuros 0.3, o código fica um pouco mais fácil:


 // futures 0.3.1 use futures::prelude::*; use futures::stream; async fn sink_fibb_series(sink: impl Sink<u32>) { stream::unfold((1u32, 1), |(mut fact, n)| { async move { while fact.checked_mul(n).is_none() { fact >>= 1; } fact *= n; Some((fact, (fact, n + 1))) } }) .map(Ok) .forward(sink) .map(|_v| ()) .await; } 

Aqui, a palavra async chave async é adicionada à função, que envolve o valor de retorno da função em Future . Como, no nosso caso, é uma tupla de tamanho zero, ela pode ser simplesmente omitida, como em funções comuns.


A palavra-chave wait é usada no final da cadeia de chamadas para aguardar a execução. Essa chamada interrompe a execução no contexto assíncrono atual e transfere o controle para o planejador até que o valor Future esperado esteja pronto. Em seguida, a execução continua com o último await (encerrando a função em nosso exemplo), ou seja, o fluxo de controle se torna não linear em comparação com um código síncrono semelhante.


Outra diferença significativa é a presença de um bloco assíncrono no corpo do fechamento dentro do stream::unfold . Esse wrapper é um análogo completo para declarar uma nova função assíncrona com o mesmo corpo e chamada em vez de um bloco assíncrono.


# [recurso (async_closure)

Talvez esse fechamento possa em breve ser escrito usando o recurso async_closure , mas, infelizmente, ainda não foi implementado:


 async |(mut fact, n)| { while fact.checked_mul(n).is_none() { fact >>= 1; } fact *= n; Some((fact, (fact, n + 1))) } 

Como você pode ver, o novo tipo de Stream funciona não apenas com elementos do tipo Result<..> , como era antes. Alterações semelhantes foram feitas na característica Future , as definições de versão são as seguintes:


 // futures 0.1 trait Future { type Item; type Error; fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error>; } enum Async<T> { Ready(T), NotReady } // futures 0.3 trait Future { type Output; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>; } enum Poll<T> { Ready(T), Pending } 

Além do fato de que o tipo de retorno pode ser arbitrário, os parâmetros de entrada para Future::poll também foram alterados. Um novo parâmetro de Context apareceu, o que fornece uma interface explícita para ativar a tarefa atual. Anteriormente, o mesmo podia ser alcançado através de variáveis ​​globais de um executor específico (por exemplo, chamando tokio::prelude::task::current().notify() ).


A diferença mais fundamental entre a interface é que você deve quebrar o link para si mesmo no Pin . Esse invólucro sobre o ponteiro garante a "imobilidade" dos dados na memória (uma descrição mais detalhada do Pin está na versão 1.33 do compilador no hub, ou em inglês, na documentação da biblioteca std :: pin padrão).


Vamos tentar executar o nosso exemplo agora. Como Sink pegamos metade do canal de futuros e, no lado da saída, imprimiremos o resultado com algum atraso entre as iterações. No futuro-0.1, esse código pode ser escrito da seguinte maneira:


 use std::time::{Duration, Instant}; // futures 0.1.29 use futures::prelude::*; use futures::sync::mpsc; // tokio 0.1.22 use tokio::runtime::Runtime; use tokio::timer::Delay; fn main() { let mut rt = Runtime::new().unwrap(); let (tx, rx) = mpsc::channel(32); rt.spawn(Box::new(sink_fibb_series(tx.sink_map_err(|_e| ())))); let fut = rx.take(100).for_each(|val| { println!("{}", val); Delay::new(Instant::now() + Duration::from_millis(50)) .map(|_| ()) .map_err(|_| ()) }); rt.spawn(Box::new(fut)); rt.shutdown_on_idle().wait().unwrap(); } 

Código semelhante com o novo tokio (que no momento da redação ainda é alfa) e os futuros-0,3 podem se parecer com isso:


 use std::time::Duration; // futures 0.3.1 use futures::channel::mpsc; use futures::prelude::*; // tokio 0.2.0-alpha.5 use tokio::timer; #[tokio::main] async fn main() { let (tx, rx) = mpsc::channel(32); tokio::spawn(sink_fibb_series(tx)); rx.take(100) .for_each(|val| { println!("{}", val); timer::delay_for(Duration::from_millis(50)) }) .await; } 

Como você pode ver, o código com os novos futuros ficou muito menor. De acordo com a experiência do autor, o número de linhas sempre sai significativamente menos (às vezes até na reescrita do código síncrono). Mas parece-me que há uma diferença muito mais significativa na legibilidade e na falta de uma combinação de map_err map / map_err , necessárias devido à variabilidade dos erros nos tipos padrão em Result<..> .


Os combinadores sobre elementos do tipo Result<..> permanecem e estão em tipos separados, alguns com um nome ligeiramente atualizado. Agora eles estão divididos em dois tipos diferentes; aqueles que são implementados para:



A implementação das características Future e Stream é um pouco mais complicada. Como exemplo, vamos tentar implementar o Stream para uma série numérica já considerada. O tipo comum para ambas as versões de futuros será o seguinte:


 struct FactStream { fact: u32, n: u32, } impl FactStream { fn new() -> Self { Self { fact: 1, n: 1 } } } 

Para os futuros 0,1, a implementação será a seguinte:


 impl Stream for FactStream { type Item = u32; type Error = (); fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { while self.fact.checked_mul(self.n).is_none() { self.fact >>= 1; } self.fact *= self.n; self.n += 1; Ok(Async::Ready(Some(self.fact))) } } 

Neste exemplo, a implementação do Stream::poll na verdade uma cópia completa do stream::unfold . No caso dos futuros 0,3, a implementação é equivalente:


 impl Stream for FactStream { type Item = u32; fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { while self.fact.checked_mul(self.n).is_none() { self.fact >>= 1; } self.fact *= self.n; self.n += 1; Poll::Ready(Some(self.fact)) } } 

No entanto, se o tipo de um campo de estrutura não implementar Unpin , std::ops::DerefMut não será implementado no Pin<&mut T> e, portanto, não haverá acesso mutável a todos os campos:


 use std::marker::PhantomPinned; struct Fact { inner: u32, //    Unpin   _pin: PhantomPinned, } struct FactStream { fact: Fact, n: u32, } impl Stream for FactStream { type Item = u32; fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { while self.fact.inner.checked_mul(self.n).is_none() { self.fact.inner >>= 1; // <-   // trait `DerefMut` is required to modify // through a dereference, but it is not // implemented for `std::pin::Pin<&mut FactStream>` } self.fact.inner *= self.n; // <-   self.n += 1; // <- Poll::Ready(Some(self.fact.inner)) } } 

Nesse caso, de uma forma ou de outra, você terá que usar as funções não seguras Pin::get_unchecked_mut e Pin::map_unchecked_mut para obter uma "projeção" !Unpin campos ( há uma descrição mais detalhada na documentação ). Felizmente, nesses casos, há um wrapper seguro implementado na caixa pin_project (detalhes da implementação podem ser encontrados na documentação da biblioteca ).


 use pin_project::pin_project; #[pin_project] struct FactStream { fact: Fact, n: u32, } impl Stream for FactStream { type Item = u32; fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let mut this = self.project(); while this.fact.inner.checked_mul(*this.n).is_none() { this.fact.inner >>= 1; } this.fact.inner *= *this.n; *this.n += 1; Poll::Ready(Some(this.fact.inner)) } } 

O último ponto que gostaria de destacar é a inter-estabilidade entre os tipos de versões diferentes. Para fazer isso, existe um módulo futures :: compat , que permite converter de tipos antigos para novos e vice-versa. Por exemplo, você pode iterar no Stream partir de futuros-0.1 usando async-waitit:


 use std::fmt::Display; // futures 0.3 use new_futures::compat::Compat01As03 as Compat; use new_futures::StreamExt as _; // futures 0.1 use old_futures::Stream as OldStream; async fn stream_iterate<E>( old_stream: impl OldStream<Item = impl Display, Error = E>, ) -> Result<(), E> { let stream = Compat::new(old_stream); let mut stream = Box::pin(stream); while let Some(item) = stream.as_mut().next().await.transpose()? { println!("{}", item); } Ok(()) } 

Nota: apenas o executor do tokio é considerado no artigo como o mais prolongado e difundido. No entanto, o mundo não termina aí, por exemplo, existe uma alternativa async-std , que além disso fornece invólucros futuristas para os tipos de biblioteca padrão, assim como ThreadPool e LocalPool da biblioteca de futuros-0,3 considerada.

Source: https://habr.com/ru/post/pt475272/


All Articles