Três paradigma de programação assíncrona no Vertx

Eu quero mostrar três paradigmas de programação assíncrona - retornos de chamada, futuros, corotinas, usando um exemplo de um aplicativo da Web simples na estrutura Vertx. Vamos escrever o código no Kotlin.

Suponha que tenhamos um aplicativo que receba uma determinada string em uma solicitação HTTP, pesquise uma URL no banco de dados usando-a, vá para essa URL e envie seu conteúdo de volta ao cliente.
O Vertx é concebido como uma estrutura assíncrona para aplicativos altamente carregados, usa netty, novo IO, barramento de eventos

Como é habitual na Vertx, um Verticle (um análogo do ator, se você conhece Akka) recebe uma solicitação, envia uma sequência de bus de eventos para procurar a URL de algum outro BusinessVerticle, que está envolvido no trabalho real.

object Main { @JvmStatic fun main(args: Array<String>) { val vertx = Vertx.vertx() vertx.deployVerticle(HttpVerticle()) vertx.deployVerticle(BusinessVerticle()) } } 

 class HttpVerticle : AbstractVerticle() { @Throws(Exception::class) override fun start(startFuture: Future<Void>) { val router = createRouter() vertx.createHttpServer() .requestHandler(router) .listen(8080) { result -> if (result.succeeded()) { startFuture.complete() } else { startFuture.fail(result.cause()) } } } private fun createRouter(): Router = Router.router(vertx).apply { get("/").handler(handlerRoot) } private val handlerRoot = Handler<RoutingContext> { rc -> vertx.eventBus().send("my.addr", rc.request().getParam("id") ?: "") { resp: AsyncResult<Message<String>> -> if (resp.succeeded()) { rc.response().end(resp.result().body()) } else { rc.fail(500) } } } } 

Na API padrão, toda a assincronia é feita através dos retornos de chamada; portanto, a implementação inicial do BusinessVerticle se parecerá com isso:

 class BusinessVerticle : AbstractVerticle() { private lateinit var dbclient: JDBCClient private lateinit var webclient: WebClient override fun start() { vertx.eventBus().consumer<String>("my.addr") { message -> handleMessage(message) } dbclient = JDBCClient.createShared( vertx, JsonObject() .put("url", "jdbc:postgresql://localhost:5432/payroll") .put("driver_class", "org.postgresql.Driver") .put("user", "vala") .put("password", "vala") .put("max_pool_size", 30) ) val options = WebClientOptions() .setUserAgent("My-App/1.2.3") options.isKeepAlive = false webclient = WebClient.create(vertx, options) } private fun handleMessage(message: Message<String>) { dbclient.getConnection { res -> if (res.succeeded()) { val connection = res.result() connection.query("SELECT url FROM payee_company where name='${message.body()}'") { res2 -> if (res2.succeeded()) { try { val url = res2.result().rows[0].getString("url").removePrefix("http://") webclient .get(url,"/") .send { ar -> if (ar.succeeded()) { val response = ar.result() message.reply(response.bodyAsString()) } else { message.fail(500, ar.cause().message) } } } catch (e: Exception) { message.fail(500, e.message) } } else { message.fail(500, res2.cause().message) } } } else { message.fail(500, res.cause().message) } } } } 

Parece francamente, mais ou menos - retornos infernais, especialmente manipulação de erros.

