Rust рдореЗрдВ async-рдкреНрд░рддреАрдХреНрд╖рд┐рдд рдмреБрдирд┐рдпрд╛рджреА рдврд╛рдВрдЪреЗ рдХреА рдУрд░ рдкрд▓рд╛рдпрди

рдкрдХреНрд╖рд┐рдпреЛрдВ рдХрд╛ рдкреНрд░рд╡рд╛рд╕
img рд╕реНрд░реЛрдд


рдкрд┐рдЫрд▓реЗ рд╕рдкреНрддрд╛рд╣ рд░рд╕реНрдЯ рд╕рдореБрджрд╛рдп рдХреЗ рд▓рд┐рдП рдПрдХ рдмрдбрд╝реА рдШрдЯрдирд╛ рд╣реБрдИ - рд╕рдВрдХрд▓рдХ рд╕рдВрд╕реНрдХрд░рдг 1.39 рдЬрд╛рд░реА рдХрд┐рдпрд╛ рдЧрдпрд╛ рдерд╛ , рдФрд░ рдЗрд╕рдХреЗ рд╕реНрдерд╛рди рдкрд░ рдПрд╕рд┐рдВрдХреНрд╕-рд╡реЗрдЯ рд╕реБрд╡рд┐рдзрд╛рдУрдВ рдХрд╛ рд╕реНрдерд┐рд░реАрдХрд░рдг рдХрд┐рдпрд╛ рдЧрдпрд╛ рдерд╛ ред рдЗрд╕ рдкреЛрд╕реНрдЯ рдореЗрдВ, рдореИрдВ рдХрдВрдкрд╛рдЗрд▓рд░ рдФрд░ рдЗрдХреЛрд╕рд┐рд╕реНрдЯрдо рдореЗрдВ рд╕рднреА рдкреНрд░рд╛рд╕рдВрдЧрд┐рдХ рдкрд░рд┐рд╡рд░реНрддрдиреЛрдВ рдХреЛ рд╕рдВрдХреНрд╖реЗрдк рдореЗрдВ рдкреНрд░рд╕реНрддреБрдд рдХрд░рдиреЗ рдХреА рдХреЛрд╢рд┐рд╢ рдХрд░реВрдВрдЧрд╛, рд╕рд╛рде рд╣реА рд╕рд╛рде рдПрд╕рд┐рдВрдХреНрд╕-рд╡реЗрдЯ рдкреНрд░рддрд┐рдорд╛рди рдХреЗ рд▓рд┐рдП рдорд╛рдЗрдЧреНрд░реЗрд╢рди рдирд┐рд░реНрджреЗрд╢ рднреА рдкреНрд░рджрд╛рди рдХрд░реВрдВрдЧрд╛ред рдореИрдВ рд░рд╕реНрдЯ рдореЗрдВ рдЕрддреБрд▓реНрдпрдХрд╛рд▓рд┐рдХ рд╡рд┐рд╢реНрд▓реЗрд╖рдг рдХрд╛ рд╡рд┐рд╕реНрддреГрдд рд╡рд┐рд╢реНрд▓реЗрд╖рдг рдирд╣реАрдВ рдХрд░реВрдВрдЧрд╛, рдЕрднреА рднреА рд╣реИрдмреЗ рдкрд░ рдкреНрд░рд╛рд╕рдВрдЧрд┐рдХ рд▓реЗрдЦ рд╣реИрдВ рдЬреЛ рдЗрд╕ рд╡рд┐рд╖рдп рдкрд░ рдкрд╣реБрдВрдЪрдиреЗ рдореЗрдВ рдорджрдж рдХрд░реЗрдВрдЧреЗ:



рдЗрди рд▓реЗрдЦреЛрдВ рдХреЗ рдЕрд▓рд╛рд╡рд╛, рдЖрдк рдорд╛рдирдХ рдкреБрд╕реНрддрдХрд╛рд▓рдп рдФрд░ рдЖрд╡рд╢реНрдпрдХ рдмрдХреНрд╕реЗ рдХреЗ рдкреНрд░рд▓реЗрдЦрди рдХрд╛ рднреА рдЙрд▓реНрд▓реЗрдЦ рдХрд░ рд╕рдХрддреЗ рд╣реИрдВ, рд╕рд╛рде рд╣реА рд╕рд╛рде async-book (рдЕрдВрдЧреНрд░реЗрдЬреА рдореЗрдВ) рднреА рдкрдврд╝ рд╕рдХрддреЗ рд╣реИрдВред


рдЗрд╕ рдЖрд▓реЗрдЦ рдореЗрдВ рдЪрд░реНрдЪрд╛ рдХрд┐рдП рдЧрдП рд╕рднреА рдЙрджрд╛рд╣рд░рдг рд╕реНрдерд┐рд░ рд╕рдВрдХрд▓рдХ 1.39 рдкрд░ рдХрд╛рдо рдХрд░рддреЗ рд╣реИрдВ рдФрд░ рдмрд╛рдж рдХреЗ рд╕рднреА рд╕рдВрд╕реНрдХрд░рдгреЛрдВ рдкрд░ рдХрд╛рдо рдХрд░рдирд╛ рдЪрд╛рд╣рд┐рдПред рдЕрдВрддрд┐рдо рдХреЛрдб рдЬреАрдердм рдкрд░ рдЙрдкрд▓рдмреНрдз рд╣реИред


