ثلاثة نماذج من البرمجة غير المتزامنة في Vertx

أرغب في عرض ثلاثة نماذج من البرمجة غير المتزامنة - عمليات الاسترجاعات والعقود الآجلة والكوروتينات باستخدام مثال لتطبيق ويب بسيط على إطار عمل Vertx. سنكتب الرمز في Kotlin.

افترض أن لدينا تطبيقًا يتلقى سلسلة معينة في طلب HTTP ، ويبحث عن عنوان URL في قاعدة البيانات التي تستخدمه ، ويذهب إلى عنوان URL هذا ويرسل محتوياته مرة أخرى إلى العميل.
تم تصميم Vertx كإطار عمل غير متزامن للتطبيقات المحملة بدرجة كبيرة ، ويستخدم ناقل الحدث netty ، IO الجديد

كما هو معتاد في Vertx ، يرسل Verticle (تناظر الممثل ، إذا كنت تعرف Akka) طلبًا ، يرسل سلسلة حافلة حدث للبحث عن عنوان URL لبعض BusinessVerticle الأخرى ، التي تشارك بالفعل في العمل.

object Main { @JvmStatic fun main(args: Array<String>) { val vertx = Vertx.vertx() vertx.deployVerticle(HttpVerticle()) vertx.deployVerticle(BusinessVerticle()) } } 

 class HttpVerticle : AbstractVerticle() { @Throws(Exception::class) override fun start(startFuture: Future<Void>) { val router = createRouter() vertx.createHttpServer() .requestHandler(router) .listen(8080) { result -> if (result.succeeded()) { startFuture.complete() } else { startFuture.fail(result.cause()) } } } private fun createRouter(): Router = Router.router(vertx).apply { get("/").handler(handlerRoot) } private val handlerRoot = Handler<RoutingContext> { rc -> vertx.eventBus().send("my.addr", rc.request().getParam("id") ?: "") { resp: AsyncResult<Message<String>> -> if (resp.succeeded()) { rc.response().end(resp.result().body()) } else { rc.fail(500) } } } } 

في واجهة برمجة التطبيقات القياسية ، تتم جميع عمليات المزامنة من خلال عمليات الاسترجاعات ، وبالتالي سيبدو التنفيذ الأولي لـ BusinessVerticle كما يلي:

 class BusinessVerticle : AbstractVerticle() { private lateinit var dbclient: JDBCClient private lateinit var webclient: WebClient override fun start() { vertx.eventBus().consumer<String>("my.addr") { message -> handleMessage(message) } dbclient = JDBCClient.createShared( vertx, JsonObject() .put("url", "jdbc:postgresql://localhost:5432/payroll") .put("driver_class", "org.postgresql.Driver") .put("user", "vala") .put("password", "vala") .put("max_pool_size", 30) ) val options = WebClientOptions() .setUserAgent("My-App/1.2.3") options.isKeepAlive = false webclient = WebClient.create(vertx, options) } private fun handleMessage(message: Message<String>) { dbclient.getConnection { res -> if (res.succeeded()) { val connection = res.result() connection.query("SELECT url FROM payee_company where name='${message.body()}'") { res2 -> if (res2.succeeded()) { try { val url = res2.result().rows[0].getString("url").removePrefix("http://") webclient .get(url,"/") .send { ar -> if (ar.succeeded()) { val response = ar.result() message.reply(response.bodyAsString()) } else { message.fail(500, ar.cause().message) } } } catch (e: Exception) { message.fail(500, e.message) } } else { message.fail(500, res2.cause().message) } } } else { message.fail(500, res.cause().message) } } } } 

يبدو بصراحة ، وما إلى ذلك - الجحيم معاودة الاتصال ، وخاصة معالجة الأخطاء.

