Node.js تيارات للدمى أو كيفية العمل مع تيارات

أعتقد أن الكثيرين قد سمعوا عن Node js Streams أكثر من مرة ، لكنهم لم يستخدموها أبدًا ، أو استخدموها دون التفكير في كيفية عملهم ، وأنابيب البث والقواعد. دعنا نتعرف على ماهية التدفقات ، الأنابيب (الأنابيب) ، القطع (القطعة - جزء من البيانات) وكل ذلك))



لماذا من المهم أن نفهم كيف تعمل التدفقات في Node js؟ الجواب بسيط: تقوم العديد من الوحدات المضمنة في Node js بتنفيذ التدفقات ، مثل طلبات HTTP / استجابات ، و fs read / write ، و zlib ، و crypto ، ومآخذ TCP وغيرها. ستحتاج أيضًا إلى تدفقات ، على سبيل المثال ، عند معالجة الملفات الكبيرة ، عند العمل مع الصور. قد لا تكتب الدفق الخاص بك ، ولكن فهم كيف يعمل سيجعلك مطورًا أكثر كفاءة.

لذلك ، ما هو بالضبط دفق (المشار إليها فيما يلي سأستخدم بدلاً من الدفق (دفق)). Stream هو مفهوم يمكنك من خلاله معالجة البيانات في أجزاء صغيرة ، مما يسمح لك باستخدام كمية صغيرة من ذاكرة الوصول العشوائي. أيضًا ، بمساعدتها ، يمكننا تقسيم معالجة كل جزء إلى وحدات مستقلة عن بعضها البعض (وظائف أو فئات). على سبيل المثال ، يمكننا ضغط جزء من البيانات على الفور ، ثم تشفير أحد الملفات والكتابة إليه. الفكرة الرئيسية ليست العمل مع البيانات بأكملها ، ولكن معالجة جزء من البيانات واحدًا تلو الآخر.

هناك 4 أنواع من التدفقات في Node js:

  • قابلة للقراءة - القراءة
  • الكتابة - الكتابة
  • الطباعة على الوجهين - القراءة والكتابة
  • تحويل - نوع من الدفق المزدوج الذي يمكن تعديل البيانات

يمكنك العثور على مزيد من المعلومات المفصلة على الموقع الرسمي ، والآن دعنا ننتقل إلى الممارسة.

مثال بسيط


أعتقد أن الكثيرين قد استخدموا بالفعل تيارات دون أن يدركوا ذلك. في هذا المثال ، نرسل ببساطة الملف إلى العميل.

// 1 - ( )      ,         const getFile = async (req, res, next) => { const fileStream = fs.createReadStream('path to file'); res.contentType('application/pdf'); fileStream.pipe(res); }; // 2 - (  )         const getFile = async (req, res, next) => { const file = fs.readFileSync('path to file'); res.contentType('application/pdf'); res.send(file); }; 

الفرق الوحيد هو أنه في الحالة الأولى ، نقوم بتنزيل جزء من الملف وإرساله ، وبالتالي ، لا يتم تحميل ذاكرة الوصول العشوائي للخادم. في الحالة الثانية ، نقوم على الفور بتحميل الملف بأكمله إلى ذاكرة الوصول العشوائي (RAM) ثم إرساله بعد ذلك.

علاوة على ذلك ، سوف نحلل كل تيار بشكل منفصل. يمكنك إنشاء دفق باستخدام الوراثة أو باستخدام دالة المنشئ.

 const { Readable } = require('stream'); // 1 -   const myReadable = new Readable(opt); // 2 -   class myReadable extends Readable { constructor(opt) { super(opt); } } 

في جميع الأمثلة ، سأستخدم طريقة 2.

تيار مقروء


دعونا نلقي نظرة على كيفية إنشاء دفق قابل للقراءة في NodeJS.

 const { Readable } = require('stream'); class myReadable extends Readable { constructor(opt) { super(opt); } _read(size) {} } 

كما ترى من المثال أعلاه ، فإن هذه الفئة تقبل مجموعة من المعلمات. سنأخذ في الاعتبار فقط ما هو ضروري لفهم عام للتيار المقروء ، والباقي الذي يمكنك رؤيته في الوثائق. نحن مهتمون بالمعلمة highWaterMark وطريقة _read.

