La machine d'événement garde le cycle de vie

Avertissement: cet article décrit une solution non évidente à un problème non évident. Avant de se précipiter oeufs mettre en pratique, je recommande de lire l'article à la fin et d'y réfléchir à deux fois.

mais_pourquoi


Bonjour à tous! Lorsque vous travaillez avec du code, nous devons souvent faire face à l' état . Un tel cas est le cycle de vie des objets. La gestion d'un objet avec plusieurs états possibles peut être une tâche très simple. Ajoutez ici une exécution asynchrone et la tâche est compliquée par un ordre de grandeur. Il existe une solution efficace et naturelle. Dans cet article, je vais parler de la machine d'événements et de la façon de l'implémenter dans Go.


Pourquoi gérer l'État?


Pour commencer, définissons le concept lui-même. L'exemple le plus simple d'un état: fichiers et diverses connexions. Vous ne pouvez pas simplement prendre et lire un fichier. Il doit d'abord être ouvert, et à la fin de préférence assurez-vous de fermer. Il s'avère que l'action en cours dépend du résultat de l'action précédente: la lecture dépend de l'ouverture. Le résultat enregistré est l'état.


Le principal problème avec l'État est la complexité. Tout état complique automatiquement le code. Vous devez stocker les résultats des actions en mémoire et ajouter diverses vérifications à la logique. C'est pourquoi les architectures sans état sont si attrayantes pour les programmeurs - personne ne veut des ennuis difficultés. Si les résultats de vos actions n'affectent pas la logique d'exécution, vous n'avez pas besoin d'un état.


Cependant, il existe une propriété qui vous fait tenir compte des difficultés. Un état vous oblige à suivre un ordre spécifique d'actions. En général, de telles situations doivent être évitées, mais ce n'est pas toujours possible. Un exemple est le cycle de vie des objets de programme. Grâce à une bonne gestion des états, on peut obtenir un comportement prévisible des objets avec un cycle de vie complexe.


Voyons maintenant comment le faire cool .


Automatique pour résoudre les problèmes


AK74


Lorsque les gens parlent d'états, les machines à états finis viennent immédiatement à l'esprit. C’est logique, car un automate est le moyen le plus naturel de gérer un état.


Je ne m'attarderai pas sur la théorie des automates , il y a plus qu'assez d'informations sur Internet.

Si vous cherchez des exemples de machines à états finis pour Go, vous rencontrerez certainement un lexer de Rob Pike . Un excellent exemple d'automate dans lequel les données traitées sont l'alphabet d'entrée. Cela signifie que les transitions d'état sont provoquées par le texte que le lexeur traite. Solution élégante à un problème spécifique.


La principale chose à comprendre est qu'un automate est une solution à un problème strictement spécifique. Par conséquent, avant de le considérer comme un remède à tous les problèmes, vous devez bien comprendre la tâche. Plus précisément, l'entité que vous souhaitez contrôler:


  • états - cycle de vie;
  • événements - ce qui provoque exactement la transition vers chaque état;
  • résultat du travail - données de sortie;
  • mode d'exécution (synchrone / asynchrone);
  • principaux cas d'utilisation.

Le lexer est beau, mais il ne change d'état qu'en raison des données qu'il traite lui-même. Mais qu'en est-il de la situation où l'utilisateur invoque des transitions? C'est là que la machine événementielle peut vous aider.


Exemple réel


Pour plus de clarté, j'analyserai un exemple de la bibliothèque phono .


Pour une immersion complète dans le contexte, vous pouvez lire l' article d'introduction . Ce n'est pas nécessaire pour ce sujet, mais cela aidera à mieux comprendre ce que nous gérons.

Et que gérons-nous?


phono est basé sur le pipeline DSP. Il se compose de trois étapes de traitement. Chaque étape peut comprendre un à plusieurs composants:


pipe_diagram


  1. pipe.Pump (pompe anglaise) est une étape obligatoire de réception du son, toujours un seul composant.
  2. pipe.Processor (gestionnaire anglais) - une étape facultative de traitement du son, de 0 à N composants.
  3. pipe.Sink (English sink) - une étape obligatoire de transmission du son, de 1 à N composants.

En fait, nous gérerons le cycle de vie du convoyeur.


Cycle de vie


Voici à quoi ressemble le diagramme d'état du pipe.Pipe .


pipe_lifecycle


Les italiques indiquent les transitions causées par la logique d'exécution interne. Gras - transitions causées par les événements. Le diagramme montre que les états sont divisés en 2 types:


  • états de repos - ready et en paused , vous ne pouvez les sauter que par événement
  • états actifs - en running et en pausing , transitions par événement et en raison de la logique d'exécution

Avant une analyse détaillée du code, un exemple clair de l'utilisation de tous les états:


 // PlayWav  .wav    portaudio  -. func PlayWav(wavFile string) error { bufferSize := phono.BufferSize(512) //      w, err := wav.NewPump(wavFile, bufferSize) //  wav pump if err != nil { return err } pa := portaudio.NewSink( //  portaudio sink bufferSize, w.WavSampleRate(), w.WavNumChannels(), ) p := pipe.New( //  pipe.Pipe    ready w.WavSampleRate(), pipe.WithPump(w), pipe.WithSinks(pa), ) p.Run() //    running   p.Run() errc := p.Pause() //    pausing   p.Pause() err = pipe.Wait(errc) //     paused if err != nil { return err } errc = p.Resume() //    running   p.Resume() err = pipe.Wait(errc) //     ready if err != nil { return err } return pipe.Wait(p.Close()) //      } 

Maintenant, tout d'abord.


Tout le code source est disponible dans le référentiel .

États et événements


Commençons par la chose la plus importante.


 // state      . type state interface { listen(*Pipe, target) (state, target) //    transition(*Pipe, eventMessage) (state, error) //   } // idleState  .        . type idleState interface { state } // activeState  .         //   . type activeState interface { state sendMessage(*Pipe) state //    } //  . type ( idleReady struct{} activeRunning struct{} activePausing struct{} idlePaused struct{} ) //  . var ( ready idleReady running activeRunning paused idlePaused pausing activePausing ) 

Grâce à des types distincts, les transitions sont également déclarées séparément pour chaque état. Cela évite l'énorme saucisses fonctions de transition avec des switch imbriquées. Les états eux-mêmes ne contiennent aucune donnée ni logique. Pour eux, vous pouvez déclarer des variables au niveau du package afin de ne pas le faire à chaque fois. L'interface d' state est nécessaire pour le polymorphisme. activeState parlerons d' activeState et idleState peu plus tard.


Les événements sont la deuxième partie la plus importante de notre machine.


 // event  . type event int //  . const ( run event = iota pause resume push measure cancel ) // target      . type target struct { state idleState //   errc chan error //   ,     } // eventMessage   ,    . type eventMessage struct { event //   params params //   components []string // id  target //      } 

Pour comprendre pourquoi le type target est nécessaire, considérons un exemple simple. Nous avons créé un nouveau convoyeur, il est ready . Maintenant, exécutez-le avec p.Run() . L'événement d' run est envoyé à la machine, le pipeline passe à l'état d' running . Comment savoir quand le convoyeur est terminé? C'est là que le type target nous aidera. Il indique à quel état de repos s'attendre après l'événement. Dans notre exemple, une fois le travail terminé, le pipeline passera à nouveau à l'état ready . La même chose dans le diagramme:



Maintenant, plus sur les types d'états. Plus précisément, sur les activeState idleState et activeState . Regardons les fonctions d' listen(*Pipe, target) (state, target) pour différents types d'étapes:


 // listen     ready. func (s idleReady) listen(p *Pipe, t target) (state, target) { return p.idle(s, t) } // listen     running. func (s activeRunning) listen(p *Pipe, t target) (state, target) { return p.active(s, t) } 

