
En este artículo veremos cómo es posible procesar otro en un hilo, por qué es necesario y cómo los operadores de Observación de orden superior (en lo sucesivo, HOO) nos ayudarán en esto.
Serie de artículos "Fundamentos de la programación reactiva con RxJS":
Cuando se trabaja con hilos, a menudo surge una situación en la que es necesario transferir los resultados de otro a un hilo como valor. Por ejemplo, queremos ejecutar una solicitud ajax y procesar su respuesta en el hilo actual, o ejecutar varias solicitudes paralelas, implementar la agrupación. Creo que muchas personas están acostumbradas a resolver estos problemas utilizando un mecanismo como la promesa. ¿Pero es posible resolverlos usando RxJS? ¡Por supuesto, y todo es mucho más fácil de lo que piensas!
Nota : para comprender la parte teórica del artículo, no es necesario que lea los artículos anteriores, solo necesita saber qué son observables, operadores y tuberías. En la parte práctica, refinaremos el ejemplo del
segundo artículo , que puede encontrar
aquí .
El problema
Imaginemos la siguiente tarea: necesitamos averiguar cada segundo si el servidor es accesible. ¿Cómo podemos resolverlo?
Primero, cree una secuencia utilizando el método del temporizador:
timer(0, 1000).subscribe({ next: console.log });
El método del
temporizador es muy similar en principio al
intervalo . Pero a diferencia de esto, le permite establecer el tiempo de espera de inicio del subproceso, que se transmite por el primer parámetro. El segundo parámetro indica el intervalo a través del cual se generará un nuevo valor. Si no se especifica el segundo parámetro, el temporizador generará solo un valor y terminará la transmisión.
Como usted y yo no tenemos un servidor, sugiero escribir una función que emule una solicitud al servidor:
const makeRequest = () => { return timer(1000).pipe( mapTo('success') ) }
¿Qué hace este método? Devuelve una secuencia creada utilizando el método del temporizador, que emite un valor después de que un segundo ha pasado y termina. Dado que el método del temporizador solo genera un número, utilizamos el operador mapTo para reemplazarlo con la cadena "éxito".
Así es como se ve la secuencia creada por el método makeRequest:

Ahora tenemos una opción: ¿llamar al método makeRequest dentro de la secuencia o asignar esta responsabilidad al observador?
El primer enfoque es preferible, ya que en este caso podremos utilizar todo el potencial de RxJS con sus operadores y liberar a nuestro observador de tareas innecesarias. Usamos el método del temporizador para ejecutar solicitudes por intervalo:
timer(0, 1000).pipe( map(() => makeRequest()) ).subscribe({ next: console.log });
Cuando ejecutamos dicho código, veremos que en console.log no recibimos un mensaje con el texto "éxito", sino un objeto de tipo Observable:

La respuesta es bastante esperada, porque en el mapa devolvemos la secuencia. Para que una secuencia funcione, debe suscribirse a ella. Bueno, veamos cómo
no hacerlo :
timer(0, 1000).pipe( map(() => makeRequest()) ).subscribe({ next: observable => observable.subscribe({ next: console.log }); });
El problema con el ejemplo anterior es que obtenemos una suscripción en una suscripción. Pero, ¿qué pasa si queremos hacer más de una solicitud en una cadena? ¿O qué pasa si en algún momento necesitamos cancelar la suscripción del flujo interno? En este caso, nuestro código se parecerá cada vez más a "fideos". Para resolver este problema, RxJS tiene operadores especiales llamados HOO.
Hoo
HOO es un tipo especial de declaraciones que aceptan flujos como valores. Uno de estos operadores es el método mergeAll.
Cuando una secuencia llega a mergeAll, se suscribe a ella. La secuencia a la que se suscribió el operador se llama interna. El flujo desde el cual el operador recibe otros flujos como valores se llama externo.
Cuando un hilo interno genera un valor, mergeAll empuja ese valor al hilo externo. Por lo tanto, nos deshacemos de la necesidad de suscribirse manualmente. Si nos damos de baja del flujo externo, mergeAll automáticamente se dará de baja del flujo interno.
Veamos cómo podemos reescribir nuestro ejemplo con mergeAll:
timer(0, 1000).pipe( map(() => makeRequest()) mergeAll() ).subscribe({ next: console.log });
En el ejemplo anterior, la secuencia externa fue creada por la declaración del temporizador. Y los flujos que se crean en el operador del mapa son internos. Cada hilo creado cae en la instrucción mergeAll.

