Node.js Streams pour les nuls ou comment travailler avec les streams

Je pense que beaucoup ont entendu parler de Node js Streams plus d'une fois, mais ne l'ont jamais utilisé, ou ne l'ont jamais utilisé sans réfléchir à leur fonctionnement, à leur diffusion et à leurs normes. Voyons ce que sont les flux, pipe (pipe), morceaux (morceau - partie des données) et tout ça))



Pourquoi est-il important de comprendre le fonctionnement des flux dans Node js? La réponse est simple: de nombreux modules intégrés dans Node js implémentent des flux, tels que les requêtes / réponses HTTP, la lecture / écriture fs, zlib, crypto, les sockets TCP et autres. Vous aurez également besoin de flux, par exemple, lors du traitement de gros fichiers, lorsque vous travaillez avec des images. Vous n'écrivez peut-être pas votre propre flux, mais comprendre comment cela fonctionne fera de vous un développeur plus compétent.

Alors, qu'est-ce qu'un flux (je vais utiliser ci-après à la place de Stream (stream)). Stream est un concept avec lequel vous pouvez traiter des données en petites parties, ce qui vous permet d'utiliser une petite quantité de RAM. De plus, avec son aide, nous pouvons diviser le traitement de chaque partie en modules indépendants les uns des autres (fonctions ou classes). Par exemple, nous pouvons compresser immédiatement une partie des données, puis chiffrer et écrire dans un fichier. L'idée principale n'est pas de travailler avec l'ensemble des données, mais de traiter une partie des données une par une.

Il existe 4 types de flux dans Node js:

  • Lisible
  • Inscriptible - Écriture
  • Duplex - Lecture et écriture
  • Transformer - une sorte de flux duplex qui peut modifier les données

Vous pouvez trouver des informations plus détaillées sur le site officiel et passons maintenant à la pratique.

Exemple simple


Je pense que beaucoup ont déjà utilisé des streams sans même s'en rendre compte. Dans cet exemple, nous envoyons simplement le fichier au client.

// 1 - ( )      ,         const getFile = async (req, res, next) => { const fileStream = fs.createReadStream('path to file'); res.contentType('application/pdf'); fileStream.pipe(res); }; // 2 - (  )         const getFile = async (req, res, next) => { const file = fs.readFileSync('path to file'); res.contentType('application/pdf'); res.send(file); }; 

La seule différence est que dans le premier cas, nous téléchargeons une partie du fichier et l’envoyons donc sans charger la RAM du serveur. Dans le deuxième cas, nous chargeons immédiatement l'intégralité du fichier dans la RAM et ensuite nous l'envoyons.

Plus loin dans l'article, nous analyserons chaque flux séparément. Vous pouvez créer un flux à l'aide de l'héritage ou à l'aide de la fonction constructeur.

 const { Readable } = require('stream'); // 1 -   const myReadable = new Readable(opt); // 2 -   class myReadable extends Readable { constructor(opt) { super(opt); } } 

Dans tous les exemples, j'utiliserai la méthode 2.

Flux lisible


Voyons comment créer un flux lisible dans NodeJS.

 const { Readable } = require('stream'); class myReadable extends Readable { constructor(opt) { super(opt); } _read(size) {} } 

Comme vous pouvez le voir dans l'exemple ci-dessus, cette classe accepte un ensemble de paramètres. Nous ne considérerons que ceux qui sont nécessaires pour une compréhension générale du flux lisible, le reste que vous pouvez voir dans la documentation. Nous nous intéressons au paramètre highWaterMark et à la méthode _read.

highWaterMark est le nombre maximal d'octets de la mémoire tampon de flux interne (par défaut 16 ko) à l'atteinte duquel la lecture de la ressource est suspendue. Afin de continuer la lecture, nous devons libérer le tampon interne. Nous pouvons le faire en appelant le canal, en reprenant les méthodes ou en vous abonnant à l'événement data.

_read est une implémentation d'une méthode privée qui est appelée par les méthodes internes de la classe Readable. Il est appelé en continu jusqu'à ce que la taille des données atteigne highWaterMark.

Eh bien, la dernière méthode qui nous intéresse est readable.push, elle ajoute directement des données au tampon interne. Il renvoie vrai, mais dès que le tampon est plein, un appel à cette méthode commencera à retourner faux. Il peut être contrôlé par la méthode readable._read.

Voyons maintenant un exemple pour clarifier la situation.

 class Counter extends Readable { constructor(opt) { super(opt); this._max = 1000; this._index = 0; } _read() { this._index += 1; if (this._index > this._max) { this.push(null); } else { const buf = Buffer.from(`${this._index}`, 'utf8'); console.log(`Added: ${this._index}. Could be added? `, this.push(buf)); } } } const counter = new Counter({ highWaterMark: 2 }); console.log(`Received: ${counter.read().toString()}`); 

