Node.js Streams para manequins ou como trabalhar com streams

Eu acho que muitos já ouviram falar sobre o Node js Streams mais de uma vez, mas nunca o usaram ou o usaram sem pensar em como eles funcionam, fluxo canalizado e normas. Vamos descobrir o que são fluxos, pipe (pipe), chunks (chunk - parte dos dados) e tudo mais))



Por que é importante entender como os fluxos funcionam no nó js? A resposta é simples: muitos dos módulos internos no Nó js implementam fluxos, como solicitações / respostas HTTP, leitura / gravação de FS, zlib, criptografia, soquetes TCP e outros. Você também precisará de fluxos, por exemplo, ao processar arquivos grandes, ao trabalhar com imagens. Você pode não estar escrevendo seu próprio fluxo, mas entender como ele funciona o tornará um desenvolvedor mais competente.

Então, o que exatamente é um fluxo (daqui em diante vou usar em vez de fluxo (fluxo)). Stream é um conceito com o qual você pode processar dados em pequenas partes, o que permite usar uma pequena quantidade de RAM. Além disso, com sua ajuda, podemos dividir o processamento de cada parte em módulos independentes um do outro (funções ou classes). Por exemplo, podemos compactar imediatamente parte dos dados, criptografar e gravar em um arquivo. A idéia principal não é trabalhar com os dados inteiros, mas processar parte dos dados um por um.

Existem 4 tipos de fluxos no nó js:

  • Legível
  • Gravável - Escrita
  • Duplex - leitura e gravação
  • Transform - um tipo de fluxo Duplex que pode modificar dados

Você pode encontrar informações mais detalhadas no site oficial e agora vamos praticar.

Exemplo simples


Eu acho que muitos já usaram fluxos sem nem perceber. Neste exemplo, simplesmente enviamos o arquivo para o cliente.

// 1 - ( )      ,         const getFile = async (req, res, next) => { const fileStream = fs.createReadStream('path to file'); res.contentType('application/pdf'); fileStream.pipe(res); }; // 2 - (  )         const getFile = async (req, res, next) => { const file = fs.readFileSync('path to file'); res.contentType('application/pdf'); res.send(file); }; 

A única diferença é que, no primeiro caso, fazemos o download de parte do arquivo e o enviamos, portanto, não carregando a RAM do servidor. No segundo caso, carregamos imediatamente o arquivo inteiro na RAM e só então o enviamos.

Mais adiante neste artigo, analisaremos cada fluxo separadamente. Você pode criar um fluxo usando herança ou usando a função construtora.

 const { Readable } = require('stream'); // 1 -   const myReadable = new Readable(opt); // 2 -   class myReadable extends Readable { constructor(opt) { super(opt); } } 

Em todos os exemplos, usarei o método 2.

Fluxo legível


Vejamos como podemos criar um fluxo legível no NodeJS.

 const { Readable } = require('stream'); class myReadable extends Readable { constructor(opt) { super(opt); } _read(size) {} } 

Como você pode ver no exemplo acima, essa classe aceita um conjunto de parâmetros. Consideraremos apenas aqueles que são necessários para uma compreensão geral do fluxo Readable, o restante você pode ver na documentação. Estamos interessados ​​no parâmetro highWaterMark e no método _read.

highWaterMark é o número máximo de bytes do buffer interno do fluxo (por padrão, 16kb) ao atingir o qual a leitura do recurso está suspensa. Para continuar lendo, precisamos liberar o buffer interno. Podemos fazer isso chamando o canal, métodos de retomada ou assinando o evento de dados.

_read é uma implementação de um método privado chamado por métodos internos da classe Readable. É chamado continuamente até que o tamanho dos dados atinja highWaterMark.

Bem, o último método que nos interessa é o readable.push, ele adiciona diretamente os dados ao buffer interno. Ele retorna true, mas assim que o buffer estiver cheio, uma chamada para esse método começará a retornar false. Pode ser controlado pelo método readable._read.

Vamos agora ver um exemplo para esclarecer a situação.

 class Counter extends Readable { constructor(opt) { super(opt); this._max = 1000; this._index = 0; } _read() { this._index += 1; if (this._index > this._max) { this.push(null); } else { const buf = Buffer.from(`${this._index}`, 'utf8'); console.log(`Added: ${this._index}. Could be added? `, this.push(buf)); } } } const counter = new Counter({ highWaterMark: 2 }); console.log(`Received: ${counter.read().toString()}`); 