دعونا نحاول تحسين الموقف ، كما يعلمنا معلم رد الاتصال - من خلال تحديد كل رد اتصال بطريقة منفصلة:

  class BusinessVerticle: AbstractVerticle() { private lateinit var dbclient: JDBCClient private lateinit var webclient: WebClient override fun start() { vertx.eventBus().consumer<String>("my.addr") { message -> handleMessage(message) } dbclient = JDBCClient.createShared( vertx, JsonObject() .put("url", "jdbc:postgresql://localhost:5432/payroll") .put("driver_class", "org.postgresql.Driver") .put("user", "vala") .put("password", "vala") .put("max_pool_size", 30) ) val options = WebClientOptions() .setUserAgent("My-App/1.2.3") options.isKeepAlive = false webclient = WebClient.create(vertx, options) } private fun handleMessage(message: Message<String>) { dbclient.getConnection { res -> handleConnectionCallback(res, message) } } private fun handleConnectionCallback( res: AsyncResult<SQLConnection>, message: Message<String> ) { if (res.succeeded()) { val connection = res.result() connection.query("SELECT url FROM payee_company where name='${message.body()}'") { res2 -> handleQueryCallBack(res2, message) } } else { message.fail(500, res.cause().message) } } private fun handleQueryCallBack( res2: AsyncResult<ResultSet>, message: Message<String> ) { if (res2.succeeded()) { try { val url = res2.result().rows[0].getString("url").removePrefix("http://") webclient .get(url, "/") .send { ar -> handleHttpCallback(ar, message) } } catch (e: Exception) { message.fail(500, e.message) } } else { message.fail(500, res2.cause().message) } } private fun handleHttpCallback( ar: AsyncResult<HttpResponse<Buffer>>, message: Message<String> ) { if (ar.succeeded()) { // Obtain response val response = ar.result() message.reply(response.bodyAsString()) } else { message.fail(500, ar.cause().message) } } } 

حسنا ، لقد تحسنت. ولكن أيضا ما إلى ذلك.

هناك الكثير من الأسطر ، وليس الشفرة القابلة للقراءة بشكل خاص ، تحتاج إلى سحب كائن الرسالة للاستجابة ، ويمتد خطأ التعامل مع الشفرة.

دعنا نحاول إعادة كتابة هذه حماقة باستخدام العقود الآجلة
العقود المستقبلية جيدة بشكل خاص لأنه يمكن دمجها بسهولة باستخدام Future.compose ()

أولاً ، سنقوم بترجمة أساليب Vertx القياسية التي تقبل رد الاتصال ولن تُرجع أي شيء إلى أساليب تُرجع المستقبل.

نستفيد من قدرة Kotlin على إضافة طرق إلى الفئات الحالية:

 fun JDBCClient.getConnectionF(): Future<SQLConnection> { val f = Future.future<SQLConnection>() getConnection { res -> if (res.succeeded()) { val connection = res.result() f.complete(connection) } else { f.fail(res.cause()) } } return f } fun SQLConnection.queryF(query:String): Future<ResultSet> { val f = Future.future<ResultSet>() query(query) { res -> if (res.succeeded()) { val resultSet = res.result() f.complete(resultSet) } else { f.fail(res.cause()) } } return f } fun <T,M> HttpRequest<T>.sendF(): Future<HttpResponse<M>> { val f = Future.future<HttpResponse<M>>() send() { res -> if (res.succeeded()) { val response = res.result() f.complete(response) } else { f.fail(res.cause()) } } return f } 

وتحويل BusinessVerticle.handleMessage لدينا إلى هذا:

  private fun handleMessage(message: Message<String>) { val content = getContent(message) content.setHandler{res-> if (res.succeeded()) { // Obtain response val response = res.result() message.reply(response) } else { message.fail(500, res.cause().message) } } } private fun getContent(message: Message<String>): Future<String> { val connection = dbclient.getConnectionF() val resultSet = connection.compose { it.queryF("SELECT url FROM payee_company where name='${message.body()}'") } val url = resultSet.map { it.rows[0].getString("url").removePrefix("http://") } val httpResponse = url.compose { webclient.get(it, "/").sendF() } val content = httpResponse.map { it.bodyAsString() } return content } 

يبدو بارد.

رمز بسيط قابل للقراءة. خطأ في التعامل في مكان واحد. إذا لزم الأمر ، يمكنك إجراء رد فعل مختلف على استثناءات مختلفة ، أو ، على سبيل المثال ، وضعها في وظيفة منفصلة.

حلم الشاعر!

ولكن ماذا يحدث إذا كنا بحاجة إلى إنهاء سلسلة العقود المستقبلية بشروط؟
على سبيل المثال ، إذا لم يكن هناك إدخال مطابق في قاعدة البيانات ، فنحن لا نريد طرح استثناء (ورمز 500 للعميل) ، ولكننا نرجع السلسلة "بلا سجل" بالكود 200.

الطريقة الوحيدة (التي أعرفها) لإنهاء السلسلة من Future.compose () هي وضع استثناء.

أي تحتاج إلى القيام بشيء من هذا القبيل: حدد نوع الاستثناء ، ورمي هذا الاستثناء إذا لم يكن هناك إدخال في قاعدة البيانات ، تعامل مع هذا الاستثناء بطريقة خاصة.

 class NoContentException(message:String):Exception(message) private fun getContent(message: Message<String>): Future<String> { val connection = dbclient.getConnectionF() val resultSet = connection.compose { it.queryF("SELECT url FROM payee_company where name='${message.body()}'") } val url = resultSet.map { if (it.numRows<1) throw NoContentException("No records") it.rows[0].getString("url").removePrefix("http://") } val httpResponse = url.compose { webclient.get(it, "/").sendF() } val content = httpResponse.map { it.bodyAsString() } return content } private fun handleMessage(message: Message<String>) { val content = getContent(message) content.setHandler{res-> if (res.succeeded()) { // Obtain response val response = res.result() message.reply(response) } else { if (res.cause() is NoContentException) message.reply(res.cause().message) else message.fail(500, res.cause().message) } } } 

إنه يعمل!

لكنه يبدو بالفعل أسوأ - استخدام استثناءات للسيطرة على تدفق التنفيذ ليست جميلة. وإذا كان هناك العديد من هذه الحالات التي تتطلب معالجة منفصلة ، فإن الكود يصبح أقل قابلية للقراءة.

دعنا نحاول أن نفعل نفس الشيء مع كوريلين.
لقد كتب الكثير عن coroutines ، بما في ذلك على حبري ( 1 ، 2 ، 3 ، ...) لذلك أنا لن أكتب عنها بشكل منفصل.

تقوم أحدث إصدارات Vertx تلقائيًا بإنشاء إصدارات coroutine لجميع الطرق التي يجب أن يقبلها رد الاتصال.

نحن نربط المكتبات
"Vertx لانغ-kotlin-coroutines"
"Vertx لانغ-kotlin"

واحصل ، على سبيل المثال

 JDBCClient.getConnectionAwait() SQLConnection.queryAwait() 

إلخ

ثم تتحول أساليب معالجة الرسائل الخاصة بنا إلى شيء لطيف وبسيط:

 private suspend fun handleMessage(message: Message<String>) { try { val content = getContent(message) message.reply(content) } catch(e:Exception){ message.fail(500, e.message) } } private suspend fun getContent(message: Message<String>): String { val connection = dbclient.getConnectionAwait() val resultSet = connection.queryAwait("SELECT url FROM payee_company where name='${message.body()}'") val url = resultSet.rows[0].getString("url").removePrefix("http://") val httpResponse = webclient.get(url, "/").sendAwait() val content = httpResponse.bodyAsString() return content } 

حسنًا ، تحتاج إلى تغيير المكالمة من خلال توفير سياق coroutine:

 vertx.eventBus().consumer<String>("my.addr") { message -> GlobalScope.launch(vertx.dispatcher()) { handleMessage(message)} } 

ما الذي يحدث هنا؟

جميع هذه الأساليب مع Await استدعاء الرمز بشكل غير متزامن ، انتظر النتيجة منه ، وبينما ينتظرون ، يتحول مؤشر الترابط إلى تنفيذ coroutine آخر.

إذا نظرنا إلى أسفل الغطاء ، يبدو كما يلي:

 suspend fun SQLClient.getConnectionAwait(): SQLConnection { return awaitResult { this.getConnection(it) } } suspend fun <T> awaitResult(block: (h: Handler<AsyncResult<T>>) -> Unit): T { val asyncResult = awaitEvent(block) if (asyncResult.succeeded()) return asyncResult.result() else throw asyncResult.cause() } suspend fun <T> awaitEvent(block: (h: Handler<T>) -> Unit): T { return suspendCancellableCoroutine { cont: CancellableContinuation<T> -> try { block.invoke(Handler { t -> cont.resume(t) }) } catch (e: Exception) { cont.resumeWithException(e) } } } 

شيء مشابه لتطبيقنا المكتوب ذاتيا مع العقود المستقبلية.

لكن هنا نحصل على الكود العادي - السلسلة كنوع المرتجعات (بدلاً من المستقبل) ، جرّب / امسك بدلاً من رد الاتصال القبيح مع AsyncResult

وإذا كنا بحاجة إلى إيقاف سلسلة التنفيذ في الوسط ، فستبدو طبيعية دون أي استثناءات:

  private suspend fun getContent(message: Message<String>): String { val connection = dbclient.getConnectionAwait() val resultSet = connection.queryAwait("SELECT url FROM payee_company where name='${message.body()}'") if (resultSet.numRows<1) return "No records" val url = resultSet.rows[0].getString("url").removePrefix("http://") val httpResponse = webclient.get(url, "/").sendAwait() val content = httpResponse.bodyAsString() return content } 

في رأيي ، عظيم!

Source: https://habr.com/ru/post/ar449092/


All Articles