
在现代世界中,许多服务大部分都“无所作为”。 他们的任务减少为对其他数据库/服务/缓存的请求,并根据各种规则和各种业务逻辑汇总所有这些数据。 因此,出现诸如Golang之类的语言,并具有方便的内置竞争系统使组织非阻塞代码变得容易也就不足为奇了。
在JVM世界中,事情要复杂一些。 有大量的框架和库在使用时会阻塞线程。 因此,stdlib本身有时可以做同样的事情。 而且在Java中,没有类似于Golang中的goroutines的类似机制。
但是,JVM正在积极开发,并且出现了许多有趣的机会。 有带有协程的Kotlin,它们的用法与Gorang goroutines非常相似(尽管它们以完全不同的方式实现)。 有JEP Loom,它将在将来将纤维引入JVM。 最受欢迎的Web框架之一-Spring最近增加了在Webflux上创建完全非阻塞服务的功能。 借助最近发布的Spring boot 2.2,与Kotlin的集成甚至更好。
我建议以一个用于将钱从一张卡转移到另一张卡的小型服务的示例为例,在Spring boot 2.2和Kotlin上编写一个应用程序,以与多个外部服务集成。
如果您已经熟悉Java,Kotlin,Gradle,Spring,Spring boot 2,Reactor, Webflux,Tomcat,Netty,Kotlinoroutines,Gradle Kotlin DSL甚至拥有博士学位,那将是一个很好的选择。 但是,如果没有,那没关系。 该代码将得到最大程度的简化,即使您不是来自JVM世界,也希望您能理解所有内容。
如果您打算自己编写服务,请确保已安装所需的一切:
- Java 8+
- Docker和Docker Compose;
- cURL,最好是jq ;
- 吉特
- 最好是Kotlin的IDE(Intellij Idea,Eclipse,VS,
vim等)。 但是在笔记本电脑上也是可以的。
示例将包含服务中实现的空白和已编写的实现。 首先,运行安装和组装,并仔细查看服务及其API。
服务和API本身的示例仅用于说明目的;请勿将所有AS IS
转移到您的产品上!
首先,我们使用自己的服务克隆存储库,并进行集成,然后转到目录:
git clone https://github.com/evgzakharov/spring-demo-services && cd spring-demo-services
在单独的终端中,我们使用gradle
收集所有应用程序,在构建成功之后,所有服务将使用gradle
docker-compose
启动。
./gradlew build && docker-compose up
在下载并安装所有内容后,请考虑一个包含服务的项目。

