Wie ich RxJava in meinem Projekt durch Coroutinen ersetzt habe und warum Sie dies wahrscheinlich auch tun sollten

Hallo Habr! Ich präsentiere Ihnen eine Übersetzung eines Artikels von Paulo Sato über die Verwendung von Kotlin Coroutines anstelle von RxJava in ihren Android-Projekten.

RxJava als Panzerfaust, die meisten Anwendungen verbrauchen nicht einmal die Hälfte ihrer Feuerkraft. In dem Artikel wird erläutert, wie es durch Kotlin-Coroutinen (Coroutinen) ersetzt werden kann.

Ich arbeite seit mehreren Jahren mit RxJava. Dies ist definitiv eine der besten Bibliotheken für jedes Android-Projekt, das heute noch unter Schock steht, insbesondere wenn Sie in Java programmieren. Wenn Sie Kotlin benutzen, können wir sagen, dass die Stadt einen neuen Sheriff hat.

Die meisten verwenden RxJava nur, um Threads zu steuern und Rückrufhölle zu verhindern (wenn Sie nicht wissen, was es ist, können Sie sich glücklich schätzen und deshalb ). Tatsache ist, dass wir bedenken müssen, dass die wahre Kraft von RxJava reaktive Programmierung und Gegendruck ist. Wenn Sie damit asynchrone Anforderungen steuern, verwenden Sie die Panzerfaust, um die Spinne zu töten. Sie wird ihren Job machen, aber es ist übertrieben.

Ein bemerkenswerter Nachteil von RxJava ist die Anzahl der Methoden. Es ist riesig und verbreitet sich im gesamten Code. In Kotlin können Sie Coroutinen verwenden, um den größten Teil des Verhaltens zu implementieren, das Sie zuvor mit RxJava erstellt haben.

Aber ... was sind Coroutinen?

Corutin ist eine Möglichkeit, wettbewerbsfähige Aufgaben in einem Thread zu erledigen. Der Thread funktioniert so lange, bis er gestoppt wird, und der Kontext ändert sich für jede Coroutine, ohne dass ein neuer Thread erstellt wird.
Die Coroutinen in Kotlin sind noch experimentell, aber sie sind in Kotlin 1.3 enthalten, daher habe ich unten eine neue UseCase-Klasse (für saubere Architektur) geschrieben, die sie verwendet. In diesem Beispiel ist ein Coroutine-Aufruf in einer einzelnen Datei gekapselt. Somit hängen andere Schichten nicht von den ausgeführten Coroutinen ab, wodurch eine stärker getrennte Architektur bereitgestellt wird.

