Saluran pemrosesan data deklaratif di atas aktor? Kenapa tidak

Beberapa waktu lalu, dalam sebuah diskusi di salah satu rilis SObjectizer, kami ditanya: "Apakah mungkin membuat DSL untuk menggambarkan pipa pemrosesan data?" Dengan kata lain, apakah mungkin untuk menulis sesuatu seperti itu:


A | B | C | D


dan dapatkan jalur pipa yang berfungsi di mana pesan pergi dari A ke B, lalu ke C, dan kemudian ke D. Dengan kontrol B menerima persis tipe yang dikembalikan A. Dan C menerima persis tipe yang dikembalikan B. Dan sebagainya.


Itu adalah tugas yang menarik dengan solusi yang sangat sederhana. Misalnya, begitulah pembuatan saluran pipa terlihat seperti:


 auto pipeline = make_pipeline(env, stage(A) | stage(B) | stage(C) | stage(D)); 

Atau, dalam kasus yang lebih kompleks (yang akan dibahas di bawah):


 auto pipeline = make_pipeline( sobj.environment(), stage(validation) | stage(conversion) | broadcast( stage(archiving), stage(distribution), stage(range_checking) | stage(alarm_detector{}) | broadcast( stage(alarm_initiator), stage( []( const alarm_detected & v ) { alarm_distribution( cerr, v ); } ) ) ) ); 

Pada artikel ini, kita akan berbicara tentang implementasi DSL pipeline tersebut. Kami akan membahas sebagian besar bagian yang terkait dengan fungsi stage() , broadcast() dan operator|() dengan beberapa contoh penggunaan templat C ++. Jadi saya berharap ini akan menarik bahkan bagi pembaca yang tidak tahu tentang SObjectizer (jika Anda belum pernah mendengar tentang SObjectizer di sini adalah gambaran umum alat ini).


Beberapa kata tentang demo yang digunakan


Contoh yang digunakan dalam artikel ini dipengaruhi oleh pengalaman saya yang lama (dan agak terlupakan) di area SCADA.


Ide dari demo adalah penanganan data yang dibaca dari beberapa sensor. Data diperoleh dari sensor dengan beberapa periode, maka data tersebut harus divalidasi (data yang salah harus diabaikan) dan dikonversi menjadi beberapa nilai aktual. Misalnya, data mentah yang dibaca dari sensor dapat berupa dua nilai integer 8-bit dan nilai-nilai tersebut harus dikonversi menjadi satu angka floating-point.


Kemudian nilai yang valid dan dikonversi harus diarsipkan, didistribusikan di suatu tempat (pada node yang berbeda untuk visualisasi, misalnya), diperiksa untuk "alarm" (jika nilai berada di luar rentang aman maka itu harus ditangani secara khusus). Operasi-operasi ini independen dan dapat dilakukan secara paralel.


Operasi yang terkait dengan alarm yang terdeteksi dapat dilakukan secara paralel juga: "alarm" harus dimulai (sehingga bagian SCADA pada node saat ini dapat bereaksi padanya) dan informasi tentang "alarm" harus didistribusikan di tempat lain (misalnya : disimpan ke database historis dan / atau divisualisasikan pada layar operator SCADA).


Logika ini dapat diekspresikan dalam bentuk tekstual seperti itu:


 optional(valid_raw_data) = validate(raw_data); if valid_raw_data is not empty then { converted_value = convert(valid_raw_data); do_async archive(converted_value); do_async distribute(converted_value); do_async { optional(suspicious_value) = check_range(converted_value); if suspicious_value is not empty then { optional(alarm) = detect_alarm(suspicious_value); if alarm is not empty then { do_async initiate_alarm(alarm); do_async distribute_alarm(alam); } } } } 

Atau, dalam bentuk grafis:



Ini contoh yang agak tiruan, tetapi ada beberapa hal menarik yang ingin saya perlihatkan. Yang pertama adalah adanya tahapan paralel dalam pipa ( broadcast() operasi broadcast() ada hanya karena itu). Yang kedua adalah kehadiran suatu negara dalam beberapa tahap. Sebagai contoh, alarm_detector adalah stateful stage.


Kemampuan saluran pipa


Saluran pipa dibangun dari tahapan terpisah. Setiap tahap adalah fungsi atau fungsi dari format berikut:


 opt<Out> func(const In &); 

atau


 void func(const In &); 

Tahapan yang mengembalikan void hanya dapat digunakan sebagai tahap terakhir dari pipa.


Tahapan terikat ke dalam rantai. Setiap tahap berikutnya menerima objek yang dikembalikan oleh tahap sebelumnya. Jika tahap sebelumnya mengembalikan nilai opt<Out> kosong opt<Out> maka tahap selanjutnya tidak dipanggil.


Ada panggung broadcast khusus. Itu dibangun dari beberapa pipa. Tahap broadcast menerima objek dari tahap sebelumnya dan menyiarkannya ke setiap pipa anak perusahaan.


