System.IO.Pipelines - أداة غير معروفة لمحبي الأداء العالي

مرحبا القارئ. لقد مر الكثير من الوقت منذ إصدار .NET Core 2.1. وقد تم بالفعل النظر في ابتكارات رائعة مثل Span and Memory على نطاق واسع ، يمكنك قراءتها ورؤيتها وسماع الكثير عنها. ومع ذلك ، لسوء الحظ ، لم تتلق مكتبة تسمى System.IO.Pipelines نفس الاهتمام. إن كل شيء تقريبًا حول هذا الموضوع هو المنشور الوحيد الذي ترجمه كثيرون ونشروه في المنزل. يجب أن يكون هناك بالتأكيد مزيد من المعلومات حتى يتمكن المهتمون من إلقاء نظرة على التكنولوجيا من زوايا مختلفة.



مقدمة


لذلك ، تهدف هذه المكتبة إلى تسريع العمل مع معالجة تدفق البيانات. تم إنشاؤه واستخدامه في الأصل بواسطة فريق التطوير في Kestrel (خادم ويب مشترك عبر النظام الأساسي لـ ASP.NET Core) ، ولكن يتم توفيره حاليًا من خلال حزمة nuget منفصلة.
قبل الخوض في الموضوع ، يمكننا أن نتخيل آلية المكتبة كتماثل محسّن لـ MemoryStream. المشكلة في MemoryStream الأصلي هي عدد كبير من النسخ ، وهو أمر واضح إذا كنت تتذكر أنه يتم استخدام صفيف بايت خاص داخل المخزن المؤقت. على سبيل المثال ، في أساليب القراءة والكتابة ، يكون النسخ مرئيًا بوضوح. وبالتالي ، بالنسبة للكائن الذي نريد الكتابة إليه ، سيتم إنشاء نسخة في المخزن المؤقت الداخلي ، وخلال القراءة ، سيتم تسليم نسخة من النسخة الداخلية إلى المستهلك. يبدو أنه ليس الاستخدام الأكثر عقلانية للفضاء.
لا تهدف System.IO.Pipelines إلى استبدال جميع التدفقات ، بل هي أداة إضافية في ترسانة مطور يقوم بكتابة كود عالي الأداء. أقترح عليك أن تتعرف على الأساليب والفصول الأساسية ، وانظر كيف يتم ترتيبها في الداخل ، وتحليل الأمثلة الأساسية.

لنبدأ بالجهاز الداخلي ، وفي نفس الوقت نتفحص شظايا الكود البسيط. بعد ذلك ، سوف يصبح من الواضح ماذا وكيف يعمل ، وكيف ينبغي استخدامه. عند العمل مع System.IO.Pipelines ، تجدر الإشارة إلى أن المفهوم الأساسي هو أن جميع عمليات القراءة والكتابة يجب أن تتم دون تخصيصات إضافية. لكن بعض الأساليب الجذابة للوهلة الأولى تتناقض مع هذه القاعدة. وفقًا لذلك ، يبدأ الكود الذي تحاول تسريعه في تخصيص ذاكرة للبيانات الجديدة والجديدة ، وتحميل أداة تجميع مجمعي البيانات المهملة.

تستخدم المكتبة الداخلية للمكتبة أوسع إمكانيات لأحدث إصدارات اللغة والوقت - Span ، Memory ، تجمعات الكائنات ، ValueTask ، إلخ. يجدر البحث على الأقل عن مثال رائع لاستخدام هذه الميزات في الإنتاج.
في وقت واحد ، كان البعض غير راضٍ عن تنفيذ التدفقات في C # ، لأنه تم استخدام فصل واحد للقراءة والكتابة. لكن ، كما يقولون ، لا يمكنك رمي الأساليب خارج الفصل. حتى إذا لم يكن الدفق يدعم القراءة / الكتابة / تحريك المؤشر ، فإن خصائص CanRead و CanWrite و CanSeek سارية المفعول ، والتي بدت وكأنها عكاز صغير. هنا الأمور مختلفة.
للعمل مع الأنابيب ، يتم استخدام فئتين : PipeWriter و PipeReader . تحتوي هذه الفئات على حوالي 50 سطرًا لكل منها وهي عبارة عن واجهات زائفة (وليس التجسيد الأكثر كلاسيكية ، نظرًا لوجود فئة واحدة مخبأة خلفها ، وليس كثيرًا) لفئة Pipe ، والتي تحتوي على كل المنطق الأساسي للعمل مع البيانات. من بين الأعضاء العموميين - مُنشئين ، وخاصيتين فقط للحصول على - القارئ والكاتب ، طريقة إعادة التعيين () ، التي تعيد تعيين الحقول الداخلية إلى حالتها الأولية بحيث يمكن إعادة استخدام الفئة تسمى أساليب العمل الأخرى باستخدام واجهات زائفة.

