Dasar-dasar pemrograman reaktif menggunakan RxJS. Bagian 3. Observasi Tingkat Tinggi



Dalam artikel ini kita akan melihat bagaimana mungkin untuk memproses yang lain dalam satu utas, mengapa diperlukan, dan bagaimana Observable Order Tinggi (selanjutnya disebut HOO) operator akan membantu kami dalam hal ini.

Seri artikel "Dasar-dasar pemrograman reaktif menggunakan RxJS":



Ketika bekerja dengan utas, suatu situasi sering muncul ketika perlu untuk mentransfer hasil yang lain ke utas sebagai nilai. Sebagai contoh, kami ingin menjalankan permintaan ajax dan memproses responsnya di utas saat ini, atau menjalankan beberapa permintaan paralel, menerapkan pengumpulan. Saya pikir banyak orang terbiasa memecahkan masalah seperti itu menggunakan mekanisme seperti janji. Tetapi apakah mungkin untuk menyelesaikannya menggunakan RxJS? Tentu saja, dan semuanya jauh lebih mudah daripada yang Anda pikirkan!

Catatan : untuk memahami bagian teoritis artikel, Anda tidak harus membaca artikel sebelumnya, Anda hanya perlu tahu apa yang dapat diamati, operator dan pipa. Pada bagian praktis, kami akan memperbaiki contoh dari artikel kedua , yang dapat Anda temukan di sini .

Masalah


Mari kita bayangkan tugas berikut: kita perlu mencari tahu setiap detik apakah server dapat diakses. Bagaimana kita bisa menyelesaikannya?

Pertama, buat aliran menggunakan metode pengatur waktu:

timer(0, 1000).subscribe({ next: console.log }); 

Metode timer sangat mirip secara prinsip dengan interval . Tapi tidak seperti itu, ini memungkinkan Anda untuk mengatur waktu mulai utas, yang dikirim oleh parameter pertama. Parameter kedua menunjukkan interval di mana nilai baru akan dihasilkan. Jika parameter kedua tidak ditentukan, timer hanya akan menghasilkan satu nilai dan menghentikan aliran.

Karena Anda dan saya tidak memiliki server, saya sarankan hanya menulis fungsi yang mengemulasi permintaan ke server:

 const makeRequest = () => { return timer(1000).pipe( mapTo('success') ) } 

Apa yang dilakukan metode ini? Ini mengembalikan aliran yang dibuat menggunakan metode timer, yang memancarkan nilai setelah satu detik berlalu dan berakhir. Karena metode penghitung waktu hanya menghasilkan angka, kami menggunakan operator mapTo untuk menggantinya dengan string "sukses".

Seperti inilah aliran yang dibuat oleh metode makeRequest:



Sekarang kita punya pilihan: untuk memanggil metode makeRequest di dalam aliran atau untuk menyerahkan tanggung jawab ini kepada pengamat?

Pendekatan pertama lebih disukai, karena dalam hal ini kita akan dapat menggunakan potensi penuh RxJS dengan operatornya dan membebaskan pengamat kita dari tugas yang tidak perlu. Kami menggunakan metode timer untuk mengeksekusi permintaan dengan interval:

 timer(0, 1000).pipe( map(() => makeRequest()) ).subscribe({ next: console.log }); 

Ketika kita menjalankan kode seperti itu, kita akan melihat bahwa di console.log kita tidak mendapatkan pesan dengan teks "success", tetapi objek bertipe Observable:



Jawabannya cukup diharapkan, karena di peta kami mengembalikan aliran. Agar streaming berfungsi, Anda harus berlangganan. Baiklah, mari kita lihat bagaimana tidak melakukannya :

 timer(0, 1000).pipe( map(() => makeRequest()) ).subscribe({ next: observable => observable.subscribe({ next: console.log }); }); 

Masalah dengan contoh di atas adalah kita mendapatkan langganan dalam langganan. Tetapi bagaimana jika kita ingin membuat lebih dari satu permintaan dalam satu rantai? Atau bagaimana jika pada suatu saat kita perlu berhenti berlangganan dari aliran di dalam? Dalam hal ini, kode kita akan semakin menyerupai "mie". Untuk mengatasi masalah ini, RxJS memiliki operator khusus yang disebut HOO.

Hoo


HOO adalah jenis pernyataan khusus yang menerima aliran sebagai nilai. Salah satu operator tersebut adalah metode mergeAll.

Ketika aliran tiba di mergeAll, itu berlangganan. Aliran dimana operator berlangganan disebut internal. Aliran dari mana operator menerima aliran lain sebagai nilai disebut eksternal.

Ketika utas internal menghasilkan nilai, menggabungkan Semua mendorong nilai itu ke utas eksternal. Dengan demikian, kami menyingkirkan kebutuhan untuk berlangganan secara manual. Jika kami berhenti berlangganan dari aliran eksternal, maka menggabungkan semua akan berhenti secara otomatis dari yang internal.

