الترحيل إلى البنية الأساسية غير المتزامنة في راست

هجرة الطيور
img المصدر


حدث كبير حدث لمجتمع Rust في الأسبوع الماضي - تم إصدار إصدار برنامج التحويل البرمجي 1.39 ، وفي مكانه تم تثبيت ميزات انتظار المتزامنة. في هذا المنشور ، سأحاول تلخيص جميع التغييرات ذات الصلة في المترجم والنظام الإيكولوجي ، بالإضافة إلى تقديم إرشادات الترحيل للنموذج async-await. لن أقوم بتحليل مفصل للتزامن في Rust ، لا تزال هناك مقالات ذات صلة حول Habré ستساعد في الوصول إلى الموضوع:



بالإضافة إلى هذه المقالات ، يمكنك أيضًا الرجوع إلى الوثائق الخاصة بالمكتبة القياسية والصناديق الضرورية ، بالإضافة إلى قراءة كتاب غير متزامن (باللغة الإنجليزية).


تعمل كافة الأمثلة التي تمت مناقشتها في هذه المقالة على برنامج التحويل البرمجي المستقر 1.39 ويجب أن تعمل على كافة الإصدارات اللاحقة. الرمز النهائي متاح على جيثب .


لتطبيق التعليمات البرمجية غير المتزامنة ، تم استخدام مكتبة futures-0.1 . ويوفر السمات الأساسية futures::Future futures::Stream للعمل مع الحوسبة المؤجلة. وهي تعمل على أنواع Result<..> وتوفر مجموعة من أدوات الجمع. بالإضافة إلى ذلك ، توفر المكتبة قنوات للاتصال بين المهام (المهام) ، واجهات مختلفة للعمل مع المنفذ ونظام مهامه ، وأكثر من ذلك.


فكر في مثال ينشئ سلسلة أرقام من أعلى 32 بت من الفصائل ويرسلها إلى Sink :


 // futures 0.1.29 use futures::prelude::*; use futures::{stream, futures}; fn sink_fibb_series( sink: impl Sink<SinkItem = u32, SinkError = ()>, ) -> impl Future<Item = (), Error = ()> { stream::unfold((1u32, 1), |(mut fact, n)| { while fact.checked_mul(n).is_none() { fact >>= 1; } fact *= n; Some(future::ok((fact, (fact, n + 1)))) }) .forward(sink) .map(|_v| ()) } 

ملاحظة: النظر في المهام المرتبطة بوحدة المعالجة المركزية على coroutines ليس هو أفضل تطبيق ، ولكن المثال هو الاكتفاء الذاتي والبساطة.


كما ترون ، تبدو الشفرة مرهقة إلى حد ما: يجب عليك تحديد قيمة الإرجاع ، على الرغم من عدم وجود قيمة مفيدة فيها. في العقود الآجلة 0.3 ، يصبح الكود أسهل قليلاً:


 // futures 0.3.1 use futures::prelude::*; use futures::stream; async fn sink_fibb_series(sink: impl Sink<u32>) { stream::unfold((1u32, 1), |(mut fact, n)| { async move { while fact.checked_mul(n).is_none() { fact >>= 1; } fact *= n; Some((fact, (fact, n + 1))) } }) .map(Ok) .forward(sink) .map(|_v| ()) .await; } 

هنا ، تتم إضافة الكلمة الأساسية غير async إلى الوظيفة ، التي تلتف القيمة المرجعة للدالة في Future . لأنه في حالتنا هذه مجموعة ذات حجم صفري ، يمكن حذفها ببساطة ، كما هو الحال في الوظائف العادية.


يتم استخدام الكلمة الرئيسية " await في نهاية سلسلة الاتصال لانتظار التنفيذ. هذه المكالمة توقف التنفيذ في سياق المزامنة الحالي وتنقل التحكم إلى المجدول حتى تصبح القيمة Future المتوقعة جاهزة. بعد ذلك ، يستأنف التنفيذ مع await آخر (إنهاء الوظيفة في مثالنا) ، أي يصبح تدفق التحكم غير خطي مقارنةً برمز متزامن مماثل.


هناك اختلاف مهم آخر يتمثل في وجود كتلة غير متزامنة في جسم الإغلاق داخل stream::unfold . هذا المجمع هو تناظرية كاملة لإعلان وظيفة غير متزامن جديدة مع نفس الجسم والاتصال بدلاً من كتلة غير متزامنة.