لتبدأ في فئة الأنابيب


يشغل مثيل الفئة 320 بايت ، وهو كثير جدًا (ثلث كيلوبايت تقريبًا ، لا يمكن احتواء 2 من هذه الكائنات في ذاكرة Manchester Mark I). لذلك لتخصيصه بكميات كبيرة هي فكرة سيئة. علاوة على ذلك ، فإن معنى الكائن مخصص للاستخدام طويل الأجل. استخدام تجمعات يجعل حجة لهذا البيان أيضا. بعد كل شيء ، ستعيش الكائنات المستخدمة في التجمع إلى الأبد (في أي حال ، في المعيار).
لاحظ أنه تم وضع علامة على الفصل على أنه مختوم وأنه آمن للخيط - فالكثير من أقسام الكود هي قسم حرج ويتم لفها بأقفال.
للبدء ، تحتاج إلى إنشاء مثيل لفئة Pipe والحصول على كائنات PipeReader و PipeWriter باستخدام الخصائص المذكورة.

سهولة التهيئة
var pipe = new Pipe(); PipeWriter pipeWriter = pipe.Writer; PipeReader pipeReader = pipe.Reader; 


النظر في أساليب العمل مع الأنابيب:
للتسجيل عبر PipeWriter - WriteAsync ، GetMemory / GetSpan ، Advance ، FlushAsync ، Complete ، CancelPendingFlush ، OnReaderCompleted.
للقراءة من خلال PipeReader - AdvanceTo ، ReadAsync ، TryRead ، Complete ، CancelPendingRead ، OnWriterCompleted.

كما هو مذكور في المنشور ، يستخدم الفصل قائمة مفردة مرتبطة من المخازن المؤقتة. ولكن من الواضح أنه لا يتم تمريرها بين PipeReader و PipeWriter - كل المنطق في فئة واحدة. تستخدم هذه القائمة في القراءة والكتابة. علاوة على ذلك ، يتم تخزين البيانات التي تم إرجاعها في هذه القائمة.
هناك أيضًا كائنات تشير إلى بداية البيانات المراد قراءتها (ReadHead و index) ، ونهاية البيانات المراد قراءتها (ReadTail و index) وبداية مكان الكتابة (WriteHead وعدد وحدات البايت المخزنة مؤقتًا المكتوبة). يعد ReadHead و ReadTail و WriteHead جزءًا محددًا من القائمة ، ويشير الفهرس إلى موضع معين داخل القطاع. وبالتالي ، يمكن أن يبدأ التسجيل من منتصف مقطع ما ، ويلتقط الجزء التالي بأكمله وينتهي في منتصف الجزء الثالث. هذه المؤشرات تتحرك بطرق مختلفة.

الشروع في العمل مع أساليب PipeWriter


# 1 ValueTask <FlushResult> WriteAsync (ReadOnlyMemory <byte> source، CancellationToken cancellationToken)


فقط تلك الطريقة المغرية. لديه توقيع مناسب للغاية وعصري - يقبل ReadOnlyMemory ، غير متزامن. وقد يتم إغراء الكثير منهم ، خاصةً تذكر أن Span and Memory سريعان وباردان. ولكن لا تملق نفسك. كل ما تفعله هذه الطريقة هو نسخ ReadOnlyMemory الذي تم تمريره إليها في القائمة الداخلية. و "copy" تعني دعوة إلى الأسلوب CopyTo ، وليس نسخ الكائن نفسه. أي ، سيتم نسخ جميع البيانات التي نريد تسجيلها ، وبالتالي تحميل الذاكرة. يجب دراسة هذه الطريقة فقط للتأكد من أنه من الأفضل عدم استخدامها. حسنًا ، وربما يكون هذا السلوك مناسبًا لبعض المواقف النادرة.
جسم هذه الطريقة هو قسم حرج ، تتم مزامنة الوصول إليها من خلال شاشة.