Dari sudut pandang pipa, tahap broadcast tampak seperti fungsi dari format berikut:


 void func(const In &); 

Karena tidak ada nilai balik dari tahap broadcast tahap broadcast hanya bisa menjadi tahap terakhir dalam saluran pipa.


Mengapa tahap pipa mengembalikan nilai opsional?


Itu karena ada kebutuhan untuk menjatuhkan beberapa nilai yang masuk. Misalnya, tahap validate tidak mengembalikan apa pun jika nilai mentah salah, dan tidak ada gunanya menanganinya.


Contoh lain: tahap alarm_detector tidak mengembalikan apa pun jika nilai mencurigakan saat ini tidak menghasilkan kasus alarm baru.


Detail implementasi



Mari kita mulai dari tipe data dan fungsi yang terkait dengan logika aplikasi. Dalam contoh yang dibahas, tipe data berikut digunakan untuk meneruskan informasi dari satu tahap ke tahap lainnya:


 // Raw data from a sensor. struct raw_measure { int m_meter_id; uint8_t m_high_bits; uint8_t m_low_bits; }; // Type of input for validation stage with raw data from a sensor. struct raw_value { raw_measure m_data; }; // Type of input for conversion stage with valid raw data from a sensor. struct valid_raw_value { raw_measure m_data; }; // Data from a sensor after conversion to Celsius degrees. struct calculated_measure { int m_meter_id; float m_measure; }; // The type for result of conversion stage with converted data from a sensor. struct sensor_value { calculated_measure m_data; }; // Type with value which could mean a dangerous level of temperature. struct suspicious_value { calculated_measure m_data; }; // Type with information about detected dangerous situation. struct alarm_detected { int m_meter_id; }; 

Sebuah instance dari raw_value akan menuju ke tahap pertama dari pipeline kami. Nilai raw_value ini berisi informasi yang diperoleh dari sensor dalam bentuk objek raw_measure . Kemudian raw_value ditransformasikan menjadi valid_raw_value . Kemudian valid_raw_value ditransformasikan menjadi sensor_value dengan nilai sensor aktual dalam bentuk calulated_measure . Jika instance sensor_value berisi nilai mencurigakan, maka instance suspicious_value diproduksi. Dan suspicious_value dapat diubah menjadi instance alarm_detected nanti.


Atau, dalam bentuk grafis:



Sekarang kita bisa melihat implementasi dari tahapan pipeline kami:


 // // The first stage of a pipeline. Validation of raw data from a sensor. // // Returns valid_raw_value or nothing if value is invalid. // stage_result_t< valid_raw_value > validation( const raw_value & v ) { if( 0x7 >= v.m_data.m_high_bits ) return make_result< valid_raw_value >( v.m_data ); else return make_empty< valid_raw_value >(); } // // The second stage of a pipeline. Conversion from raw data to a value // in Celsius degrees. // stage_result_t< sensor_value > conversion( const valid_raw_value & v ) { return make_result< sensor_value >( calculated_measure{ v.m_data.m_meter_id, 0.5f * ((static_cast< uint16_t >( v.m_data.m_high_bits ) << 8) + v.m_data.m_low_bits) } ); } // // Simulation of the data archiving. // void archiving( const sensor_value & v ) { clog << "archiving (" << v.m_data.m_meter_id << "," << v.m_data.m_measure << ")" << endl; } // // Simulation of the data distribution. // void distribution( const sensor_value & v ) { clog << "distributing (" << v.m_data.m_meter_id << "," << v.m_data.m_measure << ")" << endl; } // // The first stage of a child pipeline at third level of the main pipeline. // // Checking for to high value of the temperature. // // Returns suspicious_value message or nothing. // stage_result_t< suspicious_value > range_checking( const sensor_value & v ) { if( v.m_data.m_measure >= 45.0f ) return make_result< suspicious_value >( v.m_data ); else return make_empty< suspicious_value >(); } // // The next stage of a child pipeline. // // Checks for two suspicious_value-es in 25ms time window. // class alarm_detector { using clock = chrono::steady_clock; public : stage_result_t< alarm_detected > operator()( const suspicious_value & v ) { if( m_previous ) if( *m_previous + chrono::milliseconds(25) > clock::now() ) { m_previous = nullopt; return make_result< alarm_detected >( v.m_data.m_meter_id ); } m_previous = clock::now(); return make_empty< alarm_detected >(); } private : optional< clock::time_point > m_previous; }; // // One of last stages of a child pipeline. // Imitates beginning of the alarm processing. // void alarm_initiator( const alarm_detected & v ) { clog << "=== alarm (" << v.m_meter_id << ") ===" << endl; } // // Another of last stages of a child pipeline. // Imitates distribution of the alarm. // void alarm_distribution( ostream & to, const alarm_detected & v ) { to << "alarm_distribution (" << v.m_meter_id << ")" << endl; } 

