Bagaimana saya mengganti RxJava dengan coroutine di proyek saya dan mengapa Anda mungkin juga harus melakukan ini

Halo, Habr! Saya mempersembahkan kepada Anda terjemahan sebuah artikel oleh Paulo Sato tentang penggunaan Kotlin Coroutines alih-alih RxJava dalam proyek Android mereka.

RxJava sebagai bazooka, sebagian besar aplikasi bahkan tidak menggunakan setengah daya tembaknya. Artikel ini akan membahas cara menggantinya dengan coroutine Kotlin (coroutine).

Saya telah bekerja dengan RxJava selama beberapa tahun. Ini jelas merupakan salah satu perpustakaan terbaik untuk setiap proyek Android, yang masih mengejutkan saat ini, terutama jika Anda pemrograman di Jawa. Jika Anda menggunakan Kotlin, maka kita dapat mengatakan bahwa kota ini memiliki sheriff baru.

Sebagian besar menggunakan RxJava hanya untuk mengontrol utas dan untuk mencegah panggilan balik neraka (jika Anda tidak tahu apa itu, anggap diri Anda beruntung dan itulah sebabnya ). Faktanya adalah bahwa kita harus ingat bahwa kekuatan sebenarnya dari RxJava adalah pemrograman reaktif dan tekanan balik. Jika Anda menggunakannya untuk mengontrol permintaan asinkron, Anda menggunakan bazoka untuk membunuh laba-laba. Dia akan melakukan pekerjaannya, tapi itu berlebihan.

Salah satu kekurangan RxJava adalah jumlah metode. Ini sangat besar dan cenderung menyebar ke seluruh kode. Di Kotlin, Anda bisa menggunakan coroutine untuk mengimplementasikan sebagian besar perilaku yang Anda buat sebelumnya menggunakan RxJava.

Tapi ... apa itu coroutine?

Corutin adalah cara untuk menangani tugas kompetitif di utas. Utas akan berfungsi sampai dihentikan dan konteksnya akan berubah untuk setiap coroutine tanpa membuat utas baru.
Coroutine di Kotlin masih eksperimental, tetapi mereka dimasukkan dalam Kotlin 1.3, jadi saya menulis kelas UseCase baru (untuk arsitektur bersih) dengan menggunakannya di bawah ini. Dalam contoh ini, panggilan coroutine dienkapsulasi dalam satu file. Dengan demikian, layer lain tidak akan bergantung pada coroutine yang dieksekusi, memberikan arsitektur yang lebih terputus.