服务入口(演示服务)将收到带有令牌,转账卡号和卡间转账金额的请求:
{ "authToken": "auth-token1", "cardFrom": "55593478", "cardTo": "55592020", "amount": "10.1" }
根据authToken
令牌, authToken
需要转到AUTH
服务并获取userId
,然后可以使用它向USER
发出请求并提取有关该用户的所有其他信息。 AUTH
还将返回有关我们可以访问这三个服务中的哪一个的信息。 来自AUTH
示例响应:
{ "userId": 158, "cardAccess": true, "paymentAccess": true, "userAccess": true }
要在卡之间进行转移,请首先使用卡中的每个CARD
。 响应请求,我们将收到cardId
,然后与他们一起向PAYMENT
发送请求并进行转帐。 最后-我们再次使用fromCardId
向PAYMENT
发送请求,并找出当前余额。
为了模拟较小的服务延迟,将TIMEOUT环境变量的值引发到所有容器中,其中将响应延迟设置为毫秒。 为了使来自AUTH
的响应多样化,可以更改SUCCESS_RATE
的值,该值控制对服务进行true
响应的可能性。
Docker-compose.yaml文件:
version: '3' services: service-auth: build: service-auth image: service-auth:1.0.0 environment: - SUCCESS_RATE=1.0 - TIMEOUT=100 ports: - "8081:8080" service-card: build: service-card image: service-card:1.0.0 environment: - TIMEOUT=100 ports: - "8082:8080" service-payment: build: service-payment image: service-payment:1.0.0 environment: - TIMEOUT=100 ports: - "8083:8080" service-user: build: service-user image: service-user:1.0.0 environment: - TIMEOUT=100 ports: - "8084:8080"
对于所有服务,都完成了从8081到8084的端口转发,可以轻松直接到达它们。
让我们继续编写Demo service
。 首先,让我们尝试将实现编写得尽可能笨拙,不要出现异步和并发。 为此,请使用Spring boot 2.2.1,Kotlin和一个空白的服务。 我们克隆存储库,然后转到spring-mvc-start
分支:
git clone https://github.com/evgzakharov/demo-service && cd demo-service && git checkout spring-mvc-start
转到demo.Controller
文件。 它具有必须为其编写实现的唯一空的processRequest
方法。
@PostMapping fun processRequest(@RequestBody serviceRequest: ServiceRequest): Response { .. }
该方法的入口处将收到卡之间的转移请求。
data class ServiceRequest( val authToken: String, val cardFrom: String, val cardTo: String, val amount: BigDecimal )
对于那些不熟悉Spring的人Spring有一个基于注释的内置DI。 DemoController带有特殊的RestController
注释:除了在DI中注册Bean之外,它还添加了其作为控制器的处理。 PostProcessor查找标记有PostMapping
注释的所有方法,并使用POST
方法将它们添加为服务的端点。
处理程序还为DemoController创建一个代理类,其中所有必需的参数都传递给processRequest
方法。 在我们的例子中,这只是一个带有@RequestBody
批注的参数。 因此,在代理中,将使用反序列化到ServiceRequest
类中的JSON内容来调用此方法。
为了简化,已经完成了与其他服务集成的所有方法,您只需要正确连接它们即可。 只有五种方法,每个动作一个。 对其他服务本身的调用是在Spring RestTemplate
阻塞调用上实现的。
调用AUTH
示例方法:
private fun getAuthInfo(token: String): AuthInfo { log.info("getAuthInfo") return restTemplate.getForEntity("${demoConfig.auth}/{token}", AuthInfo::class.java, token) .body ?: throw RuntimeException("couldn't find user by token='$token'") }
让我们继续该方法的实现。 注释说明了该过程以及在输出中期望得到什么响应:
@PostMapping fun processRequest(@RequestBody serviceRequest: ServiceRequest): Response {
首先,我们尽可能简单地实现该方法,而不考虑AUTH
可以拒绝我们访问其他服务。 尝试自己做。 事实证明(或切换到spring-mvc
分支后),您可以按以下方式检查服务的运行情况:
从spring-mvc分支实现 fun processRequest(@RequestBody serviceRequest: ServiceRequest): Response { val authInfo = getAuthInfo(serviceRequest.authToken) val userInfo = findUser(authInfo.userId) val cardFromInfo = findCardInfo(serviceRequest.cardFrom) val cardToInfo = findCardInfo(serviceRequest.cardTo) sendMoney(cardFromInfo.cardId, cardToInfo.cardId, serviceRequest.amount) val paymentInfo = getPaymentInfo(cardFromInfo.cardId) return SuccessResponse( amount = paymentInfo.currentAmount, userName = userInfo.name, userSurname = userInfo.surname, userAge = userInfo.age ) }
启动服务(从demo-service文件夹中):
./gradlew bootRun
我们向端点发送请求:
./demo-request.sh
作为响应,我们得到如下信息:
➜ demo-service git:(spring-mvc) ✗ ./demo-request.sh + curl -XPOST http://localhost:8080/ -d @demo-payment-request.json -H 'Content-Type: application/json; charset=UTF-8' + jq . % Total % Received % Xferd Average Speed Time Time Time Current Dload Upload Total Spent Left Speed 100 182 0 85 100 97 20 23 0:00:04 0:00:04 --:--:-- 23 { "amount": 989.9, "userName": "Vasia", "userSurname": "Pupkin", "userAge": 18, "status": true }
为了实施该服务,总共需要发出6个请求。 考虑到每个响应的延迟为100 ms,总时间不能少于600 ms。 实际上,考虑到所有开销,结果约为700毫秒。 到目前为止,代码非常简单,如果我们现在想添加AUTH
响应检查以访问其他服务,这将不难做到(就像其他任何重构一样)。
但是,让我们考虑一下如何加快查询执行速度。 如果您不考虑对AUTH
响应的验证,那么我们有两个独立的任务:
- 获取
userId
并向USER
请求数据; - 接收每张卡的
cardId
,进行付款并接收总金额。
这些任务可以彼此独立地执行。 然后,总的执行时间将取决于最长的调用链(在这种情况下为第二个),总共将执行300 ms + X ms的开销。
鉴于调用本身是阻塞的,执行并行请求的唯一方法是在单独的线程上运行它们。 您可以为每个调用创建一个单独的线程,但这将非常昂贵。 另一种方法是在ThreadPool上运行任务。 乍一看,这样的解决方案看起来很合适,而且时间会真正减少。 例如,我们可以在CompletableFuture上执行查询。 它允许您通过使用async
后缀调用方法来运行后台任务。 并且,如果在调用方法时未指定特定的ThreadPool,则将在ForkJoinPool.commonPool()
上启动任务。 尝试自己编写一个实现,或者转到spring-mvc-async
分支。
从spring-mvc-async分支实现 fun processRequest(@RequestBody serviceRequest: ServiceRequest): Response { val authInfoFuture = CompletableFuture.supplyAsync { getAuthInfo(serviceRequest.authToken) } val userInfoFuture = authInfoFuture.thenApplyAsync { findUser(it.userId) } val cardFromInfo = CompletableFuture.supplyAsync { findCardInfo(serviceRequest.cardFrom) } val cardToInfo = CompletableFuture.supplyAsync { findCardInfo(serviceRequest.cardTo) } val waitAll = CompletableFuture.allOf(cardFromInfo, cardToInfo) val paymentInfoFuture = waitAll .thenApplyAsync { sendMoney(cardFromInfo.get().cardId, cardToInfo.get().cardId, serviceRequest.amount) } .thenApplyAsync { getPaymentInfo(cardFromInfo.get().cardId) } val paymentInfo = paymentInfoFuture.get() val userInfo = userInfoFuture.get() log.info("result") return SuccessResponse( amount = paymentInfo.currentAmount, userName = userInfo.name, userSurname = userInfo.surname, userAge = userInfo.age ) }
如果现在测量请求时间,则该时间将在360毫秒左右。 与原始版本相比,总时间减少了将近2倍。 代码本身已经变得有些复杂,但是到目前为止,对其进行修改仍然不难。 如果在这里我们要添加来自AUTH
的响应检查,那么这并不困难。
但是,如果我们对服务本身有大量传入请求怎么办? 说大约1000个并发请求? 通过这种方法,很快就会发现所有ThreadPool线程都在忙于进行阻塞调用。 我们得出的结论是当前版本也不适合。
剩下的事情仅仅是对服务本身进行一些处理。 您可以修改查询并使它们成为非阻塞。 然后,调用服务的方法将返回CompletableFuture,Flux,Observable,Deferred,Promise或在其上构建期望链的类似对象。 使用这种方法,我们不需要在单独的流上进行调用-拥有一个我们已经借来的用于处理请求的(或至少一个小的单独的流池)就足够了。
我们现在可以承受服务的沉重负担吗? 要回答这个问题,请仔细阅读Tomcat,它在启动程序org.springframework.boot:spring-boot-starter-web
中的Spring boot 2.2.1中使用。 它被构建为将ThreadPool中的线程分配给每个传入请求进行处理。 在没有自由流通的情况下,新的请求将成为等待的“队列”。 但是我们的服务本身仅向其他服务发送请求。 分配一个完整的流,并阻塞它,直到每个人的答案都出现为止,看上去,这是多余的。
幸运的是,Spring最近使使用基于Netty或Undertow的非阻塞Web服务器成为可能。 为此,您只需要将spring-boot-starter-web
更改为spring-boot-starter-webflux
并略微更改处理请求的方法,即可在Mono中“包装”请求和响应。 这是由于Webflux是基于Reactor构建的,因此现在您需要使用该方法来构建Mono转换链。
尝试编写自己的方法的非阻塞实现。 为此,请转到spring-webflux-start
分支。 请注意,Spring Boot的启动程序已更改,现在使用带有Webflux的版本,并且对已重写为使用非阻塞WebClient
其他服务的请求的实现也已更改。
调用AUTH的示例方法:
private fun getAuthInfo(token: String): Mono<AuthInfo> { log.info("getAuthInfo") return WebClient.create().get() .uri("${demoConfig.auth}/$token") .retrieve() .bodyToMono(AuthInfo::class.java) }
第一个示例的实现在注释中插入到processRequest
方法的内容中。 尝试自己在Reactor上重写它。 像上次一样,首先制作版本而不考虑AUTH
的检查,然后查看添加检查的难度:
fun processRequest(@RequestBody serviceRequest: Mono<ServiceRequest>): Mono<Response> {
处理spring-webflux
之后,您可以从spring-webflux
与我的实现进行比较:
从spring-webflux分支实现 fun processRequest(@RequestBody serviceRequest: Mono<ServiceRequest>): Mono<Response> { val cacheRequest = serviceRequest.cache() val userInfoMono = cacheRequest.flatMap { getAuthInfo(it.authToken) }.flatMap { findUser(it.userId) } val cardFromInfoMono = cacheRequest.flatMap { findCardInfo(it.cardFrom) } val cardToInfoMono = cacheRequest.flatMap { findCardInfo(it.cardTo) } val paymentInfoMono = cardFromInfoMono.zipWith(cardToInfoMono) .flatMap { (cardFromInfo, cardToInfo) -> cacheRequest.flatMap { request -> sendMoney(cardFromInfo.cardId, cardToInfo.cardId, request.amount).map { cardFromInfo } } }.flatMap { getPaymentInfo(it.cardId) } return userInfoMono.zipWith(paymentInfoMono) .map { (userInfo, paymentInfo) -> log.info("result") SuccessResponse( amount = paymentInfo.currentAmount, userName = userInfo.name, userSurname = userInfo.surname, userAge = userInfo.age ) } }
同意现在编写实现(与以前的阻止方法相比)变得更加困难。 而且,如果我们想从AUTH
添加“忘记的”支票,那么这将不是一件容易的事。
这是反应性方法的本质。 这对于构建无分支的处理链非常有用。 但是,如果出现分支,则代码不再那么简单。
Kotlin协程对于任何异步/反应式代码都非常友好,可以为您提供帮助。 此外, Reactor , CompletableFuture等还有大量书面包装。 但是,即使找不到合适的工具,也可以随时使用特殊的构建器自己编写。
让我们自己重写协程的实现。 为此,请转到spring-webflux-coroutines-start
分支。 必需的依赖项已添加到build.gradle.kts中:
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinCoroutinesVersion") implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactive:$kotlinCoroutinesVersion") implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:$kotlinCoroutinesVersion")
并且processRequest
方法有所变化:
suspend fun processRequest(@RequestBody serviceRequest: ServiceRequest): Response = coroutineScope {
它不再需要Mono并可以简单地转换为暂停函数(由于Spring和Kotlin的集成)。 考虑到我们将在该方法中创建其他协程,我们将需要创建一个子侦察员coroutineScope
(出于对创建附加范围的原因的理解,请参见Roman Elizarov的有关结构化并发的文章)。 请注意,其他服务电话完全没有改变。 它们返回相同的Mono,可以在其上调用awaitFirst suspend
方法以“等待”查询结果。
如果协程对于您来说仍然是一个新概念,那么有一个很好的指南 ,里面有详细的说明。 尝试编写您自己的processRequest
方法实现,或转到spring-webflux-coroutines
分支:
从spring-webflux-coroutines分支实现 suspend fun processRequest(@RequestBody serviceRequest: ServiceRequest): Response = coroutineScope { log.info("start") val userInfoDeferred = async { val authInfo = getAuthInfo(serviceRequest.authToken).awaitFirst() findUser(authInfo.userId).awaitFirst() } val paymentInfoDeferred = async { val cardFromInfoDeferred = async { findCardInfo(serviceRequest.cardFrom).awaitFirst() } val cardToInfoDeferred = async { findCardInfo(serviceRequest.cardTo).awaitFirst() } val cardFromInfo = cardFromInfoDeferred.await() sendMoney(cardFromInfo.cardId, cardToInfoDeferred.await().cardId, serviceRequest.amount).awaitFirst() getPaymentInfo(cardFromInfo.cardId).awaitFirst() } val userInfo = userInfoDeferred.await() val paymentInfo = paymentInfoDeferred.await() log.info("result") SuccessResponse( amount = paymentInfo.currentAmount, userName = userInfo.name, userSurname = userInfo.surname, userAge = userInfo.age ) }
您可以将代码与反应性方法进行比较。 使用协程,您不必事先考虑所有分支点。 我们可以只调用await
方法,并在正确的位置以async
方式分支出异步任务。 该代码与原始的直接版本尽可能保持相似,这根本不难更改。 一个重要的因素是协程简单地嵌入到响应代码中。
您甚至可能更喜欢被动式方法来完成此任务,但是许多接受调查的人发现它比较困难。 通常,两种方法都可以解决它们的问题,您可以使用自己喜欢的一种。 顺便说一下,最近在Kotlin,也有机会使用Flow创建“冷”协程,这与Reactor非常相似。 没错,它们仍处于试验阶段,但是现在您可以查看当前的实现并在您的代码中进行尝试。
我想在这里结束,最后留下有用的链接:
希望您感兴趣,并且您设法自己为所有方法编写了该方法的实现。 而且,当然,我想相信您喜欢带有协程的选项more =)
感谢所有读完本书的人!