Reaktive Programmierung mit JAX-RS

Hallo allerseits!

Das letzte Jahr, der Java Enterprise Developer- Kurs , wurde erfolgreich gestartet. Wir haben das letzte Material zu diesem Thema, das wir mit Ihnen teilen möchten, in dem die Verwendung des asynchronen Ansatzes und des Staging für die Entwicklung reaktionsschneller reaktionsfähiger Anwendungen erläutert wird.

Lass uns gehen.

Reaktive Programmierung klingt zunächst wie der Name eines aufkommenden Paradigmas, bezieht sich jedoch tatsächlich auf eine Programmiermethode, bei der ein ereignisorientierter Ansatz verwendet wird, um mit asynchronen Datenströmen zu arbeiten. Basierend auf ständig aktuellen Daten reagieren reaktive Systeme auf sie, indem sie eine Reihe von Ereignissen ausführen.
Die reaktive Programmierung folgt dem Entwurfsmuster „Observer“, das wie folgt definiert werden kann: Wenn sich ein Status in einem Objekt ändert, werden alle anderen Objekte benachrichtigt und entsprechend aktualisiert. Anstatt Ereignisse nach Änderungen abzufragen, werden Ereignisse daher asynchron verschoben, damit Beobachter sie verarbeiten können. In diesem Beispiel sind Beobachter Funktionen, die ausgeführt werden, wenn das Ereignis ausgelöst wird. Und der erwähnte Datenstrom ist der tatsächlich beobachtbare.

Fast alle Sprachen und Frameworks verwenden diesen Ansatz in ihrem Ökosystem, und die neuesten Versionen von Java sind keine Ausnahme. In diesem Artikel werde ich erklären, wie reaktive Programmierung mit der neuesten Version von JAX-RS in Java EE 8 und Java 8 angewendet werden kann.



Jet Manifest

Das Jet-Manifest listet vier grundlegende Aspekte auf, nach denen eine Anwendung flexibler, lose gekoppelt und einfach zu skalieren sein muss und daher reaktiv sein kann. Es heißt, dass die Anwendung reaktionsschnell, flexibel (und daher skalierbar), belastbar und nachrichtengesteuert sein muss.

Das zugrunde liegende Ziel ist eine wirklich reaktionsschnelle Anwendung. Angenommen, es gibt eine Anwendung, in der ein großer Thread Benutzeranforderungen verarbeitet, und nach Abschluss der Arbeit sendet dieser Thread Antworten an die ursprünglichen Anforderer zurück. Wenn eine Anwendung mehr Anforderungen empfängt, als sie verarbeiten kann, wird dieser Thread zu einem Engpass, und die Anwendung verliert ihre frühere Reaktionsfähigkeit. Um die Reaktionsfähigkeit aufrechtzuerhalten, muss die Anwendung skalierbar und belastbar sein. Nachhaltig kann als eine Anwendung betrachtet werden, die Funktionen für die automatische Wiederherstellung bietet. Nach den Erfahrungen der meisten Entwickler ermöglicht nur eine nachrichtengesteuerte Architektur, dass die Anwendung skalierbar, belastbar und reaktionsschnell ist.

Reaktive Programmierung wurde in Java 8 und Java EE 8 eingeführt. Java führte Konzepte wie CompletionStage und die Implementierung von CompletableFuture , und Java begann, diese Funktionen in Spezifikationen wie der Reactive Client-API in JAX-RS zu verwenden.

JAX-RS 2.1 Reactive Client API

Lassen Sie uns sehen, wie reaktive Programmierung in Java EE 8-Anwendungen verwendet werden kann. Um den Prozess zu verstehen, benötigen Sie einige Grundkenntnisse der Java EE-API.

JAX-RS 2.1 hat eine neue Methode zum Erstellen eines REST-Clients mit Unterstützung für reaktive Programmierung eingeführt. Die in JAX-RS angebotene Standard-Invoker-Implementierung ist synchron. Dies bedeutet, dass der erstellte Client einen blockierenden Anruf an den Endpunkt des Servers sendet. Eine Beispielimplementierung ist in Listing 1 dargestellt.

Listing 1

 Response response = ClientBuilder.newClient() .target("http://localhost:8080/service-url") .request() .get(); 

Ab Version 2.0 bietet JAX-RS Unterstützung für das Erstellen eines asynchronen Aufrufers auf der Client-API mit einem einfachen Aufruf der async() -Methode (siehe Listing 2).

Listing 2:

 Future<Response> response = ClientBuilder.newClient() .target("http://localhost:8080/service-url") .request() .async() .get(); 

