اللعب مع المواضيع في Node.JS 10.5.0

يوم جيد



في عملي ، نشأ نزاع بيني وبين الشركات التابعة حول سلاسل الرسائل في الإصدار الجديد من Node.JS والحاجة إلى مزامنتها. بادئ ذي بدء ، قررنا اختيار مهمة كتابة الأسطر إلى الملف بالتوازي. الموضوع مع worker_threads ساخن ، من فضلك ، تحت القط.

القليل عن الجداول نفسها. إنها تقنية تجريبية في Node.JS 10.5.0 ، ومن أجل الوصول إلى وحدة "worker_threads" ، تحتاج إلى تشغيل تطبيق Node.JS مع العلم "- عامل تجريبي". لقد سجلت هذه العلامة في البرنامج النصي لبدء التشغيل في ملف package.json:
{ "name": "worker-test", "version": "1.0.0", "description": "", "main": "app.js", "scripts": { "start": "node --max-old-space-size=4096 --experimental-worker app.js " }, "author": "", "license": "ISC" } 

الآن عن المنطق نفسه. يفرز الخيط الرئيسي N خيوط العامل ؛ وكلهم يكتبون إلى الملف في بعض الفترات. على عكس جميع الأمثلة التي تبدأ فيها التدفقات الرئيسية والطفل من ملف واحد ، قمت بفصل التدفقات إلى ملف منفصل ، يبدو لي أكثر نظافة وأناقة.

في الواقع ، الرمز.

ملف app.js الرئيسي هو نقطة الدخول.

 const { Worker } = require('worker_threads'); const path = require('path'); const WORKERS_NUMBER = 100; console.log('Hello from main!'); for (var i = 1; i <= WORKERS_NUMBER ; i++) { const w = new Worker(path.join(__dirname, './writer-worker-app/app.js'), { workerData: { id: i } }); } 

هنا نقوم ببساطة بإنشاء تدفقات فرعية باستخدام فئة العامل وتحديد المسار إلى ملف البداية للدفق './writer-worker-app/app.js'. عند إنشاء الدفق ، فإننا ننقل المعرف المكتوب ذاتيًا على أنه بيانات workerData.

بدء ملف الدفق. / writer-worker-app/app.js:

 const { workerData, parentPort } = require('worker_threads'); const logger = require('./logger'); const id = workerData.id; console.log(`Worker ${id} initializad.`); while (true) { sendMessage(); } function sendMessage() { logger.log(`Hello from worker number ${workerData.id}\r\n`); } 


حسنًا ، أبسط فئة تسجيل: ./writer-worker-app/logger.js
 const fs = require('fs'); function log(message) { return fs.appendFileSync('./my-file.txt', message); } module.exports = { log }; 

عند بدء هذا التطبيق ، كنا نأمل جميعًا في أننا سنحصل في النهاية على بعض الفوضى في الملف وسوف يصرخ المانحون حول كيفية استخدام الأقفال اللازمة مع الإشارات وغيرها من أفراح التنفيذ المتوازي. لكن لا! في الملف ، تذهب جميع الأسطر دون انقطاع ، إلا بترتيب عشوائي:

Hello from worker number 14
Hello from worker number 3
Hello from worker number 9
Hello from worker number 15
Hello from worker number 2
Hello from worker number 4
Hello from worker number 7
Hello from worker number 6
Hello from worker number 1
Hello from worker number 11

تجربة رائعة ، انتصار صغير آخر لـ Noda :-) وافتراضي هو أن كل المزامنة تحدث على مستوى I / O لتدفقات Noda ، لكنني سأكون سعيدًا بمعرفة الخيار الصحيح في التعليقات. في هذه الحالة فقط ، قمنا بفحص العمل باستخدام ليس fs.appendFileSync ، ولكن fs.createWriteStream وأسلوب stream.write .

خرجت النتيجة نفسها.

لكننا لم نتوقف عند هذا الحد.


اقترح زميل مهمة مزامنة المواضيع. بالنسبة لمثالنا المحدد ، فليكن مهمة الكتابة بالتسلسل إلى ملف بترتيب تصاعدي للمعرفات. يكتب أولاً الدفق الأول ، ثم الثاني ، ثم الثالث وهكذا.

