使用RxJS进行反应式编程的基础。 第3部分。高阶可观察物



在本文中,我们将研究如何在一个线程中处理另一个线程,为什么需要它,以及高阶可观察变量(以下称HOO)运算符将如何帮助我们。

系列文章“使用RxJS进行反应式编程的基础知识”:



在使用线程时,通常会出现一种情况,即有必要将另一个的结果作为值传输到线程。 例如,我们想要执行一个ajax请求并在当前线程中处理其响应,或者运行多个并行请求,以实现池化。 我认为许多人已经习惯使用诸如Promise之类的机制来解决此类问题。 但是可以使用RxJS解决它们吗? 当然,一切都比您想像的容易得多!

注意 :为了理解本文的理论部分,您无需阅读以前的文章,只需知道什么是可观察的,运算符和管道即可。 在实际部分中,我们将从第二篇文章中完善示例,您可以在此处找到。

问题


让我们想象一下以下任务:我们需要每秒找出服务器是否可访问。 我们该如何解决呢?

首先,使用timer方法创建流:

timer(0, 1000).subscribe({ next: console.log }); 

计时器方法在原理上与间隔非常相似。 但是与它不同的是,它允许您设置线程启动超时,该超时由第一个参数发送。 第二个参数指示生成新值的时间间隔。 如果未指定第二个参数,计时器将仅生成一个值并终止流。

由于您和我没有服务器,因此建议只编写一个模拟对服务器请求的函数:

 const makeRequest = () => { return timer(1000).pipe( mapTo('success') ) } 

这种方法有什么作用? 它返回使用timer方法创建的流,该流在经过一秒钟并终止后发出一个值。 由于timer方法仅生成一个数字,因此我们使用mapTo运算符将其替换为字符串“ success”。

这是由makeRequest方法创建的流的样子:



现在我们可以选择:在流内部调用makeRequest方法还是将此职责分配给观察者?

第一种方法是可取的,因为在这种情况下,我们将能够充分利用RxJS的运算符的潜力,并减轻观察者不必要的负担。 我们使用timer方法按时间间隔执行请求:

 timer(0, 1000).pipe( map(() => makeRequest()) ).subscribe({ next: console.log }); 

当我们运行这样的代码时,我们将在console.log中看到,不是一条带有“ success”文本的消息,而是一个Observable类型的对象:



答案是完全可以预期的,因为在地图中我们返回流。 为了使流工作,您需要订阅它。 好吧,让我们看看如何不做

 timer(0, 1000).pipe( map(() => makeRequest()) ).subscribe({ next: observable => observable.subscribe({ next: console.log }); }); 

上面示例的问题是我们在订阅中获得了订阅。 但是,如果我们要在一个链中提出多个请求,该怎么办? 或者,如果在某个时候我们需要退订内部流程,该怎么办? 在这种情况下,我们的代码将越来越类似于“面条”。 为了解决此问题,RxJS具有称为HOO的特殊运算符。

HOO


HOO是一种特殊类型的语句,它接受流作为值。 一种此类运算符是mergeAll方法。

当流到达mergeAll时,它对其进行预订。 运营商订阅的流称为内部流。 操作员从中接收其他流作为值的流称为外部。

当内部线程生成一个值时,mergeAll将该值推入外部线程。 因此,我们摆脱了手动订阅的需要。 如果我们取消订阅外部流程,那么mergeAll将自动取消订阅内部流程。

让我们看看如何使用mergeAll重写示例:

 timer(0, 1000).pipe( map(() => makeRequest()) mergeAll() ).subscribe({ next: console.log }); 

在上面的示例中,外部流是由timer语句创建的。 并且在地图运算符中创建的流是内部的。 每个创建的线程都属于mergeAll语句。



map + mergeAll组合经常使用,因此在RxJS中有一个mergeMap方法:

 timer(0, 1000).pipe( mergeMap(() => makeRequest()) ).subscribe({ next: console.log }); 

当外部线程生成一个值时,mergeMap运算符将调用传递给它的回调函数,从而生成一个新线程。 然后mergeMap订阅生成的流。



mergeAll / mergeMap运算符的独特之处在于,如果另一个流归于它,那么它也将对其进行订阅。 因此,在外部流中,我们可以一次从多个内部值中获取值。 让我们看下面的例子:

  timer(0, 1000) 

这是没有mergeMap运算符的外部流的外观:



因此,使用mergeMap:

 timer(0, 1000).pipe( mergeMap(() => interval(1000)) ) 



每秒,我们创建一个新的内部线程,mergeMap对其进行订阅。 因此,我们有许多内部线程同时工作,其值属于外部线程:





注意 :使用mergeMap时要小心,每个新的内部线程都将起作用,直到您取消订阅外部线程为止。 在上面的示例中,内部线程的数量每秒钟都在增长,最后可能有太多的线程,计算机无法应付负载。

concatAll / concatMap


当您不关心内部线程的执行顺序时,mergeMap方法非常有用,但是如果需要的话,该怎么办? 假设我们希望下一个服务器请求仅在收到前一个服务器的响应后才执行?

为此,HOO操作符concatAll / concatMap是合适的。 订阅了内部线程的该运算符将等待其完成,然后才订阅下一个线程。

如果在执行一个线程期间有一个新线程降入该线程,则将其放入队列中,直到前一个线程完成为止。

 // ,  1     const firstInnerObservable = timer(1000).pipe( mapTo(1) ); // ,  2     const secondInnerObservable = timer(500).pipe( mapTo(2) ); of( firstInnerObservable, secondInnerObservable ).pipe( concatAll() ).subscribe({ next: console.log }); 

