我想在Vertx框架上的简单Web应用程序示例中展示异步编程的三种范例-回调,期货,协程。 我们将用Kotlin编写代码。
假设我们有一个应用程序,该应用程序在HTTP请求中接收某个字符串,它在数据库中查找URL,转到该URL,然后将其内容发送回客户端。
Vertx被认为是用于高负载应用程序的异步框架,它使用netty,新IO,事件总线
按照Vertx的惯例,一个Verticle(演员的类似物,如果您知道Akka)接收到一个请求,然后发送事件总线字符串以搜索其他从事实际工作的BusinessVerticle的URL。
object Main { @JvmStatic fun main(args: Array<String>) { val vertx = Vertx.vertx() vertx.deployVerticle(HttpVerticle()) vertx.deployVerticle(BusinessVerticle()) } }
class HttpVerticle : AbstractVerticle() { @Throws(Exception::class) override fun start(startFuture: Future<Void>) { val router = createRouter() vertx.createHttpServer() .requestHandler(router) .listen(8080) { result -> if (result.succeeded()) { startFuture.complete() } else { startFuture.fail(result.cause()) } } } private fun createRouter(): Router = Router.router(vertx).apply { get("/").handler(handlerRoot) } private val handlerRoot = Handler<RoutingContext> { rc -> vertx.eventBus().send("my.addr", rc.request().getParam("id") ?: "") { resp: AsyncResult<Message<String>> -> if (resp.succeeded()) { rc.response().end(resp.result().body()) } else { rc.fail(500) } } } }
在标准API中,所有异步都是通过回调完成的,因此BusinessVerticle的初始实现将如下所示:
class BusinessVerticle : AbstractVerticle() { private lateinit var dbclient: JDBCClient private lateinit var webclient: WebClient override fun start() { vertx.eventBus().consumer<String>("my.addr") { message -> handleMessage(message) } dbclient = JDBCClient.createShared( vertx, JsonObject() .put("url", "jdbc:postgresql://localhost:5432/payroll") .put("driver_class", "org.postgresql.Driver") .put("user", "vala") .put("password", "vala") .put("max_pool_size", 30) ) val options = WebClientOptions() .setUserAgent("My-App/1.2.3") options.isKeepAlive = false webclient = WebClient.create(vertx, options) } private fun handleMessage(message: Message<String>) { dbclient.getConnection { res -> if (res.succeeded()) { val connection = res.result() connection.query("SELECT url FROM payee_company where name='${message.body()}'") { res2 -> if (res2.succeeded()) { try { val url = res2.result().rows[0].getString("url").removePrefix("http://") webclient .get(url,"/") .send { ar -> if (ar.succeeded()) { val response = ar.result() message.reply(response.bodyAsString()) } else { message.fail(500, ar.cause().message) } } } catch (e: Exception) { message.fail(500, e.message) } } else { message.fail(500, res2.cause().message) } } } else { message.fail(500, res.cause().message) } } } }
坦白地说,它看起来很一般-回调地狱,尤其是错误处理。
让我们尝试改善情况,正如回调专家教给我们的-通过在单独的方法中选择每个回调:
class BusinessVerticle: AbstractVerticle() { private lateinit var dbclient: JDBCClient private lateinit var webclient: WebClient override fun start() { vertx.eventBus().consumer<String>("my.addr") { message -> handleMessage(message) } dbclient = JDBCClient.createShared( vertx, JsonObject() .put("url", "jdbc:postgresql://localhost:5432/payroll") .put("driver_class", "org.postgresql.Driver") .put("user", "vala") .put("password", "vala") .put("max_pool_size", 30) ) val options = WebClientOptions() .setUserAgent("My-App/1.2.3") options.isKeepAlive = false webclient = WebClient.create(vertx, options) } private fun handleMessage(message: Message<String>) { dbclient.getConnection { res -> handleConnectionCallback(res, message) } } private fun handleConnectionCallback( res: AsyncResult<SQLConnection>, message: Message<String> ) { if (res.succeeded()) { val connection = res.result() connection.query("SELECT url FROM payee_company where name='${message.body()}'") { res2 -> handleQueryCallBack(res2, message) } } else { message.fail(500, res.cause().message) } } private fun handleQueryCallBack( res2: AsyncResult<ResultSet>, message: Message<String> ) { if (res2.succeeded()) { try { val url = res2.result().rows[0].getString("url").removePrefix("http://") webclient .get(url, "/") .send { ar -> handleHttpCallback(ar, message) } } catch (e: Exception) { message.fail(500, e.message) } } else { message.fail(500, res2.cause().message) } } private fun handleHttpCallback( ar: AsyncResult<HttpResponse<Buffer>>, message: Message<String> ) { if (ar.succeeded()) {
好吧,它变得更好。 也是马马虎虎。
很多行,而不是特别易读的代码,都需要拖动
消息对象以进行响应,错误处理遍布代码。
让我们尝试使用
Future重写此废话
期货特别好,因为它们可以使用
Future.compose()轻松组合
首先,我们将接受回调且不返回任何内容的标准Vertx方法转换为返回Future的方法。
我们利用Kotlin向现有类添加方法的能力:
fun JDBCClient.getConnectionF(): Future<SQLConnection> { val f = Future.future<SQLConnection>() getConnection { res -> if (res.succeeded()) { val connection = res.result() f.complete(connection) } else { f.fail(res.cause()) } } return f } fun SQLConnection.queryF(query:String): Future<ResultSet> { val f = Future.future<ResultSet>() query(query) { res -> if (res.succeeded()) { val resultSet = res.result() f.complete(resultSet) } else { f.fail(res.cause()) } } return f } fun <T,M> HttpRequest<T>.sendF(): Future<HttpResponse<M>> { val f = Future.future<HttpResponse<M>>() send() { res -> if (res.succeeded()) { val response = res.result() f.complete(response) } else { f.fail(res.cause()) } } return f }
并将我们的BusinessVerticle.handleMessage转换为:
private fun handleMessage(message: Message<String>) { val content = getContent(message) content.setHandler{res-> if (res.succeeded()) {
看起来很酷。
简单易读的代码。 一处处理错误。 如有必要,您可以对不同的异常做出不同的反应,或者说,将其放在单独的函数中。
诗人的梦想!
但是,如果我们需要在某种条件下终止
期货链会怎样?
例如,如果数据库中没有相应的条目,我们不想抛出异常(并将代码500传递给客户端),而是返回带有代码200的字符串“ No record”。
从
Future.compose()结束链的唯一方法(我知道
)是引发异常。
即 您需要执行以下操作:确定您的异常类型,如果数据库中没有条目,则抛出该异常,以特殊方式处理此异常。
class NoContentException(message:String):Exception(message) private fun getContent(message: Message<String>): Future<String> { val connection = dbclient.getConnectionF() val resultSet = connection.compose { it.queryF("SELECT url FROM payee_company where name='${message.body()}'") } val url = resultSet.map { if (it.numRows<1) throw NoContentException("No records") it.rows[0].getString("url").removePrefix("http://") } val httpResponse = url.compose { webclient.get(it, "/").sendF() } val content = httpResponse.map { it.bodyAsString() } return content } private fun handleMessage(message: Message<String>) { val content = getContent(message) content.setHandler{res-> if (res.succeeded()) {
有效!
但是它看起来已经很糟了-使用异常来控制执行流程并不完美。 而且,如果有许多此类情况需要单独处理,则代码的可读性将大大降低。
让我们尝试对Kotlin协程进行同样的操作。
关于协程的文章很多,包括关于Habré(
1,2,3 ,...)的文章,因此我不会单独写它们。
Vertx的最新版本会自动生成回调应接受的所有方法的协程版本。
我们连接图书馆
'vertx-lang-kotlin-协程'
'vertx-lang-kotlin'
并得到例如
JDBCClient.getConnectionAwait() SQLConnection.queryAwait()
等
然后,我们的消息处理方法变得简单而优美:
private suspend fun handleMessage(message: Message<String>) { try { val content = getContent(message) message.reply(content) } catch(e:Exception){ message.fail(500, e.message) } } private suspend fun getContent(message: Message<String>): String { val connection = dbclient.getConnectionAwait() val resultSet = connection.queryAwait("SELECT url FROM payee_company where name='${message.body()}'") val url = resultSet.rows[0].getString("url").removePrefix("http://") val httpResponse = webclient.get(url, "/").sendAwait() val content = httpResponse.bodyAsString() return content }
好了,您需要通过提供协程上下文来更改呼叫:
vertx.eventBus().consumer<String>("my.addr") { message -> GlobalScope.launch(vertx.dispatcher()) { handleMessage(message)} }
这是怎么回事
所有这些带有Await的方法都异步调用代码,等待代码结果,然后等待,线程切换到另一个协程的执行。
如果我们在幕后看,则看起来像这样:
suspend fun SQLClient.getConnectionAwait(): SQLConnection { return awaitResult { this.getConnection(it) } } suspend fun <T> awaitResult(block: (h: Handler<AsyncResult<T>>) -> Unit): T { val asyncResult = awaitEvent(block) if (asyncResult.succeeded()) return asyncResult.result() else throw asyncResult.cause() } suspend fun <T> awaitEvent(block: (h: Handler<T>) -> Unit): T { return suspendCancellableCoroutine { cont: CancellableContinuation<T> -> try { block.invoke(Handler { t -> cont.resume(t) }) } catch (e: Exception) { cont.resumeWithException(e) } } }
与我们使用Futures自行编写的实现类似。
但是在这里,我们得到了正常的代码-将String作为返回类型(而不是Future),尝试使用/ catch而不是带有AsyncResult的难看的回调
而且,如果我们需要在中间停止执行链,那么它看起来很自然,没有任何例外:
private suspend fun getContent(message: Message<String>): String { val connection = dbclient.getConnectionAwait() val resultSet = connection.queryAwait("SELECT url FROM payee_company where name='${message.body()}'") if (resultSet.numRows<1) return "No records" val url = resultSet.rows[0].getString("url").removePrefix("http://") val httpResponse = webclient.get(url, "/").sendAwait() val content = httpResponse.bodyAsString() return content }
在我看来,太好了!