рдЕрддреБрд▓реНрдпрдХрд╛рд▓рд┐рдХ рдХреЛрдб рдХреЛ рд▓рд╛рдЧреВ рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП, рд╡рд╛рдпрджрд╛-0.1 рдкреБрд╕реНрддрдХрд╛рд▓рдп рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд┐рдпрд╛ рдЧрдпрд╛ рдерд╛ред рдпрд╣ рдЖрдзрд╛рд░ рд▓рдХреНрд╖рдг futures::Future рдкреНрд░рджрд╛рди рдХрд░рддрд╛ рд╣реИ futures::Future рдФрд░ futures::Stream рд╕реНрдердЧрд┐рдд рдХрдВрдкреНрдпреВрдЯрд┐рдВрдЧ рдХреЗ рд╕рд╛рде рдХрд╛рдо рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП futures::Stream ред рд╡реЗ Result<..> рдкреНрд░рдХрд╛рд░реЛрдВ рдкрд░ рдХрд╛рдо рдХрд░рддреЗ рд╣реИрдВ рдФрд░ рдХреЙрдореНрдмрд┐рдиреЗрдЯрд░ рдХрд╛ рдПрдХ рд╕реЗрдЯ рдкреНрд░рджрд╛рди рдХрд░рддреЗ рд╣реИрдВред рдЗрд╕рдХреЗ рдЕрд▓рд╛рд╡рд╛, рдкреБрд╕реНрддрдХрд╛рд▓рдп рдХрд╛рд░реНрдпреЛрдВ (рдХрд╛рд░реНрдпреЛрдВ) рдХреЗ рдмреАрдЪ рд╕рдВрдЪрд╛рд░ рдХреЗ рд▓рд┐рдП рдЪреИрдирд▓ рдкреНрд░рджрд╛рди рдХрд░рддрд╛ рд╣реИ, рдирд┐рд╖реНрдкрд╛рджрдХ рдФрд░ рдЙрд╕рдХреА рдХрд╛рд░реНрдп рдкреНрд░рдгрд╛рд▓реА рдХреЗ рд╕рд╛рде рдХрд╛рдо рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП рд╡рд┐рднрд┐рдиреНрди рдЗрдВрдЯрд░рдлреЗрд╕, рдФрд░ рдмрд╣реБрдд рдХреБрдЫред


рдПрдХ рдЙрджрд╛рд╣рд░рдг рдкрд░ рд╡рд┐рдЪрд╛рд░ рдХрд░реЗрдВ рдЬреЛ рдЙрдЪреНрдЪрддрдо 32 рдмрд┐рдЯрдХреЙрдиреНрд╕ рд╕реЗ рдПрдХ рд╢реНрд░реГрдВрдЦрд▓рд╛ рдмрдирд╛рддрд╛ рд╣реИ рдФрд░ рдЙрдиреНрд╣реЗрдВ Sink рднреЗрдЬрддрд╛ рд╣реИ:


 // futures 0.1.29 use futures::prelude::*; use futures::{stream, futures}; fn sink_fibb_series( sink: impl Sink<SinkItem = u32, SinkError = ()>, ) -> impl Future<Item = (), Error = ()> { stream::unfold((1u32, 1), |(mut fact, n)| { while fact.checked_mul(n).is_none() { fact >>= 1; } fact *= n; Some(future::ok((fact, (fact, n + 1)))) }) .forward(sink) .map(|_v| ()) } 

рдиреЛрдЯ: рдХреЛрд░рдЯрд╛рдЗрди рдкрд░ рд╕реАрдкреАрдпреВ-рдмрд╛рдЙрдВрдб рдХрд╛рд░реНрдпреЛрдВ рдХреЛ рдзреНрдпрд╛рди рдореЗрдВ рд░рдЦрддреЗ рд╣реБрдП рд╕рдмрд╕реЗ рдЕрдЪреНрдЫрд╛ рдЕрдиреБрдкреНрд░рдпреЛрдЧ рдирд╣реАрдВ рд╣реИ, рд▓реЗрдХрд┐рди рдЙрджрд╛рд╣рд░рдг рдЖрддреНрдордирд┐рд░реНрднрд░ рдФрд░ рд╕рд░рд▓ рд╣реИред


