
Viele Dienste in der modernen Welt tun größtenteils „nichts“. Ihre Aufgaben beschränken sich darauf, andere Datenbanken / Dienste / Caches anzufordern und all diese Daten nach verschiedenen Regeln und verschiedenen Geschäftslogiken zu aggregieren. Aus diesem Grund ist es nicht verwunderlich, dass Sprachen wie Golang mit einem praktischen integrierten Wettbewerbssystem angezeigt werden, mit dem sich nicht blockierender Code einfach organisieren lässt.
In der JVM-Welt sind die Dinge etwas komplizierter. Es gibt eine Vielzahl von Frameworks und Bibliotheken, die Threads blockieren, wenn sie verwendet werden. So kann stdlib selbst manchmal das Gleiche tun. Und in Java gibt es keinen ähnlichen Mechanismus wie in Golang.
Trotzdem entwickelt sich JVM aktiv weiter und es ergeben sich neue interessante Möglichkeiten. Es gibt Kotlin mit Coroutinen, die in ihrer Verwendung den Golang-Goroutinen sehr ähnlich sind (obwohl sie auf völlig andere Weise implementiert sind). Es gibt JEP Loom, das der JVM in Zukunft Fasern bringen wird. Eines der beliebtesten Webframeworks - Spring - hat kürzlich die Möglichkeit hinzugefügt, vollständig blockierungsfreie Dienste auf Webflux zu erstellen. Und mit der jüngsten Version von Spring Boot 2.2 ist die Integration mit Kotlin noch besser.
Ich schlage vor, am Beispiel eines kleinen Dienstes zum Übertragen von Geld von einer Karte auf eine andere eine Anwendung für Spring Boot 2.2 und Kotlin zur Integration mit mehreren externen Diensten zu schreiben.
Es ist gut, wenn Sie bereits mit Java, Kotlin, Gradle, Spring, Spring Boot 2, Reactor, Web Flux, Tomcat, Netty, Kotlin Coroutines, Gradle Kotlin DSL vertraut sind oder sogar einen Doktortitel haben. Wenn nicht, spielt es keine Rolle. Der Code wird maximal vereinfacht, und selbst wenn Sie nicht aus der JVM-Welt stammen, hoffe ich, dass Ihnen alles klar ist.
Wenn Sie vorhaben, einen Dienst selbst zu schreiben, stellen Sie sicher, dass alles installiert ist, was Sie benötigen:
- Java 8+
- Docker und Docker Compose;
- cURL und vorzugsweise jq ;
- Git
- vorzugsweise eine IDE für Kotlin (Intellij Idea, Eclipse, VS,
vim usw.). Aber es ist in einem Notebook möglich.
Die Beispiele enthalten sowohl Leerzeichen für die Implementierung im Service als auch eine bereits geschriebene Implementierung. Führen Sie zuerst die Installation und Assembly aus, und werfen Sie einen genaueren Blick auf die Dienste und ihre APIs.
Das Beispiel für Services und API selbst dient nur zu Illustrationszwecken, übertragen Sie nicht alle AS IS
Daten auf Ihr Produkt!
Zuerst klonen wir das Repository mit den Diensten für uns selbst, die Integration, mit der wir das tun werden, und wechseln in das Verzeichnis:
git clone https://github.com/evgzakharov/spring-demo-services && cd spring-demo-services
In einem separaten Terminal sammeln wir alle Anwendungen mit gradle
, wobei nach einem erfolgreichen Build alle Dienste mit gradle
docker-compose
gestartet werden.
./gradlew build && docker-compose up
Stellen Sie sich ein Projekt mit Diensten vor, während alles heruntergeladen und installiert wird.

