
在
上一篇文章中,我们研究了什么是溪流以及它们与之一起吃。 在新部分中,我们将熟悉RxJS提供的用于创建流的方法,什么是运算符,管道以及如何使用它们。
系列文章“使用RxJS进行反应式编程的基础知识”:
RxJS具有丰富的
API 。 该文档描述了一百多种方法。 为了稍微了解它们,我们将编写一个简单的应用程序,在实践中,我们将看到反应式代码的外观。 您会发现,如果您从反应性的角度看这些相同的任务,这些任务以前看起来像是例行的工作,需要编写大量代码,但它们是一个不错的解决方案。 但是在开始实践之前,我们将研究如何以图形方式表示流程,并熟悉用于创建和处理流程的便捷方法。
线程的图形表示
为了清楚地说明特定流程的行为,我将使用反应式方法中采用的符号。 回顾上一篇文章中的示例:
const observable = new Observable((observer) => { observer.next(1); observer.next(2); observer.complete(); });
以下是其图形表示形式:

流程通常描绘为一条直线。 如果流发出任何值,则将其在线上显示为圆圈。 显示屏中的一条直线是结束流的信号。 要显示错误,请使用符号-“×”。
const observable = new Observable((observer) => { observer.error(); });

一线流
在我的实践中,我很少需要直接创建自己的Observable实例。 大多数创建线程的方法已经在RxJS中。 要创建一个发出值1和2的流,使用of方法就足够了:
const observable = of(1, 2);
of方法接受任意数量的参数,并返回Observable的完成实例。 订阅后,它将发出接收到的值并完成:

如果要将数组表示为流,则可以使用from方法。 from方法作为参数需要任何可迭代的对象(数组,字符串等)或promise,并将该对象投影到流上。 这是从字符串获得的流的样子:
const observable = from('abc');

因此,您可以在流中包装一个Promise:
const promise = new Promise((resolve, reject) => { resolve(1); }); const observable = from(promise);
注意:经常将线程与promise进行比较。 实际上,它们只有一个共同点-一种
推动变更传播的
策略 。 其余的是完全不同的实体。 Promise不能产生多个值。 它只能执行解决或拒绝操作,即 只有两个状态。 流可以传输多个值,并且可以重用。
您还记得与
第一篇文章间隔时间的示例吗? 此流是一个计时器,用于计算从订阅时刻起的秒数。
const timer = new Observable(observer => { let counter = 0; const intervalId = setInterval(() => { observer.next(counter++); }, 1000); return () => { clearInterval(intervalId); } });
这是您可以在一行中实现相同内容的方法:
const timer = interval(1000);

最后,是一种允许您为DOM元素创建事件流的方法:
const observable = fromEvent(domElementRef, 'keyup');
作为值,此流将接收和发射keyup事件对象。
管道和操作员
Pipe是在5.5版的RxJS中添加的Observable类方法。 多亏了它,我们可以构建运算符链来顺序处理流中接收到的值。 管道是连接操作员的单向通道。 运算符本身是RxJS中描述的常规函数,用于处理流中的值。
例如,他们可以转换该值并将其进一步传递到流,或者它们可以充当过滤器,并且如果它们不满足指定条件,则不会跳过任何值。
让我们看看实际的运算符。 使用map运算符将流中的每个值乘以2:
of(1,2,3).pipe( map(value => value * 2) ).subscribe({ next: console.log });
这是应用地图运算符之前流的外观:

在map语句之后:

让我们使用过滤器运算符。 该语句的工作原理与Array类中的filter函数类似。 该方法将函数作为第一个参数,该参数描述一个条件。 如果流中的值满足条件,则将其传递:
of(1, 2, 3).pipe(
这是我们流的整个方案的外观:

过滤后:

后图:
注意:管道!==订阅。 管道方法声明流行为,但不订阅。 在调用subscribe方法之前,流将无法开始工作。
我们正在编写一个应用程序
既然我们已经弄清楚了什么是管道和操作员,那么您就可以开始练习了。 我们的应用程序将执行一个简单的任务:根据所有者的昵称显示打开的github存储库列表。
将有一些要求:
- 如果输入的字符串少于3个字符,则不要执行API请求;
- 为了不满足用户输入的每个字符的请求,应在访问API之前将反跳延迟设置为700毫秒。
要搜索存储库,我们将使用
github API 。 我建议自己在
stackblitz上运行示例。 在那里,我列出了完成的实现。 文章末尾提供了链接。
让我们从html标记开始。 让我们描述输入和ul元素:
<input type="text"> <ul></ul>
然后,在js或ts文件中,我们使用浏览器API获取到当前元素的链接:
const input = document.querySelector('input'); const ul = document.querySelector('ul');
我们还需要一种将执行对github API的请求的方法。 以下是getUsersRepsFromAPI函数的代码,该函数接受用户的昵称并使用访存执行ajax请求。 然后返回一个promise,将成功的响应转换为json:
const getUsersRepsFromAPI = (username) => { const url = `https://api.github.com/users/${ username }/repos`; return fetch(url) .then(response => { if(response.ok) { return response.json(); } throw new Error(''); }); }
接下来,我们编写一个方法,该方法将列出存储库的名称:
const recordRepsToList = (reps) => { for (let i = 0; i < reps.length; i++) {
准备工作已经完成。 现在该看看实际使用的RxJS。 我们需要监听输入的keyup事件。 首先,我们必须了解,以被动方式,我们要与流程一起工作。 幸运的是,RxJS已经提供了类似的选项。 记住我上面提到的fromEvent方法。 我们使用它:
const keyUp = fromEvent(input, 'keyup'); keyUp.subscribe({ next: console.log });
现在,我们的事件以流的形式呈现。 如果我们查看控制台中显示的内容,我们将看到一个KeyboardEvent类型的对象。 但是我们需要一个用户输入的值。 这是管道方法和地图运算符派上用场的地方:
fromEvent(input, 'keyup').pipe( map(event => event.target.value) ).subscribe({ next: console.log });
我们着手执行要求。 首先,当输入的值包含两个以上字符时,我们将执行查询。 为此,请使用过滤器运算符:
fromEvent(input, 'keyup').pipe( map(event => event.target.value), filter(value => value.length > 2) )
我们处理了第一个要求。 我们继续第二个。 我们需要实施去抖动。 RxJS有一个debounceTime语句。 该运算符作为第一个参数采用毫秒数,在该毫秒数内,值将在传递之前保持不变。 在这种情况下,每个新值都会重置计时器。 因此,在输出处我们得到了最后一个值,此后经过了700毫秒。
fromEvent(input, 'keyup').pipe( debounceTime(700), map(event => event.target.value), filter(value => value.length > 2) )
没有debounceTime的情况下,我们的流可能是这样的:

这就是通过此语句传递的相同流的样子:

使用debounceTime,我们将不太可能使用该API,这将节省流量并减轻服务器的负担。
为了进行其他优化,我建议使用另一个运算符-distingutilChanged。 这种方法可以避免重复。 最好使用一个示例来展示其工作:
from('aaabccc').pipe( distinctUntilChanged() )
如果没有distinctUntilChanged:

带有distinctUntilChanged:

在debounceTime语句之后立即添加此语句。 因此,如果由于某种原因新值与前一个值一致,我们将不会访问API。 当用户输入新字符然后再次擦除它们时,可能会发生类似情况。 由于我们实施了延迟,因此只有最后一个值会落入流中,这是我们已经拥有的答案。
转到服务器
现在我们已经可以描述请求的逻辑和响应的处理。 虽然我们只能与承诺合作。 因此,我们描述了另一个将调用getUsersRepsFromAPI方法的地图运算符。 在观察者中,我们描述了承诺的处理逻辑:
fromEvent(input, 'keyup').pipe( debounceTime(700), map(event => event.target.value), filter(val => val.length > 2), distinctUntilChanged(), map(value => getUsersRepsFromAPI(value)) ).subscribe({ next: promise => promise.then(reps => recordRepsToList(reps)) });
目前,我们已经实现了我们想要的一切。 但是我们的示例有一个很大的缺点:没有错误处理。 我们的观察者只得到一个诺言,却不知道会出问题。
当然,我们可以在下一个方法中实现承诺,但是由于这个原因,我们的代码将越来越像“回调地狱”。 如果突然我们需要再执行一个请求,那么代码的复杂性将会增加。
注意:在RxJS代码中使用promise被认为是反模式。 与可观察的相比,承诺有许多缺点。 它无法撤消,也不能重复使用。 如果您有选择,请选择可观察的。 Observable类的toPromise方法也是如此。 实现此方法是为了与无法与流一起使用的库兼容。
我们可以使用from方法将promise投影到流上,但是此方法充满了对subscription方法的其他调用,并且还会导致代码的增长和复杂性。
可以使用mergeMap运算符解决此问题:
fromEvent(input, 'keyup').pipe( debounceTime(700), map(event => event.target.value), filter(val => val.length > 2), distinctUntilChanged(), mergeMap(value => from(getUsersRepsFromAPI(value))) ).subscribe({ next: reps => recordRepsToList(reps), error: console.log })
现在我们不需要编写承诺处理逻辑。 from方法生成一个承诺流,并由mergeMap运算符对其进行处理。 如果成功实现了诺言,则调用下一个方法,观察者将收到完成的对象。 如果发生错误,将调用error方法,并且我们的观察者将在控制台中输出错误。
mergeMap运算符与我们之前使用的运算符略有不同;它属于所谓的
高阶Observables ,我将在下一篇文章中进行讨论。 但是,展望未来,我会说mergeMap方法本身订阅了流。
错误处理
如果我们的线程收到错误,则它将终止。 而且,如果我们在发生错误后尝试与应用程序进行交互,则由于线程已完成,因此我们将不会得到任何反应。
在这里catchError运算符将为我们提供帮助。 仅当流中发生错误时才引发catchError。 它允许您拦截它,对其进行处理并返回流中的常规值,这不会导致其完成。
fromEvent(input, 'keyup').pipe( debounceTime(700), map(event => event.target.value), filter(val => val.length > 2), distinctUntilChanged(), mergeMap(value => from(getUsersRepsFromAPI(value))), catchError(err => of([])) ).subscribe({ next: reps => recordRepsToList(reps), error: console.log })
我们在catchError中捕获错误,而是返回带有空数组的流。 现在,当发生错误时,我们将清除存储库列表。 但是随后流程再次结束。
问题是catchError用新的替换了我们原来的流。 然后我们的观察者只听他的话。 当of流发出一个空数组时,将调用complete方法。
为了不替换原始线程,我们从mergeMap运算符内部的from线程调用catchError运算符。
fromEvent(input, 'keyup').pipe( debounceTime(700), map(event => event.target.value), filter(val => val.length > 2), distinctUntilChanged(), mergeMap(value => { return from(getUsersRepsFromAPI(value)).pipe( catchError(err => of([])) ) }) ).subscribe({ next: reps => recordRepsToList(reps), error: console.log })
因此,我们的原始流将不会注意到任何东西。 而不是错误,它将获得一个空数组。
结论
我们终于开始练习,并了解管道和操作员的用途。 我们研究了如何使用RxJS提供的丰富API减少代码。 当然,我们的应用程序尚未完成,在下一部分中,我们将分析如何在一个线程中处理另一个线程,以及如何取消我们的http请求,以节省应用程序的更多流量和资源。 为了让您看到其中的区别,我列出了一个不使用RxJS的示例,您可以
在此处看到它。 在
此链接中,您将找到当前应用程序的完整代码。 为了生成电路,我使用了
RxJS可视化工具 。
希望本文能帮助您更好地了解RxJS的工作原理。 祝您学习顺利!