рдЬреИрд╕рд╛ рдХрд┐ рдЖрдк рджреЗрдЦ рд╕рдХрддреЗ рд╣реИрдВ, рдХреЛрдб рдмрд▓реНрдХрд┐ рдмреЛрдЭрд┐рд▓ рд▓рдЧ рд░рд╣рд╛ рд╣реИ: рдЖрдкрдХреЛ рдЗрд╕ рддрдереНрдп рдХреЗ рдмрд╛рд╡рдЬреВрдж рдХрд┐ рдХреЛрдИ рдЙрдкрдпреЛрдЧреА рдореВрд▓реНрдп рдирд╣реАрдВ рд╣реИ, рд╡рд╛рдкрд╕реА рдореВрд▓реНрдп рдирд┐рд░реНрджрд┐рд╖реНрдЯ рдХрд░рдирд╛ рд╣реЛрдЧрд╛ред рд╡рд╛рдпрджрд╛ 0.3 рдореЗрдВ, рдХреЛрдб рдереЛрдбрд╝рд╛ рдЖрд╕рд╛рди рд╣реЛ рдЬрд╛рддрд╛ рд╣реИ:


 // futures 0.3.1 use futures::prelude::*; use futures::stream; async fn sink_fibb_series(sink: impl Sink<u32>) { stream::unfold((1u32, 1), |(mut fact, n)| { async move { while fact.checked_mul(n).is_none() { fact >>= 1; } fact *= n; Some((fact, (fact, n + 1))) } }) .map(Ok) .forward(sink) .map(|_v| ()) .await; } 

рдпрд╣рд╛рдВ, рдлрд╝рдВрдХреНрд╢рди рдореЗрдВ async рдХреАрд╡рд░реНрдб рдЬреЛрдбрд╝рд╛ рдЬрд╛рддрд╛ рд╣реИ, рдЬреЛ Future рдореЗрдВ рдлрд╝рдВрдХреНрд╢рди рдХреЗ рд░рд┐рдЯрд░реНрди рдорд╛рди рдХреЛ рд▓рдкреЗрдЯрддрд╛ рд╣реИред рдЪреВрдВрдХрд┐ рд╣рдорд╛рд░реЗ рдорд╛рдорд▓реЗ рдореЗрдВ рдпрд╣ рд╢реВрдиреНрдп рдЖрдХрд╛рд░ рдХрд╛ рдПрдХ рдЯрдкрд▓ рд╣реИ, рдЗрд╕рд▓рд┐рдП рдЗрд╕реЗ рд╕рд╛рдзрд╛рд░рдг рдХрд╛рд░реНрдпреЛрдВ рдХреА рддрд░рд╣ рд╣реА рдЫреЛрдбрд╝рд╛ рдЬрд╛ рд╕рдХрддрд╛ рд╣реИред


рдирд┐рд╖реНрдкрд╛рджрди рдХреЗ рд▓рд┐рдП рдкреНрд░рддреАрдХреНрд╖рд╛ рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП рдХреЙрд▓ рд╢реНрд░реГрдВрдЦрд▓рд╛ рдХреЗ рдЕрдВрдд рдореЗрдВ await рдХреАрд╡рд░реНрдб рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд┐рдпрд╛ рдЬрд╛рддрд╛ рд╣реИред рдпрд╣ рдХреЙрд▓ рд╡рд░реНрддрдорд╛рди async рд╕рдВрджрд░реНрдн рдореЗрдВ рдирд┐рд╖реНрдкрд╛рджрди рдХреЛ рд░реЛрдХрддреА рд╣реИ рдФрд░ рдЬрдм рддрдХ рдХрд┐ Future рдХрд╛ рдЕрдкреЗрдХреНрд╖рд┐рдд рдорд╛рди рддреИрдпрд╛рд░ рдирд╣реАрдВ рд╣реЛ рдЬрд╛рддрд╛ рд╣реИ, рддрдм рддрдХ рдЗрд╕реЗ рд╢реЗрдбреНрдпреВрд▓рд░ рдХреЛ рдирд┐рдпрдВрддреНрд░рд┐рдд рдХрд░рддрд╛ рд╣реИред рдлрд┐рд░, рдирд┐рд╖реНрдкрд╛рджрди рдЕрдВрддрд┐рдо await (рд╣рдорд╛рд░реЗ рдЙрджрд╛рд╣рд░рдг рдореЗрдВ рдлрд╝рдВрдХреНрд╢рди рдХреЛ рд╕рдорд╛рдкреНрдд рдХрд░рдирд╛) рдХреЗ рд╕рд╛рде рдлрд┐рд░ рд╕реЗ рд╢реБрд░реВ рд╣реЛрддрд╛ рд╣реИ, рдЕрд░реНрдерд╛рддред рдирд┐рдпрдВрддреНрд░рдг рдкреНрд░рд╡рд╛рд╣ рд╕рдорд╛рди рд╕рдордХрд╛рд▓рд┐рдХ рдХреЛрдб рдХреА рддреБрд▓рдирд╛ рдореЗрдВ рдЧреИрд░-рд░реИрдЦрд┐рдХ рд╣реЛ рдЬрд╛рддрд╛ рд╣реИред


