अभिनेताओं के शीर्ष पर एक घोषणात्मक डाटा-प्रोसेसिंग पाइपलाइन? क्यों नहीं?

कुछ समय पहले, SObjectizer के एक रिलीज़ पर चर्चा में, हमसे पूछा गया था: "क्या डेटा-प्रोसेसिंग पाइपलाइन का वर्णन करने के लिए डीएसएल बनाना संभव है?" दूसरे शब्दों में, क्या ऐसा कुछ लिखना संभव है:


A | B | C | D


और एक कार्यशील पाइपलाइन प्राप्त करें जहां संदेश ए से बी और फिर सी से जा रहे हैं, और फिर डी के साथ नियंत्रण है कि बी को ठीक उसी प्रकार प्राप्त होता है जो ए रिटर्न देता है। और C ठीक उसी प्रकार प्राप्त करता है जिस प्रकार B वापस आता है। और इसी तरह।


यह आश्चर्यजनक रूप से सरल समाधान के साथ एक दिलचस्प काम था। उदाहरण के लिए, यह है कि पाइपलाइन का निर्माण कैसा दिख सकता है:


 auto pipeline = make_pipeline(env, stage(A) | stage(B) | stage(C) | stage(D)); 

या, एक अधिक जटिल मामले में (जो नीचे चर्चा की जाएगी):


 auto pipeline = make_pipeline( sobj.environment(), stage(validation) | stage(conversion) | broadcast( stage(archiving), stage(distribution), stage(range_checking) | stage(alarm_detector{}) | broadcast( stage(alarm_initiator), stage( []( const alarm_detected & v ) { alarm_distribution( cerr, v ); } ) ) ) ); 

इस लेख में, हम ऐसे पाइपलाइन डीएसएल के कार्यान्वयन के बारे में बात करेंगे। हम stage() से संबंधित अधिकांश भागों stage() , broadcast() और operator|() C ++ टेम्पलेट्स के उपयोग के कई उदाहरणों के साथ कार्य करेंगे। इसलिए मुझे आशा है कि यह उन पाठकों के लिए भी दिलचस्प होगा, जिन्हें SObjectizer के बारे में जानकारी नहीं है (यदि आपने कभी भी SObjectizer के बारे में नहीं सुना है तो इस टूल का अवलोकन करें)।


प्रयुक्त डेमो के बारे में शब्दों की एक जोड़ी


लेख में प्रयुक्त उदाहरण SCADA क्षेत्र में मेरे पुराने (और बल्कि भूल गए) अनुभव से प्रभावित हुआ है।


डेमो का विचार कुछ सेंसर से पढ़े गए डेटा की हैंडलिंग है। डेटा को कुछ अवधि के साथ एक सेंसर से अधिग्रहित किया जाता है, फिर उस डेटा को मान्य किया जाना चाहिए (गलत डेटा को नजरअंदाज किया जाना चाहिए) और वास्तविक मूल्यों में बदल दिया जाता है। उदाहरण के लिए, एक सेंसर से पढ़ा गया कच्चा डेटा दो 8-बिट पूर्णांक मान हो सकता है और उन मानों को एक फ़्लोटिंग-पॉइंट नंबर में परिवर्तित किया जाना चाहिए।


फिर वैध और परिवर्तित मूल्यों को संग्रहीत किया जाना चाहिए, कहीं वितरित किया जाना चाहिए (उदाहरण के लिए अलग-अलग नोड्स पर, उदाहरण के लिए), "अलार्म" के लिए जाँच की जाती है (यदि मान सुरक्षित सीमाओं से बाहर हैं तो विशेष रूप से नियंत्रित किया जाना चाहिए)। ये ऑपरेशन स्वतंत्र हैं और समानांतर में किए जा सकते हैं।


पता चला अलार्म से संबंधित संचालन समानांतर में भी किया जा सकता है: एक "अलार्म" शुरू किया जाना चाहिए (इसलिए वर्तमान नोड पर SCADA का हिस्सा इस पर प्रतिक्रिया कर सकता है) और "अलार्म" के बारे में जानकारी अन्यत्र वितरित की जानी चाहिए (उदाहरण के लिए) : एक ऐतिहासिक डेटाबेस में संग्रहीत और / या SCADA ऑपरेटर के प्रदर्शन पर कल्पना)।


इस तर्क को इस तरह से पाठ रूप में व्यक्त किया जा सकता है:


 optional(valid_raw_data) = validate(raw_data); if valid_raw_data is not empty then { converted_value = convert(valid_raw_data); do_async archive(converted_value); do_async distribute(converted_value); do_async { optional(suspicious_value) = check_range(converted_value); if suspicious_value is not empty then { optional(alarm) = detect_alarm(suspicious_value); if alarm is not empty then { do_async initiate_alarm(alarm); do_async distribute_alarm(alam); } } } } 

या, चित्रमय रूप में:



यह एक कृत्रिम उदाहरण है, लेकिन इसमें कुछ दिलचस्प चीजें हैं जो मैं दिखाना चाहता हूं। पहला एक पाइप लाइन में समानांतर चरणों की उपस्थिति है (ऑपरेशन broadcast() बस उसी के कारण मौजूद है)। दूसरा कुछ अवस्थाओं में राज्य की उपस्थिति है। उदाहरण के लिए, अलार्म_डेटेटर स्टेटफुल स्टेज है।


पाइपलाइन की क्षमता


एक पाइपलाइन अलग-अलग चरणों से बनाई गई है। प्रत्येक चरण एक कार्य या निम्नलिखित प्रारूप का एक फ़नकार है:


 opt<Out> func(const In &); 

या


 void func(const In &); 

चरण जो void वापस करते हैं, उन्हें केवल एक पाइपलाइन के अंतिम चरण के रूप में इस्तेमाल किया जा सकता है।


