事件机器守护生命周期

免责声明:本文介绍了非显而易见的问题的非显而易见的解决方案。 赶之前 鸡蛋 付诸实践,我建议读完文章并三思而后行。

但是


大家好! 在处理代码时,我们常常不得不处理state 。 一种这样的情况是对象的生命周期。 管理具有几种可能状态的对象可能是一项非常艰巨的任务。 在此处添加异步执行,任务将复杂一个数量级。 有一个有效而自然的解决方案。 在本文中,我将讨论事件机器以及如何在Go中实现它。


为什么要管理国家?


首先,让我们定义概念本身。 状态的最简单示例:文件和各种连接。 您不仅可以读取文件。 它必须首先打开,最后 最好 确保关闭。 事实证明,当前动作取决于上一个动作的结果:读取取决于开头。 保存的结果是状态。


状态的主要问题是复杂性。 任何状态都会自动使代码复杂化。 您必须将操作结果存储在内存中,并向逻辑添加各种检查。 这就是为什么无状态架构对程序员如此吸引人的原因-没有人想要 麻烦 困难。 如果操作结果不影响执行逻辑,则不需要状态。


但是,有一个属性使您无法解决困难。 状态要求您遵循特定的操作顺序。 通常,应避免这种情况,但这并不总是可能的。 一个示例是程序对象的生命周期。 由于具有良好的状态管理,因此可以获得具有复杂生命周期的对象的可预测行为。


现在让我们弄清楚如何做到这一点。


自动解决问题


AK74


当人们谈论状态时,立即想到有限状态机。 这是合乎逻辑的,因为自动机是管理状态的最自然的方法。


我不会深入研究自动机理论 ; Internet上有足够的信息。

如果您寻找Go的有限状态机示例,那么您肯定会遇到Rob Pike词法分析器 。 自动机的一个很好的例子,其中处理的数据是输入字母。 这意味着状态转换是由词法分析器处理的文本引起的。 优雅地解决特定问题。


要了解的主要内容是,自动机是对严格特定问题的解决方案。 因此,在考虑将其作为所有问题的补救措施之前,您必须完全理解该任务。 具体来说,您要控制的实体:


  • 状态-生命周期;
  • 事件-到底是什么导致了向每个状态的转变?
  • 工作结果-输出数据;
  • 执行模式(同步/异步);
  • 主要用例。

词法分析器很漂亮,但是它仅由于其自身处理的数据而改变状态。 但是,当用户调用转换时,情况又如何呢? 这是事件机器可以提供帮助的地方。


真实的例子


为了更加清楚,我将分析phono库中的示例。


要完全沉浸在上下文中,您可以阅读介绍性文章 。 对于本主题,这不是必需的,但它将有助于更好地了解我们正在管理的内容。

我们要管理什么?


phono基于DSP管线。 它包括三个处理阶段。 每个阶段可能包括一个到几个组件:


pipe_diagram


  1. pipe.Pump (英文泵)是接收声音的必不可少的阶段,总是只有一个组成部分。
  2. pipe.Processor (英语处理程序)-声音处理的可选阶段,从0到N个组件。
  3. pipe.Sink (英文水槽)-声音传输的强制阶段,从1到N个分量。

实际上,我们将管理输送机的生命周期。


生命周期


这就是pipe.Pipe状态图的样子。


pipe_lifecycle


斜体表示由内部执行逻辑引起的转换。 粗体 -事件引起的过渡。 该图显示了状态分为两种类型:


  • 静态 - ready和已paused ,您只能按事件从它们中跳转
  • 活动状态 - runningpausing ,事件转换以及执行逻辑

在对代码进行详细分析之前,先简单说明一下所有状态的用法:


 // PlayWav  .wav    portaudio  -. func PlayWav(wavFile string) error { bufferSize := phono.BufferSize(512) //      w, err := wav.NewPump(wavFile, bufferSize) //  wav pump if err != nil { return err } pa := portaudio.NewSink( //  portaudio sink bufferSize, w.WavSampleRate(), w.WavNumChannels(), ) p := pipe.New( //  pipe.Pipe    ready w.WavSampleRate(), pipe.WithPump(w), pipe.WithSinks(pa), ) p.Run() //    running   p.Run() errc := p.Pause() //    pausing   p.Pause() err = pipe.Wait(errc) //     paused if err != nil { return err } errc = p.Resume() //    running   p.Resume() err = pipe.Wait(errc) //     ready if err != nil { return err } return pipe.Wait(p.Close()) //      } 

现在,首先是第一件事。


所有源代码都可以在资源库中找到

状态和事件