рдПрдХ рдЕрдиреНрдп рдорд╣рддреНрд╡рдкреВрд░реНрдг рдЕрдВрддрд░ stream::unfold рдЕрдВрджрд░ рдмрдВрдж рд╣реЛрдиреЗ рдХреЗ рд╢рд░реАрд░ рдореЗрдВ рдПрдХ async рдмреНрд▓реЙрдХ рдХреА рдЙрдкрд╕реНрдерд┐рддрд┐ рд╣реИ stream::unfold ред рдпрд╣ рдЖрд╡рд░рдг рдПрдХ рд╣реА рд╢рд░реАрд░ рдХреЗ рд╕рд╛рде рдПрдХ рдирдпрд╛ async рдлрд╝рдВрдХреНрд╢рди рдШреЛрд╖рд┐рдд рдХрд░рдиреЗ рдФрд░ рдПрдХ async рдмреНрд▓реЙрдХ рдХреЗ рдмрдЬрд╛рдп рдХреЙрд▓ рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП рдПрдХ рдкреВрд░реНрдг рдПрдирд╛рд▓реЙрдЧ рд╣реИред


# рд╕реБрд╡рд┐рдзрд╛ (async_closure)

рд╢рд╛рдпрдж рдпрд╣ рдХреНрд▓реЛрдЬрд░ рдЬрд▓реНрдж рд╣реА async_closure рд╕реБрд╡рд┐рдзрд╛ рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░рдХреЗ рд▓рд┐рдЦрд╛ рдЬрд╛ рд╕рдХрддрд╛ рд╣реИ, рд▓реЗрдХрд┐рди рдЕрдлрд╕реЛрд╕, рдЗрд╕реЗ рдЕрднреА рддрдХ рд▓рд╛рдЧреВ рдирд╣реАрдВ рдХрд┐рдпрд╛ рдЧрдпрд╛ рд╣реИ:


 async |(mut fact, n)| { while fact.checked_mul(n).is_none() { fact >>= 1; } fact *= n; Some((fact, (fact, n + 1))) } 

рдЬреИрд╕рд╛ рдХрд┐ рдЖрдк рджреЗрдЦ рд╕рдХрддреЗ рд╣реИрдВ, рдирдпрд╛ Stream рдкреНрд░рдХрд╛рд░ рди рдХреЗрд╡рд▓ Result<..> рддрддреНрд╡реЛрдВ рдХреЗ рд╕рд╛рде рдХрд╛рдо рдХрд░рддрд╛ рд╣реИ Result<..> , рдЬреИрд╕рд╛ рдХрд┐ рдкрд╣рд▓реЗ рдерд╛ред Future рд▓рдХреНрд╖рдг рдХреЗ рд╕рдорд╛рди рдкрд░рд┐рд╡рд░реНрддрди рдХрд┐рдП рдЧрдП рдереЗ, рд╕рдВрд╕реНрдХрд░рдг рдХреА рдкрд░рд┐рднрд╛рд╖рд╛рдПрдВ рдЗрд╕ рдкреНрд░рдХрд╛рд░ рд╣реИрдВ:


 // futures 0.1 trait Future { type Item; type Error; fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error>; } enum Async<T> { Ready(T), NotReady } // futures 0.3 trait Future { type Output; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>; } enum Poll<T> { Ready(T), Pending } 

рдЗрд╕ рддрдереНрдп рдХреЗ рдЕрд▓рд╛рд╡рд╛ рдХрд┐ рд░рд┐рдЯрд░реНрди рдкреНрд░рдХрд╛рд░ рдордирдорд╛рдирд╛ рд╣реЛ рд╕рдХрддрд╛ рд╣реИ, Future::poll рдХреЗ рдЗрдирдкреБрдЯ рдкреИрд░рд╛рдореАрдЯрд░ рднреА рдмрджрд▓ рдЧрдП рд╣реИрдВред рдПрдХ рдирдпрд╛ Context рдкреИрд░рд╛рдореАрдЯрд░ рджрд┐рдЦрд╛рдИ рджрд┐рдпрд╛ рд╣реИ, рдЬреЛ рд╡рд░реНрддрдорд╛рди рдХрд╛рд░реНрдп рдХреЛ рдЬрд╛рдЧрдиреЗ рдХреЗ рд▓рд┐рдП рдПрдХ рд╕реНрдкрд╖реНрдЯ рдЗрдВрдЯрд░рдлрд╝реЗрд╕ рдкреНрд░рджрд╛рди рдХрд░рддрд╛ рд╣реИред рдЗрд╕рд╕реЗ рдкрд╣рд▓реЗ, рдПрдХ рд╣реА рдЪреАрдЬрд╝ рдХреЛ рдХрд┐рд╕реА рд╡рд┐рд╢реЗрд╖ рдирд┐рд╖реНрдкрд╛рджрдХ рдХреЗ рд╡реИрд╢реНрд╡рд┐рдХ рдЪрд░ рдХреЗ рдорд╛рдзреНрдпрдо рд╕реЗ рдкреНрд░рд╛рдкреНрдд рдХрд┐рдпрд╛ рдЬрд╛ рд╕рдХрддрд╛ рд╣реИ (рдЙрджрд╛рд╣рд░рдг рдХреЗ рд▓рд┐рдП, tokio::prelude::task::current().notify() рдХреЛ рдХреЙрд▓ рдХрд░рдХреЗ tokio::prelude::task::current().notify() )ред