stage_result_t saja hal-hal seperti stage_result_t , make_result dan make_empty , kita akan membahasnya di bagian selanjutnya.


Saya harap kode tahapan itu agak sepele. Satu-satunya bagian yang memerlukan penjelasan tambahan adalah implementasi tahap alarm_detector .


Dalam contoh itu, alarm dimulai hanya jika setidaknya ada dua nilai suspicious_values dalam jendela waktu 25 ms. Jadi kita harus mengingat waktu instance suspicious_value sebelumnya pada tahap alarm_detector . Itu karena alarm_detector diimplementasikan sebagai functor stateful dengan operator panggilan fungsi.


Tahapan mengembalikan jenis SObjectizer bukannya std :: opsional


Saya katakan sebelumnya bahwa tahap dapat mengembalikan nilai opsional. Tetapi std::optional tidak digunakan dalam kode, tipe stage_result_t berbeda dapat dilihat dalam implementasi tahapan.


Itu karena beberapa spesifik SObjectizer memainkan perannya di sini. Nilai yang dikembalikan akan didistribusikan sebagai pesan antara agen SObjectizer (alias aktor). Setiap pesan dalam SObjectizer dikirim sebagai objek yang dialokasikan secara dinamis. Jadi kami memiliki beberapa "optimasi" di sini: alih-alih mengembalikan std::optional dan kemudian mengalokasikan objek pesan baru, kami hanya mengalokasikan objek pesan dan mengembalikan pointer pintar ke sana.


Faktanya, stage_result_t hanyalah sebuah typedef untuk analog shared_ptr SObjectizer:


 template< typename M > using stage_result_t = message_holder_t< M >; 

Dan make_result dan make_empty hanyalah fungsi pembantu untuk membangun stage_result_t dengan atau tanpa nilai aktual di dalamnya:


 template< typename M, typename... Args > stage_result_t< M > make_result( Args &&... args ) { return stage_result_t< M >::make(forward< Args >(args)...); } template< typename M > stage_result_t< M > make_empty() { return stage_result_t< M >(); } 

Untuk kesederhanaan, aman untuk mengatakan tahap validation dapat dinyatakan seperti itu:


 std::shared_ptr< valid_raw_value > validation( const raw_value & v ) { if( 0x7 >= v.m_data.m_high_bits ) return std::make_shared< valid_raw_value >( v.m_data ); else return std::shared_ptr< valid_raw_value >{}; } 

Tetapi, karena spesifik SObjectizer, kita tidak dapat menggunakan std::shared_ptr dan harus berurusan dengan so_5::message_holder_t . Dan kami menyembunyikan itu di belakang stage_result_t , make_result , dan make_empty helpers.


pemisahan stage_handler_t dan stage_builder_t


Poin penting dari implementasi pipeline adalah pemisahan konsep stage handler dan stage builder . Ini dilakukan untuk kesederhanaan. Kehadiran konsep-konsep ini memungkinkan saya untuk memiliki dua langkah dalam definisi pipa.


Pada langkah pertama, seorang pengguna menjelaskan tahapan-tahapan pipa. Sebagai hasilnya, saya menerima contoh stage_t yang menampung semua tahapan pipa di dalamnya.


Pada langkah kedua, serangkaian agen SObjectizer yang mendasarinya dibuat. Agen tersebut menerima pesan dengan hasil dari tahap sebelumnya dan memanggil penangan tahap yang sebenarnya, kemudian mengirimkan hasilnya ke tahap berikutnya.


Tetapi untuk membuat set agen ini setiap tahap harus memiliki pembangun panggung . Pembangun panggung dapat dilihat sebagai pabrik yang menciptakan agen SObjectizer yang mendasarinya.


Jadi kita memiliki hubungan berikut: setiap tahap pipa menghasilkan dua objek: stage handler yang memegang logika terkait tahap , dan stage builder yang menciptakan agen SObjectizer yang mendasari untuk memanggil stage handler pada waktu yang tepat:



Pawang ditangani dengan cara berikut:


 template< typename In, typename Out > class stage_handler_t { public : using traits = handler_traits_t< In, Out >; using func_type = function< typename traits::output(const typename traits::input &) >; stage_handler_t( func_type handler ) : m_handler( move(handler) ) {} template< typename Callable > stage_handler_t( Callable handler ) : m_handler( handler ) {} typename traits::output operator()( const typename traits::input & a ) const { return m_handler( a ); } private : func_type m_handler; }; 

Di mana handler_traits_t didefinisikan dengan cara berikut:


 // // We have to deal with two types of stage handlers: // - intermediate handlers which will return some result (eg some new // message); // - terminal handlers which can return nothing (eg void instead of // stage_result_t<M>); // // This template with specialization defines `input` and `output` // aliases for both cases. // template< typename In, typename Out > struct handler_traits_t { using input = In; using output = stage_result_t< Out >; }; template< typename In > struct handler_traits_t< In, void > { using input = In; using output = void; }; 

