Eine deklarative Datenverarbeitungspipeline über den Akteuren? Warum nicht?

Vor einiger Zeit wurden wir in einer Diskussion über eine der Versionen von SObjectizer gefragt: "Ist es möglich, ein DSL zur Beschreibung einer Datenverarbeitungspipeline zu erstellen?" Mit anderen Worten, ist es möglich, so etwas zu schreiben:


A | B | C | D


und erhalten Sie eine funktionierende Pipeline, in der Nachrichten von A nach B und dann nach C und dann nach D geleitet werden. Mit der Kontrolle erhält B genau den Typ, den A zurückgibt. Und C empfängt genau den Typ, den B zurückgibt. Und so weiter.


Es war eine interessante Aufgabe mit einer überraschend einfachen Lösung. So kann beispielsweise die Erstellung einer Pipeline aussehen:


 auto pipeline = make_pipeline(env, stage(A) | stage(B) | stage(C) | stage(D)); 

Oder in einem komplexeren Fall (der unten diskutiert wird):


 auto pipeline = make_pipeline( sobj.environment(), stage(validation) | stage(conversion) | broadcast( stage(archiving), stage(distribution), stage(range_checking) | stage(alarm_detector{}) | broadcast( stage(alarm_initiator), stage( []( const alarm_detected & v ) { alarm_distribution( cerr, v ); } ) ) ) ); 

In diesem Artikel werden wir über die Implementierung eines solchen Pipeline-DSL sprechen. Wir werden hauptsächlich Teile im Zusammenhang mit stage() , broadcast() und operator|() -Funktionen mit mehreren Beispielen für die Verwendung von C ++ - Vorlagen diskutieren. Ich hoffe, es wird auch für Leser interessant sein, die nichts über SObjectizer wissen (wenn Sie noch nie von SObjectizer gehört haben, finden Sie hier eine Übersicht über dieses Tool).


Ein paar Worte zur verwendeten Demo


Das in diesem Artikel verwendete Beispiel wurde von meiner alten (und eher vergessenen) Erfahrung im SCADA-Bereich beeinflusst.


Die Idee der Demo ist der Umgang mit Daten, die von einem Sensor gelesen werden. Die Daten werden von einem Sensor mit einer gewissen Zeitspanne erfasst, dann müssen diese Daten validiert (falsche Daten sollten ignoriert werden) und in einige tatsächliche Werte umgewandelt werden. Beispielsweise können die von einem Sensor gelesenen Rohdaten zwei 8-Bit-Ganzzahlwerte sein, und diese Werte sollten in eine Gleitkommazahl umgewandelt werden.


Dann sollten die gültigen und konvertierten Werte archiviert, irgendwo verteilt (z. B. auf verschiedenen Knoten zur Visualisierung) und auf "Alarme" überprüft werden (wenn Werte außerhalb sicherer Bereiche liegen, sollte dies speziell behandelt werden). Diese Operationen sind unabhängig und können parallel ausgeführt werden.


Operationen, die sich auf den erkannten Alarm beziehen, können auch parallel ausgeführt werden: Ein "Alarm" sollte ausgelöst werden (damit der Teil von SCADA auf dem aktuellen Knoten darauf reagieren kann) und die Informationen über den "Alarm" sollten an anderer Stelle verteilt werden (z. B.) : in einer historischen Datenbank gespeichert und / oder auf dem Display des SCADA-Bedieners visualisiert).


Diese Logik kann in Textform folgendermaßen ausgedrückt werden:


 optional(valid_raw_data) = validate(raw_data); if valid_raw_data is not empty then { converted_value = convert(valid_raw_data); do_async archive(converted_value); do_async distribute(converted_value); do_async { optional(suspicious_value) = check_range(converted_value); if suspicious_value is not empty then { optional(alarm) = detect_alarm(suspicious_value); if alarm is not empty then { do_async initiate_alarm(alarm); do_async distribute_alarm(alam); } } } } 

Oder in grafischer Form:



Es ist ein ziemlich künstliches Beispiel, aber es hat einige interessante Dinge, die ich zeigen möchte. Das erste ist das Vorhandensein paralleler Stufen in einer Pipeline (Operation broadcast() existiert nur deswegen). Das zweite ist das Vorhandensein eines Staates in einigen Stadien. Zum Beispiel ist alarm_detector eine Stateful-Phase.


Pipeline-Funktionen


Eine Pipeline wird aus getrennten Stufen aufgebaut. Jede Stufe ist eine Funktion oder ein Funktor des folgenden Formats:


 opt<Out> func(const In &); 

oder


 void func(const In &); 

Stufen, die void können nur als letzte Stufe einer Pipeline verwendet werden.


Stufen sind zu einer Kette verbunden. Jede nächste Stufe erhält ein Objekt, das von der vorherigen Stufe zurückgegeben wurde. Wenn die vorherige Stufe den leeren Wert opt<Out> zurückgibt, wird die nächste Stufe nicht aufgerufen.


Es gibt eine spezielle broadcast . Es besteht aus mehreren Pipelines. Eine broadcast empfängt ein Objekt aus der vorherigen Stufe und sendet es an jede Tochterpipeline.


