Bom dia

No meu trabalho, surgiu uma disputa entre eu e os afiliados sobre os threads na nova versão do Node.JS e a necessidade de sincronizá-los. Para começar, decidimos escolher a tarefa de escrever linhas no arquivo em paralelo. O tópico com worker_threads está quente, por favor, em cat.
Um pouco sobre os próprios fluxos. Eles são uma tecnologia experimental no Node.JS 10.5.0 e, para ter acesso ao módulo "worker_threads", é necessário executar nosso aplicativo Node.JS com o sinalizador "--experimental-worker". Registrei esse sinalizador no script de início no arquivo package.json:
{ "name": "worker-test", "version": "1.0.0", "description": "", "main": "app.js", "scripts": { "start": "node --max-old-space-size=4096 --experimental-worker app.js " }, "author": "", "license": "ISC" }
Agora sobre a própria lógica. O thread principal gera N threads de trabalho; todos eles gravam no arquivo em algum intervalo. Ao contrário de todos os exemplos em que os fluxos principal e filho começam em um arquivo, separei os fluxos em outro, parece-me mais limpo e elegante.
Na verdade, o código.
O principal arquivo app.js é o ponto de entrada.
const { Worker } = require('worker_threads'); const path = require('path'); const WORKERS_NUMBER = 100; console.log('Hello from main!'); for (var i = 1; i <= WORKERS_NUMBER ; i++) { const w = new Worker(path.join(__dirname, './writer-worker-app/app.js'), { workerData: { id: i } }); }
Aqui, simplesmente criamos fluxos filho usando a classe Worker e especificando o caminho para o arquivo inicial do fluxo './writer-worker-app/app.js'. Ao criar o fluxo, transferimos o identificador auto-escrito como dados do workerData.
Inicie o arquivo para stream ./writer-worker-app/app.js:
const { workerData, parentPort } = require('worker_threads'); const logger = require('./logger'); const id = workerData.id; console.log(`Worker ${id} initializad.`); while (true) { sendMessage(); } function sendMessage() { logger.log(`Hello from worker number ${workerData.id}\r\n`); }
Bem, a classe mais simples de logger: ./writer-worker-app/logger.js
const fs = require('fs'); function log(message) { return fs.appendFileSync('./my-file.txt', message); } module.exports = { log };
Ao iniciar esse aplicativo, todos esperávamos que, no final, tivéssemos alguma bagunça no arquivo e os doadores gritassem como a trava necessária com semáforos e outras alegrias da execução paralela. Mas não! No arquivo, todas as linhas vão sem interrupção, exceto em ordem aleatória:
Hello from worker number 14
Hello from worker number 3
Hello from worker number 9
Hello from worker number 15
Hello from worker number 2
Hello from worker number 4
Hello from worker number 7
Hello from worker number 6
Hello from worker number 1
Hello from worker number 11
Um experimento maravilhoso, outra pequena vitória para Noda :-) Minha suposição é que toda sincronização ocorre no nível de E / S dos fluxos de Noda, mas ficarei feliz em saber a opção correta nos comentários. Por precaução, verificamos o trabalho usando não
fs.appendFileSync , mas
fs.createWriteStream e o método
stream.write .
O resultado saiu o mesmo.
Mas não paramos por aí.
Um colega sugeriu a tarefa de sincronizar threads. Para o nosso exemplo específico, seja a tarefa de gravar sequencialmente em um arquivo em ordem crescente de identificadores. Primeiro escreve o primeiro fluxo, depois o segundo, depois o terceiro e assim por diante.
Para fazer isso, introduzi outro Gerenciador de threads. Era possível conviver com o principal, mas estou muito satisfeito por criar esses trabalhadores isolados e criar comunicação por meio de mensagens. Antes de começar a escrever a implementação do Stream-Manager, você precisa criar um canal de comunicação entre ele e os escritores-trabalhadores. A classe
MessageChannel foi usada para isso. As instâncias desta classe têm dois campos:
porta1 e
porta2 , cada um dos quais pode escutar e enviar mensagens para a outra usando os
métodos .on ( 'message') e
.postMessage () . Essa classe foi criada como parte do módulo “worker_threads” para comunicação entre threads, porque geralmente quando um objeto é transferido, ele é simplesmente clonado e, em um ambiente de execução de thread isolado, é inútil.
Para comunicação entre 2 fluxos, devemos fornecer uma porta a todos.
Um fato interessante : na versão 10.5.0
, é impossível passar a porta pelo construtor do worker , você precisa fazer isso apenas através de worker.postMessage () e não se esqueça de especificar a porta no parâmetro transferList!
O próprio gerenciador de encadeamentos enviará comandos para os encadeamentos do gravador em ordem crescente de seus identificadores e enviará o próximo comando somente após receber uma resposta do gravador sobre a operação bem-sucedida.
Diagrama de aplicativo sob UML:

Nosso arquivo ./app.js principal alterado:
const { Worker, MessageChannel } = require('worker_threads'); const path = require('path'); const WORKERS_NUMBER = 100; console.log('Main app initialized and started.'); const workersMeta = []; for (var i = 1; i <= WORKERS_NUMBER; i++) { const channel = new MessageChannel(); const worker = new Worker(path.join(__dirname, './writer-worker-app/app.js'), { workerData: { id: i } }); workersMeta.push({ id: i, worker, channel }); } workersMeta.forEach(({ worker, channel }) => { worker.postMessage({ orchestratorPort: channel.port1 }, [channel.port1]); }) setTimeout(() => { const orchestrator = new Worker(path.join(__dirname, './orchestrator-worker-app/app.js')); const orchestratorData = workersMeta.map((meta) => ({ id: meta.id, port: meta.channel.port2 })); orchestrator.postMessage({ workerPorts: orchestratorData }, orchestratorData.map(w => w.port)); console.log('All worker threads have been initialized'); }, WORKERS_NUMBER * 10);
Aqui, primeiro criamos trabalhadores, depois enviamos a cada porta uma porta para comunicação com o gerente (e só assim, através do construtor, é impossível fazer isso).
Em seguida, criamos um gerenciador de encadeamentos, enviamos uma lista de portas para comunicação com os fluxos de gravadores.
Atualizado : empiricamente, descobri que, ao trabalhar com fluxos, é melhor deixá-los preparar primeiro (inicialize conforme necessário). Para sempre, tive que ouvir algumas respostas dos fluxos de estilo "Estou pronto!", Mas decidi seguir o caminho mais fácil.
Também mudaremos o comportamento do encadeamento do gravador para que ele envie uma mensagem somente quando for solicitada e também retornará o resultado quando a operação de gravação for concluída:
./writer-worer-app/app.js
const { workerData, parentPort } = require('worker_threads'); const logger = require('./logger'); const id = workerData.id; console.log(`Worker ${id} initializad.`); parentPort.on('message', value => { const orchestratorPort = value.orchestratorPort; orchestratorPort.on('message', data => { if (data.command == 'write') { console.log(`Worker ${id} received write command`); sendMessage(); sendResult(orchestratorPort); } }); console.log(`Worker ${id} started.`); }); function sendMessage() { logger.log(`Hello from worker number ${workerData.id}\r\n`); } function sendResult(port) { port.postMessage({ id, status: 'completed' }); }
Inicializamos corretamente a partir da mensagem do fluxo pai, começamos a acontecer no canal do gerenciador de fluxo. Quando recebemos o comando, primeiro escrevemos no arquivo e depois enviamos o resultado. Deve-se observar que o arquivo é gravado de forma síncrona; portanto, sendResult () é chamado imediatamente após sendMessage ().
Tudo o que resta é escrever a implementação do nosso gerente inteligente
./orchestrator-worker-app/app.js:
const { parentPort } = require('worker_threads'); console.log('Orchestrator initialized.') let workerPorts; parentPort.on('message', (value) => { workerPorts = value.workerPorts; workerPorts.forEach(wp => wp.port.on('message', handleResponse)); console.log('Orchestrator started.'); sendCommand(workerPorts[0]); }); function handleResponse(status) { const responseWorkerId = status.id; let nextWorker = workerPorts.find(wp => wp.id == responseWorkerId + 1); if (!nextWorker) { nextWorker = workerPorts[0]; } sendCommand(nextWorker); } function sendCommand(worker) { worker.port.postMessage({ command: 'write' }); }
Temos uma lista de portas, ordenadas, para cada porta definir um retorno de chamada para a resposta, bem, e enviamos o comando para a primeira. No retorno de chamada, estamos procurando o próximo gravador e enviamos um comando para ele. Para não sobrecarregar muito o sistema, foi estabelecido o intervalo entre as equipes.
É isso aí, nosso aplicativo de gerenciamento de encadeamentos multithread está pronto. Aprendemos não apenas a gerar fluxos de trabalho no Node.JS, mas também a criar formas eficazes de comunicação entre eles. Na minha opinião pessoal, a arquitetura de threads isolados no Node.JS com espera e envio de mensagens é mais do que conveniente e promissora. Obrigado a todos pela atenção.
Todo o código fonte pode ser encontrado aqui .
ATUALIZAÇÃO
Para não enganar os leitores e também para não fornecer razões desnecessárias para escrever que trapaceio com o tempo limite, atualizei o artigo e o repositório.
Alterações:
1) os intervalos nos escritores originais são removidos, agora enquanto (verdadeiro) segue o hardcore
2) adicionado sinalizador --max-old-space-size = 4096, por precaução, porque a implementação atual dos fluxos não é muito estável e espero que isso ajude de alguma forma.
3) os intervalos para o envio de mensagens do gerenciador de threads foram excluídos. Agora a gravação é ininterrupta.
4) um tempo limite foi adicionado ao inicializar o gerente, por que - está descrito acima.
PARA FAZER:
1) adicione mensagens de tamanho variável ou contando a chamada do logger - obrigado FANAT1242
2) adicione uma referência, compare o trabalho da primeira e da segunda versões (quantas linhas serão gravadas em 10 segundos, por exemplo)
ATUALIZAÇÃO 2
1) O código de log foi alterado: agora cada mensagem tem um comprimento diferente.
2) O Writer-worker-app / app.old.js foi alterado: cada thread grava 1000 vezes e termina.
Isso foi feito para testar as idéias do usuário do FANAT1242. As mensagens iguais não são reescritas, as linhas do arquivo são exatamente 1000 * N threads.