Pemrograman Asinkron dengan Contoh: Merekonstruksi java.util.concurrent.CompletableFuture Methods

Mengapa rekonstruksi diperlukan jika kode sumber untuk kelas ini terbuka?


Jika hanya karena di bawah kap ada kode yang sangat optimal, sulit dibaca, studi yang memberikan sedikit pedagogis.


Oleh karena itu, kami akan membuat ulang semantik operasi sesuai dengan spesifikasinya, dan menulis kode yang secara fungsional setara, dapat dimengerti, dan dapat dibaca, meskipun itu mungkin bukan yang paling ekonomis dalam hal konsumsi memori dan waktu prosesor.


Mari kita mulai dengan metode yang relatif sederhana:


public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) Returns a new CompletableFuture that is asynchronously completed by a task running in the given executor with the value obtained by calling the given Supplier. Type Parameters: U - the function's return type Parameters: supplier - a function returning the value to be used to complete the returned CompletableFuture executor - the executor to use for asynchronous execution Returns: the new CompletableFuture 

Baca spesifikasi dengan cermat:


 Returns a new CompletableFuture 

Yaitu, objek bertipe CompletableFuture atau subkelasnya dibuat dan dikembalikan sebagai hasilnya.


 that is asynchronously completed by a task running in the given executor` 

Selain itu, tugas dijalankan yang dieksekusi pada Executor e.
Seperti yang kita ketahui, Executor hanya menerima objek Runnable .
Runnable adalah sebuah antarmuka, dan objek pertama mungkin mengimplementasikannya - jadi kami akan menggabungkan dua fungsi dalam satu objek.


  completed ... with the value obtained by calling the given Supplier. 

Runnable ini harus memanggil Supplier diberikan dan dengan nilai yang diterima melengkapi CompletableFuture dibuat.


Supplier adalah fungsi tanpa parameter, jadi menyandikan semuanya sangat sederhana:


  class CompletableFutureForSupplyAsync<U> extends CompletableFuture<U> implements Runnable { Supplier<U> supplier; public CompletableFutureForSupplyAsync(Supplier<U> supplier) { this.supplier = supplier; } public void run() { try { U result = supplier.get(); super.complete(result); } catch (Throwable e) { super.completeExceptionally(e); } } } public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) { CompletableFutureForSupplyAsync<U> task = new CompletableFutureForSupplyAsync<>(supplier); executor.execute(task); return task; } 

Contoh berikut ini sedikit lebih rumit:


 public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) Returns a new CompletionStage that, when this stage completes normally, is executed using the supplied Executor, with this stage's result as the argument to the supplied function. See the CompletionStage documentation for rules covering exceptional completion. Specified by: thenApplyAsync in interface CompletionStage<T> Type Parameters: U - the function's return type Parameters: fn - the function to use to compute the value of the returned CompletionStage executor - the executor to use for asynchronous execution Returns: the new CompletionStage 

Returns a new CompletionStage that... is executed using the supplied Executor


Di sini kami ditawarkan untuk mengatur objek yang dibuat untuk diterbitkan dalam bentuk Runnable .


... with this stage's result as the argument to the supplied function.


Tapi ini sudah lebih menarik. Fungsi yang diberikan kepada kami memiliki parameter, dan nilai parameter ini adalah nilai yang melengkapi CompletionStage saat ini. Pada saat memanggil thenApplyAsync nilai ini mungkin tidak diketahui, jadi kami tidak dapat segera menjalankan tugas pada Executor . Sebaliknya, kita harus bernegosiasi dengan CompletionStage saat ini,
sehingga pada saat penyelesaiannya, ia mentransfer nilainya ke tugas. Di antara banyak metode CompletionStage ada satu yang persis cocok untuk tujuan ini, whenComplete :


 public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action) Returns a new CompletionStage with the same result or exception as this stage, that executes the given action when this stage completes. 

Artinya, di objek tugas yang baru dibuat, cukup untuk mengimplementasikan antarmuka BiConsumer untuk menerima argumen:


  class CompletableFutureForApplyAsync<T, U> extends CompletableFuture<U> implements Runnable, BiConsumer<T,Throwable> { Function<? super T,? extends U> fn; Executor executor; T arg; Throwable throwable; public CompletableFutureForApplyAsync(Function<? super T,? extends U> fn, Executor executor) { this.fn = fn; this.executor = executor; } @Override // implementation of BiConsumer interface public void accept(T argument, Throwable throwable) { if (throwable != null) { this.throwable = throwable; } else { this.arg = argument; } executor.execute(this); } @Override public void run() { if (throwable == null) { try { U result = fn.apply(arg); super.complete(result); } catch (Throwable e) { super.completeExceptionally(e); } } else { super.completeExceptionally(throwable); } } } public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor ) { CompletableFutureForApplyAsync<T,U> task = new CompletableFutureForApplyAsync<>(fn, executor); this.whenComplete(task); return task; } } 

Contoh ini sangat penting untuk memahami sifat pemrograman asinkron, jadi sekali lagi kami daftar langkah-langkah utamanya:


1) prosedur asinkron dibuat:


  CompletableFutureForApplyAsync<T,U> task = new CompletableFutureForApplyAsync<>(fn, executor); 

2) belum siap untuk dieksekusi, jadi kami meminta penyedia argumen yang hilang untuk meneruskan argumen ini kepada kami di masa mendatang, dengan memanggil metode yang kami kirimkan:


  this.whenComplete(task); 

3) dalam metode ini kami tidak hanya menyimpan argumen yang diterima, tetapi juga menjalankan tugas untuk dieksekusi (lihat metode accept ()).


4) pelaksanaan tugas dikurangi menjadi pemenuhan fungsi yang diberikan kepada kami dan menyimpan hasilnya.
Hasil ini dapat seperti yang diminta oleh prosedur lain menggunakan metode whenComplete () yang diterapkan pada objek yang baru dibangun, sehingga kita dapat membangun rantai prosedur asinkron dengan panjang sewenang-wenang. Tetapi rantai ini akan dipenuhi secara berurutan, tanpa paralelisme.


Tetapi bagaimana cara menggambarkan diagram perhitungan yang lebih kompleks yang berisi cabang paralel?
Untuk melakukan ini, gunakan metode thenCombineAsync .


Jika dalam contoh sebelumnya kita memulai prosedur asinkron dengan satu argumen, maka dalam ini - dengan dua.


Dalam hal ini, perhitungan kedua argumen dapat terjadi secara paralel.


 ublic <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor) Description copied from interface: CompletionStage Returns a new CompletionStage that, when this and the other given stage complete normally, is executed using the supplied executor, with the two results as arguments to the supplied function. 

Di sini, semuanya sama seperti pada contoh sebelumnya dengan thenApplyAsync , tetapi parameter fungsi sudah memiliki dua argumen, dan CompletionStage<? extends U> other Parameter ditambahkan CompletionStage<? extends U> other , yang merupakan penyedia asinkron argumen kedua.


Bagaimana kita memastikan bahwa argumen kedua diproses?


Yah, pertama, alih-alih satu variabel T arg jelaskan dua: T arg1; U arg2; T arg1; U arg2; , alih-alih satu metode public void accept(T argument, Throwable throwable) menjelaskan dua - accept1 dan accept2 ,
masing-masing bekerja dengan argumennya sendiri.


Pada saat yang sama, objek kami yang sedang dibangun tidak lagi mengimplementasikan BiConsumer<T,Throwable> dan kami tidak dapat lagi menulis kalimat kunci untuk menghubungkan node dari grafik perhitungan asinkron.


  this.whenComplete(task); 

Untungnya, objek antarmuka fungsional dapat diwakili oleh referensi ke suatu metode, tanpa menyimpulkannya dalam kelas yang terpisah:


  this.whenComplete(task::accept1); other.whenComplete(task::accept2); 

Artinya, saat this objek this memasok argumen pertama, dan objek other yang kedua.


Berikut ini hanya kode metode yang harus diubah sehingga mereka tidak memulai tugas segera setelah kedatangan argumen mereka, tetapi juga memeriksa tanda terima yang kedua:


  public synchronized void accept1(T argument, Throwable throwable) { if (throwable != null) { this.throwable = throwable; executor.execute(this); } else { this.arg1 = argument; if (arg2 != null) { executor.execute(this); } } } 

Metode accept2 dijelaskan dengan cara yang sama.


Perhatikan bahwa:


  • metode disinkronkan (kami bekerja dengan data umum)
  • dalam hal transmisi kesalahan, tunggu argumen kedua tidak diperlukan.
  • memeriksa bahwa argumen datang dengan perbandingan dengan null bukan cara terbaik, mungkin Anda perlu menambahkan variabel Boolean untuk setiap argumen.

Dengan cara ini, Anda dapat membuat prosedur asinkron dari sejumlah besar argumen daripada dua, tetapi pemikiran itu segera muncul - dapatkah Anda masih membuat kelas terpisah untuk parameter, sehingga tidak menulis metode Anda sendiri untuk menerima setiap parameter, tetapi melakukannya dengan secara dinamis membuat parameter?


  Parameter<Integer> arg1 = new Parameter<>(); Parameter<Float> arg2 = new Parameter<>(); ... future1.whenComplete(arg1); future2.whenComplete(arg2); 

Ya, Anda dapat membuat kelas seperti itu, tetapi lebih banyak di waktu berikutnya.


Ringkasan singkat dari yang disebutkan di atas:


  • program asinkron adalah jaringan prosedur asinkron yang saling berhubungan,
    seperti halnya program multi-utas adalah jaringan utas eksekusi yang saling terhubung (utas).

Tetapi sarana arus komunikasi dan prosedur asinkron secara fundamental berbeda.


Streaming dihubungkan menggunakan semaphores, memblokir antrian dan objek serupa lainnya,
yang memblokir aliran penerima jika informasi belum tiba, tetapi aliran sudah mencoba untuk mengekstraknya menggunakan operasi berbasis tarik.


Prosedur asinkron - penerima tidak perlu melakukan eksekusi sampai semua informasi yang mereka butuhkan siap.
Mereka secara pasif menunggu sampai penyedia informasi sendiri melewatinya melalui operasi berbasis push.
Karena hal ini, mereka tidak menghabiskan memori pada stack sambil menunggu, dan, oleh karena itu, mengambil memori jauh lebih sedikit daripada untaian eksekusi.


  • membangun jaringan prosedur asinkron direduksi menjadi membuat objek dan menghubungkannya bersama, lebih tepatnya, untuk menghubungkan sub-objek mereka - parameter: alamat parameter input penerima ditransmisikan ke parameter output penyedia informasi.

Himpunan metode CompletableFuture melakukan hal itu, dan pada prinsipnya, Anda dapat melakukannya tanpa metode ini dengan membuat objek secara eksplisit, seperti yang ditunjukkan dalam contoh di atas.
Tetapi untuk ini perlu untuk memiliki kelas yang mirip dengan yang dijelaskan dalam contoh-contoh ini.
Untuk beberapa alasan, pencipta java.util.concurrent memilih untuk tidak memberi pengguna akses ke kelas-kelas ini dan menyembunyikannya di kedalaman kode CompletableFuture .


Mereka yang ingin memiliki representasi visual dari jaringan asinkron yang sedang dibuat dapat merekonstruksi kelas-kelas ini dengan melanjutkan contoh-contoh di atas. Kode sumber untuk contoh tersedia di Github .

Source: https://habr.com/ru/post/id418547/


All Articles