أرغب في عرض ثلاثة نماذج من البرمجة غير المتزامنة - عمليات الاسترجاعات والعقود الآجلة والكوروتينات باستخدام مثال لتطبيق ويب بسيط على إطار عمل Vertx. سنكتب الرمز في Kotlin.
افترض أن لدينا تطبيقًا يتلقى سلسلة معينة في طلب HTTP ، ويبحث عن عنوان URL في قاعدة البيانات التي تستخدمه ، ويذهب إلى عنوان URL هذا ويرسل محتوياته مرة أخرى إلى العميل.
تم تصميم Vertx كإطار عمل غير متزامن للتطبيقات المحملة بدرجة كبيرة ، ويستخدم ناقل الحدث netty ، IO الجديد
كما هو معتاد في Vertx ، يرسل Verticle (تناظر الممثل ، إذا كنت تعرف Akka) طلبًا ، يرسل سلسلة حافلة حدث للبحث عن عنوان URL لبعض BusinessVerticle الأخرى ، التي تشارك بالفعل في العمل.
object Main { @JvmStatic fun main(args: Array<String>) { val vertx = Vertx.vertx() vertx.deployVerticle(HttpVerticle()) vertx.deployVerticle(BusinessVerticle()) } }
class HttpVerticle : AbstractVerticle() { @Throws(Exception::class) override fun start(startFuture: Future<Void>) { val router = createRouter() vertx.createHttpServer() .requestHandler(router) .listen(8080) { result -> if (result.succeeded()) { startFuture.complete() } else { startFuture.fail(result.cause()) } } } private fun createRouter(): Router = Router.router(vertx).apply { get("/").handler(handlerRoot) } private val handlerRoot = Handler<RoutingContext> { rc -> vertx.eventBus().send("my.addr", rc.request().getParam("id") ?: "") { resp: AsyncResult<Message<String>> -> if (resp.succeeded()) { rc.response().end(resp.result().body()) } else { rc.fail(500) } } } }
في واجهة برمجة التطبيقات القياسية ، تتم جميع عمليات المزامنة من خلال عمليات الاسترجاعات ، وبالتالي سيبدو التنفيذ الأولي لـ BusinessVerticle كما يلي:
class BusinessVerticle : AbstractVerticle() { private lateinit var dbclient: JDBCClient private lateinit var webclient: WebClient override fun start() { vertx.eventBus().consumer<String>("my.addr") { message -> handleMessage(message) } dbclient = JDBCClient.createShared( vertx, JsonObject() .put("url", "jdbc:postgresql://localhost:5432/payroll") .put("driver_class", "org.postgresql.Driver") .put("user", "vala") .put("password", "vala") .put("max_pool_size", 30) ) val options = WebClientOptions() .setUserAgent("My-App/1.2.3") options.isKeepAlive = false webclient = WebClient.create(vertx, options) } private fun handleMessage(message: Message<String>) { dbclient.getConnection { res -> if (res.succeeded()) { val connection = res.result() connection.query("SELECT url FROM payee_company where name='${message.body()}'") { res2 -> if (res2.succeeded()) { try { val url = res2.result().rows[0].getString("url").removePrefix("http://") webclient .get(url,"/") .send { ar -> if (ar.succeeded()) { val response = ar.result() message.reply(response.bodyAsString()) } else { message.fail(500, ar.cause().message) } } } catch (e: Exception) { message.fail(500, e.message) } } else { message.fail(500, res2.cause().message) } } } else { message.fail(500, res.cause().message) } } } }
يبدو بصراحة ، وما إلى ذلك - الجحيم معاودة الاتصال ، وخاصة معالجة الأخطاء.
دعونا نحاول تحسين الموقف ، كما يعلمنا معلم رد الاتصال - من خلال تحديد كل رد اتصال بطريقة منفصلة:
class BusinessVerticle: AbstractVerticle() { private lateinit var dbclient: JDBCClient private lateinit var webclient: WebClient override fun start() { vertx.eventBus().consumer<String>("my.addr") { message -> handleMessage(message) } dbclient = JDBCClient.createShared( vertx, JsonObject() .put("url", "jdbc:postgresql://localhost:5432/payroll") .put("driver_class", "org.postgresql.Driver") .put("user", "vala") .put("password", "vala") .put("max_pool_size", 30) ) val options = WebClientOptions() .setUserAgent("My-App/1.2.3") options.isKeepAlive = false webclient = WebClient.create(vertx, options) } private fun handleMessage(message: Message<String>) { dbclient.getConnection { res -> handleConnectionCallback(res, message) } } private fun handleConnectionCallback( res: AsyncResult<SQLConnection>, message: Message<String> ) { if (res.succeeded()) { val connection = res.result() connection.query("SELECT url FROM payee_company where name='${message.body()}'") { res2 -> handleQueryCallBack(res2, message) } } else { message.fail(500, res.cause().message) } } private fun handleQueryCallBack( res2: AsyncResult<ResultSet>, message: Message<String> ) { if (res2.succeeded()) { try { val url = res2.result().rows[0].getString("url").removePrefix("http://") webclient .get(url, "/") .send { ar -> handleHttpCallback(ar, message) } } catch (e: Exception) { message.fail(500, e.message) } } else { message.fail(500, res2.cause().message) } } private fun handleHttpCallback( ar: AsyncResult<HttpResponse<Buffer>>, message: Message<String> ) { if (ar.succeeded()) {
حسنا ، لقد تحسنت. ولكن أيضا ما إلى ذلك.
هناك الكثير من الأسطر ، وليس الشفرة القابلة للقراءة بشكل خاص ، تحتاج إلى سحب كائن
الرسالة للاستجابة ، ويمتد خطأ التعامل مع الشفرة.
دعنا نحاول إعادة كتابة هذه حماقة باستخدام
العقود الآجلةالعقود المستقبلية جيدة بشكل خاص لأنه يمكن دمجها بسهولة باستخدام
Future.compose ()أولاً ، سنقوم بترجمة أساليب Vertx القياسية التي تقبل رد الاتصال ولن تُرجع أي شيء إلى أساليب تُرجع المستقبل.
نستفيد من قدرة Kotlin على إضافة طرق إلى الفئات الحالية:
fun JDBCClient.getConnectionF(): Future<SQLConnection> { val f = Future.future<SQLConnection>() getConnection { res -> if (res.succeeded()) { val connection = res.result() f.complete(connection) } else { f.fail(res.cause()) } } return f } fun SQLConnection.queryF(query:String): Future<ResultSet> { val f = Future.future<ResultSet>() query(query) { res -> if (res.succeeded()) { val resultSet = res.result() f.complete(resultSet) } else { f.fail(res.cause()) } } return f } fun <T,M> HttpRequest<T>.sendF(): Future<HttpResponse<M>> { val f = Future.future<HttpResponse<M>>() send() { res -> if (res.succeeded()) { val response = res.result() f.complete(response) } else { f.fail(res.cause()) } } return f }
وتحويل BusinessVerticle.handleMessage لدينا إلى هذا:
private fun handleMessage(message: Message<String>) { val content = getContent(message) content.setHandler{res-> if (res.succeeded()) {
يبدو بارد.
رمز بسيط قابل للقراءة. خطأ في التعامل في مكان واحد. إذا لزم الأمر ، يمكنك إجراء رد فعل مختلف على استثناءات مختلفة ، أو ، على سبيل المثال ، وضعها في وظيفة منفصلة.
حلم الشاعر!
ولكن ماذا يحدث إذا كنا بحاجة إلى إنهاء سلسلة
العقود المستقبلية بشروط؟
على سبيل المثال ، إذا لم يكن هناك إدخال مطابق في قاعدة البيانات ، فنحن لا نريد طرح استثناء (ورمز 500 للعميل) ، ولكننا نرجع السلسلة "بلا سجل" بالكود 200.
الطريقة الوحيدة (التي أعرفها) لإنهاء السلسلة من
Future.compose () هي وضع استثناء.
أي تحتاج إلى القيام بشيء من هذا القبيل: حدد نوع الاستثناء ، ورمي هذا الاستثناء إذا لم يكن هناك إدخال في قاعدة البيانات ، تعامل مع هذا الاستثناء بطريقة خاصة.
class NoContentException(message:String):Exception(message) private fun getContent(message: Message<String>): Future<String> { val connection = dbclient.getConnectionF() val resultSet = connection.compose { it.queryF("SELECT url FROM payee_company where name='${message.body()}'") } val url = resultSet.map { if (it.numRows<1) throw NoContentException("No records") it.rows[0].getString("url").removePrefix("http://") } val httpResponse = url.compose { webclient.get(it, "/").sendF() } val content = httpResponse.map { it.bodyAsString() } return content } private fun handleMessage(message: Message<String>) { val content = getContent(message) content.setHandler{res-> if (res.succeeded()) {
إنه يعمل!
لكنه يبدو بالفعل أسوأ - استخدام استثناءات للسيطرة على تدفق التنفيذ ليست جميلة. وإذا كان هناك العديد من هذه الحالات التي تتطلب معالجة منفصلة ، فإن الكود يصبح أقل قابلية للقراءة.
دعنا نحاول أن نفعل نفس الشيء مع كوريلين.
لقد كتب الكثير عن coroutines ، بما في ذلك على حبري (
1 ،
2 ،
3 ، ...) لذلك أنا لن أكتب عنها بشكل منفصل.
تقوم أحدث إصدارات Vertx تلقائيًا بإنشاء إصدارات coroutine لجميع الطرق التي يجب أن يقبلها رد الاتصال.
نحن نربط المكتبات
"Vertx لانغ-kotlin-coroutines"
"Vertx لانغ-kotlin"
واحصل ، على سبيل المثال
JDBCClient.getConnectionAwait() SQLConnection.queryAwait()
إلخ
ثم تتحول أساليب معالجة الرسائل الخاصة بنا إلى شيء لطيف وبسيط:
private suspend fun handleMessage(message: Message<String>) { try { val content = getContent(message) message.reply(content) } catch(e:Exception){ message.fail(500, e.message) } } private suspend fun getContent(message: Message<String>): String { val connection = dbclient.getConnectionAwait() val resultSet = connection.queryAwait("SELECT url FROM payee_company where name='${message.body()}'") val url = resultSet.rows[0].getString("url").removePrefix("http://") val httpResponse = webclient.get(url, "/").sendAwait() val content = httpResponse.bodyAsString() return content }
حسنًا ، تحتاج إلى تغيير المكالمة من خلال توفير سياق coroutine:
vertx.eventBus().consumer<String>("my.addr") { message -> GlobalScope.launch(vertx.dispatcher()) { handleMessage(message)} }
ما الذي يحدث هنا؟
جميع هذه الأساليب مع Await استدعاء الرمز بشكل غير متزامن ، انتظر النتيجة منه ، وبينما ينتظرون ، يتحول مؤشر الترابط إلى تنفيذ coroutine آخر.
إذا نظرنا إلى أسفل الغطاء ، يبدو كما يلي:
suspend fun SQLClient.getConnectionAwait(): SQLConnection { return awaitResult { this.getConnection(it) } } suspend fun <T> awaitResult(block: (h: Handler<AsyncResult<T>>) -> Unit): T { val asyncResult = awaitEvent(block) if (asyncResult.succeeded()) return asyncResult.result() else throw asyncResult.cause() } suspend fun <T> awaitEvent(block: (h: Handler<T>) -> Unit): T { return suspendCancellableCoroutine { cont: CancellableContinuation<T> -> try { block.invoke(Handler { t -> cont.resume(t) }) } catch (e: Exception) { cont.resumeWithException(e) } } }
شيء مشابه لتطبيقنا المكتوب ذاتيا مع العقود المستقبلية.
لكن هنا نحصل على الكود العادي - السلسلة كنوع المرتجعات (بدلاً من المستقبل) ، جرّب / امسك بدلاً من رد الاتصال القبيح مع AsyncResult
وإذا كنا بحاجة إلى إيقاف سلسلة التنفيذ في الوسط ، فستبدو طبيعية دون أي استثناءات:
private suspend fun getContent(message: Message<String>): String { val connection = dbclient.getConnectionAwait() val resultSet = connection.queryAwait("SELECT url FROM payee_company where name='${message.body()}'") if (resultSet.numRows<1) return "No records" val url = resultSet.rows[0].getString("url").removePrefix("http://") val httpResponse = webclient.get(url, "/").sendAwait() val content = httpResponse.bodyAsString() return content }
في رأيي ، عظيم!