Pemrograman reaktif dengan JAX-RS

Halo semuanya!

Tahun lalu, kursus Java Enterprise Developer, telah berhasil diluncurkan dan kami memiliki materi terakhir tentang topik ini yang ingin kami bagikan dengan Anda, yang membahas penggunaan pendekatan asinkron dan pementasan untuk mengembangkan aplikasi responsif responsif.

Ayo pergi.

Pemrograman reaktif pada awalnya terdengar seperti nama paradigma yang muncul, tetapi pada kenyataannya, itu mengacu pada metode pemrograman di mana pendekatan berorientasi peristiwa digunakan untuk bekerja dengan aliran data asinkron. Berdasarkan data yang terus-menerus terkini, sistem reaktif meresponsnya dengan melakukan serangkaian acara.
Pemrograman reaktif mengikuti pola desain "Observer", yang dapat didefinisikan sebagai berikut: jika keadaan berubah dalam satu objek, maka semua objek lainnya akan diberi tahu dan diperbarui. Oleh karena itu, alih-alih polling acara untuk perubahan, acara didorong secara tidak sinkron sehingga pengamat dapat memprosesnya. Dalam contoh ini, pengamat adalah fungsi yang dieksekusi ketika acara dikirim. Dan aliran data yang disebutkan adalah yang sebenarnya dapat diamati.

Hampir semua bahasa dan kerangka kerja menggunakan pendekatan ini dalam ekosistem mereka, dan versi terbaru Jawa tidak terkecuali. Pada artikel ini, saya akan menjelaskan bagaimana pemrograman reaktif dapat diterapkan menggunakan versi terbaru JAX-RS di Java EE 8 dan fungsi Java 8.



Manifes Jet

Jet Manifesto mencantumkan empat aspek mendasar bahwa suatu aplikasi perlu lebih fleksibel, longgar, dan mudah untuk diukur, dan karenanya mampu reaktif. Ini menyatakan bahwa aplikasi harus responsif, fleksibel (dan karenanya dapat diskalakan), ulet dan didorong oleh pesan.

Tujuan yang mendasarinya adalah aplikasi yang benar-benar responsif. Misalkan ada aplikasi di mana satu utas besar terlibat dalam memproses permintaan pengguna, dan setelah menyelesaikan pekerjaan, utas ini mengirimkan tanggapan kembali ke pemohon asli. Ketika sebuah aplikasi menerima lebih banyak permintaan daripada yang bisa ditangani, utas ini menjadi hambatan, dan aplikasi kehilangan respons sebelumnya. Untuk menjaga daya tanggap, aplikasi harus scalable dan ulet. Berkelanjutan dapat dianggap sebagai aplikasi yang memiliki fungsi pemulihan otomatis. Dalam pengalaman sebagian besar pengembang, hanya arsitektur berbasis pesan yang memungkinkan aplikasi menjadi skalabel, tangguh, dan responsif.

Pemrograman reaktif diperkenalkan di Java 8 dan Java EE 8. Java memperkenalkan konsep seperti CompletionStage dan penerapan CompletableFuture , dan Java mulai menggunakan fitur ini dalam spesifikasi seperti API Klien Reaktif di JAX-RS.

JAX-RS 2.1 API Klien Reaktif

Mari kita lihat bagaimana pemrograman reaktif dapat digunakan dalam aplikasi Java EE 8. Untuk memahami prosesnya, Anda memerlukan pengetahuan dasar tentang Java EE API.

JAX-RS 2.1 memperkenalkan cara baru untuk membuat klien REST dengan dukungan untuk pemrograman reaktif. Implementasi invoker default yang ditawarkan di JAX-RS adalah sinkron, yang berarti bahwa klien yang dibuat akan mengirim panggilan pemblokiran ke titik akhir server. Contoh implementasi disajikan pada Listing 1.

Listing 1

 Response response = ClientBuilder.newClient() .target("http://localhost:8080/service-url") .request() .get(); 

Dimulai dengan versi 2.0, JAX-RS menyediakan dukungan untuk membuat invoker asinkron pada API klien dengan panggilan sederhana ke metode async() , seperti yang ditunjukkan pada Listing 2.

Listing 2

 Future<Response> response = ClientBuilder.newClient() .target("http://localhost:8080/service-url") .request() .async() .get(); 

