
Neste artigo, veremos como é possível processar outro em um thread, por que é necessário e como os operadores de Observadores de Ordem Superior (doravante referidos como HOO) nos ajudarão nisso.
Série de artigos "Fundamentos da programação reativa usando RxJS":
Ao trabalhar com encadeamentos, geralmente surge uma situação em que é necessário transferir os resultados de outro para um encadeamento como um valor. Por exemplo, queremos executar uma solicitação ajax e processar sua resposta no encadeamento atual ou executar várias solicitações paralelas, implementar o pool. Eu acho que muitas pessoas estão acostumadas a resolver esses problemas usando um mecanismo como a promessa. Mas é possível resolvê-los usando o RxJS? Claro, e tudo é muito mais fácil do que você pensa!
Nota : para entender a parte teórica do artigo, você não precisa ler os artigos anteriores, basta saber o que são observáveis, operadores e tubulações. Na parte prática, refinaremos o exemplo do
segundo artigo , que você pode encontrar
aqui .
O problema
Vamos imaginar a seguinte tarefa: precisamos descobrir a cada segundo se o servidor está acessível. Como podemos resolver isso?
Primeiro, crie um fluxo usando o método do timer:
timer(0, 1000).subscribe({ next: console.log });
O método do
temporizador é muito semelhante em princípio ao
intervalo . Porém, diferentemente, permite definir o tempo limite de início do encadeamento, que é transmitido pelo primeiro parâmetro. O segundo parâmetro indica o intervalo através do qual um novo valor será gerado. Se o segundo parâmetro não for especificado, o timer gerará apenas um valor e encerrará o fluxo.
Como você e eu não temos um servidor, sugiro apenas escrever uma função que emule uma solicitação ao servidor:
const makeRequest = () => { return timer(1000).pipe( mapTo('success') ) }
O que esse método faz? Ele retorna um fluxo criado usando o método timer, que emite um valor após um segundo e termina. Como o método timer gera apenas um número, usamos o operador mapTo para substituí-lo pela string "success".
É assim que o fluxo criado pelo método makeRequest se parece:

Agora temos uma escolha: chamar o método makeRequest dentro do fluxo ou atribuir essa responsabilidade ao observador?
A primeira abordagem é preferível, pois, neste caso, poderemos usar todo o potencial do RxJS com seus operadores e aliviar nosso observador de tarefas desnecessárias. Usamos o método timer para executar solicitações por intervalo:
timer(0, 1000).pipe( map(() => makeRequest()) ).subscribe({ next: console.log });
Quando executamos esse código, veremos que no console.log não recebemos uma mensagem com o texto "success", mas um objeto do tipo Observable:

A resposta é bastante esperada, porque no mapa retornamos o fluxo. Para que um fluxo funcione, você precisa se inscrever nele. Bem, vamos ver como
não fazer isso :
timer(0, 1000).pipe( map(() => makeRequest()) ).subscribe({ next: observable => observable.subscribe({ next: console.log }); });
O problema com o exemplo acima é que obtemos uma assinatura em uma assinatura. Mas e se quisermos fazer mais de uma solicitação em uma cadeia? Ou se, em algum momento, precisarmos cancelar a assinatura do fluxo interno? Nesse caso, nosso código se parecerá cada vez mais com "macarrão". Para resolver esse problema, o RxJS possui operadores especiais chamados HOO.
Hoo
HOO é um tipo especial de instrução que aceita fluxos como valores. Um desses operadores é o método mergeAll.
Quando um fluxo chega a mergeAll, ele assina. O fluxo ao qual o operador se inscreveu é chamado interno. O fluxo do qual o operador recebe outros fluxos como valores é chamado de externo.
Quando um encadeamento interno gera um valor, o mergeAll envia esse valor para o encadeamento externo. Assim, nos livramos da necessidade de assinar manualmente. Se cancelarmos a inscrição no fluxo externo, o mergeAll cancelará automaticamente a inscrição no fluxo interno.
Vamos ver como podemos reescrever nosso exemplo com mergeAll:
timer(0, 1000).pipe( map(() => makeRequest()) mergeAll() ).subscribe({ next: console.log });
No exemplo acima, o fluxo externo foi criado pela instrução timer. E os fluxos criados no operador de mapa são internos. Cada thread criado cai na instrução mergeAll.