# [الميزة (async_closure)

ربما يمكن كتابة هذا الإغلاق قريبًا باستخدام ميزة async_closure ، ولكن للأسف ، لم يتم تنفيذه بعد:


 async |(mut fact, n)| { while fact.checked_mul(n).is_none() { fact >>= 1; } fact *= n; Some((fact, (fact, n + 1))) } 

كما ترون ، لا يعمل نوع Stream الجديد فقط مع عناصر Result<..> ، كما كان من قبل. تم إجراء تغييرات مماثلة على سمة Future ، وتعريفات الإصدار كالتالي:


 // futures 0.1 trait Future { type Item; type Error; fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error>; } enum Async<T> { Ready(T), NotReady } // futures 0.3 trait Future { type Output; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>; } enum Poll<T> { Ready(T), Pending } 

إلى جانب حقيقة أن نوع الإرجاع يمكن أن يكون تعسفياً ، فإن معلمات الإدخال الخاصة بـ Future::poll قد تغيرت أيضًا. ظهرت معلمة Context جديدة ، والتي توفر واجهة صريحة للاستيقاظ من المهمة الحالية. في السابق ، كان يمكن تحقيق نفس الشيء من خلال المتغيرات العامة لمنفذ معين (على سبيل المثال ، عن طريق استدعاء tokio::prelude::task::current().notify() ).


الفرق الأكثر جوهرية بين الواجهة هو أنه يجب عليك التفاف الرابط بنفسك في Pin . يضمن هذا المجمع الموجود فوق المؤشر "ثبات" البيانات الموجودة في الذاكرة (يوجد وصف أكثر تفصيلاً لـ Pin في الإصدار 1.33 من المحول البرمجي على لوحة الوصل ، أو باللغة الإنجليزية ، في وثائق مكتبة std :: pin القياسية).


دعونا نحاول تشغيل مثالنا الآن. نظرًا لأن Sink نأخذ نصف القناة من العقود المستقبلية وعلى جانب الإخراج ، سنطبع النتيجة مع بعض التأخير بين التكرارات. في العقود المستقبلية - 0.1 ، يمكن كتابة هذا الرمز على النحو التالي:


 use std::time::{Duration, Instant}; // futures 0.1.29 use futures::prelude::*; use futures::sync::mpsc; // tokio 0.1.22 use tokio::runtime::Runtime; use tokio::timer::Delay; fn main() { let mut rt = Runtime::new().unwrap(); let (tx, rx) = mpsc::channel(32); rt.spawn(Box::new(sink_fibb_series(tx.sink_map_err(|_e| ())))); let fut = rx.take(100).for_each(|val| { println!("{}", val); Delay::new(Instant::now() + Duration::from_millis(50)) .map(|_| ()) .map_err(|_| ()) }); rt.spawn(Box::new(fut)); rt.shutdown_on_idle().wait().unwrap(); } 

رمز مشابه مع tokio الجديد (والذي لا يزال وقت كتابة ألفا) والعقود الآجلة 0.3 قد تبدو كما يلي:


 use std::time::Duration; // futures 0.3.1 use futures::channel::mpsc; use futures::prelude::*; // tokio 0.2.0-alpha.5 use tokio::timer; #[tokio::main] async fn main() { let (tx, rx) = mpsc::channel(32); tokio::spawn(sink_fibb_series(tx)); rx.take(100) .for_each(|val| { println!("{}", val); timer::delay_for(Duration::from_millis(50)) }) .await; } 

كما ترون ، أصبح الرمز الخاص بالعقود الآجلة الجديدة أقصر بكثير. وفقًا لتجربة المؤلف ، فإن عدد الخطوط دائمًا ما يكون أقل كثيرًا (أحيانًا حتى عند إعادة كتابة الكود المتزامن). ولكن يبدو لي أن هناك اختلافًا أكبر بكثير في قابلية القراءة وعدم وجود مزيج من map_err map / map_err ، والتي كانت ضرورية بسبب تباين الأخطاء في الأنواع القياسية في Result<..> .


ظلت وحدات الجمع على عناصر من النوع Result<..> مع ذلك وهي في أنواع منفصلة ، بعضها يحمل اسمًا تم تحديثه قليلاً. الآن يتم تقسيمهم إلى نوعين مختلفين ؛ تلك التي يتم تنفيذها من أجل:



يعد تنفيذ سمات Future والدفق أكثر تعقيدًا. على سبيل المثال ، دعونا نحاول تنفيذ Stream لسلسلة الأرقام التي تم النظر فيها بالفعل. النوع الشائع لكلا الإصدارين من العقود المستقبلية سيكون على النحو التالي:


 struct FactStream { fact: u32, n: u32, } impl FactStream { fn new() -> Self { Self { fact: 1, n: 1 } } } 

بالنسبة للعقود المستقبلية - 0.1 ، سيكون التنفيذ كما يلي:


 impl Stream for FactStream { type Item = u32; type Error = (); fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { while self.fact.checked_mul(self.n).is_none() { self.fact >>= 1; } self.fact *= self.n; self.n += 1; Ok(Async::Ready(Some(self.fact))) } } 

في هذا المثال ، يعد تطبيق Stream::poll عن نسخة كاملة بالفعل من stream::unfold الإغلاق stream::unfold . في حالة العقود الآجلة 0.3 ، يكون التنفيذ مكافئًا:


 impl Stream for FactStream { type Item = u32; fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { while self.fact.checked_mul(self.n).is_none() { self.fact >>= 1; } self.fact *= self.n; self.n += 1; Poll::Ready(Some(self.fact)) } } 

ومع ذلك ، إذا كان نوع حقل الهيكل لا يقوم بتطبيق std::ops::DerefMut فلن يتم تنفيذ std::ops::DerefMut على Pin<&mut T> وبالتالي لن يكون هناك وصول قابل للتغيير إلى جميع الحقول:


 use std::marker::PhantomPinned; struct Fact { inner: u32, //    Unpin   _pin: PhantomPinned, } struct FactStream { fact: Fact, n: u32, } impl Stream for FactStream { type Item = u32; fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { while self.fact.inner.checked_mul(self.n).is_none() { self.fact.inner >>= 1; // <-   // trait `DerefMut` is required to modify // through a dereference, but it is not // implemented for `std::pin::Pin<&mut FactStream>` } self.fact.inner *= self.n; // <-   self.n += 1; // <- Poll::Ready(Some(self.fact.inner)) } } 

في هذه الحالة ، في نموذج أو آخر ، سيتعين عليك استخدام الدالتين غير Pin::get_unchecked_mut و Pin::map_unchecked_mut أجل الحصول على "إسقاط" !Unpin حقول الحقول ( هناك وصف أكثر تفصيلاً في الوثائق ). لحسن الحظ ، في مثل هذه الحالات ، يوجد برنامج تغليف آمن تم تنفيذه في قفص pin_project (يمكن العثور على تفاصيل التنفيذ في وثائق المكتبة ).


 use pin_project::pin_project; #[pin_project] struct FactStream { fact: Fact, n: u32, } impl Stream for FactStream { type Item = u32; fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let mut this = self.project(); while this.fact.inner.checked_mul(*this.n).is_none() { this.fact.inner >>= 1; } this.fact.inner *= *this.n; *this.n += 1; Poll::Ready(Some(this.fact.inner)) } } 

النقطة الأخيرة التي أود تسليط الضوء عليها هي الاستقرار المتبادل بين أنواع الإصدارات المختلفة. للقيام بذلك ، هناك وحدة مستقبلية :: compat ، تتيح لك التحويل من الأنواع القديمة إلى الأنواع الجديدة والعكس. على سبيل المثال ، يمكنك التكرار عبر Stream من العقود الآجلة -0.1 باستخدام async-await:


 use std::fmt::Display; // futures 0.3 use new_futures::compat::Compat01As03 as Compat; use new_futures::StreamExt as _; // futures 0.1 use old_futures::Stream as OldStream; async fn stream_iterate<E>( old_stream: impl OldStream<Item = impl Display, Error = E>, ) -> Result<(), E> { let stream = Compat::new(old_stream); let mut stream = Box::pin(stream); while let Some(item) = stream.as_mut().next().await.transpose()? { println!("{}", item); } Ok(()) } 

ملاحظة: يعتبر منفّذ tokio فقط في المقالة الأكثر عمراً والأكثر انتشارًا. ومع ذلك ، فإن العالم لا ينتهي عند هذا الحد ، على سبيل المثال ، يوجد بديل غير async-std ، يوفر بالإضافة إلى ذلك LocalPool مستقبلية لأنواع المكتبة القياسية ، بالإضافة إلى ThreadPool و LocalPool من مكتبة futures-0.3.

Source: https://habr.com/ru/post/ar475272/


All Articles