La combinación map + mergeAll se usa muy a menudo, por lo tanto, en RxJS hay un método mergeMap:
timer(0, 1000).pipe( mergeMap(() => makeRequest()) ).subscribe({ next: console.log });
Cuando un hilo externo genera un valor, el operador mergeMap llama a la función de devolución de llamada que se le pasa, lo que genera un nuevo hilo. Luego mergeMap se suscribe a la secuencia generada.

La peculiaridad del operador mergeAll / mergeMap es que si se trata de otra secuencia, también se suscribe. Por lo tanto, en un flujo externo, podemos obtener valores de varios internos a la vez. Veamos el siguiente ejemplo:
timer(0, 1000)
Así es como se verá la transmisión externa sin el operador mergeMap:

Y así con mergeMap:
timer(0, 1000).pipe( mergeMap(() => interval(1000)) )

Cada segundo, creamos un nuevo hilo interno y mergeMap se suscribe a él. Por lo tanto, tenemos muchos hilos internos trabajando simultáneamente, cuyos valores caen en el externo:

Nota : tenga cuidado al usar mergeMap, cada nuevo hilo interno funcionará hasta que se dé de baja del externo. En el ejemplo anterior, el número de subprocesos internos crece cada segundo, al final puede haber tantos subprocesos que la computadora no puede hacer frente a la carga.
concatAll / concatMap
El método mergeMap es excelente cuando no te importa el orden de ejecución de los hilos internos, pero ¿qué pasa si lo necesitas? ¿Supongamos que queremos que la próxima solicitud del servidor se ejecute solo después de recibir una respuesta de la anterior?
Para tales fines, el operador HOO concatAll / concatMap es adecuado. Este operador, habiéndose suscrito al hilo interno, espera hasta que termine, y solo entonces se suscribe al siguiente.
Si durante la ejecución de un hilo desciende uno nuevo, entonces se coloca en la cola hasta que se complete el anterior.
En el ejemplo anterior, creamos dos hilos usando el método del temporizador. Para mayor claridad, utilicé el operador mapTo para mostrar diferentes valores. El primer subproceso generará 1, el segundo - 2. Se crea un subproceso externo utilizando el método of, que toma dos de los anteriores observables como entrada.
La instrucción concatAll primero recibe firstInnerObservable, se suscribe a ella y espera a que se complete, y solo después de completar las primeras suscripciones a secondInnerObservable. Así es como se verá la transmisión externa:

Si reemplazamos concatAll con mergeAll, la secuencia se verá así:
of( firstInnerObservable, secondInnerObservable ).pipe( mergeAll() ).subscribe({ next: console.log });

switchAll / switchMap
Este operador difiere de los anteriores en que cuando recibe una nueva secuencia, inmediatamente se da de baja de la anterior y se suscribe a la nueva.
Tome el ejemplo anterior y reemplace concatAll por switchAll, y vea cómo se comporta el flujo externo:
of( firstInnerObservable, secondInnerObservable ).pipe( switchAll() ).subscribe({ next: console.log });

Solo el valor de la segunda secuencia interna se introdujo en la secuencia externa. Esto se debe a que switchMap canceló la suscripción del primero cuando recibió el segundo subproceso.
¿Cuándo se necesita esto? Por ejemplo, al implementar una búsqueda de datos. Si la respuesta del servidor aún no ha llegado y ya hemos enviado una nueva solicitud, entonces no tenemos que esperar a la anterior.
escape / escapeMapa
exhaust es exactamente lo contrario de la instrucción switchAll, y su comportamiento es similar al de concatAll. Este método, al suscribirse a la transmisión, espera a que se complete. Si se trata de una nueva secuencia, simplemente se descarta.
of( firstInnerObservable, secondInnerObservable ).pipe( exhaust() ).subscribe({ next: console.log });