Eine Anfrage mit einem Token, Kartennummern für die Übertragung und dem Betrag, der zwischen den Karten übertragen werden soll, wird am Eingang des Dienstes eingehen (Demo-Dienst):
{ "authToken": "auth-token1", "cardFrom": "55593478", "cardTo": "55592020", "amount": "10.1" }
Entsprechend dem Token authToken
müssen authToken
zum Dienst userId
gehen und die userId
, mit der Sie eine Anforderung an USER
und alle zusätzlichen Informationen zum Benutzer userId
können. AUTH
gibt auch Informationen darüber zurück, auf welche der drei Dienste wir zugreifen können. Beispielantwort von AUTH
:
{ "userId": 158, "cardAccess": true, "paymentAccess": true, "userAccess": true }
Um zwischen Karten zu wechseln, gehen Sie zunächst mit jeder Kartennummer in der CARD
. Als Antwort auf Anfragen erhalten wir eine cardId
, mit der wir eine Anfrage an PAYMENT
und eine Überweisung vornehmen. Und zum PAYMENT
senden wir noch einmal eine Anfrage an PAYMENT
mit fromCardId
und ermitteln den aktuellen Kontostand.
Um eine kleine Verzögerung bei Diensten zu emulieren, wird der Wert der Umgebungsvariablen TIMEOUT in allen Containern ausgegeben, in denen die Antwortverzögerung in Millisekunden festgelegt ist. Und um die Antworten von AUTH
zu diversifizieren, ist es möglich, den Wert von SUCCESS_RATE
zu variieren, der die Wahrscheinlichkeit einer true
Antwort für den Dienst steuert.
Docker-compose.yaml-Datei:
version: '3' services: service-auth: build: service-auth image: service-auth:1.0.0 environment: - SUCCESS_RATE=1.0 - TIMEOUT=100 ports: - "8081:8080" service-card: build: service-card image: service-card:1.0.0 environment: - TIMEOUT=100 ports: - "8082:8080" service-payment: build: service-payment image: service-payment:1.0.0 environment: - TIMEOUT=100 ports: - "8083:8080" service-user: build: service-user image: service-user:1.0.0 environment: - TIMEOUT=100 ports: - "8084:8080"
Für alle Dienste wird eine Portweiterleitung von 8081 nach 8084 durchgeführt, um sie direkt zu erreichen.
Fahren wir mit dem Schreiben des Demo service
. Versuchen wir zunächst, die Implementierung so umständlich wie möglich zu schreiben, ohne Asynchronität und Parallelität. Dazu nehmen Sie Spring Boot 2.2.1, Kotlin und einen Blank für den Service. Wir klonen das Repository und gehen zum Zweig spring-mvc-start
:
git clone https://github.com/evgzakharov/demo-service && cd demo-service && git checkout spring-mvc-start
Gehen Sie zur demo.Controller
Datei. Es hat die einzige leere processRequest
Methode, für die eine Implementierung geschrieben werden muss.
@PostMapping fun processRequest(@RequestBody serviceRequest: ServiceRequest): Response { .. }
Eine Aufforderung zur Übertragung zwischen Karten erhalten Sie am Eingang der Methode.
data class ServiceRequest( val authToken: String, val cardFrom: String, val cardTo: String, val amount: BigDecimal )
Für diejenigen, die Spring nicht kennenSpring verfügt über eine integrierte DI, die auf Anmerkungen basiert. Der DemoController ist mit der speziellen Anmerkung RestController gekennzeichnet: Er registriert nicht nur die Bean in der DI, sondern fügt auch deren Verarbeitung als Controller hinzu. PostProcessor findet alle mit der PostMapping
Annotation markierten Methoden und fügt sie mit der POST
Methode als Endpunkt für den Service hinzu.
Der Handler erstellt auch eine Proxy-Klasse für den DemoController, in der alle erforderlichen Argumente an die processRequest
Methode übergeben werden. In unserem Fall ist dies nur ein Argument, das mit der Annotation @RequestBody
. Daher wird diese Methode im Proxy mit dem in der ServiceRequest
Klasse deserialisierten JSON-Inhalt ServiceRequest
.
Um es einfacher zu machen, wurden bereits alle Methoden zur Integration mit anderen Diensten durchgeführt. Sie müssen sie nur richtig verbinden. Es gibt nur fünf Methoden, eine für jede Aktion. Aufrufe an andere Dienste selbst werden im Blockierungsaufruf von Spring RestTemplate
implementiert.
Beispielmethode zum Aufrufen von AUTH
:
private fun getAuthInfo(token: String): AuthInfo { log.info("getAuthInfo") return restTemplate.getForEntity("${demoConfig.auth}/{token}", AuthInfo::class.java, token) .body ?: throw RuntimeException("couldn't find user by token='$token'") }
Fahren wir mit der Implementierung der Methode fort. Die Kommentare zeigen die Prozedur und welche Antwort am Ausgang erwartet wird:
@PostMapping fun processRequest(@RequestBody serviceRequest: ServiceRequest): Response {
Zunächst implementieren wir die Methode so einfach wie möglich, ohne zu berücksichtigen, dass AUTH
uns den Zugriff auf andere Dienste verweigern kann. Versuchen Sie es selbst zu tun. Wenn sich herausstellt (oder nachdem Sie in den spring-mvc
Zweig spring-mvc
), können Sie die Funktionsweise des Dienstes wie folgt überprüfen:
umsetzung von spring-mvc branch fun processRequest(@RequestBody serviceRequest: ServiceRequest): Response { val authInfo = getAuthInfo(serviceRequest.authToken) val userInfo = findUser(authInfo.userId) val cardFromInfo = findCardInfo(serviceRequest.cardFrom) val cardToInfo = findCardInfo(serviceRequest.cardTo) sendMoney(cardFromInfo.cardId, cardToInfo.cardId, serviceRequest.amount) val paymentInfo = getPaymentInfo(cardFromInfo.cardId) return SuccessResponse( amount = paymentInfo.currentAmount, userName = userInfo.name, userSurname = userInfo.surname, userAge = userInfo.age ) }
Starten Sie den Dienst (aus dem Demo-Dienst-Ordner):
./gradlew bootRun
Wir senden eine Anfrage an den Endpunkt:
./demo-request.sh
Als Antwort erhalten wir so etwas:
➜ demo-service git:(spring-mvc) ✗ ./demo-request.sh + curl -XPOST http://localhost:8080/ -d @demo-payment-request.json -H 'Content-Type: application/json; charset=UTF-8' + jq . % Total % Received % Xferd Average Speed Time Time Time Current Dload Upload Total Spent Left Speed 100 182 0 85 100 97 20 23 0:00:04 0:00:04 --:--:-- 23 { "amount": 989.9, "userName": "Vasia", "userSurname": "Pupkin", "userAge": 18, "status": true }
Insgesamt müssen Sie 6 Anfragen stellen, um den Service zu implementieren. Und da jeder von ihnen mit einer Verzögerung von 100 ms antwortet, kann die Gesamtzeit nicht weniger als 600 ms betragen. In der Realität ergeben sich unter Berücksichtigung des gesamten Overheads etwa 700 ms. Bisher ist der Code recht einfach, und wenn wir jetzt eine AUTH
Antwortprüfung für den Zugriff auf andere Dienste hinzufügen möchten, ist dies nicht schwierig (wie bei jedem anderen Refactoring).
Aber lassen Sie uns darüber nachdenken, wie Sie die Ausführung von Abfragen beschleunigen können. Wenn Sie die Überprüfung der Antwort von AUTH
nicht berücksichtigen, haben wir zwei unabhängige Aufgaben:
userId
Benutzer- userId
und Anfordern von Daten von USER
;cardId
für jede Karte erhalten, eine Zahlung cardId
und den Gesamtbetrag erhalten.
Diese Aufgaben können unabhängig voneinander durchgeführt werden. Die Gesamtausführungszeit hängt dann von der längsten Kette von Aufrufen ab (in diesem Fall von der zweiten) und wird insgesamt für 300 ms + X ms Overhead ausgeführt.
Da die Aufrufe selbst blockiert sind, besteht die einzige Möglichkeit, parallele Anforderungen auszuführen, darin, sie auf separaten Threads auszuführen. Sie können für jeden Aufruf einen eigenen Thread erstellen, dies ist jedoch sehr teuer. Eine andere Möglichkeit besteht darin, Aufgaben in ThreadPool auszuführen. Auf den ersten Blick sieht eine solche Lösung angemessen aus und die Zeit wird sich verkürzen. Beispielsweise können wir Abfragen auf CompletableFuture ausführen. Sie können Hintergrundaufgaben ausführen, indem Sie Methoden mit dem async
Postfix aufrufen. Wenn Sie beim Aufrufen von Methoden keinen bestimmten ThreadPool angeben, werden Tasks in ForkJoinPool.commonPool()
gestartet. Versuchen Sie, eine Implementierung selbst zu schreiben, oder gehen Sie zum Zweig spring-mvc-async
.
Implementierung aus dem Bereich spring-mvc-async fun processRequest(@RequestBody serviceRequest: ServiceRequest): Response { val authInfoFuture = CompletableFuture.supplyAsync { getAuthInfo(serviceRequest.authToken) } val userInfoFuture = authInfoFuture.thenApplyAsync { findUser(it.userId) } val cardFromInfo = CompletableFuture.supplyAsync { findCardInfo(serviceRequest.cardFrom) } val cardToInfo = CompletableFuture.supplyAsync { findCardInfo(serviceRequest.cardTo) } val waitAll = CompletableFuture.allOf(cardFromInfo, cardToInfo) val paymentInfoFuture = waitAll .thenApplyAsync { sendMoney(cardFromInfo.get().cardId, cardToInfo.get().cardId, serviceRequest.amount) } .thenApplyAsync { getPaymentInfo(cardFromInfo.get().cardId) } val paymentInfo = paymentInfoFuture.get() val userInfo = userInfoFuture.get() log.info("result") return SuccessResponse( amount = paymentInfo.currentAmount, userName = userInfo.name, userSurname = userInfo.surname, userAge = userInfo.age ) }
Wenn wir jetzt die Anforderungszeit messen, wird sie im Bereich von 360 ms liegen. Gegenüber der Originalversion hat sich die Gesamtzeit um fast das 2-fache verringert. Der Code selbst ist etwas komplizierter geworden, aber bis jetzt ist es nicht schwierig, ihn zu ändern. Und wenn wir hier eine Antwortprüfung von AUTH
hinzufügen möchten, ist dies nicht schwierig.
Aber was ist, wenn wir eine große Anzahl eingehender Anfragen für den Dienst selbst haben? Sagen Sie etwa 1000 gleichzeitige Anfragen? Bei diesem Ansatz stellt sich schnell heraus, dass alle ThreadPool-Threads damit beschäftigt sind, Aufrufe zu blockieren. Und wir kommen zu dem Schluss, dass die aktuelle Version auch nicht passt.
Es bleibt nur noch etwas mit den Serviceaufrufen selbst zu tun. Sie können Abfragen ändern und sie nicht blockieren. Dann geben die Methoden zum Aufrufen der Services CompletableFuture, Flux, Observable, Deferred, Promise oder ein ähnliches Objekt zurück, auf dem eine Kette von Erwartungen aufgebaut werden kann. Mit diesem Ansatz müssen wir keine Aufrufe für separate Streams tätigen. Es reicht aus, einen (oder zumindest einen kleinen separaten Pool von Streams) zu haben, den wir bereits für die Verarbeitung von Anforderungen ausgeliehen haben.
Können wir jetzt der hohen Belastung des Dienstes standhalten? Um diese Frage zu beantworten, werfen Sie einen Blick auf Tomcat, das in Spring Boot 2.2.1 im Starter org.springframework.boot:spring-boot-starter-web
. Es ist so aufgebaut, dass für jede eingehende Anforderung ein Thread aus ThreadPool für dessen Verarbeitung zugewiesen wird. Und wenn kein freier Datenfluss besteht, werden neue Anforderungen zu einer Warteschlange. Unser Dienst selbst sendet jedoch nur Anfragen an andere Dienste. Ordnen Sie einen ganzen Stream darunter zu und blockieren Sie ihn, bis Antworten von allen kommen, um es milde auszudrücken, überflüssig.
Glücklicherweise hat Spring kürzlich die Verwendung eines nicht blockierenden Webservers auf Basis von Netty oder Undertow ermöglicht. Dazu müssen Sie nur den spring-boot-starter-web
spring-boot-starter-webflux
in spring-boot-starter-webflux
ändern und die Methode für die Verarbeitung von Anforderungen, in denen die Anforderung und die Antwort in Mono "verpackt" werden, geringfügig ändern. Dies liegt an der Tatsache, dass Webflux auf der Basis von Reactor erstellt wurde und daher jetzt in der Methode eine Kette von Mono-Transformationen erstellt werden muss.
Versuchen Sie, Ihre eigene nicht blockierende Implementierung der Methode zu schreiben. Gehen Sie dazu in den Zweig spring-webflux-start
. Beachten Sie, dass der Starter für Spring Boot geändert wurde, wobei jetzt die Version mit Webflux verwendet wird, und die Implementierung von Anforderungen an andere Dienste, die für die Verwendung des nicht blockierenden WebClient
ebenfalls geändert wurde.
Beispielmethode zum Aufrufen von AUTH:
private fun getAuthInfo(token: String): Mono<AuthInfo> { log.info("getAuthInfo") return WebClient.create().get() .uri("${demoConfig.auth}/$token") .retrieve() .bodyToMono(AuthInfo::class.java) }
Die Implementierung des ersten Beispiels wird in einem Kommentar in den Inhalt der processRequest
Methode eingefügt. Versuchen Sie, es in Reactor selbst neu zu schreiben. Machen Sie wie beim letzten Mal zuerst die Version, ohne die Prüfungen von AUTH
zu berücksichtigen, und sehen Sie dann, wie schwierig es ist, sie hinzuzufügen:
fun processRequest(@RequestBody serviceRequest: Mono<ServiceRequest>): Mono<Response> {
Nachdem Sie sich damit beschäftigt haben, können Sie sich mit meiner Implementierung aus dem spring-webflux
:
Umsetzung aus der spring-webflux-Branche fun processRequest(@RequestBody serviceRequest: Mono<ServiceRequest>): Mono<Response> { val cacheRequest = serviceRequest.cache() val userInfoMono = cacheRequest.flatMap { getAuthInfo(it.authToken) }.flatMap { findUser(it.userId) } val cardFromInfoMono = cacheRequest.flatMap { findCardInfo(it.cardFrom) } val cardToInfoMono = cacheRequest.flatMap { findCardInfo(it.cardTo) } val paymentInfoMono = cardFromInfoMono.zipWith(cardToInfoMono) .flatMap { (cardFromInfo, cardToInfo) -> cacheRequest.flatMap { request -> sendMoney(cardFromInfo.cardId, cardToInfo.cardId, request.amount).map { cardFromInfo } } }.flatMap { getPaymentInfo(it.cardId) } return userInfoMono.zipWith(paymentInfoMono) .map { (userInfo, paymentInfo) -> log.info("result") SuccessResponse( amount = paymentInfo.currentAmount, userName = userInfo.name, userSurname = userInfo.surname, userAge = userInfo.age ) } }
Stimmen Sie zu, dass das Schreiben einer Implementierung (im Vergleich zum vorherigen Blockierungsansatz) schwieriger geworden ist. Und wenn wir "vergessene" Schecks von AUTH
hinzufügen wollen, ist dies nicht so einfach.
Dies ist der Kern des reaktiven Ansatzes. Es eignet sich hervorragend zum Aufbau von unverzweigten Verarbeitungsketten. Wenn jedoch eine Verzweigung auftritt, ist der Code nicht mehr so einfach.
Kotlin-Coroutinen, die mit jedem asynchronen / reaktiven Code sehr vertraut sind, können hier Abhilfe schaffen. Darüber hinaus gibt es eine große Anzahl schriftlicher Wrapper für Reactor , CompletableFuture usw. Aber auch wenn Sie nicht den richtigen finden, können Sie ihn mit speziellen Buildern immer selbst schreiben.
Lassen Sie uns die Implementierung für Coroutinen selbst umschreiben. Gehen Sie dazu in den Zweig spring-webflux-coroutines-start
. Die erforderlichen Abhängigkeiten werden in build.gradle.kts hinzugefügt:
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinCoroutinesVersion") implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactive:$kotlinCoroutinesVersion") implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:$kotlinCoroutinesVersion")
Und die processRequest
Methode ändert sich ein processRequest
:
suspend fun processRequest(@RequestBody serviceRequest: ServiceRequest): Response = coroutineScope {
Es benötigt kein Mono mehr und wird einfach in eine Suspend-Funktion umgewandelt (dank der Integration von Spring und Kotlin). In Anbetracht der Tatsache, dass wir zusätzliche Coroutinen in der Methode erstellen werden, müssen wir ein untergeordnetes Scout- coroutineScope
erstellen (um die Gründe für das Erstellen eines zusätzlichen Scopes zu verstehen, siehe Roman Elizarovs Beitrag über strukturierte Parallelität ). Bitte beachten Sie, dass sich andere Serviceabrufe überhaupt nicht geändert haben. Sie geben dasselbe Mono zurück, in dem die Methode awaitFirst suspend
aufgerufen werden kann, um auf das Ergebnis der Abfrage zu warten.
Wenn Coroutinen für Sie immer noch ein neues Konzept sind, gibt es eine wunderbare Anleitung mit einer detaillierten Beschreibung. Versuchen Sie, eine eigene Implementierung der processRequest
Methode zu processRequest
, oder gehen Sie zum Zweig spring-webflux-coroutines
:
umsetzung aus dem bereich spring-webflux-coroutines suspend fun processRequest(@RequestBody serviceRequest: ServiceRequest): Response = coroutineScope { log.info("start") val userInfoDeferred = async { val authInfo = getAuthInfo(serviceRequest.authToken).awaitFirst() findUser(authInfo.userId).awaitFirst() } val paymentInfoDeferred = async { val cardFromInfoDeferred = async { findCardInfo(serviceRequest.cardFrom).awaitFirst() } val cardToInfoDeferred = async { findCardInfo(serviceRequest.cardTo).awaitFirst() } val cardFromInfo = cardFromInfoDeferred.await() sendMoney(cardFromInfo.cardId, cardToInfoDeferred.await().cardId, serviceRequest.amount).awaitFirst() getPaymentInfo(cardFromInfo.cardId).awaitFirst() } val userInfo = userInfoDeferred.await() val paymentInfo = paymentInfoDeferred.await() log.info("result") SuccessResponse( amount = paymentInfo.currentAmount, userName = userInfo.name, userSurname = userInfo.surname, userAge = userInfo.age ) }
Sie können den Code mit dem reaktiven Ansatz vergleichen. Bei Coroutinen müssen Sie nicht alle Verzweigungspunkte im Voraus durchdenken. Wir können einfach await
Methoden aufrufen und asynchrone Tasks asynchron an den richtigen Stellen verzweigen. Der Code bleibt so ähnlich wie möglich der ursprünglichen, unkomplizierten Version, die überhaupt nicht schwer zu ändern ist. Ein wichtiger Faktor ist, dass Koroutinen einfach in reaktiven Code eingebettet sind.
Vielleicht gefällt Ihnen der reaktive Ansatz für diese Aufgabe sogar besser, aber viele der Befragten finden es schwieriger. Im Allgemeinen lösen beide Ansätze ihr Problem, und Sie können dasjenige verwenden, das Ihnen gefällt. Übrigens gibt es in letzter Zeit in Kotlin auch die Möglichkeit, mit Flow „kalte“ Coroutinen zu erstellen, die dem Reaktor sehr ähnlich sind. Sie befinden sich zwar noch im experimentellen Stadium, aber jetzt können Sie sich die aktuelle Implementierung ansehen und sie in Ihrem Code ausprobieren.
Ich möchte hier enden und endlich nützliche Links hinterlassen:
Ich hoffe, Sie waren interessiert und haben es geschafft, eine Implementierung der Methode für alle Methoden selbst zu schreiben. Und natürlich möchte ich glauben, dass Ihnen die Option mit Coroutinen besser gefällt =)
Vielen Dank an alle, die bis zum Ende gelesen haben!