Node.js Streaming untuk boneka atau cara bekerja dengan aliran

Saya pikir banyak orang telah mendengar tentang Node js Streams lebih dari sekali, tetapi tidak pernah menggunakannya, atau menggunakannya tanpa memikirkan bagaimana mereka bekerja, aliran pipa dan norma. Mari kita cari tahu apa itu stream, pipa (pipe), chunk (chunk - bagian dari data) dan semua itu))



Mengapa penting untuk memahami bagaimana stream bekerja di Node js? Jawabannya sederhana: banyak modul built-in di Node js mengimplementasikan stream, seperti permintaan / tanggapan HTTP, baca / tulis fs, zlib, crypto, soket TCP, dan lainnya. Anda juga perlu stream, misalnya, saat memproses file besar, saat bekerja dengan gambar. Anda mungkin tidak menulis aliran Anda sendiri, tetapi memahami cara kerjanya akan membuat Anda menjadi pengembang yang lebih kompeten.

Jadi, apa sebenarnya stream (selanjutnya saya akan gunakan sebagai pengganti Stream (stream)). Stream adalah konsep yang dengannya Anda dapat memproses data dalam bagian-bagian kecil, yang memungkinkan Anda untuk menggunakan sejumlah kecil RAM. Juga, dengan bantuannya, kita dapat membagi pemrosesan masing-masing bagian menjadi modul yang tidak saling bergantung (fungsi atau kelas). Misalnya, kita dapat langsung mengompres bagian dari data, kemudian mengenkripsi dan menulis ke file. Gagasan utamanya bukan untuk bekerja dengan seluruh data, tetapi untuk memproses bagian dari data satu per satu.

Ada 4 jenis aliran di Node js:

  • Dapat dibaca
  • Tertulis - Menulis
  • Dupleks - Baca dan Tulis
  • Transform - sejenis aliran Duplex yang dapat mengubah data

Anda dapat menemukan informasi lebih rinci di situs web resmi, dan sekarang mari kita beralih ke praktik.

Contoh sederhana


Saya pikir banyak yang sudah menggunakan stream tanpa menyadarinya. Dalam contoh ini, kami cukup mengirim file ke klien.

// 1 - ( )      ,         const getFile = async (req, res, next) => { const fileStream = fs.createReadStream('path to file'); res.contentType('application/pdf'); fileStream.pipe(res); }; // 2 - (  )         const getFile = async (req, res, next) => { const file = fs.readFileSync('path to file'); res.contentType('application/pdf'); res.send(file); }; 

Satu-satunya perbedaan adalah bahwa dalam kasus pertama, kami mengunduh sebagian file dan mengirimkannya, dengan demikian, tidak memuat RAM server. Dalam kasus kedua, kami segera memuat seluruh file ke dalam RAM dan baru kemudian mengirimkannya.

Lebih lanjut dalam artikel kami akan menganalisis setiap aliran secara terpisah. Anda dapat membuat aliran menggunakan warisan atau menggunakan fungsi konstruktor.

 const { Readable } = require('stream'); // 1 -   const myReadable = new Readable(opt); // 2 -   class myReadable extends Readable { constructor(opt) { super(opt); } } 

Dalam semua contoh, saya akan menggunakan 2 metode.

Aliran yang dapat dibaca


Mari kita lihat bagaimana kita dapat membuat aliran yang dapat dibaca di NodeJS.

 const { Readable } = require('stream'); class myReadable extends Readable { constructor(opt) { super(opt); } _read(size) {} } 

Seperti yang Anda lihat dari contoh di atas, kelas ini menerima satu set parameter. Kami hanya akan mempertimbangkan yang diperlukan untuk pemahaman umum tentang aliran yang Dapat Dibaca, sisanya dapat Anda lihat dalam dokumentasi. Kami tertarik pada parameter highWaterMark dan metode _read.

highWaterMark - ini adalah jumlah byte maksimum buffer aliran internal (secara default 16kb) setelah mencapai di mana pembacaan dari sumber daya ditangguhkan. Untuk melanjutkan membaca, kita perlu membebaskan buffer internal. Kita dapat melakukan ini dengan memanggil pipa, melanjutkan metode, atau dengan berlangganan acara data.

_read adalah implementasi dari metode privat yang disebut dengan metode internal kelas Readable. Itu disebut terus menerus sampai ukuran data mencapai highWaterMark.

Nah, metode terakhir yang menarik bagi kita adalah readable.push, secara langsung menambahkan data ke buffer internal. Ini mengembalikan true, tetapi segera setelah buffer penuh, panggilan ke metode ini akan mulai kembali salah. Itu dapat dikontrol oleh metode readable._read.

Mari kita lihat contoh untuk memperjelas situasi.

 class Counter extends Readable { constructor(opt) { super(opt); this._max = 1000; this._index = 0; } _read() { this._index += 1; if (this._index > this._max) { this.push(null); } else { const buf = Buffer.from(`${this._index}`, 'utf8'); console.log(`Added: ${this._index}. Could be added? `, this.push(buf)); } } } const counter = new Counter({ highWaterMark: 2 }); console.log(`Received: ${counter.read().toString()}`); 