ثم قد يطرح السؤال ، كيف تكتب شيئًا ما ، إن لم يكن من خلال الطريقة الأكثر وضوحًا والأكثر ملائمة.

# 2 الذاكرة <بايت> GetMemory (int sizeHint)


تأخذ الطريقة معلمة واحدة من نوع صحيح. يجب أن نشير فيه إلى عدد البايتات التي نرغب في كتابتها (أو أكثر ، ولكن ليس بأي حال أقل). يتحقق هذا الأسلوب مما إذا كانت هناك مساحة كافية للكتابة في الجزء الحالي من الذاكرة المخزنة في _writingHeadMemory. إذا كانت كافية ، يتم إرجاع _writingHeadMemory كذاكرة. إذا لم يكن كذلك ، فبالنسبة للبيانات المكتوبة إلى المخزن المؤقت ، ولكن لم يتم استدعاء طريقة FlushAsync الخاصة بها ، يتم استدعاؤها ويتم تحديد BufferSegment آخر ، وهو متصل بالواحدة السابقة (فيما يلي القائمة). في غياب _writingHeadMemory ، تتم تهيئته باستخدام BufferSegment جديد. وتخصيص المخزن المؤقت التالي هو قسم حاسم ويتم تحت القفل.
أقترح نظرة على هذا المثال. للوهلة الأولى ، قد يبدو أن المترجم (أو وقت التشغيل) أفسد الشيطان.

وحشية
  var pipeNoOptions = new Pipe(); Memory<byte> memoryOne = pipeNoOptions.Writer.GetMemory(2); Console.WriteLine(memoryOne.Length); //2048  4096 var pipeWithOptions = new Pipe(new PipeOptions(minimumSegmentSize: 5)); Memory<byte> memoryTwo = pipeWithOptions.Writer.GetMemory(2); Console.WriteLine(memoryTwo.Length); //16 


لكن كل شيء في هذا المثال مفهوم وبسيط.
عند إنشاء مثيل Pipe ، يمكننا تمرير كائن PipeOptions مع خيارات لإنشائه إلى المُنشئ.

يحتوي PipeOptions على حقل حجم قطعة الحد الأدنى الافتراضي. منذ وقت ليس ببعيد كان عام 2048 ، لكن هذا الالتزام غير كل شيء ، الآن 4096. في وقت كتابة هذا التقرير ، كانت النسخة التي تحتوي على 4096 عبارة عن حزمة تجريبية ، وفي آخر إصدار كان الإصدار 2048. وهذا ما يفسر سلوك المثال الأول. إذا كنت تنتقد استخدام حجم أصغر للمخزن المؤقت القياسي ، يمكنك تحديده في مثيل نوع PipeOptions.

ولكن في المثال الثاني ، حيث يشار إلى الحد الأدنى للحجم ، فإن الطول لا يطابقه على أي حال. وهذا يحدث بالفعل لأن إنشاء BufferSegment جديد يحدث باستخدام تجمعات. أحد الخيارات في PipeOptions هو تجمع الذاكرة. بعد ذلك ، سيتم استخدام التجمع المحدد لإنشاء شريحة جديدة. إذا لم تحدد تجمع الذاكرة الخاص بك ، فسيتم استخدام ArrayPool القياسي ، والذي ، كما تعلمون ، يحتوي على عدة دلاء لأحجام مختلفة من المصفوفات (كل واحدة تالية أكبر مرتين من السابقة) وعند البحث عن حجم محدد ، فإنها تبحث عن دلو مع صفائف ذات حجم مناسب (ثم هناك أقرب أكبر أو مساوٍ). وفقًا لذلك ، سيكون المخزن المؤقت الجديد أكبر تقريبًا مما طلبته. الحد الأدنى لحجم الصفيف في ArrayPool القياسي (System.Buffers.TlsOverPerCoreLockedStacksArrayPool) هو 16. لكن لا تقلق ، لأن هذا هو مجموعة من الصفائف. وفقًا لذلك ، في الغالبية العظمى من الحالات ، لا تضع الصفيف ضغطًا على جامع البيانات المهملة وسيتم إعادة استخدامها.

# 2.5 Span <byte> GetSpan (int sizeHint)


يعمل بشكل مشابه ، مع إعطاء سبان من الذاكرة.

