Bienvenido: esta es la tercera parte de mi serie de artículos de Flutter Architecture.
Esta vez haremos una pequeña inmersión en el reino mágico de las extensiones reactivas (Rx). Me centraré en las funciones Rx más utilizadas y explicaré su aplicación. Si no ha leído la publicación anterior, ahora es el momento de hacerlo antes de continuar.
RxDart es una implementación del concepto Rx para el lenguaje Dart, gracias a Frank Pepermans y Brian Egan por eso . Si anteriormente usó Rx en otros idiomas, probablemente notará una diferencia en el nombre de una serie de funciones, pero es poco probable que esto le cause dificultades.
El código para la prueba está aquí .
Hasta ahora, hemos utilizado transmisiones como una forma de transferir datos de un lugar a otro en nuestra aplicación, pero pueden hacer mucho más. Echemos un vistazo a algunas de las características que Rx agrega a Streams.
Crear observables
Como se dijo anteriormente , los Observables son versiones Rx de transmisiones con excelentes características. Hay varias formas interesantes de crearlos:
Fuera de la corriente
Cualquier secuencia se puede convertir en observable pasándola al constructor:
var controller = new StreamController<String>(); var streamObservable = new Observable(controller.stream); streamObservable.listen(print);
Eventos recurrentes
var timerObservable = Observable.periodic(Duration(seconds: 1), (x) => x.toString() ); timerObservable.listen(print);
De esta manera, se construirá un Observable que muestre valores con un período específico. Para que pueda reemplazar el temporizador.
De un solo valor
A veces, una API espera un Stream / Observable donde solo tienes un valor. Para tales casos, el Observable tiene una fábrica.
var justObservable = Observable<int>.just(42); justObservable.listen(print);
Del futuro
Future<String> asyncFunction() async { return Future.delayed(const Duration(seconds: 1), () => "AsyncRsult"); } test('Create Observable from Future', () async { print('start'); var fromFutureObservable = Observable.fromFuture(asyncFunction()); fromFutureObservable.listen(print);
Crear un Observable
desde el futuro esperará a que se complete el Futuro y devolverá un valor para su resultado o null
si el valor no se devuelve. Otra forma de crear una secuencia desde Future es llamar a toStream()
para cualquier Future.
Tal vez se pregunte cuál es el punto de convertir el Futuro en un Observable / Stream en lugar de solo esperarlo. Tenga la seguridad de que esto quedará claro cuando examinemos las funciones disponibles para manipular datos mientras están en la secuencia.
Sujetos
Subjects
son un reemplazo para StreamController
en RxDart, y así es como se implementan en algún lugar de las entrañas de la biblioteca.
Pero su comportamiento es ligeramente diferente de los StreamControllers básicos:
- puede aplicar
listen()
directamente a un Asunto, sin acceder a la propiedad Stream - cualquier número de suscripciones está disponible, y todos los oyentes reciben los mismos datos al mismo tiempo
- Hay tres variedades de temas, que se explican a continuación con ejemplos:
PublishSubjects
PublishSubjects
comporta como StreamControllers
, excepto por la posibilidad de muchos oyentes:
var subject = new PublishSubject<String>(); subject.listen((item) => print(item)); subject.add("Item1");
Ejecute este código y obtendrá:
Item1 ITEM2 Item2 ITEM3 Item3
Está claro que el segundo oyente que llegó tarde a la fiesta (los llamaremos suscriptores tardíos) perdió el primer punto. Para evitar esto, puede usar BehaviourSubject
Sujeto de comportamiento
Con BehaviourSubject
cada nuevo suscriptor recibirá primero el último valor aceptado:
var subject = new BehaviorSubject<String>(); subject.listen((item) => print(item)); subject.add("Item1"); subject.add("Item2"); subject.listen((item) => print(item.toUpperCase())); subject.add("Item3");
A la salida
Item1 ITEM2 ITEM3 Item2 Item3
Puede ver que Item1
pierde para el segundo suscriptor, pero recibe Item2
. Puede sorprenderle que el segundo suscriptor reciba el Item3
antes de que el primer suscriptor reciba el Item2
. Esto se debe a que la secuencia de servicio de suscriptores no está garantizada, aunque todos los suscriptores reciben datos en el orden correcto. BehaviourSubject
solo almacena en caché el último elemento recibido para los suscriptores tardíos. Si necesita almacenar más elementos en caché, puede usar ReplaySubject . En la mayoría de los casos esto no es necesario.
Manipulando datos sobre la marcha

La verdadera fortaleza de Rx radica en el hecho de que le permite procesar datos durante la transmisión a través de la transmisión. Cada uno de los métodos Rx devuelve una nueva secuencia con los datos resultantes (como en la ilustración), lo que significa que puede vincularlos en una sola tubería de procesamiento, y esto hace de Rx una herramienta extremadamente poderosa.
Mapa
Si hay alguna operación de Stream que no quiero perderme más que nada, entonces este es map()
. Lo que map()
hace es que toma cada elemento de datos para ser transferido y le aplica una determinada función, después de lo cual coloca el resultado en la secuencia resultante. Un simple ejemplo:

var subject = new PublishSubject<String>(); subject.map((item) => item.toUpperCase()).listen(print); subject.add("Item1"); subject.add("Item2"); subject.add("Item3");
Resultado:
ITEM1 ITEM2 ITEM3
Pero map
no necesita devolver el mismo tipo de datos que recibe como entrada. El siguiente ejemplo tomará enteros en lugar de cadenas. Además, vincularemos dos transformaciones:
var subject = new PublishSubject<int>(); subject.map((intValue) => intValue.toString()) .map((item) => item.toUpperCase()) .listen(print); subject.add(1); subject.add(2); subject.add(3);
o algo como esto:

class DataClass{} class WrapperClass { final DataClass wrapped; WrapperClass(this.wrapped); } var subject = new PublishSubject<WrapperClass>(); subject.map<WrapperClass>((a) => new WrapperClass(a));
Uno de los usos más útiles de .map
es cuando obtiene datos en un formato de alguna API REST o de una base de datos y desea que se conviertan en sus propios objetos:
class User { final String name; final String adress; final String phoneNumber; final int age;
Noto que no solo Streams, sino también cualquier Iterable ofrece una función de map
que puede usar para transformaciones en listas.
Donde
Si solo está interesado en ciertos valores que ocurren en la secuencia, puede usar la función .where()
lugar de usar la if
en su escucha, esto es más expresivo y más fácil de leer:
var subject = new PublishSubject<int>(); subject.where((val) => val.isOdd) .listen( (val) => print('This only prints odd numbers: $val')); subject.where((val) => val.isEven) .listen( (val) => print('This only prints even numbers: $val')); subject.add(1); subject.add(2); subject.add(3);
Debounce
Esta es una de las pequeñas perlas de Rx! Imagine que tiene un campo de búsqueda que llama a la API REST si se cambia su texto. Hacer una llamada a la API por cada pulsación de tecla es costoso. Por lo tanto, le gustaría hacer una llamada solo si el usuario hace una pausa por un momento. Para esto, se utiliza la función debounce()
, que se traga todos los eventos entrantes si no son seguidos por una pausa.
var subject = new PublishSubject<String>(); subject.debounce(new Duration(milliseconds: 500)).listen((s) => print(s)); subject.add('A'); subject.add('AB'); await Future.delayed(Duration(milliseconds: 200)); subject.add("ABC");
Por lo tanto, si convierte el controlador TextField.onChanged
en Observable
, obtendrá una solución elegante.
Expandir
Si su flujo fuente emite matrices de objetos y desea procesar cada objeto usted mismo, puede usar .expand
, que hará exactamente eso:

Verá la aplicación de este método a continuación en el ejemplo de FireStore.
Fusionar
Si tiene varios subprocesos diferentes, pero desea procesar sus objetos juntos, puede usar .mergeWith
(en otras implementaciones de Rx simplemente merge
), que toma una matriz de subprocesos y devuelve un subproceso combinado.

.mergeWith
no garantiza que se .mergeWith
ningún orden en las transmisiones. Los datos se emiten en orden de entrada.
Por ejemplo, si tiene dos componentes que informan errores a través de una secuencia y desea que se muestren juntos en un cuadro de diálogo, puede hacer esto de la siguiente manera (pseudocódigo):
@override initState() { super.initState(); component1.errors.mergeWith([component2.errors]) .listen( (error) async => await showDialog(error.message)); }
o si desea una visualización combinada de mensajes de varias redes sociales, puede verse así (pseudocódigo):
final observableTwitter = getTwitterStream().map((data) => new MyAppPost.fromTwitter(data)); final observableFacebook = getFacebookStream().map((data) => new MyAppPost.fromFaceBook(data)); final postStream = observableTwitter.mergeWith([observableFacebook]);
Zipwith
zipWith
también combina una secuencia con otra. Pero, a diferencia de .mergeWith
, no envía datos tan pronto como recibe un elemento de una de sus secuencias de origen. Espera hasta que lleguen los elementos de ambas secuencias de origen, y luego los combina usando la función de zipper
provista:

La firma zipWith
parece aterradora, pero ahora la vemos:
Un ejemplo muy simplificado:
new Observable.just(1)
Una aplicación más práctica es si necesita esperar dos funciones asincrónicas que devuelven Future
, y desea procesar los datos tan pronto como se devuelvan ambos resultados. En este ejemplo ligeramente inventado, presentamos dos API REST: una devuelve User
, la otra devuelve Product
como cadenas JSON, y queremos esperar ambas llamadas antes de devolver el objeto Invoice
.
class Invoice { final User user; final Product product; Invoice(this.user, this.product); printInvoice() { print(user.toString()); print(product.toString()); } }
Mirando la salida, puede ver cómo se hace esto de forma asíncrona.
Started getting User Started getting product Start listening for invoices Finished getting product Finished getting User Jon Doe - New York - 424242 - 42 Flux compensator - 99999.99
Combinelatest
combineLatest
también combina valores de transmisión, pero de una manera ligeramente diferente a la merge
y zip
. Escucha más hilos y emite un valor combinado cada vez que llega un nuevo valor de uno de los hilos. Es interesante que genera no solo el valor modificado, sino también los últimos valores obtenidos de todas las demás secuencias de origen. Mire cuidadosamente esta animación:

Antes de que combineLates
su primer valor, todos los hilos de origen deben recibir al menos una entrada.
A diferencia de los métodos que se usaron anteriormente, combineLatest
es estático. Además, dado que Dart no permite la sobrecarga del operador, existen versiones de combLastest
dependiendo del número de secuencias de origen: combineLatest2 ... combineLatest9
combineLatest
buen uso, por ejemplo, si tiene dos Observable<bool>
que indican que algunas partes de su aplicación están ocupadas, y desea mostrar la rueda ocupada si una de ellas está ocupada. Podría verse así (pseudocódigo):
class Model { Observable<bool> get isBusy => Observable.combineLatest2(isBusyOne,isBusyTwo, (b1, b2) => b1 || b2); PublishSubject<bool> isBusyOne; PublishSubject<bool> isBusyTwo; }
En su interfaz de usuario, puede usar isBusy
con StreamBuilder
para mostrar Spinner
si el valor resultante es verdadero.
combineLatest
característica muy adecuada en combinación con las secuencias de instantáneas de FireStore .
Imagine que desea crear una aplicación que muestre una fuente de noticias junto con un pronóstico del tiempo. Los mensajes de teletipo y los datos meteorológicos se almacenan en dos colecciones diferentes de FireStore. Ambos se actualizan de forma independiente. Desea mostrar actualizaciones de datos con StreamBuilder. Con combineLatest
es fácil:
class WeatherForecast { final String forecastText; final GeoPoint location; factory WeatherForecast.fromMap(Map<String, dynamic> map) { return WeatherForecast(map['forecastText'], map['location']); } WeatherForecast(this.forecastText, this.location); } class NewsMessage { final String newsText; final GeoPoint location; factory NewsMessage.fromMap(Map<String, dynamic> map) { return NewsMessage(map['newsText'], map['location']); } NewsMessage(this.newsText, this.location); } class CombinedMessage { final WeatherForecast forecast; final NewsMessage newsMessage; CombinedMessage(this.forecast, this.newsMessage); } class Model { CollectionReference weatherCollection; CollectionReference newsCollection; Model() { weatherCollection = Firestore.instance.collection('weather'); newsCollection = Firestore.instance.collection('news'); } Observable<CombinedMessage> getCombinedMessages() { Observable<WeatherForecast> weatherForecasts = weatherCollection .snapshots() .expand((snapShot) => snapShot.documents) .map<WeatherForecast>((document) => WeatherForecast.fromMap(document.data)); Observable<NewsMessage> news = newsCollection .snapshots() .expand((snapShot) => snapShot.documents) .map<NewsMessage>((document) => NewsMessage.fromMap(document.data)); return Observable.combineLatest2( weatherForecasts, news, (weather, news) => CombinedMessage(weather, news)); } }
En su UI, se vería así: StreamBuilder<CombinedMessage>(stream: model.getCombinedMessages(),...).
Distinto
En el escenario descrito anteriormente, puede ocurrir que isBusyOne e isBusyTwo den el mismo valor, lo que conducirá a una actualización de la interfaz de usuario con los mismos datos. Para evitar esto, podemos usar .distinct()
. Asegura que los datos solo se transmiten si el valor del nuevo elemento es diferente del anterior. Por lo tanto, cambiaríamos el código a:
Observable<bool> isBusy => isBusyOne.mergeWith([isBusyTwo]).distinct();
y también demuestra que podemos combinar nuestras funciones en diferentes cadenas a voluntad.
Asyncmap
Además de map()
también hay una función asyncMap
, que le permite utilizar una función asincrónica como una función de mapa. Vamos a introducir una configuración ligeramente diferente para nuestro ejemplo de FireStore. Ahora el WeatherForecast necesario depende de la ubicación de NewsMessage y solo debe actualizarse cuando se recibe un nuevo NewsMessage :
Observable<CombinedMessage> getDependendMessages() { Observable<NewsMessage> news = newsCollection.snapshots().expand((snapShot) { return snapShot.documents; }).map<NewsMessage>((document) { return NewsMessage.fromMap(document.data); }); return news.asyncMap((newsEntry) async { var weatherDocuments = await weatherCollection.where('location', isEqualTo: newsEntry.location).getDocuments(); return new CombinedMessage( WeatherForecast.fromMap(weatherDocuments.documents.first.data), newsEntry); }); }
El Observable devuelto por getDependendMessages generará un nuevo CombinedMessage cada vez que newsCollection cambie.
Debug Observables
Mirando las elegantes cadenas de llamadas Rx, parece casi imposible depurar una expresión como esta:
Observable<NewsMessage> news = newsCollection .snapshots() .expand((snapShot) => snapShot.documents) .map<NewsMessage>((document) => NewsMessage.fromMap(document.data));
Pero tenga en cuenta que =>
es solo una forma corta de una función anónima. Usando Convertir para bloquear el cuerpo , obtendrá:
Observable<NewsMessage> news = newsCollection .snapshots() .expand((snapShot) { return snapShot.documents; }) .map<NewsMessage>((document) { return NewsMessage.fromMap(document.data); });
Y ahora podemos establecer un punto de interrupción o agregar declaraciones de impresión en cada paso de nuestra tubería.
Cuidado con los efectos secundarios.
Si desea capitalizar en Rx para hacer que su código sea más robusto, siempre tenga en cuenta que Rx es conversión de datos al moverlo "a lo largo de la cinta transportadora". Por lo tanto, nunca llame a funciones que cambien cualquier variable / estado fuera de la tubería de procesamiento hasta que llegue a la función .listen.
En lugar de hacerlo:
Observable.fromFuture(getProduct()) .map<Product>((jsonString) { var product = Product.fromJson(jsonString); database.save(product); setState((){ _product = product }); return product; }).listen();
haz esto:
Observable.fromFuture(getProduct()) .map<Product>((jsonString) => Product.fromJson(jsonString)) .listen( (product) { database.save(product); setState((){ _product = product }); });
El deber de map()
es transformar los datos en la secuencia, ¡Y NADA MÁS! Si la función de visualización pasada hace algo más, se considerará como un efecto secundario, generando posibles errores que son difíciles de detectar al leer el código.
Algunas reflexiones sobre la liberación de recursos
Para evitar pérdidas de memoria, siempre llame a cancel()
para suscripciones, dispose()
para StreamControllers, close()
para Asignaturas, tan pronto como ya no las necesite.
Conclusión
Felicitaciones si te quedaste conmigo hasta este momento. Ahora no solo puede usar Rx para facilitarle la vida, sino también prepararse para las próximas publicaciones en las que profundizaremos en los detalles de RxVMS .