pipe.Pipe a différentes fonctions pour attendre une transition! Qu'y a-t-il?


 // idle     .    . func (p *Pipe) idle(s idleState, t target) (state, target) { if s == t.state || s == ready { t = t.dismiss() //  ,  target } for { var newState state var err error select { case e := <-p.events: //   newState, err = s.transition(p, e) //    if err != nil { e.target.handle(err) } else if e.hasTarget() { t.dismiss() t = e.target } } if s != newState { return newState, t // ,    } } } // active     .     , //   . func (p *Pipe) active(s activeState, t target) (state, target) { for { var newState state var err error select { case e := <-p.events: //   newState, err = s.transition(p, e) //    if err != nil { //  ? e.target.handle(err) // ,    } else if e.hasTarget() { // ,  target t.dismiss() //   t = e.target //   } case <-p.provide: //     newState = s.sendMessage(p) //    case err, ok := <-p.errc: //   if ok { //   ,  interrupt(p.cancel) //   t.handle(err) //    } //    ,  return ready, t //    ready } if s != newState { return newState, t // ,    } } } 

Ainsi, nous pouvons écouter différentes chaînes dans différents états. Par exemple, cela vous permet de ne pas envoyer de messages pendant une pause - nous n'écoutons tout simplement pas le canal correspondant.


Constructeur et démarrage de la machine



 // New      . //      ready. func New(sampleRate phono.SampleRate, options ...Option) *Pipe { p := &Pipe{ UID: phono.NewUID(), sampleRate: sampleRate, log: log.GetLogger(), processors: make([]*processRunner, 0), sinks: make([]*sinkRunner, 0), metrics: make(map[string]measurable), params: make(map[string][]phono.ParamFunc), feedback: make(map[string][]phono.ParamFunc), events: make(chan eventMessage, 1), //    cancel: make(chan struct{}), //     provide: make(chan struct{}), consume: make(chan message), } for _, option := range options { //   option(p)() } go p.loop() //    return p } 

En plus de l'initialisation et des options fonctionnelles , il y a le début d'une goroutine séparée avec le cycle principal. Eh bien, regardez-le:


 // loop ,     nil . func (p *Pipe) loop() { var s state = ready //   t := target{} for s != nil { s, t = s.listen(p, t) //      p.log.Debug(fmt.Sprintf("%v is %T", p, s)) } t.dismiss() close(p.events) //    } // listen     ready. func (s idleReady) listen(p *Pipe, t target) (state, target) { return p.idle(s, t) } // transition       . func (s idleReady) transition(p *Pipe, e eventMessage) (state, error) { switch e.event { case cancel: interrupt(p.cancel) return nil, nil case push: e.params.applyTo(p.ID()) p.params = p.params.merge(e.params) return s, nil case measure: for _, id := range e.components { e.params.applyTo(id) } return s, nil case run: if err := p.start(); err != nil { return s, err } return running, nil } return s, ErrInvalidState } 

Le convoyeur est créé et figé en prévision des événements.


Le temps de travailler


Appelez p.Run() !



 // Run   run  . //     pipe.Close  . func (p *Pipe) Run() chan error { runEvent := eventMessage{ event: run, target: target{ state: ready, //    errc: make(chan error, 1), }, } p.events <- runEvent return runEvent.target.errc } // listen     running. func (s activeRunning) listen(p *Pipe, t target) (state, target) { return p.active(s, t) } // transition       . func (s activeRunning) transition(p *Pipe, e eventMessage) (state, error) { switch e.event { case cancel: interrupt(p.cancel) err := Wait(p.errc) return nil, err case measure: e.params.applyTo(p.ID()) p.feedback = p.feedback.merge(e.params) return s, nil case push: e.params.applyTo(p.ID()) p.params = p.params.merge(e.params) return s, nil case pause: return pausing, nil } return s, ErrInvalidState } // sendMessage   . func (s activeRunning) sendMessage(p *Pipe) state { p.consume <- p.newMessage() return s } 

