RxDart: transformaciones mágicas de flujos

Bienvenido: esta es la tercera parte de mi serie de artículos de Flutter Architecture.



Esta vez haremos una pequeña inmersión en el reino mágico de las extensiones reactivas (Rx). Me centraré en las funciones Rx más utilizadas y explicaré su aplicación. Si no ha leído la publicación anterior, ahora es el momento de hacerlo antes de continuar.


RxDart es una implementación del concepto Rx para el lenguaje Dart, gracias a Frank Pepermans y Brian Egan por eso . Si anteriormente usó Rx en otros idiomas, probablemente notará una diferencia en el nombre de una serie de funciones, pero es poco probable que esto le cause dificultades.


El código para la prueba está aquí .


Hasta ahora, hemos utilizado transmisiones como una forma de transferir datos de un lugar a otro en nuestra aplicación, pero pueden hacer mucho más. Echemos un vistazo a algunas de las características que Rx agrega a Streams.


Crear observables


Como se dijo anteriormente , los Observables son versiones Rx de transmisiones con excelentes características. Hay varias formas interesantes de crearlos:


Fuera de la corriente


Cualquier secuencia se puede convertir en observable pasándola al constructor:


var controller = new StreamController<String>(); var streamObservable = new Observable(controller.stream); streamObservable.listen(print); 

Eventos recurrentes


 var timerObservable = Observable.periodic(Duration(seconds: 1), (x) => x.toString() ); timerObservable.listen(print); 

De esta manera, se construirá un Observable que muestre valores con un período específico. Para que pueda reemplazar el temporizador.


De un solo valor


A veces, una API espera un Stream / Observable donde solo tienes un valor. Para tales casos, el Observable tiene una fábrica.


 var justObservable = Observable<int>.just(42); justObservable.listen(print); //   : 42 