وبالتالي GetMemory () أو GetSpan () هي الطرق الرئيسية للكتابة. يعطوننا كائنًا يمكننا الكتابة إليه. للقيام بذلك ، لا نحتاج إلى تخصيص ذاكرة لمصفوفات جديدة من القيم ، يمكننا الكتابة مباشرة إلى الهيكل الداخلي. أي واحد لاستخدامه يعتمد بشكل أساسي على واجهة برمجة التطبيقات التي تستخدمها والأسلوب غير المتزامن. ومع ذلك ، في ضوء ما سبق ، يطرح سؤال. كيف سيعرف القارئ كم كتبنا؟ إذا استخدمنا دائمًا تطبيقًا محددًا للتجمع ، والذي يعطي صفيفًا بنفس الحجم المطلوب تمامًا ، عندئذٍ يمكن للقارئ قراءة المخزن المؤقت بالكامل مرة واحدة. ومع ذلك ، كما قلنا بالفعل ، يتم تخصيص مخزن مؤقت لدينا مع وجود احتمال كبير بحجم أكبر. هذا يؤدي إلى الطريقة التالية المطلوبة للتشغيل.

# 3 void Advance (int bytes)


طريقة بسيطة رهيبة. يستغرق عدد البايتات المكتوبة كوسيطة. يقومون بزيادة العدادات الداخلية - _unflushedBytes و _writingHeadBytesBuffered ، الذين تتحدث أسماؤهم عن أنفسهم. يقوم أيضًا باقتطاع _writingHeadMemory تمامًا لعدد البايتات المكتوبة (باستخدام طريقة Slice). لذلك ، بعد استدعاء هذه الطريقة ، تحتاج إلى طلب كتلة ذاكرة جديدة في شكل Memory أو Span ، لا يمكنك الكتابة إلى السابقة. والجسم كله من الطريقة هو قسم حاسم ويعمل تحت قفل.

يبدو أنه بعد هذا يمكن للقارئ تلقي البيانات. ولكن هناك حاجة إلى خطوة أخرى.

# 4 ValueTask <FlushResult> FlushAsync (CancellationToken cancellationToken)


تسمى هذه الطريقة بعد أن نكتب البيانات اللازمة إلى الذاكرة المستلمة ونشير إلى مقدار ما كتبنا هناك. تقوم الطريقة بإرجاع ValueTask ، ومع ذلك فهي غير متزامنة (على عكس StreamPipeWriter الخاص به). تعد ValueTask نوعًا خاصًا (بنية للقراءة فقط) يُستخدم في الحالة التي لن تستخدم فيها معظم المكالمات غير متزامن ، أي أن جميع البيانات اللازمة ستكون متاحة في وقت الاتصال وستنتهي الطريقة بشكل متزامن. في الداخل ، يحتوي على بيانات أو مهمة (في حالة عدم تشغيلها بشكل متزامن). يعتمد ذلك على حالة الخاصية _writerAwaitable.IsCompleted. إذا بحثنا عن التغييرات التي طرأت على حالة كائن الانتظار هذا ، فسنرى أن هذا يحدث بشرط أن تكون كمية البيانات غير المعالجة (غير المستهلكة) (هذه ليست بالضبط نفس البيانات غير المقروءة (لم يتم فحصها) ، والتي سيتم شرحها لاحقًا) تتجاوز حدًا معينًا (_pauseWriterThreshold). الافتراضي هو 16 قطعة الأحجام. إذا رغبت في ذلك ، يمكن تغيير القيمة في PipeOptions. أيضًا ، تبدأ هذه الطريقة في متابعة أسلوب ReadAsync ، إذا تم حظره.

إرجاع FlushResult يحتوي على 2 خصائص - IsCanceled و IsCompleted. يشير IsCanceled إلى ما إذا كان Flush قد تم إلغاؤه (استدعاء CancelPendingFlush). يشير IsCompleted إلى ما إذا كان تم إكمال PipeReader (عن طريق استدعاء أساليب Complete () أو CompleteAsync ()).
يتم تنفيذ الجزء الرئيسي من هذه الطريقة تحت لوك سكاي ووكر.

الأساليب الأخرى لـ PipeWriter لا تهم من وجهة نظر التنفيذ ويتم استخدامها بشكل أقل كثيرًا ، لذلك سيتم تقديم وصف موجز فقط.

# 5 باطلة كاملة (استثناء استثناء = خالية) أو ValueTask CompleteAsync (استثناء استثناء = فارغ)


