Ich möchte drei Paradigmen der asynchronen Programmierung zeigen - Rückrufe, Futures, Coroutinen am Beispiel einer einfachen Webanwendung im Vertx-Framework. Wir werden den Code in Kotlin schreiben.
Angenommen, wir haben eine Anwendung, die eine bestimmte Zeichenfolge in einer HTTP-Anforderung empfängt, nach einer URL in der Datenbank sucht, zu dieser URL wechselt und ihren Inhalt an den Client zurücksendet.
Vertx ist als asynchrones Framework für hoch ausgelastete Anwendungen konzipiert und verwendet einen kleinen, neuen E / A-Ereignisbus
Wie in Vertx üblich, empfängt ein Verticle (ein Analogon des Schauspielers, wenn Sie Akka kennen) eine Anfrage und sendet eine Ereignisbuszeichenfolge, um nach der URL eines anderen BusinessVerticle zu suchen, das tatsächlich an der Arbeit beteiligt ist.
object Main { @JvmStatic fun main(args: Array<String>) { val vertx = Vertx.vertx() vertx.deployVerticle(HttpVerticle()) vertx.deployVerticle(BusinessVerticle()) } }
class HttpVerticle : AbstractVerticle() { @Throws(Exception::class) override fun start(startFuture: Future<Void>) { val router = createRouter() vertx.createHttpServer() .requestHandler(router) .listen(8080) { result -> if (result.succeeded()) { startFuture.complete() } else { startFuture.fail(result.cause()) } } } private fun createRouter(): Router = Router.router(vertx).apply { get("/").handler(handlerRoot) } private val handlerRoot = Handler<RoutingContext> { rc -> vertx.eventBus().send("my.addr", rc.request().getParam("id") ?: "") { resp: AsyncResult<Message<String>> -> if (resp.succeeded()) { rc.response().end(resp.result().body()) } else { rc.fail(500) } } } }
In der Standard-API erfolgt die gesamte Asynchronität über die Rückrufe. Die erste Implementierung von BusinessVerticle sieht also folgendermaßen aus:
class BusinessVerticle : AbstractVerticle() { private lateinit var dbclient: JDBCClient private lateinit var webclient: WebClient override fun start() { vertx.eventBus().consumer<String>("my.addr") { message -> handleMessage(message) } dbclient = JDBCClient.createShared( vertx, JsonObject() .put("url", "jdbc:postgresql://localhost:5432/payroll") .put("driver_class", "org.postgresql.Driver") .put("user", "vala") .put("password", "vala") .put("max_pool_size", 30) ) val options = WebClientOptions() .setUserAgent("My-App/1.2.3") options.isKeepAlive = false webclient = WebClient.create(vertx, options) } private fun handleMessage(message: Message<String>) { dbclient.getConnection { res -> if (res.succeeded()) { val connection = res.result() connection.query("SELECT url FROM payee_company where name='${message.body()}'") { res2 -> if (res2.succeeded()) { try { val url = res2.result().rows[0].getString("url").removePrefix("http://") webclient .get(url,"/") .send { ar -> if (ar.succeeded()) { val response = ar.result() message.reply(response.bodyAsString()) } else { message.fail(500, ar.cause().message) } } } catch (e: Exception) { message.fail(500, e.message) } } else { message.fail(500, res2.cause().message) } } } else { message.fail(500, res.cause().message) } } } }
Es sieht ehrlich gesagt so lala aus - Rückrufe Hölle, vor allem Fehlerbehandlung.
Versuchen wir, die Situation zu verbessern, wie uns der Rückruf-Guru lehrt - indem wir jeden Rückruf in einer separaten Methode auswählen:
class BusinessVerticle: AbstractVerticle() { private lateinit var dbclient: JDBCClient private lateinit var webclient: WebClient override fun start() { vertx.eventBus().consumer<String>("my.addr") { message -> handleMessage(message) } dbclient = JDBCClient.createShared( vertx, JsonObject() .put("url", "jdbc:postgresql://localhost:5432/payroll") .put("driver_class", "org.postgresql.Driver") .put("user", "vala") .put("password", "vala") .put("max_pool_size", 30) ) val options = WebClientOptions() .setUserAgent("My-App/1.2.3") options.isKeepAlive = false webclient = WebClient.create(vertx, options) } private fun handleMessage(message: Message<String>) { dbclient.getConnection { res -> handleConnectionCallback(res, message) } } private fun handleConnectionCallback( res: AsyncResult<SQLConnection>, message: Message<String> ) { if (res.succeeded()) { val connection = res.result() connection.query("SELECT url FROM payee_company where name='${message.body()}'") { res2 -> handleQueryCallBack(res2, message) } } else { message.fail(500, res.cause().message) } } private fun handleQueryCallBack( res2: AsyncResult<ResultSet>, message: Message<String> ) { if (res2.succeeded()) { try { val url = res2.result().rows[0].getString("url").removePrefix("http://") webclient .get(url, "/") .send { ar -> handleHttpCallback(ar, message) } } catch (e: Exception) { message.fail(500, e.message) } } else { message.fail(500, res2.cause().message) } } private fun handleHttpCallback( ar: AsyncResult<HttpResponse<Buffer>>, message: Message<String> ) { if (ar.succeeded()) {
Nun, es wurde besser. Aber auch so lala.
Bei vielen Zeilen, nicht besonders lesbarem Code, müssen Sie das
Nachrichtenobjekt zur Antwort mitziehen. Die Fehlerbehandlung verteilt sich auf den Code.
Lassen Sie uns versuchen, diesen Mist mit
Futures neu zu schreiben
Futures sind besonders gut, weil sie mit
Future.compose () einfach kombiniert werden
können.Zunächst übersetzen wir die Standard-Vertx-Methoden, die einen Rückruf akzeptieren, und geben nichts in Methoden zurück, die Future zurückgeben.
Wir nutzen Kotlins Fähigkeit, Methoden zu vorhandenen Klassen hinzuzufügen:
fun JDBCClient.getConnectionF(): Future<SQLConnection> { val f = Future.future<SQLConnection>() getConnection { res -> if (res.succeeded()) { val connection = res.result() f.complete(connection) } else { f.fail(res.cause()) } } return f } fun SQLConnection.queryF(query:String): Future<ResultSet> { val f = Future.future<ResultSet>() query(query) { res -> if (res.succeeded()) { val resultSet = res.result() f.complete(resultSet) } else { f.fail(res.cause()) } } return f } fun <T,M> HttpRequest<T>.sendF(): Future<HttpResponse<M>> { val f = Future.future<HttpResponse<M>>() send() { res -> if (res.succeeded()) { val response = res.result() f.complete(response) } else { f.fail(res.cause()) } } return f }
Und verwandeln Sie unsere BusinessVerticle.handleMessage in Folgendes:
private fun handleMessage(message: Message<String>) { val content = getContent(message) content.setHandler{res-> if (res.succeeded()) {
Es sieht cool aus.
Einfacher, lesbarer Code. Fehlerbehandlung an einer Stelle. Bei Bedarf können Sie auf verschiedene Ausnahmen unterschiedlich reagieren oder sie beispielsweise in eine separate Funktion einfügen.
Der Traum des Dichters!
Aber was passiert, wenn wir die
Futures- Kette unter bestimmten Bedingungen beenden müssen?
Wenn beispielsweise kein entsprechender Eintrag in der Datenbank vorhanden ist, möchten wir keine Ausnahme auslösen (und Code 500 an den Client senden), sondern die Zeichenfolge "Kein Datensatz" mit Code 200 zurückgeben.
Die einzige Möglichkeit (die ich kenne), die Kette von
Future.compose () zu
beenden, besteht darin, eine Ausnahme
auszulösen .
Das heißt, Sie müssen Folgendes tun: Bestimmen Sie Ihren Ausnahmetyp, lösen Sie diese Ausnahme aus, wenn kein Eintrag in der Datenbank vorhanden ist, und behandeln Sie diese Ausnahme auf besondere Weise.
class NoContentException(message:String):Exception(message) private fun getContent(message: Message<String>): Future<String> { val connection = dbclient.getConnectionF() val resultSet = connection.compose { it.queryF("SELECT url FROM payee_company where name='${message.body()}'") } val url = resultSet.map { if (it.numRows<1) throw NoContentException("No records") it.rows[0].getString("url").removePrefix("http://") } val httpResponse = url.compose { webclient.get(it, "/").sendF() } val content = httpResponse.map { it.bodyAsString() } return content } private fun handleMessage(message: Message<String>) { val content = getContent(message) content.setHandler{res-> if (res.succeeded()) {
Es funktioniert!
Aber es sieht schon schlimmer aus - Ausnahmen zur Steuerung des Ausführungsflusses zu verwenden, ist nicht schön. Und wenn es viele solcher Fälle gibt, die eine separate Verarbeitung erfordern, wird der Code viel weniger lesbar.
Versuchen wir, dasselbe mit Kotlin-Coroutinen zu tun.
Es wurde viel über Koroutinen geschrieben, auch über Habré (
1 ,
2 ,
3 , ...), daher werde ich nicht separat darüber schreiben.
Die neuesten Versionen von Vertx generieren automatisch Coroutine-Versionen aller Methoden, die ein Rückruf akzeptieren sollte.
Wir verbinden Bibliotheken
'vertx-lang-kotlin-coroutines'
'vertx-lang-kotlin'
und zum Beispiel bekommen
JDBCClient.getConnectionAwait() SQLConnection.queryAwait()
usw.
Dann werden unsere Nachrichtenverarbeitungsmethoden zu etwas Schönem und Einfachem:
private suspend fun handleMessage(message: Message<String>) { try { val content = getContent(message) message.reply(content) } catch(e:Exception){ message.fail(500, e.message) } } private suspend fun getContent(message: Message<String>): String { val connection = dbclient.getConnectionAwait() val resultSet = connection.queryAwait("SELECT url FROM payee_company where name='${message.body()}'") val url = resultSet.rows[0].getString("url").removePrefix("http://") val httpResponse = webclient.get(url, "/").sendAwait() val content = httpResponse.bodyAsString() return content }
Nun, Sie müssen den Anruf ändern, indem Sie einen Coroutine-Kontext bereitstellen:
vertx.eventBus().consumer<String>("my.addr") { message -> GlobalScope.launch(vertx.dispatcher()) { handleMessage(message)} }
Was ist hier los?
Alle diese Methoden mit Await rufen den Code asynchron auf, warten auf das Ergebnis, und während sie warten, wechselt der Thread zur Ausführung einer anderen Coroutine.
Wenn wir unter die Haube schauen, sieht es so aus:
suspend fun SQLClient.getConnectionAwait(): SQLConnection { return awaitResult { this.getConnection(it) } } suspend fun <T> awaitResult(block: (h: Handler<AsyncResult<T>>) -> Unit): T { val asyncResult = awaitEvent(block) if (asyncResult.succeeded()) return asyncResult.result() else throw asyncResult.cause() } suspend fun <T> awaitEvent(block: (h: Handler<T>) -> Unit): T { return suspendCancellableCoroutine { cont: CancellableContinuation<T> -> try { block.invoke(Handler { t -> cont.resume(t) }) } catch (e: Exception) { cont.resumeWithException(e) } } }
Ähnlich wie bei unserer selbstgeschriebenen Implementierung mit Futures.
Aber hier erhalten wir den normalen Code - String als Rückgabetyp (anstelle von Future), try / catch anstelle des hässlichen Rückrufs mit AsyncResult
Und wenn wir die Ausführungskette in der Mitte stoppen müssen, sieht es ausnahmslos natürlich aus:
private suspend fun getContent(message: Message<String>): String { val connection = dbclient.getConnectionAwait() val resultSet = connection.queryAwait("SELECT url FROM payee_company where name='${message.body()}'") if (resultSet.numRows<1) return "No records" val url = resultSet.rows[0].getString("url").removePrefix("http://") val httpResponse = webclient.get(url, "/").sendAwait() val content = httpResponse.bodyAsString() return content }
Meiner Meinung nach super!