Aus Sicht der Pipeline sieht die broadcast Phase wie eine Funktion des folgenden Formats aus:


 void func(const In &); 

Da es keinen Rückgabewert von der broadcast Stufe gibt, kann eine broadcast Stufe nur die letzte Stufe in einer Pipeline sein.


Warum gibt die Pipeline-Stufe einen optionalen Wert zurück?


Dies liegt daran, dass einige eingehende Werte gelöscht werden müssen. Beispielsweise gibt die validate nichts zurück, wenn ein Rohwert falsch ist und es keinen Sinn macht, damit umzugehen.


Ein weiteres Beispiel: Die Stufe alarm_detector gibt nichts zurück, wenn der aktuelle verdächtige Wert keinen neuen Alarmfall erzeugt.


Implementierungsdetails



Beginnen wir mit Datentypen und Funktionen, die sich auf die Anwendungslogik beziehen. In dem diskutierten Beispiel werden die folgenden Datentypen verwendet, um Informationen von einer Stufe zur anderen zu übergeben:


 // Raw data from a sensor. struct raw_measure { int m_meter_id; uint8_t m_high_bits; uint8_t m_low_bits; }; // Type of input for validation stage with raw data from a sensor. struct raw_value { raw_measure m_data; }; // Type of input for conversion stage with valid raw data from a sensor. struct valid_raw_value { raw_measure m_data; }; // Data from a sensor after conversion to Celsius degrees. struct calculated_measure { int m_meter_id; float m_measure; }; // The type for result of conversion stage with converted data from a sensor. struct sensor_value { calculated_measure m_data; }; // Type with value which could mean a dangerous level of temperature. struct suspicious_value { calculated_measure m_data; }; // Type with information about detected dangerous situation. struct alarm_detected { int m_meter_id; }; 

Eine Instanz von raw_value geht in die erste Phase unserer Pipeline. Dieser raw_value enthält Informationen, die von einem Sensor in Form eines raw_measure Objekts raw_measure . Dann wird raw_value in valid_raw_value . Dann valid_raw_value sensor_value mit dem Wert eines tatsächlichen Sensors in Form von calulated_measure in calulated_measure . Wenn eine Instanz von sensor_value einen verdächtigen Wert enthält, wird eine Instanz von suspicious_value erzeugt. Und dieser suspicious_value kann später in eine alarm_detected Instanz umgewandelt werden.


Oder in grafischer Form:



Jetzt können wir einen Blick auf die Implementierung unserer Pipeline-Phasen werfen:


 // // The first stage of a pipeline. Validation of raw data from a sensor. // // Returns valid_raw_value or nothing if value is invalid. // stage_result_t< valid_raw_value > validation( const raw_value & v ) { if( 0x7 >= v.m_data.m_high_bits ) return make_result< valid_raw_value >( v.m_data ); else return make_empty< valid_raw_value >(); } // // The second stage of a pipeline. Conversion from raw data to a value // in Celsius degrees. // stage_result_t< sensor_value > conversion( const valid_raw_value & v ) { return make_result< sensor_value >( calculated_measure{ v.m_data.m_meter_id, 0.5f * ((static_cast< uint16_t >( v.m_data.m_high_bits ) << 8) + v.m_data.m_low_bits) } ); } // // Simulation of the data archiving. // void archiving( const sensor_value & v ) { clog << "archiving (" << v.m_data.m_meter_id << "," << v.m_data.m_measure << ")" << endl; } // // Simulation of the data distribution. // void distribution( const sensor_value & v ) { clog << "distributing (" << v.m_data.m_meter_id << "," << v.m_data.m_measure << ")" << endl; } // // The first stage of a child pipeline at third level of the main pipeline. // // Checking for to high value of the temperature. // // Returns suspicious_value message or nothing. // stage_result_t< suspicious_value > range_checking( const sensor_value & v ) { if( v.m_data.m_measure >= 45.0f ) return make_result< suspicious_value >( v.m_data ); else return make_empty< suspicious_value >(); } // // The next stage of a child pipeline. // // Checks for two suspicious_value-es in 25ms time window. // class alarm_detector { using clock = chrono::steady_clock; public : stage_result_t< alarm_detected > operator()( const suspicious_value & v ) { if( m_previous ) if( *m_previous + chrono::milliseconds(25) > clock::now() ) { m_previous = nullopt; return make_result< alarm_detected >( v.m_data.m_meter_id ); } m_previous = clock::now(); return make_empty< alarm_detected >(); } private : optional< clock::time_point > m_previous; }; // // One of last stages of a child pipeline. // Imitates beginning of the alarm processing. // void alarm_initiator( const alarm_detected & v ) { clog << "=== alarm (" << v.m_meter_id << ") ===" << endl; } // // Another of last stages of a child pipeline. // Imitates distribution of the alarm. // void alarm_distribution( ostream & to, const alarm_detected & v ) { to << "alarm_distribution (" << v.m_meter_id << ")" << endl; } 

Überspringen stage_result_t make_result make_empty wie stage_result_t , make_result und make_empty , wir werden es im nächsten Abschnitt besprechen.