Para começar, direi que counter.read () não é o _read que implementamos na classe. Esse método é privado, e este é público, e retorna dados do buffer interno. Quando executamos esse código, no console, veremos o seguinte:



O que aconteceu aqui? Ao criar o novo fluxo Counter ({highWaterMark: 2}), indicamos que o tamanho do nosso buffer interno seria 2 bytes, ou seja, pode armazenar 2 caracteres (1 caractere = 1 byte). Após chamar counter.read (), o fluxo começa a ler, grava '1' no buffer interno e o retorna. Então ele continua lendo, escrevendo '2'. Quando ele escreve '3', o buffer estará cheio, readable.push retornará false e o fluxo aguardará até que o buffer interno seja liberado. Porque no nosso exemplo, não há lógica para liberar o buffer, o script terminará.

Como mencionado anteriormente, para garantir que a leitura não seja interrompida, precisamos limpar constantemente o buffer interno. Para fazer isso, assinamos o evento de dados. Substitua as 2 últimas linhas pelo código a seguir.

 const counter = new Counter({ highWaterMark: 2 }); counter.on('data', chunk => { console.log(`Received: ${chunk.toString()}`); }); 

Agora, se executarmos este exemplo, veremos que tudo funcionou como deveria e os números de 1 a 1000 são exibidos no console.

Fluxo escrito


De fato, é muito semelhante a um fluxo legível, destinado apenas à gravação de dados.

 const { Writable } = require('stream'); class myWritable extends Writable { constructor(opt) { super(opt); } _write(chunk, encoding, callback) {} } 

Ele aceita parâmetros semelhantes, como fluxo legível. Estamos interessados ​​em highWaterMark e _write.

_write é um método privado chamado pelos métodos internos da classe Writable para gravar uma parte dos dados. São necessários três parâmetros: chunk (parte dos dados), codificação (codificação se chunk é uma string), callback (uma função chamada após uma gravação bem-sucedida ou malsucedida).

highWaterMark é o número máximo de bytes do buffer interno do fluxo (por padrão, 16kb), ao atingir qual stream.write começará a retornar false.

Vamos reescrever o exemplo anterior com um contador.

 const { Writable } = require('stream'); class Counter extends Writable { _write(chunk, encoding, callback) { console.log(chunk.toString()); callback(); } } const counter = new Counter({ highWaterMark: 2 }); for (let i = 1; i < 1000; i += 1) { counter.write(Buffer.from(`${i}`, 'utf8')); } 

De fato, tudo é simples, mas há uma nuance interessante que vale a pena lembrar! Ao criar o novo fluxo Counter ({highWaterMark: 2}), indicamos que o tamanho do nosso buffer interno seria 2 bytes, ou seja, pode armazenar 2 caracteres (1 caractere = 1 byte). Quando o contador atingir dez, o buffer será preenchido com cada chamada para gravação; portanto, se a gravação for em uma fonte lenta, todos os outros dados serão salvos na RAM ao chamar a gravação, o que pode causar o transbordamento (neste exemplo, Claro que isso não importa, já que nosso buffer é de 2 bytes, mas com arquivos grandes, você precisa se lembrar disso). Quando essa situação ocorre, precisamos esperar até que o fluxo grave a parte atual dos dados, libere o buffer interno (aciona o evento de drenagem) e, em seguida, podemos retomar a gravação dos dados. Vamos reescrever o nosso exemplo.

 const { Writable } = require('stream'); const { once } = require('events'); class Counter extends Writable { _write(chunk, encoding, callback) { console.log(chunk.toString()); callback(); } } const counter = new Counter({ highWaterMark: 2 }); (async () => { for (let i = 1; i < 1000; i += 1) { const canWrite = counter.write(Buffer.from(`${i}`, 'utf8')); console.log(`Can we write bunch of data? ${canWrite}`); if (!canWrite) { await events.once(counter, 'drain'); console.log('drain event fired.'); } } })(); 

O método events.once foi adicionado na v11.13.0 e permite criar uma promessa e aguardar que um evento específico seja executado uma vez. Neste exemplo, verificamos se os dados podem ser gravados no fluxo, caso contrário, aguardamos até que o buffer seja liberado e continuamos a gravar.

À primeira vista, isso pode parecer uma ação desnecessária, mas ao trabalhar com grandes quantidades de dados, por exemplo, arquivos com peso superior a 10 GB, esquecendo-se de fazer isso, você pode encontrar um vazamento de memória.

Fluxo duplex


