خط أنابيب معالجة البيانات المعلن على رأس الجهات الفاعلة؟ لم لا؟

منذ بعض الوقت ، في مناقشة حول أحد إصدارات SObjectizer ، سُئلنا: "هل من الممكن إنشاء DSL لوصف خط أنابيب معالجة البيانات؟" بمعنى آخر ، هل يمكن كتابة شيء كالتالي:


A | B | C | D


واحصل على خط أنابيب يعمل حيث تنتقل الرسائل من A إلى B ، ثم إلى C ، ثم إلى D. مع التحكم ، يتلقى B بالضبط هذا النوع الذي تُرجعه A. ويتلقى C بالضبط هذا النوع الذي يعود B. و هكذا.


لقد كانت مهمة مثيرة للاهتمام مع حل بسيط بشكل مدهش. على سبيل المثال ، هكذا يبدو إنشاء خط أنابيب:


 auto pipeline = make_pipeline(env, stage(A) | stage(B) | stage(C) | stage(D)); 

أو في حالة أكثر تعقيدًا (سيتم مناقشتها أدناه):


 auto pipeline = make_pipeline( sobj.environment(), stage(validation) | stage(conversion) | broadcast( stage(archiving), stage(distribution), stage(range_checking) | stage(alarm_detector{}) | broadcast( stage(alarm_initiator), stage( []( const alarm_detected & v ) { alarm_distribution( cerr, v ); } ) ) ) ); 

في هذه المقالة ، سنتحدث عن تنفيذ خط الأنابيب DSL هذا. سنناقش في الغالب الأجزاء المتعلقة stage() ، broadcast() operator|() ووظائف operator|() مع العديد من الأمثلة على استخدام قوالب C ++. لذا آمل أن يكون الأمر ممتعًا حتى بالنسبة للقراء الذين لا يعرفون عن SObjectizer (إذا لم تسمع عن SObjectizer هنا فهي نظرة عامة على هذه الأداة).


بضع كلمات حول التجريبي المستخدمة


تأثر المثال المستخدم في المقالة بتجربتي القديمة (والمنسية) في منطقة SCADA.


فكرة العرض التوضيحي هي معالجة البيانات المقروءة من بعض أجهزة الاستشعار. يتم الحصول على البيانات من جهاز استشعار مع بعض الفترة ، ثم يجب التحقق من صحة البيانات (يجب تجاهل البيانات غير الصحيحة) وتحويلها إلى بعض القيم الفعلية. على سبيل المثال ، يمكن أن تكون البيانات الأولية المقروءة من مستشعر قيمتين صحيحتين 8 بت ويجب تحويل تلك القيم إلى رقم الفاصلة العائمة.


ثم ينبغي أرشفة القيم الصالحة والمحوّلة ، وتوزيعها في مكان ما (على عقد مختلفة للتخيل ، على سبيل المثال) ، والتحقق من وجود "إنذارات" (إذا كانت القيم خارجة عن نطاقات آمنة ، فيجب معالجة ذلك بشكل خاص). هذه العمليات مستقلة ويمكن تنفيذها على التوازي.


يمكن إجراء العمليات المتعلقة بالإنذار المكتشف بشكل متوازٍ أيضًا: يجب بدء "المنبه" (حتى يمكن لجزء SCADA على العقدة الحالية أن يتفاعل معه) ويجب توزيع المعلومات حول "المنبه" في مكان آخر (على سبيل المثال : تخزينها في قاعدة بيانات تاريخية و / أو تصور على شاشة مشغل SCADA).


يمكن التعبير عن هذا المنطق بشكل نصي بهذه الطريقة:


 optional(valid_raw_data) = validate(raw_data); if valid_raw_data is not empty then { converted_value = convert(valid_raw_data); do_async archive(converted_value); do_async distribute(converted_value); do_async { optional(suspicious_value) = check_range(converted_value); if suspicious_value is not empty then { optional(alarm) = detect_alarm(suspicious_value); if alarm is not empty then { do_async initiate_alarm(alarm); do_async distribute_alarm(alam); } } } } 

أو ، في شكل رسوم بيانية:



إنه مثال مصطنع إلى حد ما ، ولكنه يحتوي على بعض الأشياء المثيرة للاهتمام التي أريد عرضها. الأول هو وجود مراحل متوازية في خط أنابيب (عملية broadcast() موجودة فقط بسبب ذلك). والثاني هو وجود دولة في بعض المراحل. على سبيل المثال ، alarm_detector هو مرحلة الدولة.


قدرات خطوط الأنابيب


تم بناء خط أنابيب من مراحل منفصلة. كل مرحلة هي وظيفة أو فاعل بالتنسيق التالي:


 opt<Out> func(const In &); 

أو


 void func(const In &); 

لا يمكن استخدام المراحل التي تُرجع void إلا في المرحلة الأخيرة من خط الأنابيب.


لا بد المراحل في سلسلة. تستقبل كل مرحلة تالية كائنًا تم إرجاعه بواسطة المرحلة السابقة. إذا كانت المرحلة السابقة تُرجع القيمة الفارغة ، opt<Out> القيمة opt<Out> ثم لا يتم استدعاء المرحلة التالية.


هناك مرحلة broadcast الخاصة. هي التي شيدت من عدة خطوط أنابيب. تستقبل مرحلة broadcast كائنًا من المرحلة السابقة وتبثه إلى كل خط أنابيب فرعي.


