Eventmaschine schützt den Lebenszyklus

Haftungsausschluss: Dieser Artikel beschreibt eine nicht offensichtliche Lösung für ein nicht offensichtliches Problem. Vor dem Rauschen Eier Um es in die Praxis umzusetzen, empfehle ich, den Artikel bis zum Ende zu lesen und zweimal darüber nachzudenken.

aber warum


Hallo allerseits! Wenn wir mit Code arbeiten, müssen wir uns oft mit dem Status befassen. Ein solcher Fall ist der Lebenszyklus von Objekten. Das Verwalten eines Objekts mit mehreren möglichen Zuständen kann eine sehr nicht triviale Aufgabe sein. Fügen Sie hier eine asynchrone Ausführung hinzu, und die Aufgabe wird um eine Größenordnung kompliziert. Es gibt eine effektive und natürliche Lösung. In diesem Artikel werde ich über die Ereignismaschine und deren Implementierung in Go sprechen.


Warum den Staat verwalten?


Lassen Sie uns zunächst das Konzept selbst definieren. Das einfachste Beispiel für einen Status: Dateien und verschiedene Verbindungen. Sie können nicht einfach eine Datei nehmen und lesen. Es muss zuerst geöffnet werden und am Ende vorzugsweise Achten Sie darauf, zu schließen. Es stellt sich heraus, dass die aktuelle Aktion vom Ergebnis der vorherigen Aktion abhängt: Das Lesen hängt von der Öffnung ab. Das gespeicherte Ergebnis ist der Status.


Das Hauptproblem mit dem Staat ist die Komplexität. Jeder Status verkompliziert den Code automatisch. Sie müssen die Ergebnisse von Aktionen im Speicher speichern und der Logik verschiedene Prüfungen hinzufügen. Deshalb sind zustandslose Architekturen für Programmierer so attraktiv - niemand will Ärger Schwierigkeiten. Wenn die Ergebnisse Ihrer Aktionen keinen Einfluss auf die Ausführungslogik haben, benötigen Sie keinen Status.


Es gibt jedoch eine Eigenschaft, mit der Sie mit den Schwierigkeiten rechnen müssen. Für einen Status müssen Sie eine bestimmte Reihenfolge von Aktionen einhalten. Im Allgemeinen sollten solche Situationen vermieden werden, dies ist jedoch nicht immer möglich. Ein Beispiel ist der Lebenszyklus von Programmobjekten. Dank eines guten Zustandsmanagements kann ein vorhersagbares Verhalten von Objekten mit einem komplexen Lebenszyklus erzielt werden.


Lassen Sie uns nun herausfinden, wie man es cool macht .


Automatisch, um Probleme zu lösen


AK74


Wenn Menschen über Zustände sprechen, fallen ihnen sofort endliche Zustandsmaschinen ein. Dies ist logisch, da ein Automat die natürlichste Art ist, einen Zustand zu verwalten.


Ich werde mich nicht mit der Theorie der Automaten befassen , da es im Internet mehr als genug Informationen gibt.

Wenn Sie nach Beispielen für Finite-State-Maschinen für Go suchen, werden Sie auf jeden Fall einen Lexer von Rob Pike treffen. Ein gutes Beispiel für einen Automaten, bei dem die verarbeiteten Daten das Eingabealphabet sind. Dies bedeutet, dass Zustandsübergänge durch den vom Lexer verarbeiteten Text verursacht werden. Elegante Lösung für ein bestimmtes Problem.


Das Wichtigste zu verstehen ist, dass ein Automat eine Lösung für ein streng spezifisches Problem ist. Bevor Sie es als Abhilfe für alle Probleme betrachten, müssen Sie die Aufgabe daher vollständig verstehen. Insbesondere die Entität, die Sie steuern möchten:


  • Zustände - Lebenszyklus;
  • Ereignisse - was genau bewirkt den Übergang in jeden Zustand;
  • Arbeitsergebnis - Ausgabedaten;
  • Ausführungsmodus (synchron / asynchron);
  • Hauptanwendungsfälle.

Der Lexer ist wunderschön, ändert jedoch nur den Status aufgrund von Daten, die er selbst verarbeitet. Aber was ist mit der Situation, wenn der Benutzer Übergänge aufruft? Hier kann die Eventmaschine helfen.


Echtes Beispiel