चरण एक श्रृंखला में बंधे होते हैं। प्रत्येक अगले चरण में पिछले चरण द्वारा लौटाया गया ऑब्जेक्ट प्राप्त होता है। यदि पिछला चरण खाली हो जाता है opt<Out> मान फिर अगला चरण नहीं कहा जाता है।


एक विशेष broadcast चरण है। इसका निर्माण कई पाइपलाइनों से किया गया है। एक broadcast चरण पिछले चरण से एक वस्तु प्राप्त करता है और इसे प्रत्येक सहायक पाइपलाइन को प्रसारित करता है।


पाइपलाइन के दृष्टिकोण से, broadcast चरण निम्न प्रारूप के एक समारोह की तरह दिखता है:


 void func(const In &); 

क्योंकि broadcast चरण से कोई वापसी मूल्य नहीं है एक broadcast चरण केवल एक पाइपलाइन में अंतिम चरण हो सकता है।


पाइपलाइन चरण एक वैकल्पिक मान क्यों लौटाता है?


ऐसा इसलिए है क्योंकि कुछ आवक मूल्यों को छोड़ने की आवश्यकता है। उदाहरण के लिए, validate चरण एक कच्चा मान गलत होने पर कुछ भी नहीं लौटाता है, और इसे संभालने का कोई अर्थ नहीं है।


एक अन्य उदाहरण: alarm_detector चरण कुछ भी नहीं देता है यदि वर्तमान संदिग्ध मान एक नया अलार्म केस उत्पन्न नहीं करता है।


कार्यान्वयन का विवरण



आइए आवेदन तर्क से संबंधित डेटा प्रकारों और कार्यों से शुरू करें। चर्चा किए गए उदाहरण में, निम्न डेटा प्रकारों का उपयोग सूचनाओं को एक चरण से दूसरे चरण में भेजने के लिए किया जाता है:


 // Raw data from a sensor. struct raw_measure { int m_meter_id; uint8_t m_high_bits; uint8_t m_low_bits; }; // Type of input for validation stage with raw data from a sensor. struct raw_value { raw_measure m_data; }; // Type of input for conversion stage with valid raw data from a sensor. struct valid_raw_value { raw_measure m_data; }; // Data from a sensor after conversion to Celsius degrees. struct calculated_measure { int m_meter_id; float m_measure; }; // The type for result of conversion stage with converted data from a sensor. struct sensor_value { calculated_measure m_data; }; // Type with value which could mean a dangerous level of temperature. struct suspicious_value { calculated_measure m_data; }; // Type with information about detected dangerous situation. struct alarm_detected { int m_meter_id; }; 

हमारे पाइप लाइन के पहले चरण में raw_value का एक उदाहरण बन रहा है। इस raw_value में raw_measure ऑब्जेक्ट के रूप में एक सेंसर से प्राप्त जानकारी शामिल है। फिर raw_value में बदल valid_raw_value । तब valid_raw_value , calulated_measure के रूप में एक वास्तविक सेंसर के मूल्य के साथ valid_raw_value बदल गया। यदि sensor_value के sensor_value में एक संदिग्ध मान है, तो suspicious_value का एक उदाहरण उत्पन्न होता है। और उस suspicious_value alarm_detected बाद में alarm_detected उदाहरण में alarm_detected जा सकता है।


या, चित्रमय रूप में:



अब हम अपने पाइपलाइन चरणों के कार्यान्वयन पर एक नज़र डाल सकते हैं:


 // // The first stage of a pipeline. Validation of raw data from a sensor. // // Returns valid_raw_value or nothing if value is invalid. // stage_result_t< valid_raw_value > validation( const raw_value & v ) { if( 0x7 >= v.m_data.m_high_bits ) return make_result< valid_raw_value >( v.m_data ); else return make_empty< valid_raw_value >(); } // // The second stage of a pipeline. Conversion from raw data to a value // in Celsius degrees. // stage_result_t< sensor_value > conversion( const valid_raw_value & v ) { return make_result< sensor_value >( calculated_measure{ v.m_data.m_meter_id, 0.5f * ((static_cast< uint16_t >( v.m_data.m_high_bits ) << 8) + v.m_data.m_low_bits) } ); } // // Simulation of the data archiving. // void archiving( const sensor_value & v ) { clog << "archiving (" << v.m_data.m_meter_id << "," << v.m_data.m_measure << ")" << endl; } // // Simulation of the data distribution. // void distribution( const sensor_value & v ) { clog << "distributing (" << v.m_data.m_meter_id << "," << v.m_data.m_measure << ")" << endl; } // // The first stage of a child pipeline at third level of the main pipeline. // // Checking for to high value of the temperature. // // Returns suspicious_value message or nothing. // stage_result_t< suspicious_value > range_checking( const sensor_value & v ) { if( v.m_data.m_measure >= 45.0f ) return make_result< suspicious_value >( v.m_data ); else return make_empty< suspicious_value >(); } // // The next stage of a child pipeline. // // Checks for two suspicious_value-es in 25ms time window. // class alarm_detector { using clock = chrono::steady_clock; public : stage_result_t< alarm_detected > operator()( const suspicious_value & v ) { if( m_previous ) if( *m_previous + chrono::milliseconds(25) > clock::now() ) { m_previous = nullopt; return make_result< alarm_detected >( v.m_data.m_meter_id ); } m_previous = clock::now(); return make_empty< alarm_detected >(); } private : optional< clock::time_point > m_previous; }; // // One of last stages of a child pipeline. // Imitates beginning of the alarm processing. // void alarm_initiator( const alarm_detected & v ) { clog << "=== alarm (" << v.m_meter_id << ") ===" << endl; } // // Another of last stages of a child pipeline. // Imitates distribution of the alarm. // void alarm_distribution( ostream & to, const alarm_detected & v ) { to << "alarm_distribution (" << v.m_meter_id << ")" << endl; } 