من وجهة نظر خط الأنابيب ، تبدو مرحلة broadcast كدالة بالتنسيق التالي:


 void func(const In &); 

نظرًا لعدم وجود قيمة إرجاع من مرحلة broadcast يمكن أن تكون مرحلة broadcast المرحلة الأخيرة فقط في خط أنابيب.


لماذا ترجع مرحلة خط الأنابيب قيمة اختيارية؟


ذلك لأن هناك حاجة لإسقاط بعض القيم الواردة. على سبيل المثال ، لا تقوم مرحلة validate بإرجاع أي شيء إذا كانت القيمة الأولية غير صحيحة ، ولا يوجد أي معنى لمعالجتها.


مثال آخر: لا تقوم مرحلة alarm_detector بإرجاع أي شيء إذا لم تنتج القيمة المشبوهة الحالية عن حالة إنذار جديدة.


تفاصيل التنفيذ



لنبدأ من أنواع البيانات والوظائف المتعلقة بمنطق التطبيق. في المثال الذي تمت مناقشته ، يتم استخدام أنواع البيانات التالية لتمرير المعلومات من مرحلة إلى أخرى:


 // Raw data from a sensor. struct raw_measure { int m_meter_id; uint8_t m_high_bits; uint8_t m_low_bits; }; // Type of input for validation stage with raw data from a sensor. struct raw_value { raw_measure m_data; }; // Type of input for conversion stage with valid raw data from a sensor. struct valid_raw_value { raw_measure m_data; }; // Data from a sensor after conversion to Celsius degrees. struct calculated_measure { int m_meter_id; float m_measure; }; // The type for result of conversion stage with converted data from a sensor. struct sensor_value { calculated_measure m_data; }; // Type with value which could mean a dangerous level of temperature. struct suspicious_value { calculated_measure m_data; }; // Type with information about detected dangerous situation. struct alarm_detected { int m_meter_id; }; 

هناك مثيل لـ raw_value ينتقل إلى المرحلة الأولى من خط أنابيبنا. يحتوي هذا raw_value على معلومات تم الحصول عليها من جهاز استشعار في شكل كائن raw_measure . ثم يتم تحويل valid_raw_value إلى valid_raw_value . ثم valid_raw_value تحولت إلى sensor_value مع قيمة جهاز استشعار الفعلي في شكل calulated_measure . إذا احتوى مثيل من sensor_value على قيمة مريبة ، فسيتم إنتاج مثيل للقيمة suspicious_value . ويمكن تحويل هذا القيمة suspicious_value إلى مثيل alarm_detected لاحقًا.


أو بالصيغة الرسومية:



الآن يمكننا إلقاء نظرة على تنفيذ مراحل خط الأنابيب لدينا:


 // // The first stage of a pipeline. Validation of raw data from a sensor. // // Returns valid_raw_value or nothing if value is invalid. // stage_result_t< valid_raw_value > validation( const raw_value & v ) { if( 0x7 >= v.m_data.m_high_bits ) return make_result< valid_raw_value >( v.m_data ); else return make_empty< valid_raw_value >(); } // // The second stage of a pipeline. Conversion from raw data to a value // in Celsius degrees. // stage_result_t< sensor_value > conversion( const valid_raw_value & v ) { return make_result< sensor_value >( calculated_measure{ v.m_data.m_meter_id, 0.5f * ((static_cast< uint16_t >( v.m_data.m_high_bits ) << 8) + v.m_data.m_low_bits) } ); } // // Simulation of the data archiving. // void archiving( const sensor_value & v ) { clog << "archiving (" << v.m_data.m_meter_id << "," << v.m_data.m_measure << ")" << endl; } // // Simulation of the data distribution. // void distribution( const sensor_value & v ) { clog << "distributing (" << v.m_data.m_meter_id << "," << v.m_data.m_measure << ")" << endl; } // // The first stage of a child pipeline at third level of the main pipeline. // // Checking for to high value of the temperature. // // Returns suspicious_value message or nothing. // stage_result_t< suspicious_value > range_checking( const sensor_value & v ) { if( v.m_data.m_measure >= 45.0f ) return make_result< suspicious_value >( v.m_data ); else return make_empty< suspicious_value >(); } // // The next stage of a child pipeline. // // Checks for two suspicious_value-es in 25ms time window. // class alarm_detector { using clock = chrono::steady_clock; public : stage_result_t< alarm_detected > operator()( const suspicious_value & v ) { if( m_previous ) if( *m_previous + chrono::milliseconds(25) > clock::now() ) { m_previous = nullopt; return make_result< alarm_detected >( v.m_data.m_meter_id ); } m_previous = clock::now(); return make_empty< alarm_detected >(); } private : optional< clock::time_point > m_previous; }; // // One of last stages of a child pipeline. // Imitates beginning of the alarm processing. // void alarm_initiator( const alarm_detected & v ) { clog << "=== alarm (" << v.m_meter_id << ") ===" << endl; } // // Another of last stages of a child pipeline. // Imitates distribution of the alarm. // void alarm_distribution( ostream & to, const alarm_detected & v ) { to << "alarm_distribution (" << v.m_meter_id << ")" << endl; } 