Um es klarer zu machen, werde ich ein Beispiel aus der phono analysieren.


Um vollständig in den Kontext einzutauchen, können Sie den einleitenden Artikel lesen. Dies ist für dieses Thema nicht erforderlich, hilft jedoch dabei, besser zu verstehen, was wir verwalten.

Und was verwalten wir?


phono basiert auf der DSP-Pipeline. Es besteht aus drei Verarbeitungsstufen. Jede Stufe kann eine bis mehrere Komponenten umfassen:


pipe_diagram


  1. pipe.Pump (englische Pumpe) ist eine obligatorische Stufe für den pipe.Pump , immer nur eine Komponente.
  2. pipe.Processor (englischer Handler) - eine optionale Stufe der Klangverarbeitung von 0 bis N Komponenten.
  3. pipe.Sink (englische Spüle) - eine obligatorische Stufe der Schallübertragung von 1 bis N Komponenten.

Eigentlich werden wir den Fördererlebenszyklus verwalten.


Lebenszyklus


So sieht das pipe.Pipe Zustandsdiagramm aus.


pipe_lifecycle


Kursivschrift kennzeichnet Übergänge, die durch die interne Ausführungslogik verursacht werden. Fett - Übergänge, die durch Ereignisse verursacht werden. Das Diagramm zeigt, dass die Zustände in zwei Typen unterteilt sind:


  • Ruhezustände - ready und paused , Sie können nur nach Ereignis von ihnen springen
  • aktive Zustände - running und pausing , Übergänge nach Ereignis und aufgrund der Ausführungslogik

Vor einer detaillierten Analyse des Codes ein klares Beispiel für die Verwendung aller Zustände:


 // PlayWav  .wav    portaudio  -. func PlayWav(wavFile string) error { bufferSize := phono.BufferSize(512) //      w, err := wav.NewPump(wavFile, bufferSize) //  wav pump if err != nil { return err } pa := portaudio.NewSink( //  portaudio sink bufferSize, w.WavSampleRate(), w.WavNumChannels(), ) p := pipe.New( //  pipe.Pipe    ready w.WavSampleRate(), pipe.WithPump(w), pipe.WithSinks(pa), ) p.Run() //    running   p.Run() errc := p.Pause() //    pausing   p.Pause() err = pipe.Wait(errc) //     paused if err != nil { return err } errc = p.Resume() //    running   p.Resume() err = pipe.Wait(errc) //     ready if err != nil { return err } return pipe.Wait(p.Close()) //      } 

Nun, das Wichtigste zuerst.


Der gesamte Quellcode ist im Repository verfügbar.

Staaten und Ereignisse


Beginnen wir mit dem Wichtigsten.


 // state      . type state interface { listen(*Pipe, target) (state, target) //    transition(*Pipe, eventMessage) (state, error) //   } // idleState  .        . type idleState interface { state } // activeState  .         //   . type activeState interface { state sendMessage(*Pipe) state //    } //  . type ( idleReady struct{} activeRunning struct{} activePausing struct{} idlePaused struct{} ) //  . var ( ready idleReady running activeRunning paused idlePaused pausing activePausing ) 

Dank separater Typen werden Übergänge auch für jeden Status separat deklariert. Dies vermeidet das riesige Würstchen Übergangsfunktionen mit verschachtelten switch . Die Zustände selbst enthalten keine Daten oder Logik. Für sie können Sie Variablen auf Paketebene deklarieren, um dies nicht jedes Mal zu tun. Die state wird für den Polymorphismus benötigt. activeState idleState etwas später über activeState und idleState sprechen.


Der zweitwichtigste Teil unserer Maschine sind Ereignisse.


 // event  . type event int //  . const ( run event = iota pause resume push measure cancel ) // target      . type target struct { state idleState //   errc chan error //   ,     } // eventMessage   ,    . type eventMessage struct { event //   params params //   components []string // id  target //      } 

Betrachten Sie ein einfaches Beispiel, um zu verstehen, warum der target benötigt wird. Wir haben ein neues Förderband geschaffen, es ist ready . Führen Sie es nun mit p.Run() . Das run wird an die Maschine gesendet, die Pipeline geht in den running . Wie kann man herausfinden, wann der Förderer fertig ist? Hier hilft uns der Zieltyp. Es zeigt an, welcher Ruhezustand nach dem Ereignis zu erwarten ist. In unserem Beispiel wird die Pipeline nach Abschluss der Arbeiten wieder in den ready . Das gleiche im Diagramm:



Nun mehr zu den Arten von Staaten. Genauer gesagt über die activeState idleState und activeState . Schauen wir uns die listen(*Pipe, target) (state, target) für verschiedene Arten von Stufen an:


 // listen     ready. func (s idleReady) listen(p *Pipe, t target) (state, target) { return p.idle(s, t) } // listen     running. func (s activeRunning) listen(p *Pipe, t target) (state, target) { return p.active(s, t) } 

pipe.Pipe hat verschiedene Funktionen, um auf einen Übergang zu warten! Was ist dort?


 // idle     .    . func (p *Pipe) idle(s idleState, t target) (state, target) { if s == t.state || s == ready { t = t.dismiss() //  ,  target } for { var newState state var err error select { case e := <-p.events: //   newState, err = s.transition(p, e) //    if err != nil { e.target.handle(err) } else if e.hasTarget() { t.dismiss() t = e.target } } if s != newState { return newState, t // ,    } } } // active     .     , //   . func (p *Pipe) active(s activeState, t target) (state, target) { for { var newState state var err error select { case e := <-p.events: //   newState, err = s.transition(p, e) //    if err != nil { //  ? e.target.handle(err) // ,    } else if e.hasTarget() { // ,  target t.dismiss() //   t = e.target //   } case <-p.provide: //     newState = s.sendMessage(p) //    case err, ok := <-p.errc: //   if ok { //   ,  interrupt(p.cancel) //   t.handle(err) //    } //    ,  return ready, t //    ready } if s != newState { return newState, t // ,    } } } 

Somit können wir verschiedene Kanäle in verschiedenen Zuständen hören. Auf diese Weise können Sie beispielsweise während einer Pause keine Nachrichten senden. Wir hören nur nicht auf den entsprechenden Kanal.


Konstruktor und Startmaschine



 // New      . //      ready. func New(sampleRate phono.SampleRate, options ...Option) *Pipe { p := &Pipe{ UID: phono.NewUID(), sampleRate: sampleRate, log: log.GetLogger(), processors: make([]*processRunner, 0), sinks: make([]*sinkRunner, 0), metrics: make(map[string]measurable), params: make(map[string][]phono.ParamFunc), feedback: make(map[string][]phono.ParamFunc), events: make(chan eventMessage, 1), //    cancel: make(chan struct{}), //     provide: make(chan struct{}), consume: make(chan message), } for _, option := range options { //   option(p)() } go p.loop() //    return p } 

Neben der Initialisierung und den Funktionsoptionen beginnt eine separate Goroutine mit dem Hauptzyklus. Schau ihn dir an:


 // loop ,     nil . func (p *Pipe) loop() { var s state = ready //   t := target{} for s != nil { s, t = s.listen(p, t) //      p.log.Debug(fmt.Sprintf("%v is %T", p, s)) } t.dismiss() close(p.events) //    } // listen     ready. func (s idleReady) listen(p *Pipe, t target) (state, target) { return p.idle(s, t) } // transition       . func (s idleReady) transition(p *Pipe, e eventMessage) (state, error) { switch e.event { case cancel: interrupt(p.cancel) return nil, nil case push: e.params.applyTo(p.ID()) p.params = p.params.merge(e.params) return s, nil case measure: for _, id := range e.components { e.params.applyTo(id) } return s, nil case run: if err := p.start(); err != nil { return s, err } return running, nil } return s, ErrInvalidState } 

Der Förderer wird im Vorgriff auf Ereignisse erstellt und eingefroren.


Zeit zu arbeiten


Rufen Sie p.Run() !



 // Run   run  . //     pipe.Close  . func (p *Pipe) Run() chan error { runEvent := eventMessage{ event: run, target: target{ state: ready, //    errc: make(chan error, 1), }, } p.events <- runEvent return runEvent.target.errc } // listen     running. func (s activeRunning) listen(p *Pipe, t target) (state, target) { return p.active(s, t) } // transition       . func (s activeRunning) transition(p *Pipe, e eventMessage) (state, error) { switch e.event { case cancel: interrupt(p.cancel) err := Wait(p.errc) return nil, err case measure: e.params.applyTo(p.ID()) p.feedback = p.feedback.merge(e.params) return s, nil case push: e.params.applyTo(p.ID()) p.params = p.params.merge(e.params) return s, nil case pause: return pausing, nil } return s, ErrInvalidState } // sendMessage   . func (s activeRunning) sendMessage(p *Pipe) state { p.consume <- p.newMessage() return s } 

