Tres paradigmas de programación asincrónica en Vertx

Quiero mostrar tres paradigmas de programación asincrónica: devoluciones de llamada, futuros, rutinas en el ejemplo de una aplicación web simple en el marco Vertx. Escribiremos el código en Kotlin.

Supongamos que tenemos una aplicación que recibe una determinada cadena en una solicitud HTTP, busca una URL en la base de datos que la usa, va a esa URL y envía su contenido al cliente.
Vertx está concebido como un marco asincrónico para aplicaciones altamente cargadas, utiliza un nuevo y pequeño bus de eventos IO

Como es habitual en Vertx, un Verticle (un análogo del actor, si conoce a Akka) recibe una solicitud, envía una cadena de bus de eventos para buscar la URL de algún otro BusinessVerticle, que realmente se dedica al trabajo.

object Main { @JvmStatic fun main(args: Array<String>) { val vertx = Vertx.vertx() vertx.deployVerticle(HttpVerticle()) vertx.deployVerticle(BusinessVerticle()) } } 

 class HttpVerticle : AbstractVerticle() { @Throws(Exception::class) override fun start(startFuture: Future<Void>) { val router = createRouter() vertx.createHttpServer() .requestHandler(router) .listen(8080) { result -> if (result.succeeded()) { startFuture.complete() } else { startFuture.fail(result.cause()) } } } private fun createRouter(): Router = Router.router(vertx).apply { get("/").handler(handlerRoot) } private val handlerRoot = Handler<RoutingContext> { rc -> vertx.eventBus().send("my.addr", rc.request().getParam("id") ?: "") { resp: AsyncResult<Message<String>> -> if (resp.succeeded()) { rc.response().end(resp.result().body()) } else { rc.fail(500) } } } } 

En la API estándar, toda la asincronía se realiza a través de las devoluciones de llamada, por lo que la implementación inicial de BusinessVerticle se verá así:

 class BusinessVerticle : AbstractVerticle() { private lateinit var dbclient: JDBCClient private lateinit var webclient: WebClient override fun start() { vertx.eventBus().consumer<String>("my.addr") { message -> handleMessage(message) } dbclient = JDBCClient.createShared( vertx, JsonObject() .put("url", "jdbc:postgresql://localhost:5432/payroll") .put("driver_class", "org.postgresql.Driver") .put("user", "vala") .put("password", "vala") .put("max_pool_size", 30) ) val options = WebClientOptions() .setUserAgent("My-App/1.2.3") options.isKeepAlive = false webclient = WebClient.create(vertx, options) } private fun handleMessage(message: Message<String>) { dbclient.getConnection { res -> if (res.succeeded()) { val connection = res.result() connection.query("SELECT url FROM payee_company where name='${message.body()}'") { res2 -> if (res2.succeeded()) { try { val url = res2.result().rows[0].getString("url").removePrefix("http://") webclient .get(url,"/") .send { ar -> if (ar.succeeded()) { val response = ar.result() message.reply(response.bodyAsString()) } else { message.fail(500, ar.cause().message) } } } catch (e: Exception) { message.fail(500, e.message) } } else { message.fail(500, res2.cause().message) } } } else { message.fail(500, res.cause().message) } } } } 

Se ve francamente, más o menos: devolución de llamadas, especialmente el manejo de errores.

