Eu acho que muitos já ouviram falar sobre o Node js Streams mais de uma vez, mas nunca o usaram ou o usaram sem pensar em como eles funcionam, fluxo canalizado e normas. Vamos descobrir o que são fluxos, pipe (pipe), chunks (chunk - parte dos dados) e tudo mais))

Por que é importante entender como os fluxos funcionam no nó js? A resposta é simples: muitos dos módulos internos no Nó js implementam fluxos, como solicitações / respostas HTTP, leitura / gravação de FS, zlib, criptografia, soquetes TCP e outros. Você também precisará de fluxos, por exemplo, ao processar arquivos grandes, ao trabalhar com imagens. Você pode não estar escrevendo seu próprio fluxo, mas entender como ele funciona o tornará um desenvolvedor mais competente.
Então, o que exatamente é um fluxo (daqui em diante vou usar em vez de fluxo (fluxo)). Stream é um conceito com o qual você pode processar dados em pequenas partes, o que permite usar uma pequena quantidade de RAM. Além disso, com sua ajuda, podemos dividir o processamento de cada parte em módulos independentes um do outro (funções ou classes). Por exemplo, podemos compactar imediatamente parte dos dados, criptografar e gravar em um arquivo. A idéia principal não é trabalhar com os dados inteiros, mas processar parte dos dados um por um.
Existem 4 tipos de fluxos no nó js:
- Legível
- Gravável - Escrita
- Duplex - leitura e gravação
- Transform - um tipo de fluxo Duplex que pode modificar dados
Você pode encontrar informações mais detalhadas no site oficial e agora vamos praticar.
Exemplo simples
Eu acho que muitos já usaram fluxos sem nem perceber. Neste exemplo, simplesmente enviamos o arquivo para o cliente.
A única diferença é que, no primeiro caso, fazemos o download de parte do arquivo e o enviamos, portanto, não carregando a RAM do servidor. No segundo caso, carregamos imediatamente o arquivo inteiro na RAM e só então o enviamos.
Mais adiante neste artigo, analisaremos cada fluxo separadamente. Você pode criar um fluxo usando herança ou usando a função construtora.
const { Readable } = require('stream');
Em todos os exemplos, usarei o método 2.
Fluxo legível
Vejamos como podemos criar um fluxo legível no NodeJS.
const { Readable } = require('stream'); class myReadable extends Readable { constructor(opt) { super(opt); } _read(size) {} }
Como você pode ver no exemplo acima, essa classe aceita um conjunto de parâmetros. Consideraremos apenas aqueles que são necessários para uma compreensão geral do fluxo Readable, o restante você pode ver na documentação. Estamos interessados no parâmetro highWaterMark e no método _read.
highWaterMark é o número máximo de bytes do buffer interno do fluxo (por padrão, 16kb) ao atingir o qual a leitura do recurso está suspensa. Para continuar lendo, precisamos liberar o buffer interno. Podemos fazer isso chamando o canal, métodos de retomada ou assinando o evento de dados.
_read é uma implementação de um método privado chamado por métodos internos da classe Readable. É chamado continuamente até que o tamanho dos dados atinja highWaterMark.
Bem, o último método que nos interessa é o readable.push, ele adiciona diretamente os dados ao buffer interno. Ele retorna true, mas assim que o buffer estiver cheio, uma chamada para esse método começará a retornar false. Pode ser controlado pelo método readable._read.
Vamos agora ver um exemplo para esclarecer a situação.
class Counter extends Readable { constructor(opt) { super(opt); this._max = 1000; this._index = 0; } _read() { this._index += 1; if (this._index > this._max) { this.push(null); } else { const buf = Buffer.from(`${this._index}`, 'utf8'); console.log(`Added: ${this._index}. Could be added? `, this.push(buf)); } } } const counter = new Counter({ highWaterMark: 2 }); console.log(`Received: ${counter.read().toString()}`);
Para começar, direi que counter.read () não é o _read que implementamos na classe. Esse método é privado, e este é público, e retorna dados do buffer interno. Quando executamos esse código, no console, veremos o seguinte:

O que aconteceu aqui? Ao criar o novo fluxo Counter ({highWaterMark: 2}), indicamos que o tamanho do nosso buffer interno seria 2 bytes, ou seja, pode armazenar 2 caracteres (1 caractere = 1 byte). Após chamar counter.read (), o fluxo começa a ler, grava '1' no buffer interno e o retorna. Então ele continua lendo, escrevendo '2'. Quando ele escreve '3', o buffer estará cheio, readable.push retornará false e o fluxo aguardará até que o buffer interno seja liberado. Porque no nosso exemplo, não há lógica para liberar o buffer, o script terminará.
Como mencionado anteriormente, para garantir que a leitura não seja interrompida, precisamos limpar constantemente o buffer interno. Para fazer isso, assinamos o evento de dados. Substitua as 2 últimas linhas pelo código a seguir.
const counter = new Counter({ highWaterMark: 2 }); counter.on('data', chunk => { console.log(`Received: ${chunk.toString()}`); });
Agora, se executarmos este exemplo, veremos que tudo funcionou como deveria e os números de 1 a 1000 são exibidos no console.
Fluxo escrito
De fato, é muito semelhante a um fluxo legível, destinado apenas à gravação de dados.
const { Writable } = require('stream'); class myWritable extends Writable { constructor(opt) { super(opt); } _write(chunk, encoding, callback) {} }
Ele aceita parâmetros semelhantes, como fluxo legível. Estamos interessados em highWaterMark e _write.
_write é um método privado chamado pelos métodos internos da classe Writable para gravar uma parte dos dados. São necessários três parâmetros: chunk (parte dos dados), codificação (codificação se chunk é uma string), callback (uma função chamada após uma gravação bem-sucedida ou malsucedida).
highWaterMark é o número máximo de bytes do buffer interno do fluxo (por padrão, 16kb), ao atingir qual stream.write começará a retornar false.
Vamos reescrever o exemplo anterior com um contador.
const { Writable } = require('stream'); class Counter extends Writable { _write(chunk, encoding, callback) { console.log(chunk.toString()); callback(); } } const counter = new Counter({ highWaterMark: 2 }); for (let i = 1; i < 1000; i += 1) { counter.write(Buffer.from(`${i}`, 'utf8')); }
De fato, tudo é simples, mas há uma nuance interessante que vale a pena lembrar! Ao criar o novo fluxo Counter ({highWaterMark: 2}), indicamos que o tamanho do nosso buffer interno seria 2 bytes, ou seja, pode armazenar 2 caracteres (1 caractere = 1 byte). Quando o contador atingir dez, o buffer será preenchido com cada chamada para gravação; portanto, se a gravação for em uma fonte lenta, todos os outros dados serão salvos na RAM ao chamar a gravação, o que pode causar o transbordamento (neste exemplo, Claro que isso não importa, já que nosso buffer é de 2 bytes, mas com arquivos grandes, você precisa se lembrar disso). Quando essa situação ocorre, precisamos esperar até que o fluxo grave a parte atual dos dados, libere o buffer interno (aciona o evento de drenagem) e, em seguida, podemos retomar a gravação dos dados. Vamos reescrever o nosso exemplo.
const { Writable } = require('stream'); const { once } = require('events'); class Counter extends Writable { _write(chunk, encoding, callback) { console.log(chunk.toString()); callback(); } } const counter = new Counter({ highWaterMark: 2 }); (async () => { for (let i = 1; i < 1000; i += 1) { const canWrite = counter.write(Buffer.from(`${i}`, 'utf8')); console.log(`Can we write bunch of data? ${canWrite}`); if (!canWrite) { await events.once(counter, 'drain'); console.log('drain event fired.'); } } })();
O método events.once foi adicionado na v11.13.0 e permite criar uma promessa e aguardar que um evento específico seja executado uma vez. Neste exemplo, verificamos se os dados podem ser gravados no fluxo, caso contrário, aguardamos até que o buffer seja liberado e continuamos a gravar.
À primeira vista, isso pode parecer uma ação desnecessária, mas ao trabalhar com grandes quantidades de dados, por exemplo, arquivos com peso superior a 10 GB, esquecendo-se de fazer isso, você pode encontrar um vazamento de memória.
Fluxo duplex
Ele combina fluxos legíveis e graváveis, ou seja, precisamos escrever uma implementação dos dois métodos _read e _write.
const { Duplex } = require('stream'); class myDuplex extends Duplex { constructor(opt) { super(opt); } _read(size) {} _write(chunk, encoding, callback) {} }
Aqui, estamos interessados em 2 parâmetros que podemos passar para o construtor, que são legíveisHighWaterMark e writableHighWaterMark, que permitem especificar o tamanho do buffer interno para fluxos legíveis e graváveis, respectivamente. É assim que a implementação dos dois exemplos anteriores, com a ajuda do fluxo Duplex, será exibida.
const { Duplex } = require('stream'); const events = require('events'); class Counter extends Duplex { constructor(opt) { super(opt); this._max = 1000; this._index = 0; } _read() { this._index += 1; if (this._index > this._max) { this.push(null); } else { const buf = Buffer.from(`${this._index}`, 'utf8'); this.push(buf); } } _write(chunk, encoding, callback) { console.log(chunk.toString()); callback(); } } const counter = new Counter({ readableHighWaterMark: 2, writableHighWaterMark: 2 }); (async () => { let chunk = counter.read(); while (chunk !== null) { const canWrite = counter.write(chunk); console.log(`Can we write bunch of data? ${canWrite}`); if (!canWrite) { await events.once(counter, 'drain'); console.log('drain event fired.'); } chunk = counter.read(); } })();
Eu acho que esse código não precisa de explicação, pois é o mesmo de antes, apenas em uma classe.
Transformar fluxo
Esse fluxo é um fluxo duplex. É necessário converter uma parte dos dados e enviá-los mais adiante na cadeia. Pode ser implementado da mesma maneira que o restante do fluxo.
const { Transform } = require('stream'); class myTransform extends Transform { _ transform(chunk, encoding, callback) {} }
Estamos interessados no método _transform.
_transform é um método privado chamado pelos métodos internos da classe Transform para transformar um pedaço de dados. São necessários três parâmetros: chunk (parte dos dados), codificação (codificação se chunk é uma string), callback (uma função chamada após uma gravação bem-sucedida ou malsucedida).
Usando esse método, ocorrerá uma alteração na parte dos dados. Dentro deste método, podemos chamar transform.push () zero ou mais vezes, o que confirma as alterações. Quando terminarmos a conversão de dados, precisamos chamar um retorno de chamada, que enviará tudo o que adicionamos ao transform.push (). O primeiro parâmetro dessa função de retorno de chamada é um erro. Além disso, não podemos usar transform.push (), mas enviar os dados alterados como o segundo parâmetro para a função de retorno de chamada (exemplo: retorno de chamada (nulo, dados)). Para entender como usar esse tipo de fluxo, vamos analisar o método stream.pipe.
stream.pipe - esse método é usado para conectar o fluxo legível ao fluxo gravável e para criar cadeias de fluxo. Isso significa que podemos ler parte dos dados e transferi-los para o próximo fluxo para processamento, e depois para o próximo, etc.
Vamos escrever um fluxo do Transform que adicionará o caractere * ao início e ao final de cada parte dos dados.
class CounterReader extends Readable { constructor(opt) { super(opt); this._max = 1000; this._index = 0; } _read() { this._index += 1; if (this._index > this._max) { this.push(null); } else { const buf = Buffer.from(`${this._index}`, 'utf8'); this.push(buf); } } } class CounterWriter extends Writable { _write(chunk, encoding, callback) { console.log(chunk.toString()); callback(); } } class CounterTransform extends Transform { _transform(chunk, encoding, callback) { try { const resultString = `*${chunk.toString('utf8')}*`; callback(null, resultString); } catch (err) { callback(err); } } } const counterReader = new CounterReader({ highWaterMark: 2 }); const counterWriter = new CounterWriter({ highWaterMark: 2 }); const counterTransform = new CounterTransform({ highWaterMark: 2 }); counterReader.pipe(counterTransform).pipe(counterWriter);
Neste exemplo, usei fluxos legíveis e graváveis dos exemplos anteriores e também adicionei o Transform. Como você pode ver, ficou bem simples.
Então, vimos como os fluxos são organizados. Seu conceito principal é o processamento de dados em partes, o que é muito conveniente e não requer grandes recursos. Além disso, os fluxos podem ser usados com iteradores, o que os torna ainda mais convenientes de usar, mas essa é uma história completamente diferente.