बस stage_result_t , make_result और make_empty जैसे सामान छोड़ें, हम अगले भाग में इसकी चर्चा करेंगे।


मुझे उम्मीद है कि उन चरणों के कोड बल्कि तुच्छ। एकमात्र भाग जिसे कुछ अतिरिक्त स्पष्टीकरण की आवश्यकता होती है, वह है alarm_detector चरण का कार्यान्वयन।


उस उदाहरण में, एक अलार्म केवल तभी शुरू किया जाता है जब 25ms टाइम विंडो में कम से कम दो संदिग्ध_वायु हों। इसलिए हमें alarm_detector स्टेज पर पिछले alarm_detector उदाहरण का समय याद रखना होगा। ऐसा इसलिए है क्योंकि alarm_detector एक फ़ंक्शन कॉल ऑपरेटर के साथ एक स्टेटफुल alarm_detector रूप में कार्यान्वित किया जाता है।


चरण :: std के बजाय SObjectizer का प्रकार लौटाते हैं :: वैकल्पिक


मैंने पहले कहा था कि मंच वैकल्पिक मूल्य लौटा सकता है। लेकिन std::optional का उपयोग कोड में नहीं किया जाता है, विभिन्न प्रकार के stage_result_t को चरणों के कार्यान्वयन में देखा जा सकता है।


ऐसा इसलिए है क्योंकि एसओबीजेलाइज़र के कुछ विशिष्ट यहां अपनी भूमिका निभाते हैं। लौटाए गए मानों को SObjectizer के एजेंटों (उर्फ अभिनेताओं) के बीच संदेशों के रूप में वितरित किया जाएगा। SObjectizer में प्रत्येक संदेश को गतिशील रूप से आवंटित ऑब्जेक्ट के रूप में भेजा जाता है। इसलिए हमारे यहां कुछ "अनुकूलन" हैं: std::optional की वापसी के बजाय std::optional और फिर एक नया संदेश ऑब्जेक्ट आवंटित करना, हम बस एक संदेश ऑब्जेक्ट आवंटित करते हैं और इसे एक स्मार्ट पॉइंटर लौटाते हैं।


वास्तव में, stage_result_t , एसओबीजेलाइज़र के साझा किए गए एनालॉग के लिए केवल एक stage_result_t है:


 template< typename M > using stage_result_t = message_holder_t< M >; 

और make_result और make_empty stage_result_t साथ या वास्तविक मूल्य के बिना निर्माण के लिए सिर्फ सहायक कार्य हैं:


 template< typename M, typename... Args > stage_result_t< M > make_result( Args &&... args ) { return stage_result_t< M >::make(forward< Args >(args)...); } template< typename M > stage_result_t< M > make_empty() { return stage_result_t< M >(); } 

सादगी के लिए यह कहना सुरक्षित है कि validation चरण इस तरह व्यक्त किया जा सकता है:


 std::shared_ptr< valid_raw_value > validation( const raw_value & v ) { if( 0x7 >= v.m_data.m_high_bits ) return std::make_shared< valid_raw_value >( v.m_data ); else return std::shared_ptr< valid_raw_value >{}; } 

लेकिन, SObjectizer के विशिष्ट होने के कारण, हम std::shared_ptr share_ptr का उपयोग नहीं कर सकते हैं और so_5::message_holder_t प्रकार से so_5::message_holder_t । और हम उस विशिष्ट को stage_result_t पीछे stage_result_t , make_result और make_empty


stage_handler_t और stage_builder_t जुदाई


पाइपलाइन कार्यान्वयन का एक महत्वपूर्ण बिंदु स्टेज हैंडलर और स्टेज बिल्डर अवधारणाओं का पृथक्करण है। यह सरलता के लिए किया जाता है। इन अवधारणाओं की उपस्थिति ने मुझे पाइपलाइन की परिभाषा में दो कदम रखने की अनुमति दी।


पहले चरण में, एक उपयोगकर्ता पाइपलाइन चरणों का वर्णन करता है। नतीजतन, मुझे stage_t का एक उदाहरण मिलता है जो सभी पाइपलाइन चरणों को अंदर रखता है।


दूसरे चरण में, अंतर्निहित SObjectizer के एजेंटों का एक सेट बनाया गया है। वे एजेंट पिछले चरणों के परिणामों के साथ संदेश प्राप्त करते हैं और वास्तविक चरण संचालकों को कॉल करते हैं, फिर परिणामों को अगले चरणों में भेजते हैं।


लेकिन एजेंटों के इस सेट को बनाने के लिए हर चरण में एक स्टेज बिल्डर होना चाहिएस्टेज बिल्डर को एक कारखाने के रूप में देखा जा सकता है जो एक अंतर्निहित SObjectizer एजेंट बनाता है।


इसलिए हमारा निम्नलिखित संबंध है: प्रत्येक पाइपलाइन चरण दो ऑब्जेक्ट्स का निर्माण करता है: स्टेज हैंडलर जो स्टेज से संबंधित तर्क रखता है, और स्टेज बिल्डर जो उचित समय पर स्टेज हैंडलर को कॉल करने के लिए एक अंतर्निहित SObjectizer का एजेंट बनाता है:



स्टेज हैंडलर निम्नलिखित तरीके से दर्शाया गया है:


 template< typename In, typename Out > class stage_handler_t { public : using traits = handler_traits_t< In, Out >; using func_type = function< typename traits::output(const typename traits::input &) >; stage_handler_t( func_type handler ) : m_handler( move(handler) ) {} template< typename Callable > stage_handler_t( Callable handler ) : m_handler( handler ) {} typename traits::output operator()( const typename traits::input & a ) const { return m_handler( a ); } private : func_type m_handler; }; 