Builder panggung diwakili oleh just std::function :


 using stage_builder_t = function< mbox_t(coop_t &, mbox_t) >; 

Jenis pembantu lambda_traits_t dan callable_traits_t


Karena tahapan dapat diwakili oleh fungsi atau fungsi bebas (seperti instance dari kelas alarm_detector atau kelas yang dihasilkan kompiler anonim yang mewakili lambdas), kami membutuhkan beberapa pembantu untuk mendeteksi jenis argumen panggung dan nilai pengembalian. Saya menggunakan kode berikut untuk tujuan itu:


 // // Helper type for `arg_type` and `result_type` alises definition. // template< typename R, typename A > struct callable_traits_typedefs_t { using arg_type = A; using result_type = R; }; // // Helper type for dealing with stateful objects with operator() // (they could be user-defined objects or generated by compiler // like lambdas). // template< typename T > struct lambda_traits_t; template< typename M, typename A, typename T > struct lambda_traits_t< stage_result_t< M >(T::*)(const A &) const > : public callable_traits_typedefs_t< M, A > {}; template< typename A, typename T > struct lambda_traits_t< void (T::*)(const A &) const > : public callable_traits_typedefs_t< void, A > {}; template< typename M, typename A, typename T > struct lambda_traits_t< stage_result_t< M >(T::*)(const A &) > : public callable_traits_typedefs_t< M, A > {}; template< typename A, typename T > struct lambda_traits_t< void (T::*)(const A &) > : public callable_traits_typedefs_t< void, A > {}; // // Main type for definition of `arg_type` and `result_type` aliases. // With specialization for various cases. // template< typename T > struct callable_traits_t : public lambda_traits_t< decltype(&T::operator()) > {}; template< typename M, typename A > struct callable_traits_t< stage_result_t< M >(*)(const A &) > : public callable_traits_typedefs_t< M, A > {}; template< typename A > struct callable_traits_t< void(*)(const A &) > : public callable_traits_typedefs_t< void, A > {}; 

Saya harap kode ini akan cukup dimengerti bagi pembaca dengan pengetahuan yang baik tentang C ++. Jika tidak, jangan ragu untuk bertanya kepada saya di komentar, saya akan dengan senang hati menjelaskan logika di balik lambda_traits_t dan callable_traits_t secara detail.


fungsi stage (), broadcast () dan operator | ()


Sekarang kita bisa melihat ke dalam fungsi-fungsi pembangunan pipa utama. Tetapi sebelum itu, perlu untuk melihat definisi dari kelas template stage_t :


 template< typename In, typename Out > struct stage_t { stage_builder_t m_builder; }; 

Ini adalah struct yang sangat sederhana yang hanya stage_bulder_t instance stage_bulder_t . Parameter template tidak digunakan di dalam stage_t , jadi mengapa mereka ada di sini?


Mereka diperlukan untuk pemeriksaan kompilasi waktu kompatibilitas tipe antara tahap-tahap pipa. Kami akan segera melihatnya.


Mari kita lihat fungsi pembangunan saluran pipa yang paling sederhana, stage() :


 template< typename Callable, typename In = typename callable_traits_t< Callable >::arg_type, typename Out = typename callable_traits_t< Callable >::result_type > stage_t< In, Out > stage( Callable handler ) { stage_builder_t builder{ [h = std::move(handler)]( coop_t & coop, mbox_t next_stage) -> mbox_t { return coop.make_agent< a_stage_point_t<In, Out> >( std::move(h), std::move(next_stage) ) ->so_direct_mbox(); } }; return { std::move(builder) }; } 

Ini menerima penangan panggung yang sebenarnya sebagai parameter tunggal. Ini bisa menjadi pointer ke fungsi atau fungsi lambda atau functor. Jenis-jenis input dan output stage disimpulkan secara otomatis karena "templat magic" di balik templat callable_traits_t .


Sebuah instance dari builder panggung dibuat di dalam dan instance itu dikembalikan dalam objek stage_t baru sebagai hasil dari fungsi stage() . Pawang panggung yang sebenarnya ditangkap oleh lambda pembangun panggung , kemudian akan digunakan untuk pembangunan agen SObjectizer yang mendasarinya (kita akan membicarakannya di bagian berikutnya).


Fungsi selanjutnya untuk ditinjau adalah operator|() yang menggabungkan dua tahap bersama dan mengembalikan tahap baru:


 template< typename In, typename Out1, typename Out2 > stage_t< In, Out2 > operator|( stage_t< In, Out1 > && prev, stage_t< Out1, Out2 > && next ) { return { stage_builder_t{ [prev, next]( coop_t & coop, mbox_t next_stage ) -> mbox_t { auto m = next.m_builder( coop, std::move(next_stage) ); return prev.m_builder( coop, std::move(m) ); } } }; } 

