使用RxJS进行反应式编程的基础。 第2部分。操作员和管道



上一篇文章中,我们研究了什么是溪流以及它们与之一起吃。 在新部分中,我们将熟悉RxJS提供的用于创建流的方法,什么是运算符,管道以及如何使用它们。

系列文章“使用RxJS进行反应式编程的基础知识”:



RxJS具有丰富的API 。 该文档描述了一百多种方法。 为了稍微了解它们,我们将编写一个简单的应用程序,在实践中,我们将看到反应式代码的外观。 您会发现,如果您从反应性的角度看这些相同的任务,这些任务以前看起来像是例行的工作,需要编写大量代码,但它们是一个不错的解决方案。 但是在开始实践之前,我们将研究如何以图形方式表示流程,并熟悉用于创建和处理流程的便捷方法。

线程的图形表示


为了清楚地说明特定流程的行为,我将使用反应式方法中采用的符号。 回顾上一篇文章中的示例:

const observable = new Observable((observer) => { observer.next(1); observer.next(2); observer.complete(); }); 

以下是其图形表示形式:



流程通常描绘为一条直线。 如果流发出任何值,则将其在线上显示为圆圈。 显示屏中的一条直线是结束流的信号。 要显示错误,请使用符号-“×”。

 const observable = new Observable((observer) => { observer.error(); }); 



一线流


在我的实践中,我很少需要直接创建自己的Observable实例。 大多数创建线程的方法已经在RxJS中。 要创建一个发出值1和2的流,使用of方法就足够了:

 const observable = of(1, 2); 

of方法接受任意数量的参数,并返回Observable的完成实例。 订阅后,它将发出接收到的值并完成:



如果要将数组表示为流,则可以使用from方法。 from方法作为参数需要任何可迭代的对象(数组,字符串等)或promise,并将该对象投影到流上。 这是从字符串获得的流的样子:

 const observable = from('abc'); 



因此,您可以在流中包装一个Promise:

 const promise = new Promise((resolve, reject) => { resolve(1); }); const observable = from(promise); 



注意:经常将线程与promise进行比较。 实际上,它们只有一个共同点-一种推动变更传播的策略 。 其余的是完全不同的实体。 Promise不能产生多个值。 它只能执行解决或拒绝操作,即 只有两个状态。 流可以传输多个值,并且可以重用。

您还记得与第一篇文章间隔时间的示例吗? 此流是一个计时器,用于计算从订阅时刻起的秒数。

 const timer = new Observable(observer => { let counter = 0; const intervalId = setInterval(() => { observer.next(counter++); }, 1000); return () => { clearInterval(intervalId); } }); 

这是您可以在一行中实现相同内容的方法:

 const timer = interval(1000); 



最后,是一种允许您为DOM元素创建事件流的方法:

 const observable = fromEvent(domElementRef, 'keyup'); 

作为值,此流将接收和发射keyup事件对象。

管道和操作员


Pipe是在5.5版的RxJS中添加的Observable类方法。 多亏了它,我们可以构建运算符链来顺序处理流中接收到的值。 管道是连接操作员的单向通道。 运算符本身是RxJS中描述的常规函数​​,用于处理流中的值。

例如,他们可以转换该值并将其进一步传递到流,或者它们可以充当过滤器,并且如果它们不满足指定条件,则不会跳过任何值。

让我们看看实际的运算符。 使用map运算符将流中的每个值乘以2:

 of(1,2,3).pipe( map(value => value * 2) ).subscribe({ next: console.log }); 

这是应用地图运算符之前流的外观:



在map语句之后:



让我们使用过滤器运算符。 该语句的工作原理与Array类中的filter函数类似。 该方法将函数作为第一个参数,该参数描述一个条件。 如果流中的值满足条件,则将其传递:

 of(1, 2, 3).pipe( //     filter(value => value % 2 !== 0), map(value = value * 2) ).subscribe({ next: console.log }); 

这是我们流的整个方案的外观:



过滤后:



后图:



注意:管道!==订阅。 管道方法声明流行为,但不订阅。 在调用subscribe方法之前,流将无法开始工作。

我们正在编写一个应用程序


既然我们已经弄清楚了什么是管道和操作员,那么您就可以开始练习了。 我们的应用程序将执行一个简单的任务:根据所有者的昵称显示打开的github存储库列表。

将有一些要求:

  • 如果输入的字符串少于3个字符,则不要执行API请求;
  • 为了不满足用户输入的每个字符的请求,应在访问API之前将反跳延迟设置为700毫秒。

要搜索存储库,我们将使用github API 。 我建议自己在stackblitz上运行示例。 在那里,我列出了完成的实现。 文章末尾提供了链接。

让我们从html标记开始。 让我们描述输入和ul元素:

 <input type="text"> <ul></ul> 

然后,在js或ts文件中,我们使用浏览器API获取到当前元素的链接:

 const input = document.querySelector('input'); const ul = document.querySelector('ul'); 

我们还需要一种将执行对github API的请求的方法。 以下是getUsersRepsFromAPI函数的代码,该函数接受用户的昵称并使用访存执行ajax请求。 然后返回一个promise,将成功的响应转换为json:

 const getUsersRepsFromAPI = (username) => { const url = `https://api.github.com/users/${ username }/repos`; return fetch(url) .then(response => { if(response.ok) { return response.json(); } throw new Error(''); }); } 

接下来,我们编写一个方法,该方法将列出存储库的名称:

 const recordRepsToList = (reps) => { for (let i = 0; i < reps.length; i++) { //    ,    if (!ul.children[i]) { const newEl = document.createElement('li'); ul.appendChild(newEl); } //      const li = ul.children[i]; li.innerHTML = reps[i].name; } //    while (ul.children.length > reps.length) { ul.removeChild(ul.lastChild); } } 

准备工作已经完成。 现在该看看实际使用的RxJS。 我们需要监听输入的keyup事件。 首先,我们必须了解,以被动方式,我们要与流程一起工作。 幸运的是,RxJS已经提供了类似的选项。 记住我上面提到的fromEvent方法。 我们使用它:

 const keyUp = fromEvent(input, 'keyup'); keyUp.subscribe({ next: console.log }); 

现在,我们的事件以流的形式呈现。 如果我们查看控制台中显示的内容,我们将看到一个KeyboardEvent类型的对象。 但是我们需要一个用户输入的值。 这是管道方法和地图运算符派上用场的地方:

 fromEvent(input, 'keyup').pipe( map(event => event.target.value) ).subscribe({ next: console.log }); 

我们着手执行要求。 首先,当输入的值包含两个以上字符时,我们将执行查询。 为此,请使用过滤器运算符:

 fromEvent(input, 'keyup').pipe( map(event => event.target.value), filter(value => value.length > 2) ) 

我们处理了第一个要求。 我们继续第二个。 我们需要实施去抖动。 RxJS有一个debounceTime语句。 该运算符作为第一个参数采用毫秒数,在该毫秒数内,值将在传递之前保持不变。 在这种情况下,每个新值都会重置计时器。 因此,在输出处我们得到了最后一个值,此后经过了700毫秒。

 fromEvent(input, 'keyup').pipe( debounceTime(700), map(event => event.target.value), filter(value => value.length > 2) ) 

没有debounceTime的情况下,我们的流可能是这样的:



这就是通过此语句传递的相同流的样子:



使用debounceTime,我们将不太可能使用该API,这将节省流量并减轻服务器的负担。

为了进行其他优化,我建议使用另一个运算符-distingutilChanged。 这种方法可以避免重复。 最好使用一个示例来展示其工作:

 from('aaabccc').pipe( distinctUntilChanged() ) 

如果没有distinctUntilChanged:



带有distinctUntilChanged:



在debounceTime语句之后立即添加此语句。 因此,如果由于某种原因新值与前一个值一致,我们将不会访问API。 当用户输入新字符然后再次擦除它们时,可能会发生类似情况。 由于我们实施了延迟,因此只有最后一个值会落入流中,这是我们已经拥有的答案。

转到服务器


现在我们已经可以描述请求的逻辑和响应的处理。 虽然我们只能与承诺合作。 因此,我们描述了另一个将调用getUsersRepsFromAPI方法的地图运算符。 在观察者中,我们描述了承诺的处理逻辑:

 /*  !     RxJS    promise,      */ fromEvent(input, 'keyup').pipe( debounceTime(700), map(event => event.target.value), filter(val => val.length > 2), distinctUntilChanged(), map(value => getUsersRepsFromAPI(value)) ).subscribe({ next: promise => promise.then(reps => recordRepsToList(reps)) }); 

目前,我们已经实现了我们想要的一切。 但是我们的示例有一个很大的缺点:没有错误处理。 我们的观察者只得到一个诺言,却不知道会出问题。

当然,我们可以在下一个方法中实现承诺,但是由于这个原因,我们的代码将越来越像“回调地狱”。 如果突然我们需要再执行一个请求,那么代码的复杂性将会增加。

注意:在RxJS代码中使用promise被认为是反模式。 与可观察的相比,承诺有许多缺点。 它无法撤消,也不能重复使用。 如果您有选择,请选择可观察的。 Observable类的toPromise方法也是如此。 实现此方法是为了与无法与流一起使用的库兼容。

我们可以使用from方法将promise投影到流上,但是此方法充满了对subscription方法的其他调用,并且还会导致代码的增长和复杂性。

可以使用mergeMap运算符解决此问题:

 fromEvent(input, 'keyup').pipe( debounceTime(700), map(event => event.target.value), filter(val => val.length > 2), distinctUntilChanged(), mergeMap(value => from(getUsersRepsFromAPI(value))) ).subscribe({ next: reps => recordRepsToList(reps), error: console.log }) 

现在我们不需要编写承诺处理逻辑。 from方法生成一个承诺流,并由mergeMap运算符对其进行处理。 如果成功实现了诺言,则调用下一个方法,观察者将收到完成的对象。 如果发生错误,将调用error方法,并且我们的观察者将在控制台中输出错误。

mergeMap运算符与我们之前使用的运算符略有不同;它属于所谓的高阶Observables ,我将在下一篇文章中进行讨论。 但是,展望未来,我会说mergeMap方法本身订阅了流。

错误处理


如果我们的线程收到错误,则它将终止。 而且,如果我们在发生错误后尝试与应用程序进行交互,则由于线程已完成,因此我们将不会得到任何反应。

在这里catchError运算符将为我们提供帮助。 仅当流中发生错误时才引发catchError。 它允许您拦截它,对其进行处理并返回流中的常规值,这不会导致其完成。

 fromEvent(input, 'keyup').pipe( debounceTime(700), map(event => event.target.value), filter(val => val.length > 2), distinctUntilChanged(), mergeMap(value => from(getUsersRepsFromAPI(value))), catchError(err => of([])) ).subscribe({ next: reps => recordRepsToList(reps), error: console.log }) 

我们在catchError中捕获错误,而是返回带有空数组的流。 现在,当发生错误时,我们将清除存储库列表。 但是随后流程再次结束。

问题是catchError用新的替换了我们原来的流。 然后我们的观察者只听他的话。 当of流发出一个空数组时,将调用complete方法。

为了不替换原始线程,我们从mergeMap运算符内部的from线程调用catchError运算符。

 fromEvent(input, 'keyup').pipe( debounceTime(700), map(event => event.target.value), filter(val => val.length > 2), distinctUntilChanged(), mergeMap(value => { return from(getUsersRepsFromAPI(value)).pipe( catchError(err => of([])) ) }) ).subscribe({ next: reps => recordRepsToList(reps), error: console.log }) 

因此,我们的原始流将不会注意到任何东西。 而不是错误,它将获得一个空数组。

结论


我们终于开始练习,并了解管道和操作员的用途。 我们研究了如何使用RxJS提供的丰富API减少代码。 当然,我们的应用程序尚未完成,在下一部分中,我们将分析如何在一个线程中处理另一个线程,以及如何取消我们的http请求,以节省应用程序的更多流量和资源。 为了让您看到其中的区别,我列出了一个不使用RxJS的示例,您可以在此处看到它。 在此链接中,您将找到当前应用程序的完整代码。 为了生成电路,我使用了RxJS可视化工具

希望本文能帮助您更好地了解RxJS的工作原理。 祝您学习顺利!

Source: https://habr.com/ru/post/zh-CN444290/


All Articles