Vamos tentar melhorar a situação, como o guru de retorno de chamada nos ensina - selecionando cada retorno de chamada em um método separado:

  class BusinessVerticle: AbstractVerticle() { private lateinit var dbclient: JDBCClient private lateinit var webclient: WebClient override fun start() { vertx.eventBus().consumer<String>("my.addr") { message -> handleMessage(message) } dbclient = JDBCClient.createShared( vertx, JsonObject() .put("url", "jdbc:postgresql://localhost:5432/payroll") .put("driver_class", "org.postgresql.Driver") .put("user", "vala") .put("password", "vala") .put("max_pool_size", 30) ) val options = WebClientOptions() .setUserAgent("My-App/1.2.3") options.isKeepAlive = false webclient = WebClient.create(vertx, options) } private fun handleMessage(message: Message<String>) { dbclient.getConnection { res -> handleConnectionCallback(res, message) } } private fun handleConnectionCallback( res: AsyncResult<SQLConnection>, message: Message<String> ) { if (res.succeeded()) { val connection = res.result() connection.query("SELECT url FROM payee_company where name='${message.body()}'") { res2 -> handleQueryCallBack(res2, message) } } else { message.fail(500, res.cause().message) } } private fun handleQueryCallBack( res2: AsyncResult<ResultSet>, message: Message<String> ) { if (res2.succeeded()) { try { val url = res2.result().rows[0].getString("url").removePrefix("http://") webclient .get(url, "/") .send { ar -> handleHttpCallback(ar, message) } } catch (e: Exception) { message.fail(500, e.message) } } else { message.fail(500, res2.cause().message) } } private fun handleHttpCallback( ar: AsyncResult<HttpResponse<Buffer>>, message: Message<String> ) { if (ar.succeeded()) { // Obtain response val response = ar.result() message.reply(response.bodyAsString()) } else { message.fail(500, ar.cause().message) } } } 

Bem, ficou melhor. Mas também mais ou menos.

Muitas linhas, não o código particularmente legível, você precisa arrastar o objeto de mensagem para obter resposta, a manipulação de erros espalhada pelo código.

Vamos tentar reescrever essa porcaria usando Futures
Os futuros são especialmente bons porque podem ser facilmente combinados usando Future.compose ()

Primeiro, traduziremos os métodos padrão da Vertx que aceitam retorno de chamada e não retornam nada em métodos que retornam Futuro.

Aproveitamos a capacidade de Kotlin de adicionar métodos às classes existentes:

 fun JDBCClient.getConnectionF(): Future<SQLConnection> { val f = Future.future<SQLConnection>() getConnection { res -> if (res.succeeded()) { val connection = res.result() f.complete(connection) } else { f.fail(res.cause()) } } return f } fun SQLConnection.queryF(query:String): Future<ResultSet> { val f = Future.future<ResultSet>() query(query) { res -> if (res.succeeded()) { val resultSet = res.result() f.complete(resultSet) } else { f.fail(res.cause()) } } return f } fun <T,M> HttpRequest<T>.sendF(): Future<HttpResponse<M>> { val f = Future.future<HttpResponse<M>>() send() { res -> if (res.succeeded()) { val response = res.result() f.complete(response) } else { f.fail(res.cause()) } } return f } 

E transforme nossa BusinessVerticle.handleMessage nisso:

  private fun handleMessage(message: Message<String>) { val content = getContent(message) content.setHandler{res-> if (res.succeeded()) { // Obtain response val response = res.result() message.reply(response) } else { message.fail(500, res.cause().message) } } } private fun getContent(message: Message<String>): Future<String> { val connection = dbclient.getConnectionF() val resultSet = connection.compose { it.queryF("SELECT url FROM payee_company where name='${message.body()}'") } val url = resultSet.map { it.rows[0].getString("url").removePrefix("http://") } val httpResponse = url.compose { webclient.get(it, "/").sendF() } val content = httpResponse.map { it.bodyAsString() } return content } 

Parece legal.

Código simples e legível. Tratamento de erros em um só lugar. Se necessário, você pode fazer uma reação diferente a diferentes exceções ou, por exemplo, colocá-lo em uma função separada.

O sonho do poeta!

Mas o que acontece se precisarmos encerrar a cadeia de futuros por alguma condição?
Por exemplo, se não houver entrada correspondente no banco de dados, não queremos lançar uma exceção (e codificar 500 para o cliente), mas retornar a sequência "No record" com o código 200.

A única maneira (que eu sei) de encerrar a cadeia de Future.compose () é lançar uma exceção.

I.e. você precisa fazer algo assim: determinar seu tipo de exceção, lançar essa exceção se não houver entrada no banco de dados, tratar essa exceção de uma maneira especial.

 class NoContentException(message:String):Exception(message) private fun getContent(message: Message<String>): Future<String> { val connection = dbclient.getConnectionF() val resultSet = connection.compose { it.queryF("SELECT url FROM payee_company where name='${message.body()}'") } val url = resultSet.map { if (it.numRows<1) throw NoContentException("No records") it.rows[0].getString("url").removePrefix("http://") } val httpResponse = url.compose { webclient.get(it, "/").sendF() } val content = httpResponse.map { it.bodyAsString() } return content } private fun handleMessage(message: Message<String>) { val content = getContent(message) content.setHandler{res-> if (res.succeeded()) { // Obtain response val response = res.result() message.reply(response) } else { if (res.cause() is NoContentException) message.reply(res.cause().message) else message.fail(500, res.cause().message) } } } 

Isso funciona!

Mas já parece pior - usar exceções para controlar o fluxo de execução não é bonito. E se houver muitos casos que exijam processamento separado, o código se tornará muito menos legível.

Vamos tentar fazer o mesmo com as corotinas Kotlin.
Muito já foi escrito sobre corotinas, inclusive sobre Habré ( 1 , 2 , 3 , ...), portanto, não escreverei sobre elas separadamente.

As versões mais recentes do Vertx geram automaticamente versões de corotina de todos os métodos que um retorno de chamada deve aceitar.

Nós conectamos bibliotecas
«vertx-lang-kotlin-coroutines»
«vertx-lang-kotlin»

e obtenha, por exemplo

 JDBCClient.getConnectionAwait() SQLConnection.queryAwait() 

etc.

Então, nossos métodos de processamento de mensagens se tornam algo simples e agradável:

 private suspend fun handleMessage(message: Message<String>) { try { val content = getContent(message) message.reply(content) } catch(e:Exception){ message.fail(500, e.message) } } private suspend fun getContent(message: Message<String>): String { val connection = dbclient.getConnectionAwait() val resultSet = connection.queryAwait("SELECT url FROM payee_company where name='${message.body()}'") val url = resultSet.rows[0].getString("url").removePrefix("http://") val httpResponse = webclient.get(url, "/").sendAwait() val content = httpResponse.bodyAsString() return content } 

Bem, você precisa alterar a chamada fornecendo um contexto de rotina:

 vertx.eventBus().consumer<String>("my.addr") { message -> GlobalScope.launch(vertx.dispatcher()) { handleMessage(message)} } 

O que está acontecendo aqui?

Todos esses métodos com Await chamam o código de forma assíncrona, aguardam o resultado e, enquanto esperam, o thread alterna para a execução de outra corotina.

Se olharmos sob o capô, será assim:

 suspend fun SQLClient.getConnectionAwait(): SQLConnection { return awaitResult { this.getConnection(it) } } suspend fun <T> awaitResult(block: (h: Handler<AsyncResult<T>>) -> Unit): T { val asyncResult = awaitEvent(block) if (asyncResult.succeeded()) return asyncResult.result() else throw asyncResult.cause() } suspend fun <T> awaitEvent(block: (h: Handler<T>) -> Unit): T { return suspendCancellableCoroutine { cont: CancellableContinuation<T> -> try { block.invoke(Handler { t -> cont.resume(t) }) } catch (e: Exception) { cont.resumeWithException(e) } } } 

Algo semelhante à nossa implementação auto-escrita com futuros.

Mas aqui temos o código normal - String como o tipo de retorno (em vez de Future), try / catch em vez do retorno de chamada feio com AsyncResult

E se precisamos parar a cadeia de execução no meio, parece natural, sem exceções:

  private suspend fun getContent(message: Message<String>): String { val connection = dbclient.getConnectionAwait() val resultSet = connection.queryAwait("SELECT url FROM payee_company where name='${message.body()}'") if (resultSet.numRows<1) return "No records" val url = resultSet.rows[0].getString("url").removePrefix("http://") val httpResponse = webclient.get(url, "/").sendAwait() val content = httpResponse.bodyAsString() return content } 

Na minha opinião, ótimo!

Source: https://habr.com/ru/post/pt449092/


All Articles