فقط تخطي أشياء مثل stage_result_t و make_result و make_empty ، سنناقشها في القسم التالي.


آمل أن رمز تلك المراحل تافهة إلى حد ما. الجزء الوحيد الذي يتطلب بعض التفسير الإضافي هو تنفيذ مرحلة alarm_detector .


في هذا المثال ، يتم تشغيل المنبه فقط إذا كان هناك على الأقل اثنين من القيم suspicious_values في نافذة زمنية تبلغ 25 مللي ثانية. لذلك يتعين علينا أن نتذكر وقت المثيل suspicious_value السابق في مرحلة alarm_detector . وذلك لأن تطبيق alarm_detector يتم تطبيقه كمسؤول ذي حالة جيدة مع مشغل استدعاء دالة.


تقوم المراحل بإرجاع نوع SObjectizer بدلاً من std :: اختياري


قلت في وقت سابق أن هذه المرحلة يمكن أن تعود قيمة اختيارية لكن std::optional لا يستخدم في التعليمات البرمجية ، يمكن رؤية النوع stage_result_t في تنفيذ المراحل.


فذلك لأن بعض SObjectizer المحددة تلعب دورها هنا. سيتم توزيع القيم التي يتم إرجاعها كرسائل بين وكلاء SObjectizer (ويعرف أيضًا باسم العناصر الفاعلة). يتم إرسال كل رسالة في SObjectizer ككائن مخصص بشكل حيوي. لذلك لدينا نوع من "التحسين" هنا: بدلاً من إرجاع std::optional ومن ثم تخصيص كائن رسالة جديد ، نحن فقط نخصص كائن رسالة ونعيد مؤشر ذكي إليه.


في الواقع ، stage_result_t هو مجرد typedef ل anal_ptr Shared_ptr التناظرية:


 template< typename M > using stage_result_t = message_holder_t< M >; 

و make_result و make_empty هما دالات مساعدة فقط لإنشاء stage_result_t مع أو بدون قيمة فعلية بداخل:


 template< typename M, typename... Args > stage_result_t< M > make_result( Args &&... args ) { return stage_result_t< M >::make(forward< Args >(args)...); } template< typename M > stage_result_t< M > make_empty() { return stage_result_t< M >(); } 

للبساطة ، يمكن القول أن مرحلة validation من validation يمكن التعبير عنها بهذه الطريقة:


 std::shared_ptr< valid_raw_value > validation( const raw_value & v ) { if( 0x7 >= v.m_data.m_high_bits ) return std::make_shared< valid_raw_value >( v.m_data ); else return std::shared_ptr< valid_raw_value >{}; } 

ولكن نظرًا لخصوصية SObjectizer ، لا يمكننا استخدام std::shared_ptr so_5::message_holder_t التعامل مع so_5::message_holder_t type. ونحن نخفي ذلك المحدد وراء stage_result_t و make_result و make_empty المساعدين.


stage_handler_t والفصل stage_builder_t


تتمثل إحدى النقاط المهمة في تنفيذ خط الأنابيب في الفصل بين مفاهيم معالج المرحلة ومرحلة البناء . يتم ذلك من أجل البساطة. سمح لي وجود هذه المفاهيم بأن يكون لدي خطوتين في تعريف خط الأنابيب.


في الخطوة الأولى ، يصف المستخدم مراحل خطوط الأنابيب. نتيجة لذلك ، أتلقى مثيل stage_t الذي يحتفظ بجميع مراحل خط الأنابيب في الداخل.


في الخطوة الثانية ، يتم إنشاء مجموعة من وكلاء SObjectizer الأساسي. يتلقى هؤلاء الوكلاء رسائل تحتوي على نتائج المراحل السابقة ويتصلون بمعالجات المرحلة الفعلية ، ثم يرسلون النتائج إلى المراحل التالية.


ولكن لإنشاء هذه المجموعة من العوامل ، يجب أن يكون لكل مرحلة منشئ مرحلة . يمكن رؤية منشئ المرحلة كمصنع يقوم بإنشاء وكيل SObjectizer الأساسي.


لذلك لدينا العلاقة التالية: كل مرحلة من مراحل خط الأنابيب تنتج كائنين: معالج المرحلة الذي يحتفظ بالمنطق المرتبط بالمرحلة ، ومنشئ المرحلة الذي ينشئ وكيل SObjectizer الأساسي لاستدعاء معالج المرحلة في الوقت المناسب:



يتم تمثيل معالج المرحلة بالطريقة التالية:


 template< typename In, typename Out > class stage_handler_t { public : using traits = handler_traits_t< In, Out >; using func_type = function< typename traits::output(const typename traits::input &) >; stage_handler_t( func_type handler ) : m_handler( move(handler) ) {} template< typename Callable > stage_handler_t( Callable handler ) : m_handler( handler ) {} typename traits::output operator()( const typename traits::input & a ) const { return m_handler( a ); } private : func_type m_handler; }; 

حيث يتم تعريف handler_traits_t بالطريقة التالية:


 // // We have to deal with two types of stage handlers: // - intermediate handlers which will return some result (eg some new // message); // - terminal handlers which can return nothing (eg void instead of // stage_result_t<M>); // // This template with specialization defines `input` and `output` // aliases for both cases. // template< typename In, typename Out > struct handler_traits_t { using input = In; using output = stage_result_t< Out >; }; template< typename In > struct handler_traits_t< In, void > { using input = In; using output = void; }; 

