مرحبا القارئ. لقد مر الكثير من الوقت منذ إصدار .NET Core 2.1. وهذه الابتكارات الرائعة مثل Span and Memory معروفة بالفعل على نطاق واسع ، يمكنك قراءتها ورؤيتها وسماع الكثير عنها. ومع ذلك ، للأسف ، مكتبة تسمى System.IO. لم تحصل Pipeslines على نفس الاهتمام. كل ما يوجد تقريبًا حول هذا الموضوع هو المنشور
الوحيد الذي تمت ترجمته ونسخه على العديد من الموارد. يجب أن يكون هناك مزيد من المعلومات حول هذه التكنولوجيا للنظر إليها من زوايا مختلفة.

مقدمة
لذلك ، تهدف هذه المكتبة إلى تسريع معالجة البيانات المتدفقة. تم إنشاؤه واستخدامه في الأصل بواسطة فريق التطوير التابع لـ Kestrel (خادم ويب مشترك عبر النظام الأساسي لـ ASP.NET Core) ، ولكنه متوفر حاليًا للبشر من خلال
حزمة nuget .
قبل الخوض في الموضوع ، يمكننا أن نتخيل آلية المكتبة كتماثل محسّن لـ MemoryStream. المشكلة في MemoryStream الأصلي هي عدد كبير من النسخ ، وهو أمر واضح إذا كنت تتذكر أنه يتم استخدام صفيف بايت خاص داخل MemoryStream كمخزن مؤقت. على سبيل المثال ، في أساليب
القراءة والكتابة ، يمكنك رؤية نسخ البيانات بوضوح. وبالتالي ، بالنسبة للكائن الذي نريد الكتابة إليه ، سيتم إنشاء نسخة في المخزن المؤقت الداخلي ، وخلال القراءة ، سيتم إرجاع نسخة من النسخة الداخلية إلى المستهلك. يبدو أنه ليس الاستخدام الأكثر عقلانية للذاكرة.
لا تهدف System.IO.Pipelines إلى استبدال كافة التدفقات ، بل هي أداة إضافية في ترسانة مطور يقوم بكتابة كود عالي الأداء. أقترح عليك أن تتعرف على الأساليب والفصول الأساسية ، وأن ترى تفاصيل تنفيذها وتحلل الأمثلة الأساسية.
لنبدأ مع التفاصيل الداخلية والتنفيذ ، في نفس الوقت ننظر إلى أجزاء التعليمات البرمجية البسيطة. بعد ذلك ، سوف يصبح من الواضح كيف يعمل وكيف يجب استخدامه. عند العمل مع System.IO.Pipelines ، تجدر الإشارة إلى أن المفهوم الأساسي هو أن جميع عمليات القراءة والكتابة يجب أن تتم دون تخصيصات إضافية. لكن بعض الأساليب الجذابة للوهلة الأولى تتناقض مع هذه القاعدة. وفقًا لذلك ، تبدأ التعليمات البرمجية التي تحاول تسريعها بشدة في تخصيص ذاكرة للبيانات الجديدة والجديدة ، وتحميل أداة تجميع مجمعي البيانات المهملة.
تستخدم الأجزاء الداخلية للمكتبة أكبر إمكانيات لأحدث إصدارات اللغة ووقت التشغيل - Span ، Memory ، تجمعات الكائنات ، ValueTask وما إلى ذلك. يجدر البحث هناك على الأقل للحصول على مثال رائع لاستخدام هذه الميزات في الإنتاج.
في وقت واحد ، لم يكن بعض المطورين راضين عن تطبيق التدفقات في C # ، لأنه تم استخدام فصل واحد للقراءة والكتابة. لكن كما يقولون ، لا يمكنك رمي الأساليب خارج الفصل. حتى إذا كان الدفق لا يدعم القراءة / الكتابة / البحث ، يتم استخدام خصائص CanRead و CanWrite و CanSeek. يبدو وكأنه عكاز صغير. ولكن الآن أصبحت الأمور مختلفة.
للعمل مع خطوط الأنابيب ، يتم استخدام
فئتين :
PipeWriter و
PipeReader . تحتوي هذه الفئات على ما يقرب من 50 سطرًا من التعليمات البرمجية وهي واجهات زائفة (ليست الأكثر تقليدية في تجسيدها ، حيث إنها تخفي فئة واحدة ، وليس الكثير) لفئة
الأنابيب ، والتي تحتوي على كل المنطق الأساسي للعمل مع البيانات. تحتوي هذه الفئة على 5 أعضاء عموميين: مُنشئان ، وخاصيتان فقط للحصول على - القارئ والكاتب ، طريقة إعادة التعيين () ، التي تعيد تعيين الحقول الداخلية إلى حالتها الأولية بحيث يمكن إعادة استخدام الفئة. أساليب العمل المتبقية داخلية وتسمى باستخدام واجهات زائفة.
دعنا نبدأ مع فئة الأنابيب
يشغل مثيل الفئة 320 بايت ، وهو كثير جدًا (ثلث كيلوبايت تقريبًا ، لا يمكن احتواء 2 من هذه الكائنات في ذاكرة Manchester Mark I). لذلك تخصيص كمية كبيرة من الحالات هو فكرة سيئة. علاوة على ذلك ، فإن الغرض من هذا الغرض هو الاستخدام طويل الأجل. استخدام تجمعات يجعل حجة لهذا البيان أيضا. ستعيش الكائنات المستخدمة في التجمع إلى الأبد (لتطبيق التجمع الافتراضي).
لاحظ أنه تم وضع علامة على الفصل على أنه مختوم وأنه آمن للخيط - فالكثير من أقسام الكود هي قسم حرج ويتم لفها بأقفال.
لبدء استخدام هذه الفئة ، يجب عليك إنشاء مثيل لفئة Pipe والحصول على كائنات PipeReader و PipeWriter باستخدام الخصائص المذكورة.
تهيئة بسيطةvar pipe = new Pipe(); PipeWriter pipeWriter = pipe.Writer; PipeReader pipeReader = pipe.Reader;
النظر في أساليب العمل مع الأنابيب:
الكتابة باستخدام PipeWriter - WriteAsync و GetMemory / GetSpan و Advance و FlushAsync و Complete و CancelPendingFlush و OnReaderCompleted.
القراءة باستخدام PipeReader - AdvanceTo و ReadAsync و TryRead و Complete و CancelPendingRead و OnWriterCompleted.
كما هو مذكور في
المنشور المذكور ، يستخدم الفصل قائمة منفردة مرتبطة من المخازن المؤقتة. ولكن ، من الواضح ، لا يتم نقلها بين PipeReader و PipeWriter - كل المنطق في فئة واحدة. تستخدم هذه القائمة في القراءة والكتابة. علاوة على ذلك ، يتم تخزين البيانات التي تم إرجاعها في هذه القائمة (لذلك لم يتم إجراء نسخ).
أيضًا ، هناك كائنات تشير إلى بداية البيانات المراد قراءتها (ReadHead والفهرس) ، ونهاية البيانات المراد قراءتها (ReadTail والفهرس) وبداية المساحة المراد كتابتها (WriteHead وعدد البايتات المخزنة مؤقتًا المكتوبة). يعد ReadHead و ReadTail و WriteHead أعضاء (شرائح) محددة من القائمة الداخلية للقطاعات ، ويشير الفهرس إلى موضع معين داخل القطاع. وبالتالي ، يمكن أن يبدأ التسجيل من منتصف مقطع ما ، ثم يلتقط مقطعًا كاملاً واحدًا وينتهي في منتصف الجزء الثالث. يتم نقل هذه المؤشرات بطرق مختلفة.
الشروع في استخدام أساليب PipeWriter
المذكورة جذابة لأول وهلة الطريقة. لديها توقيع مناسب وعصري للغاية - يقبل ReadOnlyMemory ، غير متزامن. وقد يتم إغراء الكثير منهم ، خاصةً تذكر أن Span and Memory سريعان وباردان. ولكن لا تملق نفسك. كل ما تفعله هذه الطريقة هو نسخ ReadOnlyMemory الذي تم تمريره إليها في القائمة الداخلية. وبعبارة "copy" تعني استدعاء الأسلوب CopyTo () ، وليس فقط نسخ الكائن نفسه. سيتم نسخ جميع البيانات التي نريد تسجيلها ، وبالتالي تحميل الذاكرة. يجب ذكر هذه الطريقة فقط للتأكد من أنه من الأفضل عدم استخدامها. حسنًا ، وربما يكون هذا السلوك مناسبًا لبعض المواقف النادرة.
جسم هذه الطريقة هو قسم حرج ، تتم مزامنة الوصول إليها من خلال شاشة.
ثم قد يطرح السؤال ، كيف تكتب شيئًا ما ، إن لم يكن من خلال الطريقة الأكثر وضوحًا والأكثر ملائمة
تأخذ الطريقة معلمة واحدة من نوع صحيح. في ذلك ، يجب أن نشير إلى عدد البايتات التي نرغب في كتابتها إلى خط الأنابيب (ما حجم المخزن المؤقت الذي نريده). يتحقق هذا الأسلوب مما إذا كانت هناك مساحة كافية للكتابة في جزء الذاكرة الحالي المخزن في _writingHeadMemory. إذا كانت كافية ، يتم إرجاع _writingHeadMemory كذاكرة. خلاف ذلك ، بالنسبة للبيانات المكتوبة إلى المخزن المؤقت ، ولكن لم يتم استدعاء طريقة FlushAsync لها ، يتم استدعاؤها ويتم تخصيص BufferSegment آخر ، وهو مرتبط بالبيانات السابقة (فيما يلي قائمتنا الداخلية). إذا كانت _writingHeadMemory خالية ، فستتم تهيئتها باستخدام BufferSegment جديد. وتخصيص المخزن المؤقت هو قسم حاسم ويتم تحت القفل.
أقترح أن ننظر إلى هذا المثال. للوهلة الأولى ، قد يبدو أن المترجم (أو وقت التشغيل) أفسد الشيطان.
وحشية var pipeNoOptions = new Pipe(); Memory<byte> memoryOne = pipeNoOptions.Writer.GetMemory(2); Console.WriteLine(memoryOne.Length);
لكن كل شيء في هذا المثال مفهوم وبسيط.
أثناء إنشاء مثيل Pipe ، يمكننا تمرير كائن
PipeOptions إليه في المُنشئ مع خيارات الإنشاء.
يحتوي PipeOptions على حقل حجم قطعة الحد الأدنى الافتراضي. منذ وقت ليس ببعيد ، كان عام 2048 ، ولكن
هذا الالتزام قام بتحديث هذه القيمة إلى 4096. في وقت كتابة هذا المقال ، كان الإصدار 4096 في حزمة nuget التجريبية ، وكان إصدار الإصدار الأخير بقيمة 2048. وهذا ما يفسر سلوك المثال الأول. إذا كنت تنتقد استخدام حجم أصغر للمخزن المؤقت الافتراضي ، يمكنك تحديده في مثيل نوع PipeOptions.
ولكن في المثال الثاني ، حيث يتم تحديد الحد الأدنى للحجم ، فإن الطول لا يطابقه على أي حال. وهذا يحدث لأن إنشاء BufferSegment جديد يحدث باستخدام تجمعات. أحد الخيارات في PipeOptions هو تجمع الذاكرة. بعد ذلك ، سيتم استخدام التجمع المحدد لإنشاء شريحة جديدة. إذا لم تحدد تجمع الذاكرة ، فسيتم استخدام ArrayPool الافتراضي ، والذي ، كما تعلم ، يحتوي على عدة مجموعات لأحجام مختلفة من المصفوفات (كل واحدة تالية أكبر مرتين من السابقة) وعندما يتم طلبها لبعض الوقت الحجم ، يبحث عن دلو مع صفائف من الحجم المناسب (أي أقرب أو أكبر). وفقًا لذلك ، سيكون المخزن المؤقت الجديد أكبر تقريبًا مما طلبت. الحد الأدنى لحجم الصفيف في ArrayPool الافتراضي (System.Buffers.TlsOverPerCoreLockedStacksArrayPool) هو 16. لكن لا تقلق ، فهذه مجموعة من الصفائف. وفقًا لذلك ، في الغالبية العظمى من الحالات ، لا تمارس المجموعة ضغطًا على جامع البيانات المهملة وسيتم إعادة استخدامها لاحقًا.
يعمل بشكل مشابه ، مع إعطاء سبان من الذاكرة.
وبالتالي GetMemory () أو GetSpan () هي الطرق الرئيسية للكتابة. يعطوننا كائنًا يمكننا الكتابة إليه. للقيام بذلك ، لا نحتاج إلى تخصيص ذاكرة لمصفوفات جديدة من القيم ، يمكننا الكتابة مباشرة في الأنبوب. أي واحد لاستخدامه يعتمد بشكل أساسي على واجهة برمجة التطبيقات التي تستخدمها وطريقة عدم التزامن. ومع ذلك ، في ضوء ما سبق ، يطرح سؤال. كيف سيعرف القارئ كم كتبنا؟ إذا استخدمنا دائمًا تطبيقًا محددًا للتجمع ، والذي يعطي صفيفًا بنفس الحجم المطلوب تمامًا ، عندئذٍ يمكن للقارئ قراءة المخزن المؤقت بالكامل مرة واحدة. ومع ذلك ، كما قلنا بالفعل ، يتم تخصيص مخزن مؤقت لدينا مع وجود احتمال كبير بحجم أكبر. هذا يؤدي إلى الطريقة التالية المطلوبة للتشغيل.
طريقة بسيطة بشكل رهيب. يستغرق عدد البايتات المكتوبة كوسيطة. يقومون بزيادة العدادات الداخلية - _unflushedBytes و _writingHeadBytesBuffered ، الذين تتحدث أسماؤهم عن أنفسهم. يقوم أيضًا باقتطاع (كتابة الشرائح) _writingHeadMemory تمامًا إلى عدد وحدات البايت المكتوبة (باستخدام طريقة الشريحة). لذلك ، بعد استدعاء هذه الطريقة ، تحتاج إلى طلب كتلة ذاكرة جديدة في شكل Memory أو Span ، لا يمكنك الكتابة إلى السابقة. والجسم كله من الطريقة هو قسم حاسم ويعمل تحت قفل.
يبدو أنه بعد هذا يمكن للقارئ تلقي البيانات. ولكن هناك حاجة إلى خطوة أخرى.
تسمى هذه الطريقة بعد أن نكتب البيانات اللازمة إلى الذاكرة المستلمة (GetMemory) وتشير إلى مقدار ما كتبنا هناك (Advance). تقوم الطريقة بإرجاع ValueTask ، ومع ذلك فهي غير متزامنة (على عكس StreamPipeWriter الخاص به). تعد ValueTask نوعًا خاصًا (بنية للقراءة فقط) يستخدم في الحالة التي تكون فيها معظم المكالمات غير متزامنة ، وهذا هو كل البيانات اللازمة ستكون متاحة في وقت دعوتها وستنتهي الطريقة بشكل متزامن. في الداخل نفسه يحتوي إما على بيانات أو مهمة (في حال لم ينجح الأمر بشكل متزامن). ذلك يعتمد على خاصية _writerAwaitable.IsCompleted. إذا بحثنا عن التغييرات التي طرأت على حالة هذا _writerAwaitable ، فسنرى أن هذا يحدث إذا تجاوزت كمية البيانات غير المستهلكة (هذه ليست بالضبط نفس البيانات التي لم يتم فحصها لاحقًا) حدًا معينًا (_pauseWriterThreshold). القيمة الافتراضية هي 16 قطعة الأحجام. إذا رغبت في ذلك ، يمكن تغيير القيمة في PipeOptions. أيضًا ، تبدأ هذه الطريقة في متابعة أسلوب ReadAsync ، إذا تم حظره.
إرجاع FlushResult يحتوي على 2 خصائص - IsCanceled و IsCompleted. يشير IsCanceled إلى ما إذا كان Flush قد تم إلغاؤه (استدعاء CancelPendingFlush ()). يشير IsCompleted إلى ما إذا كان PipeReader قد أكمل (عن طريق استدعاء أساليب Complete () أو CompleteAsync ()).
يتم تنفيذ الجزء الرئيسي من الأسلوب تحت القفل.
الطرق الأخرى لـ PipeWriter ليست مثيرة للاهتمام من وجهة نظر التنفيذ ويتم استخدامها بشكل أقل في كثير من الأحيان ، وبالتالي سيتم تقديم وصف موجز فقط.
# 5 باطلة كاملة (استثناء استثناء = خالية) أو ValueTask CompleteAsync (استثناء استثناء = فارغ)
علامات أنبوب مغلقة للكتابة. سيتم طرح استثناء عند محاولة استخدام أساليب الكتابة بعد الانتهاء. إذا تم إكمال PipeReader بالفعل ، فسيتم أيضًا إكمال مثيل توجيه الإخراج بالكامل. يتم معظم العمل تحت القفل.
# 6 باطلة CancelPendingFlush ()
كما يوحي الاسم ، فإنه يلغي عملية FlushAsync () الحالية. هناك قفل.
# 7 void OnReaderCompleted (رد الفعل <الاستثناء ، الكائن> رد الاتصال ، حالة الكائن)
ينفذ المفوض الذي تم تمريره عند اكتمال القارئ. هناك أيضا قفل.
في
الوثائق المكتوبة حاليًا ، قد لا يتم استدعاء هذه الطريقة في بعض تطبيقات PipeWriter وسيتم إزالتها في المستقبل. لذلك ، يجب أن لا تربط المنطق بهذه الطرق.
حان الوقت ل PipeReader
هنا ، كما في FlushAsync () ، يتم إرجاع ValueTask ، مما يشير إلى أن الطريقة متزامنة في الغالب ، ولكن ليس دائمًا. يعتمد على حالة _readerAwaitable. كما هو الحال مع FlushAsync ، تحتاج إلى البحث عن وقت ضبط _readerAwaitable على عدم اكتماله. يحدث هذا عندما يقرأ PipeReader كل شيء من القائمة الداخلية (أو يحتوي على بيانات تم تعليمها على أنها تم فحصها وتحتاج إلى مزيد من البيانات للمتابعة). الذي ، في الواقع ، هو واضح. وفقًا لذلك ، يمكننا أن نستنتج أنه من المستحسن ضبط توجيهات الإخراج إلى عملك ، لتعيين جميع خياراته بعناية ، استنادًا إلى إحصائيات محددة تجريبياً. سوف يقلل التكوين الصحيح من فرصة تنفيذ فرع غير متزامن وسيسمح بمعالجة أكثر كفاءة للبيانات. تقريبا كل رمز في الأسلوب بأكمله محاط بقفل.
إرجاع بعض
القراءة الغامضة. في الحقيقة ، إنها مجرد علامة + عازلة توضح حالة العملية (IsCanceled - ما إذا كان ReadAsync قد تم إلغاؤه و IsCompleted يشير إلى ما إذا كان PipeWriter مغلقًا). IsCompleted هي قيمة تشير إلى ما إذا كان تم استدعاء أساليب PipeWriter Complete () أو CompleteAsync (). إذا تم استدعاء هذه الطرق مع استثناء نجح ، فسيتم طرحها عند محاولة القراءة.
ومرة أخرى ، يحتوي المخزن المؤقت على نوع غامض -
ReadOnlySequence . هذا ، بدوره ، هو كائن محتويات الشرائح
(ReadOnlySequenceSegment) لبداية ونهاية + فهارس البداية والنهاية داخل الأجزاء المقابلة. الذي يشبه في الواقع هيكل فئة الأنابيب نفسها. بالمناسبة ، يتم وراثة BufferSegment من ReadOnlySequenceSegment ، مما يشير إلى أن BufferSegment يُستخدم في هذا التسلسل. بفضل هذا ، يمكنك فقط التخلص من تخصيصات الذاكرة غير الضرورية لنقل البيانات من كاتب إلى قارئ.
يمكن الحصول على ReadOnlySpan من المخزن المؤقت لمزيد من المعالجة. لإكمال الصورة ، يمكنك التحقق مما إذا كان المخزن المؤقت يحتوي على ReadOnlySpan واحد. إذا كانت تحتوي على ، فلن نحتاج إلى التكرار على المجموعة من عنصر واحد ويمكننا الحصول عليها باستخدام الخاصية الأولى. خلاف ذلك ، من الضروري تجاوز جميع القطاعات في المخزن المؤقت ومعالجة ReadOnlySpan لكل منها.
موضوع المناقشة - في فئة ReadOnlySequence ، يتم استخدام أنواع المراجع الفارغة بشكل نشط وهناك goto (وليس للتداخل العميق للحلقة وليس في التعليمة البرمجية المنشأة) - على وجه الخصوص ،
هنا .
بعد المعالجة ، تحتاج إلى الإشارة إلى مثيل توجيه الإخراج الذي نقرأ فيه البيانات.
إصدار متزامن. يتيح لك الحصول على النتيجة إذا كانت موجودة. خلاف ذلك ، بخلاف ReadAsync ، فإنه لا يحظر ويعود خطأ. أيضا رمز هذه الطريقة في القفل.
في هذه الطريقة ، يمكنك تحديد عدد البايتات التي نفحصها ونستهلكها. سيتم إرجاع البيانات التي تم فحصها ولكن لم يتم استهلاكها في المرة التالية التي يتم فيها قراءتها. قد تبدو هذه الميزة غريبة للوهلة الأولى ، ولكن عند معالجة دفق من البايتات ، نادراً ما يكون من الضروري معالجة كل بايت على حدة. عادة يتم تبادل البيانات باستخدام الرسائل. قد ينشأ موقف أن القارئ ، عند القراءة ، تلقى رسالة واحدة كاملة وجزءًا من الثانية. يجب معالجة الكل ، ويجب ترك جزء من الثاني للمستقبل حتى يأتي مع الجزء المتبقي. تأخذ طريقة AdvanceTo SequencePosition ، وهو في الواقع فهرس + قطعة. عند معالجة كل ما قرأه ReadAsync ، يمكنك تحديد المخزن المؤقت. بخلاف ذلك ، يجب عليك إنشاء موضع بشكل صريح ، مع الإشارة إلى القطاع والفهرس اللذين توقفت عندهما المعالجة. قفل تحت غطاء محرك السيارة.
أيضًا ، إذا كان مقدار المعلومات غير المستهلكة أقل من الحد المحدد (_resumeWriterThreshold) ، فسيبدأ استمرارية PipeWriter إذا تم حظره. بشكل افتراضي ، تكون هذه العتبة عبارة عن 8 مجلدات قطعة (نصف عتبة الحظر).
# 4 الفراغ مكتمل (استثناء استثناء = فارغ)
يكمل PipeReader. إذا تم إكمال PipeWriter في هذه المرحلة ، فسيتم إكمال مثيل Pipe الكامل. قفل في الداخل.
# 5 باطلة CancelPendingRead ()
يسمح لك بإلغاء القراءة الموجودة حاليًا في حالة الانتظار. القفل.
# 6 باطل OnWriterCompleted (رد الفعل <الاستثناء ، الكائن> رد الاتصال ، حالة الكائن)
يسمح لك بتحديد المفوض للتنفيذ عند الانتهاء من PipeWriter.
مثل الطريقة المماثلة لـ PipeWriter ، يوجد في
الوثائق نفس العلامة التي سيتم إزالتها. قفل تحت غطاء محرك السيارة.
مثال
توضح القائمة أدناه مثالًا على العمل مع الأنابيب.
منذ إدخال .NET Core Span و Memory ، تم استكمال العديد من الفئات للعمل مع البيانات بواسطة الأحمال الزائدة باستخدام هذه الأنواع. وبالتالي فإن مخطط التفاعل العام سيكون هو نفسه تقريبا. في المثال الخاص بي ، استخدمت خطوط الأنابيب للعمل مع الأنابيب (أحب الكلمات المتشابهة) - كائنات نظام التشغيل للاتصال بين العمليات. تم توسيع واجهة برمجة تطبيقات الأنابيب وفقًا لذلك لقراءة البيانات في Span و Memory. تستخدم النسخة غير المتزامنة الذاكرة ، نظرًا لأن الطريقة غير المتزامنة سيتم تحويلها إلى طريقة القالب باستخدام آلة الحالة المحدودة التي يتم إنشاؤها تلقائيًا ، والتي يتم فيها تخزين جميع المتغيرات المحلية ومعلمات الطريقة ، وبما أن Span هيكل refonly ، لا يمكن وضعها في الكومة ، على التوالي ، باستخدام Span بطريقة غير متزامنة أمر مستحيل. ولكن هناك أيضًا إصدار متزامن للطريقة التي تسمح لك باستخدام Span. في المثال الخاص بي ، جربت كليهما واتضح أن الإصدار المتزامن في هذا الموقف يظهر نفسه بشكل أفضل. عند استخدامه ، يحدث تجميع أقل للقمامة ، وتكون معالجة البيانات أسرع. ولكن هذا كان فقط بسبب وجود الكثير من البيانات في الأنابيب (كانت البيانات متوفرة دائمًا). في الحالة التي يكون من المحتمل فيها عدم وجود بيانات في وقت التقديم للدُفعة التالية ، يجب عليك استخدام الإصدار غير المتزامن حتى لا تجهد خمول المعالج.
المثال له تعليقات تشرح بعض النقاط. أود أن ألفت انتباهكم إلى حقيقة أنه على الرغم من أن شظايا البرنامج المسؤول عن القراءة من الأنبوب والمعالجة مفصولة ، عند الكتابة إلى ملف ، تتم قراءة البيانات بالضبط من المكان الذي تتم كتابته فيه عند القراءة من الأنابيب.
سنوات من التطور من أجل ميزة قوية - الرئيسية غير متزامن class Program { static async Task Main(string args) { var pipe = new Pipe(); var dataWriter = new PipeDataWriter(pipe.Writer, "testpipe"); var dataProcessor = new DataProcessor(new ConsoleBytesProcessor(), pipe.Reader); var cts = new CancellationTokenSource(); await Task.WhenAll(dataWriter.ReadFromPipeAsync(cts.Token), dataProcessor.StartProcessingDataAsync(cts.Token)); } }
PipeDataWriter public class PipeDataWriter { private readonly NamedPipeClientStream _namedPipe; private readonly PipeWriter _pipeWriter; private const string Servername = "."; public PipeDataWriter(PipeWriter pipeWriter, string pipeName) { _pipeWriter = pipeWriter ?? throw new ArgumentNullException(nameof(pipeWriter)); _namedPipe = new NamedPipeClientStream(Servername, pipeName, PipeDirection.In); } public async Task ReadFromPipeAsync(CancellationToken token) { await _namedPipe.ConnectAsync(token); while (true) { token.ThrowIfCancellationRequested();
DataProcessor public class DataProcessor { private readonly IBytesProcessor _bytesProcessor; private readonly PipeReader _pipeReader; public DataProcessor(IBytesProcessor bytesProcessor, PipeReader pipeReader) { _bytesProcessor = bytesProcessor ?? throw new ArgumentNullException(nameof(bytesProcessor)); _pipeReader = pipeReader ?? throw new ArgumentNullException(nameof(pipeReader)); } public async Task StartProcessingDataAsync(CancellationToken token) { while (true) { token.ThrowIfCancellationRequested();
BytesProcessor public interface IBytesProcessor { Task ProcessBytesAsync(ReadOnlySequence<byte> bytesSequence, CancellationToken token); } public class ConsoleBytesProcessor : IBytesProcessor {