running génère des messages et s'exécute jusqu'à la fin du pipeline.


Pause


Pendant l'exécution du convoyeur, nous pouvons le mettre en pause. Dans cet état, le pipeline ne générera pas de nouveaux messages. Pour ce faire, appelez la méthode p.Pause() .



 // Pause   pause  . //     pipe.Close  . func (p *Pipe) Pause() chan error { pauseEvent := eventMessage{ event: pause, target: target{ state: paused, //    errc: make(chan error, 1), }, } p.events <- pauseEvent return pauseEvent.target.errc } // listen     pausing. func (s activePausing) listen(p *Pipe, t target) (state, target) { return p.active(s, t) } // transition       . func (s activePausing) transition(p *Pipe, e eventMessage) (state, error) { switch e.event { case cancel: interrupt(p.cancel) err := Wait(p.errc) return nil, err case measure: e.params.applyTo(p.ID()) p.feedback = p.feedback.merge(e.params) return s, nil case push: e.params.applyTo(p.ID()) p.params = p.params.merge(e.params) return s, nil } return s, ErrInvalidState } // sendMessage   .   -, //      .    //    ,      .  , // ,   , : // 1.     // 2.      func (s activePausing) sendMessage(p *Pipe) state { m := p.newMessage() if len(m.feedback) == 0 { m.feedback = make(map[string][]phono.ParamFunc) } var wg sync.WaitGroup //     wg.Add(len(p.sinks)) //   Sink for _, sink := range p.sinks { param := phono.ReceivedBy(&wg, sink.ID()) // - m.feedback = m.feedback.add(param) } p.consume <- m //   wg.Wait() // ,     return paused } 

Dès que tous les destinataires reçoivent le message, le pipeline passe à paused état paused . Si le message est le dernier, la transition vers l'état ready se produit.


De retour au travail!


Pour quitter l'état paused , appelez p.Resume() .



 // Resume   resume  . //     pipe.Close  . func (p *Pipe) Resume() chan error { resumeEvent := eventMessage{ event: resume, target: target{ state: ready, errc: make(chan error, 1), }, } p.events <- resumeEvent return resumeEvent.target.errc } // listen     paused. func (s idlePaused) listen(p *Pipe, t target) (state, target) { return p.idle(s, t) } // transition       . func (s idlePaused) transition(p *Pipe, e eventMessage) (state, error) { switch e.event { case cancel: interrupt(p.cancel) err := Wait(p.errc) return nil, err case push: e.params.applyTo(p.ID()) p.params = p.params.merge(e.params) return s, nil case measure: for _, id := range e.components { e.params.applyTo(id) } return s, nil case resume: return running, nil } return s, ErrInvalidState } 

Tout est trivial ici, le pipeline passe à nouveau en état de running .


Détendez-vous


Le convoyeur peut être arrêté depuis n'importe quel état. Il y a p.Close() .



 // Close   cancel  . //      . //    ,   . func (p *Pipe) Close() chan error { resumeEvent := eventMessage{ event: cancel, target: target{ state: nil, //   errc: make(chan error, 1), }, } p.events <- resumeEvent return resumeEvent.target.errc } 

Qui en a besoin?


Pas pour tout le monde. Pour comprendre exactement comment gérer l'état, vous devez comprendre votre tâche. Il existe exactement deux circonstances dans lesquelles vous pouvez utiliser une machine asynchrone basée sur des événements:


  1. Cycle de vie complexe - il y a trois états ou plus avec des transitions non linéaires.
  2. L'exécution asynchrone est utilisée.

Bien que la machine événementielle résout le problème, c'est un modèle assez compliqué. Par conséquent, il doit être utilisé avec grand soin et uniquement après une compréhension complète de tous les avantages et inconvénients.


Les références


Source: https://habr.com/ru/post/fr431048/


All Articles