Die Verwendung eines asynchronen Aufrufers auf dem Client gibt eine Future Instanz vom Typ javax.ws.rs.core.Response . Dies kann dazu führen, dass die Antwort mit einem Aufruf von future.get() oder ein Rückruf registriert wird, der aufgerufen wird, wenn eine HTTP-Antwort verfügbar ist. Beide Implementierungen eignen sich für die asynchrone Programmierung. In der Regel wird es jedoch kompliziert, wenn Sie Rückrufe gruppieren oder diesen asynchronen Ausführungsminima bedingte Fälle hinzufügen möchten.

JAX-RS 2.1 bietet eine reaktive Möglichkeit, diese Probleme mit der neuen JAX-RS Reactive Client-API zum Erstellen eines Clients zu lösen. Es ist so einfach wie das Aufrufen der Methode rx() während der rx() . In Listing 3 gibt die Methode rx() den reaktiven Aufrufer zurück, der während der Clientausführung vorhanden ist, und der Client gibt eine Antwort vom Typ CompletionStage.rx() , die den Übergang vom synchronen Aufrufer zum asynchronen Aufrufer mit einem einfachen Aufruf ermöglicht.

Listing 3:

 CompletionStage<Response> response = ClientBuilder.newClient() .target("http://localhost:8080/service-url") .request() .rx() .get(); 

CompletionStage<> ist eine neue Schnittstelle, die in Java 8 eingeführt wurde. Sie stellt eine Berechnung dar, die, wie der Name schon sagt, ein Schritt im Rahmen einer größeren Berechnung sein kann. Dies ist der einzige Vertreter der Java 8-Reaktivität, der den JAX-RS erreicht.
Nach dem Empfang der AcceptAsync() kann ich AcceptAsync() aufrufen, wo ich einen Code bereitstellen kann, der asynchron ausgeführt wird, wenn die Antwort verfügbar wird, wie in Listing 4 gezeigt.

