Reactor, WebFlux, Kotlin Coroutines o Asynchrony con un ejemplo simple



Muchos servicios en el mundo moderno, en su mayor parte, "no hacen nada". Sus tareas se reducen a solicitudes de otras bases de datos / servicios / cachés y la agregación de todos estos datos de acuerdo con varias reglas y diversas lógicas comerciales. Por lo tanto, no es sorprendente que aparezcan idiomas como Golang, con un conveniente sistema competitivo incorporado que facilita la organización del código sin bloqueo.


En el mundo JVM, las cosas son un poco más complicadas. Hay una gran cantidad de marcos y bibliotecas que bloquean los hilos cuando se usan. Entonces, stdlib en sí mismo puede hacer lo mismo a veces. Y en Java no hay un mecanismo similar al de las gorutinas en Golang.


Sin embargo, JVM se está desarrollando activamente y aparecen nuevas oportunidades interesantes. Hay Kotlin con corutinas, que en su uso son muy similares a las gorutinas de Golang (aunque se implementan de una manera completamente diferente). Hay JEP Loom, que traerá fibras a la JVM en el futuro. Uno de los marcos web más populares, Spring, recientemente agregó la capacidad de crear servicios completamente sin bloqueo en Webflux. Y con el reciente lanzamiento de Spring boot 2.2, la integración con Kotlin es aún mejor.


Propongo, utilizando el ejemplo de un pequeño servicio para transferir dinero de una tarjeta a otra, escribir una aplicación en Spring boot 2.2 y Kotlin para la integración con varios servicios externos.


Es bueno si ya está familiarizado con Java, Kotlin, Gradle, Spring, Spring boot 2, Reactor, Web flux, Tomcat, Netty, Kotlin oroutines, Gradle Kotlin DSL o incluso tiene un Ph.D. Pero si no, no importa. El código se simplificará al máximo, e incluso si no eres del mundo JVM, espero que todo esté claro para ti.


Si planea escribir un servicio usted mismo, asegúrese de que todo lo que necesita esté instalado:


  • Java 8+
  • Docker y Docker Compose;
  • cURL y preferiblemente jq ;
  • Git
  • preferiblemente un IDE para Kotlin (Intellij Idea, Eclipse, VS, vim , etc.). Pero es posible en un cuaderno.

Los ejemplos contendrán ambos espacios en blanco para la implementación en el servicio y una implementación ya escrita. Primero, ejecute la instalación y el ensamblaje y observe más de cerca los servicios y sus API.


El ejemplo de los servicios y la API en sí se hace solo con fines ilustrativos; ¡no transfiera todo AS IS a su producto!

Primero, clonamos el repositorio con servicios para nosotros, la integración con la que lo haremos, y vamos al directorio:


 git clone https://github.com/evgzakharov/spring-demo-services && cd spring-demo-services 

En una terminal separada, recopilamos todas las aplicaciones usando gradle , donde después de una compilación exitosa, todos los servicios se lanzarán usando docker-compose .


 ./gradlew build && docker-compose up 

Mientras todo está descargado e instalado, considere un proyecto con servicios.



A la entrada del servicio (servicio de demostración) se recibirá una solicitud con un token, los números de tarjeta para la transferencia y el monto a transferir entre tarjetas:


 { "authToken": "auth-token1", "cardFrom": "55593478", "cardTo": "55592020", "amount": "10.1" } 

authToken token authToken , debe ir al servicio AUTH y obtener userId , con el que luego puede hacer una solicitud al USER y extraer toda la información adicional sobre el usuario. AUTH también devolverá información sobre a cuál de los tres servicios podemos acceder. Ejemplo de respuesta de AUTH :


 { "userId": 158, "cardAccess": true, "paymentAccess": true, "userAccess": true } 

Para transferir entre tarjetas, primero vaya con cada número de tarjeta en la CARD . En respuesta a las solicitudes, recibiremos cardId , luego con ellos enviaremos una solicitud a PAYMENT y haremos una transferencia. Y lo último: una vez más enviamos una solicitud de PAYMENT con fromCardId y descubrimos el saldo actual.


Para emular un pequeño retraso en los servicios, el valor de la variable de entorno TIMEOUT se genera en todos los contenedores, en los que el retraso de respuesta se establece en milisegundos. Y para diversificar las respuestas de AUTH , es posible variar el valor de SUCCESS_RATE , que controla la probabilidad de una respuesta true para el servicio.


