أعتقد أن الكثيرين قد سمعوا عن Node js Streams أكثر من مرة ، لكنهم لم يستخدموها أبدًا ، أو استخدموها دون التفكير في كيفية عملهم ، وأنابيب البث والقواعد. دعنا نتعرف على ماهية التدفقات ، الأنابيب (الأنابيب) ، القطع (القطعة - جزء من البيانات) وكل ذلك))

لماذا من المهم أن نفهم كيف تعمل التدفقات في Node js؟ الجواب بسيط: تقوم العديد من الوحدات المضمنة في Node js بتنفيذ التدفقات ، مثل طلبات HTTP / استجابات ، و fs read / write ، و zlib ، و crypto ، ومآخذ TCP وغيرها. ستحتاج أيضًا إلى تدفقات ، على سبيل المثال ، عند معالجة الملفات الكبيرة ، عند العمل مع الصور. قد لا تكتب الدفق الخاص بك ، ولكن فهم كيف يعمل سيجعلك مطورًا أكثر كفاءة.
لذلك ، ما هو بالضبط دفق (المشار إليها فيما يلي سأستخدم بدلاً من الدفق (دفق)). Stream هو مفهوم يمكنك من خلاله معالجة البيانات في أجزاء صغيرة ، مما يسمح لك باستخدام كمية صغيرة من ذاكرة الوصول العشوائي. أيضًا ، بمساعدتها ، يمكننا تقسيم معالجة كل جزء إلى وحدات مستقلة عن بعضها البعض (وظائف أو فئات). على سبيل المثال ، يمكننا ضغط جزء من البيانات على الفور ، ثم تشفير أحد الملفات والكتابة إليه. الفكرة الرئيسية ليست العمل مع البيانات بأكملها ، ولكن معالجة جزء من البيانات واحدًا تلو الآخر.
هناك 4 أنواع من التدفقات في Node js:
- قابلة للقراءة - القراءة
- الكتابة - الكتابة
- الطباعة على الوجهين - القراءة والكتابة
- تحويل - نوع من الدفق المزدوج الذي يمكن تعديل البيانات
يمكنك العثور على مزيد من المعلومات المفصلة على الموقع الرسمي ، والآن دعنا ننتقل إلى الممارسة.
مثال بسيط
أعتقد أن الكثيرين قد استخدموا بالفعل تيارات دون أن يدركوا ذلك. في هذا المثال ، نرسل ببساطة الملف إلى العميل.
الفرق الوحيد هو أنه في الحالة الأولى ، نقوم بتنزيل جزء من الملف وإرساله ، وبالتالي ، لا يتم تحميل ذاكرة الوصول العشوائي للخادم. في الحالة الثانية ، نقوم على الفور بتحميل الملف بأكمله إلى ذاكرة الوصول العشوائي (RAM) ثم إرساله بعد ذلك.
علاوة على ذلك ، سوف نحلل كل تيار بشكل منفصل. يمكنك إنشاء دفق باستخدام الوراثة أو باستخدام دالة المنشئ.
const { Readable } = require('stream');
في جميع الأمثلة ، سأستخدم طريقة 2.
تيار مقروء
دعونا نلقي نظرة على كيفية إنشاء دفق قابل للقراءة في NodeJS.
const { Readable } = require('stream'); class myReadable extends Readable { constructor(opt) { super(opt); } _read(size) {} }
كما ترى من المثال أعلاه ، فإن هذه الفئة تقبل مجموعة من المعلمات. سنأخذ في الاعتبار فقط ما هو ضروري لفهم عام للتيار المقروء ، والباقي الذي يمكنك رؤيته في الوثائق. نحن مهتمون بالمعلمة highWaterMark وطريقة _read.
highWaterMark هو الحد الأقصى لعدد بايتات المخزن المؤقت للتيار الداخلي (افتراضياً 16 كيلو بايت) عند الوصول إلى القراءة من المورد. من أجل مواصلة القراءة ، نحتاج إلى تحرير المخزن المؤقت الداخلي. يمكننا القيام بذلك عن طريق استدعاء توجيه الإخراج أو أساليب الاستئناف أو عن طريق الاشتراك في حدث البيانات.
_read هو تطبيق لطريقة خاصة تسمى بواسطة الطرق الداخلية للفئة القابلة للقراءة. يتم استدعاؤه باستمرار حتى يصل حجم البيانات إلى highWaterMark.
حسنًا ، الطريقة الأخيرة التي تهمنا هي readable.push ، فهي تضيف البيانات مباشرةً إلى المخزن المؤقت الداخلي. يتم إرجاع true ، ولكن بمجرد امتلاء المخزن المؤقت ، ستبدأ المكالمة على هذه الطريقة في إرجاع false. يمكن التحكم فيه بواسطة طريقة القراءة.
دعونا الآن نرى مثالا لتوضيح الوضع.
class Counter extends Readable { constructor(opt) { super(opt); this._max = 1000; this._index = 0; } _read() { this._index += 1; if (this._index > this._max) { this.push(null); } else { const buf = Buffer.from(`${this._index}`, 'utf8'); console.log(`Added: ${this._index}. Could be added? `, this.push(buf)); } } } const counter = new Counter({ highWaterMark: 2 }); console.log(`Received: ${counter.read().toString()}`);
بادئ ذي بدء ، سأقول أن counter.read () ليس هو _read الذي قمنا بتطبيقه في الفصل. هذه الطريقة خاصة ، وهذه طريقة عامة ، وتقوم بإرجاع البيانات من المخزن المؤقت الداخلي. عندما نقوم بتنفيذ هذا الرمز ، في وحدة التحكم ، سنرى ما يلي:

ماذا حدث هنا؟ عند إنشاء دفق Counter ({highWaterMark: 2}) الجديد ، أشرنا إلى أن حجم المخزن المؤقت الداخلي لدينا سيكون 2 بايت ، أي يمكن تخزين 2 أحرف (حرف واحد = 1 بايت). بعد استدعاء counter.read () ، يبدأ الدفق في القراءة ، ويكتب "1" إلى المخزن المؤقت الداخلي ويعيده. ثم يواصل القراءة ، يكتب "2". عندما يكتب "3" ، سيكون المخزن المؤقت ممتلئًا ، وسيعود push إلى الخطأ ، وسينتظر الدفق حتى يتم تحرير المخزن المؤقت الداخلي. لأن في مثالنا ، لا يوجد منطق لتحرير المخزن المؤقت ، سينتهي البرنامج النصي.
كما ذكرنا سابقًا ، لضمان عدم مقاطعة القراءة ، نحتاج إلى مسح المخزن المؤقت الداخلي باستمرار. للقيام بذلك ، نحن نشترك في حدث البيانات. استبدل آخر سطرين بالرمز التالي.
const counter = new Counter({ highWaterMark: 2 }); counter.on('data', chunk => { console.log(`Received: ${chunk.toString()}`); });
الآن إذا قمنا بتشغيل هذا المثال ، فسنرى أن كل شيء يعمل بالشكل الذي يجب أن يتم به ويتم عرض الأرقام من 1 إلى 1000 في وحدة التحكم.
تيار مكتوب
في الواقع ، إنه يشبه إلى حد كبير دفق قابل للقراءة ، مخصص فقط لكتابة البيانات.
const { Writable } = require('stream'); class myWritable extends Writable { constructor(opt) { super(opt); } _write(chunk, encoding, callback) {} }
يقبل معلمات مماثلة ، مثل Readable Stream. نحن مهتمون في highWaterMark و _write.
_write هي طريقة خاصة تسمى بواسطة الطرق الداخلية للفئة Writable لكتابة جزء من البيانات. يستغرق 3 معلمات: قطعة (جزء من البيانات) ، ترميز (ترميز إذا كانت قطعة سلسلة) ، رد الاتصال (وظيفة تسمى بعد كتابة ناجحة أو غير ناجحة).
highWaterMark هو الحد الأقصى لعدد بايتات المخزن المؤقت للتيار الداخلي (افتراضياً 16 كيلو بايت) ، عند الوصول إلى أي ستستمر دالة stream.write في إرجاع خطأ.
دعنا نعيد كتابة المثال السابق باستخدام عداد.
const { Writable } = require('stream'); class Counter extends Writable { _write(chunk, encoding, callback) { console.log(chunk.toString()); callback(); } } const counter = new Counter({ highWaterMark: 2 }); for (let i = 1; i < 1000; i += 1) { counter.write(Buffer.from(`${i}`, 'utf8')); }
في الواقع ، كل شيء بسيط ، ولكن هناك فارق بسيط واحد يستحق التذكر! عند إنشاء دفق Counter ({highWaterMark: 2}) الجديد ، أشرنا إلى أن حجم المخزن المؤقت الداخلي لدينا سيكون 2 بايت ، أي يمكن تخزين 2 أحرف (حرف واحد = 1 بايت). عندما يصل العداد إلى عشرة ، سيتم ملء المخزن المؤقت بكل مكالمة للكتابة ؛ وبالتالي ، إذا كانت الكتابة إلى مصدر بطيء ، فسيتم حفظ جميع البيانات الأخرى على ذاكرة الوصول العشوائي عند الاتصال بالكتابة ، مما قد يتسبب في حدوث تجاوزات (في هذا المثال ، بالطبع لا يهم ، نظرًا لأن المخزن المؤقت لدينا هو وحدتي بايت ، ولكن مع الملفات الكبيرة تحتاج إلى تذكر ذلك). عند ظهور مثل هذا الموقف ، نحتاج إلى الانتظار حتى يكتب الدفق الجزء الحالي من البيانات ، ويصدر المخزن المؤقت الداخلي (يؤدي إلى حدث التصريف) ، وبعد ذلك يمكننا استئناف تسجيل البيانات. دعنا نعيد كتابة مثالنا.
const { Writable } = require('stream'); const { once } = require('events'); class Counter extends Writable { _write(chunk, encoding, callback) { console.log(chunk.toString()); callback(); } } const counter = new Counter({ highWaterMark: 2 }); (async () => { for (let i = 1; i < 1000; i += 1) { const canWrite = counter.write(Buffer.from(`${i}`, 'utf8')); console.log(`Can we write bunch of data? ${canWrite}`); if (!canWrite) { await events.once(counter, 'drain'); console.log('drain event fired.'); } } })();
تمت إضافة طريقة events.once في الإصدار 11.13.0 وتسمح لك بإنشاء وعد وانتظر تنفيذ حدث معين مرة واحدة. في هذا المثال ، نتحقق مما إذا كان يمكن كتابة البيانات إلى البث ، إن لم يكن ، فانتظر حتى يتم تحرير المخزن المؤقت ومتابعة التسجيل.
للوهلة الأولى ، قد يبدو هذا كأنه إجراء غير ضروري ، ولكن عند التعامل مع كميات كبيرة من البيانات ، على سبيل المثال الملفات التي يزيد وزنها عن 10 غيغابايت ، إذا نسيت القيام بذلك ، فقد تواجه تسربًا للذاكرة.
تيار مزدوج
فهو يجمع بين التدفقات القابلة للقراءة والكتابة ، أي أنه يجب علينا كتابة تطبيق لطريقتي _read و _write.
const { Duplex } = require('stream'); class myDuplex extends Duplex { constructor(opt) { super(opt); } _read(size) {} _write(chunk, encoding, callback) {} }
نحن هنا مهتمون بمعلمتين يمكننا تمريرهما إلى المُنشئ ، وهما قابلتان قابلتان قراءتهما HighWaterMark و writableHighWaterMark ، مما يسمح لنا بتحديد حجم المخزن المؤقت الداخلي للتدفقات القابلة للقراءة والكتابة ، على التوالي. هذه هي الطريقة التي سيبدو بها تنفيذ المثالين السابقين بمساعدة دفق الطباعة على الوجهين.
const { Duplex } = require('stream'); const events = require('events'); class Counter extends Duplex { constructor(opt) { super(opt); this._max = 1000; this._index = 0; } _read() { this._index += 1; if (this._index > this._max) { this.push(null); } else { const buf = Buffer.from(`${this._index}`, 'utf8'); this.push(buf); } } _write(chunk, encoding, callback) { console.log(chunk.toString()); callback(); } } const counter = new Counter({ readableHighWaterMark: 2, writableHighWaterMark: 2 }); (async () => { let chunk = counter.read(); while (chunk !== null) { const canWrite = counter.write(chunk); console.log(`Can we write bunch of data? ${canWrite}`); if (!canWrite) { await events.once(counter, 'drain'); console.log('drain event fired.'); } chunk = counter.read(); } })();
أعتقد أن هذا الرمز لا يحتاج إلى تفسير ، لأنه هو نفسه كما كان من قبل ، فقط في فصل واحد.
تحويل تيار
هذا الدفق هو دفق مزدوج. هناك حاجة لتحويل جزء من البيانات وإرسالها إلى أسفل السلسلة. يمكن تنفيذه بنفس طريقة تنفيذ بقية الدفق.
const { Transform } = require('stream'); class myTransform extends Transform { _ transform(chunk, encoding, callback) {} }
نحن مهتمون بأسلوب التحويل.
_transform هي طريقة خاصة يتم استدعاؤها بواسطة الطرق الداخلية لفئة التحويل لتحويل جزء كبير من البيانات. يستغرق 3 معلمات: قطعة (جزء من البيانات) ، ترميز (ترميز إذا كانت قطعة سلسلة) ، رد الاتصال (وظيفة تسمى بعد كتابة ناجحة أو غير ناجحة).
باستخدام هذه الطريقة ، سيحدث تغيير في جزء البيانات. داخل هذه الطريقة ، يمكن أن نطلق على convert.push () صفر أو أكثر من المرات ، والذي يرتكب التغييرات. عندما ننتهي من تحويل البيانات ، نحتاج إلى استدعاء رد اتصال ، والذي سيرسل كل شيء أضفناه إلى convert.push (). المعلمة الأولى من وظيفة رد الاتصال هذه هي خطأ. أيضًا ، لا يمكننا استخدام transform.push () ، لكننا نرسل البيانات التي تم تغييرها كمعلمة ثانية إلى وظيفة رد الاتصال (على سبيل المثال: رد الاتصال (خالية ، بيانات)). لفهم كيفية استخدام هذا النوع من الدفق ، دعنا نحلل طريقة التدفق.
stream.pipe - تُستخدم هذه الطريقة لتوصيل الدفق القابل للقراءة بالدفق القابل للكتابة ، وكذلك لإنشاء سلاسل الدفق. هذا يعني أنه يمكننا قراءة جزء من البيانات ونقلها إلى الدفق التالي للمعالجة ، ثم إلى التالي ، إلخ.
دعنا نكتب تيار تحويل يضيف الحرف * إلى بداية ونهاية كل جزء من البيانات.
class CounterReader extends Readable { constructor(opt) { super(opt); this._max = 1000; this._index = 0; } _read() { this._index += 1; if (this._index > this._max) { this.push(null); } else { const buf = Buffer.from(`${this._index}`, 'utf8'); this.push(buf); } } } class CounterWriter extends Writable { _write(chunk, encoding, callback) { console.log(chunk.toString()); callback(); } } class CounterTransform extends Transform { _transform(chunk, encoding, callback) { try { const resultString = `*${chunk.toString('utf8')}*`; callback(null, resultString); } catch (err) { callback(err); } } } const counterReader = new CounterReader({ highWaterMark: 2 }); const counterWriter = new CounterWriter({ highWaterMark: 2 }); const counterTransform = new CounterTransform({ highWaterMark: 2 }); counterReader.pipe(counterTransform).pipe(counterWriter);
في هذا المثال ، استخدمت التدفقات القابلة للقراءة والكتابة من الأمثلة السابقة ، وأضفت أيضًا التحويل. كما ترون ، اتضح أنها بسيطة للغاية.
لذلك نظرنا إلى كيفية ترتيب الجداول. المفهوم الرئيسي هو معالجة البيانات في أجزاء ، والتي هي مريحة للغاية ولا تتطلب موارد كبيرة. أيضًا ، يمكن استخدام التدفقات مع التكرارات ، مما يجعلها أكثر ملاءمة للاستخدام ، لكن هذه قصة مختلفة تمامًا.