علامات أنبوب مغلقة للكتابة. عند الانتهاء ، سيتم طرح استثناء عند محاولة استخدام طرق الكتابة. إذا تم إكمال PipeReader بالفعل ، فسيتم أيضًا إكمال مثيل توجيه الإخراج بالكامل. يتم معظم العمل تحت القفل.

# 6 باطلة CancelPendingFlush ()


كما يوحي الاسم ، فإنه يكمل عملية FlushAsync () الحالية. هناك لوك.

# 7 void OnReaderCompleted (رد الفعل <الاستثناء ، الكائن> رد الاتصال ، حالة الكائن)


ينفذ المفوض المفوض عند اكتمال القارئ. هناك أيضا قفل.
تشير الوثائق حاليًا إلى أنه قد لا يتم استدعاء هذه الطريقة على بعض أحفاد PipeWriter وسيتم إزالتها في المستقبل. لذلك ، يجب أن لا تربط المنطق بهذه الطرق.

انتقل إلى PipeReader


# 1 ValueTask <ReadResult> ReadAsync (إلغاء رمز مميز)


هنا ، مثل FlushAsync ، يتم إرجاع ValueTask ، مما يشير إلى أن الطريقة متزامنة في الغالب ، ولكن ليس دائمًا. يعتمد على حالة _readerAwaitable. كما هو الحال مع FlushAsync ، تحتاج إلى البحث عن وقت ضبط _readerAwaitable على عدم اكتماله. يحدث هذا عندما يقرأ PipeReader كل شيء من القائمة (أو أنه يحتوي على بيانات تم تعليمها على أنها تم فحصها وتحتاج إلى مزيد من البيانات للمتابعة). وهو في الواقع منطقي. وفقًا لذلك ، يمكننا أن نستنتج أنه من المستحسن ضبط توجيهات الإخراج إلى عملك ، لتعيين جميع خياراته بعناية ، استنادًا إلى إحصائيات محددة تجريبياً. سوف يقلل التكوين الصحيح من احتمال وجود فرع تنفيذ غير متزامن ويسمح بمعالجة أكثر كفاءة للبيانات. تقريبا الأسلوب بأكمله محاط بقفل.

إرجاع بعض القراءة الغامضة. في الواقع ، إنها مجرد علامة + عازلة توضح حالة العملية (IsCanceled - ما إذا كان ReadAsync قد تم إلغاؤه و IsCompleted يشير إلى ما إذا كان PipeWriter مغلقًا). في هذه الحالة ، IsCompleted هي قيمة تشير إلى ما إذا كان تم استدعاء أساليب PipeWriter Complete () أو CompleteAsync (). إذا تم استدعاء هذه الأساليب مع استثناء ، فسيتم طرحها عند محاولة القراءة.

يحتوي المخزن المؤقت مرة أخرى على نوع غامض - ReadOnlySequence . هذا ، بدوره ، هو كائن لاحتواء المقاطع (ReadOnlySequenceSegment) من مؤشرات البداية والنهاية + البداية والنهاية داخل الأجزاء المقابلة. الذي يشبه في الواقع هيكل فئة الأنابيب نفسها. بالمناسبة ، BufferSegment هي الخلف لـ ReadOnlySequenceSegment ، مما يشير إلى أنه يتم استخدامه هناك. بفضل هذا ، يمكنك فقط التخلص من تخصيصات الذاكرة غير الضرورية لنقل البيانات من كاتب إلى قارئ.
يمكن الحصول على ReadOnlySpan من المخزن المؤقت لمزيد من المعالجة. لإكمال الصورة ، يمكنك التحقق مما إذا كان المخزن المؤقت يحتوي على ReadOnlySpan واحد. إذا كانت تحتوي على ، فلن نحتاج إلى التكرار على المجموعة من عنصر واحد ويمكننا الحصول عليها باستخدام الخاصية الأولى. وإلا ، فأنت بحاجة إلى تجاوز كافة الأجزاء الموجودة في المخزن المؤقت ومعالجة كل ReadOnlySpan.

موضوع المناقشة - في فئة ReadOnlySequence ، يتم استخدام أنواع المراجع الفارغة بشكل نشط وهناك goto (وليس للخروج من التداخل وليس في الكود الذي تم إنشاؤه) - على وجه الخصوص ، هنا

بعد المعالجة ، يجب أن توضح لمثيل الإخراج أننا قد قرأنا البيانات.

# 2 bool TryRead (خارج نتيجة ReadResult)