Mari kita lihat bagaimana kita dapat menulis ulang contoh kita dengan mergeAll:

 timer(0, 1000).pipe( map(() => makeRequest()) mergeAll() ).subscribe({ next: console.log }); 

Dalam contoh di atas, aliran eksternal dibuat oleh pernyataan timer. Dan aliran yang dibuat di operator peta bersifat internal. Setiap utas yang dibuat jatuh ke dalam pernyataan mergeAll.



Peta kombinasi + mergeAll digunakan sangat sering, oleh karena itu di RxJS ada metode mergeMap:

 timer(0, 1000).pipe( mergeMap(() => makeRequest()) ).subscribe({ next: console.log }); 

Ketika utas eksternal menghasilkan nilai, operator mergeMap memanggil fungsi panggil balik yang dilaluinya, yang menghasilkan utas baru. Kemudian, gabung berlangganan berlangganan aliran yang dihasilkan.



Kekhasan operator mergeAll / mergeMap adalah jika aliran lain turun ke sana, maka ia juga berlangganan. Jadi, dalam aliran eksternal, kita bisa mendapatkan nilai dari beberapa yang internal sekaligus. Mari kita lihat contoh berikut:

  timer(0, 1000) 

Beginilah tampilan stream eksternal tanpa operator mergeMap:



Demikian juga dengan mergeMap:

 timer(0, 1000).pipe( mergeMap(() => interval(1000)) ) 



Setiap detik, kami membuat utas internal baru dan menggabungkan subscribe. Dengan demikian, kami memiliki banyak utas internal yang bekerja secara bersamaan, nilai-nilai dari mana jatuh ke eksternal:





Catatan : hati-hati menggunakan mergeMap, setiap utas internal baru akan berfungsi sampai Anda berhenti berlangganan dari eksternal. Pada contoh di atas, jumlah utas internal bertambah setiap detik, pada akhirnya bisa ada begitu banyak utas sehingga komputer tidak dapat mengatasi beban.

concatAll / concatMap


Metode mergeMap sangat bagus ketika Anda tidak peduli dengan urutan eksekusi utas internal, tetapi bagaimana jika Anda membutuhkannya? Misalkan kita ingin permintaan server berikutnya hanya dieksekusi setelah menerima respons dari yang sebelumnya?

Untuk tujuan tersebut, operator HOO concatAll / concatMap cocok. Operator ini, setelah berlangganan ke utas internal, menunggu sampai selesai, dan hanya kemudian berlangganan yang berikutnya.

Jika selama eksekusi dari satu utas yang baru turun ke sana, maka ia ditempatkan di antrian sampai yang sebelumnya selesai.

 // ,  1     const firstInnerObservable = timer(1000).pipe( mapTo(1) ); // ,  2     const secondInnerObservable = timer(500).pipe( mapTo(2) ); of( firstInnerObservable, secondInnerObservable ).pipe( concatAll() ).subscribe({ next: console.log }); 

Pada contoh di atas, kami membuat dua utas menggunakan metode pengatur waktu. Untuk kejelasan, saya menggunakan operator mapTo untuk menampilkan nilai yang berbeda. Utas pertama akan menghasilkan 1, yang kedua - 2. Utas eksternal dibuat menggunakan metode, yang mengambil dua di atas yang dapat diamati sebagai input.

Pernyataan concatAll pertama kali menerima firstInnerObservable, berlangganan, dan menunggu sampai selesai, dan hanya setelah selesai berlangganan pertama ke secondInnerObservable. Inilah yang akan terlihat seperti aliran eksternal:



Jika kami mengganti concatAll dengan mergeAll, maka aliran akan terlihat seperti ini:

 of( firstInnerObservable, secondInnerObservable ).pipe( mergeAll() ).subscribe({ next: console.log }); 



switchAll / switchMap


Operator ini berbeda dari yang sebelumnya ketika menerima aliran baru, ia segera berhenti berlangganan dari yang sebelumnya dan berlangganan yang baru.

Ambil contoh di atas dan ganti concatAll dengan switchAll, dan lihat bagaimana aliran eksternal berperilaku:

 of( firstInnerObservable, secondInnerObservable ).pipe( switchAll() ).subscribe({ next: console.log }); 



Hanya nilai dari aliran internal kedua yang masuk ke aliran eksternal. Itu karena switchMap berhenti berlangganan dari yang pertama saat menerima utas kedua.

Kapan ini dibutuhkan? Misalnya, saat menerapkan pencarian data. Jika respons dari server belum datang, dan kami telah mengirim permintaan baru, maka kami tidak masuk akal untuk menunggu yang sebelumnya.

knalpot / exhaustMap


knalpot adalah kebalikan dari pernyataan switchAll, dan perilakunya mirip dengan concatAll. Metode ini, berlangganan aliran, menunggu sampai selesai. Jika aliran baru turun ke sana, maka itu hanya dibuang.

 of( firstInnerObservable, secondInnerObservable ).pipe( exhaust() ).subscribe({ next: console.log }); 