Menggunakan penyerang asinkron pada klien mengembalikan contoh Future dari jenis javax.ws.rs.core.Response . Ini dapat menyebabkan polling respons, dengan panggilan ke future.get() , atau mendaftarkan panggilan balik yang akan dipanggil saat respons HTTP tersedia. Kedua implementasi cocok untuk pemrograman asinkron, tetapi hal-hal biasanya menjadi rumit jika Anda ingin mengelompokkan panggilan balik atau menambahkan kasus bersyarat ke minimum eksekusi asinkron ini.

JAX-RS 2.1 menyediakan cara reaktif untuk mengatasi masalah ini dengan API Klien Reaktif JAX-RS baru untuk membangun klien. Ini sesederhana memanggil metode rx() selama membangun klien. Dalam Listing 3, metode rx() mengembalikan invoker reaktif yang ada selama eksekusi klien, dan klien mengembalikan respons tipe CompletionStage.rx() , yang memungkinkan transisi dari invoker sinkron ke invoker asinkron dengan invoker sederhana dengan panggilan sederhana.

Listing 3

 CompletionStage<Response> response = ClientBuilder.newClient() .target("http://localhost:8080/service-url") .request() .rx() .get(); 

CompletionStage<> adalah antarmuka baru yang diperkenalkan di Java 8. Ini mewakili komputasi, yang dapat menjadi langkah dalam perhitungan yang lebih besar, seperti namanya. Ini adalah satu-satunya perwakilan reaktivitas Java 8 yang menekan JAX-RS.
Setelah menerima contoh respons, saya dapat memanggil AcceptAsync() , di mana saya dapat memberikan sepotong kode yang akan dieksekusi secara tidak sinkron ketika respons tersedia, seperti yang ditunjukkan pada Listing 4.