рдЗрдВрдЯрд░рдлрд╝реЗрд╕ рдХреЗ рдмреАрдЪ рдЕрдзрд┐рдХ рдореМрд▓рд┐рдХ рдЕрдВрддрд░ рдпрд╣ рд╣реИ рдХрд┐ рдЖрдкрдХреЛ рдЕрдкрдиреЗ рдЖрдк рдХреЛ Pin рдореЗрдВ рд▓рд┐рдВрдХ рд▓рдкреЗрдЯрдирд╛ рд╣реЛрдЧрд╛ред рдкреЙрдЗрдВрдЯрд░ рдкрд░ рдпрд╣ рдЖрд╡рд░рдг рд╕реНрдореГрддрд┐ рдореЗрдВ рдбреЗрдЯрд╛ рдХреА "рдЧрддрд┐рд╣реАрдирддрд╛" рдХреА рдЧрд╛рд░рдВрдЯреА рджреЗрддрд╛ рд╣реИ ( Pin рдХрд╛ рдЕрдзрд┐рдХ рд╡рд┐рд╕реНрддреГрдд рд╡рд┐рд╡рд░рдг рд╣рдм рдкрд░ рдХрдВрдкрд╛рдЗрд▓рд░ рдХреА 1.33 рд░рд┐рд▓реАрдЬ рдореЗрдВ рд╣реИ , рдпрд╛ рдЕрдВрдЧреНрд░реЗрдЬреА рдореЗрдВ, рдорд╛рдирдХ std :: pin рдкреБрд╕реНрддрдХрд╛рд▓рдп рдХреЗ рдкреНрд░рд▓реЗрдЦрди рдореЗрдВ)ред


рдЖрдЗрдП рдЕрдм рд╣рдорд╛рд░реЗ рдЙрджрд╛рд╣рд░рдг рдХреЛ рдЪрд▓рд╛рдиреЗ рдХрд╛ рдкреНрд░рдпрд╛рд╕ рдХрд░реЗрдВред Sink рд░реВрдк рдореЗрдВ рд╣рдо рд╡рд╛рдпрджрд╛ рд╕реЗ рдЖрдзрд╛ рдЪреИрдирд▓ рд▓реЗрддреЗ рд╣реИрдВ рдФрд░ рдЖрдЙрдЯрдкреБрдЯ рдкрдХреНрд╖ рдкрд░ рд╣рдо рдкреБрдирд░рд╛рд╡реГрддреНрддрд┐рдпреЛрдВ рдХреЗ рдмреАрдЪ рдХреБрдЫ рджреЗрд░реА рдХреЗ рд╕рд╛рде рдкрд░рд┐рдгрд╛рдо рдкреНрд░рд┐рдВрдЯ рдХрд░реЗрдВрдЧреЗред рд╡рд╛рдпрджрд╛-0.1 рдкрд░, рдЗрд╕ рддрд░рд╣ рдХреЗ рдХреЛрдб рдХреЛ рдирд┐рдореНрдирд╛рдиреБрд╕рд╛рд░ рд▓рд┐рдЦрд╛ рдЬрд╛ рд╕рдХрддрд╛ рд╣реИ:


 use std::time::{Duration, Instant}; // futures 0.1.29 use futures::prelude::*; use futures::sync::mpsc; // tokio 0.1.22 use tokio::runtime::Runtime; use tokio::timer::Delay; fn main() { let mut rt = Runtime::new().unwrap(); let (tx, rx) = mpsc::channel(32); rt.spawn(Box::new(sink_fibb_series(tx.sink_map_err(|_e| ())))); let fut = rx.take(100).for_each(|val| { println!("{}", val); Delay::new(Instant::now() + Duration::from_millis(50)) .map(|_| ()) .map_err(|_| ()) }); rt.spawn(Box::new(fut)); rt.shutdown_on_idle().wait().unwrap(); } 

рдирдП рдЯреЛрдХрд┐рдУ рдХреЗ рд╕рд╛рде рд╕рдорд╛рди рдХреЛрдб (рдЬреЛ рд▓реЗрдЦрди рдХреЗ рд╕рдордп рдЕрднреА рднреА рдЕрд▓реНрдлрд╝рд╛ рд╣реИ) рдФрд░ рд╡рд╛рдпрджрд╛-0.3 рдЗрд╕ рддрд░рд╣ рджрд┐рдЦ рд╕рдХрддрд╛ рд╣реИ:


 use std::time::Duration; // futures 0.3.1 use futures::channel::mpsc; use futures::prelude::*; // tokio 0.2.0-alpha.5 use tokio::timer; #[tokio::main] async fn main() { let (tx, rx) = mpsc::channel(32); tokio::spawn(sink_fibb_series(tx)); rx.take(100) .for_each(|val| { println!("{}", val); timer::delay_for(Duration::from_millis(50)) }) .await; } 