ويمثل باني المرحلة فقط std::function :


 using stage_builder_t = function< mbox_t(coop_t &, mbox_t) >; 

أنواع المساعدة lambda_traits_t و callable_traits_t


نظرًا لأن المراحل يمكن تمثيلها بواسطة وظائف مجانية أو عوامل توجيه (مثل مثيلات فئة alarm_detector أو الفئات التي تم إنشاؤها بواسطة مترجم مجهول تمثل lambdas) ، فنحن بحاجة إلى بعض المساعدين للكشف عن أنواع وسيطة المرحلة وقيمة الإرجاع. لقد استخدمت الكود التالي لهذا الغرض:


 // // Helper type for `arg_type` and `result_type` alises definition. // template< typename R, typename A > struct callable_traits_typedefs_t { using arg_type = A; using result_type = R; }; // // Helper type for dealing with stateful objects with operator() // (they could be user-defined objects or generated by compiler // like lambdas). // template< typename T > struct lambda_traits_t; template< typename M, typename A, typename T > struct lambda_traits_t< stage_result_t< M >(T::*)(const A &) const > : public callable_traits_typedefs_t< M, A > {}; template< typename A, typename T > struct lambda_traits_t< void (T::*)(const A &) const > : public callable_traits_typedefs_t< void, A > {}; template< typename M, typename A, typename T > struct lambda_traits_t< stage_result_t< M >(T::*)(const A &) > : public callable_traits_typedefs_t< M, A > {}; template< typename A, typename T > struct lambda_traits_t< void (T::*)(const A &) > : public callable_traits_typedefs_t< void, A > {}; // // Main type for definition of `arg_type` and `result_type` aliases. // With specialization for various cases. // template< typename T > struct callable_traits_t : public lambda_traits_t< decltype(&T::operator()) > {}; template< typename M, typename A > struct callable_traits_t< stage_result_t< M >(*)(const A &) > : public callable_traits_typedefs_t< M, A > {}; template< typename A > struct callable_traits_t< void(*)(const A &) > : public callable_traits_typedefs_t< void, A > {}; 

آمل أن يكون هذا الرمز مفهوما تمامًا للقراء الذين لديهم معرفة جيدة بـ C ++. إذا لم يكن الأمر كذلك ، فلا تتردد في lambda_traits_t ، lambda_traits_t أن أشرح المنطق وراء lambda_traits_t و callable_traits_t بالتفصيل.


المرحلة () ، وظائف البث () و | () المشغل


الآن يمكننا أن ننظر داخل وظائف بناء خطوط الأنابيب الرئيسية. ولكن قبل ذلك ، من الضروري إلقاء نظرة على تعريف فئة القالب stage_t :


 template< typename In, typename Out > struct stage_t { stage_builder_t m_builder; }; 

إنها بنية بسيطة للغاية تحتوي على مثيل stage_bulder_t فقط. لا يتم استخدام معلمات القالب داخل stage_t ، فلماذا تكون موجودة هنا؟


وهي ضرورية للتحقق من وقت ترجمة توافق النوع بين مراحل خطوط الأنابيب. سنرى ذلك قريباً.


لنلقِ نظرة على أبسط وظيفة لبناء خطوط الأنابيب ، stage() :


 template< typename Callable, typename In = typename callable_traits_t< Callable >::arg_type, typename Out = typename callable_traits_t< Callable >::result_type > stage_t< In, Out > stage( Callable handler ) { stage_builder_t builder{ [h = std::move(handler)]( coop_t & coop, mbox_t next_stage) -> mbox_t { return coop.make_agent< a_stage_point_t<In, Out> >( std::move(h), std::move(next_stage) ) ->so_direct_mbox(); } }; return { std::move(builder) }; } 

يتلقى معالج مرحلة الفعلي كمعلمة واحدة. يمكن أن يكون مؤشرًا لوظيفة أو وظيفة lambda أو functor. يتم استنباط أنواع المدخلات والمخرجات في المرحلة تلقائيًا بسبب "قالب السحر" وراء قالب callable_traits_t .


يتم إنشاء مثيل منشئ المرحلة داخل ويتم إرجاع هذا المثيل في كائن stage_t جديد stage_t stage() . يتم التقاط معالج المرحلة الفعلي بواسطة أداة بناء المرحلة lambda ، ثم سيتم استخدامه لإنشاء وكيل SObjectizer الأساسي (سنتحدث عن ذلك في القسم التالي).


الوظيفة التالية للمراجعة هي operator|() الذي يسلسل مرحلتين معًا ويعيد مرحلة جديدة:


 template< typename In, typename Out1, typename Out2 > stage_t< In, Out2 > operator|( stage_t< In, Out1 > && prev, stage_t< Out1, Out2 > && next ) { return { stage_builder_t{ [prev, next]( coop_t & coop, mbox_t next_stage ) -> mbox_t { auto m = next.m_builder( coop, std::move(next_stage) ); return prev.m_builder( coop, std::move(m) ); } } }; } 

إن أبسط طريقة لشرح منطق operator|() هي محاولة رسم صورة. لنفترض أن لدينا التعبير:


 stage(A) | stage(B) | stage(C) | stage(B) 

