La máquina de eventos protege el ciclo de vida

Descargo de responsabilidad: este artículo describe una solución no obvia a un problema no obvio. Antes de apresurarse huevos Para ponerlo en práctica, recomiendo leer el artículo hasta el final y pensarlo dos veces.

pero_por que


Hola a todos! Cuando trabajamos con código, a menudo tenemos que lidiar con el estado . Uno de esos casos es el ciclo de vida de los objetos. Administrar un objeto con varios estados posibles puede ser una tarea muy trivial. Agregue la ejecución asincrónica aquí y la tarea se complica por un orden de magnitud. Existe una solución efectiva y natural. En este artículo hablaré sobre la máquina de eventos y cómo implementarla en Go.


¿Por qué administrar el estado?


Para comenzar, definamos el concepto mismo. El ejemplo más simple de un estado: archivos y varias conexiones. No puedes simplemente tomar y leer un archivo. Primero debe abrirse, y al final preferiblemente asegúrese de cerrar Resulta que la acción actual depende del resultado de la acción anterior: la lectura depende de la apertura. El resultado guardado es el estado.


El principal problema con el estado es la complejidad. Cualquier estado complica automáticamente el código. Debe almacenar los resultados de las acciones en la memoria y agregar varias comprobaciones a la lógica. Es por eso que las arquitecturas sin estado son tan atractivas para los programadores: nadie quiere problemas dificultades Si los resultados de sus acciones no afectan la lógica de ejecución, no necesita un estado.


Sin embargo, hay una propiedad que te hace tener en cuenta las dificultades. Un estado requiere que sigas un orden específico de acciones. En general, tales situaciones deben evitarse, pero esto no siempre es posible. Un ejemplo es el ciclo de vida de los objetos del programa. Gracias a la buena gestión del estado, se puede obtener un comportamiento predecible de los objetos con un ciclo de vida complejo.


Ahora veamos cómo hacerlo bien .


Automático como una forma de resolver problemas


AK74


Cuando la gente habla de estados, las máquinas de estados finitos vienen inmediatamente a la mente. Es lógico, porque un autómata es la forma más natural de administrar un estado.


No profundizaré en la teoría de los autómatas ; hay información más que suficiente en Internet.

Si busca ejemplos de máquinas de estado finito para Go, definitivamente se encontrará con un lexer de Rob Pike . Un gran ejemplo de un autómata en el que los datos procesados ​​son el alfabeto de entrada. Esto significa que las transiciones de estado son causadas por el texto que procesa el lexer. Solución elegante a un problema específico.


Lo principal a entender es que un autómata es una solución a un problema estrictamente específico. Por lo tanto, antes de considerarlo como un remedio para todos los problemas, debe comprender completamente la tarea. Específicamente, la entidad que desea controlar:


  • estados - ciclo de vida;
  • eventos: qué causa exactamente la transición a cada estado;
  • resultado del trabajo - datos de salida;
  • modo de ejecución (síncrono / asíncrono);
  • Principales casos de uso.

El lexer es hermoso, pero solo cambia de estado debido a los datos que procesa. Pero, ¿qué pasa con la situación cuando el usuario invoca transiciones? Aquí es donde la máquina de eventos puede ayudar.


Ejemplo real


Para aclararlo, analizaré un ejemplo de la biblioteca de phono .


Para una inmersión completa en el contexto, puede leer el artículo introductorio . Esto no es necesario para este tema, pero ayudará a comprender mejor lo que estamos gestionando.

¿Y qué estamos gestionando?


phono se basa en la tubería DSP. Consiste en tres etapas de procesamiento. Cada etapa puede incluir de uno a varios componentes:


pipe_diagram


  1. pipe.Pump (bomba inglesa) es una etapa obligatoria de recepción de sonido, siempre solo un componente.
  2. pipe.Processor (controlador en inglés): una etapa opcional de procesamiento de sonido, de 0 a N componentes.
  3. pipe.Sink (sumidero inglés): una etapa obligatoria de transmisión de sonido, de 1 a N componentes.

En realidad, gestionaremos el ciclo de vida del transportador.


Ciclo de vida


Así es como se ve el diagrama del estado de la pipe.Pipe .


pipe_lifecycle


La cursiva indica transiciones causadas por la lógica de ejecución interna. Negrita : transiciones causadas por eventos. El diagrama muestra que los estados se dividen en 2 tipos:


  • estados inactivos : ready y en paused , solo puede saltar de ellos por evento
  • estados activos : running y pausing , transiciones por evento y debido a la lógica de ejecución

Antes de un análisis detallado del código, un claro ejemplo del uso de todos los estados:


 // PlayWav  .wav    portaudio  -. func PlayWav(wavFile string) error { bufferSize := phono.BufferSize(512) //      w, err := wav.NewPump(wavFile, bufferSize) //  wav pump if err != nil { return err } pa := portaudio.NewSink( //  portaudio sink bufferSize, w.WavSampleRate(), w.WavNumChannels(), ) p := pipe.New( //  pipe.Pipe    ready w.WavSampleRate(), pipe.WithPump(w), pipe.WithSinks(pa), ) p.Run() //    running   p.Run() errc := p.Pause() //    pausing   p.Pause() err = pipe.Wait(errc) //     paused if err != nil { return err } errc = p.Resume() //    running   p.Resume() err = pipe.Wait(errc) //     ready if err != nil { return err } return pipe.Wait(p.Close()) //      } 

