Hallo Leser. Seit der Veröffentlichung von .NET Core 2.1 ist ziemlich viel Zeit vergangen. Und solche coolen Innovationen wie Span und Memory wurden bereits weit verbreitet, man kann viel darüber lesen, sehen und hören. Leider erhielt eine Bibliothek namens System.IO.Pipelines nicht die gleiche Aufmerksamkeit. Fast alles, was zu diesem Thema gehört, ist der
einzige Beitrag , den viele zu Hause übersetzt und gepostet haben. Es sollte auf jeden Fall mehr Informationen geben, damit Interessenten die Technologie aus verschiedenen Blickwinkeln betrachten können.

Einführung
Ziel dieser Bibliothek ist es daher, die Arbeit mit der Streaming-Datenverarbeitung zu beschleunigen. Es wurde ursprünglich vom Entwicklungsteam von Kestrel (einem plattformübergreifenden Webserver für ASP.NET Core) erstellt und verwendet, wird jedoch derzeit über ein separates
Nuget-Paket bereitgestellt .
Bevor wir uns mit dem Thema befassen, können wir uns den Bibliotheksmechanismus als ein verbessertes Analogon von MemoryStream vorstellen. Das Problem mit dem ursprünglichen MemoryStream ist eine übermäßige Anzahl von Kopien. Dies ist offensichtlich, wenn Sie sich daran erinnern, dass ein privates Byte-Array im Inneren als Puffer verwendet wird. Beispielsweise ist bei den
Lese- und
Schreibmethoden das Kopieren deutlich sichtbar. Daher wird für das Objekt, das in den Stream geschrieben werden soll, eine Kopie im internen Puffer erstellt, und während des Lesens wird eine Kopie der internen Kopie an den Verbraucher geliefert. Es klingt nicht nach der rationalsten Raumnutzung.
System.IO.Pipelines zielt nicht darauf ab, alle Streams zu ersetzen, sondern ist ein zusätzliches Tool im Arsenal eines Entwicklers, der Hochleistungscode schreibt. Ich schlage vor, dass Sie sich mit den grundlegenden Methoden und Klassen vertraut machen, sehen, wie sie im Inneren angeordnet sind, und grundlegende Beispiele analysieren.
Beginnen wir mit dem internen Gerät und untersuchen gleichzeitig einfache Codefragmente. Danach wird klar, was und wie es funktioniert und wie es verwendet werden soll. Bei der Arbeit mit System.IO.Pipelines ist zu beachten, dass das Grundkonzept darin besteht, dass alle Lese- / Schreibvorgänge ohne zusätzliche Zuordnungen erfolgen sollten. Einige auf den ersten Blick attraktive Methoden widersprechen dieser Regel. Dementsprechend beginnt der Code, den Sie so schnell zu beschleunigen versuchen, Speicher für neue und neue Daten zuzuweisen, wodurch der Garbage Collector geladen wird.
Die interne Bibliothek der Bibliothek nutzt die umfangreichsten Möglichkeiten der neuesten Versionen von Sprache und Zeitspanne, Spanne, Speicher, Objektpools, ValueTask usw. Zumindest ein gutes Beispiel für die Verwendung dieser Funktionen in der Produktion ist einen Blick wert.
Zu einer Zeit waren einige mit der Implementierung von Streams in C # unzufrieden, da eine Klasse sowohl zum Lesen als auch zum Schreiben verwendet wurde. Aber wie sie sagen, können Sie keine Methoden aus einer Klasse werfen. Selbst wenn der Stream das Lesen / Schreiben / Verschieben des Zeigers nicht unterstützte, wurden die Eigenschaften CanRead, CanWrite und CanSeek wirksam, die wie eine kleine Krücke aussahen. Hier ist das anders.
Für die Arbeit mit Pipes werden zwei Klassen verwendet:
PipeWriter und
PipeReader . Diese Klassen enthalten jeweils etwa 50 Zeilen und sind Pseudofassaden (nicht die klassischsten Inkarnationen, da eine einzige Klasse dahinter verborgen ist und nicht viele) für die
Pipe- Klasse, die die gesamte grundlegende Logik für die Arbeit mit Daten enthält. Von den öffentlichen Mitgliedern - 2 Konstruktoren, 2 Nur-Get-Eigenschaften - Reader und Writer, die Reset () -Methode, die interne Felder auf ihren Ausgangszustand zurücksetzt, damit die Klasse wiederverwendet werden kann. Andere Arbeitsmethoden werden als Pseudofassaden bezeichnet.
Erste Schritte mit der Pipe-Klasse
Die Klasseninstanz belegt 320 Bytes, was ziemlich viel ist (fast ein Drittel eines Kilobytes, 2 solcher Objekte konnten nicht in den Speicher von Manchester Mark I passen). Es ist also eine schlechte Idee, es in großen Mengen zuzuweisen. Darüber hinaus ist die Bedeutung des Objekts für den langfristigen Gebrauch bestimmt. Die Verwendung von Pools liefert auch ein Argument für diese Aussage. Immerhin werden die im Pool verwendeten Objekte für immer leben (auf jeden Fall im Standard).
Beachten Sie, dass die Klasse als versiegelt markiert und threadsicher ist - viele Abschnitte des Codes sind ein kritischer Abschnitt und in Sperren eingeschlossen.
Erstellen Sie zunächst eine Instanz der Pipe-Klasse und rufen Sie die Objekte PipeReader und PipeWriter mit den genannten Eigenschaften ab.
Einfache Initialisierungvar pipe = new Pipe(); PipeWriter pipeWriter = pipe.Writer; PipeReader pipeReader = pipe.Reader;
Betrachten Sie die Methoden zum Arbeiten mit Rohren:
Für die Aufnahme über PipeWriter - WriteAsync, GetMemory / GetSpan, Advance, FlushAsync, Complete, CancelPendingFlush, OnReaderCompleted.
Zum Lesen von PipeReader - AdvanceTo, ReadAsync, TryRead, Complete, CancelPendingRead, OnWriterCompleted.
Wie im
Beitrag angegeben , verwendet die Klasse eine einfach verknüpfte Liste von Puffern. Offensichtlich werden sie jedoch nicht zwischen PipeReader und PipeWriter übertragen - die gesamte Logik befindet sich in einer Klasse. Diese Liste wird sowohl zum Lesen als auch zum Schreiben verwendet. Darüber hinaus werden die zurückgegebenen Daten in dieser Liste gespeichert.
Es gibt auch Objekte, die den Beginn der zu lesenden Daten (ReadHead und Index), das Ende der zu lesenden Daten (ReadTail und Index) und den Beginn der zu schreibenden Stelle (WriteHead und die Anzahl der geschriebenen gepufferten Bytes) angeben. Hier sind ReadHead, ReadTail und WriteHead ein bestimmtes Segment aus der Liste, und der Index gibt eine bestimmte Position innerhalb des Segments an. Somit kann die Aufzeichnung in der Mitte eines Segments beginnen, das gesamte nächste Segment erfassen und in der Mitte des dritten Segments enden. Diese Zeiger bewegen sich auf verschiedene Arten.
Erste Schritte mit PipeWriter-Methoden
Genau diese verlockende Methode. Hat eine sehr geeignete und trendige Signatur - akzeptiert ReadOnlyMemory, asynchron. Und viele mögen versucht sein, sich besonders daran zu erinnern, dass Span und Memory so schnell und cool sind. Aber schmeichel dir nicht. Diese Methode kopiert lediglich das an sie übergebene ReadOnlyMemory in die interne Liste. Und "Kopieren" bedeutet einen Aufruf der CopyTo-Methode und nicht das Kopieren des Objekts selbst. Das heißt, alle Daten, die wir aufzeichnen möchten, werden kopiert, wodurch der Speicher geladen wird. Diese Methode sollte nur untersucht werden, um sicherzustellen, dass es besser ist, sie nicht zu verwenden. Nun, und vielleicht für einige seltene Situationen ist dieses Verhalten angemessen.
Der Hauptteil der Methode ist ein kritischer Abschnitt. Der Zugriff darauf wird über einen Monitor synchronisiert.
Dann kann sich die Frage stellen, wie man etwas schreibt, wenn nicht mit der naheliegendsten und einzig geeigneten Methode.
Die Methode verwendet einen Parameter eines Integer-Typs. Darin müssen wir angeben, wie viele Bytes wir schreiben möchten (oder mehr, aber auf keinen Fall weniger). Diese Methode prüft, ob in dem in _writingHeadMemory gespeicherten aktuellen Speicherfragment genügend Speicherplatz zum Schreiben vorhanden ist. Wenn dies ausreicht, wird _writingHeadMemory als Speicher zurückgegeben. Wenn nicht, wird für die Daten, die in den Puffer geschrieben wurden, für die jedoch die FlushAsync-Methode nicht aufgerufen wurde, ein anderes BufferSegment aufgerufen, das mit dem vorherigen verbunden ist (hier die Liste). In Abwesenheit von _writingHeadMemory wird es mit einem neuen BufferSegment initialisiert. Die Zuweisung des nächsten Puffers ist ein kritischer Abschnitt und erfolgt unter der Sperre.
Ich schlage einen Blick auf ein solches Beispiel vor. Auf den ersten Blick scheint es, dass der Compiler (oder die Laufzeit) den Dämon verführt hat.
Teufelei var pipeNoOptions = new Pipe(); Memory<byte> memoryOne = pipeNoOptions.Writer.GetMemory(2); Console.WriteLine(memoryOne.Length);
Aber alles in diesem Beispiel ist verständlich und einfach.
Beim Erstellen einer Pipe-Instanz können wir ein
PipeOptions- Objekt mit Optionen zum Erstellen an den Konstruktor übergeben.
PipeOptions verfügt über ein Standardfeld für die minimale Segmentgröße. Vor nicht allzu langer Zeit war es 2048, aber
dieses Commit änderte alles, jetzt 4096. Zum Zeitpunkt des Schreibens war die Version mit 4096 ein Vorabversionspaket, in der neuesten Release-Version war es 2048. Dies erklärt das Verhalten des ersten Beispiels. Wenn Sie eine kleinere Größe für den Standardpuffer verwenden möchten, können Sie diese in einer Instanz des Typs PipeOptions angeben.
Im zweiten Beispiel, in dem die Mindestgröße angegeben ist, stimmt die Länge ohnehin nicht überein. Und dies geschieht bereits, weil die Erstellung eines neuen BufferSegments mithilfe von Pools erfolgt. Eine der Optionen in PipeOptions ist der Speicherpool. Danach wird der angegebene Pool verwendet, um ein neues Segment zu erstellen. Wenn Sie Ihren Speicherpool nicht angegeben haben, wird der Standard-ArrayPool verwendet, der, wie Sie wissen, mehrere Buckets für unterschiedliche Arraysgrößen enthält (jeder nächste ist zweimal größer als der vorherige). Wenn Sie nach einer bestimmten Größe gefragt werden, wird nach einem Bucket mit Arrays geeigneter Größe gesucht (dann) es ist das nächste größer oder gleich). Dementsprechend ist der neue Puffer mit ziemlicher Sicherheit größer als von Ihnen angefordert. Die minimale Arraygröße im Standard-ArrayPool (System.Buffers.TlsOverPerCoreLockedStacksArrayPool) beträgt 16. Machen Sie sich jedoch keine Sorgen, da dies ein Pool von Arrays ist. Dementsprechend belastet das Array in den allermeisten Fällen den Garbage Collector nicht und wird wiederverwendet.
Es funktioniert ähnlich und gibt Span from Memory.
Daher sind GetMemory () oder GetSpan () die Hauptmethoden zum Schreiben. Sie geben uns ein Objekt, an das wir schreiben können. Dazu müssen wir keinen Speicher für neue Wertearrays zuweisen, sondern können direkt in die interne Struktur schreiben. Welche verwendet werden soll, hängt hauptsächlich von der verwendeten API und der asynchronen Methode ab. In Anbetracht des Vorstehenden stellt sich jedoch eine Frage. Woher weiß der Leser, wie viel wir geschrieben haben? Wenn wir immer eine bestimmte Implementierung des Pools verwenden würden, die ein Array mit genau der gewünschten Größe ergibt, könnte der Leser den gesamten Puffer auf einmal lesen. Wie wir bereits gesagt haben, wird uns jedoch ein Puffer mit einer hohen Wahrscheinlichkeit einer größeren Größe zugewiesen. Dies führt zu der folgenden für den Betrieb erforderlichen Methode.
Eine schrecklich einfache Methode. Es wird die Anzahl der als Argument geschriebenen Bytes verwendet. Sie erhöhen die internen Zähler - _unflushedBytes und _writingHeadBytesBuffered, deren Namen für sich selbst sprechen. Außerdem wird _writingHeadMemory genau auf die Anzahl der geschriebenen Bytes gekürzt (mithilfe der Slice-Methode). Daher müssen Sie nach dem Aufrufen dieser Methode einen neuen Speicherblock in Form von Memory oder Span anfordern. Sie können nicht in den vorherigen schreiben. Und der gesamte Körper der Methode ist ein kritischer Abschnitt und läuft unter einem Schloss.
Es scheint, dass der Leser danach Daten empfangen kann. Es ist jedoch noch ein Schritt erforderlich.
Die Methode wird aufgerufen, nachdem wir die erforderlichen Daten in den empfangenen Speicher geschrieben und angegeben haben, wie viel wir dort geschrieben haben. Die Methode gibt eine ValueTask zurück, ist jedoch nicht asynchron (im Gegensatz zu ihrem Nachkommen StreamPipeWriter). ValueTask ist ein spezieller Typ (schreibgeschützte Struktur), der verwendet wird, wenn die meisten Aufrufe keine Asynchronität verwenden, dh alle erforderlichen Daten zum Zeitpunkt des Aufrufs verfügbar sind und die Methode synchron endet. Im Inneren enthält es entweder Daten oder Task (falls es nicht synchron funktioniert hat). Dies hängt vom Status der Eigenschaft _writerAwaitable.IsCompleted ab. Wenn wir nach Änderungen des Status dieses wartenden Objekts suchen, werden wir feststellen, dass dies unter der Bedingung geschieht, dass die Menge der nicht verarbeiteten (nicht verbrauchten) Daten (dies ist nicht genau die gleiche wie die Menge der ungelesenen (nicht untersuchten) Daten, die später erläutert wird) einen bestimmten Schwellenwert überschreitet (_pauseWriterThreshold). Der Standardwert beträgt 16 Segmentgrößen. Falls gewünscht, kann der Wert in PipeOptions geändert werden. Diese Methode startet auch die Fortsetzung der ReadAsync-Methode, falls eine blockiert wurde.
Gibt ein FlushResult zurück, das zwei Eigenschaften enthält - IsCanceled und IsCompleted. IsCanceled gibt an, ob Flush abgebrochen wurde (CancelPendingFlush-Aufruf). IsCompleted gibt an, ob der PipeReader abgeschlossen wurde (durch Aufrufen der Methoden Complete () oder CompleteAsync ()).
Der Hauptteil der Methode wird unter Locke Skywalker durchgeführt.
Andere Methoden von PipeWriter sind aus Sicht der Implementierung nicht von Interesse und werden viel seltener verwendet, daher wird nur eine kurze Beschreibung gegeben.
# 5 void Complete (Ausnahme Ausnahme = null) oder ValueTask CompleteAsync (Ausnahme Ausnahme = null)
Markiert das zum Schreiben geschlossene Rohr. Nach Abschluss wird eine Ausnahme ausgelöst, wenn versucht wird, die Methoden zum Schreiben zu verwenden. Wenn PipeReader bereits abgeschlossen wurde, wird auch die gesamte Pipe-Instanz abgeschlossen. Die meiste Arbeit wird unter dem Schloss erledigt.
# 6 void CancelPendingFlush ()
Wie der Name schon sagt, wird die aktuelle FlushAsync () -Operation abgeschlossen. Es gibt ein lok.
# 7 void OnReaderCompleted (Aktion <Ausnahme, Objekt> Rückruf, Objektstatus)
Führt den delegierten Delegaten aus, wenn der Reader abgeschlossen ist. Es gibt auch ein Schloss.
In der
Dokumentation heißt es derzeit, dass diese Methode bei einigen PipeWriter-Nachkommen möglicherweise nicht aufgerufen wird und in Zukunft entfernt wird. Daher sollten Sie keine Logik an diese Methoden binden.
Gehen Sie zu PipeReader
Hier wird wie bei FlushAsync eine ValueTask zurückgegeben, die darauf hinweist, dass die Methode größtenteils synchron ist, jedoch nicht immer. Hängt vom Status von _readerAwaitable ab. Wie bei FlushAsync müssen Sie feststellen, wann _readerAwaitable auf unvollständig gesetzt ist. Dies geschieht, wenn PipeReader alles aus der Liste liest (oder Daten enthält, die als geprüft markiert wurden und weitere Daten benötigen, um fortzufahren). Was in der Tat logisch ist. Dementsprechend können wir den Schluss ziehen, dass es wünschenswert ist, Pipe genau auf Ihre Arbeit abzustimmen und alle Optionen sorgfältig auf der Grundlage empirisch identifizierter Statistiken festzulegen. Durch eine ordnungsgemäße Konfiguration wird die Wahrscheinlichkeit eines asynchronen Ausführungszweigs verringert und eine effizientere Datenverarbeitung ermöglicht. Fast die gesamte Methode ist von einem Schloss umgeben.
Gibt ein mysteriöses
ReadResult zurück . Tatsächlich ist es nur ein Puffer + Flags, der den Status des Vorgangs anzeigt (IsCanceled - ob ReadAsync abgebrochen wurde und IsCompleted angibt, ob der PipeWriter geschlossen wurde). In diesem Fall ist IsCompleted ein Wert, der angibt, ob die Methoden PipeWriter Complete () oder CompleteAsync () aufgerufen wurden. Wenn diese Methoden mit einer Ausnahme aufgerufen wurden, wird sie beim Lesen ausgelöst.
Der Puffer hat wieder einen mysteriösen Typ -
ReadOnlySequence . Dies ist wiederum ein Objekt zum Enthalten von
Segmenten (ReadOnlySequenceSegment) der Anfangs- und End- + Start- und Endindizes innerhalb der entsprechenden Segmente. Was eigentlich der Struktur der Pipe-Klasse selbst ähnelt. BufferSegment ist übrigens der Nachfolger von ReadOnlySequenceSegment, was darauf hindeutet, dass es dort verwendet wird. Dank dieser Funktion können Sie unnötige Speicherzuweisungen für die Datenübertragung vom Schreiber zum Leser vermeiden.
ReadOnlySpan kann zur weiteren Verarbeitung aus dem Puffer abgerufen werden. Um das Bild zu vervollständigen, können Sie überprüfen, ob der Puffer einen einzelnen ReadOnlySpan enthält. Wenn es enthält, müssen wir die Auflistung nicht von einem Element aus durchlaufen, und wir können sie mit der First-Eigenschaft abrufen. Andernfalls müssen Sie alle Segmente im Puffer durchgehen und jedes ReadOnlySpan verarbeiten.
Diskussionsthema - in der ReadOnlySequence-Klasse werden nullfähige Referenztypen aktiv verwendet und es gibt goto (nicht zum Verlassen der Verschachtelung und nicht im generierten Code) - insbesondere
hierNach der Verarbeitung müssen Sie der Pipe-Instanz klar machen, dass wir die Daten gelesen haben.
Synchrone Version. Ermöglicht es Ihnen, das Ergebnis zu erhalten, wenn dies der Fall ist. Wenn es nicht bereits vorhanden ist, wird es im Gegensatz zu ReadAsync nicht blockiert, sondern gibt false zurück. Auch im Schloss.
Bei dieser Methode können Sie angeben, wie viele Bytes wir lesen und wie viele verarbeitet werden. Daten, die gelesen, aber nicht verarbeitet wurden, werden beim nächsten Lesen zurückgegeben. Diese Funktion mag auf den ersten Blick seltsam erscheinen, aber bei der Verarbeitung eines Bytestroms ist es selten erforderlich, jedes Byte einzeln zu verarbeiten. In der Regel werden Daten mithilfe von Nachrichten ausgetauscht. Es kann vorkommen, dass der Leser beim Lesen eine ganze Nachricht und einen Teil der zweiten Nachricht erhalten hat. Das Ganze muss verarbeitet werden, und ein Teil des zweiten sollte beim nächsten Mal übrig bleiben, damit er mit dem verbleibenden Teil einhergeht. Die AdvanceTo-Methode akzeptiert eine SequencePosition, bei der es sich tatsächlich um ein Segment + einen Index handelt. Wenn Sie alles verarbeiten, was ReadAsync gelesen hat, können Sie buffer.End angeben. Andernfalls müssen Sie explizit eine Position erstellen, die das Segment und den Index angibt, an dem die Verarbeitung gestoppt wurde. Unter der Haube lok.
Wenn die Menge der Rohdaten geringer ist als der installierte Fehler (_resumeWriterThreshold), wird die Fortsetzung von PipeWriter gestartet, wenn es blockiert wurde. Standardmäßig beträgt dieser Schwellenwert 8 Segmentvolumes (die Hälfte des Blockierungsschwellenwerts).
# 4 void Complete (Ausnahme Ausnahme = null)
Vervollständigt PipeReader. Wenn PipeWriter zu diesem Zeitpunkt abgeschlossen ist, wird die gesamte Pipe-Instanz beendet. Innen verschließen.
# 5 void CancelPendingRead ()
Ermöglicht das Abbrechen des aktuell erwarteten Messwerts. Locke.
# 6 void OnWriterCompleted (Aktion <Ausnahme, Objekt> Rückruf, Objektstatus)
Hier können Sie den Delegaten angeben, der nach Abschluss des PipeWriter ausgeführt werden soll.
Wie bei der ähnlichen Methode für PipeWriter enthält die
Dokumentation denselben Hinweis, der entfernt wird. Unter der Haube verriegeln.
Beispiel
Die folgende Auflistung zeigt ein Beispiel für die Arbeit mit Rohren.
Seit der Einführung von .NET Core Span und Memory wurden viele Klassen für die Arbeit mit Daten durch Überladungen mit diesen Typen ergänzt. Das allgemeine Interaktionsschema ist also ungefähr das gleiche. In meinem Beispiel habe ich Pipelines verwendet, um mit Pipes zu arbeiten (ich mag Wurzelwörter), d. H. Kanäle - Betriebssystemobjekte für die Interprozesskommunikation. Die Kanal-API wurde gerade entsprechend erweitert, um Daten in Span und Memory zu lesen. Die asynchrone Version verwendet Speicher, da die asynchrone Methode mithilfe einer automatisch generierten endlichen Zustandsmaschine, in der alle lokalen Variablen und Methodenparameter gespeichert sind, in eine Vorlagenmethode konvertiert wird. Da Span ref readonly struct ist, kann sie mit Span nicht auf dem Heap sein bei einer asynchronen Methode ist dies nicht möglich. Es gibt aber auch eine synchrone Version der Methode, mit der Sie Span verwenden können. In meinem Beispiel habe ich beide ausprobiert und es stellte sich heraus, dass sich die synchrone Version in dieser Situation besser zeigt. Bei Verwendung wird weniger Speicherbereinigung durchgeführt und die Datenverarbeitung ist schneller. Dies lag jedoch nur daran, dass viele Daten vorhanden waren. Für den Fall, dass zum Zeitpunkt der Beantragung des nächsten Stapels wahrscheinlich keine Daten vorliegen, sollten Sie die asynchrone Version verwenden, um den Prozessorleerlauf nicht zu belasten.
Das Beispiel enthält Kommentare, die einige Punkte erläutern. Ich mache Sie darauf aufmerksam, dass trotz der Tatsache, dass die Fragmente des Programms, die für das Lesen aus der Pipe und die Verarbeitung verantwortlich sind, beim Schreiben in eine Datei getrennt werden, die Daten genau an der Stelle gelesen werden, an der sie beim Lesen aus der Pipe geschrieben wurden.
Jahre der Evolution für ein mächtiges Feature - asynchrone Maine class Program { static async Task Main(string args) { var pipe = new Pipe(); var dataWriter = new PipeDataWriter(pipe.Writer, "testpipe"); var dataProcessor = new DataProcessor(new ConsoleBytesProcessor(), pipe.Reader); var cts = new CancellationTokenSource(); await Task.WhenAll(dataWriter.ReadFromPipeAsync(cts.Token), dataProcessor.StartProcessingDataAsync(cts.Token)); } }
Pipepatawriter public class PipeDataWriter { private readonly NamedPipeClientStream _namedPipe; private readonly PipeWriter _pipeWriter; private const string Servername = "."; public PipeDataWriter(PipeWriter pipeWriter, string pipeName) { _pipeWriter = pipeWriter ?? throw new ArgumentNullException(nameof(pipeWriter)); _namedPipe = new NamedPipeClientStream(Servername, pipeName, PipeDirection.In); } public async Task ReadFromPipeAsync(CancellationToken token) { await _namedPipe.ConnectAsync(token); while (true) { token.ThrowIfCancellationRequested();
Datenprozessor public class DataProcessor { private readonly IBytesProcessor _bytesProcessor; private readonly PipeReader _pipeReader; public DataProcessor(IBytesProcessor bytesProcessor, PipeReader pipeReader) { _bytesProcessor = bytesProcessor ?? throw new ArgumentNullException(nameof(bytesProcessor)); _pipeReader = pipeReader ?? throw new ArgumentNullException(nameof(pipeReader)); } public async Task StartProcessingDataAsync(CancellationToken token) { while (true) { token.ThrowIfCancellationRequested();
Bytesprozessor public interface IBytesProcessor { Task ProcessBytesAsync(ReadOnlySequence<byte> bytesSequence, CancellationToken token); } public class ConsoleBytesProcessor : IBytesProcessor {