highWaterMark هو الحد الأقصى لعدد بايتات المخزن المؤقت للتيار الداخلي (افتراضياً 16 كيلو بايت) عند الوصول إلى القراءة من المورد. من أجل مواصلة القراءة ، نحتاج إلى تحرير المخزن المؤقت الداخلي. يمكننا القيام بذلك عن طريق استدعاء توجيه الإخراج أو أساليب الاستئناف أو عن طريق الاشتراك في حدث البيانات.

_read هو تطبيق لطريقة خاصة تسمى بواسطة الطرق الداخلية للفئة القابلة للقراءة. يتم استدعاؤه باستمرار حتى يصل حجم البيانات إلى highWaterMark.

حسنًا ، الطريقة الأخيرة التي تهمنا هي readable.push ، فهي تضيف البيانات مباشرةً إلى المخزن المؤقت الداخلي. يتم إرجاع true ، ولكن بمجرد امتلاء المخزن المؤقت ، ستبدأ المكالمة على هذه الطريقة في إرجاع false. يمكن التحكم فيه بواسطة طريقة القراءة.

دعونا الآن نرى مثالا لتوضيح الوضع.

 class Counter extends Readable { constructor(opt) { super(opt); this._max = 1000; this._index = 0; } _read() { this._index += 1; if (this._index > this._max) { this.push(null); } else { const buf = Buffer.from(`${this._index}`, 'utf8'); console.log(`Added: ${this._index}. Could be added? `, this.push(buf)); } } } const counter = new Counter({ highWaterMark: 2 }); console.log(`Received: ${counter.read().toString()}`); 

بادئ ذي بدء ، سأقول أن counter.read () ليس هو _read الذي قمنا بتطبيقه في الفصل. هذه الطريقة خاصة ، وهذه طريقة عامة ، وتقوم بإرجاع البيانات من المخزن المؤقت الداخلي. عندما نقوم بتنفيذ هذا الرمز ، في وحدة التحكم ، سنرى ما يلي:



ماذا حدث هنا؟ عند إنشاء دفق Counter ({highWaterMark: 2}) الجديد ، أشرنا إلى أن حجم المخزن المؤقت الداخلي لدينا سيكون 2 بايت ، أي يمكن تخزين 2 أحرف (حرف واحد = 1 بايت). بعد استدعاء counter.read () ، يبدأ الدفق في القراءة ، ويكتب "1" إلى المخزن المؤقت الداخلي ويعيده. ثم يواصل القراءة ، يكتب "2". عندما يكتب "3" ، سيكون المخزن المؤقت ممتلئًا ، وسيعود push إلى الخطأ ، وسينتظر الدفق حتى يتم تحرير المخزن المؤقت الداخلي. لأن في مثالنا ، لا يوجد منطق لتحرير المخزن المؤقت ، سينتهي البرنامج النصي.

كما ذكرنا سابقًا ، لضمان عدم مقاطعة القراءة ، نحتاج إلى مسح المخزن المؤقت الداخلي باستمرار. للقيام بذلك ، نحن نشترك في حدث البيانات. استبدل آخر سطرين بالرمز التالي.

 const counter = new Counter({ highWaterMark: 2 }); counter.on('data', chunk => { console.log(`Received: ${chunk.toString()}`); }); 

الآن إذا قمنا بتشغيل هذا المثال ، فسنرى أن كل شيء يعمل بالشكل الذي يجب أن يتم به ويتم عرض الأرقام من 1 إلى 1000 في وحدة التحكم.

تيار مكتوب


في الواقع ، إنه يشبه إلى حد كبير دفق قابل للقراءة ، مخصص فقط لكتابة البيانات.

 const { Writable } = require('stream'); class myWritable extends Writable { constructor(opt) { super(opt); } _write(chunk, encoding, callback) {} } 

يقبل معلمات مماثلة ، مثل Readable Stream. نحن مهتمون في highWaterMark و _write.

_write هي طريقة خاصة تسمى بواسطة الطرق الداخلية للفئة Writable لكتابة جزء من البيانات. يستغرق 3 معلمات: قطعة (جزء من البيانات) ، ترميز (ترميز إذا كانت قطعة سلسلة) ، رد الاتصال (وظيفة تسمى بعد كتابة ناجحة أو غير ناجحة).

