RxDart: transformações mágicas de fluxos

Bem-vindo - Esta é a terceira parte da minha série de artigos sobre Flutter Architecture.



Desta vez, faremos um pequeno mergulho no reino mágico das extensões reativas (Rx). Vou me concentrar nas funções Rx mais usadas e explicar sua aplicação. Se você não leu a postagem anterior, agora é a hora de fazer isso antes de prosseguir.


O RxDart é uma implementação do conceito Rx para a linguagem Dart, graças a Frank Pepermans e Brian Egan por isso . Se você já usou o Rx em outros idiomas, provavelmente notará uma diferença na nomeação de várias funções, mas é improvável que isso lhe cause dificuldades.


O código para teste está aqui .


Até agora, usamos fluxos como uma maneira de transferir dados de um local para outro em nosso aplicativo, mas eles podem fazer muito mais. Vamos dar uma olhada em alguns dos recursos que o Rx adiciona ao Streams.


Criando Observáveis


Como afirmado anteriormente , os Observables são versões Rx de fluxos com ótimos recursos. Existem várias maneiras interessantes de criá-los:


Fora do fluxo


Qualquer fluxo pode ser convertido em Observable passando-o para o construtor:


var controller = new StreamController<String>(); var streamObservable = new Observable(controller.stream); streamObservable.listen(print); 

Eventos recorrentes


 var timerObservable = Observable.periodic(Duration(seconds: 1), (x) => x.toString() ); timerObservable.listen(print); 

Dessa maneira, será construído um Observável que exibe valores com um período específico. Então você pode substituir o temporizador.


De um único valor


Às vezes, uma API espera um Stream / Observable em que você tenha apenas um valor. Para tais casos, o Observable possui uma fábrica.


 var justObservable = Observable<int>.just(42); justObservable.listen(print); //   : 42 