إصدار متزامن. يتيح لك الحصول على النتيجة إذا كانت كذلك. إذا لم يكن موجودًا بالفعل ، فخلافًا لـ ReadAsync ، لا يتم حظره ، ولكنه يُرجع الخطأ. أيضا في القفل.

# 3 باطلة AdvanceTo (SequencePosition المستهلكة ، SequencePosition فحص)


في هذه الطريقة ، يمكنك تحديد عدد البايتات التي نقرأها وعدد الكميات التي تمت معالجتها. سيتم إرجاع البيانات التي تمت قراءتها ولكن لم تتم معالجتها في المرة التالية التي تتم قراءتها. قد تبدو هذه الميزة غريبة للوهلة الأولى ، ولكن عند معالجة دفق من البايتات ، نادراً ما يكون من الضروري معالجة كل بايت على حدة. عادة ، يتم تبادل البيانات باستخدام الرسائل. قد ينشأ موقف أن القارئ ، عند القراءة ، تلقى رسالة واحدة كاملة وجزءًا من الثانية. يجب معالجة الكل ، ويجب ترك جزء من الثانية في المرة التالية حتى يأتي مع الجزء المتبقي. تقبل طريقة AdvanceTo SequencePosition ، وهو في الواقع فهرس + قطعة فيه. عند معالجة كل ما قرأه ReadAsync ، يمكنك تحديد المخزن المؤقت. وإلا ، فسيتعين عليك إنشاء موضع بشكل صريح ، مع الإشارة إلى القطاع والفهرس اللذين توقفت عندهما المعالجة. تحت غطاء محرك السيارة لوك.
أيضًا ، إذا كانت كمية المعلومات الأولية أقل من العيب الذي تم تثبيته (_resumeWriterThreshold) ، فإنه يبدأ في متابعة PipeWriter إذا تم حظره. بشكل افتراضي ، تكون هذه العتبة عبارة عن 8 مجلدات قطعة (نصف عتبة الحظر).

# 4 الفراغ مكتمل (استثناء استثناء = فارغ)


اكتمال PipeReader. إذا كان PipeWriter مكتملًا في هذه المرحلة ، فسيتم إنهاء مثيل توجيه الإخراج بأكمله. قفل في الداخل.

# 5 باطلة CancelPendingRead ()


يسمح لك بإلغاء القراءة المتوقعة حاليًا. الموضع.

# 6 باطل OnWriterCompleted (رد الفعل <الاستثناء ، الكائن> رد الاتصال ، حالة الكائن)


يسمح لك بتحديد المفوض للتنفيذ عند اكتمال PipeWriter.
مثل الطريقة المماثلة لـ PipeWriter ، تحتوي الوثائق على نفس الملاحظة التي سيتم إزالتها. قفل تحت غطاء محرك السيارة.

مثال



توضح القائمة أدناه مثالًا على العمل مع الأنابيب.
منذ إدخال .NET Core Span و Memory ، تم استكمال العديد من الفئات للعمل مع البيانات بواسطة الأحمال الزائدة باستخدام هذه الأنواع. وبالتالي فإن مخطط التفاعل العام سيكون هو نفسه تقريبا. في المثال الخاص بي ، استخدمت خطوط الأنابيب للعمل مع الأنابيب (أحب الكلمات الجذرية) ، أي القنوات - كائنات نظام التشغيل للاتصال بين العمليات. تم توسيع واجهة برمجة تطبيقات القناة وفقًا لذلك لقراءة البيانات في Span و Memory. يستخدم الإصدار غير المتزامن الذاكرة ، نظرًا لأن الطريقة غير المتزامنة سيتم تحويلها إلى طريقة قالب باستخدام آلية الحالة المحدودة المولدة تلقائيًا ، حيث يتم تخزين جميع المتغيرات المحلية ومعلمات الطريقة ، وحيث أن Span يتم إعادة هيكلة للقراءة فقط ، فلا يمكن أن تكون على الكومة ، على التوالي ، باستخدام Span بطريقة غير متزامنة غير ممكن. ولكن هناك أيضًا إصدار متزامن للطريقة التي تسمح لك باستخدام Span. في المثال الخاص بي ، جربت كليهما واتضح أن الإصدار المتزامن في هذا الموقف يظهر نفسه بشكل أفضل. عند استخدامه ، يحدث تجميع أقل للقمامة ، وتكون معالجة البيانات أسرع. ولكن هذا كان فقط لأنه كان هناك الكثير من البيانات. في حالة احتمال حدوث حالة عدم وجود بيانات في وقت التقديم للدُفعة التالية ، يجب عليك استخدام الإصدار غير المتزامن حتى لا يجهد خمول المعالج.
المثال له تعليقات تشرح بعض النقاط. أود أن ألفت انتباهكم إلى حقيقة أنه على الرغم من حقيقة أن شظايا البرنامج المسؤول عن القراءة من الأنبوب والمعالجة مفصولة ، عند الكتابة إلى ملف ، تتم قراءة البيانات بالضبط من المكان الذي تتم كتابته فيه عند القراءة من الأنبوب.

