Bem-vindo - Esta é a terceira parte da minha série de artigos sobre Flutter Architecture.
Desta vez, faremos um pequeno mergulho no reino mágico das extensões reativas (Rx). Vou me concentrar nas funções Rx mais usadas e explicar sua aplicação. Se você não leu a postagem anterior, agora é a hora de fazer isso antes de prosseguir.
O RxDart é uma implementação do conceito Rx para a linguagem Dart, graças a Frank Pepermans e Brian Egan por isso . Se você já usou o Rx em outros idiomas, provavelmente notará uma diferença na nomeação de várias funções, mas é improvável que isso lhe cause dificuldades.
O código para teste está aqui .
Até agora, usamos fluxos como uma maneira de transferir dados de um local para outro em nosso aplicativo, mas eles podem fazer muito mais. Vamos dar uma olhada em alguns dos recursos que o Rx adiciona ao Streams.
Criando Observáveis
Como afirmado anteriormente , os Observables são versões Rx de fluxos com ótimos recursos. Existem várias maneiras interessantes de criá-los:
Fora do fluxo
Qualquer fluxo pode ser convertido em Observable passando-o para o construtor:
var controller = new StreamController<String>(); var streamObservable = new Observable(controller.stream); streamObservable.listen(print);
Eventos recorrentes
var timerObservable = Observable.periodic(Duration(seconds: 1), (x) => x.toString() ); timerObservable.listen(print);
Dessa maneira, será construído um Observável que exibe valores com um período específico. Então você pode substituir o temporizador.
De um único valor
Às vezes, uma API espera um Stream / Observable em que você tenha apenas um valor. Para tais casos, o Observable possui uma fábrica.
var justObservable = Observable<int>.just(42); justObservable.listen(print);
Do futuro
Future<String> asyncFunction() async { return Future.delayed(const Duration(seconds: 1), () => "AsyncRsult"); } test('Create Observable from Future', () async { print('start'); var fromFutureObservable = Observable.fromFuture(asyncFunction()); fromFutureObservable.listen(print);
Criar um Observable
partir do Futuro aguardará que o Futuro seja concluído e retorne um valor para seu resultado ou null
se o valor não for retornado. Outra maneira de criar um fluxo a partir do Future é chamar toStream()
para qualquer Future.
Você pode estar se perguntando qual é o sentido de converter o Futuro em um Observável / Fluxo em vez de apenas esperar por ele. Tenha certeza, isso ficará claro quando examinarmos as funções disponíveis para manipular dados enquanto eles estão no fluxo.
Assuntos
Subjects
substituem o StreamController
no RxDart e é assim que eles são implementados em algum lugar nas entranhas da biblioteca.
Mas o comportamento deles é um pouco diferente dos StreamControllers básicos:
- você pode aplicar
listen()
diretamente a um Assunto sem acessar a propriedade Stream - qualquer número de assinaturas está disponível e todos os ouvintes recebem os mesmos dados ao mesmo tempo
- Existem três variedades de assuntos, que são explicadas abaixo com exemplos:
PublishSubjects
PublishSubjects
se comportam como StreamControllers
, exceto pela possibilidade de muitos ouvintes:
var subject = new PublishSubject<String>(); subject.listen((item) => print(item)); subject.add("Item1");
Execute este código e você obterá:
Item1 ITEM2 Item2 ITEM3 Item3
É claro que o segundo ouvinte que estava atrasado para a festa (nós os chamaremos de assinantes atrasados) perdeu o primeiro ponto. Para evitar isso, você pode usar o BehaviourSubject
Behaviourubject
Com BehaviourSubject
cada novo assinante receberá primeiro o último valor aceito:
var subject = new BehaviorSubject<String>(); subject.listen((item) => print(item)); subject.add("Item1"); subject.add("Item2"); subject.listen((item) => print(item.toUpperCase())); subject.add("Item3");
Na saída
Item1 ITEM2 ITEM3 Item2 Item3
Você pode ver que o Item1
perdido para o segundo assinante, mas recebe o Item2
. Você pode se surpreender que o segundo assinante receba o Item3
antes que o primeiro assinante receba o Item2
. Isso ocorre porque a sequência de atendimento aos assinantes não é garantida, embora todos os assinantes recebam dados na ordem correta. BehaviourSubject
armazena em cache apenas o último item recebido para assinantes atrasados. Se você precisar armazenar mais elementos em cache, poderá usar ReplaySubject . Na maioria dos casos, isso não é necessário.
Manipulando dados em tempo real

A verdadeira força do Rx reside no fato de permitir processar dados durante a transmissão pelo fluxo. Cada um dos métodos Rx retorna um novo fluxo com os dados resultantes (como na ilustração), o que significa que você pode vinculá-los em um pipeline de processamento, e isso faz do Rx uma ferramenta extremamente poderosa.
Mapa
Se houver alguma operação de Stream que eu não queira mais perder, esse é o map()
. O que map()
faz é que leva cada item de dados a ser transferido e aplica uma certa função a ele, após o que coloca o resultado no fluxo resultante. Um exemplo simples:

var subject = new PublishSubject<String>(); subject.map((item) => item.toUpperCase()).listen(print); subject.add("Item1"); subject.add("Item2"); subject.add("Item3");
Resultado:
ITEM1 ITEM2 ITEM3
Mas o map
não map
necessário para retornar o mesmo tipo de dados que ele recebe como entrada. O exemplo a seguir terá números inteiros em vez de cadeias. Além disso, vincularemos duas transformações:
var subject = new PublishSubject<int>(); subject.map((intValue) => intValue.toString()) .map((item) => item.toUpperCase()) .listen(print); subject.add(1); subject.add(2); subject.add(3);
ou algo parecido com isto:

class DataClass{} class WrapperClass { final DataClass wrapped; WrapperClass(this.wrapped); } var subject = new PublishSubject<WrapperClass>(); subject.map<WrapperClass>((a) => new WrapperClass(a));
Um dos usos mais úteis do .map
é quando você obtém dados em um formato de alguma API REST ou de um banco de dados e deseja que eles sejam convertidos em seus próprios objetos:
class User { final String name; final String adress; final String phoneNumber; final int age;
Observo que não apenas o Streams, mas também qualquer Iterable oferece uma função de map
que você pode usar para transformações em listas.
Onde
Se você estiver interessado apenas em determinados valores que ocorrem no fluxo, poderá usar a função .where()
vez de usar a if
no seu ouvinte, isso é mais expressivo e fácil de ler:
var subject = new PublishSubject<int>(); subject.where((val) => val.isOdd) .listen( (val) => print('This only prints odd numbers: $val')); subject.where((val) => val.isEven) .listen( (val) => print('This only prints even numbers: $val')); subject.add(1); subject.add(2); subject.add(3);
Debounce
Esta é uma das pequenas pérolas de Rx! Imagine que você tenha um campo de pesquisa que chama a API REST se o texto for alterado. Fazer uma chamada de API para cada pressionamento de tecla é caro. Assim, você gostaria de fazer uma ligação apenas se o usuário parar por um momento. Para isso, é usada a função debounce()
, que engole todos os eventos recebidos se não forem seguidos por uma pausa.
var subject = new PublishSubject<String>(); subject.debounce(new Duration(milliseconds: 500)).listen((s) => print(s)); subject.add('A'); subject.add('AB'); await Future.delayed(Duration(milliseconds: 200)); subject.add("ABC");
Portanto, se você converter o manipulador TextField.onChanged
em Observable
, obterá uma solução elegante.
Expandir
Se o seu Stream de origem emitir matrizes de objetos e você quiser processar cada objeto por conta própria, poderá usar .expand
, o que fará exatamente isso:

Você verá a aplicação deste método abaixo no exemplo do FireStore.
Mesclar
Se você possui vários threads diferentes, mas deseja processar seus objetos juntos, pode usar .mergeWith
(em outras implementações de Rx apenas merge
), que pega uma matriz de threads e retorna um thread mesclado.

.mergeWith
não garante que nenhuma ordem nos fluxos seja mesclada. Os dados são emitidos em ordem de entrada.
Por exemplo, se você possui dois componentes que relatam erros através de um fluxo e deseja que eles sejam exibidos juntos em um diálogo, é possível fazer o seguinte (pseudo-código):
@override initState() { super.initState(); component1.errors.mergeWith([component2.errors]) .listen( (error) async => await showDialog(error.message)); }
ou se você quiser uma exibição combinada de mensagens de várias redes sociais, pode ser assim (pseudocódigo):
final observableTwitter = getTwitterStream().map((data) => new MyAppPost.fromTwitter(data)); final observableFacebook = getFacebookStream().map((data) => new MyAppPost.fromFaceBook(data)); final postStream = observableTwitter.mergeWith([observableFacebook]);
Zipwith
zipWith
também mescla um fluxo com outro. Mas, diferentemente de .mergeWith
, ele não envia dados assim que recebe um elemento de um de seus fluxos de origem. Ele espera até que os elementos dos dois fluxos de origem cheguem e os combina usando a função zipper
fornecida:

A assinatura zipWith
parece assustadora, mas agora olhamos para ela:
Um exemplo muito simplificado:
new Observable.just(1)
Uma aplicação mais prática é se você precisar esperar por duas funções assíncronas que retornam Future
e desejar processar os dados assim que os resultados forem retornados. Neste exemplo um pouco artificial, apresentamos duas APIs REST: uma retorna User
, a outra retorna Product
como strings JSON, e queremos aguardar as duas chamadas antes de retornar o objeto Invoice
.
class Invoice { final User user; final Product product; Invoice(this.user, this.product); printInvoice() { print(user.toString()); print(product.toString()); } }
Observando a saída, você pode ver como isso é feito de forma assíncrona
Started getting User Started getting product Start listening for invoices Finished getting product Finished getting User Jon Doe - New York - 424242 - 42 Flux compensator - 99999.99
Combinelatest
combineLatest
também mescla valores de fluxo, mas de uma maneira ligeiramente diferente de merge
e zip
. Ele escuta mais threads e emite um valor combinado sempre que um novo valor chega de um dos threads. É interessante que ele gere não apenas o valor alterado, mas também os últimos valores obtidos de todos os outros fluxos de origem. Veja atentamente esta animação:

Antes que combineLates
seu primeiro valor, todos os encadeamentos de origem devem receber pelo menos uma entrada.
Diferente dos métodos usados anteriormente, o combineLatest
é estático. Além disso, como o Dart não permite sobrecarga do operador, existem versões do combLastest
dependendo do número de fluxos de origem: combineLatest2 ... combineLatest9
combineLatest
bom uso, por exemplo, se você tiver dois Observable<bool>
que sinalizam que algumas partes do seu aplicativo estão ocupadas e deseja exibir o botão giratório Ocupado, se um deles estiver ocupado. Pode ficar assim (pseudo-código):
class Model { Observable<bool> get isBusy => Observable.combineLatest2(isBusyOne,isBusyTwo, (b1, b2) => b1 || b2); PublishSubject<bool> isBusyOne; PublishSubject<bool> isBusyTwo; }
Na sua interface do usuário, você pode usar o isBusy
com o StreamBuilder
para exibir o Spinner
se o valor resultante for verdadeiro.
combineLatest
recurso muito adequado em combinação com os fluxos de captura instantânea do FireStore .
Imagine que você deseja criar um aplicativo que exiba um feed de notícias e uma previsão do tempo. As mensagens do relógio e os dados meteorológicos são armazenados em duas coleções diferentes do FireStore. Ambos são atualizados independentemente. Você deseja exibir atualizações de dados usando o StreamBuilder. Com o combineLatest
é fácil:
class WeatherForecast { final String forecastText; final GeoPoint location; factory WeatherForecast.fromMap(Map<String, dynamic> map) { return WeatherForecast(map['forecastText'], map['location']); } WeatherForecast(this.forecastText, this.location); } class NewsMessage { final String newsText; final GeoPoint location; factory NewsMessage.fromMap(Map<String, dynamic> map) { return NewsMessage(map['newsText'], map['location']); } NewsMessage(this.newsText, this.location); } class CombinedMessage { final WeatherForecast forecast; final NewsMessage newsMessage; CombinedMessage(this.forecast, this.newsMessage); } class Model { CollectionReference weatherCollection; CollectionReference newsCollection; Model() { weatherCollection = Firestore.instance.collection('weather'); newsCollection = Firestore.instance.collection('news'); } Observable<CombinedMessage> getCombinedMessages() { Observable<WeatherForecast> weatherForecasts = weatherCollection .snapshots() .expand((snapShot) => snapShot.documents) .map<WeatherForecast>((document) => WeatherForecast.fromMap(document.data)); Observable<NewsMessage> news = newsCollection .snapshots() .expand((snapShot) => snapShot.documents) .map<NewsMessage>((document) => NewsMessage.fromMap(document.data)); return Observable.combineLatest2( weatherForecasts, news, (weather, news) => CombinedMessage(weather, news)); } }
Na sua interface do usuário, seria algo como isto: StreamBuilder<CombinedMessage>(stream: model.getCombinedMessages(),...).
Distinto
No cenário descrito acima, pode acontecer que isBusyOne e isBusyTwo forneçam o mesmo valor, o que levará a uma atualização da interface do usuário com os mesmos dados. Para evitar isso, podemos usar .distinct()
. Ele garante que os dados sejam transmitidos apenas se o valor do novo elemento for diferente do último. Assim, mudaríamos o código para:
Observable<bool> isBusy => isBusyOne.mergeWith([isBusyTwo]).distinct();
e também demonstra que podemos combinar nossas funções em diferentes cadeias à vontade.
Asyncmap
Além de map()
também há uma função asyncMap
, que permite usar uma função assíncrona como uma função de mapa. Vamos introduzir uma configuração ligeiramente diferente para o nosso exemplo do FireStore. Agora, o WeatherForecast necessário depende da localização do NewsMessage e só deve ser atualizado quando um novo NewsMessage for recebido :
Observable<CombinedMessage> getDependendMessages() { Observable<NewsMessage> news = newsCollection.snapshots().expand((snapShot) { return snapShot.documents; }).map<NewsMessage>((document) { return NewsMessage.fromMap(document.data); }); return news.asyncMap((newsEntry) async { var weatherDocuments = await weatherCollection.where('location', isEqualTo: newsEntry.location).getDocuments(); return new CombinedMessage( WeatherForecast.fromMap(weatherDocuments.documents.first.data), newsEntry); }); }
O Observable retornado por getDependendMessages gerará um novo CombinedMessage toda vez que o newsCollection for alterado.
Observáveis de depuração
Observando as elegantes cadeias de chamadas Rx, parece quase impossível depurar uma expressão como esta:
Observable<NewsMessage> news = newsCollection .snapshots() .expand((snapShot) => snapShot.documents) .map<NewsMessage>((document) => NewsMessage.fromMap(document.data));
Mas lembre-se de que =>
é apenas um pequeno formulário para uma função anônima. Usando Converter para bloquear o corpo , você obterá:
Observable<NewsMessage> news = newsCollection .snapshots() .expand((snapShot) { return snapShot.documents; }) .map<NewsMessage>((document) { return NewsMessage.fromMap(document.data); });
E agora podemos definir um ponto de interrupção ou adicionar instruções de impressão em todas as etapas do nosso pipeline.
Cuidado com os efeitos colaterais
Se você deseja capitalizar o Rx para tornar seu código mais robusto, lembre-se sempre de que o Rx é uma conversão de dados ao movê-lo "ao longo da correia transportadora". Portanto, nunca chame funções que alterem quaisquer variáveis / estados fora do pipeline de processamento até atingir a função .listen.
Em vez de fazer isso:
Observable.fromFuture(getProduct()) .map<Product>((jsonString) { var product = Product.fromJson(jsonString); database.save(product); setState((){ _product = product }); return product; }).listen();
faça o seguinte:
Observable.fromFuture(getProduct()) .map<Product>((jsonString) => Product.fromJson(jsonString)) .listen( (product) { database.save(product); setState((){ _product = product }); });
O dever do map()
é transformar os dados no fluxo, E NADA MAIS! Se a função de exibição passada fizer outra coisa, ela será considerada um efeito colateral, gerando erros em potencial que são difíceis de detectar ao ler o código.
Algumas reflexões sobre como liberar recursos
Para evitar vazamentos de memória, sempre chame cancel()
para assinaturas, dispose()
para StreamControllers, close()
para Assuntos, assim que você não precisar mais deles.
Conclusão
Parabéns se você ficou comigo até esse momento. Agora você pode não apenas usar o Rx para facilitar sua vida, mas também se preparar para as próximas postagens nas quais iremos nos aprofundar nos detalhes do RxVMS .