Einführung der reaktiven Programmierung im Frühjahr

Hallo Habr!

Diese Woche erwarten wir ein neues Spring 5- Buch aus der Druckerei:


Unter den interessanten Merkmalen von Spring 5 verdient die reaktive Programmierung eine besondere Erwähnung, deren Implementierung in diesem Rahmen in dem vorgeschlagenen Artikel von Matt Raible kurz beschrieben wird. In dem oben genannten Buch werden reaktive Muster in Kapitel 11 erörtert.

Matt wurde von Josh Long mitautorisiert, Autor eines weiteren großartigen Buches über Java und Frühling, " Java in the Cloud ", das im letzten Sommer veröffentlicht wurde.

Reaktive Programmierung ist Ihr Weg, um Systeme zu erstellen, die gegen hohe Lasten beständig sind. Die Verarbeitung von großem Datenverkehr ist kein Problem mehr, da der Server nicht blockiert und Client-Prozesse nicht auf Antworten warten müssen. Der Client kann nicht direkt beobachten, wie das Programm auf dem Server ausgeführt wird, und mit ihm synchronisieren. Wenn die API Schwierigkeiten hat, Anforderungen zu verarbeiten, sollte sie dennoch angemessene Antworten geben. Sollte Nachrichten nicht unkontrolliert ablehnen und verwerfen. Es muss die höheren Komponenten darüber informieren, dass es unter Last arbeitet, damit sie es teilweise von dieser Last befreien können. Diese Technik wird als Gegendruck bezeichnet, ein wichtiger Aspekt der reaktiven Programmierung.

Wir haben diesen Artikel gemeinsam mit Josh Long verfasst . Josh ist ein Java-Champion, Spring Developer Advocate und im Allgemeinen ein globaler Mitarbeiter von Pivotal. Ich habe lange mit Spring gearbeitet, aber es war Josh, der mir den Spring Boot zeigte, es war auf der Devoxx-Konferenz in Belgien. Seitdem sind wir starke Freunde geworden, wir lieben Java und schreiben coole Anwendungen.

Reaktive Programmierung oder E / A, E / A, wir machen uns an die Arbeit ...

Reaktive Programmierung ist ein Ansatz zum Erstellen von Software, die aktiv asynchrone E / A verwendet. Asynchrone E / A ist eine kleine Idee, die mit großen Änderungen in der Programmierung behaftet ist. Die Idee selbst ist einfach: Die Situation mit der ineffizienten Zuweisung von Ressourcen zu korrigieren, die Ressourcen freizusetzen, die ohne unser Eingreifen nicht verfügbar gewesen wären, und auf den Abschluss von I / O zu warten. Die asynchrone Eingabe / Ausgabe kehrt den üblichen Ansatz für die E / A-Verarbeitung um: Der Client wird freigegeben und kann andere Aufgaben ausführen und auf neue Benachrichtigungen warten.

Überlegen Sie, was zwischen synchroner und asynchroner Eingabe / Ausgabe gemeinsam ist und welche Unterschiede zwischen ihnen bestehen.

Wir werden ein einfaches Programm schreiben, das Daten aus der Quelle liest (insbesondere sprechen wir über den Link java.io.File ). Beginnen wir mit einer Implementierung, die den guten alten java.io.InputStream :