للقيام بذلك ، قدمت مدير موضوع آخر. كان من الممكن أن نتغلب على الشيء الرئيسي ، لكني مسرور جدًا لإنشاء هؤلاء العمال المعزولين وبناء التواصل من خلال الرسائل. قبل البدء في كتابة تنفيذ Stream-Manager ، تحتاج إلى إنشاء قناة اتصال بينه وبين الكتاب والعاملين. تم استخدام فئة MessageChannel لهذا الغرض. تحتوي مثيلات هذه الفئة على حقلين: port1 و port2 ، يمكن لكل منهما الاستماع وإرسال الرسائل إلى الآخر باستخدام أساليب .on ('message') و .postMessage () . تم إنشاء هذا الفصل كجزء من وحدة "worker_threads" للتواصل بين سلاسل المحادثات ، لأنه عادةً عندما يتم نقل كائن ما ، يتم استنساخه ببساطة ، وفي بيئة تنفيذ سلسلة عمليات معزولة ، سيكون عديم الفائدة.

للتواصل بين تدفقين ، يجب أن نعطي الجميع منفذًا.

حقيقة مثيرة للاهتمام : في 10.5.0 من المستحيل تمرير المنفذ من خلال منشئ العامل ، لا تحتاج إلى القيام بذلك إلا من خلال worker.postMessage () ، وتأكد من تحديد المنفذ في معلمة transferList!

سيرسل مدير سلاسل الرسائل نفسه أوامر إلى سلاسل رسائل الكاتب بترتيب تصاعدي لمعرفاتهم ، وسوف يرسل الأمر التالي فقط بعد تلقي استجابة من الكاتب حول العملية الناجحة.

رسم تخطيطي للتطبيق تحت UML:


ملف ./app.js الرئيسي الذي تم تغييره:
 const { Worker, MessageChannel } = require('worker_threads'); const path = require('path'); const WORKERS_NUMBER = 100; console.log('Main app initialized and started.'); const workersMeta = []; for (var i = 1; i <= WORKERS_NUMBER; i++) { const channel = new MessageChannel(); const worker = new Worker(path.join(__dirname, './writer-worker-app/app.js'), { workerData: { id: i } }); workersMeta.push({ id: i, worker, channel }); } workersMeta.forEach(({ worker, channel }) => { worker.postMessage({ orchestratorPort: channel.port1 }, [channel.port1]); }) setTimeout(() => { const orchestrator = new Worker(path.join(__dirname, './orchestrator-worker-app/app.js')); const orchestratorData = workersMeta.map((meta) => ({ id: meta.id, port: meta.channel.port2 })); orchestrator.postMessage({ workerPorts: orchestratorData }, orchestratorData.map(w => w.port)); console.log('All worker threads have been initialized'); }, WORKERS_NUMBER * 10); 

هنا نقوم أولاً بإنشاء العمال ، ثم نرسل لكل منفذ منفذًا للتواصل مع المدير (وبهذه الطريقة فقط ، من خلال المنشئ ، من المستحيل القيام بذلك).

ثم نقوم بإنشاء مدير سلسلة ، ونرسل إليه قائمة بالمنافذ للتواصل مع تدفقات الكاتب.
تحديث : تجريبيًا ، اكتشفت أنه عند العمل مع التدفقات ، من الأفضل السماح لهم بالتخمير أولاً (التهيئة حسب الحاجة). من أجل الخير كان من الضروري الاستماع إلى بعض الإجابات من التدفقات بأسلوب "أنا مستعد!" ، لكنني قررت أن أسلك الطريق الأسهل.

سنقوم أيضًا بتغيير سلوك مؤشر ترابط الكاتب بحيث يرسل رسالة فقط عندما يتم إخباره بها ، ويعيد أيضًا النتيجة عند اكتمال عملية الكتابة:
./writer-worer-app/app.js
 const { workerData, parentPort } = require('worker_threads'); const logger = require('./logger'); const id = workerData.id; console.log(`Worker ${id} initializad.`); parentPort.on('message', value => { const orchestratorPort = value.orchestratorPort; orchestratorPort.on('message', data => { if (data.command == 'write') { console.log(`Worker ${id} received write command`); sendMessage(); sendResult(orchestratorPort); } }); console.log(`Worker ${id} started.`); }); function sendMessage() { logger.log(`Hello from worker number ${workerData.id}\r\n`); } function sendResult(port) { port.postMessage({ id, status: 'completed' }); } 