Archivo Docker-compose.yaml:


 version: '3' services: service-auth: build: service-auth image: service-auth:1.0.0 environment: - SUCCESS_RATE=1.0 - TIMEOUT=100 ports: - "8081:8080" service-card: build: service-card image: service-card:1.0.0 environment: - TIMEOUT=100 ports: - "8082:8080" service-payment: build: service-payment image: service-payment:1.0.0 environment: - TIMEOUT=100 ports: - "8083:8080" service-user: build: service-user image: service-user:1.0.0 environment: - TIMEOUT=100 ports: - "8084:8080" 

Para todos los servicios, el reenvío de puertos desde 8081 a 8084 se realiza para llegar fácilmente a ellos directamente.


Pasemos a escribir el Demo service . Primero, intentemos escribir la implementación lo más torpe posible, sin asincronía ni concurrencia. Para hacer esto, tome Spring boot 2.2.1, Kotlin y un espacio en blanco para el servicio. Clonamos el repositorio y vamos a la rama spring-mvc-start :


 git clone https://github.com/evgzakharov/demo-service && cd demo-service && git checkout spring-mvc-start 

Vaya al archivo demo.Controller . Tiene el único método processRequest vacío para el que se debe escribir una implementación.


  @PostMapping fun processRequest(@RequestBody serviceRequest: ServiceRequest): Response { .. } 

Se recibirá una solicitud de transferencia entre tarjetas a la entrada del método.


 data class ServiceRequest( val authToken: String, val cardFrom: String, val cardTo: String, val amount: BigDecimal ) 

Para aquellos que no están familiarizados con Spring

Spring tiene una DI incorporada que funciona en base a anotaciones. El DemoController está marcado con la anotación especial RestController : además de registrar el bean en el DI, también agrega su procesamiento como controlador. PostProcessor encuentra todos los métodos marcados con la anotación PostMapping y los agrega como punto final para el servicio con el método POST .


El controlador también crea una clase proxy para DemoController, en la que todos los argumentos necesarios se pasan al método processRequest . En nuestro caso, este es solo un argumento, marcado con la anotación @RequestBody . Por lo tanto, en proxy, este método se llamará con el contenido JSON deserializado en la clase ServiceRequest .


Para facilitarlo, todos los métodos para la integración con otros servicios ya se han realizado, solo necesita conectarlos correctamente. Solo hay cinco métodos, uno para cada acción. Las llamadas a otros servicios se implementan en la llamada de bloqueo Spring RestTemplate .


Método de ejemplo para llamar a AUTH :


 private fun getAuthInfo(token: String): AuthInfo { log.info("getAuthInfo") return restTemplate.getForEntity("${demoConfig.auth}/{token}", AuthInfo::class.java, token) .body ?: throw RuntimeException("couldn't find user by token='$token'") } 