在上面的示例中,我们使用timer方法创建了两个线程。 为了清楚起见,我使用mapTo运算符显示不同的值。 第一个线程将生成1,第二个线程将生成2。使用of方法创建一个外部线程,该方法将上述两个可观察值作为输入。

concatAll语句首先接收firstInnerObservable,对其进行订阅,然后等待其完成,只有在第一个订阅完成之后,它才会对secondInnerObservable进行订阅。 外部流如下所示:



如果将concatAll替换为mergeAll,则流将如下所示:

 of( firstInnerObservable, secondInnerObservable ).pipe( mergeAll() ).subscribe({ next: console.log }); 



switchAll / switchMap


该运算符与以前的运算符的不同之处在于,当它接收到新的流时,它将立即从先前的流中取消订阅并订阅新的流。

以上面的示例为例,将concatAll替换为switchAll,然后查看外部流的行为:

 of( firstInnerObservable, secondInnerObservable ).pipe( switchAll() ).subscribe({ next: console.log }); 



仅第二个内部流中的值进入外部流。 这是因为switchMap在收到第二个线程时从第一个取消订阅。

什么时候需要? 例如,当执行数据搜索时。 如果来自服务器的响应尚未到达,并且我们已经发送了一个新请求,那么等待上一个请求就没有意义。

排气/排气图


exhaust与switchAll语句完全相反,它的行为类似于concatAll。 订阅流的此方法等待其完成。 如果出现新的流,则将其简单丢弃。

 of( firstInnerObservable, secondInnerObservable ).pipe( exhaust() ).subscribe({ next: console.log }); 



在上面的示例中,我们没有得出任何结论,因为那时操作员正在等待第一个线程的完成,而只是丢弃了第二个线程。

我认为许多人有一个问题,什么时候需要这种行为? 登录表单就是一个很好的例子。 在当前请求完成之前,向服务器发送多个请求是没有意义的。

我们正在完成申请


我们回想起第二篇文章中示例 。 在其中,我们在GitHub上实现了搜索,并使用了mergeMap运算符将请求发送到服务器。 现在我们知道该运算符的功能了,真的适合我们的情况吗?

 fromEvent(input, 'keyup').pipe( debounceTime(700), map(event => event.target.value), filter(val => val.length > 2), distinctUntilChanged(), mergeMap(value => { return from(getUsersRepsFromAPI(value)).pipe( catchError(err => of([])) ) }) ).subscribe({ next: reps => recordRepsToList(reps) }) 

假设GitHub服务器将严重过载,那么处理我们的响应将花费很多时间。 在这种情况下可能会出什么问题?

假设用户输入了一些数据,没有等待答案,而是输入了新的数据。 在这种情况下,我们会将第二个请求发送到服务器。 但是,没有人保证第一个请求的答案会更早到达。

由于mergeMap运算符并不关心处理内部线程的顺序,因此在第一个请求晚于第二个请求执行的情况下,我们将删除实际数据。 因此,我建议将mergeMap方法替换为switchMap:

 fromEvent(input, 'keyup').pipe( debounceTime(700), map(event => event.target.value), filter(val => val.length > 2), distinctUntilChanged(), switchMap(value => { return from(getUsersRepsFromAPI(value)).pipe( catchError(err => of([])) ) }) ).subscribe({ next: reps => recordRepsToList(reps) }) 

现在,如果用户输入新数据,则switchMap将取消订阅先前的流并订阅新的流。

值得注意的是,我们的http请求将继续挂起,直到服务器给出答案为止。 但是,由于我们已取消订阅内部流,因此答案将不会落入外部流。

注意 :如果使用Angular并使用HttpClient处理http,则不必担心取消请求本身。 退订时,HttpClient可以为您执行此操作。

取消http


提取api可以使用AbortController取消http请求。 与switchMap运算符结合使用时,此功能将节省用户流量。

让我们重写一下示例。 并创建一个将fetch调用包装为可观察的方法:

 const createCancellableRequest = (url) => { //      const controller = new AbortController(); const signal = controller.signal; return new Observable(observer => { fetch(url, { signal }) .then(response => { if (response.ok) { return response.json(); } throw new Error(''); }) //     .then(result => observer.next(result)) //   .then(() => observer.complete()) //   ,     .catch(error => observer.error(error)); // ,    return () => { //   controller.abort(); }; }); }; 

还要更改getUsersRepsFromApi方法:

 const getUsersRepsFromAPI = (username) => { const url = `https://api.github.com/users/${ username }/repos`; return createCancellableRequest(url); } 

现在该方法返回的不是Promise,而是可观察的。 因此,我们从switchMap中的包装器中删除了:

 switchMap(value => { return getUsersRepsFromAPI(value).pipe( catchError(err => of([]) ) ) 

注意 :在RxJS 6.5版中,他们添加了fromFetch语句 ,该语句本身在后台调用了abort方法,因此您不再需要编写自己的“自行车”。

仅此而已! 所有示例代码都可以在此处找到。

结论


今天,我们研究了HOO是什么,以及该类别中一些非常有用的运算符。 当然,这些远非所有人。 有关更多详细信息,我建议访问RxJS 文档

在下一篇文章中,我打算考虑“热”和“冷”可观察物之间的区别。

最后:不要使用订阅中的订阅,因为有HOO!

Source: https://habr.com/ru/post/zh-CN450050/


All Articles