Untuk memulainya, saya akan mengatakan bahwa counter.read () bukan _read yang kami implementasikan di kelas. Metode itu pribadi, dan ini publik, dan mengembalikan data dari buffer internal. Ketika kita menjalankan kode ini, di konsol kita akan melihat yang berikut:



Apa yang terjadi di sini? Saat membuat aliran Penghitung baru ({highWaterMark: 2}), kami mengindikasikan bahwa ukuran buffer internal kami akan menjadi 2 byte, mis. dapat menyimpan 2 karakter (1 karakter = 1 byte). Setelah memanggil counter.read (), aliran mulai membaca, menulis '1' ke buffer internal dan mengembalikannya. Kemudian dia melanjutkan membaca, menulis '2'. Ketika ia menulis '3', buffer akan penuh, dapat dibaca.push akan mengembalikan false, dan aliran akan menunggu sampai buffer internal dibebaskan. Karena dalam contoh kita, tidak ada logika untuk membebaskan buffer, skrip akan berakhir.

Seperti yang disebutkan sebelumnya, untuk memastikan bahwa bacaan tidak terganggu, kita perlu terus-menerus menghapus buffer internal. Untuk melakukan ini, kami berlangganan acara data. Ganti 2 baris terakhir dengan kode berikut.

 const counter = new Counter({ highWaterMark: 2 }); counter.on('data', chunk => { console.log(`Received: ${chunk.toString()}`); }); 

Sekarang jika kita menjalankan contoh ini, kita akan melihat bahwa semuanya berfungsi sebagaimana mestinya dan angka dari 1 hingga 1000 ditampilkan di konsol.

Aliran tertulis


Bahkan, sangat mirip dengan aliran yang Dapat Dibaca, hanya dimaksudkan untuk menulis data.

 const { Writable } = require('stream'); class myWritable extends Writable { constructor(opt) { super(opt); } _write(chunk, encoding, callback) {} } 

Ia menerima parameter serupa, seperti Stream yang Dapat Dibaca. Kami tertarik pada highWaterMark dan _write.

_write adalah metode pribadi yang dipanggil oleh metode internal kelas Writable untuk menulis sepotong data. Dibutuhkan 3 parameter: chunk (bagian dari data), encoding (penyandian jika chunk adalah sebuah string), callback (fungsi yang dipanggil setelah penulisan yang berhasil atau tidak berhasil).

highWaterMark adalah jumlah byte maksimum dari buffer aliran internal (secara default 16kb), setelah mencapai yang stream.write akan mulai mengembalikan false.

Mari kita menulis ulang contoh sebelumnya dengan penghitung.

 const { Writable } = require('stream'); class Counter extends Writable { _write(chunk, encoding, callback) { console.log(chunk.toString()); callback(); } } const counter = new Counter({ highWaterMark: 2 }); for (let i = 1; i < 1000; i += 1) { counter.write(Buffer.from(`${i}`, 'utf8')); } 

Sebenarnya, semuanya sederhana, tetapi ada satu nuansa menarik yang patut diingat! Saat membuat aliran Penghitung baru ({highWaterMark: 2}), kami mengindikasikan bahwa ukuran buffer internal kami akan menjadi 2 byte, mis. dapat menyimpan 2 karakter (1 karakter = 1 byte). Ketika penghitung mencapai sepuluh, buffer akan diisi dengan masing-masing panggilan untuk menulis, masing-masing, jika rekaman dilakukan dalam sumber yang lambat, maka semua data lain saat panggilan tulis akan disimpan dalam RAM, yang dapat menyebabkannya meluap (dalam contoh ini, ini Tentu saja itu tidak masalah, karena buffer kami adalah 2 byte, tetapi dengan file besar Anda perlu mengingat ini). Ketika situasi seperti itu muncul, kita perlu menunggu sampai aliran menulis bagian data saat ini, melepaskan buffer internal (memicu peristiwa drainase), dan kemudian kita dapat melanjutkan merekam data. Mari kita menulis ulang contoh kita.

 const { Writable } = require('stream'); const { once } = require('events'); class Counter extends Writable { _write(chunk, encoding, callback) { console.log(chunk.toString()); callback(); } } const counter = new Counter({ highWaterMark: 2 }); (async () => { for (let i = 1; i < 1000; i += 1) { const canWrite = counter.write(Buffer.from(`${i}`, 'utf8')); console.log(`Can we write bunch of data? ${canWrite}`); if (!canWrite) { await events.once(counter, 'drain'); console.log('drain event fired.'); } } })(); 

Metode events.once ditambahkan di v11.13.0 dan memungkinkan Anda untuk membuat janji dan menunggu acara tertentu dijalankan sekali. Dalam contoh ini, kami memeriksa apakah data dapat ditulis ke aliran, jika tidak, maka tunggu hingga buffer dibebaskan dan melanjutkan perekaman.

Pada pandangan pertama, ini mungkin tampak seperti tindakan yang tidak perlu, tetapi ketika bekerja dengan sejumlah besar data, misalnya file yang beratnya lebih dari 10GB, lupa untuk melakukan ini, Anda mungkin mengalami kebocoran kehabisan memori.

