Je veux montrer trois paradigmes de programmation asynchrone - rappels, futurs, coroutines en utilisant un exemple d'une simple application web sur le framework Vertx. Nous allons écrire le code dans Kotlin.
Supposons que nous ayons une application qui reçoit une certaine chaîne dans une requête HTTP, recherche une URL dans la base de données qui l'utilise, accède à cette URL et renvoie son contenu au client.
Vertx est conçu comme un framework asynchrone pour des applications très chargées, utilise netty, nouvelle IO, bus d'événements
Comme d'habitude dans Vertx, un Verticle (un analogue de l'acteur, si vous connaissez Akka) reçoit une demande, envoie une chaîne de bus d'événements pour rechercher l'URL d'un autre BusinessVerticle, qui est réellement engagé dans le travail.
object Main { @JvmStatic fun main(args: Array<String>) { val vertx = Vertx.vertx() vertx.deployVerticle(HttpVerticle()) vertx.deployVerticle(BusinessVerticle()) } }
class HttpVerticle : AbstractVerticle() { @Throws(Exception::class) override fun start(startFuture: Future<Void>) { val router = createRouter() vertx.createHttpServer() .requestHandler(router) .listen(8080) { result -> if (result.succeeded()) { startFuture.complete() } else { startFuture.fail(result.cause()) } } } private fun createRouter(): Router = Router.router(vertx).apply { get("/").handler(handlerRoot) } private val handlerRoot = Handler<RoutingContext> { rc -> vertx.eventBus().send("my.addr", rc.request().getParam("id") ?: "") { resp: AsyncResult<Message<String>> -> if (resp.succeeded()) { rc.response().end(resp.result().body()) } else { rc.fail(500) } } } }
Dans l'API standard, toute l'asynchronie se fait via les rappels, donc l'implémentation initiale de BusinessVerticle ressemblera à ceci:
class BusinessVerticle : AbstractVerticle() { private lateinit var dbclient: JDBCClient private lateinit var webclient: WebClient override fun start() { vertx.eventBus().consumer<String>("my.addr") { message -> handleMessage(message) } dbclient = JDBCClient.createShared( vertx, JsonObject() .put("url", "jdbc:postgresql://localhost:5432/payroll") .put("driver_class", "org.postgresql.Driver") .put("user", "vala") .put("password", "vala") .put("max_pool_size", 30) ) val options = WebClientOptions() .setUserAgent("My-App/1.2.3") options.isKeepAlive = false webclient = WebClient.create(vertx, options) } private fun handleMessage(message: Message<String>) { dbclient.getConnection { res -> if (res.succeeded()) { val connection = res.result() connection.query("SELECT url FROM payee_company where name='${message.body()}'") { res2 -> if (res2.succeeded()) { try { val url = res2.result().rows[0].getString("url").removePrefix("http://") webclient .get(url,"/") .send { ar -> if (ar.succeeded()) { val response = ar.result() message.reply(response.bodyAsString()) } else { message.fail(500, ar.cause().message) } } } catch (e: Exception) { message.fail(500, e.message) } } else { message.fail(500, res2.cause().message) } } } else { message.fail(500, res.cause().message) } } } }
Cela a l'air franchement, tant bien que mal - les rappels d'enfer, en particulier la gestion des erreurs.
Essayons d'améliorer la situation, comme nous l'apprend le gourou du rappel - en sélectionnant chaque rappel dans une méthode distincte:
class BusinessVerticle: AbstractVerticle() { private lateinit var dbclient: JDBCClient private lateinit var webclient: WebClient override fun start() { vertx.eventBus().consumer<String>("my.addr") { message -> handleMessage(message) } dbclient = JDBCClient.createShared( vertx, JsonObject() .put("url", "jdbc:postgresql://localhost:5432/payroll") .put("driver_class", "org.postgresql.Driver") .put("user", "vala") .put("password", "vala") .put("max_pool_size", 30) ) val options = WebClientOptions() .setUserAgent("My-App/1.2.3") options.isKeepAlive = false webclient = WebClient.create(vertx, options) } private fun handleMessage(message: Message<String>) { dbclient.getConnection { res -> handleConnectionCallback(res, message) } } private fun handleConnectionCallback( res: AsyncResult<SQLConnection>, message: Message<String> ) { if (res.succeeded()) { val connection = res.result() connection.query("SELECT url FROM payee_company where name='${message.body()}'") { res2 -> handleQueryCallBack(res2, message) } } else { message.fail(500, res.cause().message) } } private fun handleQueryCallBack( res2: AsyncResult<ResultSet>, message: Message<String> ) { if (res2.succeeded()) { try { val url = res2.result().rows[0].getString("url").removePrefix("http://") webclient .get(url, "/") .send { ar -> handleHttpCallback(ar, message) } } catch (e: Exception) { message.fail(500, e.message) } } else { message.fail(500, res2.cause().message) } } private fun handleHttpCallback( ar: AsyncResult<HttpResponse<Buffer>>, message: Message<String> ) { if (ar.succeeded()) {
Eh bien, ça s'est amélioré. Mais aussi moyen.
Beaucoup de lignes, pas de code particulièrement lisible, vous devez faire glisser l'objet de
message le long de la réponse, la gestion des erreurs répartie sur le code.
Essayons de réécrire cette merde en utilisant
FuturesLes futurs sont particulièrement bons car ils peuvent être facilement combinés à l'aide de
Future.compose ()Tout d'abord, nous allons traduire les méthodes Vertx standard qui acceptent un rappel et ne renvoient rien en méthodes qui renvoient Future.
Nous profitons de la capacité de Kotlin à ajouter des méthodes aux classes existantes:
fun JDBCClient.getConnectionF(): Future<SQLConnection> { val f = Future.future<SQLConnection>() getConnection { res -> if (res.succeeded()) { val connection = res.result() f.complete(connection) } else { f.fail(res.cause()) } } return f } fun SQLConnection.queryF(query:String): Future<ResultSet> { val f = Future.future<ResultSet>() query(query) { res -> if (res.succeeded()) { val resultSet = res.result() f.complete(resultSet) } else { f.fail(res.cause()) } } return f } fun <T,M> HttpRequest<T>.sendF(): Future<HttpResponse<M>> { val f = Future.future<HttpResponse<M>>() send() { res -> if (res.succeeded()) { val response = res.result() f.complete(response) } else { f.fail(res.cause()) } } return f }
Et transformez notre BusinessVerticle.handleMessage en ceci:
private fun handleMessage(message: Message<String>) { val content = getContent(message) content.setHandler{res-> if (res.succeeded()) {
Ça a l'air cool.
Code simple et lisible. Gestion des erreurs en un seul endroit. Si nécessaire, vous pouvez réagir différemment à différentes exceptions ou, par exemple, le mettre dans une fonction distincte.
Le rêve du poète!
Mais que se passe-t-il si nous devons mettre fin à la chaîne
Futures par une condition?
Par exemple, s'il n'y a pas d'entrée correspondante dans la base de données, nous ne voulons pas lever d'exception (et coder 500 au client), mais renvoyer la chaîne "No record" avec le code 200.
La seule façon (que je connais) de mettre fin à la chaîne de
Future.compose () est de
lever une exception.
C'est-à-dire vous devez faire quelque chose comme ceci: déterminer votre type d'exception, lever cette exception s'il n'y a pas d'entrée dans la base de données, gérer cette exception d'une manière spéciale.
class NoContentException(message:String):Exception(message) private fun getContent(message: Message<String>): Future<String> { val connection = dbclient.getConnectionF() val resultSet = connection.compose { it.queryF("SELECT url FROM payee_company where name='${message.body()}'") } val url = resultSet.map { if (it.numRows<1) throw NoContentException("No records") it.rows[0].getString("url").removePrefix("http://") } val httpResponse = url.compose { webclient.get(it, "/").sendF() } val content = httpResponse.map { it.bodyAsString() } return content } private fun handleMessage(message: Message<String>) { val content = getContent(message) content.setHandler{res-> if (res.succeeded()) {
Ça marche!
Mais cela semble déjà pire - utiliser des exceptions pour contrôler le flux d'exécution n'est pas beau. Et si de nombreux cas de ce genre nécessitent un traitement séparé, le code deviendra beaucoup moins lisible.
Essayons de faire de même avec les coroutines Kotlin.
Beaucoup a été écrit sur les coroutines, y compris sur Habré (
1 ,
2 ,
3 , ...) donc je n’écrirai pas à leur sujet séparément.
Les dernières versions de Vertx génèrent automatiquement des versions coroutine de toutes les méthodes qu'un rappel devrait accepter.
Nous connectons les bibliothèques
«vertx-lang-kotlin-coroutines»
«vertx-lang-kotlin»
et obtenez, par exemple
JDBCClient.getConnectionAwait() SQLConnection.queryAwait()
etc.
Ensuite, nos méthodes de traitement des messages se transforment en quelque chose de bien simple:
private suspend fun handleMessage(message: Message<String>) { try { val content = getContent(message) message.reply(content) } catch(e:Exception){ message.fail(500, e.message) } } private suspend fun getContent(message: Message<String>): String { val connection = dbclient.getConnectionAwait() val resultSet = connection.queryAwait("SELECT url FROM payee_company where name='${message.body()}'") val url = resultSet.rows[0].getString("url").removePrefix("http://") val httpResponse = webclient.get(url, "/").sendAwait() val content = httpResponse.bodyAsString() return content }
Eh bien, vous devez changer l'appel en fournissant un contexte coroutine:
vertx.eventBus().consumer<String>("my.addr") { message -> GlobalScope.launch(vertx.dispatcher()) { handleMessage(message)} }
Que se passe-t-il ici?
Toutes ces méthodes avec Await appellent le code de manière asynchrone, attendent le résultat de celui-ci, et pendant qu'elles attendent, le thread passe à l'exécution d'une autre coroutine.
Si nous regardons sous le capot, cela ressemble à ceci:
suspend fun SQLClient.getConnectionAwait(): SQLConnection { return awaitResult { this.getConnection(it) } } suspend fun <T> awaitResult(block: (h: Handler<AsyncResult<T>>) -> Unit): T { val asyncResult = awaitEvent(block) if (asyncResult.succeeded()) return asyncResult.result() else throw asyncResult.cause() } suspend fun <T> awaitEvent(block: (h: Handler<T>) -> Unit): T { return suspendCancellableCoroutine { cont: CancellableContinuation<T> -> try { block.invoke(Handler { t -> cont.resume(t) }) } catch (e: Exception) { cont.resumeWithException(e) } } }
Quelque chose de similaire à notre implémentation auto-écrite avec Futures.
Mais ici, nous obtenons le code normal - String comme type de retour (au lieu de Future), essayez / catch au lieu du vilain rappel avec AsyncResult
Et si nous devons arrêter la chaîne d'exécution au milieu, cela semble naturel, sans aucune exception:
private suspend fun getContent(message: Message<String>): String { val connection = dbclient.getConnectionAwait() val resultSet = connection.queryAwait("SELECT url FROM payee_company where name='${message.body()}'") if (resultSet.numRows<1) return "No records" val url = resultSet.rows[0].getString("url").removePrefix("http://") val httpResponse = webclient.get(url, "/").sendAwait() val content = httpResponse.bodyAsString() return content }
À mon avis, super!