Beispiel 1. Synchrones Lesen von Daten aus einer Datei

 package com.example.io; import lombok.extern.log4j.Log4j2; import org.springframework.util.FileCopyUtils; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.util.function.Consumer; @Log4j2 class Synchronous implements Reader { @Override public void read(File file, Consumer<BytesPayload> consumer) throws IOException { try (FileInputStream in = new FileInputStream(file)) { //1 byte[] data = new byte[FileCopyUtils.BUFFER_SIZE]; int res; while ((res = in.read(data, 0, data.length)) != -1) { //2 consumer.accept(BytesPayload.from(data, res)); //3 } } } } 

  1. Wir stellen eine Datei zum Lesen mit der üblichen java.io.File
  2. Ziehen Sie die Ergebnisse zeilenweise aus der Quelle ...
  3. Ich habe diesen Code geschrieben, um Consumer<BytesPayloadgt; wird aufgerufen, wenn neue Daten eintreffen

Einfach genug, was sagst du? Wenn Sie diesen Code ausführen, wird in der Protokollausgabe (links von jeder Zeile) angezeigt, dass alle Aktionen in einem einzelnen Thread ausgeführt werden.
Hier extrahieren wir Bytes aus unseren Daten aus der Quelle (in diesem Fall handelt es sich um eine Unterklasse von java.io.FileInputStream die von java.io.InputStream geerbt wurde). Was ist falsch an diesem Beispiel? In diesem Fall verwenden wir einen InputStream, der auf Daten in unserem Dateisystem verweist. Wenn die Datei vorhanden ist und die Festplatte funktioniert, funktioniert dieser Code wie erwartet.

Was passiert jedoch, wenn wir die Daten nicht aus der File , sondern aus einem Netzwerk-Socket lesen und eine andere Implementierung von InputStream ? Kein Grund zur Sorge! Natürlich gibt es absolut keinen Grund zur Sorge, wenn die Geschwindigkeit des Netzwerks unendlich hoch ist. Und wenn der Netzwerkkanal zwischen diesem und dem anderen Knoten niemals ausfällt. Wenn diese Bedingungen erfüllt sind, funktioniert der Code einwandfrei.

Aber was passiert, wenn das Netzwerk langsamer wird oder sich hinlegt? In diesem Fall meine ich, dass wir den Zeitraum verlängern, bis die Operation in.read(…) . Tatsächlich kann sie überhaupt nicht zurückkommen! Dies ist ein Problem, wenn wir versuchen, mit dem Stream, aus dem wir Daten lesen, etwas anderes zu tun. Natürlich können Sie jederzeit einen anderen Stream erstellen und Daten darin lesen. Dies kann bis zu einem gewissen Punkt geschehen, aber am Ende werden wir die Grenze erreichen, an der das einfache Hinzufügen von Threads zur weiteren Skalierung nicht mehr ausreicht. Wir werden keine echte Konkurrenz haben, die über die Anzahl der Kerne auf unserer Maschine hinausgeht. Sackgasse! In diesem Fall können wir die Eingabe- / Ausgabeverarbeitung (Lesen ist hier gemeint) nur aufgrund zusätzlicher Flüsse erhöhen, aber hier werden wir früher oder später das Limit erreichen.

In diesem Beispiel ist das Hauptwerk das Lesen - an anderen Fronten passiert fast nichts. Wir sind abhängig von I / O. Überlegen Sie, wie eine asynchrone Lösung uns hilft, die Monopolisierung unserer Flüsse teilweise zu überwinden.

Beispiel 2. Asynchrones Lesen von Daten aus einer Datei

 package com.example.io; import lombok.extern.log4j.Log4j2; import org.springframework.util.FileCopyUtils; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousFileChannel; import java.nio.channels.CompletionHandler; import java.nio.file.Path; import java.nio.file.StandardOpenOption; import java.util.Collections; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.function.Consumer; @Log4j2 class Asynchronous implements Reader, CompletionHandler<Integer, ByteBuffer> { private int bytesRead; private long position; private AsynchronousFileChannel fileChannel; private Consumer<BytesPayload> consumer; private final ExecutorService executorService = Executors.newFixedThreadPool(10); public void read(File file, Consumer<BytesPayload> c) throws IOException { this.consumer = c; Path path = file.toPath(); // 1 this.fileChannel = AsynchronousFileChannel.open(path, Collections.singleton(StandardOpenOption.READ), this.executorService); //2 ByteBuffer buffer = ByteBuffer.allocate(FileCopyUtils.BUFFER_SIZE); this.fileChannel.read(buffer, position, buffer, this); //3 while (this.bytesRead > 0) { this.position = this.position + this.bytesRead; this.fileChannel.read(buffer, this.position, buffer, this); } } @Override public void completed(Integer result, ByteBuffer buffer) { //4 this.bytesRead = result; if (this.bytesRead < 0) return; buffer.flip(); byte[] data = new byte[buffer.limit()]; buffer.get(data); //5 consumer.accept(BytesPayload.from(data, data.length)); buffer.clear(); this.position = this.position + this.bytesRead; this.fileChannel.read(buffer, this.position, buffer, this); } @Override public void failed(Throwable exc, ByteBuffer attachment) { log.error(exc); } } 

  1. Dieses Mal passen wir java.io.File und machen daraus Java NIO java.nio.file.Path
  2. Beim Erstellen eines Channel wir insbesondere den Dienst java.util.concurrent.ExecutorService an, mit dem der CompletionHandler Handler aufgerufen wird, wenn die erforderlichen Daten angezeigt werden
  3. Wir beginnen mit dem Lesen, indem wir einen Link zu CompletionHandler<Integer, ByteBuffer> (this)
  4. Lesen Sie im Rückruf die Bytes aus dem ByteBuffer in die ByteBuffer byte[]
  5. Genau wie im Beispiel " Synchronous werden byte[] -Daten an den Verbraucher übergeben.

Wir werden sofort eine Reservierung vornehmen: Dieser Code erwies sich als viel schwieriger! Hier ist so viel los, dass sich Ihr Kopf sofort dreht. Lassen Sie mich jedoch darauf hinweisen, dass dieser Code Daten aus dem Java NIO Channel liest und diese Daten dann in einem separaten Thread verarbeitet, der für Rückrufe verantwortlich ist. Somit wird der Strom, in dem das Lesen begann, nicht monopolisiert. Wir kehren fast sofort nach dem Aufruf von .read(..) , und wenn wir endlich die Daten zur Verfügung haben, erfolgt ein Rückruf - bereits in einem anderen Thread. Wenn zwischen den Aufrufen von .read() eine Verzögerung .read() Sie zu anderen Themen .read() indem Sie sie in unserem Thread ausführen. Die Dauer einer asynchronen Leseoperation vom ersten bis zum letzten Byte ist bestenfalls nicht länger als die einer synchronen Leseoperation. In der Regel ist eine asynchrone Operation nicht länger. Wenn wir jedoch zu solchen zusätzlichen Schwierigkeiten kommen, können wir unsere Ströme effektiver handhaben. Machen Sie mehr Arbeit, multiplexen Sie E / A in einem Pool mit einer endlichen Anzahl von Threads.

Ich arbeite für ein Cloud-Computing-Unternehmen. Wir möchten, dass Sie neue Instanzen der Anwendung erhalten, um Probleme mit der horizontalen Skalierung zu lösen! Natürlich bin ich hier etwas unaufrichtig. Asynchrone E / A erschweren die Arbeit ein wenig, aber ich hoffe, dieses Beispiel zeigt, wie nützlich reaktiver Code ist: Sie können mehr Anforderungen verarbeiten und mehr an Ihrer vorhandenen Hardware arbeiten, wenn die Leistung stark von E / A abhängt. Wenn die Leistung von der Verwendung des Prozessors abhängt (z. B. sprechen wir über Operationen mit Fibonacci-Zahlen, das Mining von Bitcoins oder Kryptografie), gibt uns die reaktive Programmierung nichts.

Derzeit verwenden die meisten von uns keine Channel oder InputStream Implementierungen in ihrer täglichen Arbeit! Wir müssen über Probleme auf der Ebene höherer Abstraktionen nachdenken. Es geht um Dinge wie Arrays oder vielmehr die Hierarchie java.util.Collection . Die Sammlung java.util.Collection wird in einem InputStream sehr gut angezeigt: Beide Entitäten gehen davon aus, dass Sie alle Daten gleichzeitig und fast sofort bearbeiten können. Es wird erwartet, dass Sie das Lesen der meisten InputStreams eher früher als später beenden können. Sammlungstypen werden beim Verschieben auf größere Datenmengen etwas unangenehm. Was ist, wenn Sie es mit etwas potenziell Unendlichem (Unbegrenztem) zu tun haben - zum Beispiel mit Web-Sockets oder Serverereignissen? Was tun, wenn zwischen den Aufnahmen eine Verzögerung auftritt?

Wir brauchen einen besseren Weg, um diese Art von Daten zu beschreiben. Wir sprechen von asynchronen Ereignissen, die am Ende auftreten werden. Es mag scheinen, dass Future<T> oder CompletableFuture<T> für diesen Zweck gut geeignet sind, aber sie beschreiben nur eine Sache, die am Ende passiert. Tatsächlich bietet Java keine geeignete Metapher, um diese Art von Daten zu beschreiben. Sowohl der Iterator als auch der Stream Typ von Java 8 sind möglicherweise nicht miteinander verbunden. Beide sind jedoch auf Pull ausgerichtet. Sie selbst fordern den nächsten Eintrag an, nicht der Typ sollte einen Rückruf an Ihren Code senden. Es wird davon ausgegangen, dass, wenn in diesem Fall eine Push-basierte Verarbeitung unterstützt würde, die es ermöglichen würde, auf Thread-Ebene viel mehr zu erreichen, die API auch eine Threading- und Planungssteuerung bereitstellen würde. Iterator Implementierungen sagen nichts über das Threading aus, und alle Java 8-Threads verwenden denselben Fork-Join-Pool.

Wenn Iterator und Stream die Push-Verarbeitung wirklich unterstützen würden, würden wir auf ein anderes Problem stoßen, das sich gerade im Zusammenhang mit E / A wirklich verschärft: Wir brauchen eine Art Rückpenetrationsmechanismus! Da der Datenkonsument asynchron verarbeitet wird, wissen wir nicht, wann und in welcher Menge die Daten in der Pipeline sind. Wir wissen nicht, wie viele Daten beim nächsten Rückruf verarbeitet werden müssen: ein Byte oder ein Terabyte!

Wenn Sie Daten aus einem InputStream , lesen Sie so viele Informationen, wie Sie verarbeiten InputStream , und nicht mehr. In den vorherigen Beispielen lesen wir Daten mit einer festen und bekannten Länge in den byte[] -Puffer. In einem asynchronen Kontext müssen wir dem Anbieter eine Möglichkeit mitteilen, wie viele Daten wir verarbeiten möchten.
Ja, Sir. Hier fehlt sicher etwas.

Suche nach der fehlenden Metapher

In diesem Fall suchen wir nach einer Metapher, die das Wesen der asynchronen E / A auf wunderbare Weise widerspiegelt, einen solchen Mechanismus für die umgekehrte Datenübertragung unterstützt und es uns ermöglicht, den Ausführungsfluss in verteilten Systemen zu steuern. Bei der reaktiven Programmierung wird die Fähigkeit eines Clients, zu signalisieren, welche Last er bewältigen kann, als "Rückfluss" bezeichnet.

Jetzt gibt es eine Reihe guter Projekte - Vert.x, Akka Streams und RxJava -, die reaktive Programmierung unterstützen. Das Spring-Team führt auch ein Projekt namens Reactor durch . Zwischen diesen verschiedenen Standards gibt es ein ziemlich breites allgemeines Feld, das de facto dem Standard der Reactive Streams-Initiative zugeordnet ist . Die Initiative Reactive Streams definiert vier Typen:

Publisher<T> Schnittstelle Publisher<T> ; erzeugt Werte, die letztendlich eintreffen können. Publisher<T> Schnittstelle Publisher<T> ; erzeugt Werte vom Typ T für Subscriber<T> .

Beispiel 3. Reaktive Streams: Publisher<T> -Schnittstelle .

 package org.reactivestreams; public interface Publisher<T> { void subscribe(Subscriber<? super Tgt; s); } 

Der Subscriber abonniert Publisher<T> und erhält über seine onNext(T) -Methode Benachrichtigungen über neue Werte vom Typ T Wenn Fehler auftreten, wird die Methode onError(Throwable) . Wenn die Verarbeitung normal abgeschlossen ist, wird die onComplete Methode des Teilnehmers aufgerufen.

Beispiel 4. Jetstreams: Subscriber<T> -Schnittstelle.

 package org.reactivestreams; public interface Subscriber<T> { public void onSubscribe(Subscription s); public void onNext(T t); public void onError(Throwable t); public void onComplete(); } 

Wenn der Subscriber ersten Mal eine Verbindung zu Publisher , erhält er ein Subscription in der Subscriber#onSubscribe . Abonnement Das Subscription ist möglicherweise der wichtigste Teil der gesamten Spezifikation. sie ist es, die den Rückfluss liefert. Ein Abonnent-Abonnent verwendet die Subscription#request , um zusätzliche Daten anzufordern, oder die Subscription#cancel , um die Verarbeitung zu beenden.

Beispiel 5. Reaktive Streams: Subscription<T> -Schnittstelle .

 package org.reactivestreams; public interface Subscription { public void request(long n); public void cancel(); } 

Die reaktive Stream-Spezifikation bietet einen weiteren nützlichen, wenn auch offensichtlichen Typ: Processor<A,B> ist nur eine Schnittstelle, die sowohl Subscriber<A> als auch Publisher<B> erbt.

Beispiel 6. Jetstreams: Processor<T> -Schnittstelle .

 package org.reactivestreams; public interface Processor<T, R> extends Subscriber&ltT>, Publisher<R> { } 

Eine Spezifikation ist nicht als Rezept für Implementierungen positioniert. Tatsächlich besteht ihr Zweck darin, Typen zu definieren, um die Interoperabilität zu unterstützen. Der offensichtliche Vorteil von Typen, die mit reaktiven Flows verbunden sind, besteht darin, dass sie dennoch einen Platz in der Java 9-Version gefunden haben. Außerdem sind sie semantisch „eins zu eins“ und entsprechen Schnittstellen der Klasse java.util.concurrent.Flow , zum Beispiel: java.util.concurrent.Flow.Publisher .

Treffen Sie Reactor

Arten von reaktiven Strömen allein reichen nicht aus; Implementierungen höherer Ordnung sind erforderlich, um Operationen wie Filtern und Transformieren zu unterstützen. Als solches ist das Reaktorprojekt praktisch; Es baut auf der Reactive Streams-Spezifikation auf und bietet zwei Publisher<T> -Spezialisierungen.

Erstens ist Flux<T> ein Publisher , der null oder mehr Werte erzeugt. Das zweite, Mono<T> , ist Publisher<T> und erzeugt null oder einen Wert. Beide veröffentlichen Werte und können sie entsprechend behandeln. Ihre Funktionen sind jedoch viel umfassender als die der Reactive Streams-Spezifikation. Beide bieten Operatoren, mit denen Sie Wertströme verarbeiten können. Reaktortypen lassen sich gut zusammensetzen - die Ausgabe eines von ihnen kann als Eingabe für den anderen dienen, und wenn ein Typ mit anderen Datenströmen arbeiten muss, sind sie auf Publisher<T> -Instanzen angewiesen.

Sowohl Mono<T> als auch Flux<T> implementieren Publisher<T> . Wir empfehlen, dass Ihre Methoden Publisher<T> -Instanzen akzeptieren, aber Flux<T> oder Mono<T> . Dies hilft dem Kunden zu unterscheiden, welche Art von Daten er erhält.

Angenommen, Sie haben Publisher<T> und wurden aufgefordert, die Benutzeroberfläche für diesen Publisher<T> anzuzeigen. Sollte ich dann eine Seite mit Details für einen Datensatz anzeigen, da Sie CompletableFuture<T> ? Oder eine Übersichtsseite mit einer Liste oder einem Raster anzeigen, auf der alle Einträge Seite für Seite angezeigt werden? Es ist schwer zu sagen.

Flux<T> und Mono<T> sehr spezifisch. Sie wissen, dass Sie eine Überprüfungsseite anzeigen müssen, wenn Flux<T> empfangen wird, und eine Seite mit Details für einen (oder keinen einzelnen) Datensatz, wenn Sie Mono<T> empfangen.

Reactor ist ein Open-Source-Projekt von Pivotal. Jetzt ist er sehr beliebt geworden. Facebook verwendet es in seinem Düsentriebwerk , um Remote-Prozeduren aufzurufen , und verwendet es auch in Rsocket , das vom RxJava-Erfinder Ben Christensen geleitet wird. Salesforce verwendet es in seiner reaktiven gRPC-Implementierung . Reactor implementiert reaktive Streams-Typen, sodass es mit anderen Technologien interagieren kann, die diese Typen unterstützen, z. B. mit RxJava 2 von Netflix, Akka Streams von Lightbend und mit dem Vert.x- Projekt von Eclipse Foundation. David Cairnock, Direktor von RxJava 2, arbeitete auch aktiv mit Pivotal zusammen, um Reactor zu entwickeln und das Projekt noch besser zu machen. Außerdem ist es natürlich in der einen oder anderen Form im Spring Framework vorhanden, beginnend mit Spring Framework 4.0.

Reaktive Programmierung mit Spring WebFlux

Bei aller Nützlichkeit ist der Reaktor nur die Basis. Unsere Anwendungen müssen mit Datenquellen kommunizieren. Muss Authentifizierung und Autorisierung unterstützen. Der Frühling bietet all dies. Wenn Reactor uns die fehlende Metapher gibt, hilft uns der Frühling alle, eine gemeinsame Sprache zu sprechen.

Spring Framework 5.0 wurde im September 2017 veröffentlicht. Es baut auf der Reactor- und der Reactive Streams-Spezifikation auf. Es verfügt über ein neues reaktives Laufzeit- und Komponentenmodell namens Spring WebFlux .

Spring WebFlux ist unabhängig von der Servlet-API und erfordert keine Funktion. Es wird mit Adaptern geliefert, mit denen Sie es bei Bedarf über dem Servlet-Motor verwenden können. Dies ist jedoch nicht erforderlich. Es bietet auch eine völlig neue Netty-basierte Laufzeit namens Spring WebFlux. Spring Framework 5, das mit Java 8 und Java EE 7 und höher arbeitet, dient jetzt als Grundlage für einen Großteil des Spring-Ökosystems, einschließlich Spring Data Kay, Spring Security 5, Spring Boot 2 und Spring Cloud Finchley.

Source: https://habr.com/ru/post/de435972/


All Articles