Aliran duplex


Ini menggabungkan aliran Readable dan Writable, yaitu, kita harus menulis implementasi dari dua metode _read dan _write.

 const { Duplex } = require('stream'); class myDuplex extends Duplex { constructor(opt) { super(opt); } _read(size) {} _write(chunk, encoding, callback) {} } 

Di sini kita tertarik pada 2 parameter yang dapat kita lewati ke konstruktor, ini dapat dibacaHighWaterMark dan writableHighWaterMark, yang memungkinkan kita untuk menentukan ukuran buffer internal untuk Readable, Writable stream, masing-masing. Ini adalah bagaimana implementasi dari dua contoh sebelumnya dengan bantuan aliran Duplex akan terlihat.

 const { Duplex } = require('stream'); const events = require('events'); class Counter extends Duplex { constructor(opt) { super(opt); this._max = 1000; this._index = 0; } _read() { this._index += 1; if (this._index > this._max) { this.push(null); } else { const buf = Buffer.from(`${this._index}`, 'utf8'); this.push(buf); } } _write(chunk, encoding, callback) { console.log(chunk.toString()); callback(); } } const counter = new Counter({ readableHighWaterMark: 2, writableHighWaterMark: 2 }); (async () => { let chunk = counter.read(); while (chunk !== null) { const canWrite = counter.write(chunk); console.log(`Can we write bunch of data? ${canWrite}`); if (!canWrite) { await events.once(counter, 'drain'); console.log('drain event fired.'); } chunk = counter.read(); } })(); 

Saya pikir kode ini tidak perlu penjelasan, karena sama seperti sebelumnya, hanya dalam satu kelas.

Ubah aliran


Aliran ini adalah aliran dupleks. Diperlukan untuk mengonversi sepotong data dan mengirimkannya lebih jauh ke rantai. Itu dapat diimplementasikan dengan cara yang sama seperti sisa aliran.

 const { Transform } = require('stream'); class myTransform extends Transform { _ transform(chunk, encoding, callback) {} } 

Kami tertarik dengan metode _transform.

_transform adalah metode pribadi yang dipanggil oleh metode internal kelas Transform untuk mentransformasi sepotong data. Dibutuhkan 3 parameter: chunk (bagian dari data), encoding (penyandian jika chunk adalah sebuah string), callback (fungsi yang dipanggil setelah penulisan yang berhasil atau tidak berhasil).

Dengan menggunakan metode ini, perubahan dalam porsi data akan terjadi. Di dalam metode ini, kita bisa memanggil transform.push () nol atau lebih, yang melakukan perubahan. Ketika kami menyelesaikan konversi data, kami perlu menelepon panggilan balik, yang akan mengirim semua yang kami tambahkan ke transform.push (). Parameter pertama dari fungsi panggilan balik ini adalah kesalahan. Selain itu, kami tidak dapat menggunakan transform.push (), tetapi mengirim data yang diubah sebagai parameter kedua ke fungsi callback (contoh: callback (null, data)). Untuk memahami cara menggunakan jenis aliran ini, mari kita menganalisis metode stream.pipe.

stream.pipe - metode ini digunakan untuk menghubungkan aliran Readable ke stream Writable, serta untuk membuat rantai aliran. Ini berarti bahwa kita dapat membaca bagian dari data dan mentransfernya ke aliran berikutnya untuk diproses, dan kemudian ke yang berikutnya, dll.

Mari kita menulis aliran Transform yang akan menambahkan karakter * ke awal dan akhir setiap bagian data.

 class CounterReader extends Readable { constructor(opt) { super(opt); this._max = 1000; this._index = 0; } _read() { this._index += 1; if (this._index > this._max) { this.push(null); } else { const buf = Buffer.from(`${this._index}`, 'utf8'); this.push(buf); } } } class CounterWriter extends Writable { _write(chunk, encoding, callback) { console.log(chunk.toString()); callback(); } } class CounterTransform extends Transform { _transform(chunk, encoding, callback) { try { const resultString = `*${chunk.toString('utf8')}*`; callback(null, resultString); } catch (err) { callback(err); } } } const counterReader = new CounterReader({ highWaterMark: 2 }); const counterWriter = new CounterWriter({ highWaterMark: 2 }); const counterTransform = new CounterTransform({ highWaterMark: 2 }); counterReader.pipe(counterTransform).pipe(counterWriter); 

Dalam contoh ini, saya menggunakan aliran Readable dan Writable dari contoh sebelumnya, dan juga menambahkan Transform. Seperti yang Anda lihat, ternyata cukup sederhana.

Jadi kami melihat bagaimana aliran diatur. Konsep utama mereka adalah pemrosesan data di bagian-bagian, yang sangat nyaman dan tidak memerlukan sumber daya yang besar. Selain itu, stream dapat digunakan dengan iterator, yang membuatnya lebih nyaman digunakan, tetapi ini adalah cerita yang sama sekali berbeda.

Source: https://habr.com/ru/post/id479048/


All Articles