Listing 4:

 response.thenAcceptAsync(res -> { Temperature t = res.readEntity(Temperature.class); //do stuff with t }); 

Hinzufügen von Reaktivität zu einem Endpunkt-REST

Der reaktive Ansatz ist in JAX-RS nicht auf die Client-Seite beschränkt. Es kann auch auf der Serverseite verwendet werden. Zum Beispiel werde ich zuerst ein einfaches Skript erstellen, in dem ich eine Liste der Standorte eines Ziels anfordern kann. Für jede Position werde ich einen separaten Anruf mit Standortdaten an einen anderen Punkt tätigen, um die Temperaturwerte zu erhalten. Die Interaktion der Ziele erfolgt wie in Abbildung 1 dargestellt.


Abbildung 1. Interaktion zwischen Zielpunkten

Zuerst definiere ich einfach das Domänenmodell und dann die Dienste für jedes Modell. Listing 5 zeigt, wie die Forecast Klasse definiert ist, die die Location und Temperature Klassen Temperature .

Listing 5:

 public class Temperature { private Double temperature; private String scale; // getters & setters } public class Location { String name; public Location() {} public Location(String name) { this.name = name; } // getters & setters } public class Forecast { private Location location; private Temperature temperature; public Forecast(Location location) { this.location = location; } public Forecast setTemperature( final Temperature temperature) { this.temperature = temperature; return this; } // getters } 

Um die Prognoseliste zu verpacken, ist die ServiceResponse Klasse in Listing 6 implementiert.

Listing 6:

 public class ServiceResponse { private long processingTime; private List<Forecast> forecasts = new ArrayList<>(); public void setProcessingTime(long processingTime) { this.processingTime = processingTime; } public ServiceResponse forecasts(List<Forecast> forecasts) { this.forecasts = forecasts; return this; } // getters } 

LocationResource in Listing 7 gezeigte LocationResource definiert drei Beispielpositionen, die mit dem Pfad /location .

Listing 7:

 @Path("/location") public class LocationResource { @GET @Produces(MediaType.APPLICATION_JSON) public Response getLocations() { List<Location> locations = new ArrayList<>(); locations.add(new Location("London")); locations.add(new Location("Istanbul")); locations.add(new Location("Prague")); return Response.ok(new GenericEntity<List<Location>>(locations){}).build(); } } 

TemperatureResource in Listing 8 gezeigte TemperatureResource gibt einen zufällig generierten Temperaturwert zwischen 30 und 50 für einen bestimmten Standort zurück. Der Implementierung wurde eine Verzögerung von 500 ms hinzugefügt, um einen Sensorlesevorgang zu simulieren.

Listing 8:

 @Path("/temperature") public class TemperatureResource { @GET @Path("/{city}") @Produces(MediaType.APPLICATION_JSON) public Response getAverageTemperature(@PathParam("city") String cityName) { Temperature temperature = new Temperature(); temperature.setTemperature((double) (new Random().nextInt(20) + 30)); temperature.setScale("Celsius"); try { Thread.sleep(500); } catch (InterruptedException ignored) { ignored.printStackTrace(); } return Response.ok(temperature).build(); } } 

Zunächst werde ich die Implementierung der synchronen ForecastResource (siehe Listing 9) zeigen, die alle Speicherorte zurückgibt. Dann ruft er für jede Position den Temperaturdienst an, um die Werte in Grad Celsius zu erhalten.

Listing 9:

 @Path("/forecast") public class ForecastResource { @Uri("location") private WebTarget locationTarget; @Uri("temperature/{city}") private WebTarget temperatureTarget; @GET @Produces(MediaType.APPLICATION_JSON) public Response getLocationsWithTemperature() { long startTime = System.currentTimeMillis(); ServiceResponse response = new ServiceResponse(); List<Location> locations = locationTarget .request() .get(new GenericType<List<Location>>(){}); locations.forEach(location -> { Temperature temperature = temperatureTarget .resolveTemplate("city", location.getName()) .request() .get(Temperature.class); response.getForecasts().add( new Forecast(location).setTemperature(temperature)); }); long endTime = System.currentTimeMillis(); response.setProcessingTime(endTime - startTime); return Response.ok(response).build(); } } 

Wenn das Prognoseziel als /forecast angefordert wird, erhalten Sie eine Ausgabe ähnlich der in Listing 10 gezeigten. Beachten Sie, dass die Anforderungsverarbeitungszeit 1,533 ms betrug, was logisch ist, da eine synchrone Anforderung von Temperaturwerten von drei verschiedenen Standorten 1,5 ergibt ms

Listing 10:

 { "forecasts": [ { "location": { "name": "London" }, "temperature": { "scale": "Celsius", "temperature": 33 } }, { "location": { "name": "Istanbul" }, "temperature": { "scale": "Celsius", "temperature": 38 } }, { "location": { "name": "Prague" }, "temperature": { "scale": "Celsius", "temperature": 46 } } ], "processingTime": 1533 } 

Bisher läuft alles nach Plan. Es ist an der Zeit, eine reaktive Programmierung auf der Serverseite einzuführen, bei der Anrufe an jeden Standort parallel getätigt werden können, nachdem alle Standorte empfangen wurden. Dies kann den zuvor gezeigten synchronen Strom deutlich verbessern. Dies erfolgt in Listing 11, in dem die Definition der Version des reaktiven Prognosedienstes aufgeführt ist.

Listing 11:

 @Path("/reactiveForecast") public class ForecastReactiveResource { @Uri("location") private WebTarget locationTarget; @Uri("temperature/{city}") private WebTarget temperatureTarget; @GET @Produces(MediaType.APPLICATION_JSON) public void getLocationsWithTemperature(@Suspended final AsyncResponse async) { long startTime = System.currentTimeMillis(); //   (stage)    CompletionStage<List<Location>> locationCS = locationTarget.request() .rx() .get(new GenericType<List<Location>>() {}); //      , //  ,   , //    CompletionStage final CompletionStage<List<Forecast>> forecastCS = locationCS.thenCompose(locations -> { //      //   ompletionStage List<CompletionStage<Forecast>> forecastList = //      //     locations.stream().map(location -> { //     //      //    final CompletionStage<Temperature> tempCS = temperatureTarget .resolveTemplate("city", location.getName()) .request() .rx() .get(Temperature.class); //   CompletableFuture,   //    //      return CompletableFuture.completedFuture( new Forecast(location)) .thenCombine(tempCS, Forecast::setTemperature); }).collect(Collectors.toList()); //    CompletableFuture, //     completable future //  return CompletableFuture.allOf( forecastList.toArray( new CompletableFuture[forecastList.size()])) .thenApply(v -> forecastList.stream() .map(CompletionStage::toCompletableFuture) .map(CompletableFuture::join) .collect(Collectors.toList())); }); //   ServiceResponse, //       //    . //   future    // forecastCS,    //      CompletableFuture.completedFuture( new ServiceResponse()) .thenCombine(forecastCS, ServiceResponse::forecasts) .whenCompleteAsync((response, throwable) -> { response.setProcessingTime( System.currentTimeMillis() - startTime); async.resume(response); }); } } 

Die reaktive Implementierung mag auf den ersten Blick kompliziert erscheinen, aber nach sorgfältigerem Studium werden Sie feststellen, dass sie recht einfach ist. Bei der Implementierung von ForecastReactiveResource erstelle ich zunächst einen Clientaufruf an Ortungsdienste mithilfe der JAX-RS Reactive Client-API. Wie oben erwähnt, ist dies ein Add-On für Java EE 8, und es hilft, einen reaktiven Aufruf einfach mit der Methode rx() zu erstellen.

Jetzt erstelle ich eine neue Phase basierend auf dem Standort, um eine Liste von Prognosen zusammenzustellen. Sie werden als Prognoseliste in einer großen Fertigstellungsstufe namens forecastCS gespeichert. Letztendlich werde ich eine Serviceanrufantwort nur mit forecastCS .

Lassen Sie uns nun die Prognosen in Form einer Liste von Abschlussphasen erfassen, die in der Variablen forecastList . Um für jede Prognose eine Abschlussphase zu erstellen, übergebe ich die Daten nach Standort und erstelle dann die tempCS Variable mithilfe der JAX-RS Reactive Client-API, die den Temperaturdienst mit dem Namen der Stadt aufruft. Hier verwende ich die Methode resolveTemplate() , um den Client zu erstellen. Auf diese Weise kann ich den resolveTemplate() als Parameter an den Collector übergeben.

Als letzten Schritt beim Streaming rufe ich CompletableFuture.completedFuture() und übergebe die neue Forecast Instanz als Parameter. Ich kombiniere diese Zukunft mit der tempCS Stufe, damit ich einen Temperaturwert für die überwachten Standorte habe.

Die CompletableFuture.allOf() -Methode in Listing 11 konvertiert die Abschlussstufenliste in forecastCS . Durch Ausführen dieses Schritts wird eine große abschließbare zukünftige Instanz zurückgegeben, wenn alle bereitgestellten abschließbaren zukünftigen Objekte vollständig sind.

Die Serviceantwort ist eine Instanz der ServiceResponse Klasse. ServiceResponse erstelle ich eine abgeschlossene Zukunft und kombiniere dann die Abschlussphase der forecastCS ServiceResponse mit einer Liste von Prognosen und berechne die Antwortzeit des Dienstes.

Natürlich zwingt die reaktive Programmierung nur die Serverseite zur asynchronen Ausführung. Die Clientseite wird blockiert, bis der Server eine Antwort an den Anforderer zurücksendet. Um dieses Problem zu beheben, können Server Sent Events (SSEs) verwendet werden, um eine Antwort teilweise zu senden, sobald sie verfügbar ist, sodass die Temperaturwerte für jeden Standort einzeln an den Client übertragen werden. Die Ausgabe der ForecastReactiveResource ähnelt der in Listing 12 dargestellten. Wie in der Ausgabe gezeigt, beträgt die Verarbeitungszeit 515 ms. Dies ist eine ideale Laufzeit, um Temperaturwerte von einem Ort zu erhalten.

Listing 12:

 { "forecasts": [ { "location": { "name": "London" }, "temperature": { "scale": "Celsius", "temperature": 49 } }, { "location": { "name": "Istanbul" }, "temperature": { "scale": "Celsius", "temperature": 32 } }, { "location": { "name": "Prague" }, "temperature": { "scale": "Celsius", "temperature": 45 } } ], "processingTime": 515 } 

Fazit

In den Beispielen in diesem Artikel habe ich zunächst eine synchrone Methode zum Abrufen von Vorhersagen mithilfe von Standort- und Temperaturdiensten gezeigt. Dann ging ich zum reaktiven Ansatz über, damit zwischen Serviceaufrufen eine asynchrone Verarbeitung durchgeführt wird. Wenn Sie die JAX-RS Reactive Client-API in Java EE 8 zusammen mit den in Java 8 verfügbaren Klassen CompletionStage und CompletableFuture verwenden, wird die Leistung der asynchronen Verarbeitung dank der reaktiven Programmierung unterbrochen.

Reaktive Programmierung ist mehr als nur die Implementierung eines asynchronen Modells aus einem synchronen. Es vereinfacht auch die Arbeit mit Konzepten wie der Verschachtelungsphase. Je häufiger es verwendet wird, desto einfacher ist es, komplexe Skripte in paralleler Programmierung zu verwalten.

DAS ENDE

Vielen Dank für Ihre Aufmerksamkeit. Wie immer warten wir auf Ihre Kommentare und Fragen.

Source: https://habr.com/ru/post/de424031/


All Articles