Pour commencer, je dirai que counter.read () n'est pas le _read que nous avons implémenté dans la classe. Cette méthode est privée, celle-ci est publique et renvoie les données du tampon interne. Lorsque nous exécutons ce code, dans la console, nous verrons ce qui suit:



Que s'est-il passé ici? Lors de la création du nouveau flux Counter ({highWaterMark: 2}), nous avons indiqué que la taille de notre tampon interne serait de 2 octets, c'est-à-dire peut stocker 2 caractères (1 caractère = 1 octet). Après avoir appelé counter.read (), le flux commence la lecture, écrit «1» dans le tampon interne et le renvoie. Puis il continue de lire en écrivant «2». Lorsqu'il écrit «3», le tampon sera plein, readable.push retournera false et le flux attendra que le tampon interne soit libéré. Parce que dans notre exemple, il n'y a pas de logique pour libérer le tampon, le script se terminera.

Comme mentionné précédemment, afin de garantir que la lecture n'est pas interrompue, nous devons constamment vider le tampon interne. Pour ce faire, nous nous abonnons à l'événement data. Remplacez les 2 dernières lignes par le code suivant.

 const counter = new Counter({ highWaterMark: 2 }); counter.on('data', chunk => { console.log(`Received: ${chunk.toString()}`); }); 

Maintenant, si nous exécutons cet exemple, nous verrons que tout a fonctionné comme il se doit et les nombres de 1 à 1000 sont affichés dans la console.

Flux écrit


En fait, il est très similaire à un flux lisible, uniquement destiné à l'écriture de données.

 const { Writable } = require('stream'); class myWritable extends Writable { constructor(opt) { super(opt); } _write(chunk, encoding, callback) {} } 

Il accepte des paramètres similaires, comme Readable Stream. Nous sommes intéressés par highWaterMark et _write.

_write est une méthode privée qui est appelée par les méthodes internes de la classe Writable pour écrire une donnée. Il prend 3 paramètres: chunk (partie des données), encodage (encodage si chunk est une chaîne), callback (une fonction qui est appelée après une écriture réussie ou non).

highWaterMark est le nombre maximal d'octets du tampon de flux interne (par défaut 16 Ko), lorsqu'il atteint quel stream.write commencera à retourner faux.

Réécrivons l'exemple précédent avec un compteur.

 const { Writable } = require('stream'); class Counter extends Writable { _write(chunk, encoding, callback) { console.log(chunk.toString()); callback(); } } const counter = new Counter({ highWaterMark: 2 }); for (let i = 1; i < 1000; i += 1) { counter.write(Buffer.from(`${i}`, 'utf8')); } 

En fait, tout est simple, mais il y a une nuance intéressante qui mérite d'être rappelée! Lors de la création du nouveau flux Counter ({highWaterMark: 2}), nous avons indiqué que la taille de notre tampon interne serait de 2 octets, c'est-à-dire peut stocker 2 caractères (1 caractère = 1 octet). Lorsque le compteur atteint dix, le tampon sera rempli à chaque appel pour écrire; en conséquence, si l'écriture était vers une source lente, toutes les autres données seraient enregistrées dans la RAM lors de l'appel en écriture, ce qui pourrait provoquer un débordement (dans cet exemple, Bien sûr, cela n'a pas d'importance, car notre tampon est de 2 octets, mais avec de gros fichiers, vous devez vous en souvenir). Lorsqu'une telle situation se produit, nous devons attendre que le flux écrit la partie actuelle des données, libère le tampon interne (déclenche l'événement de vidange), puis nous pouvons reprendre l'enregistrement des données. Réécrivons notre exemple.

 const { Writable } = require('stream'); const { once } = require('events'); class Counter extends Writable { _write(chunk, encoding, callback) { console.log(chunk.toString()); callback(); } } const counter = new Counter({ highWaterMark: 2 }); (async () => { for (let i = 1; i < 1000; i += 1) { const canWrite = counter.write(Buffer.from(`${i}`, 'utf8')); console.log(`Can we write bunch of data? ${canWrite}`); if (!canWrite) { await events.once(counter, 'drain'); console.log('drain event fired.'); } } })(); 

La méthode events.once a été ajoutée dans la version 11.13.0 et vous permet de créer une promesse et d'attendre qu'un événement spécifique soit exécuté une fois. Dans cet exemple, nous vérifions si les données peuvent être écrites dans le flux, sinon, attendons jusqu'à ce que le tampon soit libéré et continuons l'enregistrement.

À première vue, cela peut sembler une action inutile, mais lorsque vous travaillez avec de grandes quantités de données, par exemple, des fichiers qui pèsent plus de 10 Go, en oubliant de le faire, vous pouvez rencontrer une fuite de mémoire.

