
Em um
artigo anterior, vimos quais são os fluxos e com o que eles comem. Na nova parte, vamos nos familiarizar com os métodos que o RxJS fornece para criar fluxos, quais são os operadores, tubulações e como trabalhar com eles.
Série de artigos "Fundamentos da programação reativa usando RxJS":
O RxJS possui uma
API rica. A documentação descreve mais de cem métodos. Para conhecê-los um pouco, escreveremos um aplicativo simples e, na prática, veremos como é o código reativo. Você verá que as mesmas tarefas, que costumavam parecer rotineiras e exigiam escrever muito código, têm uma solução elegante se você as observar através do prisma da reatividade. Porém, antes de começarmos a praticar, veremos como os fluxos podem ser representados graficamente e nos familiarizaremos com métodos convenientes para criá-los e processá-los.
Representação gráfica de threads
Para demonstrar claramente como um fluxo específico se comporta, usarei a notação adotada na abordagem reativa. Lembre-se do nosso exemplo do artigo anterior:
const observable = new Observable((observer) => { observer.next(1); observer.next(2); observer.complete(); });
Aqui está como será sua representação gráfica:

O fluxo é geralmente representado como uma linha reta. Se o fluxo emitir algum valor, ele será exibido na linha como um círculo. Uma linha reta no visor é o sinal para terminar o fluxo. Para exibir o erro, use o símbolo - “×”.
const observable = new Observable((observer) => { observer.error(); });

Fluxos de uma linha
Na minha prática, raramente tive que criar minhas próprias instâncias observáveis diretamente. A maioria dos métodos para criar threads já está no RxJS. Para criar um fluxo que emite os valores 1 e 2, basta usar o método of:
const observable = of(1, 2);
O método of aceita qualquer número de argumentos e retorna uma instância finalizada do Observable. Após a inscrição, ele emitirá os valores recebidos e concluirá:

Se você deseja representar a matriz como um fluxo, pode usar o método from. O método from como argumento espera qualquer objeto iterável (matriz, string etc.) ou promessa e projeta esse objeto no fluxo. Aqui está como será o fluxo obtido da string:
const observable = from('abc');

E assim, você pode cumprir uma promessa em um fluxo:
const promise = new Promise((resolve, reject) => { resolve(1); }); const observable = from(promise);
Nota: geralmente os threads são comparados à promessa. De fato, eles têm apenas uma coisa em comum - uma
estratégia de impulso para espalhar a mudança. O resto são entidades completamente diferentes. A promessa não pode produzir vários valores. Ele só pode executar resolver ou rejeitar, ou seja, só tem dois estados. Um fluxo pode transmitir vários valores e pode ser reutilizado.
Você se lembra do exemplo com o intervalo do
primeiro artigo ? Esse fluxo é um cronômetro que conta o tempo em segundos a partir do momento da assinatura.
const timer = new Observable(observer => { let counter = 0; const intervalId = setInterval(() => { observer.next(counter++); }, 1000); return () => { clearInterval(intervalId); } });
Veja como você pode implementar a mesma coisa em uma linha:
const timer = interval(1000);

E, finalmente, um método que permite criar um fluxo de eventos para elementos DOM:
const observable = fromEvent(domElementRef, 'keyup');
Como valores, esse fluxo receberá e emitirá objetos de eventos de keyup.
Tubulações e operadores
Pipe é um método de classe Observable adicionado no RxJS na versão 5.5. Graças a isso, podemos construir cadeias de operadores para processamento sequencial dos valores recebidos no fluxo. Pipe é um canal unidirecional que interliga os operadores. Os próprios operadores são funções normais descritas no RxJS que processam valores de um fluxo.
Por exemplo, eles podem converter o valor e transmiti-lo ainda mais ao fluxo, ou podem atuar como filtros e não ignorar nenhum valor se não atenderem à condição especificada.
Vamos olhar para os operadores em ação. Multiplique cada valor do fluxo por 2 usando o operador de mapa:
of(1,2,3).pipe( map(value => value * 2) ).subscribe({ next: console.log });
Veja como é o fluxo antes de aplicar o operador de mapa:

Após a declaração do mapa:

Vamos usar o operador de filtro. Essa instrução funciona exatamente como a função de filtro na classe Array. O método assume uma função como o primeiro argumento, que descreve uma condição. Se o valor do fluxo atender à condição, ele será passado:
of(1, 2, 3).pipe(
E é assim que todo o esquema do nosso fluxo ficará:

Após o filtro:

Após o mapa:
Nota: pipe! == inscrever-se. O método de tubulação declara o comportamento do fluxo, mas não se inscreve. Até você chamar o método de inscrição, seu fluxo não começará a funcionar.
Estamos escrevendo um aplicativo
Agora que descobrimos o que são tubos e operadores, você pode começar a praticar. Nosso aplicativo executará uma tarefa simples: exibir uma lista de repositórios abertos do github pelo apelido do proprietário digitado.
Existem poucos requisitos:
- Não execute uma solicitação de API se a sequência inserida na entrada contiver menos de 3 caracteres;
- Para não atender à solicitação de cada caractere digitado pelo usuário, é necessário definir um atraso (devolução) de 700 milissegundos antes de acessar a API;
Para procurar repositórios, usaremos a
API do
github . Eu recomendo executar os exemplos em
stackblitz . Lá eu expus a implementação finalizada. Os links são fornecidos no final do artigo.
Vamos começar com a marcação html. Vamos descrever os elementos input e ul:
<input type="text"> <ul></ul>
Em seguida, no arquivo js ou ts, obtemos links para os elementos atuais usando a API do navegador:
const input = document.querySelector('input'); const ul = document.querySelector('ul');
Também precisamos de um método que execute uma solicitação para a API do github. Abaixo está o código da função getUsersRepsFromAPI, que aceita o apelido do usuário e executa uma solicitação ajax usando a busca. Em seguida, ele retorna uma promessa, convertendo a resposta bem-sucedida para json ao longo do caminho:
const getUsersRepsFromAPI = (username) => { const url = `https://api.github.com/users/${ username }/repos`; return fetch(url) .then(response => { if(response.ok) { return response.json(); } throw new Error(''); }); }
Em seguida, escrevemos um método que lista os nomes dos repositórios:
const recordRepsToList = (reps) => { for (let i = 0; i < reps.length; i++) {
Os preparativos estão completos. É hora de dar uma olhada no RxJS em ação. Precisamos ouvir o evento keyup de nossa entrada. Antes de tudo, precisamos entender que, em uma abordagem reativa, trabalhamos com fluxos. Felizmente, o RxJS já oferece uma opção semelhante. Lembre-se do método fromEvent que eu mencionei acima. Nós usamos:
const keyUp = fromEvent(input, 'keyup'); keyUp.subscribe({ next: console.log });
Agora nosso evento é apresentado como um fluxo. Se observarmos o que é exibido no console, veremos um objeto do tipo KeyboardEvent. Mas precisamos de um valor inserido pelo usuário. É aqui que o método de tubulação e o operador de mapa são úteis:
fromEvent(input, 'keyup').pipe( map(event => event.target.value) ).subscribe({ next: console.log });
Prosseguimos com a implementação dos requisitos. Para começar, executaremos a consulta quando o valor digitado contiver mais de dois caracteres. Para fazer isso, use o operador de filtro:
fromEvent(input, 'keyup').pipe( map(event => event.target.value), filter(value => value.length > 2) )
Lidamos com o primeiro requisito. Prosseguimos para o segundo. Precisamos implementar debounce. O RxJS possui uma instrução debounceTime. Esse operador, como o primeiro argumento, leva o número de milissegundos durante os quais o valor será mantido antes da transmissão. Nesse caso, cada novo valor redefinirá o timer. Assim, na saída, obtemos o último valor, após o qual 700 milissegundos se passaram.
fromEvent(input, 'keyup').pipe( debounceTime(700), map(event => event.target.value), filter(value => value.length > 2) )
Aqui está a aparência do nosso fluxo sem debounceTime:

E é assim que o mesmo fluxo transmitido por essa declaração será semelhante:

Com o debounceTime, é menos provável que você use a API, o que economizará tráfego e descarregará o servidor.
Para otimização adicional, sugiro usar outro operador - distinctUntilChanged. Este método nos salvará de duplicatas. É melhor mostrar seu trabalho usando um exemplo:
from('aaabccc').pipe( distinctUntilChanged() )
Sem distinctUntilChanged:

Com distinctUntilChanged:

Adicione esta declaração imediatamente após a declaração debounceTime. Portanto, não acessaremos a API se o novo valor, por algum motivo, coincidir com o anterior. Uma situação semelhante pode ocorrer quando o usuário digita novos caracteres e os apaga novamente. Como implementamos um atraso, apenas o último valor cairá no fluxo, a resposta à qual já temos.
Vá para o servidor
Já podemos descrever a lógica da solicitação e o processamento da resposta. Embora só possamos trabalhar com promessa. Portanto, descrevemos outro operador de mapa que chamará o método getUsersRepsFromAPI. No observador, descrevemos a lógica de processamento de nossa promessa:
fromEvent(input, 'keyup').pipe( debounceTime(700), map(event => event.target.value), filter(val => val.length > 2), distinctUntilChanged(), map(value => getUsersRepsFromAPI(value)) ).subscribe({ next: promise => promise.then(reps => recordRepsToList(reps)) });
No momento, implementamos tudo o que queríamos. Mas nosso exemplo tem uma grande desvantagem: não há tratamento de erros. Nosso observador recebe apenas uma promessa e não tem idéia de que algo pode dar errado.
Obviamente, podemos manter a promessa no próximo método, mas, por causa disso, nosso código começará a parecer cada vez mais um "inferno de retorno de chamada". Se de repente precisarmos executar mais uma solicitação, a complexidade do código aumentará.
Nota: o uso da promessa no código RxJS é considerado antipadrão. A promessa tem muitas desvantagens em relação ao observável. Não pode ser desfeito e não pode ser reutilizado. Se você tiver uma escolha, escolha observável. O mesmo vale para o método toPromise da classe Observable. Este método foi implementado para compatibilidade com bibliotecas que não podem funcionar com fluxos.
Podemos usar o método from para projetar uma promessa em um fluxo, mas esse método está repleto de chamadas adicionais para o método de inscrição e também levará ao crescimento e à complexidade do código.
Esse problema pode ser resolvido usando o operador mergeMap:
fromEvent(input, 'keyup').pipe( debounceTime(700), map(event => event.target.value), filter(val => val.length > 2), distinctUntilChanged(), mergeMap(value => from(getUsersRepsFromAPI(value))) ).subscribe({ next: reps => recordRepsToList(reps), error: console.log })
Agora não precisamos escrever uma lógica de processamento de promessas. O método from criou um fluxo de promessa e o operador mergeMap o processou. Se a promessa for cumprida com sucesso, o próximo método será chamado e nosso observador receberá o objeto final. Se ocorrer um erro, o método do erro será chamado e nosso observador emitirá um erro no console.
O operador mergeMap é um pouco diferente dos operadores com os quais trabalhamos anteriormente; ele pertence aos chamados
Observáveis de Ordem Superior , que discutimos no próximo artigo. Mas, olhando para o futuro, direi que o próprio método mergeMap se inscreve no fluxo.
Tratamento de erros
Se o nosso thread receber um erro, ele será encerrado. E se tentarmos interagir com o aplicativo após um erro, não receberemos nenhuma reação, pois nosso encadeamento foi concluído.
Aqui, o operador catchError nos ajudará. catchError é gerado apenas quando ocorre um erro no fluxo. Permite interceptá-lo, processá-lo e retornar ao fluxo o valor usual, o que não levará à sua conclusão.
fromEvent(input, 'keyup').pipe( debounceTime(700), map(event => event.target.value), filter(val => val.length > 2), distinctUntilChanged(), mergeMap(value => from(getUsersRepsFromAPI(value))), catchError(err => of([])) ).subscribe({ next: reps => recordRepsToList(reps), error: console.log })
Pegamos o erro em catchError e, em vez disso, retornamos um fluxo com uma matriz vazia. Agora, quando ocorrer um erro, limparemos a lista de repositórios. Mas então o fluxo termina novamente.
O problema é que catchError substitui nosso fluxo original por um novo. E então nosso observador ouve apenas ele. Quando o fluxo of emite uma matriz vazia, o método completo será chamado.
Para não substituir nosso encadeamento original, chamamos o operador catchError no encadeamento from de dentro do operador mergeMap.
fromEvent(input, 'keyup').pipe( debounceTime(700), map(event => event.target.value), filter(val => val.length > 2), distinctUntilChanged(), mergeMap(value => { return from(getUsersRepsFromAPI(value)).pipe( catchError(err => of([])) ) }) ).subscribe({ next: reps => recordRepsToList(reps), error: console.log })
Assim, nosso fluxo original não notará nada. Em vez de um erro, ele obterá uma matriz vazia.
Conclusão
Finalmente começamos a praticar e vimos para que servem os operadores e tubos. Vimos como reduzir o código usando a API avançada que o RxJS nos fornece. Obviamente, nosso aplicativo não está concluído. Na próxima parte, analisaremos como é possível processar outro em um thread e como cancelar nossa solicitação http, a fim de economizar ainda mais tráfego e recursos de nosso aplicativo. E para que você possa ver a diferença, eu expus um exemplo sem usar o RxJS, você pode vê-lo
aqui .
Neste link, você encontrará o código completo do aplicativo atual. Para gerar os circuitos, usei o
visualizador RxJS .
Espero que este artigo tenha ajudado você a entender melhor como o RxJS funciona. Desejo-lhe sucesso em seu estudo!