
En un
artículo anterior, analizamos qué son las transmisiones y con qué comen. En la nueva parte, nos familiarizaremos con los métodos que RxJS proporciona para crear flujos, qué son operadores, tuberías y cómo trabajar con ellos.
Serie de artículos "Fundamentos de la programación reactiva con RxJS":
RxJS tiene una
API rica. La documentación describe más de cien métodos. Para conocerlos un poco, escribiremos una aplicación simple y en la práctica veremos cómo se ve el código reactivo. Verá que las mismas tareas, que solían parecer rutinarias y requerían escribir mucho código, tienen una solución elegante si las observa a través del prisma de la reactividad. Pero antes de comenzar a practicar, veremos cómo se pueden representar gráficamente los flujos y nos familiarizaremos con métodos convenientes para crearlos y procesarlos.
Representación gráfica de hilos
Para demostrar claramente cómo se comporta un flujo particular, utilizaré la notación adoptada en el enfoque reactivo. Recordemos nuestro ejemplo del artículo anterior:
const observable = new Observable((observer) => { observer.next(1); observer.next(2); observer.complete(); });
Así se verá su representación gráfica:

El flujo generalmente se representa como una línea recta. Si la secuencia emite algún valor, se muestra en la línea como un círculo. Una línea recta en la pantalla es la señal para finalizar la transmisión. Para mostrar el error, use el símbolo - "×".
const observable = new Observable((observer) => { observer.error(); });

Flujos de una línea
En mi práctica, rara vez tuve que crear mis propias instancias de Observable directamente. La mayoría de los métodos para crear hilos ya están en RxJS. Para crear un flujo que emite los valores 1 y 2, es suficiente usar el método of:
const observable = of(1, 2);
El método of acepta cualquier número de argumentos y devuelve una instancia terminada del Observable. Después de suscribirse, emitirá los valores recibidos y completará:

Si desea representar la matriz como una secuencia, puede usar el método from. El método from como argumento espera cualquier objeto iterable (matriz, cadena, etc.) o promete, y proyecta este objeto en la secuencia. Así se verá la secuencia obtenida de la cadena:
const observable = from('abc');

Y así, puedes envolver una promesa en una secuencia:
const promise = new Promise((resolve, reject) => { resolve(1); }); const observable = from(promise);
Nota: a menudo los hilos se comparan con la promesa. De hecho, solo tienen una cosa en común: una
estrategia de impulso para difundir el cambio. El resto son entidades completamente diferentes. Promise no puede producir múltiples valores. Solo puede ejecutar resolver o rechazar, es decir solo tiene dos estados. Un flujo puede transmitir varios valores y puede reutilizarse.
¿Recuerdas el ejemplo con el intervalo del
primer artículo ? Este flujo es un temporizador que cuenta el tiempo en segundos desde el momento de la suscripción.
const timer = new Observable(observer => { let counter = 0; const intervalId = setInterval(() => { observer.next(counter++); }, 1000); return () => { clearInterval(intervalId); } });
Así es como puede implementar lo mismo en una línea:
const timer = interval(1000);

Y, por último, un método que le permite crear una secuencia de eventos para elementos DOM:
const observable = fromEvent(domElementRef, 'keyup');
Como valores, esta secuencia recibirá y emitirá objetos de evento de keyup.
Tuberías y Operadores
Pipe es un método de clase Observable agregado en RxJS en la versión 5.5. Gracias a ello, podemos construir cadenas de operadores para el procesamiento secuencial de los valores recibidos en la secuencia. La tubería es un canal unidireccional que interconecta a los operadores. Los operadores mismos son funciones normales descritas en RxJS que procesan valores de una secuencia.
Por ejemplo, pueden convertir el valor y pasarlo más a la secuencia, o pueden actuar como filtros y no omitir ningún valor si no cumplen con la condición especificada.
Veamos a los operadores en acción. Multiplique cada valor de la secuencia por 2 usando el operador de mapa:
of(1,2,3).pipe( map(value => value * 2) ).subscribe({ next: console.log });
Así es como se ve la secuencia antes de aplicar el operador de mapa:

Después de la declaración del mapa:

Usemos el operador de filtro. Esta declaración funciona igual que la función de filtro en la clase Array. El método toma una función como primer argumento, que describe una condición. Si el valor del flujo satisface la condición, entonces se pasa:
of(1, 2, 3).pipe(
Y así es como se verá todo el esquema de nuestra transmisión:

Después del filtro:

Después del mapa:
Nota: pipe! == suscribirse. El método de tubería declara el comportamiento del flujo, pero no se suscribe. Hasta que llame al método de suscripción, su transmisión no comenzará a funcionar.
Estamos escribiendo una solicitud
Ahora que hemos descubierto qué son las tuberías y los operadores, puede ponerse a practicar. Nuestra aplicación realizará una tarea simple: mostrar una lista de repositorios abiertos de github por el apodo ingresado por el propietario.
Habrá pocos requisitos:
- No ejecute una solicitud de API si la cadena ingresada en la entrada contiene menos de 3 caracteres;
- Para no cumplir con la solicitud de cada carácter ingresado por el usuario, es necesario establecer un retraso (rebote) de 700 milisegundos antes de acceder a la API;
Para buscar repositorios, utilizaremos la
API de github . Recomiendo ejecutar los ejemplos ellos mismos en
stackblitz . Allí presenté la implementación terminada. Los enlaces se proporcionan al final del artículo.
Comencemos con el marcado html. Describamos los elementos input y ul:
<input type="text"> <ul></ul>
Luego, en el archivo js o ts, obtenemos enlaces a los elementos actuales utilizando la API del navegador:
const input = document.querySelector('input'); const ul = document.querySelector('ul');
También necesitamos un método que ejecute una solicitud a la API de github. A continuación se muestra el código para la función getUsersRepsFromAPI, que acepta el apodo del usuario y realiza una solicitud ajax utilizando fetch. Luego devuelve una promesa, convirtiendo la respuesta exitosa a json en el camino:
const getUsersRepsFromAPI = (username) => { const url = `https://api.github.com/users/${ username }/repos`; return fetch(url) .then(response => { if(response.ok) { return response.json(); } throw new Error(''); }); }
A continuación, escribimos un método que enumerará los nombres de los repositorios:
const recordRepsToList = (reps) => { for (let i = 0; i < reps.length; i++) {
Los preparativos están completos. Es hora de echar un vistazo a RxJS en acción. Necesitamos escuchar el evento keyup de nuestra entrada. En primer lugar, debemos entender que en un enfoque reactivo, trabajamos con flujos. Afortunadamente, RxJS ya ofrece una opción similar. Recuerde el método fromEvent que mencioné anteriormente. Lo usamos:
const keyUp = fromEvent(input, 'keyup'); keyUp.subscribe({ next: console.log });
Ahora nuestro evento se presenta como una transmisión. Si miramos lo que se muestra en la consola, veremos un objeto de tipo KeyboardEvent. Pero necesitamos un valor introducido por el usuario. Aquí es donde el método de tubería y el operador del mapa son útiles:
fromEvent(input, 'keyup').pipe( map(event => event.target.value) ).subscribe({ next: console.log });
Procedemos a la implementación de los requisitos. Para comenzar, ejecutaremos la consulta cuando el valor ingresado contenga más de dos caracteres. Para hacer esto, use el operador de filtro:
fromEvent(input, 'keyup').pipe( map(event => event.target.value), filter(value => value.length > 2) )
Nos ocupamos del primer requisito. Pasamos a la segunda. Necesitamos implementar debounce. RxJS tiene una declaración debounceTime. Este operador como primer argumento toma el número de milisegundos durante los cuales se mantendrá el valor antes de pasar. En este caso, cada nuevo valor restablecerá el temporizador. Por lo tanto, en la salida obtenemos el último valor, después del cual han pasado 700 milisegundos.
fromEvent(input, 'keyup').pipe( debounceTime(700), map(event => event.target.value), filter(value => value.length > 2) )
Así es como se vería nuestra transmisión sin debounceTime:

Y así es como se verá la misma secuencia que pasó a través de esta declaración:

Con debounceTime, será menos probable que usemos la API, que ahorrará tráfico y descargará el servidor.
Para una optimización adicional, sugiero usar otro operador: distinctUntilChanged. Este método nos salvará de los duplicados. Es mejor mostrar su trabajo usando un ejemplo:
from('aaabccc').pipe( distinctUntilChanged() )
Sin distinciónUntilChanged:

Con distinctUntilChanged:

Agregue esta declaración inmediatamente después de la declaración debounceTime. Por lo tanto, no accederemos a la API si el nuevo valor por alguna razón coincide con el anterior. Una situación similar puede ocurrir cuando el usuario ha ingresado nuevos caracteres y luego los ha borrado nuevamente. Como hemos implementado un retraso, solo el último valor caerá en la secuencia, la respuesta a la que ya tenemos.
Ir al servidor
Ya ahora podemos describir la lógica de la solicitud y el procesamiento de la respuesta. Si bien solo podemos trabajar con promesa. Por lo tanto, describimos otro operador de mapa que llamará al método getUsersRepsFromAPI. En el observador, describimos la lógica de procesamiento de nuestra promesa:
fromEvent(input, 'keyup').pipe( debounceTime(700), map(event => event.target.value), filter(val => val.length > 2), distinctUntilChanged(), map(value => getUsersRepsFromAPI(value)) ).subscribe({ next: promise => promise.then(reps => recordRepsToList(reps)) });
Por el momento, hemos implementado todo lo que queríamos. Pero nuestro ejemplo tiene un gran inconveniente: no hay manejo de errores. Nuestro observador recibe solo una promesa y no tiene idea de que algo podría salir mal.
Por supuesto, podemos mantener la promesa en el próximo método, pero debido a esto, nuestro código comenzará a parecerse cada vez más a un "infierno de devolución de llamada". Si de repente necesitamos ejecutar una solicitud más, la complejidad del código aumentará.
Nota: el uso de promesa en el código RxJS se considera antipatrón. La promesa tiene muchas desventajas en comparación con la observable. No se puede deshacer y no se puede reutilizar. Si tiene una opción, elija observable. Lo mismo ocurre con el método toPromise de la clase Observable. Este método se implementó para la compatibilidad con bibliotecas que no pueden funcionar con secuencias.
Podemos usar el método from para proyectar una promesa en una secuencia, pero este método está lleno de llamadas adicionales al método de suscripción y también conducirá al crecimiento y la complejidad del código.
Este problema se puede resolver utilizando el operador mergeMap:
fromEvent(input, 'keyup').pipe( debounceTime(700), map(event => event.target.value), filter(val => val.length > 2), distinctUntilChanged(), mergeMap(value => from(getUsersRepsFromAPI(value))) ).subscribe({ next: reps => recordRepsToList(reps), error: console.log })
Ahora no necesitamos escribir la lógica de procesamiento de promesas. El método from hizo un flujo prometedor y el operador mergeMap lo procesó. Si la promesa se cumple con éxito, se llama al siguiente método y nuestro observador recibirá el objeto terminado. Si se produce un error, se llamará al método de error y nuestro observador generará un error en la consola.
El operador mergeMap es ligeramente diferente de los operadores con los que trabajamos anteriormente; pertenece a los llamados
Observables de orden superior , que analizaré en el próximo artículo. Pero, mirando hacia el futuro, diré que el método mergeMap se suscribe a la transmisión.
Manejo de errores
Si nuestro hilo recibe un error, entonces terminará. Y si intentamos interactuar con la aplicación después de un error, no tendremos ninguna reacción, ya que nuestro hilo se ha completado.
Aquí el operador catchError nos ayudará. catchError solo se genera cuando se produce un error en la secuencia. Le permite interceptarlo, procesarlo y devolver a la secuencia el valor habitual, lo que no llevará a su finalización.
fromEvent(input, 'keyup').pipe( debounceTime(700), map(event => event.target.value), filter(val => val.length > 2), distinctUntilChanged(), mergeMap(value => from(getUsersRepsFromAPI(value))), catchError(err => of([])) ).subscribe({ next: reps => recordRepsToList(reps), error: console.log })
Detectamos el error en catchError y en su lugar devolvemos una secuencia con una matriz vacía. Ahora, cuando ocurre un error, borraremos la lista de repositorios. Pero entonces el flujo termina de nuevo.
Lo que pasa es que catchError reemplaza nuestra transmisión original por una nueva. Y luego nuestro observador solo lo escucha. Cuando of of stream emite una matriz vacía, se llamará al método completo.
Para no reemplazar nuestro hilo original, llamamos al operador catchError en el hilo from desde el operador mergeMap.
fromEvent(input, 'keyup').pipe( debounceTime(700), map(event => event.target.value), filter(val => val.length > 2), distinctUntilChanged(), mergeMap(value => { return from(getUsersRepsFromAPI(value)).pipe( catchError(err => of([])) ) }) ).subscribe({ next: reps => recordRepsToList(reps), error: console.log })
Por lo tanto, nuestra transmisión original no notará nada. En lugar de un error, obtendrá una matriz vacía.
Conclusión
Finalmente comenzamos a practicar y vimos para qué son las tuberías y los operadores. Analizamos cómo reducir el código utilizando la API enriquecida que RxJS nos proporciona. Por supuesto, nuestra aplicación no está terminada, en la siguiente parte analizaremos cómo es posible procesar otro en un hilo y cómo cancelar nuestra solicitud http para ahorrar aún más tráfico y recursos de nuestra aplicación. Y para que pueda ver la diferencia, presenté un ejemplo sin usar RxJS, puede verlo
aquí . En
este enlace encontrará el código completo de la aplicación actual. Para generar los circuitos, utilicé el
visualizador RxJS .
Espero que este artículo te haya ayudado a comprender mejor cómo funciona RxJS. ¡Te deseo éxito en tu estudio!