highWaterMark هو الحد الأقصى لعدد بايتات المخزن المؤقت للتيار الداخلي (افتراضياً 16 كيلو بايت) ، عند الوصول إلى أي ستستمر دالة stream.write في إرجاع خطأ.

دعنا نعيد كتابة المثال السابق باستخدام عداد.

 const { Writable } = require('stream'); class Counter extends Writable { _write(chunk, encoding, callback) { console.log(chunk.toString()); callback(); } } const counter = new Counter({ highWaterMark: 2 }); for (let i = 1; i < 1000; i += 1) { counter.write(Buffer.from(`${i}`, 'utf8')); } 

في الواقع ، كل شيء بسيط ، ولكن هناك فارق بسيط واحد يستحق التذكر! عند إنشاء دفق Counter ({highWaterMark: 2}) الجديد ، أشرنا إلى أن حجم المخزن المؤقت الداخلي لدينا سيكون 2 بايت ، أي يمكن تخزين 2 أحرف (حرف واحد = 1 بايت). عندما يصل العداد إلى عشرة ، سيتم ملء المخزن المؤقت بكل مكالمة للكتابة ؛ وبالتالي ، إذا كانت الكتابة إلى مصدر بطيء ، فسيتم حفظ جميع البيانات الأخرى على ذاكرة الوصول العشوائي عند الاتصال بالكتابة ، مما قد يتسبب في حدوث تجاوزات (في هذا المثال ، بالطبع لا يهم ، نظرًا لأن المخزن المؤقت لدينا هو وحدتي بايت ، ولكن مع الملفات الكبيرة تحتاج إلى تذكر ذلك). عند ظهور مثل هذا الموقف ، نحتاج إلى الانتظار حتى يكتب الدفق الجزء الحالي من البيانات ، ويصدر المخزن المؤقت الداخلي (يؤدي إلى حدث التصريف) ، وبعد ذلك يمكننا استئناف تسجيل البيانات. دعنا نعيد كتابة مثالنا.

 const { Writable } = require('stream'); const { once } = require('events'); class Counter extends Writable { _write(chunk, encoding, callback) { console.log(chunk.toString()); callback(); } } const counter = new Counter({ highWaterMark: 2 }); (async () => { for (let i = 1; i < 1000; i += 1) { const canWrite = counter.write(Buffer.from(`${i}`, 'utf8')); console.log(`Can we write bunch of data? ${canWrite}`); if (!canWrite) { await events.once(counter, 'drain'); console.log('drain event fired.'); } } })(); 

تمت إضافة طريقة events.once في الإصدار 11.13.0 وتسمح لك بإنشاء وعد وانتظر تنفيذ حدث معين مرة واحدة. في هذا المثال ، نتحقق مما إذا كان يمكن كتابة البيانات إلى البث ، إن لم يكن ، فانتظر حتى يتم تحرير المخزن المؤقت ومتابعة التسجيل.

للوهلة الأولى ، قد يبدو هذا كأنه إجراء غير ضروري ، ولكن عند التعامل مع كميات كبيرة من البيانات ، على سبيل المثال الملفات التي يزيد وزنها عن 10 غيغابايت ، إذا نسيت القيام بذلك ، فقد تواجه تسربًا للذاكرة.

تيار مزدوج


فهو يجمع بين التدفقات القابلة للقراءة والكتابة ، أي أنه يجب علينا كتابة تطبيق لطريقتي _read و _write.

 const { Duplex } = require('stream'); class myDuplex extends Duplex { constructor(opt) { super(opt); } _read(size) {} _write(chunk, encoding, callback) {} } 

نحن هنا مهتمون بمعلمتين يمكننا تمريرهما إلى المُنشئ ، وهما قابلتان قابلتان قراءتهما HighWaterMark و writableHighWaterMark ، مما يسمح لنا بتحديد حجم المخزن المؤقت الداخلي للتدفقات القابلة للقراءة والكتابة ، على التوالي. هذه هي الطريقة التي سيبدو بها تنفيذ المثالين السابقين بمساعدة دفق الطباعة على الوجهين.

 const { Duplex } = require('stream'); const events = require('events'); class Counter extends Duplex { constructor(opt) { super(opt); this._max = 1000; this._index = 0; } _read() { this._index += 1; if (this._index > this._max) { this.push(null); } else { const buf = Buffer.from(`${this._index}`, 'utf8'); this.push(buf); } } _write(chunk, encoding, callback) { console.log(chunk.toString()); callback(); } } const counter = new Counter({ readableHighWaterMark: 2, writableHighWaterMark: 2 }); (async () => { let chunk = counter.read(); while (chunk !== null) { const canWrite = counter.write(chunk); console.log(`Can we write bunch of data? ${canWrite}`); if (!canWrite) { await events.once(counter, 'drain'); console.log('drain event fired.'); } chunk = counter.read(); } })(); 