Cara paling sederhana untuk menjelaskan logika operator|() adalah mencoba menggambar. Anggaplah kita memiliki ungkapan:


 stage(A) | stage(B) | stage(C) | stage(B) 

Ungkapan ini akan diubah seperti itu:



Di sana kita juga dapat melihat bagaimana pengecekan tipe waktu kompilasi bekerja: definisi operator|() mensyaratkan bahwa tipe keluaran dari tahap pertama adalah input dari tahap kedua. Jika ini bukan masalahnya, kode tidak akan dikompilasi.


Dan sekarang kita bisa melihat fungsi pembangunan saluran pipa yang paling kompleks, broadcast() . Fungsi itu sendiri agak sederhana:


 template< typename In, typename Out, typename... Rest > stage_t< In, void > broadcast( stage_t< In, Out > && first, Rest &&... stages ) { stage_builder_t builder{ [broadcasts = collect_sink_builders( move(first), forward< Rest >(stages)...)] ( coop_t & coop, mbox_t ) -> mbox_t { vector< mbox_t > mboxes; mboxes.reserve( broadcasts.size() ); for( const auto & b : broadcasts ) mboxes.emplace_back( b( coop, mbox_t{} ) ); return broadcast_mbox_t::make( coop.environment(), std::move(mboxes) ); } }; return { std::move(builder) }; } 