/** * (C) Copyright 2018 Paulo Vitor Sato Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ package com.psato.devcamp.interactor.usecase import android.util.Log import kotlinx.coroutines.experimental.* import kotlinx.coroutines.experimental.android.UI import kotlin.coroutines.experimental.CoroutineContext /** * Abstract class for a Use Case (Interactor in terms of Clean Architecture). * This interface represents a execution unit for different use cases (this means any use case * in the application should implement this contract). * <p> * By convention each UseCase implementation will return the result using a coroutine * that will execute its job in a background thread and will post the result in the UI thread. */ abstract class UseCase<T> { protected var parentJob: Job = Job() //var backgroundContext: CoroutineContext = IO var backgroundContext: CoroutineContext = CommonPool var foregroundContext: CoroutineContext = UI protected abstract suspend fun executeOnBackground(): T fun execute(onComplete: (T) -> Unit, onError: (Throwable) -> Unit) { parentJob.cancel() parentJob = Job() launch(foregroundContext, parent = parentJob) { try { val result = withContext(backgroundContext) { executeOnBackground() } onComplete.invoke(result) } catch (e: CancellationException) { Log.d("UseCase", "canceled by user") } catch (e: Exception) { onError(e) } } } protected suspend fun <X> background(context: CoroutineContext = backgroundContext, block: suspend () -> X): Deferred<X> { return async(context, parent = parentJob) { block.invoke() } } fun unsubscribe() { parentJob.cancel() } } 

Zunächst habe ich eine übergeordnete Aufgabe erstellt. Dies ist der Schlüssel zum Rückgängigmachen aller Coroutinen, die in der UseCase-Klasse erstellt wurden. Wenn wir die Ausführung aufrufen, ist es wichtig, dass die alten Aufgaben abgebrochen werden, um sicherzustellen, dass wir keine einzige Coroutine verpasst haben (dies geschieht auch, wenn wir uns von diesem UseCase abmelden).

Außerdem rufe ich Startup (UI) auf. Dies bedeutet, dass ich eine Coroutine erstellen möchte, die im UI-Thread ausgeführt wird. Danach rufe ich die Hintergrundmethode auf, die in CommonPool Async erstellt (dieser Ansatz weist tatsächlich eine schlechte Leistung auf). Async gibt wiederum Deffered zurück, und dann rufe ich die Wartemethode auf. Er wartet auf die Fertigstellung der Hintergrundkoroutine, die ein Ergebnis oder einen Fehler bringt.

Dies kann verwendet werden, um fast alles zu implementieren, was wir mit RxJava gemacht haben. Nachfolgend einige Beispiele.

Karte


Ich habe die searchShow-Ergebnisse heruntergeladen und geändert, um den Namen der ersten Show zurückzugeben.
RxJava-Code:
 public class SearchShows extends UseCase { private ShowRepository showRepository; private ResourceRepository resourceRepository; private String query; @Inject public SearchShows(ShowRepository showRepository, ResourceRepository resourceRepository) { this.showRepository = showRepository; this.resourceRepository = resourceRepository; } public void setQuery(String query) { this.query = query; } @Override protected Single<String> buildUseCaseObservable() { return showRepository.searchShow(query).map(showInfos -> { if (showInfos != null && !showInfos.isEmpty() && showInfos.get(0).getShow() != null) { return showInfos.get(0).getShow().getTitle(); } else { return resourceRepository.getNotFoundShow(); } }); } } 

Coroutine Code:

 class SearchShows @Inject constructor(private val showRepository: ShowRepository, private val resourceRepository: ResourceRepository) : UseCase<String>() { var query: String? = null override suspend fun executeOnBackground(): String { query?.let { val showsInfo = showRepository.searchShow(it) val showName: String? = showsInfo?.getOrNull(0)?.show?.title return showName ?: resourceRepository.notFoundShow } return "" } } 

ZIP


Zip nimmt zwei Emissionen von Observer und fügt sie zu einer neuen Emission zusammen. Beachten Sie, dass Sie bei RxJava angeben müssen, dass mit parcribeOn in jedem Single ein Anruf parallel getätigt werden soll. Wir wollen beide gleichzeitig bekommen und sie zusammen zurückgeben.

RxJava-Code:

 public class ShowDetail extends UseCase { private ShowRepository showRepository; private String id; @Inject public SearchShows(ShowRepository showRepository) { this.showRepository = showRepository; } public void setId(String id) { this.id = id; } @Override protected Single<Show> buildUseCaseObservable() { Single<ShowDetail> singleDetail = showRepository.showDetail(id).subscribeOn(Schedulers.io()); Single<ShowBanner> singleBanner = showRepository.showBanner(id).subscribeOn(Schedulers.io()); return Single.zip(singleDetail, singleBanner, (detail, banner) -> new Show(detail,banner)); } 

Coroutine Code:

 class SearchShows @Inject constructor(private val showRepository: ShowRepository, private val resourceRepository: ResourceRepository) : UseCase<Show>() { var id: String? = null override suspend fun executeOnBackground(): Show { id?.let { val showDetail = background{ showRepository.showDetail(it) } val showBanner = background{ showRepository.showBanner(it) } return Show(showDetail.await(), showBanner.await()) } return Show() } } 

Flatmap


In diesem Fall suche ich nach Shows mit einer Abfragezeichenfolge und für jedes Ergebnis (begrenzt auf 200 Ergebnisse) erhalte ich auch die Bewertung der Show. Am Ende gebe ich eine Liste der Shows mit den entsprechenden Bewertungen zurück.

RxJava-Code:

 public class SearchShows extends UseCase { private ShowRepository showRepository; private String query; @Inject public SearchShows(ShowRepository showRepository) { this.showRepository = showRepository; } public void setQuery(String query) { this.query = query; } @Override protected Single<List<ShowResponse>> buildUseCaseObservable() { return showRepository.searchShow(query).flatMapPublisher( (Function<List<ShowInfo>, Flowable<ShowInfo>>) Flowable::fromIterable) .flatMapSingle((Function<ShowInfo, SingleSource<ShowResponse>>) showInfo -> showRepository.showRating(showInfo.getShow().getIds().getTrakt()) .map(rating -> new ShowResponse(showInfo.getShow().getTitle(), rating .getRating())).subscribeOn(Schedulers.io()), false, 4).toList(); } } 

Coroutine Code:

 class SearchShows @Inject constructor(private val showRepository: ShowRepository) : UseCase<List<ShowResponse>>() { var query: String? = null override suspend fun executeOnBackground(): List<ShowResponse> { query?.let { query -> return showRepository.searchShow(query).map { background { val rating: Rating = showRepository.showRating(it.show!!.ids!!.trakt!!) ShowResponse(it.show.title!!, rating.rating) } }.map { it.await() } } return arrayListOf() } } 

Lass mich erklären. Mit RxJava gibt mein Repository eine einzelne Emission von List zurück, sodass ich mehrere Emissionen benötige, eine für jede ShowInfo. Dazu habe ich flatMapPublisher aufgerufen. Für jede Ausgabe muss ich ShowResponse markieren und am Ende alle in einer Liste zusammenfassen.

Wir erhalten diese Konstruktion: List foreach → (ShowInfo → ShowRating → ShowResponse) → List.

Mit Coroutinen habe ich für jedes List-Element eine Map erstellt, um es in eine List <Deffered> zu konvertieren.

Wie Sie sehen, ist das meiste, was wir mit RxJava gemacht haben, mit synchronen Aufrufen einfacher zu implementieren. Coroutinen können sogar flatMap verarbeiten, was meiner Meinung nach eine der komplexesten Funktionen in RxJava ist.

Es ist bekannt, dass Coroutinen leicht sein können ( hier ein Beispiel), aber die Ergebnisse haben mich verwirrt. In diesem Beispiel wurde RxJava in ungefähr 3,1 Sekunden gestartet, während Coroutinen ungefähr 5,8 Sekunden brauchten, um auf CommonPool ausgeführt zu werden.

Diese Ergebnisse werfen die Frage vor mir auf, ob sie etwas Unangemessenes enthalten könnten. Später fand ich das. Ich habe Retrofit Call verwendet, wodurch der Fluss blockiert wurde.

Es gibt zwei Möglichkeiten, um diesen Fehler zu beheben. Die Auswahl hängt davon ab, welche Version von Android Studio Sie verwenden. In Android Studio 3.1 müssen wir sicherstellen, dass der Hintergrund-Thread nicht blockiert wird. Dafür habe ich diese Bibliothek benutzt:
Implementierung 'ru.gildor.coroutines: kotlin-coroutines-retrofit: 0.12.0'

Dieser Code erstellt eine Erweiterung der Nachrüst-Aufruffunktion, um den Stream anzuhalten:

 public suspend fun <T : Any> Call<T>.await(): T { return suspendCancellableCoroutine { continuation -> enqueue(object : Callback<T> { override fun onResponse(call: Call<T>?, response: Response<T?>) { if (response.isSuccessful) { val body = response.body() if (body == null) { continuation.resumeWithException( NullPointerException("Response body is null: $response") ) } else { continuation.resume(body) } } else { continuation.resumeWithException(HttpException(response)) } } override fun onFailure(call: Call<T>, t: Throwable) { // Don't bother with resuming the continuation if it is already cancelled. if (continuation.isCancelled) return continuation.resumeWithException(t) } }) registerOnCompletion(continuation) } } 

In Android Studio 3.2 können Sie die Corutin-Bibliothek auf Version 0.25.0 aktualisieren. Diese Version verfügt über CoroutineContext IO (den entsprechenden Kommentar finden Sie in meiner UseCase-Klasse).

Das Ausführen auf CommonPool ohne blockierenden Anruf dauerte 2,3 Sekunden und 2,4 Sekunden mit E / A und blockierenden Anrufen.

Bild

Ich hoffe, dieser Artikel wird Sie dazu inspirieren, Corutin zu verwenden, eine leichtere und vielleicht schnellere Alternative zu RxJava, und es ein wenig einfacher machen zu verstehen, dass Sie synchronisierten Code schreiben, der asynchron ausgeführt wird.

Source: https://habr.com/ru/post/de421739/


All Articles