Ich hoffe, dass der Code dieser Stufen eher trivial ist. Der einzige Teil, der einer zusätzlichen Erläuterung bedarf, ist die Implementierung der Stufe alarm_detector .


In diesem Beispiel wird ein Alarm nur ausgelöst, wenn im Zeitfenster von 25 ms mindestens zwei suspicious_values Werte vorhanden sind. Wir müssen uns also die Zeit der vorherigen suspicious_value Instanz im Stadium alarm_detector . alarm_detector liegt daran, dass alarm_detector als Stateful Functor mit einem Funktionsaufrufoperator alarm_detector ist.


Stufen geben den Typ von SObjectizer anstelle von std :: optional zurück


Ich habe vorhin gesagt, dass die Stufe einen optionalen Wert zurückgeben könnte. Aber std::optional wird im Code nicht verwendet, der unterschiedliche Typ stage_result_t kann in der Implementierung von Stufen gesehen werden.


Dies liegt daran, dass einige der spezifischen Funktionen von SObjectizer hier eine Rolle spielen. Die zurückgegebenen Werte werden als Nachrichten zwischen den Agenten von SObjectizer (auch als Akteure bezeichnet) verteilt. Jede Nachricht in SObjectizer wird als dynamisch zugewiesenes Objekt gesendet. Wir haben hier also eine Art "Optimierung": Anstatt std::optional und dann ein neues Nachrichtenobjekt zuzuweisen, weisen wir einfach ein Nachrichtenobjekt zu und geben einen intelligenten Zeiger darauf zurück.


Tatsächlich ist stage_result_t nur ein typedef für SObjectizers shared_ptr-Analog:


 template< typename M > using stage_result_t = message_holder_t< M >; 

Und make_result und make_empty sind nur make_result make_empty stage_result_t mit oder ohne tatsächlichen Wert:


 template< typename M, typename... Args > stage_result_t< M > make_result( Args &&... args ) { return stage_result_t< M >::make(forward< Args >(args)...); } template< typename M > stage_result_t< M > make_empty() { return stage_result_t< M >(); } 

Der Einfachheit halber kann man mit Sicherheit sagen, dass die validation folgendermaßen ausgedrückt werden könnte:


 std::shared_ptr< valid_raw_value > validation( const raw_value & v ) { if( 0x7 >= v.m_data.m_high_bits ) return std::make_shared< valid_raw_value >( v.m_data ); else return std::shared_ptr< valid_raw_value >{}; } 

Aufgrund der Besonderheiten von SObjectizer können wir std::shared_ptr jedoch nicht verwenden und müssen so_5::message_holder_t Typ so_5::message_holder_t befassen. Und wir verstecken das spezifisch hinter den stage_result_t , make_result und make_empty .


Stage_handler_t und Stage_builder_t Trennung


Ein wichtiger Punkt bei der Pipeline-Implementierung ist die Trennung von Stage-Handler- und Stage-Builder- Konzepten. Dies geschieht der Einfachheit halber. Das Vorhandensein dieser Konzepte ermöglichte mir zwei Schritte in der Pipeline-Definition.


Im ersten Schritt beschreibt ein Benutzer Pipeline-Phasen. Als Ergebnis erhalte ich eine Instanz von stage_t , die alle Pipeline-Stufen enthält.


Im zweiten Schritt wird eine Reihe von zugrunde liegenden SObjectizer-Agenten erstellt. Diese Agenten empfangen Nachrichten mit Ergebnissen der vorherigen Phasen und rufen die tatsächlichen Phasenhandler auf . Anschließend senden sie die Ergebnisse an die nächsten Phasen.


Um diese Gruppe von Agenten zu erstellen, muss jede Stufe einen Bühnenbauer haben . Stage Builder kann als Factory betrachtet werden, die den Agenten eines zugrunde liegenden SObjectizers erstellt.


Wir haben also die folgende Beziehung: Jede Pipeline-Stufe erzeugt zwei Objekte: Stage-Handler mit stufenbezogener Logik und Stage-Builder , der den Agenten eines zugrunde liegenden SObjectizers erstellt, um den Stage-Handler zum richtigen Zeitpunkt aufzurufen:



Der Stage Handler wird folgendermaßen dargestellt:


 template< typename In, typename Out > class stage_handler_t { public : using traits = handler_traits_t< In, Out >; using func_type = function< typename traits::output(const typename traits::input &) >; stage_handler_t( func_type handler ) : m_handler( move(handler) ) {} template< typename Callable > stage_handler_t( Callable handler ) : m_handler( handler ) {} typename traits::output operator()( const typename traits::input & a ) const { return m_handler( a ); } private : func_type m_handler; }; 

Wobei handler_traits_t folgendermaßen definiert werden:


 // // We have to deal with two types of stage handlers: // - intermediate handlers which will return some result (eg some new // message); // - terminal handlers which can return nothing (eg void instead of // stage_result_t<M>); // // This template with specialization defines `input` and `output` // aliases for both cases. // template< typename In, typename Out > struct handler_traits_t { using input = In; using output = stage_result_t< Out >; }; template< typename In > struct handler_traits_t< In, void > { using input = In; using output = void; }; 