Pasemos a la implementación del método. Los comentarios indican el procedimiento y qué respuesta se espera en la salida:


  @PostMapping fun processRequest(@RequestBody serviceRequest: ServiceRequest): Response { //1) get auth info from service by token -> userId //2) find user info by userId from 1. //3) 4) find cards info for each card in serviceRequest // 5) make transaction for known cards by calling sendMoney(id1, id2, amount) // 6) after payment get payment info by fromCardId TODO("return SuccessResponse") // SuccessResponse( // amount = , // userName = , // userSurname = , // userAge = // ) } 

Primero, implementamos el método lo más simple posible, sin tener en cuenta que AUTH puede negarnos el acceso a otros servicios. Intenta hacerlo tú mismo. Cuando resulta (o después de cambiar a la rama spring-mvc ), puede verificar el funcionamiento del servicio de la siguiente manera:


implementación desde la sucursal spring-mvc
 fun processRequest(@RequestBody serviceRequest: ServiceRequest): Response { val authInfo = getAuthInfo(serviceRequest.authToken) val userInfo = findUser(authInfo.userId) val cardFromInfo = findCardInfo(serviceRequest.cardFrom) val cardToInfo = findCardInfo(serviceRequest.cardTo) sendMoney(cardFromInfo.cardId, cardToInfo.cardId, serviceRequest.amount) val paymentInfo = getPaymentInfo(cardFromInfo.cardId) return SuccessResponse( amount = paymentInfo.currentAmount, userName = userInfo.name, userSurname = userInfo.surname, userAge = userInfo.age ) } 

Inicie el servicio (desde la carpeta de servicio de demostración):


 ./gradlew bootRun 

Enviamos una solicitud al punto final:


 ./demo-request.sh 

En respuesta, obtenemos algo como esto:


 ➜ demo-service git:(spring-mvc) ✗ ./demo-request.sh + curl -XPOST http://localhost:8080/ -d @demo-payment-request.json -H 'Content-Type: application/json; charset=UTF-8' + jq . % Total % Received % Xferd Average Speed Time Time Time Current Dload Upload Total Spent Left Speed 100 182 0 85 100 97 20 23 0:00:04 0:00:04 --:--:-- 23 { "amount": 989.9, "userName": "Vasia", "userSurname": "Pupkin", "userAge": 18, "status": true } 

En total, debe realizar 6 solicitudes para implementar el servicio. Y dado que cada uno de ellos responde con un retraso de 100 ms, el tiempo total no puede ser inferior a 600 ms. En realidad, resulta unos 700 ms, teniendo en cuenta todos los gastos generales. Hasta ahora, el código es bastante simple, y si ahora queremos agregar una verificación de respuesta AUTH para acceder a otros servicios, esto no será difícil de hacer (como cualquier otra refactorización).


Pero pensemos en cómo puede acelerar la ejecución de consultas. Si no tiene en cuenta la verificación de la respuesta de AUTH , tenemos 2 tareas independientes:


  • obtener userId y solicitar datos del USER ;
  • recibiendo cardId para cada tarjeta, haciendo un pago y recibiendo el monto total.

Estas tareas se pueden realizar independientemente una de la otra. Entonces, el tiempo total de ejecución dependerá de la cadena de llamadas más larga (en este caso, la segunda) y se ejecutará en total durante 300 ms + X ms de sobrecarga.


Dado que las llamadas mismas están bloqueando, la única forma de ejecutar solicitudes paralelas es ejecutarlas en subprocesos separados. Puede crear un hilo separado para cada llamada, pero será muy costoso. Otra forma es ejecutar tareas en ThreadPool. A primera vista, dicha solución parece apropiada y el tiempo realmente disminuirá. Por ejemplo, podemos ejecutar consultas en CompletableFuture. Le permite ejecutar tareas en segundo plano llamando a métodos con el postfix async . Y si no especifica un ThreadPool específico al llamar a los métodos, las tareas se iniciarán en ForkJoinPool.commonPool() . Intente escribir una implementación usted mismo o vaya a la rama spring-mvc-async .


Implementación desde la rama spring-mvc-async
 fun processRequest(@RequestBody serviceRequest: ServiceRequest): Response { val authInfoFuture = CompletableFuture.supplyAsync { getAuthInfo(serviceRequest.authToken) } val userInfoFuture = authInfoFuture.thenApplyAsync { findUser(it.userId) } val cardFromInfo = CompletableFuture.supplyAsync { findCardInfo(serviceRequest.cardFrom) } val cardToInfo = CompletableFuture.supplyAsync { findCardInfo(serviceRequest.cardTo) } val waitAll = CompletableFuture.allOf(cardFromInfo, cardToInfo) val paymentInfoFuture = waitAll .thenApplyAsync { sendMoney(cardFromInfo.get().cardId, cardToInfo.get().cardId, serviceRequest.amount) } .thenApplyAsync { getPaymentInfo(cardFromInfo.get().cardId) } val paymentInfo = paymentInfoFuture.get() val userInfo = userInfoFuture.get() log.info("result") return SuccessResponse( amount = paymentInfo.currentAmount, userName = userInfo.name, userSurname = userInfo.surname, userAge = userInfo.age ) } 

Si ahora medimos el tiempo de solicitud, será en la región de 360 ​​ms. En comparación con la versión original, el tiempo total disminuyó en casi 2 veces. El código en sí se ha vuelto un poco más complicado, pero hasta ahora no es difícil modificarlo. Y si aquí queremos agregar una verificación de respuesta de AUTH , entonces esto no es difícil.


Pero, ¿qué pasa si tenemos una gran cantidad de solicitudes entrantes para el servicio en sí? ¿Decir unas 1000 solicitudes simultáneas? Con este enfoque, resulta bastante rápido que todos los hilos de ThreadPool están ocupados haciendo llamadas de bloqueo. Y llegamos a la conclusión de que la versión actual tampoco es adecuada.


Solo queda hacer algo con el servicio que se hace llamar. Puede modificar consultas y hacerlas sin bloqueo. Luego, los métodos para llamar a los servicios devolverán CompletableFuture, Flux, Observable, Deferred, Promise o un objeto similar sobre el cual construir una cadena de expectativas. Con este enfoque, no necesitamos hacer llamadas en flujos separados: será suficiente tener uno (o al menos un pequeño grupo separado de flujos) que ya hayamos tomado prestados para procesar las solicitudes.


¿Podemos ahora soportar la pesada carga del servicio? Para responder a esta pregunta, eche un vistazo a Tomcat, que se utiliza en Spring boot 2.2.1 en el arranque org.springframework.boot:spring-boot-starter-web . Está construido de manera que se asigna un subproceso de ThreadPool para cada solicitud entrante para su procesamiento. Y en ausencia de flujos libres, las nuevas solicitudes se convertirán en una "cola" de espera. Pero nuestro servicio en sí solo envía solicitudes a otros servicios. Asignar una secuencia completa debajo de ella y bloquearla hasta que lleguen respuestas de todos, mira, por decirlo suavemente, superfluo.


Afortunadamente, Spring recientemente hizo posible usar un servidor web sin bloqueo basado en Netty o Undertow. Para hacer esto, solo necesita cambiar spring-boot-starter-web a spring-boot-starter-webflux y cambiar ligeramente el método para procesar solicitudes en el que la solicitud y la respuesta se "envolverán" en Mono. Esto se debe al hecho de que Webflux se basa en Reactor y, por lo tanto, ahora en el método que necesita para construir una cadena de transformaciones Mono.

Intente escribir su propia implementación sin bloqueo del método. Para hacer esto, vaya a la rama spring-webflux-start . Tenga en cuenta que el iniciador de Spring Boot ha cambiado, donde ahora se usa la versión con Webflux, y la implementación de solicitudes a otros servicios que se han reescrito para usar WebClient sin bloqueo también ha cambiado.


Método de ejemplo para llamar a AUTH:


 private fun getAuthInfo(token: String): Mono<AuthInfo> { log.info("getAuthInfo") return WebClient.create().get() .uri("${demoConfig.auth}/$token") .retrieve() .bodyToMono(AuthInfo::class.java) } 

La implementación del primer ejemplo se inserta en el contenido del método processRequest en un comentario. Intenta reescribirlo tú mismo en Reactor. Como la última vez, primero haga la versión sin tener en cuenta los controles de AUTH , y luego vea lo difícil que es agregarlos:


 fun processRequest(@RequestBody serviceRequest: Mono<ServiceRequest>): Mono<Response> { // val authInfo = getAuthInfo(serviceRequest.authToken) // // val userInfo = findUser(authInfo.userId) // // val cardFromInfo = findCardInfo(serviceRequest.cardFrom) // val cardToInfo = findCardInfo(serviceRequest.cardTo) // // sendMoney(cardFromInfo.cardId, cardToInfo.cardId, serviceRequest.amount) // // val paymentInfo = getPaymentInfo(cardFromInfo.cardId) // // log.info("result") // // return SuccessResponse( // amount = paymentInfo.currentAmount, // userName = userInfo.name, // userSurname = userInfo.surname, // userAge = userInfo.age // ) TODO() } 

Después de lidiar con esto, puede compararlo con mi implementación desde la spring-webflux :


Implementación desde la rama spring-webflux
 fun processRequest(@RequestBody serviceRequest: Mono<ServiceRequest>): Mono<Response> { val cacheRequest = serviceRequest.cache() val userInfoMono = cacheRequest.flatMap { getAuthInfo(it.authToken) }.flatMap { findUser(it.userId) } val cardFromInfoMono = cacheRequest.flatMap { findCardInfo(it.cardFrom) } val cardToInfoMono = cacheRequest.flatMap { findCardInfo(it.cardTo) } val paymentInfoMono = cardFromInfoMono.zipWith(cardToInfoMono) .flatMap { (cardFromInfo, cardToInfo) -> cacheRequest.flatMap { request -> sendMoney(cardFromInfo.cardId, cardToInfo.cardId, request.amount).map { cardFromInfo } } }.flatMap { getPaymentInfo(it.cardId) } return userInfoMono.zipWith(paymentInfoMono) .map { (userInfo, paymentInfo) -> log.info("result") SuccessResponse( amount = paymentInfo.currentAmount, userName = userInfo.name, userSurname = userInfo.surname, userAge = userInfo.age ) } } 

Acuerde que ahora escribir una implementación (en comparación con el enfoque de bloqueo anterior) se ha vuelto más difícil. Y si queremos agregar cheques "olvidados" de AUTH , entonces esto no será tan fácil de hacer.


Esta es la esencia del enfoque reactivo. Es ideal para construir cadenas de procesamiento no ramificadas. Pero si aparece la ramificación, entonces el código ya no es tan simple.


Las corutinas de Kotlin, que son muy amigables con cualquier código asíncrono / reactivo, pueden ayudar aquí. Además, hay una gran cantidad de envoltorios escritos para Reactor , CompletableFuture , etc. Pero incluso si no encuentra el correcto, siempre puede escribirlo usted mismo, utilizando constructores especiales.


Reescribamos la implementación en corutinas por nuestra cuenta. Para hacer esto, vaya a la spring-webflux-coroutines-start . Se le agregan las dependencias necesarias en build.gradle.kts:


 implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinCoroutinesVersion") implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactive:$kotlinCoroutinesVersion") implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:$kotlinCoroutinesVersion") 

Y el método processRequest cambia un processRequest :


 suspend fun processRequest(@RequestBody serviceRequest: ServiceRequest): Response = coroutineScope { //TODO() } 

Ya no necesita Mono y se traduce simplemente en una función de suspensión (gracias a la integración de Spring y Kotlin). Teniendo en cuenta que crearemos corutinas adicionales en el método, necesitaremos crear un coroutineScope exploración coroutineScope (para comprender las razones para crear un alcance adicional, consulte la publicación de Roman Elizarov sobre concurrencia estructurada ). Tenga en cuenta que otras llamadas de servicio no han cambiado en absoluto. Devuelven el mismo Mono en el que se puede llamar al método de suspend awaitFirst para "esperar" el resultado de la consulta.


Si las corutinas siguen siendo un concepto nuevo para usted, entonces hay una guía maravillosa con una descripción detallada. Intente escribir su propia implementación del método processRequest o vaya a la rama spring-webflux-coroutines :


implementación desde la rama spring-webflux-coroutines
 suspend fun processRequest(@RequestBody serviceRequest: ServiceRequest): Response = coroutineScope { log.info("start") val userInfoDeferred = async { val authInfo = getAuthInfo(serviceRequest.authToken).awaitFirst() findUser(authInfo.userId).awaitFirst() } val paymentInfoDeferred = async { val cardFromInfoDeferred = async { findCardInfo(serviceRequest.cardFrom).awaitFirst() } val cardToInfoDeferred = async { findCardInfo(serviceRequest.cardTo).awaitFirst() } val cardFromInfo = cardFromInfoDeferred.await() sendMoney(cardFromInfo.cardId, cardToInfoDeferred.await().cardId, serviceRequest.amount).awaitFirst() getPaymentInfo(cardFromInfo.cardId).awaitFirst() } val userInfo = userInfoDeferred.await() val paymentInfo = paymentInfoDeferred.await() log.info("result") SuccessResponse( amount = paymentInfo.currentAmount, userName = userInfo.name, userSurname = userInfo.surname, userAge = userInfo.age ) } 

Puede comparar el código con el enfoque reactivo. Con las rutinas, no tiene que pensar en todos los puntos de ramificación por adelantado. Podemos llamar a los métodos de await y ramificar las tareas asincrónicas en async en los lugares correctos. El código sigue siendo lo más similar posible a la versión sencilla original, que no es nada difícil de cambiar. Y un factor importante es que las corutinas simplemente están incrustadas en el código reactivo.


Puede que incluso le guste más el enfoque reactivo para esta tarea, pero a muchas de las personas encuestadas les resulta más difícil. En general, ambos enfoques resuelven su problema y puede usar el que más le guste. Por cierto, recientemente en Kotlin también existe la oportunidad de crear corutinas "frías" con Flow, que son muy similares a Reactor. Es cierto que todavía están en la etapa experimental, pero ahora puede ver la implementación actual y probarla en su código.


Quiero terminar aquí y finalmente dejar enlaces útiles:



Espero que haya estado interesado y haya logrado escribir una implementación del método para todos los métodos usted mismo. Y, por supuesto, quiero creer que te gusta la opción con corutinas más =)


¡Gracias a todos los que leyeron hasta el final!

Source: https://habr.com/ru/post/477052/


All Articles