Ahora, lo primero es lo primero.


Todo el código fuente está disponible en el repositorio .

Estados y eventos


Comencemos con lo más importante.


 // state      . type state interface { listen(*Pipe, target) (state, target) //    transition(*Pipe, eventMessage) (state, error) //   } // idleState  .        . type idleState interface { state } // activeState  .         //   . type activeState interface { state sendMessage(*Pipe) state //    } //  . type ( idleReady struct{} activeRunning struct{} activePausing struct{} idlePaused struct{} ) //  . var ( ready idleReady running activeRunning paused idlePaused pausing activePausing ) 

Gracias a tipos separados, las transiciones también se declaran por separado para cada estado. Esto evita lo enorme salchichas funciones de transición con switch anidadas. Los estados en sí no contienen ningún dato o lógica. Para ellos, puede declarar variables a nivel de paquete para que no lo haga todo el tiempo. La interfaz de state es necesaria para el polimorfismo. activeState de activeState e idleState poco más tarde.


La segunda parte más importante de nuestra máquina son los eventos.


 // event  . type event int //  . const ( run event = iota pause resume push measure cancel ) // target      . type target struct { state idleState //   errc chan error //   ,     } // eventMessage   ,    . type eventMessage struct { event //   params params //   components []string // id  target //      } 

Para comprender por qué se necesita el tipo de target , considere un ejemplo simple. Hemos creado un nuevo transportador, está ready . Ahora ejecútelo con p.Run() . El evento de run se envía a la máquina, la tubería pasa al estado de running . ¿Cómo saber cuándo se termina el transportador? Aquí es donde el tipo de target nos ayudará. Indica qué estado de descanso esperar después del evento. En nuestro ejemplo, una vez completado el trabajo, la tubería volverá a entrar en el estado ready . Lo mismo en el diagrama:



Ahora más sobre los tipos de estados. Más precisamente, sobre las activeState idleState y activeState . Veamos las funciones de listen(*Pipe, target) (state, target) para diferentes tipos de etapas:


 // listen     ready. func (s idleReady) listen(p *Pipe, t target) (state, target) { return p.idle(s, t) } // listen     running. func (s activeRunning) listen(p *Pipe, t target) (state, target) { return p.active(s, t) } 

pipe.Pipe tiene diferentes funciones para esperar una transición. Que hay


 // idle     .    . func (p *Pipe) idle(s idleState, t target) (state, target) { if s == t.state || s == ready { t = t.dismiss() //  ,  target } for { var newState state var err error select { case e := <-p.events: //   newState, err = s.transition(p, e) //    if err != nil { e.target.handle(err) } else if e.hasTarget() { t.dismiss() t = e.target } } if s != newState { return newState, t // ,    } } } // active     .     , //   . func (p *Pipe) active(s activeState, t target) (state, target) { for { var newState state var err error select { case e := <-p.events: //   newState, err = s.transition(p, e) //    if err != nil { //  ? e.target.handle(err) // ,    } else if e.hasTarget() { // ,  target t.dismiss() //   t = e.target //   } case <-p.provide: //     newState = s.sendMessage(p) //    case err, ok := <-p.errc: //   if ok { //   ,  interrupt(p.cancel) //   t.handle(err) //    } //    ,  return ready, t //    ready } if s != newState { return newState, t // ,    } } } 

Por lo tanto, podemos escuchar diferentes canales en diferentes estados. Por ejemplo, esto le permite no enviar mensajes durante una pausa, simplemente no escuchamos el canal correspondiente.


Constructor y arranque de la máquina.



 // New      . //      ready. func New(sampleRate phono.SampleRate, options ...Option) *Pipe { p := &Pipe{ UID: phono.NewUID(), sampleRate: sampleRate, log: log.GetLogger(), processors: make([]*processRunner, 0), sinks: make([]*sinkRunner, 0), metrics: make(map[string]measurable), params: make(map[string][]phono.ParamFunc), feedback: make(map[string][]phono.ParamFunc), events: make(chan eventMessage, 1), //    cancel: make(chan struct{}), //     provide: make(chan struct{}), consume: make(chan message), } for _, option := range options { //   option(p)() } go p.loop() //    return p } 

Además de la inicialización y las opciones funcionales , existe el inicio de una rutina de rutina separada con el ciclo principal. Bueno, míralo:


 // loop ,     nil . func (p *Pipe) loop() { var s state = ready //   t := target{} for s != nil { s, t = s.listen(p, t) //      p.log.Debug(fmt.Sprintf("%v is %T", p, s)) } t.dismiss() close(p.events) //    } // listen     ready. func (s idleReady) listen(p *Pipe, t target) (state, target) { return p.idle(s, t) } // transition       . func (s idleReady) transition(p *Pipe, e eventMessage) (state, error) { switch e.event { case cancel: interrupt(p.cancel) return nil, nil case push: e.params.applyTo(p.ID()) p.params = p.params.merge(e.params) return s, nil case measure: for _, id := range e.components { e.params.applyTo(id) } return s, nil case run: if err := p.start(); err != nil { return s, err } return running, nil } return s, ErrInvalidState } 