जहाँ handler_traits_t को निम्न तरीके से परिभाषित किया गया है:


 // // We have to deal with two types of stage handlers: // - intermediate handlers which will return some result (eg some new // message); // - terminal handlers which can return nothing (eg void instead of // stage_result_t<M>); // // This template with specialization defines `input` and `output` // aliases for both cases. // template< typename In, typename Out > struct handler_traits_t { using input = In; using output = stage_result_t< Out >; }; template< typename In > struct handler_traits_t< In, void > { using input = In; using output = void; }; 

स्टेज बिल्डर को सिर्फ std::function द्वारा दर्शाया जाता std::function :


 using stage_builder_t = function< mbox_t(coop_t &, mbox_t) >; 

सहायक प्रकार lambda_traits_t और callable_traits_t


क्योंकि चरणों को नि: शुल्क फ़ंक्शन या alarm_detector द्वारा दर्शाया जा सकता है (जैसे कि alarm_detector क्लास के उदाहरण या अनाम कंपाइलर-जनरेट किए गए वर्ग जो लैम्ब्डा का प्रतिनिधित्व करते हैं), हमें चरण के तर्क और रिटर्न वैल्यू के प्रकारों का पता लगाने के लिए कुछ सहायकों की आवश्यकता है। मैंने उस उद्देश्य के लिए निम्नलिखित कोड का उपयोग किया:


 // // Helper type for `arg_type` and `result_type` alises definition. // template< typename R, typename A > struct callable_traits_typedefs_t { using arg_type = A; using result_type = R; }; // // Helper type for dealing with stateful objects with operator() // (they could be user-defined objects or generated by compiler // like lambdas). // template< typename T > struct lambda_traits_t; template< typename M, typename A, typename T > struct lambda_traits_t< stage_result_t< M >(T::*)(const A &) const > : public callable_traits_typedefs_t< M, A > {}; template< typename A, typename T > struct lambda_traits_t< void (T::*)(const A &) const > : public callable_traits_typedefs_t< void, A > {}; template< typename M, typename A, typename T > struct lambda_traits_t< stage_result_t< M >(T::*)(const A &) > : public callable_traits_typedefs_t< M, A > {}; template< typename A, typename T > struct lambda_traits_t< void (T::*)(const A &) > : public callable_traits_typedefs_t< void, A > {}; // // Main type for definition of `arg_type` and `result_type` aliases. // With specialization for various cases. // template< typename T > struct callable_traits_t : public lambda_traits_t< decltype(&T::operator()) > {}; template< typename M, typename A > struct callable_traits_t< stage_result_t< M >(*)(const A &) > : public callable_traits_typedefs_t< M, A > {}; template< typename A > struct callable_traits_t< void(*)(const A &) > : public callable_traits_typedefs_t< void, A > {}; 

मुझे उम्मीद है कि यह कोड C ++ के अच्छे ज्ञान वाले पाठकों के लिए काफी समझ में आएगा। यदि नहीं, तो बेझिझक मुझे टिप्पणियों में पूछें, मुझे विवरण में lambda_traits_t और callable_traits_t पीछे के तर्क की व्याख्या करने में खुशी होगी।


चरण (), प्रसारण () और ऑपरेटर | () फ़ंक्शन


अब हम मुख्य पाइपलाइन-निर्माण कार्यों के अंदर देख सकते हैं। लेकिन इससे पहले, एक खाका वर्ग के स्तर की परिभाषा पर एक नज़र stage_t :


 template< typename In, typename Out > struct stage_t { stage_builder_t m_builder; }; 

यह एक बहुत ही सरल संरचना है, जो कि केवल stage_bulder_t उदाहरण प्रस्तुत करती है। टेम्पलेट पैरामीटर stage_t अंदर उपयोग नहीं किए stage_t , इसलिए वे यहां क्यों मौजूद हैं?


वे पाइपलाइन चरणों के बीच संगतता के संकलन-समय की जाँच के लिए आवश्यक हैं। हम जल्द ही देखेंगे।


आइए सरलतम पाइपलाइन-निर्माण कार्य को देखें, stage() :


 template< typename Callable, typename In = typename callable_traits_t< Callable >::arg_type, typename Out = typename callable_traits_t< Callable >::result_type > stage_t< In, Out > stage( Callable handler ) { stage_builder_t builder{ [h = std::move(handler)]( coop_t & coop, mbox_t next_stage) -> mbox_t { return coop.make_agent< a_stage_point_t<In, Out> >( std::move(h), std::move(next_stage) ) ->so_direct_mbox(); } }; return { std::move(builder) }; } 

यह एक एकल पैरामीटर के रूप में एक वास्तविक चरण हैंडलर प्राप्त करता है। यह एक फंक्शन या लैम्ब्डा-फंक्शन या फन्क्टर का पॉइंटर हो सकता है। callable_traits_t टेम्पलेट के पीछे "टेम्पलेट मैजिक" के कारण चरण के इनपुट और आउटपुट स्वचालित रूप से कट जाते हैं।


स्टेज बिल्डर का एक उदाहरण अंदर बनाया गया है और उस stage() के परिणाम के रूप में एक नया stage_t ऑब्जेक्ट में वापस आ गया है। एक वास्तविक स्टेज हैंडलर को स्टेज बिल्डर लैम्ब्डा द्वारा कैप्चर किया जाता है, इसका उपयोग तब अंतर्निहित SObjectizer के एजेंट के निर्माण के लिए किया जाएगा (हम अगले भाग में इसके बारे में बोलेंगे)।


समीक्षा करने के लिए अगला कार्य operator|() जो दो चरणों को एक साथ जोड़ता है और एक नया चरण लौटाता है:


 template< typename In, typename Out1, typename Out2 > stage_t< In, Out2 > operator|( stage_t< In, Out1 > && prev, stage_t< Out1, Out2 > && next ) { return { stage_builder_t{ [prev, next]( coop_t & coop, mbox_t next_stage ) -> mbox_t { auto m = next.m_builder( coop, std::move(next_stage) ); return prev.m_builder( coop, std::move(m) ); } } }; } 