Intentemos mejorar la situación, como nos enseña el gurú de devolución de llamada, seleccionando cada devolución de llamada en un método separado:

  class BusinessVerticle: AbstractVerticle() { private lateinit var dbclient: JDBCClient private lateinit var webclient: WebClient override fun start() { vertx.eventBus().consumer<String>("my.addr") { message -> handleMessage(message) } dbclient = JDBCClient.createShared( vertx, JsonObject() .put("url", "jdbc:postgresql://localhost:5432/payroll") .put("driver_class", "org.postgresql.Driver") .put("user", "vala") .put("password", "vala") .put("max_pool_size", 30) ) val options = WebClientOptions() .setUserAgent("My-App/1.2.3") options.isKeepAlive = false webclient = WebClient.create(vertx, options) } private fun handleMessage(message: Message<String>) { dbclient.getConnection { res -> handleConnectionCallback(res, message) } } private fun handleConnectionCallback( res: AsyncResult<SQLConnection>, message: Message<String> ) { if (res.succeeded()) { val connection = res.result() connection.query("SELECT url FROM payee_company where name='${message.body()}'") { res2 -> handleQueryCallBack(res2, message) } } else { message.fail(500, res.cause().message) } } private fun handleQueryCallBack( res2: AsyncResult<ResultSet>, message: Message<String> ) { if (res2.succeeded()) { try { val url = res2.result().rows[0].getString("url").removePrefix("http://") webclient .get(url, "/") .send { ar -> handleHttpCallback(ar, message) } } catch (e: Exception) { message.fail(500, e.message) } } else { message.fail(500, res2.cause().message) } } private fun handleHttpCallback( ar: AsyncResult<HttpResponse<Buffer>>, message: Message<String> ) { if (ar.succeeded()) { // Obtain response val response = ar.result() message.reply(response.bodyAsString()) } else { message.fail(500, ar.cause().message) } } } 

Bueno, se puso mejor. Pero también más o menos.

Muchas líneas, no un código particularmente legible, necesita arrastrar el objeto del mensaje para obtener una respuesta, el manejo de errores se extendió por el código.

Intentemos reescribir esta basura usando Futuros
Los futuros son especialmente buenos porque se pueden combinar fácilmente con Future.compose ()

Primero, traduciremos los métodos estándar de Vertx que aceptan una devolución de llamada y no devuelven nada a métodos que devuelvan Futuro.

Aprovechamos la capacidad de Kotlin para agregar métodos a las clases existentes:

 fun JDBCClient.getConnectionF(): Future<SQLConnection> { val f = Future.future<SQLConnection>() getConnection { res -> if (res.succeeded()) { val connection = res.result() f.complete(connection) } else { f.fail(res.cause()) } } return f } fun SQLConnection.queryF(query:String): Future<ResultSet> { val f = Future.future<ResultSet>() query(query) { res -> if (res.succeeded()) { val resultSet = res.result() f.complete(resultSet) } else { f.fail(res.cause()) } } return f } fun <T,M> HttpRequest<T>.sendF(): Future<HttpResponse<M>> { val f = Future.future<HttpResponse<M>>() send() { res -> if (res.succeeded()) { val response = res.result() f.complete(response) } else { f.fail(res.cause()) } } return f } 

Y convierta nuestro BusinessVerticle.handleMessage en esto:

  private fun handleMessage(message: Message<String>) { val content = getContent(message) content.setHandler{res-> if (res.succeeded()) { // Obtain response val response = res.result() message.reply(response) } else { message.fail(500, res.cause().message) } } } private fun getContent(message: Message<String>): Future<String> { val connection = dbclient.getConnectionF() val resultSet = connection.compose { it.queryF("SELECT url FROM payee_company where name='${message.body()}'") } val url = resultSet.map { it.rows[0].getString("url").removePrefix("http://") } val httpResponse = url.compose { webclient.get(it, "/").sendF() } val content = httpResponse.map { it.bodyAsString() } return content } 

Se ve genial

Código simple y legible. Error al manejar en un solo lugar. Si es necesario, puede hacer una reacción diferente a diferentes excepciones o, por ejemplo, ponerla en una función separada.

El sueño del poeta!

Pero, ¿qué sucede si necesitamos terminar la cadena de Futuros por alguna condición?
Por ejemplo, si no hay una entrada correspondiente en la base de datos, no queremos lanzar una excepción (y el código 500 al cliente), pero devolvemos la cadena "Sin registro" con el código 200.

La única forma (que sé) de terminar la cadena de Future.compose () es lanzar una excepción.

Es decir necesita hacer algo como esto: determine su tipo de excepción, arroje esta excepción si no hay entrada en la base de datos, maneje esta excepción de una manera especial.

 class NoContentException(message:String):Exception(message) private fun getContent(message: Message<String>): Future<String> { val connection = dbclient.getConnectionF() val resultSet = connection.compose { it.queryF("SELECT url FROM payee_company where name='${message.body()}'") } val url = resultSet.map { if (it.numRows<1) throw NoContentException("No records") it.rows[0].getString("url").removePrefix("http://") } val httpResponse = url.compose { webclient.get(it, "/").sendF() } val content = httpResponse.map { it.bodyAsString() } return content } private fun handleMessage(message: Message<String>) { val content = getContent(message) content.setHandler{res-> if (res.succeeded()) { // Obtain response val response = res.result() message.reply(response) } else { if (res.cause() is NoContentException) message.reply(res.cause().message) else message.fail(500, res.cause().message) } } } 

Funciona!

Pero ya se ve peor: usar excepciones para controlar el flujo de ejecución no es hermoso. Y si habrá muchos de estos casos que requieren un procesamiento por separado, el código será mucho menos legible.

Tratemos de hacer lo mismo con las corutinas de Kotlin.
Se ha escrito mucho sobre las corutinas, incluso en Habré ( 1 , 2 , 3 , ...), por lo tanto, no escribiré sobre ellas por separado.

Las últimas versiones de Vertx generan automáticamente versiones de rutina de todos los métodos que una devolución de llamada debe aceptar.

Conectamos bibliotecas
'vertx-lang-kotlin-coroutines'
'vertx-lang-kotlin'

y obtener, por ejemplo

 JDBCClient.getConnectionAwait() SQLConnection.queryAwait() 

etc.

Luego, nuestros métodos de procesamiento de mensajes se convierten en algo agradable y simple:

 private suspend fun handleMessage(message: Message<String>) { try { val content = getContent(message) message.reply(content) } catch(e:Exception){ message.fail(500, e.message) } } private suspend fun getContent(message: Message<String>): String { val connection = dbclient.getConnectionAwait() val resultSet = connection.queryAwait("SELECT url FROM payee_company where name='${message.body()}'") val url = resultSet.rows[0].getString("url").removePrefix("http://") val httpResponse = webclient.get(url, "/").sendAwait() val content = httpResponse.bodyAsString() return content } 

Bueno, debe cambiar la llamada proporcionando un contexto de rutina:

 vertx.eventBus().consumer<String>("my.addr") { message -> GlobalScope.launch(vertx.dispatcher()) { handleMessage(message)} } 

¿Qué está pasando aquí?

Todos estos métodos con Await llaman al código de forma asincrónica, esperan el resultado y, mientras esperan, el hilo cambia a la ejecución de otra rutina.

Si miramos debajo del capó, entonces se ve así:

 suspend fun SQLClient.getConnectionAwait(): SQLConnection { return awaitResult { this.getConnection(it) } } suspend fun <T> awaitResult(block: (h: Handler<AsyncResult<T>>) -> Unit): T { val asyncResult = awaitEvent(block) if (asyncResult.succeeded()) return asyncResult.result() else throw asyncResult.cause() } suspend fun <T> awaitEvent(block: (h: Handler<T>) -> Unit): T { return suspendCancellableCoroutine { cont: CancellableContinuation<T> -> try { block.invoke(Handler { t -> cont.resume(t) }) } catch (e: Exception) { cont.resumeWithException(e) } } } 

Algo similar a nuestra implementación autoescrita con Futures.

Pero aquí obtenemos el código normal: cadena como tipo de retorno (en lugar de futuro), intente / atrapar en lugar de la devolución de llamada fea con AsyncResult

Y si necesitamos detener la cadena de ejecución en el medio, parece natural, sin ninguna excepción:

  private suspend fun getContent(message: Message<String>): String { val connection = dbclient.getConnectionAwait() val resultSet = connection.queryAwait("SELECT url FROM payee_company where name='${message.body()}'") if (resultSet.numRows<1) return "No records" val url = resultSet.rows[0].getString("url").removePrefix("http://") val httpResponse = webclient.get(url, "/").sendAwait() val content = httpResponse.bodyAsString() return content } 

En mi opinión, genial!

Source: https://habr.com/ru/post/449092/


All Articles