سنوات من التطور من أجل ميزة قوية - مين غير متزامن
  class Program { static async Task Main(string args) { var pipe = new Pipe(); var dataWriter = new PipeDataWriter(pipe.Writer, "testpipe"); var dataProcessor = new DataProcessor(new ConsoleBytesProcessor(), pipe.Reader); var cts = new CancellationTokenSource(); await Task.WhenAll(dataWriter.ReadFromPipeAsync(cts.Token), dataProcessor.StartProcessingDataAsync(cts.Token)); } } 


PipeDataWriter
  public class PipeDataWriter { private readonly NamedPipeClientStream _namedPipe; private readonly PipeWriter _pipeWriter; private const string Servername = "."; public PipeDataWriter(PipeWriter pipeWriter, string pipeName) { _pipeWriter = pipeWriter ?? throw new ArgumentNullException(nameof(pipeWriter)); _namedPipe = new NamedPipeClientStream(Servername, pipeName, PipeDirection.In); } public async Task ReadFromPipeAsync(CancellationToken token) { await _namedPipe.ConnectAsync(token); while (true) { token.ThrowIfCancellationRequested(); ////       Memory<T> //Memory<byte> buffer = _pipeWriter.GetMemory(); ////       Memory<T> ////         -       . //int readBytes = await _namedPipe.ReadAsync(buffer, token); //         PipeWriter Span //         -       . int readBytes = _namedPipe.Read(_pipeWriter.GetSpan()); //      ,        //         if (readBytes == 0) { await Task.Delay(500, token); continue; } // ,       _pipeWriter.Advance(readBytes); //  ,      PipeReader FlushResult result = await _pipeWriter.FlushAsync(token); //  PipeReader  ,       //        ,      if (result.IsCompleted) { break; } } //  _pipeWriter     Pipe _pipeWriter.Complete(); } } 


DataProcessor
  public class DataProcessor { private readonly IBytesProcessor _bytesProcessor; private readonly PipeReader _pipeReader; public DataProcessor(IBytesProcessor bytesProcessor, PipeReader pipeReader) { _bytesProcessor = bytesProcessor ?? throw new ArgumentNullException(nameof(bytesProcessor)); _pipeReader = pipeReader ?? throw new ArgumentNullException(nameof(pipeReader)); } public async Task StartProcessingDataAsync(CancellationToken token) { while (true) { token.ThrowIfCancellationRequested(); //     Pipe ReadResult result = await _pipeReader.ReadAsync(token); ReadOnlySequence<byte> buffer = result.Buffer; //      await _bytesProcessor.ProcessBytesAsync(buffer, token); // ,      .       ,   //  ,               //    IBytesProcessor.ProcessBytesAsync   ,    _pipeReader.AdvanceTo(buffer.End); //  PipeWriter  ,      //      ,      if (result.IsCompleted) { break; } } //  _pipeReader     Pipe _pipeReader.Complete(); } } 


BytesProcessor
  public interface IBytesProcessor { Task ProcessBytesAsync(ReadOnlySequence<byte> bytesSequence, CancellationToken token); } public class ConsoleBytesProcessor : IBytesProcessor { //,         IDisposable readonly FileStream _fileStream = new FileStream("buffer", FileMode.Create); public Task ProcessBytesAsync(ReadOnlySequence<byte> bytesSequence, CancellationToken token) { if (bytesSequence.IsSingleSegment) { ProcessSingle(bytesSequence.First.Span); } else { foreach (var segment in bytesSequence) { ProcessSingle(segment.Span); } } return Task.CompletedTask; } private void ProcessSingle(ReadOnlySpan<byte> span) { _fileStream.Write(span); } } 

Source: https://habr.com/ru/post/ar464921/


All Articles