لقد تمت تهيئته بشكل صحيح من رسالة الدفق الرئيسي ، وبدأنا يحدث قناة مدير الدفق ، وعندما نتلقى الأمر ، نكتب أولاً إلى الملف ، ثم نرسل النتيجة. وتجدر الإشارة إلى أن الملف مكتوب بشكل متزامن ، لذلك يتم استدعاء sendResult () مباشرة بعد sendMessage ().

كل ما تبقى هو كتابة تنفيذ مديرنا الذكي
./orchestrator-worker-app/app.js:
 const { parentPort } = require('worker_threads'); console.log('Orchestrator initialized.') let workerPorts; parentPort.on('message', (value) => { workerPorts = value.workerPorts; workerPorts.forEach(wp => wp.port.on('message', handleResponse)); console.log('Orchestrator started.'); sendCommand(workerPorts[0]); }); function handleResponse(status) { const responseWorkerId = status.id; let nextWorker = workerPorts.find(wp => wp.id == responseWorkerId + 1); if (!nextWorker) { nextWorker = workerPorts[0]; } sendCommand(nextWorker); } function sendCommand(worker) { worker.port.postMessage({ command: 'write' }); } 

لقد حصلنا على قائمة بالمنافذ ، مرتبة ، لكل منفذ تعيين رد اتصال على الرد ، حسنًا ، وأرسلنا الأمر إلى الأول. في الاستدعاء نفسه ، نبحث عن الكاتب التالي ونرسل إليه أمرًا. لكي لا تجهد النظام كثيرًا ، تم تحديد الفاصل الزمني بين الفرق.

هذا كل شيء ، تطبيق إدارة سلاسل المحادثات المتعددة جاهز. تعلمنا ليس فقط لتوليد تدفقات العمال في Node.JS ، ولكن أيضًا لخلق طرق فعالة للتواصل بينهما. في رأيي الشخصي ، فإن بنية الخيوط المعزولة في Node.JS مع الانتظار وإرسال الرسائل أكثر من مريحة واعدة. شكرا لكم جميعا على اهتمامكم.

يمكن العثور على جميع التعليمات البرمجية المصدر هنا .

تحديث


من أجل عدم تضليل القراء ، وأيضًا لعدم إعطاء أسباب غير ضرورية للكتابة بأنني أغش مع انتهاء المهلة ، قمت بتحديث المقالة والمستودع.
التغييرات:
1) تتم إزالة الفواصل الزمنية في الكتاب الأصليين ، الآن بينما (صحيح) يذهب على طول المتشددين
2) أضاف - max-old-space-size = 4096 flag ، فقط في حالة ، لأن التنفيذ الحالي للتيارات غير مستقر للغاية وآمل أن يساعد هذا بطريقة أو بأخرى.
3) تم حذف الفواصل الزمنية لإرسال الرسائل من مدير سلسلة الرسائل. الآن التسجيل بدون توقف.
4) تمت إضافة مهلة عند تهيئة المدير ، لماذا - موصوف أعلاه.

للقيام بما يلي:
1) إضافة رسائل ذات طول متغير أو احتساب مكالمة المسجل - شكرا FANAT1242
2) إضافة معيار ، مقارنة عمل الإصدارين الأول والثاني (كم عدد الأسطر التي سيتم كتابتها في 10 ثوانٍ ، على سبيل المثال)

تحديث 2


1) تم تغيير رمز التسجيل: الآن كل رسالة لها طول مختلف.
2) تم تغيير Writer-worker-app / app.old.js: يكتب كل موضوع 1000 مرة ، ثم ينتهي.

تم ذلك لاختبار أفكار مستخدم FANAT1242. الرسائل كلها لا تعيد كتابة بعضها البعض ، الأسطر في الملف هي بالضبط 1000 * سلاسل.

Source: https://habr.com/ru/post/ar416015/


All Articles