Noções básicas de programação reativa usando RxJS. Parte 3. Observáveis ​​de ordem superior



Neste artigo, veremos como é possível processar outro em um thread, por que é necessário e como os operadores de Observadores de Ordem Superior (doravante referidos como HOO) nos ajudarão nisso.

Série de artigos "Fundamentos da programação reativa usando RxJS":



Ao trabalhar com encadeamentos, geralmente surge uma situação em que é necessário transferir os resultados de outro para um encadeamento como um valor. Por exemplo, queremos executar uma solicitação ajax e processar sua resposta no encadeamento atual ou executar várias solicitações paralelas, implementar o pool. Eu acho que muitas pessoas estão acostumadas a resolver esses problemas usando um mecanismo como a promessa. Mas é possível resolvê-los usando o RxJS? Claro, e tudo é muito mais fácil do que você pensa!

Nota : para entender a parte teórica do artigo, você não precisa ler os artigos anteriores, basta saber o que são observáveis, operadores e tubulações. Na parte prática, refinaremos o exemplo do segundo artigo , que você pode encontrar aqui .

O problema


Vamos imaginar a seguinte tarefa: precisamos descobrir a cada segundo se o servidor está acessível. Como podemos resolver isso?

Primeiro, crie um fluxo usando o método do timer:

timer(0, 1000).subscribe({ next: console.log }); 

O método do temporizador é muito semelhante em princípio ao intervalo . Porém, diferentemente, permite definir o tempo limite de início do encadeamento, que é transmitido pelo primeiro parâmetro. O segundo parâmetro indica o intervalo através do qual um novo valor será gerado. Se o segundo parâmetro não for especificado, o timer gerará apenas um valor e encerrará o fluxo.

Como você e eu não temos um servidor, sugiro apenas escrever uma função que emule uma solicitação ao servidor:

 const makeRequest = () => { return timer(1000).pipe( mapTo('success') ) } 

O que esse método faz? Ele retorna um fluxo criado usando o método timer, que emite um valor após um segundo e termina. Como o método timer gera apenas um número, usamos o operador mapTo para substituí-lo pela string "success".

É assim que o fluxo criado pelo método makeRequest se parece:



Agora temos uma escolha: chamar o método makeRequest dentro do fluxo ou atribuir essa responsabilidade ao observador?

A primeira abordagem é preferível, pois, neste caso, poderemos usar todo o potencial do RxJS com seus operadores e aliviar nosso observador de tarefas desnecessárias. Usamos o método timer para executar solicitações por intervalo:

 timer(0, 1000).pipe( map(() => makeRequest()) ).subscribe({ next: console.log }); 

Quando executamos esse código, veremos que no console.log não recebemos uma mensagem com o texto "success", mas um objeto do tipo Observable:



A resposta é bastante esperada, porque no mapa retornamos o fluxo. Para que um fluxo funcione, você precisa se inscrever nele. Bem, vamos ver como não fazer isso :

 timer(0, 1000).pipe( map(() => makeRequest()) ).subscribe({ next: observable => observable.subscribe({ next: console.log }); }); 

O problema com o exemplo acima é que obtemos uma assinatura em uma assinatura. Mas e se quisermos fazer mais de uma solicitação em uma cadeia? Ou se, em algum momento, precisarmos cancelar a assinatura do fluxo interno? Nesse caso, nosso código se parecerá cada vez mais com "macarrão". Para resolver esse problema, o RxJS possui operadores especiais chamados HOO.

Hoo


HOO é um tipo especial de instrução que aceita fluxos como valores. Um desses operadores é o método mergeAll.

Quando um fluxo chega a mergeAll, ele assina. O fluxo ao qual o operador se inscreveu é chamado interno. O fluxo do qual o operador recebe outros fluxos como valores é chamado de externo.

Quando um encadeamento interno gera um valor, o mergeAll envia esse valor para o encadeamento externo. Assim, nos livramos da necessidade de assinar manualmente. Se cancelarmos a inscrição no fluxo externo, o mergeAll cancelará automaticamente a inscrição no fluxo interno.

Vamos ver como podemos reescrever nosso exemplo com mergeAll:

 timer(0, 1000).pipe( map(() => makeRequest()) mergeAll() ).subscribe({ next: console.log }); 

