Hallo Habr!
Diese Woche erwarten wir ein neues Spring 5-
Buch aus der Druckerei:
Unter den interessanten Merkmalen von Spring 5 verdient die reaktive Programmierung eine besondere Erwähnung, deren Implementierung in diesem Rahmen in dem vorgeschlagenen Artikel von Matt Raible kurz beschrieben wird. In dem oben genannten Buch werden reaktive Muster in Kapitel 11 erörtert.
Matt wurde von Josh Long mitautorisiert, Autor eines weiteren großartigen Buches über Java und Frühling, "
Java in the Cloud ", das im letzten Sommer veröffentlicht wurde.
Reaktive Programmierung ist Ihr Weg, um Systeme zu erstellen, die gegen hohe Lasten beständig sind. Die Verarbeitung von großem Datenverkehr ist kein Problem mehr, da der Server nicht blockiert und Client-Prozesse nicht auf Antworten warten müssen. Der Client kann nicht direkt beobachten, wie das Programm auf dem Server ausgeführt wird, und mit ihm synchronisieren. Wenn die API Schwierigkeiten hat, Anforderungen zu verarbeiten, sollte sie dennoch angemessene Antworten geben. Sollte Nachrichten nicht unkontrolliert ablehnen und verwerfen. Es muss die höheren Komponenten darüber informieren, dass es unter Last arbeitet, damit sie es teilweise von dieser Last befreien können. Diese Technik wird als Gegendruck bezeichnet, ein wichtiger Aspekt der reaktiven Programmierung.
Wir haben diesen Artikel gemeinsam mit
Josh Long verfasst . Josh ist ein Java-Champion, Spring Developer Advocate und im Allgemeinen ein globaler Mitarbeiter von Pivotal. Ich habe lange mit Spring gearbeitet, aber es war Josh, der mir den Spring Boot zeigte, es war auf der Devoxx-Konferenz in Belgien. Seitdem sind wir starke Freunde geworden, wir lieben Java und schreiben coole Anwendungen.
Reaktive Programmierung oder E / A, E / A, wir machen uns an die Arbeit ...Reaktive Programmierung ist ein Ansatz zum Erstellen von Software, die aktiv asynchrone E / A verwendet. Asynchrone E / A ist eine kleine Idee, die mit großen Änderungen in der Programmierung behaftet ist. Die Idee selbst ist einfach: Die Situation mit der ineffizienten Zuweisung von Ressourcen zu korrigieren, die Ressourcen freizusetzen, die ohne unser Eingreifen nicht verfügbar gewesen wären, und auf den Abschluss von I / O zu warten. Die asynchrone Eingabe / Ausgabe kehrt den üblichen Ansatz für die E / A-Verarbeitung um: Der Client wird freigegeben und kann andere Aufgaben ausführen und auf neue Benachrichtigungen warten.
Überlegen Sie, was zwischen synchroner und asynchroner Eingabe / Ausgabe gemeinsam ist und welche Unterschiede zwischen ihnen bestehen.
Wir werden ein einfaches Programm schreiben, das Daten aus der Quelle liest (insbesondere sprechen wir über den Link
java.io.File
). Beginnen wir mit einer Implementierung, die den guten alten
java.io.InputStream
:
Beispiel 1. Synchrones Lesen von Daten aus einer Datei package com.example.io; import lombok.extern.log4j.Log4j2; import org.springframework.util.FileCopyUtils; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.util.function.Consumer; @Log4j2 class Synchronous implements Reader { @Override public void read(File file, Consumer<BytesPayload> consumer) throws IOException { try (FileInputStream in = new FileInputStream(file)) {
- Wir stellen eine Datei zum Lesen mit der üblichen
java.io.File
- Ziehen Sie die Ergebnisse zeilenweise aus der Quelle ...
- Ich habe diesen Code geschrieben, um
Consumer<BytesPayloadgt;
wird aufgerufen, wenn neue Daten eintreffen
Einfach genug, was sagst du? Wenn Sie diesen Code ausführen, wird in der Protokollausgabe (links von jeder Zeile) angezeigt, dass alle Aktionen in einem einzelnen Thread ausgeführt werden.
Hier extrahieren wir Bytes aus unseren Daten aus der Quelle (in diesem Fall handelt es sich um eine Unterklasse von
java.io.FileInputStream
die von
java.io.InputStream
geerbt wurde). Was ist falsch an diesem Beispiel? In diesem Fall verwenden wir einen InputStream, der auf Daten in unserem Dateisystem verweist. Wenn die Datei vorhanden ist und die Festplatte funktioniert, funktioniert dieser Code wie erwartet.
Was passiert jedoch, wenn wir die Daten nicht aus der
File
, sondern aus einem Netzwerk-Socket lesen und eine andere Implementierung von
InputStream
? Kein Grund zur Sorge! Natürlich gibt es absolut keinen Grund zur Sorge, wenn die Geschwindigkeit des Netzwerks unendlich hoch ist. Und wenn der Netzwerkkanal zwischen diesem und dem anderen Knoten niemals ausfällt. Wenn diese Bedingungen erfüllt sind, funktioniert der Code einwandfrei.
Aber was passiert, wenn das Netzwerk langsamer wird oder sich hinlegt? In diesem Fall meine ich, dass wir den Zeitraum verlängern, bis die Operation
in.read(…)
. Tatsächlich kann sie überhaupt nicht zurückkommen! Dies ist ein Problem, wenn wir versuchen, mit dem Stream, aus dem wir Daten lesen, etwas anderes zu tun. Natürlich können Sie jederzeit einen anderen Stream erstellen und Daten darin lesen. Dies kann bis zu einem gewissen Punkt geschehen, aber am Ende werden wir die Grenze erreichen, an der das einfache Hinzufügen von Threads zur weiteren Skalierung nicht mehr ausreicht. Wir werden keine echte Konkurrenz haben, die über die Anzahl der Kerne auf unserer Maschine hinausgeht. Sackgasse! In diesem Fall können wir die Eingabe- / Ausgabeverarbeitung (Lesen ist hier gemeint) nur aufgrund zusätzlicher Flüsse erhöhen, aber hier werden wir früher oder später das Limit erreichen.
In diesem Beispiel ist das Hauptwerk das Lesen - an anderen Fronten passiert fast nichts. Wir sind abhängig von I / O. Überlegen Sie, wie eine asynchrone Lösung uns hilft, die Monopolisierung unserer Flüsse teilweise zu überwinden.
Beispiel 2. Asynchrones Lesen von Daten aus einer Datei package com.example.io; import lombok.extern.log4j.Log4j2; import org.springframework.util.FileCopyUtils; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousFileChannel; import java.nio.channels.CompletionHandler; import java.nio.file.Path; import java.nio.file.StandardOpenOption; import java.util.Collections; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.function.Consumer; @Log4j2 class Asynchronous implements Reader, CompletionHandler<Integer, ByteBuffer> { private int bytesRead; private long position; private AsynchronousFileChannel fileChannel; private Consumer<BytesPayload> consumer; private final ExecutorService executorService = Executors.newFixedThreadPool(10); public void read(File file, Consumer<BytesPayload> c) throws IOException { this.consumer = c; Path path = file.toPath();
- Dieses Mal passen wir
java.io.File
und machen daraus Java NIO java.nio.file.Path
- Beim Erstellen eines
Channel
wir insbesondere den Dienst java.util.concurrent.ExecutorService
an, mit dem der CompletionHandler
Handler aufgerufen wird, wenn die erforderlichen Daten angezeigt werden - Wir beginnen mit dem Lesen, indem wir einen Link zu
CompletionHandler<Integer, ByteBuffer> (this)
- Lesen Sie im Rückruf die Bytes aus dem
ByteBuffer
in die ByteBuffer
byte[]
- Genau wie im Beispiel "
Synchronous
werden byte[]
-Daten an den Verbraucher übergeben.
Wir werden sofort eine Reservierung vornehmen: Dieser Code erwies sich als viel schwieriger! Hier ist so viel los, dass sich Ihr Kopf sofort dreht. Lassen Sie mich jedoch darauf hinweisen, dass dieser Code Daten aus dem
Java NIO Channel
liest und diese Daten dann in einem separaten Thread verarbeitet, der für Rückrufe verantwortlich ist. Somit wird der Strom, in dem das Lesen begann, nicht monopolisiert. Wir kehren fast sofort nach dem Aufruf von
.read(..)
, und wenn wir endlich die Daten zur Verfügung haben, erfolgt ein Rückruf - bereits in einem anderen Thread. Wenn zwischen den Aufrufen von
.read()
eine Verzögerung
.read()
Sie zu anderen Themen
.read()
indem Sie sie in unserem Thread ausführen. Die Dauer einer asynchronen Leseoperation vom ersten bis zum letzten Byte ist bestenfalls nicht länger als die einer synchronen Leseoperation. In der Regel ist eine asynchrone Operation nicht länger. Wenn wir jedoch zu solchen zusätzlichen Schwierigkeiten kommen, können wir unsere Ströme effektiver handhaben. Machen Sie mehr Arbeit, multiplexen Sie E / A in einem Pool mit einer endlichen Anzahl von Threads.
Ich arbeite für ein Cloud-Computing-Unternehmen. Wir möchten, dass Sie neue Instanzen der Anwendung erhalten, um Probleme mit der horizontalen Skalierung zu lösen! Natürlich bin ich hier etwas unaufrichtig. Asynchrone E / A erschweren die Arbeit ein wenig, aber ich hoffe, dieses Beispiel zeigt, wie nützlich reaktiver Code ist: Sie können mehr Anforderungen verarbeiten und mehr an Ihrer vorhandenen Hardware arbeiten, wenn die Leistung stark von E / A abhängt. Wenn die Leistung von der Verwendung des Prozessors abhängt (z. B. sprechen wir über Operationen mit Fibonacci-Zahlen, das Mining von Bitcoins oder Kryptografie), gibt uns die reaktive Programmierung nichts.
Derzeit verwenden die meisten von uns keine
Channel
oder
InputStream
Implementierungen in ihrer täglichen Arbeit! Wir müssen über Probleme auf der Ebene höherer Abstraktionen nachdenken. Es geht um Dinge wie Arrays oder vielmehr die Hierarchie
java.util.Collection
. Die Sammlung
java.util.Collection
wird in einem InputStream sehr gut angezeigt: Beide Entitäten gehen davon aus, dass Sie alle Daten gleichzeitig und fast sofort bearbeiten können. Es wird erwartet, dass Sie das Lesen der meisten
InputStreams
eher früher als später beenden können. Sammlungstypen werden beim Verschieben auf größere Datenmengen etwas unangenehm. Was ist, wenn Sie es mit etwas potenziell Unendlichem (Unbegrenztem) zu tun haben - zum Beispiel mit Web-Sockets oder Serverereignissen? Was tun, wenn zwischen den Aufnahmen eine Verzögerung auftritt?
Wir brauchen einen besseren Weg, um diese Art von Daten zu beschreiben. Wir sprechen von asynchronen Ereignissen, die am Ende auftreten werden. Es mag scheinen, dass
Future<T>
oder
CompletableFuture<T>
für diesen Zweck gut geeignet sind, aber sie beschreiben nur eine Sache, die am Ende passiert. Tatsächlich bietet Java keine geeignete Metapher, um diese Art von Daten zu beschreiben. Sowohl der
Iterator
als auch der
Stream
Typ von Java 8 sind möglicherweise nicht miteinander verbunden. Beide sind jedoch auf Pull ausgerichtet. Sie selbst fordern den nächsten Eintrag an, nicht der Typ sollte einen Rückruf an Ihren Code senden. Es wird davon ausgegangen, dass, wenn in diesem Fall eine Push-basierte Verarbeitung unterstützt würde, die es ermöglichen würde, auf Thread-Ebene viel mehr zu erreichen, die API auch eine Threading- und Planungssteuerung bereitstellen würde.
Iterator
Implementierungen sagen nichts über das Threading aus, und alle Java 8-Threads verwenden denselben Fork-Join-Pool.
Wenn
Iterator
und
Stream
die Push-Verarbeitung wirklich unterstützen würden, würden wir auf ein anderes Problem stoßen, das sich gerade im Zusammenhang mit E / A wirklich verschärft: Wir brauchen eine Art Rückpenetrationsmechanismus! Da der Datenkonsument asynchron verarbeitet wird, wissen wir nicht, wann und in welcher Menge die Daten in der Pipeline sind. Wir wissen nicht, wie viele Daten beim nächsten Rückruf verarbeitet werden müssen: ein Byte oder ein Terabyte!
Wenn Sie Daten aus einem
InputStream
, lesen Sie so viele Informationen, wie Sie verarbeiten
InputStream
, und nicht mehr. In den vorherigen Beispielen lesen wir Daten mit einer festen und bekannten Länge in den
byte[]
-Puffer. In einem asynchronen Kontext müssen wir dem Anbieter eine Möglichkeit mitteilen, wie viele Daten wir verarbeiten möchten.
Ja, Sir. Hier fehlt sicher etwas.
Suche nach der fehlenden MetapherIn diesem Fall suchen wir nach einer Metapher, die das Wesen der asynchronen E / A auf wunderbare Weise widerspiegelt, einen solchen Mechanismus für die umgekehrte Datenübertragung unterstützt und es uns ermöglicht, den Ausführungsfluss in verteilten Systemen zu steuern. Bei der reaktiven Programmierung wird die Fähigkeit eines Clients, zu signalisieren, welche Last er bewältigen kann, als "Rückfluss" bezeichnet.
Jetzt gibt es eine Reihe guter Projekte - Vert.x, Akka Streams und RxJava -, die reaktive Programmierung unterstützen. Das Spring-Team führt auch ein Projekt namens
Reactor durch . Zwischen diesen verschiedenen Standards gibt es ein ziemlich breites allgemeines Feld, das de facto dem Standard der
Reactive Streams-Initiative zugeordnet ist . Die Initiative Reactive Streams definiert vier Typen:
Publisher<T>
Schnittstelle
Publisher<T>
; erzeugt Werte, die letztendlich eintreffen können.
Publisher<T>
Schnittstelle
Publisher<T>
; erzeugt Werte vom Typ
T
für
Subscriber<T>
.
Beispiel 3. Reaktive Streams: Publisher<T>
-Schnittstelle .
package org.reactivestreams; public interface Publisher<T> { void subscribe(Subscriber<? super Tgt; s); }
Der
Subscriber
abonniert
Publisher<T>
und erhält über seine
onNext(T)
-Methode Benachrichtigungen über neue Werte vom Typ
T
Wenn Fehler auftreten, wird die Methode
onError(Throwable)
. Wenn die Verarbeitung normal abgeschlossen ist, wird die
onComplete
Methode des Teilnehmers aufgerufen.
Beispiel 4. Jetstreams: Subscriber<T>
-Schnittstelle. package org.reactivestreams; public interface Subscriber<T> { public void onSubscribe(Subscription s); public void onNext(T t); public void onError(Throwable t); public void onComplete(); }
Wenn der
Subscriber
ersten Mal eine Verbindung zu
Publisher
, erhält er ein
Subscription
in der
Subscriber#onSubscribe
. Abonnement Das
Subscription
ist möglicherweise der wichtigste Teil der gesamten Spezifikation. sie ist es, die den Rückfluss liefert. Ein Abonnent-Abonnent verwendet die
Subscription#request
, um zusätzliche Daten anzufordern, oder die
Subscription#cancel
, um die Verarbeitung zu beenden.
Beispiel 5. Reaktive Streams: Subscription<T>
-Schnittstelle .
package org.reactivestreams; public interface Subscription { public void request(long n); public void cancel(); }
Die reaktive Stream-Spezifikation bietet einen weiteren nützlichen, wenn auch offensichtlichen Typ:
Processor<A,B>
ist nur eine Schnittstelle, die sowohl
Subscriber<A>
als auch
Publisher<B>
erbt.
Beispiel 6. Jetstreams: Processor<T>
-Schnittstelle .
package org.reactivestreams; public interface Processor<T, R> extends Subscriber<T>, Publisher<R> { }
Eine Spezifikation ist nicht als Rezept für Implementierungen positioniert. Tatsächlich besteht ihr Zweck darin, Typen zu definieren, um die Interoperabilität zu unterstützen. Der offensichtliche Vorteil von Typen, die mit reaktiven Flows verbunden sind, besteht darin, dass sie dennoch einen Platz in der Java 9-Version gefunden haben. Außerdem sind sie semantisch „eins zu eins“ und entsprechen Schnittstellen der Klasse
java.util.concurrent.Flow
, zum Beispiel:
java.util.concurrent.Flow.Publisher
.
Treffen Sie ReactorArten von reaktiven Strömen allein reichen nicht aus; Implementierungen höherer Ordnung sind erforderlich, um Operationen wie Filtern und Transformieren zu unterstützen. Als solches ist das Reaktorprojekt praktisch; Es baut auf der Reactive Streams-Spezifikation auf und bietet zwei
Publisher<T>
-Spezialisierungen.
Erstens ist
Flux<T>
ein
Publisher
, der null oder mehr Werte erzeugt. Das zweite,
Mono<T>
, ist
Publisher<T>
und erzeugt null oder einen Wert. Beide veröffentlichen Werte und können sie entsprechend behandeln. Ihre Funktionen sind jedoch viel umfassender als die der Reactive Streams-Spezifikation. Beide bieten Operatoren, mit denen Sie Wertströme verarbeiten können. Reaktortypen lassen sich gut zusammensetzen - die Ausgabe eines von ihnen kann als Eingabe für den anderen dienen, und wenn ein Typ mit anderen Datenströmen arbeiten muss, sind sie auf
Publisher<T>
-Instanzen angewiesen.
Sowohl
Mono<T>
als auch
Flux<T>
implementieren
Publisher<T>
. Wir empfehlen, dass Ihre Methoden
Publisher<T>
-Instanzen akzeptieren, aber
Flux<T>
oder
Mono<T>
. Dies hilft dem Kunden zu unterscheiden, welche Art von Daten er erhält.
Angenommen, Sie haben
Publisher<T>
und wurden aufgefordert, die Benutzeroberfläche für diesen
Publisher<T>
anzuzeigen. Sollte ich dann eine Seite mit Details für einen Datensatz anzeigen, da Sie
CompletableFuture<T>
? Oder eine Übersichtsseite mit einer Liste oder einem Raster anzeigen, auf der alle Einträge Seite für Seite angezeigt werden? Es ist schwer zu sagen.
Flux<T>
und
Mono<T>
sehr spezifisch. Sie wissen, dass Sie eine Überprüfungsseite anzeigen müssen, wenn
Flux<T>
empfangen wird, und eine Seite mit Details für einen (oder keinen einzelnen) Datensatz, wenn Sie
Mono<T>
empfangen.
Reactor ist ein Open-Source-Projekt von Pivotal. Jetzt ist er sehr beliebt geworden. Facebook verwendet es in seinem Düsentriebwerk
, um Remote-Prozeduren aufzurufen , und verwendet es auch in
Rsocket , das vom RxJava-Erfinder Ben Christensen geleitet wird. Salesforce verwendet es in seiner
reaktiven gRPC-Implementierung . Reactor implementiert reaktive Streams-Typen, sodass es mit anderen Technologien interagieren kann, die diese Typen unterstützen, z. B. mit
RxJava 2 von Netflix,
Akka Streams von Lightbend und mit dem
Vert.x- Projekt von Eclipse Foundation. David Cairnock, Direktor von RxJava 2, arbeitete auch aktiv mit Pivotal zusammen, um Reactor zu entwickeln und das Projekt noch besser zu machen. Außerdem ist es natürlich in der einen oder anderen Form im Spring Framework vorhanden, beginnend mit Spring Framework 4.0.
Reaktive Programmierung mit Spring WebFluxBei aller Nützlichkeit ist der Reaktor nur die Basis. Unsere Anwendungen müssen mit Datenquellen kommunizieren. Muss Authentifizierung und Autorisierung unterstützen. Der Frühling bietet all dies. Wenn Reactor uns die fehlende Metapher gibt, hilft uns der Frühling alle, eine gemeinsame Sprache zu sprechen.
Spring Framework 5.0 wurde im September 2017 veröffentlicht. Es baut auf der Reactor- und der Reactive Streams-Spezifikation auf. Es verfügt über ein neues reaktives Laufzeit- und Komponentenmodell namens
Spring WebFlux .
Spring WebFlux ist unabhängig von der Servlet-API und erfordert keine Funktion. Es wird mit Adaptern geliefert, mit denen Sie es bei Bedarf über dem Servlet-Motor verwenden können. Dies ist jedoch nicht erforderlich. Es bietet auch eine völlig neue Netty-basierte Laufzeit namens Spring WebFlux. Spring Framework 5, das mit Java 8 und Java EE 7 und höher arbeitet, dient jetzt als Grundlage für einen Großteil des Spring-Ökosystems, einschließlich Spring Data Kay, Spring Security 5, Spring Boot 2 und Spring Cloud Finchley.