Creo que muchos han escuchado sobre Node js Streams más de una vez, pero nunca lo han usado, o lo han usado sin pensar cómo funcionan, las transmisiones y las normas. Averigüemos qué son las secuencias, la tubería (tubería), los fragmentos (fragmento - parte de los datos) y todo eso))

¿Por qué es importante entender cómo funcionan las secuencias en Node js? La respuesta es simple: muchos de los módulos integrados en Node js implementan flujos, como solicitudes / respuestas HTTP, fs lectura / escritura, zlib, criptografía, sockets TCP y otros. También necesitará transmisiones, por ejemplo, cuando procese archivos grandes, cuando trabaje con imágenes. Es posible que no esté escribiendo su propia transmisión, pero comprender cómo funciona lo convertirá en un desarrollador más competente.
Entonces, ¿qué es exactamente una secuencia (en adelante usaré en lugar de Stream (secuencia)). Stream es un concepto con el que puede procesar datos en partes pequeñas, lo que le permite utilizar una pequeña cantidad de RAM. Además, con su ayuda, podemos dividir el procesamiento de cada parte en módulos independientes entre sí (funciones o clases). Por ejemplo, podemos comprimir inmediatamente parte de los datos, luego encriptar y escribir en un archivo. La idea principal no es trabajar con todos los datos, sino procesar parte de los datos uno por uno.
Hay 4 tipos de secuencias en Nodo js:
- Legible
- Escribible - Escritura
- Dúplex - Leer y escribir
- Transformar: un tipo de flujo dúplex que puede modificar datos
Puede encontrar información más detallada en el sitio web oficial, y ahora pasemos a practicar.
Ejemplo simple
Creo que muchos ya han usado transmisiones sin siquiera darse cuenta. En este ejemplo, simplemente enviamos el archivo al cliente.
La única diferencia es que, en el primer caso, descargamos parte del archivo y lo enviamos, por lo tanto, sin cargar la RAM del servidor. En el segundo caso, cargamos inmediatamente todo el archivo en la RAM y solo luego lo enviamos.
Más adelante en el artículo analizaremos cada transmisión por separado. Puede crear una secuencia utilizando la herencia o la función de constructor.
const { Readable } = require('stream');
En todos los ejemplos, usaré el método 2.
Transmisión legible
Veamos cómo podemos crear una secuencia legible en NodeJS.
const { Readable } = require('stream'); class myReadable extends Readable { constructor(opt) { super(opt); } _read(size) {} }
Como puede ver en el ejemplo anterior, esta clase acepta un conjunto de parámetros. Consideraremos solo aquellos que son necesarios para una comprensión general de la transmisión legible, el resto puede verlo en la documentación. Estamos interesados en el parámetro highWaterMark y el método _read.
highWaterMark es el número máximo de bytes del búfer de flujo interno (de forma predeterminada, 16 kb) al alcanzar el cual se suspende la lectura del recurso. Para continuar leyendo, necesitamos liberar el búfer interno. Podemos hacer esto llamando a la tubería, reanudar métodos o suscribiéndose al evento de datos.
_read es una implementación de un método privado que se llama mediante métodos internos de la clase Readable. Se llama continuamente hasta que el tamaño de los datos alcanza highWaterMark.
Bueno, el último método que nos interesa es legible.push, agrega directamente datos al búfer interno. Devuelve verdadero, pero tan pronto como el búfer esté lleno, una llamada a este método comenzará a devolver falso. Se puede controlar mediante el método readable._read.
Veamos ahora un ejemplo para aclarar la situación.
class Counter extends Readable { constructor(opt) { super(opt); this._max = 1000; this._index = 0; } _read() { this._index += 1; if (this._index > this._max) { this.push(null); } else { const buf = Buffer.from(`${this._index}`, 'utf8'); console.log(`Added: ${this._index}. Could be added? `, this.push(buf)); } } } const counter = new Counter({ highWaterMark: 2 }); console.log(`Received: ${counter.read().toString()}`);
Para empezar, diré que counter.read () no es el _read que implementamos en la clase. Ese método es privado, y este es público, y devuelve datos del búfer interno. Cuando ejecutamos este código, en la consola veremos lo siguiente:

Que paso aqui Al crear la nueva secuencia de Counter ({highWaterMark: 2}), indicamos que el tamaño de nuestro búfer interno sería de 2 bytes, es decir, puede almacenar 2 caracteres (1 carácter = 1 byte). Después de llamar a counter.read (), la secuencia comienza a leer, escribe '1' en el búfer interno y lo devuelve. Luego continúa leyendo, escribiendo '2'. Cuando escribe '3', el búfer estará lleno, readable.push devolverá falso y la secuencia esperará hasta que se libere el búfer interno. Porque en nuestro ejemplo, no hay lógica para liberar el búfer, el script finalizará.
Como se mencionó anteriormente, para garantizar que la lectura no se interrumpa, necesitamos limpiar constantemente el búfer interno. Para hacer esto, nos suscribimos al evento de datos. Reemplace las últimas 2 líneas con el siguiente código.
const counter = new Counter({ highWaterMark: 2 }); counter.on('data', chunk => { console.log(`Received: ${chunk.toString()}`); });
Ahora, si ejecutamos este ejemplo, veremos que todo funcionó como debería y los números del 1 al 1000 se muestran en la consola.
Secuencia escrita
De hecho, es muy similar a un flujo legible, solo destinado a escribir datos.
const { Writable } = require('stream'); class myWritable extends Writable { constructor(opt) { super(opt); } _write(chunk, encoding, callback) {} }
Acepta parámetros similares, como Readable Stream. Estamos interesados en highWaterMark y _write.
_write es un método privado que los métodos internos de la clase Writable llaman para escribir un dato. Se necesitan 3 parámetros: fragmento (parte de los datos), codificación (codificación si el fragmento es una cadena), devolución de llamada (una función que se llama después de una escritura exitosa o fallida).
highWaterMark es el número máximo de bytes del búfer de flujo interno (por defecto 16kb), al llegar a qué flujo.write comenzará a devolver falso.
Reescribamos el ejemplo anterior con un contador.
const { Writable } = require('stream'); class Counter extends Writable { _write(chunk, encoding, callback) { console.log(chunk.toString()); callback(); } } const counter = new Counter({ highWaterMark: 2 }); for (let i = 1; i < 1000; i += 1) { counter.write(Buffer.from(`${i}`, 'utf8')); }
De hecho, todo es simple, ¡pero hay un matiz interesante que vale la pena recordar! Al crear la nueva secuencia de Counter ({highWaterMark: 2}), indicamos que el tamaño de nuestro búfer interno sería de 2 bytes, es decir, puede almacenar 2 caracteres (1 carácter = 1 byte). Cuando el contador llegue a diez, el búfer se llenará con cada llamada a escritura; en consecuencia, si la escritura se realizó en una fuente lenta, todos los demás datos se guardarían en la RAM al llamar a la escritura, lo que podría causar que se desborde (en este ejemplo, Por supuesto que no importa, ya que nuestro búfer es de 2 bytes, pero con archivos grandes necesita recordar esto). Cuando surge tal situación, debemos esperar hasta que la secuencia escriba la porción actual de datos, libere el búfer interno (desencadena el evento de drenaje) y luego podemos reanudar la grabación de datos. Reescribamos nuestro ejemplo.
const { Writable } = require('stream'); const { once } = require('events'); class Counter extends Writable { _write(chunk, encoding, callback) { console.log(chunk.toString()); callback(); } } const counter = new Counter({ highWaterMark: 2 }); (async () => { for (let i = 1; i < 1000; i += 1) { const canWrite = counter.write(Buffer.from(`${i}`, 'utf8')); console.log(`Can we write bunch of data? ${canWrite}`); if (!canWrite) { await events.once(counter, 'drain'); console.log('drain event fired.'); } } })();
El método events.once se agregó en v11.13.0 y le permite crear una promesa y esperar a que se ejecute un evento específico una vez. En este ejemplo, verificamos si los datos se pueden escribir en la secuencia, si no, luego esperamos hasta que se libere el búfer y continúe grabando.
A primera vista, esto puede parecer una acción innecesaria, pero al trabajar con grandes cantidades de datos, por ejemplo, archivos que pesan más de 10 GB, olvidando hacer esto, puede encontrar una pérdida de memoria.
Flujo dúplex
Combina secuencias legibles y grabables, es decir, debemos escribir una implementación de los dos métodos _read y _write.
const { Duplex } = require('stream'); class myDuplex extends Duplex { constructor(opt) { super(opt); } _read(size) {} _write(chunk, encoding, callback) {} }
Aquí estamos interesados en 2 parámetros que podemos pasar al constructor, estos son readableHighWaterMark y writableHighWaterMark, que nos permiten especificar el tamaño del búfer interno para flujos legibles y grabables, respectivamente. Así es como se verá la implementación de los dos ejemplos anteriores con la ayuda de la secuencia Duplex.
const { Duplex } = require('stream'); const events = require('events'); class Counter extends Duplex { constructor(opt) { super(opt); this._max = 1000; this._index = 0; } _read() { this._index += 1; if (this._index > this._max) { this.push(null); } else { const buf = Buffer.from(`${this._index}`, 'utf8'); this.push(buf); } } _write(chunk, encoding, callback) { console.log(chunk.toString()); callback(); } } const counter = new Counter({ readableHighWaterMark: 2, writableHighWaterMark: 2 }); (async () => { let chunk = counter.read(); while (chunk !== null) { const canWrite = counter.write(chunk); console.log(`Can we write bunch of data? ${canWrite}`); if (!canWrite) { await events.once(counter, 'drain'); console.log('drain event fired.'); } chunk = counter.read(); } })();
Creo que este código no necesita explicación, ya que es el mismo que antes, solo en una clase.
Transformar flujo
Este flujo es un flujo dúplex. Es necesario para convertir una porción de datos y enviarla más abajo en la cadena. Se puede implementar de la misma manera que el resto de la transmisión.
const { Transform } = require('stream'); class myTransform extends Transform { _ transform(chunk, encoding, callback) {} }
Estamos interesados en el método _transform.
_transform es un método privado llamado por los métodos internos de la clase Transform para transformar una porción de datos. Se necesitan 3 parámetros: fragmento (parte de los datos), codificación (codificación si el fragmento es una cadena), devolución de llamada (una función que se llama después de una escritura exitosa o fallida).
Con este método, se producirá un cambio en la porción de datos. Dentro de este método, podemos llamar a transform.push () cero o más veces, lo que confirma los cambios. Cuando terminemos la conversión de datos, debemos llamar a una devolución de llamada, que enviará todo lo que agregamos a transform.push (). El primer parámetro de esta función de devolución de llamada es un error. Además, no podemos usar transform.push (), pero enviamos los datos modificados como el segundo parámetro a la función de devolución de llamada (ejemplo: devolución de llamada (nulo, datos)). Para entender cómo usar este tipo de transmisión, analicemos el método stream.pipe.
stream.pipe: este método se utiliza para conectar la secuencia legible a la secuencia de escritura, así como para crear cadenas de secuencia. Esto significa que podemos leer parte de los datos y transferirlos a la siguiente secuencia para su procesamiento, y luego a la siguiente, etc.
Escribamos una secuencia de Transformación que agregará el carácter * al principio y al final de cada pieza de datos.
class CounterReader extends Readable { constructor(opt) { super(opt); this._max = 1000; this._index = 0; } _read() { this._index += 1; if (this._index > this._max) { this.push(null); } else { const buf = Buffer.from(`${this._index}`, 'utf8'); this.push(buf); } } } class CounterWriter extends Writable { _write(chunk, encoding, callback) { console.log(chunk.toString()); callback(); } } class CounterTransform extends Transform { _transform(chunk, encoding, callback) { try { const resultString = `*${chunk.toString('utf8')}*`; callback(null, resultString); } catch (err) { callback(err); } } } const counterReader = new CounterReader({ highWaterMark: 2 }); const counterWriter = new CounterWriter({ highWaterMark: 2 }); const counterTransform = new CounterTransform({ highWaterMark: 2 }); counterReader.pipe(counterTransform).pipe(counterWriter);
En este ejemplo, utilicé secuencias legibles y grabables de los ejemplos anteriores, y también agregué Transformar. Como puede ver, resultó bastante simple.
Así que miramos cómo se organizan las corrientes. Su concepto principal es el procesamiento de datos en partes, lo cual es muy conveniente y no requiere grandes recursos. Además, las transmisiones se pueden usar con iteradores, lo que las hace aún más cómodas de usar, pero esta es una historia completamente diferente.