No exemplo acima, o fluxo externo foi criado pela instrução timer. E os fluxos criados no operador de mapa são internos. Cada thread criado cai na instrução mergeAll.



A combinação map + mergeAll é usada com muita frequência; portanto, no RxJS, existe um método mergeMap:

 timer(0, 1000).pipe( mergeMap(() => makeRequest()) ).subscribe({ next: console.log }); 

Quando um encadeamento externo gera um valor, o operador mergeMap chama a função de retorno de chamada passada para ele, o que gera um novo encadeamento. O mergeMap se inscreve no fluxo gerado.



A peculiaridade do operador mergeAll / mergeMap é que, se outro fluxo chegar a ele, ele também o assinará. Assim, em um fluxo externo, podemos obter valores de vários internos ao mesmo tempo. Vamos ver o seguinte exemplo:

  timer(0, 1000) 

É assim que o fluxo externo ficará sem o operador mergeMap:



E assim com o mergeMap:

 timer(0, 1000).pipe( mergeMap(() => interval(1000)) ) 



A cada segundo, criamos um novo encadeamento interno e o mergeMap o assina. Portanto, temos muitos threads internos trabalhando simultaneamente, cujos valores se enquadram no externo:





Nota : tenha cuidado ao usar o mergeMap, cada novo encadeamento interno funcionará até que você cancele sua inscrição no externo. No exemplo acima, o número de threads internos está aumentando a cada segundo. No final, pode haver tantos threads que o computador não consegue lidar com a carga.

concatAll / concatMap


O método mergeMap é ótimo quando você não se importa com a ordem de execução dos threads internos, mas e se você precisar? Suponha que queremos que a próxima solicitação do servidor seja executada somente após receber uma resposta da anterior?

Para esses fins, o operador HOO concatAll / concatMap é adequado. Esse operador, tendo se inscrito no encadeamento interno, espera até que ele termine e só depois assina o próximo.

Se, durante a execução de um encadeamento, um novo descer para ele, ele será colocado na fila até que o anterior seja concluído.

 // ,  1     const firstInnerObservable = timer(1000).pipe( mapTo(1) ); // ,  2     const secondInnerObservable = timer(500).pipe( mapTo(2) ); of( firstInnerObservable, secondInnerObservable ).pipe( concatAll() ).subscribe({ next: console.log }); 

No exemplo acima, criamos dois threads usando o método timer. Para maior clareza, usei o operador mapTo para exibir valores diferentes. O primeiro thread gerará 1, o segundo - 2. Um thread externo é criado usando o método of, que usa dois dos itens acima observáveis ​​como entrada.

A instrução concatAll primeiro recebe firstInnerObservable, assina e aguarda a conclusão, e somente após a conclusão da primeira assinatura em secondInnerObservable. Aqui está a aparência do fluxo externo:



Se substituirmos concatAll por mergeAll, o fluxo terá a seguinte aparência:

 of( firstInnerObservable, secondInnerObservable ).pipe( mergeAll() ).subscribe({ next: console.log }); 



switchAll / switchMap


Esse operador difere dos anteriores, pois ao receber um novo fluxo, ele imediatamente cancela a inscrição no anterior e assina o novo.

Pegue o exemplo acima e substitua concatAll por switchAll e veja como o fluxo externo se comporta:

 of( firstInnerObservable, secondInnerObservable ).pipe( switchAll() ).subscribe({ next: console.log }); 



Somente o valor do segundo fluxo interno entrou no fluxo externo. Isso ocorre porque o switchMap cancelou a assinatura do primeiro quando recebeu o segundo thread.

Quando isso é necessário? Por exemplo, ao implementar uma pesquisa de dados. Se a resposta do servidor ainda não chegou e já enviamos uma nova solicitação, não precisamos aguardar a anterior.

escape / escapeMap


escape é exatamente o oposto da instrução switchAll, e seu comportamento é semelhante ao concatAll. Esse método, inscrevendo-se no fluxo, aguarda a conclusão. Se um novo fluxo se resumir a ele, ele será simplesmente descartado.

 of( firstInnerObservable, secondInnerObservable ).pipe( exhaust() ).subscribe({ next: console.log }); 