Der Stage Builder wird nur durch std::function :


 using stage_builder_t = function< mbox_t(coop_t &, mbox_t) >; 

Hilfstypen lambda_traits_t und callable_traits_t


Da Stufen durch freie Funktionen oder Funktoren dargestellt werden können (z. B. Instanzen der Klasse alarm_detector oder von anonymen Compilern generierte Klassen, die Lambdas darstellen), benötigen wir einige Helfer, um die Argumente und den Rückgabewert der Stufe zu erkennen. Zu diesem Zweck habe ich den folgenden Code verwendet:


 // // Helper type for `arg_type` and `result_type` alises definition. // template< typename R, typename A > struct callable_traits_typedefs_t { using arg_type = A; using result_type = R; }; // // Helper type for dealing with stateful objects with operator() // (they could be user-defined objects or generated by compiler // like lambdas). // template< typename T > struct lambda_traits_t; template< typename M, typename A, typename T > struct lambda_traits_t< stage_result_t< M >(T::*)(const A &) const > : public callable_traits_typedefs_t< M, A > {}; template< typename A, typename T > struct lambda_traits_t< void (T::*)(const A &) const > : public callable_traits_typedefs_t< void, A > {}; template< typename M, typename A, typename T > struct lambda_traits_t< stage_result_t< M >(T::*)(const A &) > : public callable_traits_typedefs_t< M, A > {}; template< typename A, typename T > struct lambda_traits_t< void (T::*)(const A &) > : public callable_traits_typedefs_t< void, A > {}; // // Main type for definition of `arg_type` and `result_type` aliases. // With specialization for various cases. // template< typename T > struct callable_traits_t : public lambda_traits_t< decltype(&T::operator()) > {}; template< typename M, typename A > struct callable_traits_t< stage_result_t< M >(*)(const A &) > : public callable_traits_typedefs_t< M, A > {}; template< typename A > struct callable_traits_t< void(*)(const A &) > : public callable_traits_typedefs_t< void, A > {}; 

Ich hoffe, dass dieser Code für Leser mit guten C ++ - Kenntnissen durchaus verständlich ist. Wenn nicht, können Sie mich gerne in den Kommentaren fragen. Gerne erkläre ich Ihnen die Logik hinter lambda_traits_t und lambda_traits_t im Detail.


Stage () -, Broadcast () - und Operator | () -Funktionen


Jetzt können wir uns die wichtigsten Funktionen zum Aufbau von Pipelines ansehen. stage_t die Definition einer Vorlagenklasse stage_t :


 template< typename In, typename Out > struct stage_t { stage_builder_t m_builder; }; 

Es ist eine sehr einfache Struktur, die nur stage_bulder_t Instanz stage_bulder_t . Vorlagenparameter werden in stage_t nicht verwendet. Warum sind sie hier also vorhanden?


Sie sind für die Überprüfung der Typkompatibilität zwischen Pipeline-Stufen zur Kompilierungszeit erforderlich. Wir werden das bald sehen.


Schauen wir uns die einfachste Pipeline-Funktion an, die stage() :


 template< typename Callable, typename In = typename callable_traits_t< Callable >::arg_type, typename Out = typename callable_traits_t< Callable >::result_type > stage_t< In, Out > stage( Callable handler ) { stage_builder_t builder{ [h = std::move(handler)]( coop_t & coop, mbox_t next_stage) -> mbox_t { return coop.make_agent< a_stage_point_t<In, Out> >( std::move(h), std::move(next_stage) ) ->so_direct_mbox(); } }; return { std::move(builder) }; } 

Es erhält einen tatsächlichen Stage-Handler als einzelnen Parameter. Es kann ein Zeiger auf eine Funktion oder eine Lambda-Funktion oder einen Funktor sein. Die Arten der Eingabe und Ausgabe der Bühne werden aufgrund der "Vorlagenmagie" hinter der Vorlage callable_traits_t automatisch abgeleitet.


Eine Instanz des Stage Builders wird im Inneren erstellt und diese Instanz wird als Ergebnis der Funktion stage() in einem neuen stage_t Objekt zurückgegeben. Ein tatsächlicher Stage-Handler wird vom Stage-Builder Lambda erfasst und dann für die Erstellung eines zugrunde liegenden SObjectizer-Agenten verwendet (darüber werden wir im nächsten Abschnitt sprechen).


Die nächste zu überprüfende Funktion ist der operator|() , der zwei Stufen miteinander verkettet und eine neue Stufe zurückgibt:


 template< typename In, typename Out1, typename Out2 > stage_t< In, Out2 > operator|( stage_t< In, Out1 > && prev, stage_t< Out1, Out2 > && next ) { return { stage_builder_t{ [prev, next]( coop_t & coop, mbox_t next_stage ) -> mbox_t { auto m = next.m_builder( coop, std::move(next_stage) ); return prev.m_builder( coop, std::move(m) ); } } }; } 

Der einfachste Weg, die Logik des operator|() zu erklären, besteht darin, ein Bild zu zeichnen. Nehmen wir an, wir haben den Ausdruck:


 stage(A) | stage(B) | stage(C) | stage(B) 