/** * (C) Copyright 2018 Paulo Vitor Sato Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ package com.psato.devcamp.interactor.usecase import android.util.Log import kotlinx.coroutines.experimental.* import kotlinx.coroutines.experimental.android.UI import kotlin.coroutines.experimental.CoroutineContext /** * Abstract class for a Use Case (Interactor in terms of Clean Architecture). * This interface represents a execution unit for different use cases (this means any use case * in the application should implement this contract). * <p> * By convention each UseCase implementation will return the result using a coroutine * that will execute its job in a background thread and will post the result in the UI thread. */ abstract class UseCase<T> { protected var parentJob: Job = Job() //var backgroundContext: CoroutineContext = IO var backgroundContext: CoroutineContext = CommonPool var foregroundContext: CoroutineContext = UI protected abstract suspend fun executeOnBackground(): T fun execute(onComplete: (T) -> Unit, onError: (Throwable) -> Unit) { parentJob.cancel() parentJob = Job() launch(foregroundContext, parent = parentJob) { try { val result = withContext(backgroundContext) { executeOnBackground() } onComplete.invoke(result) } catch (e: CancellationException) { Log.d("UseCase", "canceled by user") } catch (e: Exception) { onError(e) } } } protected suspend fun <X> background(context: CoroutineContext = backgroundContext, block: suspend () -> X): Deferred<X> { return async(context, parent = parentJob) { block.invoke() } } fun unsubscribe() { parentJob.cancel() } } 

Pertama-tama, saya membuat tugas orang tua. Ini adalah kunci untuk membatalkan semua coroutine yang dibuat di kelas UseCase. Ketika kami memanggil eksekusi, penting agar tugas lama dibatalkan, untuk memastikan bahwa kami tidak melewatkan satu coroutine (ini juga akan terjadi jika kami berhenti berlangganan dari UseCase ini).

Juga, saya meminta startup (UI). Ini berarti bahwa saya ingin membuat coroutine yang akan dieksekusi di utas UI. Setelah itu, saya memanggil metode latar belakang yang menciptakan async di CommonPool (pendekatan ini sebenarnya akan memiliki kinerja yang buruk). Pada gilirannya, async akan mengembalikan Deffered, dan kemudian, saya akan memanggil metode menunggu. Dia menunggu penyelesaian coroutine latar belakang, yang akan membawa hasil atau kesalahan.

Ini dapat digunakan untuk mengimplementasikan sebagian besar semua yang kami lakukan dengan RxJava. Berikut ini beberapa contohnya.

Peta


Saya mengunduh hasil pencarianShow dan mengubahnya untuk mengembalikan nama pertunjukan pertama.
Kode RxJava:
 public class SearchShows extends UseCase { private ShowRepository showRepository; private ResourceRepository resourceRepository; private String query; @Inject public SearchShows(ShowRepository showRepository, ResourceRepository resourceRepository) { this.showRepository = showRepository; this.resourceRepository = resourceRepository; } public void setQuery(String query) { this.query = query; } @Override protected Single<String> buildUseCaseObservable() { return showRepository.searchShow(query).map(showInfos -> { if (showInfos != null && !showInfos.isEmpty() && showInfos.get(0).getShow() != null) { return showInfos.get(0).getShow().getTitle(); } else { return resourceRepository.getNotFoundShow(); } }); } } 

Kode Coroutine:

 class SearchShows @Inject constructor(private val showRepository: ShowRepository, private val resourceRepository: ResourceRepository) : UseCase<String>() { var query: String? = null override suspend fun executeOnBackground(): String { query?.let { val showsInfo = showRepository.searchShow(it) val showName: String? = showsInfo?.getOrNull(0)?.show?.title return showName ?: resourceRepository.notFoundShow } return "" } } 

ZIP


Zip akan mengambil dua emisi dari Observer dan menyatukannya dalam emisi baru. Perhatikan bahwa dengan RxJava Anda harus menentukan untuk melakukan panggilan secara paralel menggunakan subscribeOn di masing-masing Tunggal. Kami ingin mendapatkan keduanya secara bersamaan dan mengembalikannya bersama.

Kode RxJava:

 public class ShowDetail extends UseCase { private ShowRepository showRepository; private String id; @Inject public SearchShows(ShowRepository showRepository) { this.showRepository = showRepository; } public void setId(String id) { this.id = id; } @Override protected Single<Show> buildUseCaseObservable() { Single<ShowDetail> singleDetail = showRepository.showDetail(id).subscribeOn(Schedulers.io()); Single<ShowBanner> singleBanner = showRepository.showBanner(id).subscribeOn(Schedulers.io()); return Single.zip(singleDetail, singleBanner, (detail, banner) -> new Show(detail,banner)); } 

Kode Coroutine:

 class SearchShows @Inject constructor(private val showRepository: ShowRepository, private val resourceRepository: ResourceRepository) : UseCase<Show>() { var id: String? = null override suspend fun executeOnBackground(): Show { id?.let { val showDetail = background{ showRepository.showDetail(it) } val showBanner = background{ showRepository.showBanner(it) } return Show(showDetail.await(), showBanner.await()) } return Show() } } 

Flatmap


Dalam hal ini, saya mencari acara yang memiliki string kueri dan untuk setiap hasil (terbatas pada 200 hasil), saya juga mendapatkan peringkat acara tersebut. Pada akhirnya, saya mengembalikan daftar acara dengan peringkat yang sesuai.

Kode RxJava:

 public class SearchShows extends UseCase { private ShowRepository showRepository; private String query; @Inject public SearchShows(ShowRepository showRepository) { this.showRepository = showRepository; } public void setQuery(String query) { this.query = query; } @Override protected Single<List<ShowResponse>> buildUseCaseObservable() { return showRepository.searchShow(query).flatMapPublisher( (Function<List<ShowInfo>, Flowable<ShowInfo>>) Flowable::fromIterable) .flatMapSingle((Function<ShowInfo, SingleSource<ShowResponse>>) showInfo -> showRepository.showRating(showInfo.getShow().getIds().getTrakt()) .map(rating -> new ShowResponse(showInfo.getShow().getTitle(), rating .getRating())).subscribeOn(Schedulers.io()), false, 4).toList(); } } 

Kode Coroutine:

 class SearchShows @Inject constructor(private val showRepository: ShowRepository) : UseCase<List<ShowResponse>>() { var query: String? = null override suspend fun executeOnBackground(): List<ShowResponse> { query?.let { query -> return showRepository.searchShow(query).map { background { val rating: Rating = showRepository.showRating(it.show!!.ids!!.trakt!!) ShowResponse(it.show.title!!, rating.rating) } }.map { it.await() } } return arrayListOf() } } 

Biarkan saya jelaskan. Menggunakan RxJava, repositori saya mengembalikan satu emisi Daftar, jadi saya perlu beberapa emisi, satu untuk setiap ShowInfo. Untuk melakukan ini, saya menelepon flatMapPublisher. Untuk setiap masalah, saya harus menyoroti ShowResponse, dan pada akhirnya kumpulkan semuanya ke dalam daftar.

Kita berakhir dengan konstruksi ini: List foreach β†’ (ShowInfo β†’ ShowRating β†’ ShowResponse) β†’ List.

Dengan coroutine, saya membuat peta untuk setiap elemen Daftar untuk mengubahnya menjadi Daftar <Disediakan>.

Seperti yang Anda lihat, sebagian besar yang kami lakukan dengan RxJava lebih mudah diterapkan dengan panggilan sinkron. Coroutines bahkan dapat menangani flatMap, yang, saya percaya, adalah salah satu fungsi paling kompleks di RxJava.

Sudah diketahui bahwa coroutine bisa ringan (di sini adalah contoh), tetapi hasilnya membingungkan saya. Dalam contoh ini, RxJava dimulai dalam waktu sekitar 3,1 detik, sementara coroutine membutuhkan waktu sekitar 5,8 detik untuk berjalan di CommonPool.

Hasil ini menimbulkan pertanyaan di hadapan saya bahwa mungkin ada sesuatu yang tidak pantas di dalamnya. Kemudian, saya menemukan ini. Saya menggunakan panggilan retrofit, yang memblokir aliran.

Ada dua cara untuk memperbaiki kesalahan ini, pilihan tergantung pada versi Android Studio yang Anda gunakan. Di Android Studio 3.1, kita perlu memastikan bahwa kita tidak memblokir utas latar belakang. Untuk ini, saya menggunakan perpustakaan ini:
implementasi 'ru.gildor.coroutines: kotlin-coroutines-retrofit: 0.12.0'

Kode ini menciptakan ekstensi fungsi Panggilan retrofit untuk menjeda aliran:

 public suspend fun <T : Any> Call<T>.await(): T { return suspendCancellableCoroutine { continuation -> enqueue(object : Callback<T> { override fun onResponse(call: Call<T>?, response: Response<T?>) { if (response.isSuccessful) { val body = response.body() if (body == null) { continuation.resumeWithException( NullPointerException("Response body is null: $response") ) } else { continuation.resume(body) } } else { continuation.resumeWithException(HttpException(response)) } } override fun onFailure(call: Call<T>, t: Throwable) { // Don't bother with resuming the continuation if it is already cancelled. if (continuation.isCancelled) return continuation.resumeWithException(t) } }) registerOnCompletion(continuation) } } 

Di Android Studio 3.2, Anda dapat memperbarui perpustakaan corutin ke versi 0.25.0. Versi ini memiliki CoroutineContext IO (Anda dapat melihat komentar yang sesuai di kelas UseCase saya).

Berjalan di CommonPool tanpa pemblokiran panggilan memakan waktu 2,3 ​​detik dan 2,4 detik dengan IO dan memblokir panggilan.

gambar

Saya harap artikel ini akan menginspirasi Anda untuk menggunakan corutin, alternatif yang lebih ringan dan mungkin lebih cepat untuk RxJava dan membuatnya sedikit lebih mudah untuk memahami bahwa Anda menulis kode tersinkronisasi yang berjalan secara tidak sinkron.

Source: https://habr.com/ru/post/id421739/


All Articles