operator|() का तर्क समझाने का सबसे सरल तरीका operator|() एक चित्र बनाने की कोशिश करना है। मान लेते हैं कि हमारे पास अभिव्यक्ति है:


 stage(A) | stage(B) | stage(C) | stage(B) 

यह अभिव्यक्ति इस तरह से बदल जाएगी:



वहां हम यह भी देख सकते हैं कि कंपाइल-टाइम टाइप-चेकिंग कैसे काम कर रही है: operator|() की परिभाषा operator|() आवश्यकता है कि पहले चरण का आउटपुट प्रकार दूसरे चरण का इनपुट है। यदि ऐसा नहीं है तो कोड संकलित नहीं किया जाएगा।


और अब हम सबसे जटिल पाइपलाइन-बिल्डिंग फ़ंक्शन, broadcast() पर एक नज़र डाल सकते हैं। समारोह ही सरल है:


 template< typename In, typename Out, typename... Rest > stage_t< In, void > broadcast( stage_t< In, Out > && first, Rest &&... stages ) { stage_builder_t builder{ [broadcasts = collect_sink_builders( move(first), forward< Rest >(stages)...)] ( coop_t & coop, mbox_t ) -> mbox_t { vector< mbox_t > mboxes; mboxes.reserve( broadcasts.size() ); for( const auto & b : broadcasts ) mboxes.emplace_back( b( coop, mbox_t{} ) ); return broadcast_mbox_t::make( coop.environment(), std::move(mboxes) ); } }; return { std::move(builder) }; } 