Do futuro


  Future<String> asyncFunction() async { return Future.delayed(const Duration(seconds: 1), () => "AsyncRsult"); } test('Create Observable from Future', () async { print('start'); var fromFutureObservable = Observable.fromFuture(asyncFunction()); fromFutureObservable.listen(print); 

Criar um Observable partir do Futuro aguardará que o Futuro seja concluído e retorne um valor para seu resultado ou null se o valor não for retornado. Outra maneira de criar um fluxo a partir do Future é chamar toStream() para qualquer Future.


Você pode estar se perguntando qual é o sentido de converter o Futuro em um Observável / Fluxo em vez de apenas esperar por ele. Tenha certeza, isso ficará claro quando examinarmos as funções disponíveis para manipular dados enquanto eles estão no fluxo.


Assuntos


Subjects substituem o StreamController no RxDart e é assim que eles são implementados em algum lugar nas entranhas da biblioteca.


Mas o comportamento deles é um pouco diferente dos StreamControllers básicos:


  • você pode aplicar listen() diretamente a um Assunto sem acessar a propriedade Stream
  • qualquer número de assinaturas está disponível e todos os ouvintes recebem os mesmos dados ao mesmo tempo
  • Existem três variedades de assuntos, que são explicadas abaixo com exemplos:

PublishSubjects


PublishSubjects se comportam como StreamControllers , exceto pela possibilidade de muitos ouvintes:


 var subject = new PublishSubject<String>(); subject.listen((item) => print(item)); subject.add("Item1"); //    subject.listen((item) => print(item.toUpperCase())); subject.add("Item2"); subject.add("Item3"); //        await Future.delayed(Duration(seconds: 5)); //    subject.close; 

Execute este código e você obterá:


 Item1 ITEM2 Item2 ITEM3 Item3 

É claro que o segundo ouvinte que estava atrasado para a festa (nós os chamaremos de assinantes atrasados) perdeu o primeiro ponto. Para evitar isso, você pode usar o BehaviourSubject


Behaviourubject


Com BehaviourSubject cada novo assinante receberá primeiro o último valor aceito:


 var subject = new BehaviorSubject<String>(); subject.listen((item) => print(item)); subject.add("Item1"); subject.add("Item2"); subject.listen((item) => print(item.toUpperCase())); subject.add("Item3"); 

Na saída


 Item1 ITEM2 ITEM3 Item2 Item3 

Você pode ver que o Item1 perdido para o segundo assinante, mas recebe o Item2 . Você pode se surpreender que o segundo assinante receba o Item3 antes que o primeiro assinante receba o Item2 . Isso ocorre porque a sequência de atendimento aos assinantes não é garantida, embora todos os assinantes recebam dados na ordem correta. BehaviourSubject armazena em cache apenas o último item recebido para assinantes atrasados. Se você precisar armazenar mais elementos em cache, poderá usar ReplaySubject . Na maioria dos casos, isso não é necessário.


Manipulando dados em tempo real



A verdadeira força do Rx reside no fato de permitir processar dados durante a transmissão pelo fluxo. Cada um dos métodos Rx retorna um novo fluxo com os dados resultantes (como na ilustração), o que significa que você pode vinculá-los em um pipeline de processamento, e isso faz do Rx uma ferramenta extremamente poderosa.


Mapa


Se houver alguma operação de Stream que eu não queira mais perder, esse é o map() . O que map() faz é que leva cada item de dados a ser transferido e aplica uma certa função a ele, após o que coloca o resultado no fluxo resultante. Um exemplo simples:



 var subject = new PublishSubject<String>(); subject.map((item) => item.toUpperCase()).listen(print); subject.add("Item1"); subject.add("Item2"); subject.add("Item3"); 

Resultado:


 ITEM1 ITEM2 ITEM3 

Mas o map não map necessário para retornar o mesmo tipo de dados que ele recebe como entrada. O exemplo a seguir terá números inteiros em vez de cadeias. Além disso, vincularemos duas transformações:


 var subject = new PublishSubject<int>(); subject.map((intValue) => intValue.toString()) .map((item) => item.toUpperCase()) .listen(print); subject.add(1); subject.add(2); subject.add(3); 

ou algo parecido com isto:



 class DataClass{} class WrapperClass { final DataClass wrapped; WrapperClass(this.wrapped); } var subject = new PublishSubject<WrapperClass>(); subject.map<WrapperClass>((a) => new WrapperClass(a)); 

Um dos usos mais úteis do .map é quando você obtém dados em um formato de alguma API REST ou de um banco de dados e deseja que eles sejam convertidos em seus próprios objetos:


 class User { final String name; final String adress; final String phoneNumber; final int age; //       - //   factory User.fromJson(String jsonString) { var jsonMap = json.decode(jsonString); return User( jsonMap['name'], jsonMap['adress'], jsonMap['phoneNumber'], jsonMap['age'], ); } User(this.name, this.adress, this.phoneNumber, this.age); @override String toString() { return '$name - $adress - $phoneNumber - $age'; } } void main() { test('Map', () { // -  var jsonStrings = [ '{"name": "Jon Doe", "adress": "New York", "phoneNumber":"424242","age": 42 }', '{"name": "Stephen King", "adress": "Castle Rock", "phoneNumber":"123456","age": 71 }', '{"name": "Jon F. Kennedy", "adress": "Washington", "phoneNumber":"111111","age": 66 }', ]; //   json-,    API/DB. var dataStreamFromAPI = new PublishSubject<String>(); dataStreamFromAPI .map<User>((jsonString) => User.fromJson(jsonString)) // json -> User .listen((user) => print(user.toString())); //    dataStreamFromAPI.add(jsonStrings[0]); dataStreamFromAPI.add(jsonStrings[1]); dataStreamFromAPI.add(jsonStrings[2]); }); 

Observo que não apenas o Streams, mas também qualquer Iterable oferece uma função de map que você pode usar para transformações em listas.


Onde


Se você estiver interessado apenas em determinados valores que ocorrem no fluxo, poderá usar a função .where() vez de usar a if no seu ouvinte, isso é mais expressivo e fácil de ler:


 var subject = new PublishSubject<int>(); subject.where((val) => val.isOdd) .listen( (val) => print('This only prints odd numbers: $val')); subject.where((val) => val.isEven) .listen( (val) => print('This only prints even numbers: $val')); subject.add(1); subject.add(2); subject.add(3); //: This only prints odd numbers: 1 This only prints even numbers: 2 This only prints odd numbers: 3 

Debounce


Esta é uma das pequenas pérolas de Rx! Imagine que você tenha um campo de pesquisa que chama a API REST se o texto for alterado. Fazer uma chamada de API para cada pressionamento de tecla é caro. Assim, você gostaria de fazer uma ligação apenas se o usuário parar por um momento. Para isso, é usada a função debounce() , que engole todos os eventos recebidos se não forem seguidos por uma pausa.


 var subject = new PublishSubject<String>(); subject.debounce(new Duration(milliseconds: 500)).listen((s) => print(s)); subject.add('A'); subject.add('AB'); await Future.delayed(Duration(milliseconds: 200)); subject.add("ABC"); //    await Future.delayed(Duration(milliseconds: 700)); //       : 'ABC' 

Portanto, se você converter o manipulador TextField.onChanged em Observable , obterá uma solução elegante.


Expandir


Se o seu Stream de origem emitir matrizes de objetos e você quiser processar cada objeto por conta própria, poderá usar .expand , o que fará exatamente isso:


imagem


Você verá a aplicação deste método abaixo no exemplo do FireStore.


Mesclar


Se você possui vários threads diferentes, mas deseja processar seus objetos juntos, pode usar .mergeWith (em outras implementações de Rx apenas merge ), que pega uma matriz de threads e retorna um thread mesclado.


imagem


.mergeWith não garante que nenhuma ordem nos fluxos seja mesclada. Os dados são emitidos em ordem de entrada.


Por exemplo, se você possui dois componentes que relatam erros através de um fluxo e deseja que eles sejam exibidos juntos em um diálogo, é possível fazer o seguinte (pseudo-código):


 @override initState() { super.initState(); component1.errors.mergeWith([component2.errors]) .listen( (error) async => await showDialog(error.message)); } 

ou se você quiser uma exibição combinada de mensagens de várias redes sociais, pode ser assim (pseudocódigo):


 final observableTwitter = getTwitterStream().map((data) => new MyAppPost.fromTwitter(data)); final observableFacebook = getFacebookStream().map((data) => new MyAppPost.fromFaceBook(data)); final postStream = observableTwitter.mergeWith([observableFacebook]); 

Zipwith


zipWith também mescla um fluxo com outro. Mas, diferentemente de .mergeWith , ele não envia dados assim que recebe um elemento de um de seus fluxos de origem. Ele espera até que os elementos dos dois fluxos de origem cheguem e os combina usando a função zipper fornecida:


imagem


A assinatura zipWith parece assustadora, mas agora olhamos para ela:


 // R :   Stream/Observable // S :   Stream/Observable // zipper: - Observable<R> zipWith<S, R>(Stream<S> other, R zipper(T t, S s)) 

Um exemplo muito simplificado:


 new Observable.just(1) // .just()  Observable,    .zipWith(new Observable.just(2), (one, two) => one + two) .listen(print); //  3 

Uma aplicação mais prática é se você precisar esperar por duas funções assíncronas que retornam Future e desejar processar os dados assim que os resultados forem retornados. Neste exemplo um pouco artificial, apresentamos duas APIs REST: uma retorna User , a outra retorna Product como strings JSON, e queremos aguardar as duas chamadas antes de retornar o objeto Invoice .


 class Invoice { final User user; final Product product; Invoice(this.user, this.product); printInvoice() { print(user.toString()); print(product.toString()); } } //  HTTP ,  Product,  JSON Future<String> getProduct() async { print("Started getting product"); await Future.delayed(Duration(seconds: 2)); print("Finished getting product"); return '{"name": "Flux compensator", "price": 99999.99}'; } //  HTTP ,  User,  JSON Future<String> getUser() async { print("Started getting User"); await Future.delayed(Duration(seconds: 4)); print("Finished getting User"); return '{"name": "Jon Doe", "adress": "New York", "phoneNumber":"424242","age": 42 }'; } void main() { test('zipWith', () async { var userObservable = Observable.fromFuture(getUser()).map<User>((jsonString) => User.fromJson(jsonString)); var productObservable = Observable.fromFuture(getProduct()) .map<Product>((jsonString) => Product.fromJson(jsonString)); Observable<Invoice> invoiceObservable = userObservable.zipWith<Product, Invoice>( productObservable, (user, product) => Invoice(user, product)); print("Start listening for invoices"); invoiceObservable.listen((invoice) => invoice.printInvoice()); //        await Future.delayed(Duration(seconds: 5)); }); } 

Observando a saída, você pode ver como isso é feito de forma assíncrona


 Started getting User Started getting product Start listening for invoices Finished getting product Finished getting User Jon Doe - New York - 424242 - 42 Flux compensator - 99999.99 

Combinelatest


combineLatest também mescla valores de fluxo, mas de uma maneira ligeiramente diferente de merge e zip . Ele escuta mais threads e emite um valor combinado sempre que um novo valor chega de um dos threads. É interessante que ele gere não apenas o valor alterado, mas também os últimos valores obtidos de todos os outros fluxos de origem. Veja atentamente esta animação:


imagem


Antes que combineLates seu primeiro valor, todos os encadeamentos de origem devem receber pelo menos uma entrada.


Diferente dos métodos usados ​​anteriormente, o combineLatest é estático. Além disso, como o Dart não permite sobrecarga do operador, existem versões do combLastest dependendo do número de fluxos de origem: combineLatest2 ... combineLatest9


combineLatest bom uso, por exemplo, se você tiver dois Observable<bool> que sinalizam que algumas partes do seu aplicativo estão ocupadas e deseja exibir o botão giratório Ocupado, se um deles estiver ocupado. Pode ficar assim (pseudo-código):


 class Model { Observable<bool> get isBusy => Observable.combineLatest2(isBusyOne,isBusyTwo, (b1, b2) => b1 || b2); PublishSubject<bool> isBusyOne; PublishSubject<bool> isBusyTwo; } 

Na sua interface do usuário, você pode usar o isBusy com o StreamBuilder para exibir o Spinner se o valor resultante for verdadeiro.


combineLatest recurso muito adequado em combinação com os fluxos de captura instantânea do FireStore .


Imagine que você deseja criar um aplicativo que exiba um feed de notícias e uma previsão do tempo. As mensagens do relógio e os dados meteorológicos são armazenados em duas coleções diferentes do FireStore. Ambos são atualizados independentemente. Você deseja exibir atualizações de dados usando o StreamBuilder. Com o combineLatest é fácil:


 class WeatherForecast { final String forecastText; final GeoPoint location; factory WeatherForecast.fromMap(Map<String, dynamic> map) { return WeatherForecast(map['forecastText'], map['location']); } WeatherForecast(this.forecastText, this.location); } class NewsMessage { final String newsText; final GeoPoint location; factory NewsMessage.fromMap(Map<String, dynamic> map) { return NewsMessage(map['newsText'], map['location']); } NewsMessage(this.newsText, this.location); } class CombinedMessage { final WeatherForecast forecast; final NewsMessage newsMessage; CombinedMessage(this.forecast, this.newsMessage); } class Model { CollectionReference weatherCollection; CollectionReference newsCollection; Model() { weatherCollection = Firestore.instance.collection('weather'); newsCollection = Firestore.instance.collection('news'); } Observable<CombinedMessage> getCombinedMessages() { Observable<WeatherForecast> weatherForecasts = weatherCollection .snapshots() .expand((snapShot) => snapShot.documents) .map<WeatherForecast>((document) => WeatherForecast.fromMap(document.data)); Observable<NewsMessage> news = newsCollection .snapshots() .expand((snapShot) => snapShot.documents) .map<NewsMessage>((document) => NewsMessage.fromMap(document.data)); return Observable.combineLatest2( weatherForecasts, news, (weather, news) => CombinedMessage(weather, news)); } } 

Na sua interface do usuário, seria algo como isto: StreamBuilder<CombinedMessage>(stream: model.getCombinedMessages(),...).


Distinto


No cenário descrito acima, pode acontecer que isBusyOne e isBusyTwo forneçam o mesmo valor, o que levará a uma atualização da interface do usuário com os mesmos dados. Para evitar isso, podemos usar .distinct() . Ele garante que os dados sejam transmitidos apenas se o valor do novo elemento for diferente do último. Assim, mudaríamos o código para:


  Observable<bool> isBusy => isBusyOne.mergeWith([isBusyTwo]).distinct(); 

e também demonstra que podemos combinar nossas funções em diferentes cadeias à vontade.


Asyncmap


Além de map() também há uma função asyncMap , que permite usar uma função assíncrona como uma função de mapa. Vamos introduzir uma configuração ligeiramente diferente para o nosso exemplo do FireStore. Agora, o WeatherForecast necessário depende da localização do NewsMessage e só deve ser atualizado quando um novo NewsMessage for recebido :


 Observable<CombinedMessage> getDependendMessages() { Observable<NewsMessage> news = newsCollection.snapshots().expand((snapShot) { return snapShot.documents; }).map<NewsMessage>((document) { return NewsMessage.fromMap(document.data); }); return news.asyncMap((newsEntry) async { var weatherDocuments = await weatherCollection.where('location', isEqualTo: newsEntry.location).getDocuments(); return new CombinedMessage( WeatherForecast.fromMap(weatherDocuments.documents.first.data), newsEntry); }); } 

O Observable retornado por getDependendMessages gerará um novo CombinedMessage toda vez que o newsCollection for alterado.


Observáveis ​​de depuração


Observando as elegantes cadeias de chamadas Rx, parece quase impossível depurar uma expressão como esta:


 Observable<NewsMessage> news = newsCollection .snapshots() .expand((snapShot) => snapShot.documents) .map<NewsMessage>((document) => NewsMessage.fromMap(document.data)); 

Mas lembre-se de que => é apenas um pequeno formulário para uma função anônima. Usando Converter para bloquear o corpo , você obterá:


 Observable<NewsMessage> news = newsCollection .snapshots() .expand((snapShot) { return snapShot.documents; }) .map<NewsMessage>((document) { return NewsMessage.fromMap(document.data); }); 

E agora podemos definir um ponto de interrupção ou adicionar instruções de impressão em todas as etapas do nosso pipeline.


Cuidado com os efeitos colaterais


Se você deseja capitalizar o Rx para tornar seu código mais robusto, lembre-se sempre de que o Rx é uma conversão de dados ao movê-lo "ao longo da correia transportadora". Portanto, nunca chame funções que alterem quaisquer variáveis ​​/ estados fora do pipeline de processamento até atingir a função .listen.
Em vez de fazer isso:


 Observable.fromFuture(getProduct()) .map<Product>((jsonString) { var product = Product.fromJson(jsonString); database.save(product); setState((){ _product = product }); return product; }).listen(); 

faça o seguinte:


 Observable.fromFuture(getProduct()) .map<Product>((jsonString) => Product.fromJson(jsonString)) .listen( (product) { database.save(product); setState((){ _product = product }); }); 

O dever do map() é transformar os dados no fluxo, E NADA MAIS! Se a função de exibição passada fizer outra coisa, ela será considerada um efeito colateral, gerando erros em potencial que são difíceis de detectar ao ler o código.


Algumas reflexões sobre como liberar recursos


Para evitar vazamentos de memória, sempre chame cancel() para assinaturas, dispose() para StreamControllers, close() para Assuntos, assim que você não precisar mais deles.


Conclusão


Parabéns se você ficou comigo até esse momento. Agora você pode não apenas usar o Rx para facilitar sua vida, mas também se preparar para as próximas postagens nas quais iremos nos aprofundar nos detalhes do RxVMS .

Source: https://habr.com/ru/post/pt451292/


All Articles