El transportador se crea y se congela en previsión de eventos.


Hora de trabajar


Llame a p.Run() !



 // Run   run  . //     pipe.Close  . func (p *Pipe) Run() chan error { runEvent := eventMessage{ event: run, target: target{ state: ready, //    errc: make(chan error, 1), }, } p.events <- runEvent return runEvent.target.errc } // listen     running. func (s activeRunning) listen(p *Pipe, t target) (state, target) { return p.active(s, t) } // transition       . func (s activeRunning) transition(p *Pipe, e eventMessage) (state, error) { switch e.event { case cancel: interrupt(p.cancel) err := Wait(p.errc) return nil, err case measure: e.params.applyTo(p.ID()) p.feedback = p.feedback.merge(e.params) return s, nil case push: e.params.applyTo(p.ID()) p.params = p.params.merge(e.params) return s, nil case pause: return pausing, nil } return s, ErrInvalidState } // sendMessage   . func (s activeRunning) sendMessage(p *Pipe) state { p.consume <- p.newMessage() return s } 

running genera mensajes y se ejecuta hasta que se complete la canalización.


Pausa


Durante la ejecución del transportador, podemos pausarlo. En este estado, la canalización no generará nuevos mensajes. Para hacer esto, llame al método p.Pause() .



 // Pause   pause  . //     pipe.Close  . func (p *Pipe) Pause() chan error { pauseEvent := eventMessage{ event: pause, target: target{ state: paused, //    errc: make(chan error, 1), }, } p.events <- pauseEvent return pauseEvent.target.errc } // listen     pausing. func (s activePausing) listen(p *Pipe, t target) (state, target) { return p.active(s, t) } // transition       . func (s activePausing) transition(p *Pipe, e eventMessage) (state, error) { switch e.event { case cancel: interrupt(p.cancel) err := Wait(p.errc) return nil, err case measure: e.params.applyTo(p.ID()) p.feedback = p.feedback.merge(e.params) return s, nil case push: e.params.applyTo(p.ID()) p.params = p.params.merge(e.params) return s, nil } return s, ErrInvalidState } // sendMessage   .   -, //      .    //    ,      .  , // ,   , : // 1.     // 2.      func (s activePausing) sendMessage(p *Pipe) state { m := p.newMessage() if len(m.feedback) == 0 { m.feedback = make(map[string][]phono.ParamFunc) } var wg sync.WaitGroup //     wg.Add(len(p.sinks)) //   Sink for _, sink := range p.sinks { param := phono.ReceivedBy(&wg, sink.ID()) // - m.feedback = m.feedback.add(param) } p.consume <- m //   wg.Wait() // ,     return paused } 

Tan pronto como todos los destinatarios reciban el mensaje, la canalización entrará paused estado de paused . Si el mensaje es el último, se producirá la transición al estado ready .


De vuelta al trabajo!


Para salir del estado en paused , llame a p.Resume() .



 // Resume   resume  . //     pipe.Close  . func (p *Pipe) Resume() chan error { resumeEvent := eventMessage{ event: resume, target: target{ state: ready, errc: make(chan error, 1), }, } p.events <- resumeEvent return resumeEvent.target.errc } // listen     paused. func (s idlePaused) listen(p *Pipe, t target) (state, target) { return p.idle(s, t) } // transition       . func (s idlePaused) transition(p *Pipe, e eventMessage) (state, error) { switch e.event { case cancel: interrupt(p.cancel) err := Wait(p.errc) return nil, err case push: e.params.applyTo(p.ID()) p.params = p.params.merge(e.params) return s, nil case measure: for _, id := range e.components { e.params.applyTo(id) } return s, nil case resume: return running, nil } return s, ErrInvalidState } 

Aquí todo es trivial, la tubería vuelve a running .


Acurrucarse


El transportador se puede detener desde cualquier estado. Hay p.Close() .



 // Close   cancel  . //      . //    ,   . func (p *Pipe) Close() chan error { resumeEvent := eventMessage{ event: cancel, target: target{ state: nil, //   errc: make(chan error, 1), }, } p.events <- resumeEvent return resumeEvent.target.errc } 

¿Quién necesita esto?


No es para todos. Para comprender exactamente cómo administrar el estado, debe comprender su tarea. Hay exactamente dos circunstancias en las que puede usar una máquina asincrónica basada en eventos:


  1. Ciclo de vida complejo: hay tres o más estados con transiciones no lineales.
  2. Se utiliza la ejecución asincrónica.

Aunque la máquina de eventos resuelve el problema, es un patrón bastante complicado. Por lo tanto, debe usarse con mucho cuidado y solo después de una comprensión completa de todos los pros y los contras.


Referencias


Source: https://habr.com/ru/post/es431048/


All Articles