Perbedaan utama antara tahap biasa dan tahap siaran adalah bahwa tahap siaran harus memiliki vektor pembangun panggung anak perusahaan. Jadi kita harus membuat vektor itu dan mengirimkannya ke pembangun panggung utama siaran-panggung. Karena itu, kita dapat melihat panggilan ke collect_sink_builders di daftar tangkap lambda di dalam fungsi broadcast() :


 stage_builder_t builder{ [broadcasts = collect_sink_builders( move(first), forward< Rest >(stages)...)] 

Jika kita melihat ke collect_sink_builder kita akan melihat kode berikut:


 // // Serie of helper functions for building description for // `broadcast` stage. // // Those functions are used for collecting // `builders` functions for every child pipeline. // // Please note that this functions checks that each child pipeline has the // same In type. // template< typename In, typename Out, typename... Rest > void move_sink_builder_to( vector< stage_builder_t > & receiver, stage_t< In, Out > && first, Rest &&... rest ) { receiver.emplace_back( move( first.m_builder ) ); if constexpr( 0u != sizeof...(rest) ) move_sink_builder_to<In>( receiver, forward< Rest >(rest)... ); } template< typename In, typename Out, typename... Rest > vector< stage_builder_t > collect_sink_builders( stage_t< In, Out > && first, Rest &&... stages ) { vector< stage_builder_t > receiver; receiver.reserve( 1 + sizeof...(stages) ); move_sink_builder_to<In>( receiver, move(first), std::forward<Rest>(stages)... ); return receiver; } 

Pengecekan tipe waktu kompilasi juga berfungsi di sini: itu karena panggilan untuk move_sink_builder_to secara eksplisit diparameterisasi berdasarkan tipe 'In'. Ini berarti bahwa panggilan dalam bentuk collect_sink_builders(stage_t<In1, Out1>, stage_t<In2, Out2>, ...) akan menyebabkan kesalahan kompilasi karena kompiler melarang panggilan move_sink_builder_to<In1>(receiver, stage_t<In2, Out2>, ...) .


Saya juga dapat mencatat bahwa karena jumlah saluran pipa anak perusahaan untuk broadcast() diketahui pada waktu kompilasi kita dapat menggunakan std::array bukan std::vector dan dapat menghindari beberapa alokasi memori. Tetapi std::vector digunakan di sini hanya untuk kesederhanaan.


Hubungan antara tahapan dan agen / mbox SObjectizer


Gagasan di balik pelaksanaan pipa adalah penciptaan agen terpisah untuk setiap tahap pipa. Agen menerima pesan masuk, meneruskannya ke penangan tahap yang sesuai, menganalisis hasilnya dan, jika hasilnya tidak kosong, mengirimkan hasilnya sebagai pesan masuk ke tahap berikutnya. Itu dapat diilustrasikan oleh diagram urutan berikut:



Beberapa hal terkait SObjectizer harus didiskusikan, setidaknya secara singkat. Jika Anda tidak tertarik pada detail seperti itu, Anda dapat melewati bagian di bawah ini dan langsung menuju ke kesimpulan.


Coop adalah sekelompok agen untuk bekerja bersama


Agen dimasukkan ke dalam SObjectizer tidak secara individu tetapi dalam kelompok bernama coops. Koperasi adalah sekelompok agen yang harus bekerja sama dan tidak masuk akal untuk melanjutkan pekerjaan jika salah satu agen grup hilang.


Jadi pengenalan agen untuk SObjectizer seperti pembuatan instance coop, mengisi instance tersebut dengan agen yang sesuai dan kemudian mendaftarkan coop di SObjectizer.


Karena itu argumen pertama untuk pembangun panggung adalah referensi ke kandang baru. make_pipeline() ini dibuat di fungsi make_pipeline() (dibahas di bawah), kemudian diisi oleh pembuat stage dan kemudian didaftarkan (lagi di fungsi make_pipeline() ).


Kotak pesan


SObjectizer mengimplementasikan beberapa model yang terkait dengan konkurensi. Model Aktor hanyalah salah satunya. Karena itu, SObjectizer dapat berbeda secara signifikan dari kerangka aktor lain. Salah satu perbedaannya adalah skema pengalamatan untuk pesan.


Pesan di SObjectizer ditujukan bukan untuk aktor, tetapi kotak pesan (mboxes). Aktor harus berlangganan pesan dari mbox. Jika aktor berlangganan jenis pesan tertentu dari mbox, ia akan menerima pesan jenis itu:



Fakta ini sangat penting karena perlu untuk mengirim pesan dari satu tahap ke tahap lainnya. Ini berarti bahwa setiap tahap harus memiliki mbox-nya dan mbox itu harus diketahui untuk tahap sebelumnya.


Setiap aktor (agen alias) di SObjectizer memiliki mbox langsung . Mbox ini hanya dikaitkan dengan agen pemilik dan tidak dapat digunakan oleh agen lain. Kotak langsung agen yang dibuat untuk tahap akan digunakan untuk interaksi tahap.


Fitur spesifik SObjectizer ini menentukan beberapa detail implementasi pipeline.


Yang pertama adalah fakta bahwa pembangun panggung memiliki prototipe berikut:


 mbox_t builder(coop_t &, mbox_t); 

Ini berarti pembangun panggung menerima mbox dari tahap berikutnya dan harus membuat agen baru yang akan mengirim hasil panggung ke mbox itu. Mbox agen baru harus dikembalikan oleh pembangun panggung . Mbox itu akan digunakan untuk pembuatan agen untuk tahap sebelumnya.


Yang kedua adalah fakta bahwa agen untuk tahapan dibuat dalam urutan cadangan. Itu berarti bahwa jika kita memiliki pipa:


 stage(A) | stage(B) | stage(C) 

Agen untuk tahap C akan dibuat terlebih dahulu, kemudian mbox-nya akan digunakan untuk pembuatan agen untuk tahap B, dan kemudian mbox dari agen tahap-B akan digunakan untuk pembuatan agen untuk tahap A.


Perlu juga dicatat bahwa operator|() tidak membuat agen:


 stage_builder_t{ [prev, next]( coop_t & coop, mbox_t next_stage ) -> mbox_t { auto m = next.m_builder( coop, std::move(next_stage) ); return prev.m_builder( coop, std::move(m) ); } } 

operator|() membuat pembangun yang hanya memanggil pembangun lain tetapi tidak memperkenalkan agen tambahan. Jadi untuk kasus ini:


 stage(A) | stage(B) 

hanya dua agen yang akan dibuat (untuk A-stage dan B-stage) dan kemudian mereka akan dihubungkan bersama dalam pembangun panggung yang dibuat oleh operator|() .


Tidak ada agen untuk implementasi broadcast()


Cara yang jelas untuk mengimplementasikan tahap penyiaran adalah membuat agen khusus yang akan menerima pesan masuk dan kemudian mengirim ulang pesan itu ke daftar mboxes tujuan. Cara itu digunakan dalam implementasi pertama dari pipeline DSL yang dijelaskan.


Tetapi proyek pendamping kami, so5extra , sekarang memiliki varian khusus mbox: broadcasting. Mbox itu melakukan apa yang diperlukan di sini: ia mengambil pesan baru dan mengirimkannya ke sekumpulan mbox tujuan.


Karena itu tidak perlu membuat agen penyiaran yang terpisah, kita bisa menggunakan mbox penyiaran dari so5extra:


 // // A special mbox for broadcasting of a message to a set of destination // mboxes. // using broadcast_mbox_t = so_5::extra::mboxes::broadcast::fixed_mbox_template_t<>; ... // // Inside the broadcast() function: // stage_builder_t builder{ [broadcasts = collect_sink_builders( move(first), forward< Rest >(stages)...)] ( coop_t & coop, mbox_t ) -> mbox_t { vector< mbox_t > mboxes; mboxes.reserve( broadcasts.size() ); for( const auto & b : broadcasts ) mboxes.emplace_back( b( coop, mbox_t{} ) ); // That is the creation of broadcasting mbox instance. return broadcast_mbox_t::make( coop.environment(), std::move(mboxes) ); } }; 

Implementasi agen tahap


Sekarang kita bisa melihat implementasi agen tahap:


 // // An agent which will be used as intermediate or terminal pipeline stage. // It will receive input message, call the stage handler and pass // handler result to the next stage (if any). // template< typename In, typename Out > class a_stage_point_t final : public agent_t { public : a_stage_point_t( context_t ctx, stage_handler_t< In, Out > handler, mbox_t next_stage ) : agent_t{ ctx } , m_handler{ move( handler ) } , m_next{ move(next_stage) } {} void so_define_agent() override { if( m_next ) // Because there is the next stage the appropriate // message handler will be used. so_subscribe_self().event( [=]( const In & evt ) { auto r = m_handler( evt ); if( r ) so_5::send( m_next, r ); } ); else // There is no next stage. A very simple message handler // will be used for that case. so_subscribe_self().event( [=]( const In & evt ) { m_handler( evt ); } ); } private : const stage_handler_t< In, Out > m_handler; const mbox_t m_next; }; // // A specialization of a_stage_point_t for the case of terminal stage of // a pipeline. This type will be used for stage handlers with void // return type. // template< typename In > class a_stage_point_t< In, void > final : public agent_t { public : a_stage_point_t( context_t ctx, stage_handler_t< In, void > handler, mbox_t next_stage ) : agent_t{ ctx } , m_handler{ move( handler ) } { if( next_stage ) throw std::runtime_error( "sink point cannot have next stage" ); } void so_define_agent() override { so_subscribe_self().event( [=]( const In & evt ) { m_handler( evt ); } ); } private : const stage_handler_t< In, void > m_handler; }; 

Agak sepele jika Anda memahami dasar-dasar SObjectizer. Jika tidak, akan sangat sulit untuk dijelaskan dalam beberapa kata (jadi silakan bertanya di komentar).


Implementasi utama agen a_stage_point_t menciptakan langganan ke pesan bertipe In. Ketika pesan jenis ini tiba, pawang panggung dipanggil. Jika penangan panggung mengembalikan hasil aktual, hasilnya dikirim ke tahap berikutnya (jika tahap itu ada).


Ada juga versi a_stage_point_t untuk kasus ketika tahap yang sesuai adalah tahap terminal dan tidak mungkin ada tahap berikutnya.


Implementasi a_stage_point_t dapat terlihat sedikit rumit tetapi percayalah, ini adalah salah satu agen paling sederhana yang saya tulis.


fungsi make_pipeline ()


Saatnya untuk membahas fungsi pembangunan saluran pipa terakhir, make_pipeline() :


 template< typename In, typename Out, typename... Args > mbox_t make_pipeline( // SObjectizer Environment to work in. so_5::environment_t & env, // Definition of a pipeline. stage_t< In, Out > && sink, // Optional args to be passed to make_coop() function. Args &&... args ) { auto coop = env.make_coop( forward< Args >(args)... ); auto mbox = sink.m_builder( *coop, mbox_t{} ); env.register_coop( move(coop) ); return mbox; } 

Tidak ada keajaiban atau kejutan di sini. Kita hanya perlu membuat kandang baru untuk agen yang mendasari pipa, mengisi kandang itu dengan agen dengan memanggil pembangun tahap tingkat atas , dan kemudian mendaftarkan kandang itu ke dalam SObjectizer. Itu saja.


Hasil dari make_pipeline() adalah mbox dari tahap paling kiri (pertama) dari pipeline. Mbox itu harus digunakan untuk mengirim pesan ke saluran pipa.


Simulasi dan eksperimen dengannya


Jadi sekarang kita memiliki tipe data dan fungsi untuk logika aplikasi kita dan alat untuk menghubungkan fungsi-fungsi tersebut ke dalam pipa pemrosesan data. Mari kita lakukan dan lihat hasilnya:


 int main() { // Launch SObjectizer in a separate thread. wrapped_env_t sobj; // Make a pipeline. auto pipeline = make_pipeline( sobj.environment(), stage(validation) | stage(conversion) | broadcast( stage(archiving), stage(distribution), stage(range_checking) | stage(alarm_detector{}) | broadcast( stage(alarm_initiator), stage( []( const alarm_detected & v ) { alarm_distribution( cerr, v ); } ) ) ) ); // Send messages to a pipeline in a loop with 10ms delays. for( uint8_t i = 0; i < static_cast< uint8_t >(250); i += 10 ) { send< raw_value >( pipeline, raw_measure{ 0, 0, i } ); std::this_thread::sleep_for( chrono::milliseconds{10} ); } } 

Jika kita menjalankan contoh itu, kita akan melihat output berikut:


 archiving (0,0) distributing (0,0) archiving (0,5) distributing (0,5) archiving (0,10) distributing (0,10) archiving (0,15) distributing (0,15) archiving (0,20) distributing (0,20) archiving (0,25) distributing (0,25) archiving (0,30) distributing (0,30) ... archiving (0,105) distributing (0,105) archiving (0,110) distributing (0,110) === alarm (0) === alarm_distribution (0) archiving (0,115) distributing (0,115) archiving (0,120) distributing (0,120) === alarm (0) === alarm_distribution (0) 

Itu bekerja.


Tetapi tampaknya tahap-tahap dari saluran pipa kami bekerja secara berurutan, satu demi satu, bukan?


Ya, benar. Ini karena semua agen pipa terikat ke operator SObjectizer default. Dan operator itu hanya menggunakan satu utas pekerja untuk melayani pemrosesan pesan semua agen.


Tapi ini bisa dengan mudah diubah. Cukup berikan argumen tambahan untuk panggilan make_pipeline() :


 // Make a pipeline. auto pipeline = make_pipeline( sobj.environment(), stage(validation) | stage(conversion) | broadcast( stage(archiving), stage(distribution), stage(range_checking) | stage(alarm_detector{}) | broadcast( stage(alarm_initiator), stage( []( const alarm_detected & v ) { alarm_distribution( cerr, v ); } ) ) ), disp::thread_pool::make_dispatcher( sobj.environment() ).binder( disp::thread_pool::bind_params_t{}.fifo( disp::thread_pool::fifo_t::individual ) ) ); 

Ini menciptakan kumpulan utas baru dan mengikat semua agen pipa ke kumpulan itu. Setiap agen akan dilayani oleh kolam independen dari agen lain.


Jika kita menjalankan contoh yang dimodifikasi, kita dapat melihat sesuatu seperti itu:


 archiving (0,0) distributing (0,0) distributing (0,5) archiving (0,5) archiving (0,10) distributing (0,10) distributing (archiving (0,15) 0,15) archiving (0,20) distributing (0,20) archiving (0,25) distributing (0,25) archiving (0,distributing (030) ,30) ... archiving (0,distributing (0,105) 105) archiving (0,alarm_distribution (0) distributing (0,=== alarm (0) === 110) 110) archiving (distributing (0,0,115) 115) archiving (distributing (=== alarm (0) === 0alarm_distribution (0) 0,120) ,120) 

Jadi kita dapat melihat bahwa berbagai tahapan kerja pipeline paralel.


Tetapi apakah mungkin untuk melangkah lebih jauh dan memiliki kemampuan untuk mengikat tahapan ke operator yang berbeda?


Ya, itu mungkin, tapi kami harus mengimplementasikan fungsi stage() berlebih:


 template< typename Callable, typename In = typename callable_traits_t< Callable >::arg_type, typename Out = typename callable_traits_t< Callable >::result_type > stage_t< In, Out > stage( disp_binder_shptr_t disp_binder, Callable handler ) { stage_builder_t builder{ [binder = std::move(disp_binder), h = std::move(handler)]( coop_t & coop, mbox_t next_stage) -> mbox_t { return coop.make_agent_with_binder< a_stage_point_t<In, Out> >( std::move(binder), std::move(h), std::move(next_stage) ) ->so_direct_mbox(); } }; return { std::move(builder) }; } 

This version of stage() accepts not only a stage handler but also a dispatcher binder. Dispatcher binder is a way to bind an agent to the particular dispatcher. So to assign a stage to a specific working context we can create an appropriate dispatcher and then pass the binder to that dispatcher to stage() function. Let's do that:


 // An active_obj dispatcher to be used for some stages. auto ao_disp = disp::active_obj::make_dispatcher( sobj.environment() ); // Make a pipeline. auto pipeline = make_pipeline( sobj.environment(), stage(validation) | stage(conversion) | broadcast( stage(ao_disp.binder(), archiving), stage(ao_disp.binder(), distribution), stage(range_checking) | stage(alarm_detector{}) | broadcast( stage(ao_disp.binder(), alarm_initiator), stage(ao_disp.binder(), []( const alarm_detected & v ) { alarm_distribution( cerr, v ); } ) ) ), disp::one_thread::make_dispatcher( sobj.environment() ).binder() ); 

In that case stages archiving , distribution , alarm_initiator and alarm_distribution will work on own worker threads. All other stages will work on the same single worker thread.


The conclusion


This was an interesting experiment and I was surprised how easy SObjectizer could be used in something like reactive programming or data-flow programming.


However, I don't think that pipeline DSL can be practically meaningful. It's too simple and, maybe not flexible enough. But, I hope, it can be a base for more interesting experiments for those why need to deal with different workflows and data-processing pipelines. At least as a base for some ideas in that area. C++ language a rather good here and some (not so complicated) template magic can help to catch various errors at compile-time.


In conclusion, I want to say that we see SObjectizer not as a specialized tool for solving a particular problem, but as a basic set of tools to be used in solutions for different problems. And, more importantly, that basic set can be extended for your needs. Just take a look at SObjectizer , try it, and share your feedback. Maybe you missed something in SObjectizer? Perhaps you don't like something? Tell us , and we can try to help you.


If you want to help further development of SObjectizer, please share a reference to it or to this article somewhere you want (Reddit, HackerNews, LinkedIn, Facebook, Twitter, ...). The more attention and the more feedback, the more new features will be incorporated into SObjectizer.


And many thanks for reading this ;)


PS. The source code for that example can be found in that repository .

Source: https://habr.com/ru/post/id460123/


All Articles