A combinação map + mergeAll é usada com muita frequência; portanto, no RxJS, existe um método mergeMap:
timer(0, 1000).pipe( mergeMap(() => makeRequest()) ).subscribe({ next: console.log });
Quando um encadeamento externo gera um valor, o operador mergeMap chama a função de retorno de chamada passada para ele, o que gera um novo encadeamento. O mergeMap se inscreve no fluxo gerado.

A peculiaridade do operador mergeAll / mergeMap é que, se outro fluxo chegar a ele, ele também o assinará. Assim, em um fluxo externo, podemos obter valores de vários internos ao mesmo tempo. Vamos ver o seguinte exemplo:
timer(0, 1000)
É assim que o fluxo externo ficará sem o operador mergeMap:

E assim com o mergeMap:
timer(0, 1000).pipe( mergeMap(() => interval(1000)) )

A cada segundo, criamos um novo encadeamento interno e o mergeMap o assina. Portanto, temos muitos threads internos trabalhando simultaneamente, cujos valores se enquadram no externo:

Nota : tenha cuidado ao usar o mergeMap, cada novo encadeamento interno funcionará até que você cancele sua inscrição no externo. No exemplo acima, o número de threads internos está aumentando a cada segundo. No final, pode haver tantos threads que o computador não consegue lidar com a carga.
concatAll / concatMap
O método mergeMap é ótimo quando você não se importa com a ordem de execução dos threads internos, mas e se você precisar? Suponha que queremos que a próxima solicitação do servidor seja executada somente após receber uma resposta da anterior?
Para esses fins, o operador HOO concatAll / concatMap é adequado. Esse operador, tendo se inscrito no encadeamento interno, espera até que ele termine e só depois assina o próximo.
Se, durante a execução de um encadeamento, um novo descer para ele, ele será colocado na fila até que o anterior seja concluído.
No exemplo acima, criamos dois threads usando o método timer. Para maior clareza, usei o operador mapTo para exibir valores diferentes. O primeiro thread gerará 1, o segundo - 2. Um thread externo é criado usando o método of, que usa dois dos itens acima observáveis como entrada.
A instrução concatAll primeiro recebe firstInnerObservable, assina e aguarda a conclusão, e somente após a conclusão da primeira assinatura em secondInnerObservable. Aqui está a aparência do fluxo externo:

Se substituirmos concatAll por mergeAll, o fluxo terá a seguinte aparência:
of( firstInnerObservable, secondInnerObservable ).pipe( mergeAll() ).subscribe({ next: console.log });

switchAll / switchMap
Esse operador difere dos anteriores, pois ao receber um novo fluxo, ele imediatamente cancela a inscrição no anterior e assina o novo.
Pegue o exemplo acima e substitua concatAll por switchAll e veja como o fluxo externo se comporta:
of( firstInnerObservable, secondInnerObservable ).pipe( switchAll() ).subscribe({ next: console.log });

Somente o valor do segundo fluxo interno entrou no fluxo externo. Isso ocorre porque o switchMap cancelou a assinatura do primeiro quando recebeu o segundo thread.
Quando isso é necessário? Por exemplo, ao implementar uma pesquisa de dados. Se a resposta do servidor ainda não chegou e já enviamos uma nova solicitação, não precisamos aguardar a anterior.
escape / escapeMap
escape é exatamente o oposto da instrução switchAll, e seu comportamento é semelhante ao concatAll. Esse método, inscrevendo-se no fluxo, aguarda a conclusão. Se um novo fluxo se resumir a ele, ele será simplesmente descartado.
of( firstInnerObservable, secondInnerObservable ).pipe( exhaust() ).subscribe({ next: console.log });

