使用RxJS进行反应式编程的基础

第1部分。反应性和流量


本系列文章重点介绍使用RxJS这样出色的库在JS中的反应性及其应用。

系列文章“使用RxJS进行反应式编程的基础知识”:



本文的目标读者 :基本上,在这里我将解释基础知识,因此本文主要针对该技术的初学者。 同时,我希望有经验的开发人员能够为自己学习一些新知识。 理解将需要js(es5 / es6)的知识。

动机 :当我开始使用angular时,我第一次遇到RxJS。 那时我很难理解反应机制。 在我开始工作之初,大多数文章都专门针对该库的旧版本,这增加了难度。 为了至少了解一些内容,我不得不阅读很多文档和各种手册。 仅仅过了一段时间,我才开始意识到“一切都安排好了”。 为了使他人生活更轻松,我决定将所有东西都放在架子上。

什么是反应性?


很难找到一个看似普通术语的答案。 简而言之:反应性是对任何变化做出反应的能力。 但是,我们正在谈论什么变化? 首先,关于数据更改。 考虑一个例子:

let a = 2; let b = 3; let sum = a + b; console.log(sum); // 5 a = 3; console.log(sum); // 5 -    

此示例演示了熟悉的命令式编程范例。 与命令式方法不同,被动式方法基于推送更改传播策略。 推送策略意味着在数据更改的情况下,这些相同的更改将被“推送”,并且依赖于它们的数据将被自动更新。 这是我们的示例在应用推送策略时的行为:

 let a = 2; let b = 3; let sum = a + b; console.log(sum); // 5 a = 3; console.log(sum); // 6 -   sum   

此示例显示了一种被动方法。 值得注意的是,该示例与现实无关;我给出它仅仅是为了说明方法上的差异。 现实应用程序中的反应性代码看起来会大不相同,在继续练习之前,我们应该讨论反应性的另一个重要组成部分。

数据流


如果您在Wikipedia上看到“响应式编程”一词,该站点将为我们提供以下定义:“响应式编程是一种专注于数据流和变更传播的编程范例。” 根据这个定义,我们可以得出结论,反应性是基于两个主要的“鲸鱼”。 我在上面提到了变更的分布,因此我们将不再赘述。 但是我们应该更多地讨论数据流。 让我们看下面的例子:

 const input = document.querySelector('input'); //     const eventsArray = []; input.addEventListener('keyup', event => eventsArray.push(event) ); //      eventsArray 

我们监听keyup事件,并将事件对象放入数组中。 随着时间的流逝,我们的数组可以包含数千个KeyboardEvent对象。 值得注意的是,我们的数组是按时间排序的-较晚事件的索引大于较早事件的索引。 这样的数组是简化的数据流模型。 为什么要简化? 因为数组只能存储数据。 我们还可以迭代数组并以某种方式处理其元素。 但是数组无法告诉我们已向其中添加了新元素。 为了找出是否有新数据添加到数组中,我们将不得不再次对其进行迭代。

但是,如果我们的阵列知道如何通知我们有新数据到达该怎么办? 这样的数组可以肯定地称为流。 因此,我们来定义流程。 流是按时间排序的数据数组,可以指示数据已更改。

可观察的


现在我们知道了什么是流量,让我们与它们一起工作。 在RxJS中,流由Observable类表示。 要创建自己的流,只需调用此类的构造函数并将订阅函数作为参数传递给它:

 const observable = new Observable(observer => { observer.next(1); observer.next(2); observer.complete(); }) 

通过调用Observable类的构造函数,我们创建了一个新线程。 作为参数,我们将订阅函数传递给了构造函数。 订阅函数是一个常规函数,将观察者作为参数。 观察者本人是具有三种方法的对象:

  • next-向流中添加新值
  • 错误-向流中抛出错误,此后流结束
  • 完成-终止线程

因此,我们创建了一个发出两个值并终止的线程。

订阅方式


如果我们运行前面的代码,将不会发生任何事情。 我们将只创建一个新的流,并将链接保存到可观察变量中,但是流本身永远不会发出单个值。 这是因为线程是“惰性”对象,它们自己什么也不做。 为了使我们的流开始发出值并可以处理这些值,我们需要开始“监听”该流。 您可以通过在可观察对象上调用subscription方法来实现。

 const observer = { next: value => console.log(value), // 1, 2 error: error => console.error(error), // complete: () => console.log("completed") // completed }; observable.subscribe(observer); 

我们确定了观察者并为他描述了三种方法:下一个,错误,完成。 方法仅记录作为参数传递的数据。 然后,我们调用subscription方法并将观察者传递给它。 在调用subscribe的那一刻,subscribe函数被调用,这是我们在声明流的阶段传递给构造函数的函数。 接下来,将执行订阅函数的代码,该函数将两个值传递给我们的观察者,然后终止流。

当然,许多人有一个问题,如果我们再次订阅该流,将会发生什么? 一切都将相同:流将再次将两个值传递给观察者并结束。 每次调用subscribe方法时,将调用一个预订函数,并且其所有代码将再次执行。 由此我们可以得出结论:无论我们订阅流多少次,观察者都将收到相同的数据。

退订


现在让我们尝试实现一个更复杂的示例。 我们将编写一个计时器,从订阅的那一刻开始计时,并将其发送给观察者。

 const timer = new Observable(observer => { let counter = 0; //  setInterval(() => { observer.next(counter++); //          }, 1000); }); timer.subscribe({ next: console.log //    }); 

代码很简单。 在订阅函数内部,我们声明一个计数器变量。 然后,使用闭包,我们从setInterval中的arrow函数访问变量。 然后,我们每秒将变量传递给观察者,然后将其递增。 接下来,订阅流,仅指定一种方法-下一步。 不用担心我们还没有宣布其他方法。 不需要任何观察者方法。 我们甚至可以传递一个空对象,但是在这种情况下,该线程将被浪费。

启动后,我们将看到令人垂涎的日志,这些日志将每秒显示一次。 如果需要,您可以尝试并订阅该流几次。 您将看到每个线程将独立于其他线程执行。

如果您考虑一下,我们的线程将在整个应用程序的整个生命周期中执行,因为我们没有取消setInterval的逻辑,并且在预订函数中,没有对complete方法的调用。 但是,如果我们需要线程结束怎么办?

实际上,一切都很简单。 如果查看文档,则可以看到subscription方法返回一个预订对象。 该对象具有取消订阅的方法。 我们调用它,观察者将停止从流中接收值。

 const subscription = timer.subscribe({next: console.log}); setTimeout(() => subscription.unsubscribe(), 5000); //   5  

启动后,我们将看到计数器在4处停止。但是,尽管我们已经取消订阅流,但是setInterval函数仍继续工作。 她每秒增加一次计数器,然后将其传递给虚拟观察者。 为防止这种情况发生,您必须编写取消间隔的逻辑。 为此,您需要从预订函数中返回一个新函数,在该函数中将实现取消逻辑。

 const timer = new Observable(observer => { let counter = 0; const intervalId = setInterval(() => { observer.next(counter++); }, 1000); return () => { clearInterval(intervalId); } }); 

现在我们可以松一口气了。 调用unsubscribe方法后,将调用我们的unsubscribe函数,这将清除间隔。

结论


本文展示了命令式和响应式方法之间的差异,以及创建自己的流程的示例。 在下一部分中,我将讨论存在哪些其他创建线程的方法以及如何应用它们。

Source: https://habr.com/ru/post/zh-CN438642/


All Articles