Dalam contoh di atas, kami tidak mendapatkan deuce, karena pada saat itu operator sedang menunggu penyelesaian utas pertama, dan hanya menjatuhkan yang kedua.

Saya pikir banyak yang punya pertanyaan, kapan perilaku seperti itu dibutuhkan? Contoh yang baik adalah formulir login. Tidak masuk akal untuk mengirim beberapa permintaan ke server sampai yang saat ini selesai.

Kami sedang menyelesaikan aplikasi


Kami ingat contoh dari artikel kedua . Di dalamnya, kami menerapkan pencarian di GitHub dan menggunakan operator mergeMap untuk mengirim permintaan ke server. Sekarang kita tahu fitur-fitur operator ini, apakah ini benar-benar cocok untuk kasus kita?

 fromEvent(input, 'keyup').pipe( debounceTime(700), map(event => event.target.value), filter(val => val.length > 2), distinctUntilChanged(), mergeMap(value => { return from(getUsersRepsFromAPI(value)).pipe( catchError(err => of([])) ) }) ).subscribe({ next: reps => recordRepsToList(reps) }) 

Mari kita asumsikan bahwa server GitHub akan kelebihan beban, maka memproses respons kita akan memakan banyak waktu. Apa yang mungkin salah dalam kasus ini?

Misalkan pengguna memasukkan beberapa data, tidak menunggu jawaban, dan memasukkan yang baru. Dalam hal ini, kami akan mengirimkan permintaan kedua ke server. Namun, tidak ada yang menjamin bahwa jawaban untuk permintaan pertama akan datang lebih awal.

Karena operator mergeMap tidak peduli dalam urutan apa untuk memproses utas internal, dalam kasus ketika permintaan pertama dieksekusi lebih lambat dari yang kedua, kami akan menghapus data aktual. Oleh karena itu, saya mengusulkan untuk mengganti metode mergeMap dengan switchMap:

 fromEvent(input, 'keyup').pipe( debounceTime(700), map(event => event.target.value), filter(val => val.length > 2), distinctUntilChanged(), switchMap(value => { return from(getUsersRepsFromAPI(value)).pipe( catchError(err => of([])) ) }) ).subscribe({ next: reps => recordRepsToList(reps) }) 

Sekarang, jika pengguna memasukkan data baru, switchMap akan berhenti berlangganan dari aliran sebelumnya dan berlangganan yang baru.

Perlu dicatat bahwa permintaan http kami akan terus hang sampai server memberikan jawaban. Namun, karena kami telah berhenti berlangganan dari aliran internal, jawabannya tidak akan jatuh ke aliran eksternal.

Catatan : jika Anda bekerja dengan Angular dan menggunakan HttpClient untuk bekerja dengan http, maka Anda tidak dapat khawatir tentang membatalkan permintaan itu sendiri. HttpClient dapat melakukan ini untuk Anda ketika berhenti berlangganan.

Batalkan http


Fetch api memiliki kemampuan untuk membatalkan permintaan http menggunakan AbortController . Ketika dikombinasikan dengan operator switchMap, fungsi ini akan menghemat lalu lintas pengguna.

Mari kita tulis ulang contoh kita sedikit. Dan buat metode yang akan membungkus panggilan pengambilan di diamati:

 const createCancellableRequest = (url) => { //      const controller = new AbortController(); const signal = controller.signal; return new Observable(observer => { fetch(url, { signal }) .then(response => { if (response.ok) { return response.json(); } throw new Error(''); }) //     .then(result => observer.next(result)) //   .then(() => observer.complete()) //   ,     .catch(error => observer.error(error)); // ,    return () => { //   controller.abort(); }; }); }; 

Juga ubah metode getUsersRepsFromApi:

 const getUsersRepsFromAPI = (username) => { const url = `https://api.github.com/users/${ username }/repos`; return createCancellableRequest(url); } 

Sekarang metode tersebut tidak mengembalikan janji, tetapi dapat diamati. Oleh karena itu, kami menghapus bungkus dari di switchMap:

 switchMap(value => { return getUsersRepsFromAPI(value).pipe( catchError(err => of([]) ) ) 

Catatan : dalam RxJS versi 6.5, mereka menambahkan pernyataan fromFetch , yang dengan sendirinya menyebut metode abort di bawah kap, sehingga Anda tidak perlu lagi menulis "sepeda" sendiri.

Itu saja! Semua kode sampel dapat ditemukan di sini .

Kesimpulan


Hari ini kami melihat apa itu HOO dan beberapa operator yang sangat berguna dari kategori ini. Tentu saja, ini jauh dari semuanya. Untuk informasi yang lebih terperinci dan terperinci, saya sarankan mengunjungi dokumentasi RxJS.

Dalam artikel selanjutnya saya berencana untuk mempertimbangkan apa perbedaan antara Panas dan Dingin yang dapat diamati.

Akhirnya: jangan menggunakan langganan dalam langganan, karena ada HOO!

Source: https://habr.com/ru/post/id450050/


All Articles