एक साधारण चरण और प्रसारण-मंच के बीच मुख्य अंतर यह है कि प्रसारण-चरण में सहायक चरण बिल्डरों का वेक्टर पकड़ना होता है। इसलिए हमें उस वेक्टर को बनाना होगा और इसे ब्रॉडकास्ट-स्टेज के मुख्य स्टेज बिल्डर में डालना होगा। उसकी वजह से, हम broadcast() अंदर एक collect_sink_builders की कैप्चर सूची में collect_sink_builders कॉल को देख सकते हैं:


 stage_builder_t builder{ [broadcasts = collect_sink_builders( move(first), forward< Rest >(stages)...)] 

यदि हम collect_sink_builder में देखते हैं, तो हम निम्नलिखित कोड देखेंगे:


 // // Serie of helper functions for building description for // `broadcast` stage. // // Those functions are used for collecting // `builders` functions for every child pipeline. // // Please note that this functions checks that each child pipeline has the // same In type. // template< typename In, typename Out, typename... Rest > void move_sink_builder_to( vector< stage_builder_t > & receiver, stage_t< In, Out > && first, Rest &&... rest ) { receiver.emplace_back( move( first.m_builder ) ); if constexpr( 0u != sizeof...(rest) ) move_sink_builder_to<In>( receiver, forward< Rest >(rest)... ); } template< typename In, typename Out, typename... Rest > vector< stage_builder_t > collect_sink_builders( stage_t< In, Out > && first, Rest &&... stages ) { vector< stage_builder_t > receiver; receiver.reserve( 1 + sizeof...(stages) ); move_sink_builder_to<In>( receiver, move(first), std::forward<Rest>(stages)... ); return receiver; } 

कंपाइल-टाइम टाइप-चेकिंग यहाँ भी काम करती है: यह इसलिए है क्योंकि 'in' टाइप करके स्पष्ट रूप से पैरामीटर किए जाने वाले move_sink_builder_to को कॉल किया जाता है। इसका मतलब है कि फॉर्म में एक कॉल collect_sink_builders(stage_t<In1, Out1>, stage_t<In2, Out2>, ...) संकलन त्रुटि का कारण move_sink_builder_to<In1>(receiver, stage_t<In2, Out2>, ...) क्योंकि कंपाइलर move_sink_builder_to<In1>(receiver, stage_t<In2, Out2>, ...)


मैं यह भी नोट कर सकता हूं कि क्योंकि broadcast() लिए सहायक पाइपलाइनों की गिनती broadcast() संकलन-समय पर ज्ञात है, हम std::vector बजाय std::array उपयोग कर सकते हैं और कुछ मेमोरी आवंटन से बच सकते हैं। लेकिन std::vector का उपयोग यहाँ केवल सादगी के लिए किया जाता है।


चरणों और SObjectizer के एजेंटों / mboxes के बीच संबंध


पाइपलाइन के कार्यान्वयन के पीछे का विचार हर पाइपलाइन चरण के लिए एक अलग एजेंट का निर्माण है। एक एजेंट एक इनकमिंग संदेश प्राप्त करता है, उसे संबंधित चरण हैंडलर को भेजता है, परिणाम का विश्लेषण करता है और, यदि परिणाम खाली नहीं है, तो परिणाम को अगले चरण में आने वाले संदेश के रूप में भेजता है। यह निम्नलिखित अनुक्रम आरेख द्वारा चित्रित किया जा सकता है:



कुछ SObjectizer से संबंधित चीजों पर चर्चा की जानी है, कम से कम संक्षेप में। यदि आपको ऐसे विवरणों में कोई दिलचस्पी नहीं है, तो आप नीचे दिए गए अनुभागों को छोड़ सकते हैं और सीधे निष्कर्ष पर जा सकते हैं।


कूपर एक साथ काम करने के लिए एजेंटों का एक समूह है


एजेंटों को व्यक्तिगत रूप से नहीं बल्कि कॉप्स नाम के समूहों में एसोबिजाइज़र में पेश किया जाता है। कॉप एजेंटों का एक समूह है जिसे एक साथ काम करना चाहिए और समूह के किसी एक एजेंट के गायब होने पर काम जारी रखने का कोई मतलब नहीं है।


अतः SObjectizer में एजेंटों का परिचय, कॉप उदाहरण के निर्माण की तरह दिखता है, उस उदाहरण को उपयुक्त एजेंटों के साथ भरना और फिर कॉप को SObjectizer में पंजीकृत करना।


उसके कारण एक स्टेज बिल्डर के लिए पहला तर्क एक नए कॉप का संदर्भ है। यह कॉप make_pipeline() फ़ंक्शन (नीचे चर्चा की गई) में बनाया गया है, फिर यह स्टेज बिल्डरों द्वारा पॉपुलेटेड है और फिर पंजीकृत (फिर से make_pipeline() फ़ंक्शन) में बनाया गया है।


संदेश बॉक्स


SObjectizer कई संगामिति-संबंधित मॉडल लागू करता है। अभिनेता मॉडल उनमें से सिर्फ एक है। उस वजह से, एसोबिजाइज़र अन्य अभिनेता फ्रेमवर्क से काफी भिन्न हो सकता है। एक अंतर संदेशों के लिए संबोधन योजना है।


SObjectizer में संदेश अभिनेताओं को नहीं बल्कि संदेश बॉक्स (mboxes) को संबोधित किया जाता है। अभिनेताओं को एक mbox से संदेशों की सदस्यता लेनी होती है। यदि कोई अभिनेता किसी विशेष संदेश प्रकार से mbox से सदस्यता लेता है, तो उसे उस प्रकार के संदेश प्राप्त होंगे:



यह तथ्य महत्वपूर्ण है क्योंकि एक चरण से दूसरे चरण में संदेश भेजना आवश्यक है। इसका मतलब है कि हर चरण में उसका बॉक्स होना चाहिए और उस बॉक्स को पिछले चरण के लिए जाना जाना चाहिए।


SObjectizer में प्रत्येक अभिनेता (उर्फ एजेंट) का प्रत्यक्ष बॉक्स है। यह एमबॉक्स केवल मालिक एजेंट के साथ जुड़ा हुआ है और किसी अन्य एजेंट द्वारा उपयोग नहीं किया जा सकता है। चरणों के लिए बनाए गए एजेंटों के प्रत्यक्ष बॉक्स का उपयोग चरणों की बातचीत के लिए किया जाएगा।


यह SObjectizer की विशिष्ट विशेषता कुछ पाइपलाइन-कार्यान्वयन विवरणों को निर्धारित करती है।


पहला तथ्य यह है कि स्टेज बिल्डर में निम्नलिखित प्रोटोटाइप है:


 mbox_t builder(coop_t &, mbox_t); 

इसका मतलब है कि स्टेज बिल्डर को अगले चरण का एक बॉक्स प्राप्त होता है और उसे एक नया एजेंट बनाना चाहिए जो उस बॉक्स में स्टेज के परिणाम भेजेगा। नए एजेंट का एक बॉक्स स्टेज बिल्डर द्वारा लौटाया जाना चाहिए। उस mbox का उपयोग पिछले चरण के लिए एक एजेंट के निर्माण के लिए किया जाएगा।


दूसरा तथ्य यह है कि चरणों के लिए एजेंट आरक्षित क्रम में बनाए जाते हैं। इसका मतलब है कि अगर हमारे पास पाइपलाइन है:


 stage(A) | stage(B) | stage(C) 

स्टेज सी के लिए एक एजेंट पहले बनाया जाएगा, फिर इसके एमबॉक्स का उपयोग स्टेज बी के लिए एजेंट के निर्माण के लिए किया जाएगा, और फिर स्टेज ए के लिए एजेंट के निर्माण के लिए बी-स्टेज एजेंट के एमबॉक्स का उपयोग किया जाएगा।


यह भी ध्यान देने योग्य है कि operator|() एजेंट नहीं बनाता है:


 stage_builder_t{ [prev, next]( coop_t & coop, mbox_t next_stage ) -> mbox_t { auto m = next.m_builder( coop, std::move(next_stage) ); return prev.m_builder( coop, std::move(m) ); } } 

operator|() एक बिल्डर बनाता है जो केवल अन्य बिल्डरों को कॉल करता है लेकिन अतिरिक्त एजेंटों का परिचय नहीं देता है। तो मामले के लिए:


 stage(A) | stage(B) 

केवल दो एजेंट बनाए जाएंगे (ए-स्टेज और बी-स्टेज के लिए) और फिर उन्हें operator|() द्वारा बनाए गए स्टेज बिल्ड में एक साथ जोड़ा जाएगा। operator|()


broadcast() कार्यान्वयन के लिए कोई एजेंट नहीं है


प्रसारण चरण को लागू करने का एक स्पष्ट तरीका एक विशेष एजेंट बनाना है जो आने वाले संदेश प्राप्त करेगा और फिर उस संदेश को गंतव्य mboxes की सूची में फिर से भेज देगा। इस तरह का उपयोग वर्णित पाइपलाइन डीएसएल के पहले कार्यान्वयन में किया गया था


लेकिन हमारा साथी प्रोजेक्ट, so5extra , अब mbox का एक विशेष संस्करण है: एक प्रसारण। वह एमबॉक्स ठीक वही करता है जो यहां आवश्यक है: यह एक नया संदेश लेता है और इसे डेस्टिनेशन बॉक्स के सेट तक पहुंचाता है।


इसकी वजह से अलग प्रसारण एजेंट बनाने की कोई आवश्यकता नहीं है, हम सिर्फ so5extra से प्रसारण mbox का उपयोग कर सकते हैं:


 // // A special mbox for broadcasting of a message to a set of destination // mboxes. // using broadcast_mbox_t = so_5::extra::mboxes::broadcast::fixed_mbox_template_t<>; ... // // Inside the broadcast() function: // stage_builder_t builder{ [broadcasts = collect_sink_builders( move(first), forward< Rest >(stages)...)] ( coop_t & coop, mbox_t ) -> mbox_t { vector< mbox_t > mboxes; mboxes.reserve( broadcasts.size() ); for( const auto & b : broadcasts ) mboxes.emplace_back( b( coop, mbox_t{} ) ); // That is the creation of broadcasting mbox instance. return broadcast_mbox_t::make( coop.environment(), std::move(mboxes) ); } }; 

स्टेज-एजेंट का कार्यान्वयन


अब हम स्टेज एजेंट के कार्यान्वयन पर एक नज़र डाल सकते हैं:


 // // An agent which will be used as intermediate or terminal pipeline stage. // It will receive input message, call the stage handler and pass // handler result to the next stage (if any). // template< typename In, typename Out > class a_stage_point_t final : public agent_t { public : a_stage_point_t( context_t ctx, stage_handler_t< In, Out > handler, mbox_t next_stage ) : agent_t{ ctx } , m_handler{ move( handler ) } , m_next{ move(next_stage) } {} void so_define_agent() override { if( m_next ) // Because there is the next stage the appropriate // message handler will be used. so_subscribe_self().event( [=]( const In & evt ) { auto r = m_handler( evt ); if( r ) so_5::send( m_next, r ); } ); else // There is no next stage. A very simple message handler // will be used for that case. so_subscribe_self().event( [=]( const In & evt ) { m_handler( evt ); } ); } private : const stage_handler_t< In, Out > m_handler; const mbox_t m_next; }; // // A specialization of a_stage_point_t for the case of terminal stage of // a pipeline. This type will be used for stage handlers with void // return type. // template< typename In > class a_stage_point_t< In, void > final : public agent_t { public : a_stage_point_t( context_t ctx, stage_handler_t< In, void > handler, mbox_t next_stage ) : agent_t{ ctx } , m_handler{ move( handler ) } { if( next_stage ) throw std::runtime_error( "sink point cannot have next stage" ); } void so_define_agent() override { so_subscribe_self().event( [=]( const In & evt ) { m_handler( evt ); } ); } private : const stage_handler_t< In, void > m_handler; }; 

यदि आप SObjectizer की मूल बातें समझते हैं तो यह बहुत तुच्छ है। यदि नहीं तो कुछ शब्दों में समझाना काफी कठिन होगा (इसलिए बेझिझक टिप्पणी में सवाल पूछ सकते हैं)।


a_stage_point_t एजेंट का मुख्य कार्यान्वयन प्रकार के संदेश के लिए एक सदस्यता बनाता है। जब इस प्रकार का संदेश आता है तो स्टेज हैंडलर कहलाता है। यदि स्टेज हैंडलर वास्तविक परिणाम देता है, तो परिणाम अगले चरण में भेजा जाता है (यदि वह चरण मौजूद है)।


मामले के लिए a_stage_point_t का एक संस्करण भी है जब संबंधित चरण टर्मिनल चरण है और अगला चरण नहीं हो सकता है।


a_stage_point_t का कार्यान्वयन थोड़ा जटिल लग सकता है, लेकिन मेरा विश्वास करो, यह मेरे द्वारा लिखे गए सबसे सरल एजेंटों में से एक है।


make_pipeline () फ़ंक्शन


यह अंतिम पाइपलाइन-निर्माण कार्य, make_pipeline() पर चर्चा करने का समय है:


 template< typename In, typename Out, typename... Args > mbox_t make_pipeline( // SObjectizer Environment to work in. so_5::environment_t & env, // Definition of a pipeline. stage_t< In, Out > && sink, // Optional args to be passed to make_coop() function. Args &&... args ) { auto coop = env.make_coop( forward< Args >(args)... ); auto mbox = sink.m_builder( *coop, mbox_t{} ); env.register_coop( move(coop) ); return mbox; } 

यहां न कोई जादू है और न ही कोई आश्चर्य। हमें पाइपलाइन के अंतर्निहित एजेंटों के लिए एक नया कॉप बनाने की आवश्यकता है, उस कॉप को एजेंटों के साथ एक शीर्ष-स्तरीय स्टेज बिल्डर को कॉल करके भरें, और फिर उस कॉप को सोबिजाइज़र में पंजीकृत करें। वह सब।


make_pipeline() का परिणाम पाइप लाइन के बाएं-सबसे (पहले) चरण का make_pipeline() । उस mbox का उपयोग पाइपलाइन को संदेश भेजने के लिए किया जाना चाहिए।


सिमुलेशन और इसके साथ प्रयोग


तो अब हमारे पास हमारे एप्लिकेशन लॉजिक के लिए डेटा प्रकार और फ़ंक्शंस हैं और उन फ़ंक्शंस को डेटा-प्रोसेसिंग पाइपलाइन में चलाने के लिए उपकरण हैं। चलो इसे करते हैं और एक परिणाम देखते हैं:


 int main() { // Launch SObjectizer in a separate thread. wrapped_env_t sobj; // Make a pipeline. auto pipeline = make_pipeline( sobj.environment(), stage(validation) | stage(conversion) | broadcast( stage(archiving), stage(distribution), stage(range_checking) | stage(alarm_detector{}) | broadcast( stage(alarm_initiator), stage( []( const alarm_detected & v ) { alarm_distribution( cerr, v ); } ) ) ) ); // Send messages to a pipeline in a loop with 10ms delays. for( uint8_t i = 0; i < static_cast< uint8_t >(250); i += 10 ) { send< raw_value >( pipeline, raw_measure{ 0, 0, i } ); std::this_thread::sleep_for( chrono::milliseconds{10} ); } } 

यदि हम उस उदाहरण को चलाते हैं तो हम निम्नलिखित आउटपुट देखेंगे:


 archiving (0,0) distributing (0,0) archiving (0,5) distributing (0,5) archiving (0,10) distributing (0,10) archiving (0,15) distributing (0,15) archiving (0,20) distributing (0,20) archiving (0,25) distributing (0,25) archiving (0,30) distributing (0,30) ... archiving (0,105) distributing (0,105) archiving (0,110) distributing (0,110) === alarm (0) === alarm_distribution (0) archiving (0,115) distributing (0,115) archiving (0,120) distributing (0,120) === alarm (0) === alarm_distribution (0) 

यह काम करता है।


लेकिन ऐसा लगता है कि हमारी पाइपलाइन के चरण क्रमिक रूप से काम करते हैं, एक के बाद एक, है न?


हाँ, यह है। ऐसा इसलिए है क्योंकि सभी पाइपलाइन एजेंट डिफ़ॉल्ट SObjectizer के डिस्पैचर से बंधे हैं। और वह डिस्पैचर सभी एजेंटों के संदेश प्रसंस्करण की सेवा के लिए सिर्फ एक श्रमिक धागे का उपयोग करता है।


लेकिन इसे आसानी से बदला जा सकता है। बस make_pipeline() कॉल के लिए एक अतिरिक्त तर्क पास करें:


 // Make a pipeline. auto pipeline = make_pipeline( sobj.environment(), stage(validation) | stage(conversion) | broadcast( stage(archiving), stage(distribution), stage(range_checking) | stage(alarm_detector{}) | broadcast( stage(alarm_initiator), stage( []( const alarm_detected & v ) { alarm_distribution( cerr, v ); } ) ) ), disp::thread_pool::make_dispatcher( sobj.environment() ).binder( disp::thread_pool::bind_params_t{}.fifo( disp::thread_pool::fifo_t::individual ) ) ); 

यह एक नया थ्रेड पूल बनाता है और उस पूल में सभी पाइपलाइन एजेंटों को बांधता है। प्रत्येक एजेंट को अन्य एजेंटों से स्वतंत्र रूप से पूल द्वारा सेवा दी जाएगी।


यदि हम संशोधित उदाहरण चलाते हैं तो हम कुछ ऐसा देख सकते हैं:


 archiving (0,0) distributing (0,0) distributing (0,5) archiving (0,5) archiving (0,10) distributing (0,10) distributing (archiving (0,15) 0,15) archiving (0,20) distributing (0,20) archiving (0,25) distributing (0,25) archiving (0,distributing (030) ,30) ... archiving (0,distributing (0,105) 105) archiving (0,alarm_distribution (0) distributing (0,=== alarm (0) === 110) 110) archiving (distributing (0,0,115) 115) archiving (distributing (=== alarm (0) === 0alarm_distribution (0) 0,120) ,120) 

इसलिए हम देख सकते हैं कि पाइपलाइन के विभिन्न चरण समानांतर में काम करते हैं।


लेकिन क्या आगे जाकर विभिन्न डिस्पैचरों में चरणों को बांधने की क्षमता है?


हां, यह संभव है, लेकिन हमें stage() फ़ंक्शन के लिए एक और अधिभार लागू करना होगा:


 template< typename Callable, typename In = typename callable_traits_t< Callable >::arg_type, typename Out = typename callable_traits_t< Callable >::result_type > stage_t< In, Out > stage( disp_binder_shptr_t disp_binder, Callable handler ) { stage_builder_t builder{ [binder = std::move(disp_binder), h = std::move(handler)]( coop_t & coop, mbox_t next_stage) -> mbox_t { return coop.make_agent_with_binder< a_stage_point_t<In, Out> >( std::move(binder), std::move(h), std::move(next_stage) ) ->so_direct_mbox(); } }; return { std::move(builder) }; } 

This version of stage() accepts not only a stage handler but also a dispatcher binder. Dispatcher binder is a way to bind an agent to the particular dispatcher. So to assign a stage to a specific working context we can create an appropriate dispatcher and then pass the binder to that dispatcher to stage() function. Let's do that:


 // An active_obj dispatcher to be used for some stages. auto ao_disp = disp::active_obj::make_dispatcher( sobj.environment() ); // Make a pipeline. auto pipeline = make_pipeline( sobj.environment(), stage(validation) | stage(conversion) | broadcast( stage(ao_disp.binder(), archiving), stage(ao_disp.binder(), distribution), stage(range_checking) | stage(alarm_detector{}) | broadcast( stage(ao_disp.binder(), alarm_initiator), stage(ao_disp.binder(), []( const alarm_detected & v ) { alarm_distribution( cerr, v ); } ) ) ), disp::one_thread::make_dispatcher( sobj.environment() ).binder() ); 

In that case stages archiving , distribution , alarm_initiator and alarm_distribution will work on own worker threads. All other stages will work on the same single worker thread.


The conclusion


This was an interesting experiment and I was surprised how easy SObjectizer could be used in something like reactive programming or data-flow programming.


However, I don't think that pipeline DSL can be practically meaningful. It's too simple and, maybe not flexible enough. But, I hope, it can be a base for more interesting experiments for those why need to deal with different workflows and data-processing pipelines. At least as a base for some ideas in that area. C++ language a rather good here and some (not so complicated) template magic can help to catch various errors at compile-time.


In conclusion, I want to say that we see SObjectizer not as a specialized tool for solving a particular problem, but as a basic set of tools to be used in solutions for different problems. And, more importantly, that basic set can be extended for your needs. Just take a look at SObjectizer , try it, and share your feedback. Maybe you missed something in SObjectizer? Perhaps you don't like something? Tell us , and we can try to help you.


If you want to help further development of SObjectizer, please share a reference to it or to this article somewhere you want (Reddit, HackerNews, LinkedIn, Facebook, Twitter, ...). The more attention and the more feedback, the more new features will be incorporated into SObjectizer.


And many thanks for reading this ;)


पुनश्च। The source code for that example can be found in that repository .

Source: https://habr.com/ru/post/hi460123/


All Articles