Halo, Habr!
Minggu ini kami mengharapkan
buku Spring 5 baru dari percetakan:
Di antara fitur menarik dari Spring 5, pemrograman reaktif layak disebutkan secara khusus, implementasi yang dalam kerangka ini dijelaskan secara singkat oleh artikel yang diusulkan oleh Matt Raible. Dalam buku tersebut, pola reaktif dibahas dalam bab 11.
Matt ditulis bersama oleh Josh Long, penulis buku hebat lain tentang Java and Spring, "
Java in the Cloud, " dirilis musim panas lalu.
Pemrograman reaktif adalah cara Anda untuk membangun sistem yang tahan terhadap beban tinggi. Memproses lalu lintas besar tidak lagi menjadi masalah, karena server non-pemblokiran dan proses klien tidak harus menunggu tanggapan. Klien tidak dapat secara langsung mengamati bagaimana program berjalan di server dan menyinkronkannya. Ketika API mengalami kesulitan untuk memproses permintaan, itu harus tetap memberikan tanggapan yang masuk akal. Seharusnya tidak menolak dan membuang pesan dengan cara yang tidak terkontrol. Itu harus menginformasikan komponen yang lebih tinggi bahwa ia bekerja di bawah beban sehingga mereka dapat membebaskan sebagian dari beban ini. Teknik ini disebut backpressure, aspek penting dari pemrograman reaktif.
Kami turut menulis artikel ini dengan
Josh Long . Josh adalah seorang juara Java, Spring Developer Advocate, dan umumnya seorang pria global yang bekerja di Pivotal. Saya telah bekerja dengan Spring untuk waktu yang lama, tetapi Josh yang menunjukkan kepada saya Boot Musim Semi, pada konferensi Devoxx di Belgia. Sejak itu, kami telah menjadi teman yang kuat, kami menyukai Java dan menulis aplikasi keren.
Pemrograman reaktif atau I / O, I / O, kami mulai bekerja ...Pemrograman reaktif adalah pendekatan untuk membuat perangkat lunak yang secara aktif menggunakan I / O yang tidak sinkron. Asynchronous I / O adalah ide kecil, penuh dengan perubahan besar dalam pemrograman. Idenya sendiri sederhana: untuk memperbaiki situasi dengan alokasi sumber daya yang tidak efisien, membebaskan sumber daya yang akan menganggur tanpa campur tangan kita, menunggu penyelesaian I / O. Input / output asinkron membalikkan pendekatan biasa untuk pemrosesan I / O: klien dibebaskan dan dapat melakukan tugas-tugas lain, menunggu pemberitahuan baru.
Pertimbangkan apa yang umum antara input / output sinkron dan asinkron, dan apa perbedaan di antara keduanya.
Kami akan menulis program sederhana yang membaca data dari sumber (khususnya, kita berbicara tentang tautan
java.io.File
). Mari kita mulai dengan implementasi yang menggunakan
java.io.InputStream
:
Contoh 1. Secara sinkron membaca data dari suatu file package com.example.io; import lombok.extern.log4j.Log4j2; import org.springframework.util.FileCopyUtils; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.util.function.Consumer; @Log4j2 class Synchronous implements Reader { @Override public void read(File file, Consumer<BytesPayload> consumer) throws IOException { try (FileInputStream in = new FileInputStream(file)) {
- Kami menyediakan file untuk dibaca dengan
java.io.File
biasa - Tarik hasil dari sumber satu baris sekaligus ...
- Saya menulis kode ini untuk mengambil
Consumer<BytesPayloadgt;
dipanggil saat data baru tiba
Cukup sederhana, apa yang kamu katakan? Jalankan kode ini dan Anda akan melihat di log output (di sebelah kiri setiap baris), menunjukkan bahwa semua tindakan terjadi dalam satu utas.
Di sini kita mengekstrak byte dari data kita yang diambil dalam sumber (dalam hal ini, kita berbicara tentang subkelas
java.io.FileInputStream
diwarisi dari
java.io.InputStream
). Apa yang salah dengan contoh ini? Dalam hal ini, kami menggunakan InputStream yang menunjuk ke data yang terletak di sistem file kami. Jika file ada di sana dan hard drive berfungsi, maka kode ini akan berfungsi seperti yang diharapkan.
Tapi, apa yang terjadi jika kita membaca data bukan dari
File
, tetapi dari soket jaringan, dan menggunakan implementasi
InputStream
? Tidak ada yang perlu dikhawatirkan! Tentu saja, tidak akan ada yang perlu dikhawatirkan jika kecepatan jaringan sangat tinggi. Dan jika saluran jaringan antara ini dan simpul lainnya tidak pernah gagal. Jika kondisi ini terpenuhi, maka kodenya akan bekerja dengan sempurna.
Tetapi apa yang terjadi jika jaringan melambat atau mundur? Dalam hal ini, maksud saya bahwa kita akan meningkatkan periode sampai operasi
in.read(β¦)
. Bahkan, dia mungkin tidak kembali sama sekali! Ini adalah masalah jika kita mencoba melakukan hal lain dengan aliran dari mana kita membaca data. Tentu saja, Anda selalu dapat membuat aliran lain dan membaca data melalui itu. Ini dapat dilakukan hingga titik tertentu, tetapi, pada akhirnya, kita akan mencapai batas di mana hanya menambahkan utas untuk penskalaan lebih lanjut tidak akan lagi cukup. Kami tidak akan memiliki persaingan sejati di luar jumlah core yang ada di mesin kami. Jalan buntu! Dalam hal ini, kita dapat meningkatkan pemrosesan input / output (membaca dimaksudkan di sini) hanya karena arus tambahan, tetapi di sini kita cepat atau lambat akan mencapai batas.
Dalam contoh ini, karya utama adalah membaca - hampir tidak ada yang terjadi di bidang lain. Kami bergantung pada I / O. Pertimbangkan bagaimana solusi asinkron membantu kita mengatasi monopoli sebagian dari aliran kita.
Contoh 2. Membaca data dari file secara tidak sinkron package com.example.io; import lombok.extern.log4j.Log4j2; import org.springframework.util.FileCopyUtils; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousFileChannel; import java.nio.channels.CompletionHandler; import java.nio.file.Path; import java.nio.file.StandardOpenOption; import java.util.Collections; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.function.Consumer; @Log4j2 class Asynchronous implements Reader, CompletionHandler<Integer, ByteBuffer> { private int bytesRead; private long position; private AsynchronousFileChannel fileChannel; private Consumer<BytesPayload> consumer; private final ExecutorService executorService = Executors.newFixedThreadPool(10); public void read(File file, Consumer<BytesPayload> c) throws IOException { this.consumer = c; Path path = file.toPath();
- Kali ini kita mengadaptasi
java.io.File
, membuat Java NIO java.nio.file.Path
darinya - Saat membuat
Channel
, kami, khususnya, menentukan layanan java.util.concurrent.ExecutorService
, yang akan digunakan untuk memanggil handler CompletionHandler
ketika data yang diperlukan muncul - Kami mulai membaca dengan mengirimkan tautan ke
CompletionHandler<Integer, ByteBuffer> (this)
- Dalam panggilan balik, baca byte dari
ByteBuffer
ke dalam kapasitas byte[]
- Sama seperti pada contoh
Synchronous
, data byte[]
diteruskan ke konsumen.
Kami akan segera melakukan reservasi: kode ini ternyata jauh lebih sulit! Ada begitu banyak hal yang terjadi di sini sehingga kepala Anda langsung berputar, namun, izinkan saya tunjukkan ... kode ini membaca data dari
Java NIO Channel
, dan kemudian memproses data ini dalam utas terpisah yang bertanggung jawab untuk panggilan balik. Dengan demikian, aliran di mana bacaan dimulai tidak dimonopoli. Kami kembali hampir seketika setelah memanggil
.read(..)
, dan ketika, akhirnya, kami memiliki data yang kami miliki, panggilan balik dilakukan - sudah ada di utas lainnya. Jika ada penundaan di antara panggilan ke
.read()
Anda dapat beralih ke masalah lain dengan mengeksekusinya di utas kami. Durasi operasi baca asinkron, dari byte pertama hingga terakhir, paling tidak tidak lebih dari operasi baca sinkron. Biasanya, operasi asinkron tidak konsisten lebih lama. Namun, dengan kesulitan tambahan seperti itu, kita dapat lebih efektif menangani arus kita. Lakukan lebih banyak pekerjaan, multiplex I / O di kolam dengan jumlah utas yang terbatas.
Saya bekerja untuk perusahaan cloud computing. Kami ingin Anda mendapatkan contoh aplikasi baru untuk menyelesaikan masalah dengan penskalaan horizontal! Tentu saja, di sini saya sedikit tidak jujur. Asynchronous I / O sedikit mempersulit hal, tapi saya harap contoh ini menggambarkan betapa kode reaktif sangat berguna: memungkinkan Anda untuk memproses lebih banyak permintaan dan melakukan lebih banyak pekerjaan pada perangkat keras yang ada jika kinerjanya sangat tergantung pada I / O. Jika kinerjanya tergantung pada penggunaan prosesor (katakanlah, kita berbicara tentang operasi pada angka Fibonacci, penambangan bitcoin atau kriptografi), maka pemrograman reaktif tidak akan memberi kita apa pun.
Saat ini, kebanyakan dari kita tidak menggunakan implementasi
Channel
atau
InputStream
dalam pekerjaan kita sehari-hari! Kita harus memikirkan masalah pada level abstraksi level yang lebih tinggi. Ini tentang hal-hal seperti array, atau lebih tepatnya, hirarki
java.util.Collection
. Koleksi
java.util.Collection
ditampilkan dengan sangat baik pada InputStream: kedua entitas menganggap bahwa Anda dapat beroperasi pada semua data sekaligus, dan hampir secara instan. Diharapkan bahwa Anda akan dapat selesai membaca dari sebagian besar
InputStreams
lebih awal, daripada nanti. Jenis koleksi menjadi sedikit tidak nyaman ketika pindah ke jumlah data yang lebih besar. Bagaimana jika Anda berurusan dengan sesuatu yang berpotensi tak terbatas (tak terbatas) - misalnya, soket web atau peristiwa server? Apa yang harus dilakukan jika ada penundaan di antara rekaman?
Kami membutuhkan cara yang lebih baik untuk menggambarkan data semacam ini. Kita berbicara tentang peristiwa yang tidak sinkron, seperti yang akan terjadi pada akhirnya. Tampaknya
Future<T>
atau
CompletableFuture<T>
sangat cocok untuk tujuan ini, tetapi mereka menggambarkan hanya satu hal yang terjadi pada akhirnya. Bahkan, Java tidak menyediakan metafora yang cocok untuk menggambarkan data semacam ini. Baik tipe
Iterator
dan
Stream
dari Java 8 mungkin tidak terkait, namun, keduanya berorientasi untuk menarik; Anda sendiri yang meminta entri berikutnya, bukan tipe yang harus mengirim panggilan balik ke kode Anda. Diasumsikan bahwa jika pemrosesan berbasis-push didukung dalam kasus ini, yang akan memungkinkan mencapai lebih banyak pada level thread, maka API juga akan menyediakan kontrol threading dan penjadwalan. Implementasi
Iterator
tidak mengatakan apa pun tentang threading, dan semua thread Java 8 berbagi fork-join pool yang sama.
Jika
Iterator
dan
Stream
benar-benar mendukung pemrosesan dorong, maka kita akan menghadapi masalah lain yang benar-benar meningkat secara tepat dalam konteks I / O: kita akan memerlukan semacam mekanisme penetrasi belakang! Karena konsumen data diproses secara tidak sinkron, kami tidak tahu kapan data akan berada dalam pipa dan dalam jumlah berapa. Kami tidak tahu berapa banyak data yang perlu diproses dalam panggilan balik berikutnya: satu byte atau satu terabyte!
Menarik data dari
InputStream
, Anda membaca informasi sebanyak yang siap Anda proses, dan tidak lebih. Pada contoh sebelumnya, kita membaca data ke buffer
byte[]
dengan panjang tetap dan dikenal. Dalam konteks asinkron, kita perlu beberapa cara untuk memberi tahu penyedia data berapa banyak data yang ingin kita proses.
Ya, tuan. Pasti ada sesuatu yang hilang di sini.
Cari metafora yang hilangDalam hal ini, kami mencari metafora yang dengan indah mencerminkan esensi I / O asinkron, mendukung mekanisme untuk transfer data terbalik dan memungkinkan kami untuk mengontrol aliran eksekusi dalam sistem terdistribusi. Dalam pemrograman reaktif, kemampuan klien untuk memberi sinyal muatan apa yang dapat ditangani disebut "aliran balik."
Sekarang ada sejumlah proyek bagus - Vert.x, Akka Streams dan RxJava - mendukung pemrograman reaktif. Tim Spring juga menjalankan proyek yang disebut
Reactor . Di antara berbagai standar ini terdapat bidang umum yang cukup luas, secara de facto dialokasikan untuk standar
inisiatif Reactive Streams . Inisiatif Aliran Reaktif mendefinisikan empat jenis:
Antarmuka
Publisher<T>
; menghasilkan nilai-nilai yang pada akhirnya mungkin tiba. Antarmuka
Publisher<T>
; menghasilkan nilai tipe
T
untuk
Subscriber<T>
.
Contoh 3. Aliran reaktif: Antarmuka Publisher<T>
.
package org.reactivestreams; public interface Publisher<T> { void subscribe(Subscriber<? super Tgt; s); }
Tipe Pelanggan berlangganan
Publisher<T>
, menerima pemberitahuan nilai baru tipe
T
melalui
onNext(T)
. Jika ada kesalahan,
onError(Throwable)
. Ketika pemrosesan selesai secara normal, metode
onComplete
dari pelanggan dipanggil.
Contoh 4. Jet stream: Antarmuka Subscriber<T>
. package org.reactivestreams; public interface Subscriber<T> { public void onSubscribe(Subscription s); public void onNext(T t); public void onError(Throwable t); public void onComplete(); }
Saat
Subscriber
pertama kali terhubung ke
Publisher
, ia akan
Subscription
dalam metode
Subscriber#onSubscribe
. Berlangganan
Subscription
mungkin merupakan bagian terpenting dari keseluruhan spesifikasi; dialah yang menyediakan aliran balik. Pelanggan Pelanggan menggunakan metode
Subscription#request
untuk meminta data tambahan atau metode
Subscription#cancel
untuk menghentikan pemrosesan.
Contoh 5. Streaming reaktif: Subscription<T>
antarmuka Subscription<T>
.
package org.reactivestreams; public interface Subscription { public void request(long n); public void cancel(); }
Spesifikasi aliran reaktif menyediakan jenis lain yang bermanfaat, meskipun jelas
Processor<A,B>
hanyalah sebuah antarmuka yang mewarisi
Subscriber<A>
dan
Publisher<B>
.
Contoh 6. Jet stream: Antarmuka Processor<T>
.
package org.reactivestreams; public interface Processor<T, R> extends Subscriber<T>, Publisher<R> { }
Spesifikasi tidak diposisikan sebagai resep untuk implementasi, pada kenyataannya, tujuannya adalah untuk menentukan jenis untuk mendukung interoperabilitas. Manfaat yang jelas dari tipe yang terkait dengan aliran reaktif adalah bahwa mereka tetap menemukan tempat di rilis Java 9, lebih dari itu, secara semantik mereka adalah βsatu ke satuβ sesuai dengan antarmuka dari kelas
java.util.concurrent.Flow
, misalnya:
java.util.concurrent.Flow.Publisher
.
Temui ReactorJenis aliran reaktif saja tidak cukup; implementasi tingkat tinggi diperlukan untuk mendukung operasi seperti penyaringan dan transformasi. Dengan demikian, proyek Reactor nyaman; itu dibangun di atas spesifikasi Streaming Reaktif dan menyediakan dua spesialisasi
Publisher<T>
.
Pertama,
Flux<T>
adalah
Publisher
yang menghasilkan nilai nol atau lebih. Yang kedua,
Mono<T>
, adalah
Publisher<T>
, menghasilkan nol atau satu nilai. Keduanya mempublikasikan nilai dan dapat menanganinya, namun, kemampuan mereka jauh lebih luas daripada spesifikasi Reactive Streams. Keduanya menyediakan operator yang memungkinkan Anda memproses aliran nilai. Jenis reaktor menyusun dengan baik - output dari salah satu dari mereka dapat berfungsi sebagai input untuk yang lain, dan jika suatu jenis perlu bekerja dengan aliran data lain, mereka bergantung pada instance
Publisher<T>
.
Baik
Mono<T>
dan
Flux<T>
menerapkan
Publisher<T>
; kami menyarankan agar metode Anda menerima instance
Publisher<T>
tetapi mengembalikan
Flux<T>
atau
Mono<T>
; ini akan membantu klien membedakan jenis data apa yang ia terima.
Misalkan Anda diberi
Publisher<T>
dan diminta untuk menampilkan antarmuka pengguna untuk
Publisher<T>
ini
Publisher<T>
. Haruskah saya menampilkan halaman dengan detail untuk satu catatan, karena Anda bisa mendapatkan
CompletableFuture<T>
? Atau tampilkan halaman ikhtisar dengan daftar atau kisi di mana semua entri ditampilkan halaman demi halaman? Sulit dikatakan.
Pada gilirannya,
Flux<T>
dan
Mono<T>
sangat spesifik. Anda tahu bahwa Anda perlu menampilkan halaman ulasan jika
Flux<T>
diterima, dan halaman dengan detail untuk satu (atau bukan satu) catatan ketika Anda menerima
Mono<T>
.
Reactor adalah proyek sumber terbuka yang diluncurkan oleh Pivotal; Sekarang dia menjadi sangat populer. Facebook menggunakannya di
mesin jetnya
untuk memanggil prosedur jarak jauh , dan juga menggunakannya di
Rsocket , dipimpin oleh pencipta RxJava Ben Christensen. Salesforce menggunakannya dalam
implementasi gRPC reaktifnya . Reactor mengimplementasikan tipe Reactive Streams, sehingga dapat berinteraksi dengan teknologi lain yang mendukung tipe ini, misalnya, dengan
RxJava 2 dari Netflix,
Akka Streams dari Lightbend dan dengan proyek
Vert.x dari Eclipse Foundation. David Cairnock, direktur RxJava 2, juga secara aktif berkolaborasi dengan Pivotal untuk mengembangkan Reactor, membuat proyek ini menjadi lebih baik. Plus, tentu saja, hadir dalam satu atau lain bentuk dalam Kerangka Kerja Musim Semi, dimulai dengan Kerangka Kerja 4.0.
Pemrograman reaktif dengan Spring WebFluxUntuk semua manfaatnya, Reactor hanyalah dasar. Aplikasi kita harus berkomunikasi dengan sumber data. Harus mendukung otentikasi dan otorisasi. Spring menyediakan semua ini. Jika Reactor memberi kita metafora yang hilang, maka Spring membantu kita semua berbicara dengan bahasa yang sama.
Spring Framework 5.0 dirilis pada September 2017. Ini didasarkan pada spesifikasi Reactor dan Reactive Streams. Ini memiliki runtime reaktif baru dan model komponen yang disebut
Spring WebFlux .
Spring WebFlux independen dari Servlet API dan tidak mengharuskan mereka untuk bekerja. Muncul dengan adaptor yang memungkinkan Anda menggunakannya di atas mesin Servlet jika perlu, tetapi ini tidak perlu. Ini juga menyediakan runtime berbasis Netty yang sama sekali baru yang disebut Spring WebFlux. Spring Framework 5, bekerja dengan Java 8 dan Java EE 7 dan yang lebih baru, sekarang berfungsi sebagai fondasi bagi sebagian besar ekosistem Spring, termasuk Spring Data Kay, Spring Security 5, Spring Boot 2, dan Spring Cloud Finchley.