рдЬреИрд╕рд╛ рдХрд┐ рдЖрдк рджреЗрдЦ рд╕рдХрддреЗ рд╣реИрдВ, рдирдП рд╡рд╛рдпрджрд╛ рдХреЗ рд╕рд╛рде рдХреЛрдб рдмрд╣реБрдд рдХрдо рд╣реЛ рдЧрдпрд╛ рд╣реИред рд▓реЗрдЦрдХ рдХреЗ рдЕрдиреБрднрд╡ рдХреЗ рдЕрдиреБрд╕рд╛рд░, рд▓рд╛рдЗрдиреЛрдВ рдХреА рд╕рдВрдЦреНрдпрд╛ рд╣рдореЗрд╢рд╛ рдХрд╛рдлреА рдХрдо рдирд┐рдХрд▓рддреА рд╣реИ (рдХрднреА-рдХрднреА рд╕рдордХрд╛рд▓рд┐рдХ рдХреЛрдб рдХреЛ рдлрд┐рд░ рд╕реЗ рд▓рд┐рдЦрддреЗ рд╕рдордп рднреА)ред рд▓реЗрдХрд┐рди рдпрд╣ рдореБрдЭреЗ рд▓рдЧрддрд╛ рд╣реИ рдХрд┐ рдкрдардиреАрдпрддрд╛ рдФрд░ map / map_err рдорд┐рд╢реНрд░рдг рдХреА рдХрдореА рдореЗрдВ рдмрд╣реБрдд рдЕрдзрд┐рдХ рдЕрдВрддрд░ рд╣реИ, рдЬреЛ рдХрд┐ Result<..> рдореЗрдВ рдорд╛рдирдХ рдкреНрд░рдХрд╛рд░реЛрдВ рдореЗрдВ рддреНрд░реБрдЯрд┐рдпреЛрдВ рдХреА рдкрд░рд┐рд╡рд░реНрддрдирд╢реАрд▓рддрд╛ рдХреЗ рдХрд╛рд░рдг рдЖрд╡рд╢реНрдпрдХ рдереЗред


Result<..> рддрддреНрд╡реЛрдВ рдкрд░ рд╕рдВрдпреЛрдЬрдХ Result<..> рдкреНрд░рдХрд╛рд░ рдЕрднреА рднреА рдмрдиреЗ рд╣реБрдП рд╣реИрдВ рдФрд░ рдЕрд▓рдЧ-рдЕрд▓рдЧ рдкреНрд░рдХрд╛рд░реЛрдВ рдореЗрдВ рд╣реИрдВ, рдХреБрдЫ рдереЛрдбрд╝рд╛ рдЕрджреНрдпрддрди рдирд╛рдо рдХреЗ рд╕рд╛рдеред рдЕрдм рд╡реЗ рджреЛ рдЕрд▓рдЧ-рдЕрд▓рдЧ рдкреНрд░рдХрд╛рд░реЛрдВ рдореЗрдВ рд╡рд┐рднрд╛рдЬрд┐рдд рд╣реИрдВ; рдЬрд┐рдиреНрд╣реЗрдВ рд▓рд╛рдЧреВ рдХрд┐рдпрд╛ рдЧрдпрд╛ рд╣реИ:



Future рдФрд░ Stream рд▓рдХреНрд╖рдгреЛрдВ рдХрд╛ рдХрд╛рд░реНрдпрд╛рдиреНрд╡рдпрди рдереЛрдбрд╝рд╛ рдЕрдзрд┐рдХ рдЬрдЯрд┐рд▓ рд╣реИред рдПрдХ рдЙрджрд╛рд╣рд░рдг рдХреЗ рд░реВрдк рдореЗрдВ, рдЖрдЗрдП рдкрд╣рд▓реЗ рд╕реЗ рд╣реА рд╡рд┐рдЪрд╛рд░ рдХреА рдЧрдИ рд╕рдВрдЦреНрдпрд╛ рд╢реНрд░реГрдВрдЦрд▓рд╛ рдХреЗ рд▓рд┐рдП Stream рдХреЛ рд▓рд╛рдЧреВ рдХрд░рдиреЗ рдХрд╛ рдкреНрд░рдпрд╛рд╕ рдХрд░реЗрдВред рд╡рд╛рдпрджрд╛ рдХреЗ рджреЛрдиреЛрдВ рд╕рдВрд╕реНрдХрд░рдгреЛрдВ рдХреЗ рд▓рд┐рдП рд╕рд╛рдорд╛рдиреНрдп рдкреНрд░рдХрд╛рд░ рдирд┐рдореНрдирд╛рдиреБрд╕рд╛рд░ рд╣реЛрдВрдЧреЗ:


 struct FactStream { fact: u32, n: u32, } impl FactStream { fn new() -> Self { Self { fact: 1, n: 1 } } } 

