
img source
In der vergangenen Woche ereignete sich für die Rust-Community ein riesiges Ereignis - die Compiler-Version 1.39 wurde veröffentlicht , und damit verbunden die Stabilisierung von asynchronen Funktionen. In diesem Beitrag werde ich versuchen, alle relevanten Änderungen im Compiler und im Ökosystem zusammenzufassen und Anweisungen für die Migration auf das asynchrone Warte-Paradigma bereitzustellen. Ich werde keine detaillierte Analyse der Asynchronität in Rust durchführen, es gibt immer noch relevante Artikel über das Habré, die helfen werden, in das Thema einzusteigen:
Zusätzlich zu diesen Artikeln können Sie auch auf die Dokumentation der Standardbibliothek und die erforderlichen Kisten verweisen sowie das Async-Buch (in Englisch) lesen.
Alle in diesem Artikel beschriebenen Beispiele funktionieren auf dem Stable-Compiler 1.39 und sollten auf allen nachfolgenden Versionen funktionieren. Der endgültige Code ist auf Github verfügbar.
Zur Implementierung von asynchronem Code wurde die Futures-0.1- Bibliothek verwendet. Es bietet die Basismerkmale futures::Future
und futures::Stream
für die Arbeit mit verzögertem Computing. Sie arbeiten mit Result<..>
-Typen und bieten eine Reihe von Kombinatoren. Darüber hinaus bietet die Bibliothek Kanäle für die Kommunikation zwischen Tasks (Tasks), verschiedene Schnittstellen für die Arbeit mit dem Executor und seinem Task-System und vieles mehr.
Stellen Sie sich ein Beispiel vor, das eine Zahlenreihe aus den höchsten 32 Bits von Fakultäten generiert und diese an Sink
sendet:
Hinweis: Die Berücksichtigung von CPU-gebundenen Aufgaben auf Coroutinen ist nicht die beste Anwendung, das Beispiel ist jedoch autark und einfach.
Wie Sie sehen, sieht der Code ziemlich umständlich aus: Sie müssen den Rückgabewert angeben, obwohl er keinen nützlichen Wert enthält. In Futures 0.3 wird der Code etwas einfacher:
Hier wird der Funktion das Schlüsselwort async
hinzugefügt, das den Rückgabewert der Funktion in Future
. Da es sich in unserem Fall um ein Tupel der Größe Null handelt, kann es wie bei gewöhnlichen Funktionen einfach weggelassen werden.
Das Schlüsselwort await
wird am Ende der Aufrufkette verwendet, um auf die Ausführung zu warten. Dieser Aufruf unterbricht die Ausführung im aktuellen asynchronen Kontext und überträgt die Steuerung an den Scheduler, bis der erwartete Future
Wert bereit ist. Dann wird die Ausführung mit der letzten await
(Beenden der Funktion in unserem Beispiel), d.h. Der Steuerfluss wird im Vergleich zu einem ähnlichen Synchroncode nichtlinear.
Ein weiterer signifikanter Unterschied ist das Vorhandensein eines asynchronen Blocks im Körper des Verschlusses innerhalb von stream::unfold
. Dieser Wrapper ist eine komplette Entsprechung zum Deklarieren einer neuen Async-Funktion mit demselben Body und zum Aufrufen anstelle eines Async-Blocks.
# [feature (async_closure)Vielleicht kann dieser Abschluss bald mit der Funktion async_closure
geschrieben werden, aber leider wurde er noch nicht implementiert:
async |(mut fact, n)| { while fact.checked_mul(n).is_none() { fact >>= 1; } fact *= n; Some((fact, (fact, n + 1))) }
Wie Sie sehen, funktioniert der neue Stream
Typ nicht nur mit Elementen des Result<..>
wie zuvor. Ähnliche Änderungen wurden am Future
Merkmal vorgenommen. Die Versionsdefinitionen lauten wie folgt:
Neben der Tatsache, dass der Rückgabetyp beliebig sein kann, haben sich auch die Eingabeparameter für Future::poll
geändert. Es wurde ein neuer Context
Parameter angezeigt, der eine explizite Schnittstelle zum Aufwecken der aktuellen Aufgabe bietet. Zuvor konnte dasselbe durch globale Variablen eines bestimmten Executors erreicht werden (z. B. durch Aufrufen von tokio::prelude::task::current().notify()
).
Der grundlegendere Unterschied zwischen der Schnittstelle besteht darin, dass Sie den Link in Pin
. Dieser Wrapper über dem Zeiger garantiert die "Unbeweglichkeit" der Daten im Speicher (eine detailliertere Beschreibung von Pin
in der Version 1.33 des Compilers auf dem Hub oder in englischer Sprache in der Dokumentation der Standardbibliothek std :: pin ).
Lassen Sie uns versuchen, unser Beispiel jetzt auszuführen. Als Sink
nehmen wir die Hälfte des Kanals von Futures und geben das Ergebnis ausgangsseitig mit einer gewissen Verzögerung zwischen den Iterationen aus. Auf Futures-0.1 kann ein solcher Code wie folgt geschrieben werden:
use std::time::{Duration, Instant};
Ein ähnlicher Code mit dem neuen Tokio (das zum Zeitpunkt des Schreibens noch Alpha ist) und Futures-0.3 könnte folgendermaßen aussehen:
use std::time::Duration;
Wie Sie sehen, ist der Code mit den neuen Futures viel kürzer geworden. Nach den Erfahrungen des Autors wird die Anzahl der Zeilen immer deutlich verringert (manchmal sogar beim Umschreiben von synchronem Code). Aber es scheint mir, dass es einen viel bedeutenderen Unterschied in der Lesbarkeit und das Fehlen einer Mischung aus map
/ map_err
, die aufgrund der Variabilität von Fehlern in Standardtypen in Result<..>
erforderlich waren.
Kombinatoren über Elemente des Typs Result<..>
blieben jedoch erhalten und sind separate Typen, einige mit einem leicht aktualisierten Namen. Jetzt werden sie in zwei verschiedene Typen unterteilt; diejenigen, die implementiert sind für:
Die Implementierung der Future
und Stream
Merkmale ist etwas komplizierter. Versuchen wir als Beispiel, Stream
für eine bereits betrachtete Zahlenreihe zu implementieren. Der gemeinsame Typ für beide Versionen von Futures lautet wie folgt:
struct FactStream { fact: u32, n: u32, } impl FactStream { fn new() -> Self { Self { fact: 1, n: 1 } } }
Für Futures-0.1 sieht die Implementierung wie folgt aus:
impl Stream for FactStream { type Item = u32; type Error = (); fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { while self.fact.checked_mul(self.n).is_none() { self.fact >>= 1; } self.fact *= self.n; self.n += 1; Ok(Async::Ready(Some(self.fact))) } }
In diesem Beispiel ist die Implementierung von Stream::poll
eine vollständige Kopie von stream::unfold
. Im Falle von Futures-0.3 ist die Implementierung äquivalent:
impl Stream for FactStream { type Item = u32; fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { while self.fact.checked_mul(self.n).is_none() { self.fact >>= 1; } self.fact *= self.n; self.n += 1; Poll::Ready(Some(self.fact)) } }
Wenn der Typ eines Strukturfelds jedoch Unpin
nicht implementiert, wird std::ops::DerefMut
nicht auf Pin<&mut T>
implementiert, und daher gibt es keinen veränderlichen Zugriff auf alle Felder:
use std::marker::PhantomPinned; struct Fact { inner: u32,
In diesem Fall müssen Sie in der einen oder anderen Form die unsicheren Funktionen Pin::get_unchecked_mut
und Pin::map_unchecked_mut
verwenden, um eine "Projektion" zu erhalten !Unpin
Glücklicherweise ist für solche Fälle ein sicherer Wrapper in der pin_project-Kiste implementiert (Details zur Implementierung finden Sie in der Bibliotheksdokumentation ).
use pin_project::pin_project; #[pin_project] struct FactStream { fact: Fact, n: u32, } impl Stream for FactStream { type Item = u32; fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let mut this = self.project(); while this.fact.inner.checked_mul(*this.n).is_none() { this.fact.inner >>= 1; } this.fact.inner *= *this.n; *this.n += 1; Poll::Ready(Some(this.fact.inner)) } }
Der letzte Punkt, den ich hervorheben möchte, ist die Interstabilität zwischen den Typen verschiedener Versionen. Dazu gibt es ein Modul futures :: compat , mit dem Sie von alten auf neue Typen konvertieren können und umgekehrt. Beispielsweise können Sie mit async-await über Stream
von Futures-0.1 iterieren:
use std::fmt::Display;
Hinweis: Nur tokio executor wird im Artikel als am langlebigsten und am weitesten verbreitet angesehen. Dennoch endet die Welt nicht dort, zum Beispiel gibt es eine alternative ThreadPool
Standardbibliothek, die zusätzlich futuristische Wrapper für Typen der Standardbibliothek sowie ThreadPool
und LocalPool
aus der betrachteten Futures-0.3-Bibliothek LocalPool
.