Del futuro


  Future<String> asyncFunction() async { return Future.delayed(const Duration(seconds: 1), () => "AsyncRsult"); } test('Create Observable from Future', () async { print('start'); var fromFutureObservable = Observable.fromFuture(asyncFunction()); fromFutureObservable.listen(print); 

Crear un Observable desde el futuro esperará a que se complete el Futuro y devolverá un valor para su resultado o null si el valor no se devuelve. Otra forma de crear una secuencia desde Future es llamar a toStream() para cualquier Future.


Tal vez se pregunte cuál es el punto de convertir el Futuro en un Observable / Stream en lugar de solo esperarlo. Tenga la seguridad de que esto quedará claro cuando examinemos las funciones disponibles para manipular datos mientras están en la secuencia.


Sujetos


Subjects son un reemplazo para StreamController en RxDart, y así es como se implementan en algún lugar de las entrañas de la biblioteca.


Pero su comportamiento es ligeramente diferente de los StreamControllers básicos:


  • puede aplicar listen() directamente a un Asunto, sin acceder a la propiedad Stream
  • cualquier número de suscripciones está disponible, y todos los oyentes reciben los mismos datos al mismo tiempo
  • Hay tres variedades de temas, que se explican a continuación con ejemplos:

PublishSubjects


PublishSubjects comporta como StreamControllers , excepto por la posibilidad de muchos oyentes:


 var subject = new PublishSubject<String>(); subject.listen((item) => print(item)); subject.add("Item1"); //    subject.listen((item) => print(item.toUpperCase())); subject.add("Item2"); subject.add("Item3"); //        await Future.delayed(Duration(seconds: 5)); //    subject.close; 

Ejecute este código y obtendrá:


 Item1 ITEM2 Item2 ITEM3 Item3 

Está claro que el segundo oyente que llegó tarde a la fiesta (los llamaremos suscriptores tardíos) perdió el primer punto. Para evitar esto, puede usar BehaviourSubject


Sujeto de comportamiento


Con BehaviourSubject cada nuevo suscriptor recibirá primero el último valor aceptado:


 var subject = new BehaviorSubject<String>(); subject.listen((item) => print(item)); subject.add("Item1"); subject.add("Item2"); subject.listen((item) => print(item.toUpperCase())); subject.add("Item3"); 

A la salida


 Item1 ITEM2 ITEM3 Item2 Item3 

Puede ver que Item1 pierde para el segundo suscriptor, pero recibe Item2 . Puede sorprenderle que el segundo suscriptor reciba el Item3 antes de que el primer suscriptor reciba el Item2 . Esto se debe a que la secuencia de servicio de suscriptores no está garantizada, aunque todos los suscriptores reciben datos en el orden correcto. BehaviourSubject solo almacena en caché el último elemento recibido para los suscriptores tardíos. Si necesita almacenar más elementos en caché, puede usar ReplaySubject . En la mayoría de los casos esto no es necesario.


Manipulando datos sobre la marcha



La verdadera fortaleza de Rx radica en el hecho de que le permite procesar datos durante la transmisión a través de la transmisión. Cada uno de los métodos Rx devuelve una nueva secuencia con los datos resultantes (como en la ilustración), lo que significa que puede vincularlos en una sola tubería de procesamiento, y esto hace de Rx una herramienta extremadamente poderosa.


Mapa


Si hay alguna operación de Stream que no quiero perderme más que nada, entonces este es map() . Lo que map() hace es que toma cada elemento de datos para ser transferido y le aplica una determinada función, después de lo cual coloca el resultado en la secuencia resultante. Un simple ejemplo:



 var subject = new PublishSubject<String>(); subject.map((item) => item.toUpperCase()).listen(print); subject.add("Item1"); subject.add("Item2"); subject.add("Item3"); 

Resultado:


 ITEM1 ITEM2 ITEM3 

Pero map no necesita devolver el mismo tipo de datos que recibe como entrada. El siguiente ejemplo tomará enteros en lugar de cadenas. Además, vincularemos dos transformaciones:


 var subject = new PublishSubject<int>(); subject.map((intValue) => intValue.toString()) .map((item) => item.toUpperCase()) .listen(print); subject.add(1); subject.add(2); subject.add(3); 

o algo como esto:



 class DataClass{} class WrapperClass { final DataClass wrapped; WrapperClass(this.wrapped); } var subject = new PublishSubject<WrapperClass>(); subject.map<WrapperClass>((a) => new WrapperClass(a)); 

Uno de los usos más útiles de .map es cuando obtiene datos en un formato de alguna API REST o de una base de datos y desea que se conviertan en sus propios objetos:


 class User { final String name; final String adress; final String phoneNumber; final int age; //       - //   factory User.fromJson(String jsonString) { var jsonMap = json.decode(jsonString); return User( jsonMap['name'], jsonMap['adress'], jsonMap['phoneNumber'], jsonMap['age'], ); } User(this.name, this.adress, this.phoneNumber, this.age); @override String toString() { return '$name - $adress - $phoneNumber - $age'; } } void main() { test('Map', () { // -  var jsonStrings = [ '{"name": "Jon Doe", "adress": "New York", "phoneNumber":"424242","age": 42 }', '{"name": "Stephen King", "adress": "Castle Rock", "phoneNumber":"123456","age": 71 }', '{"name": "Jon F. Kennedy", "adress": "Washington", "phoneNumber":"111111","age": 66 }', ]; //   json-,    API/DB. var dataStreamFromAPI = new PublishSubject<String>(); dataStreamFromAPI .map<User>((jsonString) => User.fromJson(jsonString)) // json -> User .listen((user) => print(user.toString())); //    dataStreamFromAPI.add(jsonStrings[0]); dataStreamFromAPI.add(jsonStrings[1]); dataStreamFromAPI.add(jsonStrings[2]); }); 

Noto que no solo Streams, sino también cualquier Iterable ofrece una función de map que puede usar para transformaciones en listas.


Donde


Si solo está interesado en ciertos valores que ocurren en la secuencia, puede usar la función .where() lugar de usar la if en su escucha, esto es más expresivo y más fácil de leer:


 var subject = new PublishSubject<int>(); subject.where((val) => val.isOdd) .listen( (val) => print('This only prints odd numbers: $val')); subject.where((val) => val.isEven) .listen( (val) => print('This only prints even numbers: $val')); subject.add(1); subject.add(2); subject.add(3); //: This only prints odd numbers: 1 This only prints even numbers: 2 This only prints odd numbers: 3 

Debounce


Esta es una de las pequeñas perlas de Rx! Imagine que tiene un campo de búsqueda que llama a la API REST si se cambia su texto. Hacer una llamada a la API por cada pulsación de tecla es costoso. Por lo tanto, le gustaría hacer una llamada solo si el usuario hace una pausa por un momento. Para esto, se utiliza la función debounce() , que se traga todos los eventos entrantes si no son seguidos por una pausa.


 var subject = new PublishSubject<String>(); subject.debounce(new Duration(milliseconds: 500)).listen((s) => print(s)); subject.add('A'); subject.add('AB'); await Future.delayed(Duration(milliseconds: 200)); subject.add("ABC"); //    await Future.delayed(Duration(milliseconds: 700)); //       : 'ABC' 

Por lo tanto, si convierte el controlador TextField.onChanged en Observable , obtendrá una solución elegante.


Expandir


Si su flujo fuente emite matrices de objetos y desea procesar cada objeto usted mismo, puede usar .expand , que hará exactamente eso:


imagen


Verá la aplicación de este método a continuación en el ejemplo de FireStore.


Fusionar


Si tiene varios subprocesos diferentes, pero desea procesar sus objetos juntos, puede usar .mergeWith (en otras implementaciones de Rx simplemente merge ), que toma una matriz de subprocesos y devuelve un subproceso combinado.


imagen


.mergeWith no garantiza que se .mergeWith ningún orden en las transmisiones. Los datos se emiten en orden de entrada.


Por ejemplo, si tiene dos componentes que informan errores a través de una secuencia y desea que se muestren juntos en un cuadro de diálogo, puede hacer esto de la siguiente manera (pseudocódigo):


 @override initState() { super.initState(); component1.errors.mergeWith([component2.errors]) .listen( (error) async => await showDialog(error.message)); } 

o si desea una visualización combinada de mensajes de varias redes sociales, puede verse así (pseudocódigo):


 final observableTwitter = getTwitterStream().map((data) => new MyAppPost.fromTwitter(data)); final observableFacebook = getFacebookStream().map((data) => new MyAppPost.fromFaceBook(data)); final postStream = observableTwitter.mergeWith([observableFacebook]); 

Zipwith


zipWith también combina una secuencia con otra. Pero, a diferencia de .mergeWith , no envía datos tan pronto como recibe un elemento de una de sus secuencias de origen. Espera hasta que lleguen los elementos de ambas secuencias de origen, y luego los combina usando la función de zipper provista:


imagen


La firma zipWith parece aterradora, pero ahora la vemos:


 // R :   Stream/Observable // S :   Stream/Observable // zipper: - Observable<R> zipWith<S, R>(Stream<S> other, R zipper(T t, S s)) 

Un ejemplo muy simplificado:


 new Observable.just(1) // .just()  Observable,    .zipWith(new Observable.just(2), (one, two) => one + two) .listen(print); //  3 

Una aplicación más práctica es si necesita esperar dos funciones asincrónicas que devuelven Future , y desea procesar los datos tan pronto como se devuelvan ambos resultados. En este ejemplo ligeramente inventado, presentamos dos API REST: una devuelve User , la otra devuelve Product como cadenas JSON, y queremos esperar ambas llamadas antes de devolver el objeto Invoice .


 class Invoice { final User user; final Product product; Invoice(this.user, this.product); printInvoice() { print(user.toString()); print(product.toString()); } } //  HTTP ,  Product,  JSON Future<String> getProduct() async { print("Started getting product"); await Future.delayed(Duration(seconds: 2)); print("Finished getting product"); return '{"name": "Flux compensator", "price": 99999.99}'; } //  HTTP ,  User,  JSON Future<String> getUser() async { print("Started getting User"); await Future.delayed(Duration(seconds: 4)); print("Finished getting User"); return '{"name": "Jon Doe", "adress": "New York", "phoneNumber":"424242","age": 42 }'; } void main() { test('zipWith', () async { var userObservable = Observable.fromFuture(getUser()).map<User>((jsonString) => User.fromJson(jsonString)); var productObservable = Observable.fromFuture(getProduct()) .map<Product>((jsonString) => Product.fromJson(jsonString)); Observable<Invoice> invoiceObservable = userObservable.zipWith<Product, Invoice>( productObservable, (user, product) => Invoice(user, product)); print("Start listening for invoices"); invoiceObservable.listen((invoice) => invoice.printInvoice()); //        await Future.delayed(Duration(seconds: 5)); }); } 

Mirando la salida, puede ver cómo se hace esto de forma asíncrona.


 Started getting User Started getting product Start listening for invoices Finished getting product Finished getting User Jon Doe - New York - 424242 - 42 Flux compensator - 99999.99 

Combinelatest


combineLatest también combina valores de transmisión, pero de una manera ligeramente diferente a la merge y zip . Escucha más hilos y emite un valor combinado cada vez que llega un nuevo valor de uno de los hilos. Es interesante que genera no solo el valor modificado, sino también los últimos valores obtenidos de todas las demás secuencias de origen. Mire cuidadosamente esta animación:


imagen


Antes de que combineLates su primer valor, todos los hilos de origen deben recibir al menos una entrada.


A diferencia de los métodos que se usaron anteriormente, combineLatest es estático. Además, dado que Dart no permite la sobrecarga del operador, existen versiones de combLastest dependiendo del número de secuencias de origen: combineLatest2 ... combineLatest9


combineLatest buen uso, por ejemplo, si tiene dos Observable<bool> que indican que algunas partes de su aplicación están ocupadas, y desea mostrar la rueda ocupada si una de ellas está ocupada. Podría verse así (pseudocódigo):


 class Model { Observable<bool> get isBusy => Observable.combineLatest2(isBusyOne,isBusyTwo, (b1, b2) => b1 || b2); PublishSubject<bool> isBusyOne; PublishSubject<bool> isBusyTwo; } 

En su interfaz de usuario, puede usar isBusy con StreamBuilder para mostrar Spinner si el valor resultante es verdadero.


combineLatest característica muy adecuada en combinación con las secuencias de instantáneas de FireStore .


Imagine que desea crear una aplicación que muestre una fuente de noticias junto con un pronóstico del tiempo. Los mensajes de teletipo y los datos meteorológicos se almacenan en dos colecciones diferentes de FireStore. Ambos se actualizan de forma independiente. Desea mostrar actualizaciones de datos con StreamBuilder. Con combineLatest es fácil:


 class WeatherForecast { final String forecastText; final GeoPoint location; factory WeatherForecast.fromMap(Map<String, dynamic> map) { return WeatherForecast(map['forecastText'], map['location']); } WeatherForecast(this.forecastText, this.location); } class NewsMessage { final String newsText; final GeoPoint location; factory NewsMessage.fromMap(Map<String, dynamic> map) { return NewsMessage(map['newsText'], map['location']); } NewsMessage(this.newsText, this.location); } class CombinedMessage { final WeatherForecast forecast; final NewsMessage newsMessage; CombinedMessage(this.forecast, this.newsMessage); } class Model { CollectionReference weatherCollection; CollectionReference newsCollection; Model() { weatherCollection = Firestore.instance.collection('weather'); newsCollection = Firestore.instance.collection('news'); } Observable<CombinedMessage> getCombinedMessages() { Observable<WeatherForecast> weatherForecasts = weatherCollection .snapshots() .expand((snapShot) => snapShot.documents) .map<WeatherForecast>((document) => WeatherForecast.fromMap(document.data)); Observable<NewsMessage> news = newsCollection .snapshots() .expand((snapShot) => snapShot.documents) .map<NewsMessage>((document) => NewsMessage.fromMap(document.data)); return Observable.combineLatest2( weatherForecasts, news, (weather, news) => CombinedMessage(weather, news)); } } 

En su UI, se vería así: StreamBuilder<CombinedMessage>(stream: model.getCombinedMessages(),...).


Distinto


En el escenario descrito anteriormente, puede ocurrir que isBusyOne e isBusyTwo den el mismo valor, lo que conducirá a una actualización de la interfaz de usuario con los mismos datos. Para evitar esto, podemos usar .distinct() . Asegura que los datos solo se transmiten si el valor del nuevo elemento es diferente del anterior. Por lo tanto, cambiaríamos el código a:


  Observable<bool> isBusy => isBusyOne.mergeWith([isBusyTwo]).distinct(); 

y también demuestra que podemos combinar nuestras funciones en diferentes cadenas a voluntad.


Asyncmap


Además de map() también hay una función asyncMap , que le permite utilizar una función asincrónica como una función de mapa. Vamos a introducir una configuración ligeramente diferente para nuestro ejemplo de FireStore. Ahora el WeatherForecast necesario depende de la ubicación de NewsMessage y solo debe actualizarse cuando se recibe un nuevo NewsMessage :


 Observable<CombinedMessage> getDependendMessages() { Observable<NewsMessage> news = newsCollection.snapshots().expand((snapShot) { return snapShot.documents; }).map<NewsMessage>((document) { return NewsMessage.fromMap(document.data); }); return news.asyncMap((newsEntry) async { var weatherDocuments = await weatherCollection.where('location', isEqualTo: newsEntry.location).getDocuments(); return new CombinedMessage( WeatherForecast.fromMap(weatherDocuments.documents.first.data), newsEntry); }); } 

El Observable devuelto por getDependendMessages generará un nuevo CombinedMessage cada vez que newsCollection cambie.


Debug Observables


Mirando las elegantes cadenas de llamadas Rx, parece casi imposible depurar una expresión como esta:


 Observable<NewsMessage> news = newsCollection .snapshots() .expand((snapShot) => snapShot.documents) .map<NewsMessage>((document) => NewsMessage.fromMap(document.data)); 

Pero tenga en cuenta que => es solo una forma corta de una función anónima. Usando Convertir para bloquear el cuerpo , obtendrá:


 Observable<NewsMessage> news = newsCollection .snapshots() .expand((snapShot) { return snapShot.documents; }) .map<NewsMessage>((document) { return NewsMessage.fromMap(document.data); }); 

Y ahora podemos establecer un punto de interrupción o agregar declaraciones de impresión en cada paso de nuestra tubería.


Cuidado con los efectos secundarios.


Si desea capitalizar en Rx para hacer que su código sea más robusto, siempre tenga en cuenta que Rx es conversión de datos al moverlo "a lo largo de la cinta transportadora". Por lo tanto, nunca llame a funciones que cambien cualquier variable / estado fuera de la tubería de procesamiento hasta que llegue a la función .listen.
En lugar de hacerlo:


 Observable.fromFuture(getProduct()) .map<Product>((jsonString) { var product = Product.fromJson(jsonString); database.save(product); setState((){ _product = product }); return product; }).listen(); 

haz esto:


 Observable.fromFuture(getProduct()) .map<Product>((jsonString) => Product.fromJson(jsonString)) .listen( (product) { database.save(product); setState((){ _product = product }); }); 

El deber de map() es transformar los datos en la secuencia, ¡Y NADA MÁS! Si la función de visualización pasada hace algo más, se considerará como un efecto secundario, generando posibles errores que son difíciles de detectar al leer el código.


Algunas reflexiones sobre la liberación de recursos


Para evitar pérdidas de memoria, siempre llame a cancel() para suscripciones, dispose() para StreamControllers, close() para Asignaturas, tan pronto como ya no las necesite.


Conclusión


Felicitaciones si te quedaste conmigo hasta este momento. Ahora no solo puede usar Rx para facilitarle la vida, sino también prepararse para las próximas publicaciones en las que profundizaremos en los detalles de RxVMS .

Source: https://habr.com/ru/post/451292/


All Articles