Dieser Ausdruck wird folgendermaßen transformiert:



Dort können wir auch sehen, wie die Typprüfung zur Kompilierungszeit funktioniert: Die Definition des operator|() erfordert, dass der Ausgabetyp der ersten Stufe die Eingabe der zweiten Stufe ist. Ist dies nicht der Fall, wird der Code nicht kompiliert.


Und jetzt können wir uns die komplexeste Funktion zum Aufbau von Pipelines ansehen, die broadcast() . Die Funktion selbst ist ziemlich einfach:


 template< typename In, typename Out, typename... Rest > stage_t< In, void > broadcast( stage_t< In, Out > && first, Rest &&... stages ) { stage_builder_t builder{ [broadcasts = collect_sink_builders( move(first), forward< Rest >(stages)...)] ( coop_t & coop, mbox_t ) -> mbox_t { vector< mbox_t > mboxes; mboxes.reserve( broadcasts.size() ); for( const auto & b : broadcasts ) mboxes.emplace_back( b( coop, mbox_t{} ) ); return broadcast_mbox_t::make( coop.environment(), std::move(mboxes) ); } }; return { std::move(builder) }; } 

Der Hauptunterschied zwischen einer gewöhnlichen Bühne und einer Rundfunkbühne besteht darin, dass die Rundfunkbühne einen Vektor von Nebenbühnenbauern enthalten muss . Also müssen wir diesen Vektor erstellen und ihn an den Hauptbühnenbauer der Rundfunkbühne übergeben. Aus diesem Grund können wir einen Aufruf von collect_sink_builders in der Capture-Liste eines Lambdas in der Funktion collect_sink_builders broadcast() :


 stage_builder_t builder{ [broadcasts = collect_sink_builders( move(first), forward< Rest >(stages)...)] 

Wenn wir uns collect_sink_builder ansehen, collect_sink_builder wir den folgenden Code:


 // // Serie of helper functions for building description for // `broadcast` stage. // // Those functions are used for collecting // `builders` functions for every child pipeline. // // Please note that this functions checks that each child pipeline has the // same In type. // template< typename In, typename Out, typename... Rest > void move_sink_builder_to( vector< stage_builder_t > & receiver, stage_t< In, Out > && first, Rest &&... rest ) { receiver.emplace_back( move( first.m_builder ) ); if constexpr( 0u != sizeof...(rest) ) move_sink_builder_to<In>( receiver, forward< Rest >(rest)... ); } template< typename In, typename Out, typename... Rest > vector< stage_builder_t > collect_sink_builders( stage_t< In, Out > && first, Rest &&... stages ) { vector< stage_builder_t > receiver; receiver.reserve( 1 + sizeof...(stages) ); move_sink_builder_to<In>( receiver, move(first), std::forward<Rest>(stages)... ); return receiver; } 

Die Typprüfung zur Kompilierungszeit funktioniert auch hier: Ein Aufruf von move_sink_builder_to explizit durch den Typ 'In' parametrisiert. collect_sink_builders(stage_t<In1, Out1>, stage_t<In2, Out2>, ...) bedeutet, dass ein Aufruf in der Form collect_sink_builders(stage_t<In1, Out1>, stage_t<In2, Out2>, ...) zu einem Kompilierungsfehler führt, da der Compiler einen Aufruf von move_sink_builder_to<In1>(receiver, stage_t<In2, Out2>, ...) .


Ich kann auch feststellen, dass wir, da die Anzahl der Nebenpipelines für broadcast() zur Kompilierungszeit bekannt ist, std::array anstelle von std::vector und einige Speicherzuordnungen vermeiden können. Der Einfachheit halber wird hier jedoch std::vector verwendet.


Beziehung zwischen Stufen und Agenten / mboxen von SObjectizer


Die Idee hinter der Implementierung der Pipeline ist die Erstellung eines separaten Agenten für jede Pipeline-Phase. Ein Agent empfängt eine eingehende Nachricht, leitet sie an den entsprechenden Stage-Handler weiter , analysiert das Ergebnis und sendet das Ergebnis als eingehende Nachricht an die nächste Stufe, wenn das Ergebnis nicht leer ist. Dies kann durch das folgende Sequenzdiagramm veranschaulicht werden:



Einige SObjectizer-bezogene Dinge müssen zumindest kurz besprochen werden. Wenn Sie kein Interesse an solchen Details haben, können Sie die folgenden Abschnitte überspringen und direkt zum Abschluss gehen.


Coop ist eine Gruppe von Agenten, die zusammenarbeiten


Agenten werden nicht einzeln, sondern in Gruppen mit dem Namen coops in SObjectizer eingeführt. Ein Coop ist eine Gruppe von Agenten, die zusammenarbeiten sollten, und es macht keinen Sinn, die Arbeit fortzusetzen, wenn einer der Agenten der Gruppe fehlt.


Die Einführung von Agenten in SObjectizer sieht also so aus, als würde eine Coop-Instanz erstellt, diese Instanz mit den entsprechenden Agenten gefüllt und dann die Coop in SObjectizer registriert.


Aus diesem Grund ist das erste Argument für einen Bühnenbauer ein Verweis auf einen neuen Coop. Dieser Coop wird in der Funktion make_pipeline() erstellt ( make_pipeline() unten), dann von Stage Buildern make_pipeline() und dann registriert (erneut in der Funktion make_pipeline() ).


Meldungsfelder


SObjectizer implementiert mehrere Parallelitätsmodelle. Das Actor Model ist nur einer von ihnen. Aus diesem Grund kann sich SObjectizer erheblich von anderen Akteur-Frameworks unterscheiden. Einer der Unterschiede ist das Adressierungsschema für Nachrichten.


Nachrichten in SObjectizer richten sich nicht an Akteure, sondern an Nachrichtenfelder (mboxes). Schauspieler müssen Nachrichten von einer mbox abonnieren. Wenn ein Akteur einen bestimmten Nachrichtentyp von einer mbox abonniert, erhält er Nachrichten dieses Typs:



Diese Tatsache ist entscheidend, da Nachrichten von einer Stufe zur anderen gesendet werden müssen. Dies bedeutet, dass jede Stufe ihre mbox haben sollte und dass mbox für die vorherige Stufe bekannt sein sollte.


Jeder Schauspieler (auch bekannt als Agent) in SObjectizer hat die direkte mbox . Diese mbox ist nur dem Eigentümeragenten zugeordnet und kann von keinem anderen Agenten verwendet werden. Die direkten mboxen von Agenten, die für Stufen erstellt wurden, werden für die Stufeninteraktion verwendet.


Die spezifische Funktion dieses SObjectizers bestimmt einige Details der Pipeline-Implementierung.


Das erste ist die Tatsache, dass Stage Builder den folgenden Prototyp hat:


 mbox_t builder(coop_t &, mbox_t); 

Dies bedeutet, dass der Stage Builder eine mbox der nächsten Stufe empfängt und einen neuen Agenten erstellen sollte, der die Ergebnisse der Stufe an diese mbox sendet. Eine mbox des neuen Agenten sollte vom Stage Builder zurückgegeben werden . Diese mbox wird für die Erstellung eines Agenten für die vorherige Stufe verwendet.


Das zweite ist die Tatsache, dass Agenten für Stufen in Reservereihenfolge erstellt werden. Es bedeutet, dass wenn wir eine Pipeline haben:


 stage(A) | stage(B) | stage(C) 

Zuerst wird ein Agent für Stufe C erstellt, dann wird seine mbox für die Erstellung eines Agenten für Stufe B verwendet, und dann wird die mbox des Agenten der Stufe B für die Erstellung eines Agenten für Stufe A verwendet.


Beachten Sie auch, dass der operator|() keine Agenten erstellt:


 stage_builder_t{ [prev, next]( coop_t & coop, mbox_t next_stage ) -> mbox_t { auto m = next.m_builder( coop, std::move(next_stage) ); return prev.m_builder( coop, std::move(m) ); } } 

Der operator|() erstellt einen Builder, der nur andere Builder aufruft, aber keine zusätzlichen Agenten einführt. Also für den Fall:


 stage(A) | stage(B) 

Es werden nur zwei Agenten erstellt (für A-Stufe und B-Stufe), die dann im vom operator|() erstellten Stage Builder miteinander verknüpft werden.


Es gibt keinen Agenten für broadcast() Implementierung von broadcast()


Eine naheliegende Möglichkeit, eine Broadcast-Phase zu implementieren, besteht darin, einen speziellen Agenten zu erstellen, der eine eingehende Nachricht empfängt und diese Nachricht dann erneut an eine Liste von Ziel-Mboxes sendet. Dieser Weg wurde bei der ersten Implementierung der beschriebenen Pipeline DSL verwendet.


Aber unser Begleitprojekt, so5extra , hat jetzt eine spezielle Variante von mbox: Broadcasting one. Diese mbox macht genau das, was hier erforderlich ist: Sie nimmt eine neue Nachricht auf und sendet sie an eine Reihe von Ziel-mboxen.


Aus diesem Grund muss kein separater Broadcast-Agent erstellt werden. Wir können einfach die Broadcast-Mbox von so5extra verwenden:


 // // A special mbox for broadcasting of a message to a set of destination // mboxes. // using broadcast_mbox_t = so_5::extra::mboxes::broadcast::fixed_mbox_template_t<>; ... // // Inside the broadcast() function: // stage_builder_t builder{ [broadcasts = collect_sink_builders( move(first), forward< Rest >(stages)...)] ( coop_t & coop, mbox_t ) -> mbox_t { vector< mbox_t > mboxes; mboxes.reserve( broadcasts.size() ); for( const auto & b : broadcasts ) mboxes.emplace_back( b( coop, mbox_t{} ) ); // That is the creation of broadcasting mbox instance. return broadcast_mbox_t::make( coop.environment(), std::move(mboxes) ); } }; 

Implementierung von Stage-Agent


Jetzt können wir uns die Implementierung von Stage Agent ansehen:


 // // An agent which will be used as intermediate or terminal pipeline stage. // It will receive input message, call the stage handler and pass // handler result to the next stage (if any). // template< typename In, typename Out > class a_stage_point_t final : public agent_t { public : a_stage_point_t( context_t ctx, stage_handler_t< In, Out > handler, mbox_t next_stage ) : agent_t{ ctx } , m_handler{ move( handler ) } , m_next{ move(next_stage) } {} void so_define_agent() override { if( m_next ) // Because there is the next stage the appropriate // message handler will be used. so_subscribe_self().event( [=]( const In & evt ) { auto r = m_handler( evt ); if( r ) so_5::send( m_next, r ); } ); else // There is no next stage. A very simple message handler // will be used for that case. so_subscribe_self().event( [=]( const In & evt ) { m_handler( evt ); } ); } private : const stage_handler_t< In, Out > m_handler; const mbox_t m_next; }; // // A specialization of a_stage_point_t for the case of terminal stage of // a pipeline. This type will be used for stage handlers with void // return type. // template< typename In > class a_stage_point_t< In, void > final : public agent_t { public : a_stage_point_t( context_t ctx, stage_handler_t< In, void > handler, mbox_t next_stage ) : agent_t{ ctx } , m_handler{ move( handler ) } { if( next_stage ) throw std::runtime_error( "sink point cannot have next stage" ); } void so_define_agent() override { so_subscribe_self().event( [=]( const In & evt ) { m_handler( evt ); } ); } private : const stage_handler_t< In, void > m_handler; }; 

Es ist ziemlich trivial, wenn Sie die Grundlagen des SObjectizers verstehen. Wenn nicht, wird es ziemlich schwierig sein, es in wenigen Worten zu erklären (also zögern Sie nicht, Fragen in den Kommentaren zu stellen).


Die Hauptimplementierung des Agenten a_stage_point_t erstellt ein Abonnement für eine Nachricht vom Typ In. Wenn eine Nachricht dieses Typs eintrifft, wird der Stage-Handler aufgerufen. Wenn der Stage-Handler ein tatsächliches Ergebnis zurückgibt, wird das Ergebnis an die nächste Stufe gesendet (falls diese Stufe vorhanden ist).


Es gibt auch eine Version von a_stage_point_t für den Fall, dass die entsprechende Stufe die Endstufe ist und es nicht die nächste Stufe geben kann.


Die Implementierung von a_stage_point_t kann etwas kompliziert aussehen, aber glauben Sie mir, es ist einer der einfachsten Agenten, die ich geschrieben habe.


Funktion make_pipeline ()


Es ist Zeit, die letzte Pipeline-Funktion zu diskutieren, die make_pipeline() :


 template< typename In, typename Out, typename... Args > mbox_t make_pipeline( // SObjectizer Environment to work in. so_5::environment_t & env, // Definition of a pipeline. stage_t< In, Out > && sink, // Optional args to be passed to make_coop() function. Args &&... args ) { auto coop = env.make_coop( forward< Args >(args)... ); auto mbox = sink.m_builder( *coop, mbox_t{} ); env.register_coop( move(coop) ); return mbox; } 

Hier gibt es weder Magie noch Überraschungen. Wir müssen nur einen neuen Coop für die zugrunde liegenden Agenten der Pipeline erstellen, diesen Coop mit Agenten füllen, indem wir einen Stage Builder der obersten Ebene aufrufen, und diesen Coop dann in SObjectizer registrieren. Das alles.


Das Ergebnis von make_pipeline() ist die mbox der am weitesten links stehenden (ersten) Stufe der Pipeline. Diese mbox sollte zum Senden von Nachrichten an die Pipeline verwendet werden.


Die Simulation und Experimente damit


Jetzt haben wir Datentypen und Funktionen für unsere Anwendungslogik und die Tools zum Verketten dieser Funktionen zu einer Datenverarbeitungspipeline. Lassen Sie es uns tun und ein Ergebnis sehen:


 int main() { // Launch SObjectizer in a separate thread. wrapped_env_t sobj; // Make a pipeline. auto pipeline = make_pipeline( sobj.environment(), stage(validation) | stage(conversion) | broadcast( stage(archiving), stage(distribution), stage(range_checking) | stage(alarm_detector{}) | broadcast( stage(alarm_initiator), stage( []( const alarm_detected & v ) { alarm_distribution( cerr, v ); } ) ) ) ); // Send messages to a pipeline in a loop with 10ms delays. for( uint8_t i = 0; i < static_cast< uint8_t >(250); i += 10 ) { send< raw_value >( pipeline, raw_measure{ 0, 0, i } ); std::this_thread::sleep_for( chrono::milliseconds{10} ); } } 

Wenn wir dieses Beispiel ausführen, sehen wir die folgende Ausgabe:


 archiving (0,0) distributing (0,0) archiving (0,5) distributing (0,5) archiving (0,10) distributing (0,10) archiving (0,15) distributing (0,15) archiving (0,20) distributing (0,20) archiving (0,25) distributing (0,25) archiving (0,30) distributing (0,30) ... archiving (0,105) distributing (0,105) archiving (0,110) distributing (0,110) === alarm (0) === alarm_distribution (0) archiving (0,115) distributing (0,115) archiving (0,120) distributing (0,120) === alarm (0) === alarm_distribution (0) 

Es funktioniert.


Aber es scheint, dass Phasen unserer Pipeline nacheinander ablaufen, nicht wahr?


Ja, das ist es. Dies liegt daran, dass alle Pipeline-Agenten an den Dispatcher des Standard-SObjectizers gebunden sind. Und dieser Dispatcher verwendet nur einen Arbeitsthread, um die Nachrichtenverarbeitung aller Agenten zu bedienen.


Dies kann jedoch leicht geändert werden. make_pipeline() einfach ein zusätzliches Argument an den make_pipeline() von make_pipeline() :


 // Make a pipeline. auto pipeline = make_pipeline( sobj.environment(), stage(validation) | stage(conversion) | broadcast( stage(archiving), stage(distribution), stage(range_checking) | stage(alarm_detector{}) | broadcast( stage(alarm_initiator), stage( []( const alarm_detected & v ) { alarm_distribution( cerr, v ); } ) ) ), disp::thread_pool::make_dispatcher( sobj.environment() ).binder( disp::thread_pool::bind_params_t{}.fifo( disp::thread_pool::fifo_t::individual ) ) ); 

Dadurch wird ein neuer Thread-Pool erstellt und alle Pipeline-Agenten an diesen Pool gebunden. Jeder Agent wird vom Pool unabhängig von anderen Agenten bedient.


Wenn wir das modifizierte Beispiel ausführen, können wir so etwas sehen:


 archiving (0,0) distributing (0,0) distributing (0,5) archiving (0,5) archiving (0,10) distributing (0,10) distributing (archiving (0,15) 0,15) archiving (0,20) distributing (0,20) archiving (0,25) distributing (0,25) archiving (0,distributing (030) ,30) ... archiving (0,distributing (0,105) 105) archiving (0,alarm_distribution (0) distributing (0,=== alarm (0) === 110) 110) archiving (distributing (0,0,115) 115) archiving (distributing (=== alarm (0) === 0alarm_distribution (0) 0,120) ,120) 

Wir können also sehen, dass verschiedene Phasen der Pipeline parallel arbeiten.


Aber ist es möglich, weiter zu gehen und Stufen an verschiedene Disponenten zu binden?


Ja, es ist möglich, aber wir müssen eine weitere Überladung für die Funktion stage() implementieren:


 template< typename Callable, typename In = typename callable_traits_t< Callable >::arg_type, typename Out = typename callable_traits_t< Callable >::result_type > stage_t< In, Out > stage( disp_binder_shptr_t disp_binder, Callable handler ) { stage_builder_t builder{ [binder = std::move(disp_binder), h = std::move(handler)]( coop_t & coop, mbox_t next_stage) -> mbox_t { return coop.make_agent_with_binder< a_stage_point_t<In, Out> >( std::move(binder), std::move(h), std::move(next_stage) ) ->so_direct_mbox(); } }; return { std::move(builder) }; } 

This version of stage() accepts not only a stage handler but also a dispatcher binder. Dispatcher binder is a way to bind an agent to the particular dispatcher. So to assign a stage to a specific working context we can create an appropriate dispatcher and then pass the binder to that dispatcher to stage() function. Let's do that:


 // An active_obj dispatcher to be used for some stages. auto ao_disp = disp::active_obj::make_dispatcher( sobj.environment() ); // Make a pipeline. auto pipeline = make_pipeline( sobj.environment(), stage(validation) | stage(conversion) | broadcast( stage(ao_disp.binder(), archiving), stage(ao_disp.binder(), distribution), stage(range_checking) | stage(alarm_detector{}) | broadcast( stage(ao_disp.binder(), alarm_initiator), stage(ao_disp.binder(), []( const alarm_detected & v ) { alarm_distribution( cerr, v ); } ) ) ), disp::one_thread::make_dispatcher( sobj.environment() ).binder() ); 

In that case stages archiving , distribution , alarm_initiator and alarm_distribution will work on own worker threads. All other stages will work on the same single worker thread.


The conclusion


This was an interesting experiment and I was surprised how easy SObjectizer could be used in something like reactive programming or data-flow programming.


However, I don't think that pipeline DSL can be practically meaningful. It's too simple and, maybe not flexible enough. But, I hope, it can be a base for more interesting experiments for those why need to deal with different workflows and data-processing pipelines. At least as a base for some ideas in that area. C++ language a rather good here and some (not so complicated) template magic can help to catch various errors at compile-time.


In conclusion, I want to say that we see SObjectizer not as a specialized tool for solving a particular problem, but as a basic set of tools to be used in solutions for different problems. And, more importantly, that basic set can be extended for your needs. Just take a look at SObjectizer , try it, and share your feedback. Maybe you missed something in SObjectizer? Perhaps you don't like something? Tell us , and we can try to help you.


If you want to help further development of SObjectizer, please share a reference to it or to this article somewhere you want (Reddit, HackerNews, LinkedIn, Facebook, Twitter, ...). The more attention and the more feedback, the more new features will be incorporated into SObjectizer.


And many thanks for reading this ;)


PS. The source code for that example can be found in that repository .

Source: https://habr.com/ru/post/de460123/


All Articles