running generiert Nachrichten und wird ausgeführt, bis die Pipeline abgeschlossen ist.


Pause


Während der Ausführung des Förderers können wir ihn anhalten. In diesem Zustand generiert die Pipeline keine neuen Nachrichten. Rufen Sie dazu die Methode p.Pause() .



 // Pause   pause  . //     pipe.Close  . func (p *Pipe) Pause() chan error { pauseEvent := eventMessage{ event: pause, target: target{ state: paused, //    errc: make(chan error, 1), }, } p.events <- pauseEvent return pauseEvent.target.errc } // listen     pausing. func (s activePausing) listen(p *Pipe, t target) (state, target) { return p.active(s, t) } // transition       . func (s activePausing) transition(p *Pipe, e eventMessage) (state, error) { switch e.event { case cancel: interrupt(p.cancel) err := Wait(p.errc) return nil, err case measure: e.params.applyTo(p.ID()) p.feedback = p.feedback.merge(e.params) return s, nil case push: e.params.applyTo(p.ID()) p.params = p.params.merge(e.params) return s, nil } return s, ErrInvalidState } // sendMessage   .   -, //      .    //    ,      .  , // ,   , : // 1.     // 2.      func (s activePausing) sendMessage(p *Pipe) state { m := p.newMessage() if len(m.feedback) == 0 { m.feedback = make(map[string][]phono.ParamFunc) } var wg sync.WaitGroup //     wg.Add(len(p.sinks)) //   Sink for _, sink := range p.sinks { param := phono.ReceivedBy(&wg, sink.ID()) // - m.feedback = m.feedback.add(param) } p.consume <- m //   wg.Wait() // ,     return paused } 

Sobald alle Empfänger die Nachricht erhalten, wechselt die Pipeline paused Zustand. Wenn die Nachricht die letzte ist, erfolgt der Übergang in den ready .


Zurück zur Arbeit!


Rufen Sie p.Resume() , um den paused Status zu p.Resume() .



 // Resume   resume  . //     pipe.Close  . func (p *Pipe) Resume() chan error { resumeEvent := eventMessage{ event: resume, target: target{ state: ready, errc: make(chan error, 1), }, } p.events <- resumeEvent return resumeEvent.target.errc } // listen     paused. func (s idlePaused) listen(p *Pipe, t target) (state, target) { return p.idle(s, t) } // transition       . func (s idlePaused) transition(p *Pipe, e eventMessage) (state, error) { switch e.event { case cancel: interrupt(p.cancel) err := Wait(p.errc) return nil, err case push: e.params.applyTo(p.ID()) p.params = p.params.merge(e.params) return s, nil case measure: for _, id := range e.components { e.params.applyTo(id) } return s, nil case resume: return running, nil } return s, ErrInvalidState } 

Hier ist alles trivial, die Pipeline geht wieder in running .


Machen Sie es sich bequem


Der Förderer kann aus jedem Zustand angehalten werden. Dafür gibt es p.Close() .



 // Close   cancel  . //      . //    ,   . func (p *Pipe) Close() chan error { resumeEvent := eventMessage{ event: cancel, target: target{ state: nil, //   errc: make(chan error, 1), }, } p.events <- resumeEvent return resumeEvent.target.errc } 

Wer braucht das?


Nicht für jedermann. Um genau zu verstehen, wie der Status verwaltet wird, müssen Sie Ihre Aufgabe verstehen. Es gibt genau zwei Umstände, unter denen Sie eine ereignisbasierte asynchrone Maschine verwenden können:


  1. Komplexer Lebenszyklus - Es gibt drei oder mehr Zustände mit nichtlinearen Übergängen.
  2. Es wird eine asynchrone Ausführung verwendet.

Obwohl die Ereignismaschine das Problem löst, ist es ein ziemlich kompliziertes Muster. Daher sollte es mit großer Sorgfalt und nur nach einem vollständigen Verständnis aller Vor- und Nachteile verwendet werden.


Referenzen


Source: https://habr.com/ru/post/de431048/


All Articles