Saya ingin menunjukkan tiga paradigma pemrograman asinkron - callback, futures, coroutine menggunakan contoh aplikasi web sederhana pada kerangka Vertx. Kami akan menulis kode di Kotlin.
Misalkan kita memiliki aplikasi yang menerima string tertentu dalam permintaan HTTP, mencari URL dalam database yang menggunakannya, pergi ke URL itu dan mengirimkan isinya kembali ke klien.
Vertx dipahami sebagai kerangka kerja asinkron untuk aplikasi-aplikasi yang sarat muatan, menggunakan netty, IO baru, bus acara
Seperti biasa di Vertx, satu Verticle (analog aktor, jika Anda tahu Akka) menerima permintaan, mengirimkan string bus peristiwa untuk mencari URL beberapa BusinessVerticle lain, yang terlibat dalam pekerjaan yang sebenarnya.
object Main { @JvmStatic fun main(args: Array<String>) { val vertx = Vertx.vertx() vertx.deployVerticle(HttpVerticle()) vertx.deployVerticle(BusinessVerticle()) } }
class HttpVerticle : AbstractVerticle() { @Throws(Exception::class) override fun start(startFuture: Future<Void>) { val router = createRouter() vertx.createHttpServer() .requestHandler(router) .listen(8080) { result -> if (result.succeeded()) { startFuture.complete() } else { startFuture.fail(result.cause()) } } } private fun createRouter(): Router = Router.router(vertx).apply { get("/").handler(handlerRoot) } private val handlerRoot = Handler<RoutingContext> { rc -> vertx.eventBus().send("my.addr", rc.request().getParam("id") ?: "") { resp: AsyncResult<Message<String>> -> if (resp.succeeded()) { rc.response().end(resp.result().body()) } else { rc.fail(500) } } } }
Di API standar, semua asinkron dilakukan melalui callback, jadi implementasi awal BusinessVerticle akan terlihat seperti ini:
class BusinessVerticle : AbstractVerticle() { private lateinit var dbclient: JDBCClient private lateinit var webclient: WebClient override fun start() { vertx.eventBus().consumer<String>("my.addr") { message -> handleMessage(message) } dbclient = JDBCClient.createShared( vertx, JsonObject() .put("url", "jdbc:postgresql://localhost:5432/payroll") .put("driver_class", "org.postgresql.Driver") .put("user", "vala") .put("password", "vala") .put("max_pool_size", 30) ) val options = WebClientOptions() .setUserAgent("My-App/1.2.3") options.isKeepAlive = false webclient = WebClient.create(vertx, options) } private fun handleMessage(message: Message<String>) { dbclient.getConnection { res -> if (res.succeeded()) { val connection = res.result() connection.query("SELECT url FROM payee_company where name='${message.body()}'") { res2 -> if (res2.succeeded()) { try { val url = res2.result().rows[0].getString("url").removePrefix("http://") webclient .get(url,"/") .send { ar -> if (ar.succeeded()) { val response = ar.result() message.reply(response.bodyAsString()) } else { message.fail(500, ar.cause().message) } } } catch (e: Exception) { message.fail(500, e.message) } } else { message.fail(500, res2.cause().message) } } } else { message.fail(500, res.cause().message) } } } }
Itu terlihat terus terang, begitu-begitu - panggilan balik neraka, terutama penanganan kesalahan.
Mari kita coba untuk memperbaiki situasi, seperti yang diajarkan oleh guru panggilan balik - dengan memilih setiap panggilan balik dalam metode terpisah:
class BusinessVerticle: AbstractVerticle() { private lateinit var dbclient: JDBCClient private lateinit var webclient: WebClient override fun start() { vertx.eventBus().consumer<String>("my.addr") { message -> handleMessage(message) } dbclient = JDBCClient.createShared( vertx, JsonObject() .put("url", "jdbc:postgresql://localhost:5432/payroll") .put("driver_class", "org.postgresql.Driver") .put("user", "vala") .put("password", "vala") .put("max_pool_size", 30) ) val options = WebClientOptions() .setUserAgent("My-App/1.2.3") options.isKeepAlive = false webclient = WebClient.create(vertx, options) } private fun handleMessage(message: Message<String>) { dbclient.getConnection { res -> handleConnectionCallback(res, message) } } private fun handleConnectionCallback( res: AsyncResult<SQLConnection>, message: Message<String> ) { if (res.succeeded()) { val connection = res.result() connection.query("SELECT url FROM payee_company where name='${message.body()}'") { res2 -> handleQueryCallBack(res2, message) } } else { message.fail(500, res.cause().message) } } private fun handleQueryCallBack( res2: AsyncResult<ResultSet>, message: Message<String> ) { if (res2.succeeded()) { try { val url = res2.result().rows[0].getString("url").removePrefix("http://") webclient .get(url, "/") .send { ar -> handleHttpCallback(ar, message) } } catch (e: Exception) { message.fail(500, e.message) } } else { message.fail(500, res2.cause().message) } } private fun handleHttpCallback( ar: AsyncResult<HttpResponse<Buffer>>, message: Message<String> ) { if (ar.succeeded()) {
Baiklah, ini menjadi lebih baik. Tapi juga begitu-begitu.
Banyak baris, bukan kode yang mudah dibaca, Anda harus menyeret objek
pesan untuk ditanggapi, penanganan kesalahan tersebar di kode.
Mari kita coba menulis ulang omong kosong ini menggunakan
FuturesFutures sangat baik karena mereka dapat dengan mudah digabungkan menggunakan
Future.compose ()Pertama, kami akan menerjemahkan metode Vertx standar yang menerima panggilan balik dan tidak mengembalikan apa pun ke metode yang mengembalikan Masa Depan.
Kami memanfaatkan kemampuan Kotlin untuk menambahkan metode ke kelas yang ada:
fun JDBCClient.getConnectionF(): Future<SQLConnection> { val f = Future.future<SQLConnection>() getConnection { res -> if (res.succeeded()) { val connection = res.result() f.complete(connection) } else { f.fail(res.cause()) } } return f } fun SQLConnection.queryF(query:String): Future<ResultSet> { val f = Future.future<ResultSet>() query(query) { res -> if (res.succeeded()) { val resultSet = res.result() f.complete(resultSet) } else { f.fail(res.cause()) } } return f } fun <T,M> HttpRequest<T>.sendF(): Future<HttpResponse<M>> { val f = Future.future<HttpResponse<M>>() send() { res -> if (res.succeeded()) { val response = res.result() f.complete(response) } else { f.fail(res.cause()) } } return f }
Dan ubah BusinessVerticle.handleMessage kami menjadi ini:
private fun handleMessage(message: Message<String>) { val content = getContent(message) content.setHandler{res-> if (res.succeeded()) {
Itu terlihat keren.
Kode sederhana dan mudah dibaca. Kesalahan penanganan di satu tempat. Jika perlu, Anda dapat membuat reaksi yang berbeda terhadap pengecualian yang berbeda, atau, katakanlah, letakkan dalam fungsi yang terpisah.
Mimpi si penyair!
Tetapi apa yang terjadi jika kita perlu menghentikan rantai
Berjangka dengan suatu syarat?
Misalnya, jika tidak ada entri yang sesuai dalam database, kami tidak ingin melempar pengecualian (dan kode 500 ke klien), tetapi mengembalikan string "Tidak ada catatan" dengan kode 200.
Satu-satunya cara (yang saya tahu) untuk mengakhiri rantai dari
Future.compose () adalah dengan melemparkan pengecualian.
Yaitu Anda perlu melakukan sesuatu seperti ini: tentukan jenis pengecualian Anda, lemparkan pengecualian ini jika tidak ada entri dalam basis data, tangani pengecualian ini dengan cara khusus.
class NoContentException(message:String):Exception(message) private fun getContent(message: Message<String>): Future<String> { val connection = dbclient.getConnectionF() val resultSet = connection.compose { it.queryF("SELECT url FROM payee_company where name='${message.body()}'") } val url = resultSet.map { if (it.numRows<1) throw NoContentException("No records") it.rows[0].getString("url").removePrefix("http://") } val httpResponse = url.compose { webclient.get(it, "/").sendF() } val content = httpResponse.map { it.bodyAsString() } return content } private fun handleMessage(message: Message<String>) { val content = getContent(message) content.setHandler{res-> if (res.succeeded()) {
Itu berhasil!
Tapi itu sudah terlihat lebih buruk - menggunakan pengecualian untuk mengontrol aliran eksekusi tidak indah. Dan jika akan ada banyak kasus seperti itu yang membutuhkan pemrosesan terpisah, kode akan menjadi jauh lebih mudah dibaca.
Mari kita coba melakukan hal yang sama dengan coroutine Kotlin.
Banyak yang telah ditulis tentang coroutine, termasuk di Habré (
1 ,
2 ,
3 , ...) oleh karena itu saya tidak akan menulis tentang mereka secara terpisah.
Versi terbaru dari Vertx secara otomatis menghasilkan versi coroutine dari semua metode yang harus menerima panggilan balik.
Kami menghubungkan perpustakaan
'vertx-lang-kotlin-coroutines'
'vertx-lang-kotlin'
dan dapatkan, misalnya
JDBCClient.getConnectionAwait() SQLConnection.queryAwait()
dll.
Kemudian metode pemrosesan pesan kami berubah menjadi sesuatu yang menyenangkan dan sederhana:
private suspend fun handleMessage(message: Message<String>) { try { val content = getContent(message) message.reply(content) } catch(e:Exception){ message.fail(500, e.message) } } private suspend fun getContent(message: Message<String>): String { val connection = dbclient.getConnectionAwait() val resultSet = connection.queryAwait("SELECT url FROM payee_company where name='${message.body()}'") val url = resultSet.rows[0].getString("url").removePrefix("http://") val httpResponse = webclient.get(url, "/").sendAwait() val content = httpResponse.bodyAsString() return content }
Nah, Anda perlu mengubah panggilan dengan memberikan konteks coroutine:
vertx.eventBus().consumer<String>("my.addr") { message -> GlobalScope.launch(vertx.dispatcher()) { handleMessage(message)} }
Apa yang sedang terjadi di sini?
Semua metode ini dengan Await memanggil kode secara tidak sinkron, menunggu hasil darinya, dan sementara mereka menunggu, utas beralih ke eksekusi coroutine lain.
Jika kita melihat di bawah tenda, maka akan terlihat seperti ini:
suspend fun SQLClient.getConnectionAwait(): SQLConnection { return awaitResult { this.getConnection(it) } } suspend fun <T> awaitResult(block: (h: Handler<AsyncResult<T>>) -> Unit): T { val asyncResult = awaitEvent(block) if (asyncResult.succeeded()) return asyncResult.result() else throw asyncResult.cause() } suspend fun <T> awaitEvent(block: (h: Handler<T>) -> Unit): T { return suspendCancellableCoroutine { cont: CancellableContinuation<T> -> try { block.invoke(Handler { t -> cont.resume(t) }) } catch (e: Exception) { cont.resumeWithException(e) } } }
Sesuatu yang mirip dengan implementasi yang kami tulis sendiri dengan Futures.
Tapi di sini kita mendapatkan kode normal - String sebagai tipe pengembalian (bukan Future), coba / tangkap alih-alih panggilan balik jelek dengan AsyncResult
Dan jika kita perlu menghentikan rantai eksekusi di tengah, itu terlihat alami, tanpa kecuali:
private suspend fun getContent(message: Message<String>): String { val connection = dbclient.getConnectionAwait() val resultSet = connection.queryAwait("SELECT url FROM payee_company where name='${message.body()}'") if (resultSet.numRows<1) return "No records" val url = resultSet.rows[0].getString("url").removePrefix("http://") val httpResponse = webclient.get(url, "/").sendAwait() val content = httpResponse.bodyAsString() return content }
Menurut saya, bagus!