أعتقد أن هذا الرمز لا يحتاج إلى تفسير ، لأنه هو نفسه كما كان من قبل ، فقط في فصل واحد.

تحويل تيار


هذا الدفق هو دفق مزدوج. هناك حاجة لتحويل جزء من البيانات وإرسالها إلى أسفل السلسلة. يمكن تنفيذه بنفس طريقة تنفيذ بقية الدفق.

 const { Transform } = require('stream'); class myTransform extends Transform { _ transform(chunk, encoding, callback) {} } 

نحن مهتمون بأسلوب التحويل.

_transform هي طريقة خاصة يتم استدعاؤها بواسطة الطرق الداخلية لفئة التحويل لتحويل جزء كبير من البيانات. يستغرق 3 معلمات: قطعة (جزء من البيانات) ، ترميز (ترميز إذا كانت قطعة سلسلة) ، رد الاتصال (وظيفة تسمى بعد كتابة ناجحة أو غير ناجحة).

باستخدام هذه الطريقة ، سيحدث تغيير في جزء البيانات. داخل هذه الطريقة ، يمكن أن نطلق على convert.push () صفر أو أكثر من المرات ، والذي يرتكب التغييرات. عندما ننتهي من تحويل البيانات ، نحتاج إلى استدعاء رد اتصال ، والذي سيرسل كل شيء أضفناه إلى convert.push (). المعلمة الأولى من وظيفة رد الاتصال هذه هي خطأ. أيضًا ، لا يمكننا استخدام transform.push () ، لكننا نرسل البيانات التي تم تغييرها كمعلمة ثانية إلى وظيفة رد الاتصال (على سبيل المثال: رد الاتصال (خالية ، بيانات)). لفهم كيفية استخدام هذا النوع من الدفق ، دعنا نحلل طريقة التدفق.

stream.pipe - تُستخدم هذه الطريقة لتوصيل الدفق القابل للقراءة بالدفق القابل للكتابة ، وكذلك لإنشاء سلاسل الدفق. هذا يعني أنه يمكننا قراءة جزء من البيانات ونقلها إلى الدفق التالي للمعالجة ، ثم إلى التالي ، إلخ.

دعنا نكتب تيار تحويل يضيف الحرف * إلى بداية ونهاية كل جزء من البيانات.

 class CounterReader extends Readable { constructor(opt) { super(opt); this._max = 1000; this._index = 0; } _read() { this._index += 1; if (this._index > this._max) { this.push(null); } else { const buf = Buffer.from(`${this._index}`, 'utf8'); this.push(buf); } } } class CounterWriter extends Writable { _write(chunk, encoding, callback) { console.log(chunk.toString()); callback(); } } class CounterTransform extends Transform { _transform(chunk, encoding, callback) { try { const resultString = `*${chunk.toString('utf8')}*`; callback(null, resultString); } catch (err) { callback(err); } } } const counterReader = new CounterReader({ highWaterMark: 2 }); const counterWriter = new CounterWriter({ highWaterMark: 2 }); const counterTransform = new CounterTransform({ highWaterMark: 2 }); counterReader.pipe(counterTransform).pipe(counterWriter); 

في هذا المثال ، استخدمت التدفقات القابلة للقراءة والكتابة من الأمثلة السابقة ، وأضفت أيضًا التحويل. كما ترون ، اتضح أنها بسيطة للغاية.

لذلك نظرنا إلى كيفية ترتيب الجداول. المفهوم الرئيسي هو معالجة البيانات في أجزاء ، والتي هي مريحة للغاية ولا تتطلب موارد كبيرة. أيضًا ، يمكن استخدام التدفقات مع التكرارات ، مما يجعلها أكثر ملاءمة للاستخدام ، لكن هذه قصة مختلفة تمامًا.

Source: https://habr.com/ru/post/ar479048/


All Articles