Flux duplex


Il combine les flux lisibles et inscriptibles, c'est-à-dire que nous devons écrire une implémentation des deux méthodes _read et _write.

 const { Duplex } = require('stream'); class myDuplex extends Duplex { constructor(opt) { super(opt); } _read(size) {} _write(chunk, encoding, callback) {} } 

Ici, nous sommes intéressés par 2 paramètres que nous pouvons transmettre au constructeur, ce sont readableHighWaterMark et writableHighWaterMark, qui nous permettent de spécifier la taille du tampon interne pour les flux lisibles et inscriptibles, respectivement. Voici à quoi ressemblera l'implémentation des deux exemples précédents à l'aide du flux Duplex.

 const { Duplex } = require('stream'); const events = require('events'); class Counter extends Duplex { constructor(opt) { super(opt); this._max = 1000; this._index = 0; } _read() { this._index += 1; if (this._index > this._max) { this.push(null); } else { const buf = Buffer.from(`${this._index}`, 'utf8'); this.push(buf); } } _write(chunk, encoding, callback) { console.log(chunk.toString()); callback(); } } const counter = new Counter({ readableHighWaterMark: 2, writableHighWaterMark: 2 }); (async () => { let chunk = counter.read(); while (chunk !== null) { const canWrite = counter.write(chunk); console.log(`Can we write bunch of data? ${canWrite}`); if (!canWrite) { await events.once(counter, 'drain'); console.log('drain event fired.'); } chunk = counter.read(); } })(); 

Je pense que ce code n'a pas besoin d'explication, car il est le même qu'avant, seulement dans une classe.

Transformer le flux


Ce flux est un flux duplex. Il est nécessaire de convertir une partie des données et de les envoyer plus loin dans la chaîne. Il peut être implémenté de la même manière que le reste du flux.

 const { Transform } = require('stream'); class myTransform extends Transform { _ transform(chunk, encoding, callback) {} } 

Nous nous intéressons à la méthode _transform.

_transform est une méthode privée appelée par les méthodes internes de la classe Transform pour transformer un bloc de données. Il prend 3 paramètres: chunk (partie des données), encodage (encodage si chunk est une chaîne), callback (une fonction qui est appelée après une écriture réussie ou non).

En utilisant cette méthode, un changement dans la partie des données se produira. Dans cette méthode, nous pouvons appeler transform.push () zéro ou plusieurs fois, ce qui valide les modifications. Lorsque nous avons terminé la conversion des données, nous devons appeler un rappel, qui enverra tout ce que nous avons ajouté à transform.push (). Le premier paramètre de cette fonction de rappel est une erreur. De plus, nous ne pouvons pas utiliser transform.push (), mais envoyer les données modifiées en tant que deuxième paramètre à la fonction de rappel (exemple: callback (null, data)). Afin de comprendre comment utiliser ce type de flux, analysons la méthode stream.pipe.

stream.pipe - cette méthode est utilisée pour connecter le flux lisible au flux inscriptible, ainsi que pour créer des chaînes de flux. Cela signifie que nous pouvons lire une partie des données et les transférer vers le flux suivant pour le traitement, puis vers le suivant, etc.

Écrivons un flux de transformation qui ajoutera le caractère * au début et à la fin de chaque élément de données.

 class CounterReader extends Readable { constructor(opt) { super(opt); this._max = 1000; this._index = 0; } _read() { this._index += 1; if (this._index > this._max) { this.push(null); } else { const buf = Buffer.from(`${this._index}`, 'utf8'); this.push(buf); } } } class CounterWriter extends Writable { _write(chunk, encoding, callback) { console.log(chunk.toString()); callback(); } } class CounterTransform extends Transform { _transform(chunk, encoding, callback) { try { const resultString = `*${chunk.toString('utf8')}*`; callback(null, resultString); } catch (err) { callback(err); } } } const counterReader = new CounterReader({ highWaterMark: 2 }); const counterWriter = new CounterWriter({ highWaterMark: 2 }); const counterTransform = new CounterTransform({ highWaterMark: 2 }); counterReader.pipe(counterTransform).pipe(counterWriter); 

Dans cet exemple, j'ai utilisé des flux lisibles et inscriptibles des exemples précédents, et j'ai également ajouté Transform. Comme vous pouvez le voir, cela s'est avéré assez simple.

Nous avons donc examiné comment les flux sont organisés. Leur concept principal est le traitement de données en plusieurs parties, ce qui est très pratique et ne nécessite pas de grandes ressources. De plus, les flux peuvent être utilisés avec des itérateurs, ce qui les rend encore plus pratiques à utiliser, mais c'est une histoire complètement différente.

Source: https://habr.com/ru/post/fr479048/


All Articles