рд╡рд╛рдпрджрд╛ -рез.реж рдХреЗ рд▓рд┐рдП, рдХрд╛рд░реНрдпрд╛рдиреНрд╡рдпрди рдирд┐рдореНрдирд╛рдиреБрд╕рд╛рд░ рд╣реЛрдЧрд╛:


 impl Stream for FactStream { type Item = u32; type Error = (); fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { while self.fact.checked_mul(self.n).is_none() { self.fact >>= 1; } self.fact *= self.n; self.n += 1; Ok(Async::Ready(Some(self.fact))) } } 

рдЗрд╕ рдЙрджрд╛рд╣рд░рдг рдореЗрдВ, Stream::poll рдХрд╛рд░реНрдпрд╛рдиреНрд╡рдпрди рд╡рд╛рд╕реНрддрд╡ рдореЗрдВ рдХреНрд▓реЛрдЬрд░ stream::unfold рдХреА рдкреВрд░реА рдкреНрд░рддрд┐рд▓рд┐рдкрд┐ рд╣реИ stream::unfold ред рд╡рд╛рдпрджрд╛-0.3 рдХреЗ рдорд╛рдорд▓реЗ рдореЗрдВ, рдХрд╛рд░реНрдпрд╛рдиреНрд╡рдпрди рд╕рдорд╛рди рд╣реИ:


 impl Stream for FactStream { type Item = u32; fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { while self.fact.checked_mul(self.n).is_none() { self.fact >>= 1; } self.fact *= self.n; self.n += 1; Poll::Ready(Some(self.fact)) } } 