Listing 4

 response.thenAcceptAsync(res -> { Temperature t = res.readEntity(Temperature.class); //do stuff with t }); 

Menambahkan reaktivitas ke titik akhir REST

Pendekatan reaktif tidak terbatas pada sisi klien di JAX-RS; itu juga dapat digunakan di sisi server. Sebagai contoh, pertama saya akan membuat skrip sederhana di mana saya dapat meminta daftar lokasi satu tujuan. Untuk setiap posisi, saya akan melakukan panggilan terpisah dengan data lokasi ke titik lain untuk mendapatkan nilai suhu. Interaksi tujuan akan seperti yang ditunjukkan pada Gambar 1.


Gambar 1. Interaksi antara titik tujuan

Pertama, saya cukup mendefinisikan model domain, dan kemudian layanan untuk masing-masing model. Listing 5 menunjukkan bagaimana kelas Forecast didefinisikan, yang membungkus kelas Location dan Temperature .

Listing 5

 public class Temperature { private Double temperature; private String scale; // getters & setters } public class Location { String name; public Location() {} public Location(String name) { this.name = name; } // getters & setters } public class Forecast { private Location location; private Temperature temperature; public Forecast(Location location) { this.location = location; } public Forecast setTemperature( final Temperature temperature) { this.temperature = temperature; return this; } // getters } 

Untuk membungkus daftar perkiraan, kelas ServiceResponse diimplementasikan pada Listing 6.

Listing 6

 public class ServiceResponse { private long processingTime; private List<Forecast> forecasts = new ArrayList<>(); public void setProcessingTime(long processingTime) { this.processingTime = processingTime; } public ServiceResponse forecasts(List<Forecast> forecasts) { this.forecasts = forecasts; return this; } // getters } 

LocationResource ditunjukkan pada Listing 7 mendefinisikan tiga lokasi sampel yang dikembalikan dengan path /location .

Listing 7

 @Path("/location") public class LocationResource { @GET @Produces(MediaType.APPLICATION_JSON) public Response getLocations() { List<Location> locations = new ArrayList<>(); locations.add(new Location("London")); locations.add(new Location("Istanbul")); locations.add(new Location("Prague")); return Response.ok(new GenericEntity<List<Location>>(locations){}).build(); } } 

TemperatureResource ditunjukkan pada Listing 8 mengembalikan nilai suhu yang dihasilkan secara acak antara 30 dan 50 untuk lokasi tertentu. Penundaan 500 ms telah ditambahkan ke implementasi untuk mensimulasikan pembacaan sensor.

Listing 8

 @Path("/temperature") public class TemperatureResource { @GET @Path("/{city}") @Produces(MediaType.APPLICATION_JSON) public Response getAverageTemperature(@PathParam("city") String cityName) { Temperature temperature = new Temperature(); temperature.setTemperature((double) (new Random().nextInt(20) + 30)); temperature.setScale("Celsius"); try { Thread.sleep(500); } catch (InterruptedException ignored) { ignored.printStackTrace(); } return Response.ok(temperature).build(); } } 

Pertama, saya akan menunjukkan implementasi ForecastResource sinkron (lihat Listing 9), yang mengembalikan semua lokasi. Kemudian, untuk setiap posisi, ia memanggil layanan suhu untuk mendapatkan nilai dalam derajat Celcius.

Listing 9

 @Path("/forecast") public class ForecastResource { @Uri("location") private WebTarget locationTarget; @Uri("temperature/{city}") private WebTarget temperatureTarget; @GET @Produces(MediaType.APPLICATION_JSON) public Response getLocationsWithTemperature() { long startTime = System.currentTimeMillis(); ServiceResponse response = new ServiceResponse(); List<Location> locations = locationTarget .request() .get(new GenericType<List<Location>>(){}); locations.forEach(location -> { Temperature temperature = temperatureTarget .resolveTemplate("city", location.getName()) .request() .get(Temperature.class); response.getForecasts().add( new Forecast(location).setTemperature(temperature)); }); long endTime = System.currentTimeMillis(); response.setProcessingTime(endTime - startTime); return Response.ok(response).build(); } } 

Ketika tujuan perkiraan diminta sebagai /forecast , Anda akan mendapatkan output yang mirip dengan yang ditunjukkan pada Listing 10. Perhatikan bahwa waktu pemrosesan permintaan memakan waktu 1,533 ms, yang logis, karena permintaan sinkron untuk nilai suhu dari tiga lokasi berbeda menambahkan hingga 1,5 ms

Listing 10

 { "forecasts": [ { "location": { "name": "London" }, "temperature": { "scale": "Celsius", "temperature": 33 } }, { "location": { "name": "Istanbul" }, "temperature": { "scale": "Celsius", "temperature": 38 } }, { "location": { "name": "Prague" }, "temperature": { "scale": "Celsius", "temperature": 46 } } ], "processingTime": 1533 } 

Sejauh ini, semuanya berjalan sesuai rencana. Sudah saatnya untuk memperkenalkan pemrograman reaktif di sisi server, di mana panggilan ke setiap lokasi dapat dilakukan secara paralel setelah menerima semua lokasi. Ini jelas dapat meningkatkan aliran sinkron yang ditunjukkan sebelumnya. Ini dilakukan pada Listing 11, yang menunjukkan definisi versi layanan perkiraan reaktif.

Listing 11

 @Path("/reactiveForecast") public class ForecastReactiveResource { @Uri("location") private WebTarget locationTarget; @Uri("temperature/{city}") private WebTarget temperatureTarget; @GET @Produces(MediaType.APPLICATION_JSON) public void getLocationsWithTemperature(@Suspended final AsyncResponse async) { long startTime = System.currentTimeMillis(); //   (stage)    CompletionStage<List<Location>> locationCS = locationTarget.request() .rx() .get(new GenericType<List<Location>>() {}); //      , //  ,   , //    CompletionStage final CompletionStage<List<Forecast>> forecastCS = locationCS.thenCompose(locations -> { //      //   ompletionStage List<CompletionStage<Forecast>> forecastList = //      //     locations.stream().map(location -> { //     //      //    final CompletionStage<Temperature> tempCS = temperatureTarget .resolveTemplate("city", location.getName()) .request() .rx() .get(Temperature.class); //   CompletableFuture,   //    //      return CompletableFuture.completedFuture( new Forecast(location)) .thenCombine(tempCS, Forecast::setTemperature); }).collect(Collectors.toList()); //    CompletableFuture, //     completable future //  return CompletableFuture.allOf( forecastList.toArray( new CompletableFuture[forecastList.size()])) .thenApply(v -> forecastList.stream() .map(CompletionStage::toCompletableFuture) .map(CompletableFuture::join) .collect(Collectors.toList())); }); //   ServiceResponse, //       //    . //   future    // forecastCS,    //      CompletableFuture.completedFuture( new ServiceResponse()) .thenCombine(forecastCS, ServiceResponse::forecasts) .whenCompleteAsync((response, throwable) -> { response.setProcessingTime( System.currentTimeMillis() - startTime); async.resume(response); }); } } 

Implementasi reaktif mungkin tampak rumit pada pandangan pertama, tetapi setelah penelitian yang lebih hati-hati, Anda akan melihat bahwa itu sangat sederhana. Dalam penerapan ForecastReactiveResource saya pertama kali membuat panggilan klien ke layanan lokasi menggunakan API Klien Reaktif JAX-RS. Seperti yang saya sebutkan di atas, ini adalah add-on untuk Java EE 8, dan ini membantu untuk membuat panggilan reaktif hanya menggunakan metode rx() .

Sekarang saya membuat fase baru berdasarkan lokasi untuk mengumpulkan daftar perkiraan. Mereka akan disimpan sebagai daftar perkiraan dalam satu tahap penyelesaian besar yang disebut forecastCS . Pada akhirnya, saya akan membuat respons panggilan layanan hanya menggunakan forecastCS .

Dan sekarang, mari kita kumpulkan prakiraan dalam bentuk daftar tahap penyelesaian yang ditentukan dalam daftar forecastList variabel. Untuk membuat tahap penyelesaian untuk setiap perkiraan, saya meneruskan data berdasarkan lokasi, dan kemudian membuat variabel tempCS , sekali lagi menggunakan JAX-RS Reactive Client API, yang memanggil layanan suhu dengan nama kota. Di sini, saya menggunakan metode resolveTemplate() untuk membangun klien, dan ini memungkinkan saya untuk meneruskan nama kota ke kolektor sebagai parameter.

Sebagai langkah terakhir dalam streaming, saya melakukan panggilan ke CompletableFuture.completedFuture() , dengan mengirimkan contoh Forecast baru sebagai parameter. Saya menggabungkan masa depan ini dengan tahap tempCS sehingga saya memiliki nilai suhu untuk lokasi yang dipantau.

Metode CompletableFuture.allOf() di Listing 11 mengubah daftar tahap penyelesaian menjadi forecastCS . Melakukan langkah ini mengembalikan instance masa depan yang dapat diselesaikan yang besar ketika semua objek masa depan yang dapat diselesaikan yang lengkap selesai.

Respons layanan adalah turunan dari kelas ServiceResponse , jadi saya membuat masa depan yang selesai, dan kemudian menggabungkan tahap penyelesaian forecastCS dengan daftar perkiraan dan menghitung waktu respons layanan.

Tentu saja, pemrograman reaktif hanya memaksa sisi server untuk berjalan secara tidak sinkron; sisi klien akan diblokir sampai server mengirim respons kembali ke pemohon. Untuk mengatasi masalah ini, Server Terkirim Acara (SSE) dapat digunakan untuk mengirim sebagian tanggapan segera setelah itu tersedia sehingga nilai suhu untuk setiap lokasi ditransmisikan ke klien satu per satu. Output dari ForecastReactiveResource akan serupa dengan yang disajikan pada Listing 12. Seperti yang ditunjukkan dalam output, waktu pemrosesan adalah 515 ms, yang merupakan runtime yang ideal untuk mendapatkan nilai suhu dari satu lokasi.

Listing 12

 { "forecasts": [ { "location": { "name": "London" }, "temperature": { "scale": "Celsius", "temperature": 49 } }, { "location": { "name": "Istanbul" }, "temperature": { "scale": "Celsius", "temperature": 32 } }, { "location": { "name": "Prague" }, "temperature": { "scale": "Celsius", "temperature": 45 } } ], "processingTime": 515 } 

Kesimpulan

Dalam contoh di artikel ini, saya pertama kali menunjukkan cara sinkron untuk mendapatkan perkiraan menggunakan layanan lokasi dan suhu. Kemudian, saya pergi ke pendekatan reaktif sehingga pemrosesan asinkron dilakukan antara panggilan layanan. Ketika Anda menggunakan API Klien Reaktif JAX-RS di Java EE 8 bersama dengan kelas CompletionStage dan CompletableFuture tersedia di Java 8, kekuatan pemrosesan asinkron akan terputus berkat pemrograman reaktif.

Pemrograman reaktif lebih dari sekadar menerapkan model asinkron dari model sinkron; itu juga menyederhanakan bekerja dengan konsep-konsep seperti tahap bersarang. Semakin banyak digunakan, semakin mudah untuk mengelola skrip kompleks dalam pemrograman paralel.

AKHIR

Terima kasih atas perhatian anda Seperti biasa, kami menunggu komentar dan pertanyaan Anda.

Source: https://habr.com/ru/post/id424031/


All Articles