
O RxJava é usado em um grande número de aplicativos Android, mas, ao mesmo tempo, muitos não conhecem outras fontes de eventos, exceto Observable e talvez Flowable. Eles esquecem as classes especializadas Single, Maybe e Completable, que geralmente podem adicionar mais clareza ao código.
Sob o gato, você encontrará uma folha de dicas sobre as fontes de eventos que existem no RxJava.
Completável é na verdade um análogo Rx do Runnable. É uma operação que pode ser executada ou não. Se traçarmos uma analogia com Kotlin, isso é
divertido () do mundo do Rx. Portanto, para se inscrever, é necessário implementar onComplete e onError. Não pode ser criado a partir do valor (Observable # just, ...) porque não foi projetado para isso.
Callable de reação
única , porque aqui é possível retornar o resultado da operação. Continuando a comparação com Kotlin, podemos dizer que Single é divertido single (): T {}. Portanto, para se inscrever, você deve implementar onSuccess (T) e onError.
Talvez - um cruzamento entre Único e Completável, porque suporta um valor, sem valores e erro. É mais difícil traçar um paralelo inequívoco com os métodos, mas acho que Talvez seja divertido, talvez (): T? {}, que retorna nulo quando não há resultado. É fácil adivinhar que, para uma assinatura, você precisa definir onSuccess (T), onComplete e onError.
É importante observar que onSuccess (T) e onComplete são mutuamente exclusivos. I.e. no caso de chamar o primeiro, você não pode esperar pelo segundo.
Observável é a fonte mais comum, devido à sua versatilidade. Ele sabe como não produzir eventos, e gerar muitos deles, para que ele possa sempre ser usado quando outras opções não forem adequadas. Apesar disso, o Observable tem uma desvantagem - ele não sabe lidar com a contrapressão. Para se inscrever, você precisa de onNext (T), onError e onComplete.
Contrapressão - uma situação em que novos eventos chegam muito mais rapidamente do que eles têm tempo para processar e começam a se acumular no buffer, transbordando. Isso pode causar problemas como o OutOfMemoryError. Mais detalhes podem ser encontrados aqui .
ConnectableObservable - uma versão aquecida do Observable. Todas as fontes de dados começam a emitir seu fluxo de eventos no momento da assinatura. Mas não esse cara. Para fazer isso, ConnectableObservable aguarda uma ligação para se conectar. Isso é feito para que vários observadores possam revisar um fluxo de eventos sem reiniciá-lo em cada assinatura. Para ilustrar, darei a você o seguinte snippet:
val observable = Observable.fromCallable { Log.d("RxLogs", "observable fromCallable executed") Thread.sleep(1000) }.subscribeOn(Schedulers.computation()) observable.subscribe() observable.subscribe() observable.subscribe() observable.subscribe()
No console estará:
observável deCallable executado
observável deCallable executado
observável deCallable executado
observável deCallable executado
val connectedObservable = Observable.fromCallable { Log.d("RxLogs", "connectedObservable fromCallable executed") Thread.sleep(1000) }.subscribeOn(Schedulers.computation()) .publish() connectedObservable.subscribe() connectedObservable.subscribe() connectedObservable.subscribe() connectedObservable.subscribe() connectedObservable.connect()
E neste caso: observável deCallable executado
Flowable - uma fonte que fornece operadores adicionais para processar a contrapressão. Quando você precisar manipular mais de 10.000 eventos que ocorrem rapidamente, um após o outro, é recomendável usá-lo em vez de Observável.
O último pode criar um ConnectableFlowable, abrindo as mesmas possibilidades que o ConnectableObservable.
Falando sobre geradores de eventos, não se pode deixar de mencionar Assunto e Processador.
Assunto - uma classe que pode ser tanto uma fonte quanto um navegador. Isso permite que você o use, por exemplo, em vários tipos de controladores, que o fornecerão externamente como um Observável e serão notificados internamente como um Observador. A seguir, passaremos por diferentes implementações dessa classe.
AsyncSubject / AsyncProcessor mantém o último evento até que o thread termine corretamente e o envia aos assinantes. Se ocorrer um erro, nenhum evento será encaminhado.
O PublishSubject / PublishProcessor encaminha os eventos que estão chegando até que um sinal de terminal chegue. Após o final do fluxo ou erro, ele retorna os eventos apropriados.
BehaviorSubject / BehaviorProcessor funciona de maneira semelhante ao PublishSubject / PublishProcessor, mas após a assinatura, ele retorna o último evento, se houver, e se o Subject não passou para o estado do terminal.
ReplaySubject / ReplayProcessor - BehaviourSubject / BehaviorProcessor em esteróides. Ele retorna não um último evento, mas tanto quanto a alma deseja. Se você se inscrever em um ReplaySubject ou ReplayProcessor concluído, todos os dados acumulados serão recebidos.

Portanto, ReplaySubject.createWithSize (1) e BehaviourSubject.create () funcionam de maneira diferente após a transição para o estado terminal. Durante a assinatura, o primeiro retornará o último evento e o segundo não. Isso também se aplica ao ReplayProcessor.
CompletableSubject ,
MaybeSubject e
SingleSubject funcionam de maneira semelhante ao PublishSubject, projetado apenas para uso com Completable, Maybe e Single, respectivamente.
UnicastSubject / UnicastProcessor é realmente um ReplaySubject que garante que ele tenha apenas um assinante. Ele lança uma IllegalStateException ao tentar se inscrever novamente.

I.e. próximo trecho
val subject = UnicastSubject.create<String>(3) repeat(3) { subject.onNext(it.toString()) } subject.onComplete() subject.subscribe({ Log.d("RxLogs", it) }, { }, { Log.d("RxLogs", "complete") })
será exibido no log
0 0
1
2
completo
O MulticastProcessor funciona de maneira semelhante ao PublishProcessor, com exceção de um pequeno recurso. Ele sabe como lidar com a contrapressão no fluxo de entrada. O MulticastProcessor permite que você defina o tamanho do buffer no qual ele pré-consulta elementos do upstream para futuros assinantes.
No diagrama abaixo, um processador é criado com armazenamento para 2 elementos, que ele solicita imediatamente de sua origem. Portanto, quando o primeiro observador se inscreve, emite imediatamente o conteúdo do buffer, que é instantaneamente preenchido com novos eventos. Após o evento do terminal, o MulticastProcessor limpa seu armazenamento e os novos assinantes recebem imediatamente a conclusão do fluxo.