No exemplo acima, não conseguimos um empate, porque naquele momento o operador estava aguardando a conclusão do primeiro encadeamento e simplesmente eliminou o segundo.

Eu acho que muitos têm uma pergunta, quando esse comportamento pode ser necessário? Um bom exemplo é o formulário de login. Não faz sentido enviar várias solicitações para o servidor até que a atual seja concluída.

Estamos finalizando a aplicação


Recordamos o exemplo do segundo artigo . Nele, implementamos uma pesquisa no GitHub e usamos o operador mergeMap para enviar solicitações ao servidor. Agora conhecemos os recursos desse operador, é realmente adequado no nosso caso?

 fromEvent(input, 'keyup').pipe( debounceTime(700), map(event => event.target.value), filter(val => val.length > 2), distinctUntilChanged(), mergeMap(value => { return from(getUsersRepsFromAPI(value)).pipe( catchError(err => of([])) ) }) ).subscribe({ next: reps => recordRepsToList(reps) }) 

Vamos supor que o servidor GitHub esteja sobrecarregado, e processar nossa resposta levará muito tempo. O que poderia dar errado nesse caso?

Suponha que um usuário inseriu alguns dados, não esperou por uma resposta e inseriu novos. Nesse caso, enviaremos a segunda solicitação para o servidor. No entanto, ninguém garante que a resposta à primeira solicitação venha antes.

Como o operador mergeMap não se importa em que ordem processar os encadeamentos internos, no caso em que a primeira solicitação for executada depois da segunda, apagaremos os dados reais. Portanto, proponho substituir o método mergeMap por switchMap:

 fromEvent(input, 'keyup').pipe( debounceTime(700), map(event => event.target.value), filter(val => val.length > 2), distinctUntilChanged(), switchMap(value => { return from(getUsersRepsFromAPI(value)).pipe( catchError(err => of([])) ) }) ).subscribe({ next: reps => recordRepsToList(reps) }) 

Agora, se o usuário digitar novos dados, o switchMap cancelará a inscrição no fluxo anterior e o novo.

Vale a pena notar que nossa solicitação http continuará paralisada até que o servidor dê uma resposta. Mas, como cancelamos a inscrição no fluxo interno, a resposta não cairá no fluxo externo.

Nota : se você trabalha com Angular e usa HttpClient para trabalhar com http, não pode se preocupar em cancelar a solicitação. O HttpClient pode fazer isso ao cancelar a inscrição.

Cancelar http


A API de busca tem a capacidade de cancelar a solicitação http usando o AbortController . Quando combinada com o operador switchMap, essa funcionalidade economiza o tráfego do usuário.

Vamos reescrever um pouco o nosso exemplo. E crie um método que envolva a chamada de busca em observável:

 const createCancellableRequest = (url) => { //      const controller = new AbortController(); const signal = controller.signal; return new Observable(observer => { fetch(url, { signal }) .then(response => { if (response.ok) { return response.json(); } throw new Error(''); }) //     .then(result => observer.next(result)) //   .then(() => observer.complete()) //   ,     .catch(error => observer.error(error)); // ,    return () => { //   controller.abort(); }; }); }; 

Altere também o método getUsersRepsFromApi:

 const getUsersRepsFromAPI = (username) => { const url = `https://api.github.com/users/${ username }/repos`; return createCancellableRequest(url); } 

Agora, o método retorna não promissor, mas observável. Portanto, removemos o wrapper do switchMap:

 switchMap(value => { return getUsersRepsFromAPI(value).pipe( catchError(err => of([]) ) ) 

Nota : no RxJS versão 6.5, eles adicionaram a instrução fromFetch , que em si chama o método abort por baixo do capô, para que você não precise mais escrever sua “bicicleta”.

Isso é tudo! Todo o código de exemplo pode ser encontrado aqui .

Conclusão


Hoje analisamos o que é o HOO e alguns operadores muito úteis dessa categoria. Claro, estes estavam longe de todos eles. Para informações mais detalhadas e detalhadas, recomendo visitar a documentação do RxJS.

No próximo artigo, planejo considerar qual é a diferença entre observáveis ​​quentes e frios.

Finalmente: não use a assinatura na assinatura, porque existe HOO!

Source: https://habr.com/ru/post/pt450050/


All Articles