سيتم تحويل هذا التعبير بهذه الطريقة:



هناك يمكننا أيضًا أن نرى كيف يعمل فحص نوع الترجمة: يتطلب تعريف operator|() أن يكون نوع إخراج المرحلة الأولى هو مدخلات المرحلة الثانية. إذا لم يكن الأمر كذلك ، فلن يتم تجميع الشفرة.


والآن يمكننا إلقاء نظرة على وظيفة بناء خطوط الأنابيب الأكثر تعقيدًا ، broadcast() . الوظيفة نفسها بسيطة إلى حد ما:


 template< typename In, typename Out, typename... Rest > stage_t< In, void > broadcast( stage_t< In, Out > && first, Rest &&... stages ) { stage_builder_t builder{ [broadcasts = collect_sink_builders( move(first), forward< Rest >(stages)...)] ( coop_t & coop, mbox_t ) -> mbox_t { vector< mbox_t > mboxes; mboxes.reserve( broadcasts.size() ); for( const auto & b : broadcasts ) mboxes.emplace_back( b( coop, mbox_t{} ) ); return broadcast_mbox_t::make( coop.environment(), std::move(mboxes) ); } }; return { std::move(builder) }; } 

الفرق الرئيسي بين المرحلة العادية والمرحلة الإذاعية هو أن المرحلة الإذاعية يجب أن تحمل ناقلات منشئي المرحلة الفرعية. لذلك يتعين علينا إنشاء هذا المتجه وتمريره إلى باني المسرح الرئيسي لمرحلة البث. لهذا السبب ، يمكننا أن نرى دعوة إلى collect_sink_builders في قائمة التقاط lambda داخل وظيفة broadcast() :


 stage_builder_t builder{ [broadcasts = collect_sink_builders( move(first), forward< Rest >(stages)...)] 

إذا نظرنا إلى collect_sink_builder الرمز التالي:


 // // Serie of helper functions for building description for // `broadcast` stage. // // Those functions are used for collecting // `builders` functions for every child pipeline. // // Please note that this functions checks that each child pipeline has the // same In type. // template< typename In, typename Out, typename... Rest > void move_sink_builder_to( vector< stage_builder_t > & receiver, stage_t< In, Out > && first, Rest &&... rest ) { receiver.emplace_back( move( first.m_builder ) ); if constexpr( 0u != sizeof...(rest) ) move_sink_builder_to<In>( receiver, forward< Rest >(rest)... ); } template< typename In, typename Out, typename... Rest > vector< stage_builder_t > collect_sink_builders( stage_t< In, Out > && first, Rest &&... stages ) { vector< stage_builder_t > receiver; receiver.reserve( 1 + sizeof...(stages) ); move_sink_builder_to<In>( receiver, move(first), std::forward<Rest>(stages)... ); return receiver; } 

يعمل التحقق من وقت التحويل البرمجي للوقت هنا أيضًا: لأنه يتم استدعاء معلمة move_sink_builder_to بشكل واضح حسب النوع "في". يعني ذلك أن إجراء مكالمة في النموذج collect_sink_builders(stage_t<In1, Out1>, stage_t<In2, Out2>, ...) سيؤدي إلى حدوث خطأ في الترجمة لأن المحول البرمجي يحظر إجراء المكالمة move_sink_builder_to<In1>(receiver, stage_t<In2, Out2>, ...) .


يمكنني أيضًا ملاحظة أنه نظرًا لأن عدد خطوط الأنابيب الفرعية broadcast() معروف في وقت الترجمة ، يمكننا استخدام std::array بدلاً من std::vector ويمكن تجنب بعض عمليات تخصيص الذاكرة. لكن std::vector يستخدم هنا فقط للبساطة.


العلاقة بين المراحل ووكلاء / mboxes SObjectizer


الفكرة وراء تنفيذ خط الأنابيب هي إنشاء وكيل منفصل لكل مرحلة من مراحل خط الأنابيب. يتلقى الوكيل رسالة واردة ، ويمررها إلى معالج المرحلة المقابل ، ويحلل النتيجة ، وإذا لم تكن النتيجة فارغة ، يرسل النتيجة كرسالة واردة إلى المرحلة التالية. يمكن توضيحه بواسطة مخطط التسلسل التالي:



يجب مناقشة بعض الأشياء المرتبطة بـ SObjectizer ، على الأقل لفترة وجيزة. إذا لم تكن مهتمًا بمثل هذه التفاصيل ، يمكنك تخطي الأقسام أدناه والانتقال إلى الاستنتاج مباشرةً.


Coop هي مجموعة من العملاء للعمل معًا


يتم تقديم الوكلاء في SObjectizer ليس بشكل فردي ولكن في مجموعات باسم حظائر. حظيرة هي مجموعة من العوامل التي يجب أن تعمل معا وليس هناك معنى لمواصلة العمل إذا كان أحد وكلاء المجموعة مفقود.


لذا فإن إدخال العوامل إلى SObjectizer يشبه إنشاء مثيل حظيرة ، وملء هذا المثيل بالوكلاء المناسبين ثم تسجيل حظيرة في SObjectizer.


وبسبب ذلك ، فإن الحجة الأولى لباني المسرح هي إشارة إلى حظيرة جديدة. يتم إنشاء هذا حظيرة في وظيفة make_pipeline() (تمت مناقشتها أدناه) ، ثم يتم ملؤها بواسطة منشئي المسرح ثم يتم تسجيلها (مرة أخرى في وظيفة make_pipeline() ).


صناديق الرسائل


ينفذ SObjectizer العديد من النماذج المتعلقة التزامن. ممثل نموذج واحد منهم فقط. بسبب ذلك ، يمكن أن تختلف SObjectizer بشكل كبير عن أطر الفاعل الأخرى. أحد الاختلافات هو نظام العنونة للرسائل.


لا يتم توجيه الرسائل في SObjectizer إلى الجهات الفاعلة ، ولكن مربعات الرسائل (mboxes). يتعين على الممثلين الاشتراك في رسائل من mbox. إذا اشترك أحد الممثلين في نوع رسالة معين من mbox ، فسيتلقى رسائل من هذا النوع:



هذه الحقيقة ضرورية لأنه من الضروري إرسال رسائل من مرحلة إلى أخرى. وهذا يعني أن كل مرحلة يجب أن يكون لها mbox وأن mbox يجب أن تكون معروفة للمرحلة السابقة.


كل ممثل (الملقب وكيل) في SObjectizer لديه mbox المباشر . يرتبط mbox هذا بوكيل المالك فقط ولا يمكن استخدامه من قبل أي وكلاء آخرين. سيتم استخدام mboxes المباشر للعوامل التي تم إنشاؤها لمراحل التفاعل التفاعلي.


تحدد ميزة SObjectizer المحددة بعض تفاصيل تنفيذ خط الأنابيب.


الأول هو حقيقة أن باني المسرح لديه النموذج الأولي التالي:


 mbox_t builder(coop_t &, mbox_t); 

هذا يعني أن منشئ المرحلة يتلقى mbox من المرحلة التالية ويجب أن ينشئ وكيلًا جديدًا يرسل نتائج المرحلة إلى ذلك mbox. يجب إعادة mbox الخاص بالوكيل الجديد بواسطة باني المسرح . سيتم استخدام mbox لإنشاء وكيل للمرحلة السابقة.


والثاني هو حقيقة أن يتم إنشاء وكلاء للمراحل في ترتيب احتياطي. وهذا يعني أنه إذا كان لدينا خط أنابيب:


 stage(A) | stage(B) | stage(C) 

سيتم إنشاء وكيل للمرحلة C أولاً ، ثم سيتم استخدام mbox الخاص به لإنشاء وكيل للمرحلة B ، ومن ثم سيتم استخدام mbox من وكيل B- المرحلة لإنشاء وكيل للمرحلة A.


تجدر الإشارة إلى أن operator|() لا ينشئ وكلاء:


 stage_builder_t{ [prev, next]( coop_t & coop, mbox_t next_stage ) -> mbox_t { auto m = next.m_builder( coop, std::move(next_stage) ); return prev.m_builder( coop, std::move(m) ); } } 

ينشئ operator|() منشئًا يستدعي منشئين آخرين فقط ولكن لا يقدم عوامل إضافية. لذلك بالنسبة للقضية:


 stage(A) | stage(B) 

سيتم إنشاء وكيلين فقط (للمرحلة A و B- المرحلة) وبعد ذلك سيتم ربطهما معًا في منشئ المرحلة الذي تم إنشاؤه بواسطة operator|() .


لا يوجد وكيل لتطبيق broadcast()


تتمثل إحدى الطرق الواضحة لتطبيق مرحلة البث في إنشاء وكيل خاص يتلقى رسالة واردة ثم يعيد إرسال هذه الرسالة إلى قائمة بمربعات mbox المقصودة. تم استخدام هذه الطريقة في التنفيذ الأول لخط أنابيب DSL الموصوف.


لكن مشروعنا المصاحب ، so5extra ، لديه الآن نوع خاص من mbox: إذاعة واحدة. يعمل mbox بالضبط على ما هو مطلوب هنا: فهو يأخذ رسالة جديدة ويقوم بتسليمها إلى مجموعة من mboxes الوجهة.


نظرًا لعدم وجود حاجة لإنشاء وكيل بث منفصل ، يمكننا فقط استخدام mbox للبث من so5extra:


 // // A special mbox for broadcasting of a message to a set of destination // mboxes. // using broadcast_mbox_t = so_5::extra::mboxes::broadcast::fixed_mbox_template_t<>; ... // // Inside the broadcast() function: // stage_builder_t builder{ [broadcasts = collect_sink_builders( move(first), forward< Rest >(stages)...)] ( coop_t & coop, mbox_t ) -> mbox_t { vector< mbox_t > mboxes; mboxes.reserve( broadcasts.size() ); for( const auto & b : broadcasts ) mboxes.emplace_back( b( coop, mbox_t{} ) ); // That is the creation of broadcasting mbox instance. return broadcast_mbox_t::make( coop.environment(), std::move(mboxes) ); } }; 

تنفيذ وكيل المرحلة


الآن يمكننا إلقاء نظرة على تنفيذ وكيل المرحلة:


 // // An agent which will be used as intermediate or terminal pipeline stage. // It will receive input message, call the stage handler and pass // handler result to the next stage (if any). // template< typename In, typename Out > class a_stage_point_t final : public agent_t { public : a_stage_point_t( context_t ctx, stage_handler_t< In, Out > handler, mbox_t next_stage ) : agent_t{ ctx } , m_handler{ move( handler ) } , m_next{ move(next_stage) } {} void so_define_agent() override { if( m_next ) // Because there is the next stage the appropriate // message handler will be used. so_subscribe_self().event( [=]( const In & evt ) { auto r = m_handler( evt ); if( r ) so_5::send( m_next, r ); } ); else // There is no next stage. A very simple message handler // will be used for that case. so_subscribe_self().event( [=]( const In & evt ) { m_handler( evt ); } ); } private : const stage_handler_t< In, Out > m_handler; const mbox_t m_next; }; // // A specialization of a_stage_point_t for the case of terminal stage of // a pipeline. This type will be used for stage handlers with void // return type. // template< typename In > class a_stage_point_t< In, void > final : public agent_t { public : a_stage_point_t( context_t ctx, stage_handler_t< In, void > handler, mbox_t next_stage ) : agent_t{ ctx } , m_handler{ move( handler ) } { if( next_stage ) throw std::runtime_error( "sink point cannot have next stage" ); } void so_define_agent() override { so_subscribe_self().event( [=]( const In & evt ) { m_handler( evt ); } ); } private : const stage_handler_t< In, void > m_handler; }; 

انها تافهة إلى حد ما إذا كنت تفهم أساسيات SObjectizer. إذا لم يكن الأمر كذلك ، فسيكون من الصعب للغاية شرحه بعدة كلمات (لذلك لا تتردد في طرح الأسئلة في التعليقات).


يقوم التطبيق الرئيسي لوكيل a_stage_point_t بإنشاء اشتراك في رسالة من النوع In. عند وصول رسالة من هذا النوع ، يتم استدعاء معالج المرحلة . في حالة إرجاع معالج المرحلة لنتيجة فعلية ، يتم إرسال النتيجة إلى المرحلة التالية (إذا كانت تلك المرحلة موجودة).


يوجد أيضًا إصدار a_stage_point_t للحالة عندما تكون المرحلة المقابلة هي المرحلة النهائية ولا يمكن أن تكون هناك المرحلة التالية.


قد يبدو تطبيق a_stage_point_t معقدًا بعض الشيء ولكن a_stage_point_t ، إنه أحد أبسط العوامل التي كتبتها.


وظيفة make_pipeline ()


حان الوقت لمناقشة آخر وظيفة لبناء خطوط الأنابيب ، make_pipeline() :


 template< typename In, typename Out, typename... Args > mbox_t make_pipeline( // SObjectizer Environment to work in. so_5::environment_t & env, // Definition of a pipeline. stage_t< In, Out > && sink, // Optional args to be passed to make_coop() function. Args &&... args ) { auto coop = env.make_coop( forward< Args >(args)... ); auto mbox = sink.m_builder( *coop, mbox_t{} ); env.register_coop( move(coop) ); return mbox; } 

لا يوجد سحر ولا مفاجآت هنا. نحتاج فقط إلى إنشاء حظيرة جديدة للعوامل الأساسية لخط الأنابيب ، وملء تلك القوقعة بالوكلاء عن طريق استدعاء منشئ مرحلة من المستوى الأعلى ، ومن ثم تسجيل ذلك القفاز في SObjectizer. هذا كل شيء.


نتيجة make_pipeline() هي mbox للمرحلة الأكثر يسارًا (الأولى) من خط الأنابيب. يجب استخدام mbox لإرسال الرسائل إلى خط الأنابيب.


المحاكاة والتجارب معها


إذن لدينا الآن أنواع البيانات ووظائفها لمنطق التطبيق لدينا والأدوات اللازمة لتسلسل تلك الوظائف إلى خط أنابيب لمعالجة البيانات. دعونا نفعل ذلك ونرى النتيجة:


 int main() { // Launch SObjectizer in a separate thread. wrapped_env_t sobj; // Make a pipeline. auto pipeline = make_pipeline( sobj.environment(), stage(validation) | stage(conversion) | broadcast( stage(archiving), stage(distribution), stage(range_checking) | stage(alarm_detector{}) | broadcast( stage(alarm_initiator), stage( []( const alarm_detected & v ) { alarm_distribution( cerr, v ); } ) ) ) ); // Send messages to a pipeline in a loop with 10ms delays. for( uint8_t i = 0; i < static_cast< uint8_t >(250); i += 10 ) { send< raw_value >( pipeline, raw_measure{ 0, 0, i } ); std::this_thread::sleep_for( chrono::milliseconds{10} ); } } 

إذا قمنا بتشغيل هذا المثال ، فسنرى الإخراج التالي:


 archiving (0,0) distributing (0,0) archiving (0,5) distributing (0,5) archiving (0,10) distributing (0,10) archiving (0,15) distributing (0,15) archiving (0,20) distributing (0,20) archiving (0,25) distributing (0,25) archiving (0,30) distributing (0,30) ... archiving (0,105) distributing (0,105) archiving (0,110) distributing (0,110) === alarm (0) === alarm_distribution (0) archiving (0,115) distributing (0,115) archiving (0,120) distributing (0,120) === alarm (0) === alarm_distribution (0) 

إنه يعمل.


لكن يبدو أن مراحل خط أنابيبنا تعمل بالتتابع ، واحدة تلو الأخرى ، أليس كذلك؟


نعم هو كذلك. وذلك لأن جميع وكلاء خطوط الأنابيب مرتبطون بـ مرسل SObjectizer الافتراضي. ويستخدم هذا المرسل خيط عامل واحد فقط لخدمة معالجة الرسائل لجميع الوكلاء.


ولكن هذا يمكن تغييره بسهولة. ما عليك make_pipeline() تمرير وسيطة إضافية make_pipeline() مكالمة make_pipeline() :


 // Make a pipeline. auto pipeline = make_pipeline( sobj.environment(), stage(validation) | stage(conversion) | broadcast( stage(archiving), stage(distribution), stage(range_checking) | stage(alarm_detector{}) | broadcast( stage(alarm_initiator), stage( []( const alarm_detected & v ) { alarm_distribution( cerr, v ); } ) ) ), disp::thread_pool::make_dispatcher( sobj.environment() ).binder( disp::thread_pool::bind_params_t{}.fifo( disp::thread_pool::fifo_t::individual ) ) ); 

يؤدي هذا إلى إنشاء تجمع مؤشرات ترابط جديد ويربط كافة عوامل خط أنابيب إلى ذلك التجمع. سيتم تقديم الخدمات لكل وكيل بشكل مستقل عن الوكلاء الآخرين.


إذا قمنا بتشغيل المثال المعدل ، يمكننا أن نرى شيئًا كهذا:


 archiving (0,0) distributing (0,0) distributing (0,5) archiving (0,5) archiving (0,10) distributing (0,10) distributing (archiving (0,15) 0,15) archiving (0,20) distributing (0,20) archiving (0,25) distributing (0,25) archiving (0,distributing (030) ,30) ... archiving (0,distributing (0,105) 105) archiving (0,alarm_distribution (0) distributing (0,=== alarm (0) === 110) 110) archiving (distributing (0,0,115) 115) archiving (distributing (=== alarm (0) === 0alarm_distribution (0) 0,120) ,120) 

لذلك يمكننا أن نرى أن المراحل المختلفة لخط الأنابيب تعمل بشكل متوازٍ.


ولكن هل من الممكن المضي قدمًا ولدينا القدرة على ربط المراحل بالمرسلين المختلفين؟


نعم ، هذا ممكن ، لكن يتعين علينا تنفيذ وظيفة حمل زائد أخرى stage() :


 template< typename Callable, typename In = typename callable_traits_t< Callable >::arg_type, typename Out = typename callable_traits_t< Callable >::result_type > stage_t< In, Out > stage( disp_binder_shptr_t disp_binder, Callable handler ) { stage_builder_t builder{ [binder = std::move(disp_binder), h = std::move(handler)]( coop_t & coop, mbox_t next_stage) -> mbox_t { return coop.make_agent_with_binder< a_stage_point_t<In, Out> >( std::move(binder), std::move(h), std::move(next_stage) ) ->so_direct_mbox(); } }; return { std::move(builder) }; } 

لا يقبل هذا الإصدار من stage() معالج المرحلة فحسب ، بل يقبل أيضًا رابط المرسِل. Dispatcher binder is a way to bind an agent to the particular dispatcher. So to assign a stage to a specific working context we can create an appropriate dispatcher and then pass the binder to that dispatcher to stage() function. Let's do that:


 // An active_obj dispatcher to be used for some stages. auto ao_disp = disp::active_obj::make_dispatcher( sobj.environment() ); // Make a pipeline. auto pipeline = make_pipeline( sobj.environment(), stage(validation) | stage(conversion) | broadcast( stage(ao_disp.binder(), archiving), stage(ao_disp.binder(), distribution), stage(range_checking) | stage(alarm_detector{}) | broadcast( stage(ao_disp.binder(), alarm_initiator), stage(ao_disp.binder(), []( const alarm_detected & v ) { alarm_distribution( cerr, v ); } ) ) ), disp::one_thread::make_dispatcher( sobj.environment() ).binder() ); 

In that case stages archiving , distribution , alarm_initiator and alarm_distribution will work on own worker threads. All other stages will work on the same single worker thread.


The conclusion


This was an interesting experiment and I was surprised how easy SObjectizer could be used in something like reactive programming or data-flow programming.


However, I don't think that pipeline DSL can be practically meaningful. It's too simple and, maybe not flexible enough. But, I hope, it can be a base for more interesting experiments for those why need to deal with different workflows and data-processing pipelines. At least as a base for some ideas in that area. C++ language a rather good here and some (not so complicated) template magic can help to catch various errors at compile-time.


In conclusion, I want to say that we see SObjectizer not as a specialized tool for solving a particular problem, but as a basic set of tools to be used in solutions for different problems. And, more importantly, that basic set can be extended for your needs. Just take a look at SObjectizer , try it, and share your feedback. Maybe you missed something in SObjectizer? Perhaps you don't like something? Tell us , and we can try to help you.


If you want to help further development of SObjectizer, please share a reference to it or to this article somewhere you want (Reddit, HackerNews, LinkedIn, Facebook, Twitter, ...). The more attention and the more feedback, the more new features will be incorporated into SObjectizer.


And many thanks for reading this ;)


PS. The source code for that example can be found in that repository .

Source: https://habr.com/ru/post/ar460123/


All Articles