让我们从最重要的事情开始。


 // state      . type state interface { listen(*Pipe, target) (state, target) //    transition(*Pipe, eventMessage) (state, error) //   } // idleState  .        . type idleState interface { state } // activeState  .         //   . type activeState interface { state sendMessage(*Pipe) state //    } //  . type ( idleReady struct{} activeRunning struct{} activePausing struct{} idlePaused struct{} ) //  . var ( ready idleReady running activeRunning paused idlePaused pausing activePausing ) 

由于使用了不同的类型,还可以为每个状态分别声明转换。 这样可以避免巨大的 香肠 具有嵌套switch转换函数。 状态本身不包含任何数据或逻辑。 对于他们,您可以在包级别声明变量,这样就不必每次都这样做。 多态性需要state接口。 activeState idleState讨论activeStateidleState


我们机器的第二个最重要的部分是事件。


 // event  . type event int //  . const ( run event = iota pause resume push measure cancel ) // target      . type target struct { state idleState //   errc chan error //   ,     } // eventMessage   ,    . type eventMessage struct { event //   params params //   components []string // id  target //      } 

要了解为什么需要target类型,请考虑一个简单的示例。 我们创建了一个新的传送带,它已经ready 。 现在使用p.Run()运行它。 run事件发送到计算机,管道进入running状态。 如何找出输送机何时完成? 这是target类型将为我们提供帮助的地方。 它指示事件后期望的休息状态。 在我们的示例中,工作完成后,管道将再次进入ready状态。 图中的相同内容:



现在更多关于状态类型。 更确切地说,关于idleStateactiveState 。 让我们看一下不同类型的阶段的listen(*Pipe, target) (state, target)功能:


 // listen     ready. func (s idleReady) listen(p *Pipe, t target) (state, target) { return p.idle(s, t) } // listen     running. func (s activeRunning) listen(p *Pipe, t target) (state, target) { return p.active(s, t) } 

pipe.Pipe具有不同的功能,以等待转换! 那里有什么?


 // idle     .    . func (p *Pipe) idle(s idleState, t target) (state, target) { if s == t.state || s == ready { t = t.dismiss() //  ,  target } for { var newState state var err error select { case e := <-p.events: //   newState, err = s.transition(p, e) //    if err != nil { e.target.handle(err) } else if e.hasTarget() { t.dismiss() t = e.target } } if s != newState { return newState, t // ,    } } } // active     .     , //   . func (p *Pipe) active(s activeState, t target) (state, target) { for { var newState state var err error select { case e := <-p.events: //   newState, err = s.transition(p, e) //    if err != nil { //  ? e.target.handle(err) // ,    } else if e.hasTarget() { // ,  target t.dismiss() //   t = e.target //   } case <-p.provide: //     newState = s.sendMessage(p) //    case err, ok := <-p.errc: //   if ok { //   ,  interrupt(p.cancel) //   t.handle(err) //    } //    ,  return ready, t //    ready } if s != newState { return newState, t // ,    } } } 

因此,我们可以收听处于不同状态的不同频道。 例如,这允许您在暂停期间不发送消息-我们只是不收听相应的频道。


机器的构造器和启动



 // New      . //      ready. func New(sampleRate phono.SampleRate, options ...Option) *Pipe { p := &Pipe{ UID: phono.NewUID(), sampleRate: sampleRate, log: log.GetLogger(), processors: make([]*processRunner, 0), sinks: make([]*sinkRunner, 0), metrics: make(map[string]measurable), params: make(map[string][]phono.ParamFunc), feedback: make(map[string][]phono.ParamFunc), events: make(chan eventMessage, 1), //    cancel: make(chan struct{}), //     provide: make(chan struct{}), consume: make(chan message), } for _, option := range options { //   option(p)() } go p.loop() //    return p } 

除了初始化和功能选项之外 ,还有一个单独的goroutine与主循环一起开始。 好吧,看着他:


 // loop ,     nil . func (p *Pipe) loop() { var s state = ready //   t := target{} for s != nil { s, t = s.listen(p, t) //      p.log.Debug(fmt.Sprintf("%v is %T", p, s)) } t.dismiss() close(p.events) //    } // listen     ready. func (s idleReady) listen(p *Pipe, t target) (state, target) { return p.idle(s, t) } // transition       . func (s idleReady) transition(p *Pipe, e eventMessage) (state, error) { switch e.event { case cancel: interrupt(p.cancel) return nil, nil case push: e.params.applyTo(p.ID()) p.params = p.params.merge(e.params) return s, nil case measure: for _, id := range e.components { e.params.applyTo(id) } return s, nil case run: if err := p.start(); err != nil { return s, err } return running, nil } return s, ErrInvalidState } 

