Beberapa waktu lalu, dalam sebuah diskusi di salah satu rilis SObjectizer, kami ditanya: "Apakah mungkin membuat DSL untuk menggambarkan pipa pemrosesan data?" Dengan kata lain, apakah mungkin untuk menulis sesuatu seperti itu:
A | B | C | D
dan dapatkan jalur pipa yang berfungsi di mana pesan pergi dari A ke B, lalu ke C, dan kemudian ke D. Dengan kontrol B menerima persis tipe yang dikembalikan A. Dan C menerima persis tipe yang dikembalikan B. Dan sebagainya.
Itu adalah tugas yang menarik dengan solusi yang sangat sederhana. Misalnya, begitulah pembuatan saluran pipa terlihat seperti:
auto pipeline = make_pipeline(env, stage(A) | stage(B) | stage(C) | stage(D));
Atau, dalam kasus yang lebih kompleks (yang akan dibahas di bawah):
auto pipeline = make_pipeline( sobj.environment(), stage(validation) | stage(conversion) | broadcast( stage(archiving), stage(distribution), stage(range_checking) | stage(alarm_detector{}) | broadcast( stage(alarm_initiator), stage( []( const alarm_detected & v ) { alarm_distribution( cerr, v ); } ) ) ) );
Pada artikel ini, kita akan berbicara tentang implementasi DSL pipeline tersebut. Kami akan membahas sebagian besar bagian yang terkait dengan fungsi stage()
, broadcast()
dan operator|()
dengan beberapa contoh penggunaan templat C ++. Jadi saya berharap ini akan menarik bahkan bagi pembaca yang tidak tahu tentang SObjectizer (jika Anda belum pernah mendengar tentang SObjectizer di sini adalah gambaran umum alat ini).
Beberapa kata tentang demo yang digunakan
Contoh yang digunakan dalam artikel ini dipengaruhi oleh pengalaman saya yang lama (dan agak terlupakan) di area SCADA.
Ide dari demo adalah penanganan data yang dibaca dari beberapa sensor. Data diperoleh dari sensor dengan beberapa periode, maka data tersebut harus divalidasi (data yang salah harus diabaikan) dan dikonversi menjadi beberapa nilai aktual. Misalnya, data mentah yang dibaca dari sensor dapat berupa dua nilai integer 8-bit dan nilai-nilai tersebut harus dikonversi menjadi satu angka floating-point.
Kemudian nilai yang valid dan dikonversi harus diarsipkan, didistribusikan di suatu tempat (pada node yang berbeda untuk visualisasi, misalnya), diperiksa untuk "alarm" (jika nilai berada di luar rentang aman maka itu harus ditangani secara khusus). Operasi-operasi ini independen dan dapat dilakukan secara paralel.
Operasi yang terkait dengan alarm yang terdeteksi dapat dilakukan secara paralel juga: "alarm" harus dimulai (sehingga bagian SCADA pada node saat ini dapat bereaksi padanya) dan informasi tentang "alarm" harus didistribusikan di tempat lain (misalnya : disimpan ke database historis dan / atau divisualisasikan pada layar operator SCADA).
Logika ini dapat diekspresikan dalam bentuk tekstual seperti itu:
optional(valid_raw_data) = validate(raw_data); if valid_raw_data is not empty then { converted_value = convert(valid_raw_data); do_async archive(converted_value); do_async distribute(converted_value); do_async { optional(suspicious_value) = check_range(converted_value); if suspicious_value is not empty then { optional(alarm) = detect_alarm(suspicious_value); if alarm is not empty then { do_async initiate_alarm(alarm); do_async distribute_alarm(alam); } } } }
Atau, dalam bentuk grafis:

Ini contoh yang agak tiruan, tetapi ada beberapa hal menarik yang ingin saya perlihatkan. Yang pertama adalah adanya tahapan paralel dalam pipa ( broadcast()
operasi broadcast()
ada hanya karena itu). Yang kedua adalah kehadiran suatu negara dalam beberapa tahap. Sebagai contoh, alarm_detector adalah stateful stage.
Kemampuan saluran pipa
Saluran pipa dibangun dari tahapan terpisah. Setiap tahap adalah fungsi atau fungsi dari format berikut:
opt<Out> func(const In &);
atau
void func(const In &);
Tahapan yang mengembalikan void
hanya dapat digunakan sebagai tahap terakhir dari pipa.
Tahapan terikat ke dalam rantai. Setiap tahap berikutnya menerima objek yang dikembalikan oleh tahap sebelumnya. Jika tahap sebelumnya mengembalikan nilai opt<Out>
kosong opt<Out>
maka tahap selanjutnya tidak dipanggil.
Ada panggung broadcast
khusus. Itu dibangun dari beberapa pipa. Tahap broadcast
menerima objek dari tahap sebelumnya dan menyiarkannya ke setiap pipa anak perusahaan.
Dari sudut pandang pipa, tahap broadcast
tampak seperti fungsi dari format berikut:
void func(const In &);
Karena tidak ada nilai balik dari tahap broadcast
tahap broadcast
hanya bisa menjadi tahap terakhir dalam saluran pipa.
Mengapa tahap pipa mengembalikan nilai opsional?
Itu karena ada kebutuhan untuk menjatuhkan beberapa nilai yang masuk. Misalnya, tahap validate
tidak mengembalikan apa pun jika nilai mentah salah, dan tidak ada gunanya menanganinya.
Contoh lain: tahap alarm_detector
tidak mengembalikan apa pun jika nilai mencurigakan saat ini tidak menghasilkan kasus alarm baru.
Detail implementasi
Mari kita mulai dari tipe data dan fungsi yang terkait dengan logika aplikasi. Dalam contoh yang dibahas, tipe data berikut digunakan untuk meneruskan informasi dari satu tahap ke tahap lainnya:
Sebuah instance dari raw_value
akan menuju ke tahap pertama dari pipeline kami. Nilai raw_value
ini berisi informasi yang diperoleh dari sensor dalam bentuk objek raw_measure
. Kemudian raw_value
ditransformasikan menjadi valid_raw_value
. Kemudian valid_raw_value
ditransformasikan menjadi sensor_value
dengan nilai sensor aktual dalam bentuk calulated_measure
. Jika instance sensor_value
berisi nilai mencurigakan, maka instance suspicious_value
diproduksi. Dan suspicious_value
dapat diubah menjadi instance alarm_detected
nanti.
Atau, dalam bentuk grafis:

Sekarang kita bisa melihat implementasi dari tahapan pipeline kami:
stage_result_t
saja hal-hal seperti stage_result_t
, make_result
dan make_empty
, kita akan membahasnya di bagian selanjutnya.
Saya harap kode tahapan itu agak sepele. Satu-satunya bagian yang memerlukan penjelasan tambahan adalah implementasi tahap alarm_detector
.
Dalam contoh itu, alarm dimulai hanya jika setidaknya ada dua nilai suspicious_values
dalam jendela waktu 25 ms. Jadi kita harus mengingat waktu instance suspicious_value
sebelumnya pada tahap alarm_detector
. Itu karena alarm_detector
diimplementasikan sebagai functor stateful dengan operator panggilan fungsi.
Tahapan mengembalikan jenis SObjectizer bukannya std :: opsional
Saya katakan sebelumnya bahwa tahap dapat mengembalikan nilai opsional. Tetapi std::optional
tidak digunakan dalam kode, tipe stage_result_t
berbeda dapat dilihat dalam implementasi tahapan.
Itu karena beberapa spesifik SObjectizer memainkan perannya di sini. Nilai yang dikembalikan akan didistribusikan sebagai pesan antara agen SObjectizer (alias aktor). Setiap pesan dalam SObjectizer dikirim sebagai objek yang dialokasikan secara dinamis. Jadi kami memiliki beberapa "optimasi" di sini: alih-alih mengembalikan std::optional
dan kemudian mengalokasikan objek pesan baru, kami hanya mengalokasikan objek pesan dan mengembalikan pointer pintar ke sana.
Faktanya, stage_result_t
hanyalah sebuah typedef untuk analog shared_ptr SObjectizer:
template< typename M > using stage_result_t = message_holder_t< M >;
Dan make_result
dan make_empty
hanyalah fungsi pembantu untuk membangun stage_result_t
dengan atau tanpa nilai aktual di dalamnya:
template< typename M, typename... Args > stage_result_t< M > make_result( Args &&... args ) { return stage_result_t< M >::make(forward< Args >(args)...); } template< typename M > stage_result_t< M > make_empty() { return stage_result_t< M >(); }
Untuk kesederhanaan, aman untuk mengatakan tahap validation
dapat dinyatakan seperti itu:
std::shared_ptr< valid_raw_value > validation( const raw_value & v ) { if( 0x7 >= v.m_data.m_high_bits ) return std::make_shared< valid_raw_value >( v.m_data ); else return std::shared_ptr< valid_raw_value >{}; }
Tetapi, karena spesifik SObjectizer, kita tidak dapat menggunakan std::shared_ptr
dan harus berurusan dengan so_5::message_holder_t
. Dan kami menyembunyikan itu di belakang stage_result_t
, make_result
, dan make_empty
helpers.
pemisahan stage_handler_t dan stage_builder_t
Poin penting dari implementasi pipeline adalah pemisahan konsep stage handler dan stage builder . Ini dilakukan untuk kesederhanaan. Kehadiran konsep-konsep ini memungkinkan saya untuk memiliki dua langkah dalam definisi pipa.
Pada langkah pertama, seorang pengguna menjelaskan tahapan-tahapan pipa. Sebagai hasilnya, saya menerima contoh stage_t
yang menampung semua tahapan pipa di dalamnya.
Pada langkah kedua, serangkaian agen SObjectizer yang mendasarinya dibuat. Agen tersebut menerima pesan dengan hasil dari tahap sebelumnya dan memanggil penangan tahap yang sebenarnya, kemudian mengirimkan hasilnya ke tahap berikutnya.
Tetapi untuk membuat set agen ini setiap tahap harus memiliki pembangun panggung . Pembangun panggung dapat dilihat sebagai pabrik yang menciptakan agen SObjectizer yang mendasarinya.
Jadi kita memiliki hubungan berikut: setiap tahap pipa menghasilkan dua objek: stage handler yang memegang logika terkait tahap , dan stage builder yang menciptakan agen SObjectizer yang mendasari untuk memanggil stage handler pada waktu yang tepat:

Pawang ditangani dengan cara berikut:
template< typename In, typename Out > class stage_handler_t { public : using traits = handler_traits_t< In, Out >; using func_type = function< typename traits::output(const typename traits::input &) >; stage_handler_t( func_type handler ) : m_handler( move(handler) ) {} template< typename Callable > stage_handler_t( Callable handler ) : m_handler( handler ) {} typename traits::output operator()( const typename traits::input & a ) const { return m_handler( a ); } private : func_type m_handler; };
Di mana handler_traits_t
didefinisikan dengan cara berikut:
Builder panggung diwakili oleh just std::function
:
using stage_builder_t = function< mbox_t(coop_t &, mbox_t) >;
Jenis pembantu lambda_traits_t dan callable_traits_t
Karena tahapan dapat diwakili oleh fungsi atau fungsi bebas (seperti instance dari kelas alarm_detector
atau kelas yang dihasilkan kompiler anonim yang mewakili lambdas), kami membutuhkan beberapa pembantu untuk mendeteksi jenis argumen panggung dan nilai pengembalian. Saya menggunakan kode berikut untuk tujuan itu:
Saya harap kode ini akan cukup dimengerti bagi pembaca dengan pengetahuan yang baik tentang C ++. Jika tidak, jangan ragu untuk bertanya kepada saya di komentar, saya akan dengan senang hati menjelaskan logika di balik lambda_traits_t
dan callable_traits_t
secara detail.
fungsi stage (), broadcast () dan operator | ()
Sekarang kita bisa melihat ke dalam fungsi-fungsi pembangunan pipa utama. Tetapi sebelum itu, perlu untuk melihat definisi dari kelas template stage_t
:
template< typename In, typename Out > struct stage_t { stage_builder_t m_builder; };
Ini adalah struct yang sangat sederhana yang hanya stage_bulder_t
instance stage_bulder_t
. Parameter template tidak digunakan di dalam stage_t
, jadi mengapa mereka ada di sini?
Mereka diperlukan untuk pemeriksaan kompilasi waktu kompatibilitas tipe antara tahap-tahap pipa. Kami akan segera melihatnya.
Mari kita lihat fungsi pembangunan saluran pipa yang paling sederhana, stage()
:
template< typename Callable, typename In = typename callable_traits_t< Callable >::arg_type, typename Out = typename callable_traits_t< Callable >::result_type > stage_t< In, Out > stage( Callable handler ) { stage_builder_t builder{ [h = std::move(handler)]( coop_t & coop, mbox_t next_stage) -> mbox_t { return coop.make_agent< a_stage_point_t<In, Out> >( std::move(h), std::move(next_stage) ) ->so_direct_mbox(); } }; return { std::move(builder) }; }
Ini menerima penangan panggung yang sebenarnya sebagai parameter tunggal. Ini bisa menjadi pointer ke fungsi atau fungsi lambda atau functor. Jenis-jenis input dan output stage disimpulkan secara otomatis karena "templat magic" di balik templat callable_traits_t
.
Sebuah instance dari builder panggung dibuat di dalam dan instance itu dikembalikan dalam objek stage_t
baru sebagai hasil dari fungsi stage()
. Pawang panggung yang sebenarnya ditangkap oleh lambda pembangun panggung , kemudian akan digunakan untuk pembangunan agen SObjectizer yang mendasarinya (kita akan membicarakannya di bagian berikutnya).
Fungsi selanjutnya untuk ditinjau adalah operator|()
yang menggabungkan dua tahap bersama dan mengembalikan tahap baru:
template< typename In, typename Out1, typename Out2 > stage_t< In, Out2 > operator|( stage_t< In, Out1 > && prev, stage_t< Out1, Out2 > && next ) { return { stage_builder_t{ [prev, next]( coop_t & coop, mbox_t next_stage ) -> mbox_t { auto m = next.m_builder( coop, std::move(next_stage) ); return prev.m_builder( coop, std::move(m) ); } } }; }
Cara paling sederhana untuk menjelaskan logika operator|()
adalah mencoba menggambar. Anggaplah kita memiliki ungkapan:
stage(A) | stage(B) | stage(C) | stage(B)
Ungkapan ini akan diubah seperti itu:

Di sana kita juga dapat melihat bagaimana pengecekan tipe waktu kompilasi bekerja: definisi operator|()
mensyaratkan bahwa tipe keluaran dari tahap pertama adalah input dari tahap kedua. Jika ini bukan masalahnya, kode tidak akan dikompilasi.
Dan sekarang kita bisa melihat fungsi pembangunan saluran pipa yang paling kompleks, broadcast()
. Fungsi itu sendiri agak sederhana:
template< typename In, typename Out, typename... Rest > stage_t< In, void > broadcast( stage_t< In, Out > && first, Rest &&... stages ) { stage_builder_t builder{ [broadcasts = collect_sink_builders( move(first), forward< Rest >(stages)...)] ( coop_t & coop, mbox_t ) -> mbox_t { vector< mbox_t > mboxes; mboxes.reserve( broadcasts.size() ); for( const auto & b : broadcasts ) mboxes.emplace_back( b( coop, mbox_t{} ) ); return broadcast_mbox_t::make( coop.environment(), std::move(mboxes) ); } }; return { std::move(builder) }; }
Perbedaan utama antara tahap biasa dan tahap siaran adalah bahwa tahap siaran harus memiliki vektor pembangun panggung anak perusahaan. Jadi kita harus membuat vektor itu dan mengirimkannya ke pembangun panggung utama siaran-panggung. Karena itu, kita dapat melihat panggilan ke collect_sink_builders
di daftar tangkap lambda di dalam fungsi broadcast()
:
stage_builder_t builder{ [broadcasts = collect_sink_builders( move(first), forward< Rest >(stages)...)]
Jika kita melihat ke collect_sink_builder
kita akan melihat kode berikut:
Pengecekan tipe waktu kompilasi juga berfungsi di sini: itu karena panggilan untuk move_sink_builder_to
secara eksplisit diparameterisasi berdasarkan tipe 'In'. Ini berarti bahwa panggilan dalam bentuk collect_sink_builders(stage_t<In1, Out1>, stage_t<In2, Out2>, ...)
akan menyebabkan kesalahan kompilasi karena kompiler melarang panggilan move_sink_builder_to<In1>(receiver, stage_t<In2, Out2>, ...)
.
Saya juga dapat mencatat bahwa karena jumlah saluran pipa anak perusahaan untuk broadcast()
diketahui pada waktu kompilasi kita dapat menggunakan std::array
bukan std::vector
dan dapat menghindari beberapa alokasi memori. Tetapi std::vector
digunakan di sini hanya untuk kesederhanaan.
Hubungan antara tahapan dan agen / mbox SObjectizer
Gagasan di balik pelaksanaan pipa adalah penciptaan agen terpisah untuk setiap tahap pipa. Agen menerima pesan masuk, meneruskannya ke penangan tahap yang sesuai, menganalisis hasilnya dan, jika hasilnya tidak kosong, mengirimkan hasilnya sebagai pesan masuk ke tahap berikutnya. Itu dapat diilustrasikan oleh diagram urutan berikut:

Beberapa hal terkait SObjectizer harus didiskusikan, setidaknya secara singkat. Jika Anda tidak tertarik pada detail seperti itu, Anda dapat melewati bagian di bawah ini dan langsung menuju ke kesimpulan.
Coop adalah sekelompok agen untuk bekerja bersama
Agen dimasukkan ke dalam SObjectizer tidak secara individu tetapi dalam kelompok bernama coops. Koperasi adalah sekelompok agen yang harus bekerja sama dan tidak masuk akal untuk melanjutkan pekerjaan jika salah satu agen grup hilang.
Jadi pengenalan agen untuk SObjectizer seperti pembuatan instance coop, mengisi instance tersebut dengan agen yang sesuai dan kemudian mendaftarkan coop di SObjectizer.
Karena itu argumen pertama untuk pembangun panggung adalah referensi ke kandang baru. make_pipeline()
ini dibuat di fungsi make_pipeline()
(dibahas di bawah), kemudian diisi oleh pembuat stage dan kemudian didaftarkan (lagi di fungsi make_pipeline()
).
Kotak pesan
SObjectizer mengimplementasikan beberapa model yang terkait dengan konkurensi. Model Aktor hanyalah salah satunya. Karena itu, SObjectizer dapat berbeda secara signifikan dari kerangka aktor lain. Salah satu perbedaannya adalah skema pengalamatan untuk pesan.
Pesan di SObjectizer ditujukan bukan untuk aktor, tetapi kotak pesan (mboxes). Aktor harus berlangganan pesan dari mbox. Jika aktor berlangganan jenis pesan tertentu dari mbox, ia akan menerima pesan jenis itu:

Fakta ini sangat penting karena perlu untuk mengirim pesan dari satu tahap ke tahap lainnya. Ini berarti bahwa setiap tahap harus memiliki mbox-nya dan mbox itu harus diketahui untuk tahap sebelumnya.
Setiap aktor (agen alias) di SObjectizer memiliki mbox langsung . Mbox ini hanya dikaitkan dengan agen pemilik dan tidak dapat digunakan oleh agen lain. Kotak langsung agen yang dibuat untuk tahap akan digunakan untuk interaksi tahap.
Fitur spesifik SObjectizer ini menentukan beberapa detail implementasi pipeline.
Yang pertama adalah fakta bahwa pembangun panggung memiliki prototipe berikut:
mbox_t builder(coop_t &, mbox_t);
Ini berarti pembangun panggung menerima mbox dari tahap berikutnya dan harus membuat agen baru yang akan mengirim hasil panggung ke mbox itu. Mbox agen baru harus dikembalikan oleh pembangun panggung . Mbox itu akan digunakan untuk pembuatan agen untuk tahap sebelumnya.
Yang kedua adalah fakta bahwa agen untuk tahapan dibuat dalam urutan cadangan. Itu berarti bahwa jika kita memiliki pipa:
stage(A) | stage(B) | stage(C)
Agen untuk tahap C akan dibuat terlebih dahulu, kemudian mbox-nya akan digunakan untuk pembuatan agen untuk tahap B, dan kemudian mbox dari agen tahap-B akan digunakan untuk pembuatan agen untuk tahap A.
Perlu juga dicatat bahwa operator|()
tidak membuat agen:
stage_builder_t{ [prev, next]( coop_t & coop, mbox_t next_stage ) -> mbox_t { auto m = next.m_builder( coop, std::move(next_stage) ); return prev.m_builder( coop, std::move(m) ); } }
operator|()
membuat pembangun yang hanya memanggil pembangun lain tetapi tidak memperkenalkan agen tambahan. Jadi untuk kasus ini:
stage(A) | stage(B)
hanya dua agen yang akan dibuat (untuk A-stage dan B-stage) dan kemudian mereka akan dihubungkan bersama dalam pembangun panggung yang dibuat oleh operator|()
.
Tidak ada agen untuk implementasi broadcast()
Cara yang jelas untuk mengimplementasikan tahap penyiaran adalah membuat agen khusus yang akan menerima pesan masuk dan kemudian mengirim ulang pesan itu ke daftar mboxes tujuan. Cara itu digunakan dalam implementasi pertama dari pipeline DSL yang dijelaskan.
Tetapi proyek pendamping kami, so5extra , sekarang memiliki varian khusus mbox: broadcasting. Mbox itu melakukan apa yang diperlukan di sini: ia mengambil pesan baru dan mengirimkannya ke sekumpulan mbox tujuan.
Karena itu tidak perlu membuat agen penyiaran yang terpisah, kita bisa menggunakan mbox penyiaran dari so5extra:
Implementasi agen tahap
Sekarang kita bisa melihat implementasi agen tahap:
Agak sepele jika Anda memahami dasar-dasar SObjectizer. Jika tidak, akan sangat sulit untuk dijelaskan dalam beberapa kata (jadi silakan bertanya di komentar).
Implementasi utama agen a_stage_point_t
menciptakan langganan ke pesan bertipe In. Ketika pesan jenis ini tiba, pawang panggung dipanggil. Jika penangan panggung mengembalikan hasil aktual, hasilnya dikirim ke tahap berikutnya (jika tahap itu ada).
Ada juga versi a_stage_point_t
untuk kasus ketika tahap yang sesuai adalah tahap terminal dan tidak mungkin ada tahap berikutnya.
Implementasi a_stage_point_t
dapat terlihat sedikit rumit tetapi percayalah, ini adalah salah satu agen paling sederhana yang saya tulis.
fungsi make_pipeline ()
Saatnya untuk membahas fungsi pembangunan saluran pipa terakhir, make_pipeline()
:
template< typename In, typename Out, typename... Args > mbox_t make_pipeline(
Tidak ada keajaiban atau kejutan di sini. Kita hanya perlu membuat kandang baru untuk agen yang mendasari pipa, mengisi kandang itu dengan agen dengan memanggil pembangun tahap tingkat atas , dan kemudian mendaftarkan kandang itu ke dalam SObjectizer. Itu saja.
Hasil dari make_pipeline()
adalah mbox dari tahap paling kiri (pertama) dari pipeline. Mbox itu harus digunakan untuk mengirim pesan ke saluran pipa.
Simulasi dan eksperimen dengannya
Jadi sekarang kita memiliki tipe data dan fungsi untuk logika aplikasi kita dan alat untuk menghubungkan fungsi-fungsi tersebut ke dalam pipa pemrosesan data. Mari kita lakukan dan lihat hasilnya:
int main() {
Jika kita menjalankan contoh itu, kita akan melihat output berikut:
archiving (0,0) distributing (0,0) archiving (0,5) distributing (0,5) archiving (0,10) distributing (0,10) archiving (0,15) distributing (0,15) archiving (0,20) distributing (0,20) archiving (0,25) distributing (0,25) archiving (0,30) distributing (0,30) ... archiving (0,105) distributing (0,105) archiving (0,110) distributing (0,110) === alarm (0) === alarm_distribution (0) archiving (0,115) distributing (0,115) archiving (0,120) distributing (0,120) === alarm (0) === alarm_distribution (0)
Itu bekerja.
Tetapi tampaknya tahap-tahap dari saluran pipa kami bekerja secara berurutan, satu demi satu, bukan?
Ya, benar. Ini karena semua agen pipa terikat ke operator SObjectizer default. Dan operator itu hanya menggunakan satu utas pekerja untuk melayani pemrosesan pesan semua agen.
Tapi ini bisa dengan mudah diubah. Cukup berikan argumen tambahan untuk panggilan make_pipeline()
:
Ini menciptakan kumpulan utas baru dan mengikat semua agen pipa ke kumpulan itu. Setiap agen akan dilayani oleh kolam independen dari agen lain.
Jika kita menjalankan contoh yang dimodifikasi, kita dapat melihat sesuatu seperti itu:
archiving (0,0) distributing (0,0) distributing (0,5) archiving (0,5) archiving (0,10) distributing (0,10) distributing (archiving (0,15) 0,15) archiving (0,20) distributing (0,20) archiving (0,25) distributing (0,25) archiving (0,distributing (030) ,30) ... archiving (0,distributing (0,105) 105) archiving (0,alarm_distribution (0) distributing (0,=== alarm (0) === 110) 110) archiving (distributing (0,0,115) 115) archiving (distributing (=== alarm (0) === 0alarm_distribution (0) 0,120) ,120)
Jadi kita dapat melihat bahwa berbagai tahapan kerja pipeline paralel.
Tetapi apakah mungkin untuk melangkah lebih jauh dan memiliki kemampuan untuk mengikat tahapan ke operator yang berbeda?
Ya, itu mungkin, tapi kami harus mengimplementasikan fungsi stage()
berlebih:
template< typename Callable, typename In = typename callable_traits_t< Callable >::arg_type, typename Out = typename callable_traits_t< Callable >::result_type > stage_t< In, Out > stage( disp_binder_shptr_t disp_binder, Callable handler ) { stage_builder_t builder{ [binder = std::move(disp_binder), h = std::move(handler)]( coop_t & coop, mbox_t next_stage) -> mbox_t { return coop.make_agent_with_binder< a_stage_point_t<In, Out> >( std::move(binder), std::move(h), std::move(next_stage) ) ->so_direct_mbox(); } }; return { std::move(builder) }; }
This version of stage()
accepts not only a stage handler but also a dispatcher binder. Dispatcher binder is a way to bind an agent to the particular dispatcher. So to assign a stage to a specific working context we can create an appropriate dispatcher and then pass the binder to that dispatcher to stage()
function. Let's do that:
In that case stages archiving
, distribution
, alarm_initiator
and alarm_distribution
will work on own worker threads. All other stages will work on the same single worker thread.
The conclusion
This was an interesting experiment and I was surprised how easy SObjectizer could be used in something like reactive programming or data-flow programming.
However, I don't think that pipeline DSL can be practically meaningful. It's too simple and, maybe not flexible enough. But, I hope, it can be a base for more interesting experiments for those why need to deal with different workflows and data-processing pipelines. At least as a base for some ideas in that area. C++ language a rather good here and some (not so complicated) template magic can help to catch various errors at compile-time.
In conclusion, I want to say that we see SObjectizer not as a specialized tool for solving a particular problem, but as a basic set of tools to be used in solutions for different problems. And, more importantly, that basic set can be extended for your needs. Just take a look at SObjectizer , try it, and share your feedback. Maybe you missed something in SObjectizer? Perhaps you don't like something? Tell us , and we can try to help you.
If you want to help further development of SObjectizer, please share a reference to it or to this article somewhere you want (Reddit, HackerNews, LinkedIn, Facebook, Twitter, ...). The more attention and the more feedback, the more new features will be incorporated into SObjectizer.
And many thanks for reading this ;)
PS. The source code for that example can be found in that repository .