Ele combina fluxos legíveis e graváveis, ou seja, precisamos escrever uma implementação dos dois métodos _read e _write.

 const { Duplex } = require('stream'); class myDuplex extends Duplex { constructor(opt) { super(opt); } _read(size) {} _write(chunk, encoding, callback) {} } 

Aqui, estamos interessados ​​em 2 parâmetros que podemos passar para o construtor, que são legíveisHighWaterMark e writableHighWaterMark, que permitem especificar o tamanho do buffer interno para fluxos legíveis e graváveis, respectivamente. É assim que a implementação dos dois exemplos anteriores, com a ajuda do fluxo Duplex, será exibida.

 const { Duplex } = require('stream'); const events = require('events'); class Counter extends Duplex { constructor(opt) { super(opt); this._max = 1000; this._index = 0; } _read() { this._index += 1; if (this._index > this._max) { this.push(null); } else { const buf = Buffer.from(`${this._index}`, 'utf8'); this.push(buf); } } _write(chunk, encoding, callback) { console.log(chunk.toString()); callback(); } } const counter = new Counter({ readableHighWaterMark: 2, writableHighWaterMark: 2 }); (async () => { let chunk = counter.read(); while (chunk !== null) { const canWrite = counter.write(chunk); console.log(`Can we write bunch of data? ${canWrite}`); if (!canWrite) { await events.once(counter, 'drain'); console.log('drain event fired.'); } chunk = counter.read(); } })(); 

Eu acho que esse código não precisa de explicação, pois é o mesmo de antes, apenas em uma classe.

Transformar fluxo


Esse fluxo é um fluxo duplex. É necessário converter uma parte dos dados e enviá-los mais adiante na cadeia. Pode ser implementado da mesma maneira que o restante do fluxo.

 const { Transform } = require('stream'); class myTransform extends Transform { _ transform(chunk, encoding, callback) {} } 

Estamos interessados ​​no método _transform.

_transform é um método privado chamado pelos métodos internos da classe Transform para transformar um pedaço de dados. São necessários três parâmetros: chunk (parte dos dados), codificação (codificação se chunk é uma string), callback (uma função chamada após uma gravação bem-sucedida ou malsucedida).

Usando esse método, ocorrerá uma alteração na parte dos dados. Dentro deste método, podemos chamar transform.push () zero ou mais vezes, o que confirma as alterações. Quando terminarmos a conversão de dados, precisamos chamar um retorno de chamada, que enviará tudo o que adicionamos ao transform.push (). O primeiro parâmetro dessa função de retorno de chamada é um erro. Além disso, não podemos usar transform.push (), mas enviar os dados alterados como o segundo parâmetro para a função de retorno de chamada (exemplo: retorno de chamada (nulo, dados)). Para entender como usar esse tipo de fluxo, vamos analisar o método stream.pipe.

stream.pipe - esse método é usado para conectar o fluxo legível ao fluxo gravável e para criar cadeias de fluxo. Isso significa que podemos ler parte dos dados e transferi-los para o próximo fluxo para processamento, e depois para o próximo, etc.

Vamos escrever um fluxo do Transform que adicionará o caractere * ao início e ao final de cada parte dos dados.

 class CounterReader extends Readable { constructor(opt) { super(opt); this._max = 1000; this._index = 0; } _read() { this._index += 1; if (this._index > this._max) { this.push(null); } else { const buf = Buffer.from(`${this._index}`, 'utf8'); this.push(buf); } } } class CounterWriter extends Writable { _write(chunk, encoding, callback) { console.log(chunk.toString()); callback(); } } class CounterTransform extends Transform { _transform(chunk, encoding, callback) { try { const resultString = `*${chunk.toString('utf8')}*`; callback(null, resultString); } catch (err) { callback(err); } } } const counterReader = new CounterReader({ highWaterMark: 2 }); const counterWriter = new CounterWriter({ highWaterMark: 2 }); const counterTransform = new CounterTransform({ highWaterMark: 2 }); counterReader.pipe(counterTransform).pipe(counterWriter); 

Neste exemplo, usei fluxos legíveis e graváveis ​​dos exemplos anteriores e também adicionei o Transform. Como você pode ver, ficou bem simples.

Então, vimos como os fluxos são organizados. Seu conceito principal é o processamento de dados em partes, o que é muito conveniente e não requer grandes recursos. Além disso, os fluxos podem ser usados ​​com iteradores, o que os torna ainda mais convenientes de usar, mas essa é uma história completamente diferente.

Source: https://habr.com/ru/post/pt479048/


All Articles