传送带是在发生事件时冻结的。


上班时间


致电p.Run()



 // Run   run  . //     pipe.Close  . func (p *Pipe) Run() chan error { runEvent := eventMessage{ event: run, target: target{ state: ready, //    errc: make(chan error, 1), }, } p.events <- runEvent return runEvent.target.errc } // listen     running. func (s activeRunning) listen(p *Pipe, t target) (state, target) { return p.active(s, t) } // transition       . func (s activeRunning) transition(p *Pipe, e eventMessage) (state, error) { switch e.event { case cancel: interrupt(p.cancel) err := Wait(p.errc) return nil, err case measure: e.params.applyTo(p.ID()) p.feedback = p.feedback.merge(e.params) return s, nil case push: e.params.applyTo(p.ID()) p.params = p.params.merge(e.params) return s, nil case pause: return pausing, nil } return s, ErrInvalidState } // sendMessage   . func (s activeRunning) sendMessage(p *Pipe) state { p.consume <- p.newMessage() return s } 

running生成消息并一直运行,直到管道完成。


暂停


在执行传送带期间,我们可以暂停它。 在这种状态下,管道将不会生成新消息。 为此,请调用p.Pause()方法。



 // Pause   pause  . //     pipe.Close  . func (p *Pipe) Pause() chan error { pauseEvent := eventMessage{ event: pause, target: target{ state: paused, //    errc: make(chan error, 1), }, } p.events <- pauseEvent return pauseEvent.target.errc } // listen     pausing. func (s activePausing) listen(p *Pipe, t target) (state, target) { return p.active(s, t) } // transition       . func (s activePausing) transition(p *Pipe, e eventMessage) (state, error) { switch e.event { case cancel: interrupt(p.cancel) err := Wait(p.errc) return nil, err case measure: e.params.applyTo(p.ID()) p.feedback = p.feedback.merge(e.params) return s, nil case push: e.params.applyTo(p.ID()) p.params = p.params.merge(e.params) return s, nil } return s, ErrInvalidState } // sendMessage   .   -, //      .    //    ,      .  , // ,   , : // 1.     // 2.      func (s activePausing) sendMessage(p *Pipe) state { m := p.newMessage() if len(m.feedback) == 0 { m.feedback = make(map[string][]phono.ParamFunc) } var wg sync.WaitGroup //     wg.Add(len(p.sinks)) //   Sink for _, sink := range p.sinks { param := phono.ReceivedBy(&wg, sink.ID()) // - m.feedback = m.feedback.add(param) } p.consume <- m //   wg.Wait() // ,     return paused } 

一旦所有收件人都收到邮件,管道将paused状态。 如果消息是最后一条消息,则将发生到ready状态的转换。


回去上班!


要退出paused状态,请调用p.Resume()



 // Resume   resume  . //     pipe.Close  . func (p *Pipe) Resume() chan error { resumeEvent := eventMessage{ event: resume, target: target{ state: ready, errc: make(chan error, 1), }, } p.events <- resumeEvent return resumeEvent.target.errc } // listen     paused. func (s idlePaused) listen(p *Pipe, t target) (state, target) { return p.idle(s, t) } // transition       . func (s idlePaused) transition(p *Pipe, e eventMessage) (state, error) { switch e.event { case cancel: interrupt(p.cancel) err := Wait(p.errc) return nil, err case push: e.params.applyTo(p.ID()) p.params = p.params.merge(e.params) return s, nil case measure: for _, id := range e.components { e.params.applyTo(id) } return s, nil case resume: return running, nil } return s, ErrInvalidState } 

这里的一切微不足道,管道再次进入running状态。


卷起


传送带可以从任何状态停止。 有p.Close()



 // Close   cancel  . //      . //    ,   . func (p *Pipe) Close() chan error { resumeEvent := eventMessage{ event: cancel, target: target{ state: nil, //   errc: make(chan error, 1), }, } p.events <- resumeEvent return resumeEvent.target.errc } 

谁需要这个?


不适合所有人。 要确切了解如何管理状态,您需要了解您的任务。 可以在两种情况下使用基于事件的异步计算机:


  1. 复杂的生命周期-存在具有非线性转换的三个或更多状态。
  2. 使用异步执行。

尽管事件机解决了该问题,但这是一个相当复杂的模式。 因此,只有在全面了解所有优缺点之后,才应格外小心地使用它。


参考文献


Source: https://habr.com/ru/post/zh-CN431048/


All Articles