No exemplo acima, não conseguimos um empate, porque naquele momento o operador estava aguardando a conclusão do primeiro encadeamento e simplesmente eliminou o segundo.
Eu acho que muitos têm uma pergunta, quando esse comportamento pode ser necessário? Um bom exemplo é o formulário de login. Não faz sentido enviar várias solicitações para o servidor até que a atual seja concluída.
Estamos finalizando a aplicação
Recordamos o
exemplo do
segundo artigo . Nele, implementamos uma pesquisa no GitHub e usamos o operador mergeMap para enviar solicitações ao servidor. Agora conhecemos os recursos desse operador, é realmente adequado no nosso caso?
fromEvent(input, 'keyup').pipe( debounceTime(700), map(event => event.target.value), filter(val => val.length > 2), distinctUntilChanged(), mergeMap(value => { return from(getUsersRepsFromAPI(value)).pipe( catchError(err => of([])) ) }) ).subscribe({ next: reps => recordRepsToList(reps) })
Vamos supor que o servidor GitHub esteja sobrecarregado, e processar nossa resposta levará muito tempo. O que poderia dar errado nesse caso?
Suponha que um usuário inseriu alguns dados, não esperou por uma resposta e inseriu novos. Nesse caso, enviaremos a segunda solicitação para o servidor. No entanto, ninguém garante que a resposta à primeira solicitação venha antes.
Como o operador mergeMap não se importa em que ordem processar os encadeamentos internos, no caso em que a primeira solicitação for executada depois da segunda, apagaremos os dados reais. Portanto, proponho substituir o método mergeMap por switchMap:
fromEvent(input, 'keyup').pipe( debounceTime(700), map(event => event.target.value), filter(val => val.length > 2), distinctUntilChanged(), switchMap(value => { return from(getUsersRepsFromAPI(value)).pipe( catchError(err => of([])) ) }) ).subscribe({ next: reps => recordRepsToList(reps) })
Agora, se o usuário digitar novos dados, o switchMap cancelará a inscrição no fluxo anterior e o novo.
Vale a pena notar que nossa solicitação http continuará paralisada até que o servidor dê uma resposta. Mas, como cancelamos a inscrição no fluxo interno, a resposta não cairá no fluxo externo.
Nota : se você trabalha com Angular e usa HttpClient para trabalhar com http, não pode se preocupar em cancelar a solicitação. O HttpClient pode fazer isso ao cancelar a inscrição.
Cancelar http
A API de busca tem a capacidade de cancelar a solicitação http usando o
AbortController . Quando combinada com o operador switchMap, essa funcionalidade economiza o tráfego do usuário.
Vamos reescrever um pouco o nosso exemplo. E crie um método que envolva a chamada de busca em observável:
const createCancellableRequest = (url) => {
Altere também o método getUsersRepsFromApi:
const getUsersRepsFromAPI = (username) => { const url = `https://api.github.com/users/${ username }/repos`; return createCancellableRequest(url); }
Agora, o método retorna não promissor, mas observável. Portanto, removemos o wrapper do switchMap:
switchMap(value => { return getUsersRepsFromAPI(value).pipe( catchError(err => of([]) ) )
Nota : no RxJS versão 6.5, eles adicionaram a
instrução fromFetch , que em si chama o método abort por baixo do capô, para que você não precise mais escrever sua “bicicleta”.
Isso é tudo! Todo o código de exemplo pode ser encontrado
aqui .
Conclusão
Hoje analisamos o que é o HOO e alguns operadores muito úteis dessa categoria. Claro, estes estavam longe de todos eles. Para informações mais detalhadas e detalhadas, recomendo visitar a
documentação do RxJS.
No próximo artigo, planejo considerar qual é a diferença entre observáveis quentes e frios.
Finalmente: não use a assinatura na assinatura, porque existe HOO!