Referência da fonte de eventos Rx



O RxJava é usado em um grande número de aplicativos Android, mas, ao mesmo tempo, muitos não conhecem outras fontes de eventos, exceto Observable e talvez Flowable. Eles esquecem as classes especializadas Single, Maybe e Completable, que geralmente podem adicionar mais clareza ao código.

Sob o gato, você encontrará uma folha de dicas sobre as fontes de eventos que existem no RxJava.

Completável é na verdade um análogo Rx do Runnable. É uma operação que pode ser executada ou não. Se traçarmos uma analogia com Kotlin, isso é divertido () do mundo do Rx. Portanto, para se inscrever, é necessário implementar onComplete e onError. Não pode ser criado a partir do valor (Observable # just, ...) porque não foi projetado para isso.

Callable de reação única , porque aqui é possível retornar o resultado da operação. Continuando a comparação com Kotlin, podemos dizer que Single é divertido single (): T {}. Portanto, para se inscrever, você deve implementar onSuccess (T) e onError.

Talvez - um cruzamento entre Único e Completável, porque suporta um valor, sem valores e erro. É mais difícil traçar um paralelo inequívoco com os métodos, mas acho que Talvez seja divertido, talvez (): T? {}, que retorna nulo quando não há resultado. É fácil adivinhar que, para uma assinatura, você precisa definir onSuccess (T), onComplete e onError.
É importante observar que onSuccess (T) e onComplete são mutuamente exclusivos. I.e. no caso de chamar o primeiro, você não pode esperar pelo segundo.
Observável é a fonte mais comum, devido à sua versatilidade. Ele sabe como não produzir eventos, e gerar muitos deles, para que ele possa sempre ser usado quando outras opções não forem adequadas. Apesar disso, o Observable tem uma desvantagem - ele não sabe lidar com a contrapressão. Para se inscrever, você precisa de onNext (T), onError e onComplete.

Contrapressão - uma situação em que novos eventos chegam muito mais rapidamente do que eles têm tempo para processar e começam a se acumular no buffer, transbordando. Isso pode causar problemas como o OutOfMemoryError. Mais detalhes podem ser encontrados aqui .

ConnectableObservable - uma versão aquecida do Observable. Todas as fontes de dados começam a emitir seu fluxo de eventos no momento da assinatura. Mas não esse cara. Para fazer isso, ConnectableObservable aguarda uma ligação para se conectar. Isso é feito para que vários observadores possam revisar um fluxo de eventos sem reiniciá-lo em cada assinatura. Para ilustrar, darei a você o seguinte snippet:

val observable = Observable.fromCallable { Log.d("RxLogs", "observable fromCallable executed") Thread.sleep(1000) }.subscribeOn(Schedulers.computation()) observable.subscribe() observable.subscribe() observable.subscribe() observable.subscribe() 
No console estará:
observável deCallable executado
observável deCallable executado
observável deCallable executado
observável deCallable executado

 val connectedObservable = Observable.fromCallable { Log.d("RxLogs", "connectedObservable fromCallable executed") Thread.sleep(1000) }.subscribeOn(Schedulers.computation()) .publish() connectedObservable.subscribe() connectedObservable.subscribe() connectedObservable.subscribe() connectedObservable.subscribe() connectedObservable.connect() 

E neste caso: observável deCallable executado

Flowable - uma fonte que fornece operadores adicionais para processar a contrapressão. Quando você precisar manipular mais de 10.000 eventos que ocorrem rapidamente, um após o outro, é recomendável usá-lo em vez de Observável.

O último pode criar um ConnectableFlowable, abrindo as mesmas possibilidades que o ConnectableObservable.

Falando sobre geradores de eventos, não se pode deixar de mencionar Assunto e Processador.

Assunto - uma classe que pode ser tanto uma fonte quanto um navegador. Isso permite que você o use, por exemplo, em vários tipos de controladores, que o fornecerão externamente como um Observável e serão notificados internamente como um Observador. A seguir, passaremos por diferentes implementações dessa classe.

AsyncSubject / AsyncProcessor mantém o último evento até que o thread termine corretamente e o envia aos assinantes. Se ocorrer um erro, nenhum evento será encaminhado.

imagem

O PublishSubject / PublishProcessor encaminha os eventos que estão chegando até que um sinal de terminal chegue. Após o final do fluxo ou erro, ele retorna os eventos apropriados.

imagem

BehaviorSubject / BehaviorProcessor funciona de maneira semelhante ao PublishSubject / PublishProcessor, mas após a assinatura, ele retorna o último evento, se houver, e se o Subject não passou para o estado do terminal.

imagem

ReplaySubject / ReplayProcessor - BehaviourSubject / BehaviorProcessor em esteróides. Ele retorna não um último evento, mas tanto quanto a alma deseja. Se você se inscrever em um ReplaySubject ou ReplayProcessor concluído, todos os dados acumulados serão recebidos.

imagem
Portanto, ReplaySubject.createWithSize (1) e BehaviourSubject.create () funcionam de maneira diferente após a transição para o estado terminal. Durante a assinatura, o primeiro retornará o último evento e o segundo não. Isso também se aplica ao ReplayProcessor.
CompletableSubject , MaybeSubject e SingleSubject funcionam de maneira semelhante ao PublishSubject, projetado apenas para uso com Completable, Maybe e Single, respectivamente.

UnicastSubject / UnicastProcessor é realmente um ReplaySubject que garante que ele tenha apenas um assinante. Ele lança uma IllegalStateException ao tentar se inscrever novamente.

imagem

I.e. próximo trecho

 val subject = UnicastSubject.create<String>(3) repeat(3) { subject.onNext(it.toString()) } subject.onComplete() subject.subscribe({ Log.d("RxLogs", it) }, { }, { Log.d("RxLogs", "complete") }) 

será exibido no log
0 0
1
2
completo

O MulticastProcessor funciona de maneira semelhante ao PublishProcessor, com exceção de um pequeno recurso. Ele sabe como lidar com a contrapressão no fluxo de entrada. O MulticastProcessor permite que você defina o tamanho do buffer no qual ele pré-consulta elementos do upstream para futuros assinantes.

No diagrama abaixo, um processador é criado com armazenamento para 2 elementos, que ele solicita imediatamente de sua origem. Portanto, quando o primeiro observador se inscreve, emite imediatamente o conteúdo do buffer, que é instantaneamente preenchido com novos eventos. Após o evento do terminal, o MulticastProcessor limpa seu armazenamento e os novos assinantes recebem imediatamente a conclusão do fluxo.

imagem

Source: https://habr.com/ru/post/pt459174/


All Articles