рд╣рд╛рд▓рд╛рдБрдХрд┐, рдпрджрд┐ рд╕рдВрд░рдЪрдирд╛ рдХреНрд╖реЗрддреНрд░ рдХрд╛ рдкреНрд░рдХрд╛рд░ Unpin рд▓рд╛рдЧреВ рдирд╣реАрдВ рдХрд░рддрд╛ рд╣реИ, рддреЛ std::ops::DerefMut рдХреЛ Pin<&mut T> рдкрд░ рд▓рд╛рдЧреВ рдирд╣реАрдВ рдХрд┐рдпрд╛ рдЬрд╛рдПрдЧрд╛ рдФрд░ рдЗрд╕ рдкреНрд░рдХрд╛рд░ рд╕рднреА рдХреНрд╖реЗрддреНрд░реЛрдВ рдореЗрдВ рдХреЛрдИ рдкрд░рд┐рд╡рд░реНрддрдирд╢реАрд▓ рдкрд╣реБрдВрдЪ рдирд╣реАрдВ рд╣реЛрдЧреА:


 use std::marker::PhantomPinned; struct Fact { inner: u32, //    Unpin   _pin: PhantomPinned, } struct FactStream { fact: Fact, n: u32, } impl Stream for FactStream { type Item = u32; fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { while self.fact.inner.checked_mul(self.n).is_none() { self.fact.inner >>= 1; // <-   // trait `DerefMut` is required to modify // through a dereference, but it is not // implemented for `std::pin::Pin<&mut FactStream>` } self.fact.inner *= self.n; // <-   self.n += 1; // <- Poll::Ready(Some(self.fact.inner)) } } 

рдЗрд╕ рдорд╛рдорд▓реЗ рдореЗрдВ, рдПрдХ рдпрд╛ рджреВрд╕рд░реЗ рд░реВрдк рдореЗрдВ, рдЖрдкрдХреЛ "рдкреНрд░реЛрдЬреЗрдХреНрд╢рди" рдкреНрд░рд╛рдкреНрдд рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП рдЕрд╕реБрд░рдХреНрд╖рд┐рдд рдХрд╛рд░реНрдпреЛрдВ Pin::get_unchecked_mut рдФрд░ Pin::map_unchecked_mut рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░рдирд╛ рд╣реЛрдЧрд╛ ( рджрд╕реНрддрд╛рд╡реЗрдЬрд╝ рдореЗрдВ рдЕрдзрд┐рдХ рд╡рд┐рд╕реНрддреГрдд рд╡рд┐рд╡рд░рдг рд╣реИ )ред рд╕реМрднрд╛рдЧреНрдп рд╕реЗ, рдРрд╕реЗ рдорд╛рдорд▓реЛрдВ рдХреЗ рд▓рд┐рдП, рдкрд┐рди_рдкреНрд░реЛрдЬреЗрдХреНрдЯ рдЯреЛрдХрд░реЗ рдореЗрдВ рдПрдХ рд╕реБрд░рдХреНрд╖рд┐рдд рдЖрд╡рд░рдг рд▓рд╛рдЧреВ рдХрд┐рдпрд╛ рдЧрдпрд╛ рд╣реИ (рдХрд╛рд░реНрдпрд╛рдиреНрд╡рдпрди рдХрд╛ рд╡рд┐рд╡рд░рдг рдкреБрд╕реНрддрдХрд╛рд▓рдп рдкреНрд░рд▓реЗрдЦрди рдореЗрдВ рдкрд╛рдпрд╛ рдЬрд╛ рд╕рдХрддрд╛ рд╣реИ)ред


 use pin_project::pin_project; #[pin_project] struct FactStream { fact: Fact, n: u32, } impl Stream for FactStream { type Item = u32; fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let mut this = self.project(); while this.fact.inner.checked_mul(*this.n).is_none() { this.fact.inner >>= 1; } this.fact.inner *= *this.n; *this.n += 1; Poll::Ready(Some(this.fact.inner)) } } 

рдЕрдВрддрд┐рдо рдмрд┐рдВрджреБ рдЬрд┐рд╕реЗ рдореИрдВ рдЙрдЬрд╛рдЧрд░ рдХрд░рдирд╛ рдЪрд╛рд╣реВрдВрдЧрд╛ рд╡рд╣ рд╣реИ рд╡рд┐рднрд┐рдиреНрди рд╕рдВрд╕реНрдХрд░рдгреЛрдВ рдХреЗ рдкреНрд░рдХрд╛рд░реЛрдВ рдХреЗ рдмреАрдЪ рдЕрдВрддрд░-рд╕реНрдерд┐рд░рддрд╛ред рдРрд╕рд╛ рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП, рдПрдХ рдореЙрдбреНрдпреВрд▓ рдлреНрдпреВрдЪрд░реНрд╕ :: рдХрдВрдкреЗрдЯрд░реНрд╕ рд╣реИ , рдЬреЛ рдЖрдкрдХреЛ рдкреБрд░рд╛рдиреЗ рдкреНрд░рдХрд╛рд░реЛрдВ рд╕реЗ рдирдП рдореЗрдВ рдмрджрд▓рдиреЗ рдФрд░ рдЗрд╕рдХреЗ рд╡рд┐рдкрд░реАрдд рдХрд░рдиреЗ рдХреА рдЕрдиреБрдорддрд┐ рджреЗрддрд╛ рд╣реИред рдЙрджрд╛рд╣рд░рдг рдХреЗ рд▓рд┐рдП, рдЖрдк async-wait рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░рдХреЗ рд╡рд╛рдпрджрд╛-0.1 рд╕реЗ Stream рдкрд░ рдкреБрдирд░рд╛рд╡реГрддрд┐ рдХрд░ рд╕рдХрддреЗ рд╣реИрдВ:


 use std::fmt::Display; // futures 0.3 use new_futures::compat::Compat01As03 as Compat; use new_futures::StreamExt as _; // futures 0.1 use old_futures::Stream as OldStream; async fn stream_iterate<E>( old_stream: impl OldStream<Item = impl Display, Error = E>, ) -> Result<(), E> { let stream = Compat::new(old_stream); let mut stream = Box::pin(stream); while let Some(item) = stream.as_mut().next().await.transpose()? { println!("{}", item); } Ok(()) } 

рдиреЛрдЯ: рдХреЗрд╡рд▓ рдЯреЛрдХрд┐рдпреЛ рдирд┐рд╖реНрдкрд╛рджрдХ рдХреЛ рд▓реЗрдЦ рдореЗрдВ рд╕рдмрд╕реЗ рд▓рдВрдмреЗ рд╕рдордп рддрдХ рд░рд╣рдиреЗ рд╡рд╛рд▓рд╛ рдФрд░ рд╡реНрдпрд╛рдкрдХ рдорд╛рдирд╛ рдЬрд╛рддрд╛ рд╣реИред рдлрд┐рд░ рднреА, рджреБрдирд┐рдпрд╛ рд╡рд╣рд╛рдВ рд╕рдорд╛рдкреНрдд рдирд╣реАрдВ рд╣реЛрддреА рд╣реИ, рдЙрджрд╛рд╣рд░рдг рдХреЗ рд▓рд┐рдП, рдПрдХ рд╡реИрдХрд▓реНрдкрд┐рдХ async-std , рдЬреЛ рдЗрд╕рдХреЗ рдЕрд▓рд╛рд╡рд╛ рдорд╛рдирдХ рдкреБрд╕реНрддрдХрд╛рд▓рдп рдХреЗ рдкреНрд░рдХрд╛рд░реЛрдВ рдХреЗ рд▓рд┐рдП рдлреНрдпреВрдЪрд░рд┐рд╕реНрдЯрд┐рдХ рд░реИрдкрд░ рдкреНрд░рджрд╛рди рдХрд░рддрд╛ рд╣реИ, рд╕рд╛рде рд╣реА LocalPool рдФрд░ LocalPool рдХреЛ рдорд╛рдирд╛ рдЧрдпрд╛ рд╡рд╛рдпрджрд╛-0.3 рдкреБрд╕реНрддрдХрд╛рд▓рдп рд╕реЗред

Source: https://habr.com/ru/post/hi475272/


All Articles