En el ejemplo anterior, no obtuvimos un deuce, porque en ese momento el operador estaba esperando la finalización del primer hilo, y simplemente dejó caer el segundo.
Creo que muchos tienen una pregunta, ¿cuándo puede ser necesario ese comportamiento? Un buen ejemplo es el formulario de inicio de sesión. No tiene sentido enviar varias solicitudes al servidor hasta que se complete la actual.
Estamos finalizando la solicitud.
Recordamos el
ejemplo del
segundo artículo . En él, implementamos una búsqueda en GitHub y utilizamos el operador mergeMap para enviar solicitudes al servidor. Ahora que conocemos las características de este operador, ¿es realmente adecuado en nuestro caso?
fromEvent(input, 'keyup').pipe( debounceTime(700), map(event => event.target.value), filter(val => val.length > 2), distinctUntilChanged(), mergeMap(value => { return from(getUsersRepsFromAPI(value)).pipe( catchError(err => of([])) ) }) ).subscribe({ next: reps => recordRepsToList(reps) })
Supongamos que el servidor GitHub se sobrecargará en gran medida, y procesar nuestra respuesta llevará mucho tiempo. ¿Qué podría salir mal en este caso?
Supongamos que un usuario ingresó algunos datos, no esperó una respuesta e ingresó otros nuevos. En este caso, enviaremos la segunda solicitud al servidor. Sin embargo, nadie garantiza que la respuesta a la primera solicitud llegue antes.
Como al operador mergeMap no le importa en qué orden procesar los flujos internos, en el caso de que la primera solicitud se ejecute más tarde que la segunda, borraremos los datos reales. Por lo tanto, propongo reemplazar el método mergeMap con switchMap:
fromEvent(input, 'keyup').pipe( debounceTime(700), map(event => event.target.value), filter(val => val.length > 2), distinctUntilChanged(), switchMap(value => { return from(getUsersRepsFromAPI(value)).pipe( catchError(err => of([])) ) }) ).subscribe({ next: reps => recordRepsToList(reps) })
Ahora, si el usuario ingresa datos nuevos, switchMap se dará de baja de la secuencia anterior y se suscribirá a la nueva.
Vale la pena señalar que nuestra solicitud http continuará colgando hasta que el servidor responda. Pero, dado que nos hemos dado de baja de la secuencia interna, la respuesta no caerá en la secuencia externa.
Nota : si trabaja con Angular y usa HttpClient para trabajar con http, entonces no puede preocuparse por cancelar la solicitud en sí. HttpClient puede hacer esto por usted al darse de baja.
Cancelar http
La API de recuperación tiene la capacidad de cancelar la solicitud http utilizando el
AbortController . Cuando se combina con el operador switchMap, esta funcionalidad ahorrará tráfico de usuarios.
Reescribamos un poco nuestro ejemplo. Y cree un método que ajuste la llamada de búsqueda en observable:
const createCancellableRequest = (url) => {
También cambie el método getUsersRepsFromApi:
const getUsersRepsFromAPI = (username) => { const url = `https://api.github.com/users/${ username }/repos`; return createCancellableRequest(url); }
Ahora el método devuelve no prometedor, sino observable. Por lo tanto, eliminamos el contenedor del switchMap:
switchMap(value => { return getUsersRepsFromAPI(value).pipe( catchError(err => of([]) ) )
Nota : en RxJS versión 6.5, agregaron la
declaración fromFetch , que a su vez llama al método abortar debajo del capó, por lo que ya no necesita escribir su "bicicleta".
Eso es todo! Todo el código de muestra se puede encontrar
aquí .
Conclusión
Hoy vimos qué es HOO y algunos operadores muy útiles de esta categoría. Por supuesto, estos estaban lejos de todos ellos. Para obtener información más detallada y detallada, recomiendo visitar la
documentación de RxJS.
En el próximo artículo planeo considerar cuál es la diferencia entre los observables en frío y en caliente.
Finalmente: ¡no use la suscripción en la suscripción, porque hay HOO!