Hallo Leser. Seit der Veröffentlichung von .NET Core 2.1 ist ziemlich viel Zeit vergangen. Und solche coolen Innovationen wie Span und Memory sind bereits weithin bekannt. Sie können viel darüber lesen, sehen und hören. Leider erhielt die Bibliothek mit dem Namen System.IO. Pipeslines nicht die gleiche Aufmerksamkeit. Fast alles, was es zu diesem Thema gibt, ist
der einzige Beitrag , der auf vielen Ressourcen übersetzt und kopiert wurde. Es sollte mehr Informationen über diese Technologie geben, um sie aus verschiedenen Blickwinkeln betrachten zu können.

Einführung
Ziel dieser Bibliothek ist es daher, die Verarbeitung von Streaming-Daten zu beschleunigen. Es wurde ursprünglich vom Entwicklungsteam von Kestrel (einem plattformübergreifenden Webserver für ASP.NET Core) erstellt und verwendet. Derzeit ist es für Sterbliche über ein
Nuget-Paket verfügbar.
Bevor wir uns mit dem Thema befassen, können wir uns den Bibliotheksmechanismus als ein verbessertes Analogon von MemoryStream vorstellen. Das Problem mit dem ursprünglichen MemoryStream ist eine übermäßige Anzahl von Kopien. Dies ist offensichtlich, wenn Sie sich daran erinnern, dass ein privates Byte-Array in MemoryStream als Puffer verwendet wird. In den
Lese- und
Schreibmethoden können Sie beispielsweise das Kopieren der Daten deutlich sehen. Daher wird für das Objekt, das in den Stream geschrieben werden soll, eine Kopie im internen Puffer erstellt, und während des Lesens wird eine Kopie der internen Kopie an den Verbraucher zurückgegeben. Es klingt nicht nach der rationalsten Nutzung des Gedächtnisses.
System.IO.Pipelines zielt nicht darauf ab, alle Streams zu ersetzen, sondern ist ein zusätzliches Tool im Arsenal eines Entwicklers, der Hochleistungscode schreibt. Ich schlage vor, dass Sie sich mit den grundlegenden Methoden und Klassen vertraut machen, deren Implementierungsdetails anzeigen und grundlegende Beispiele analysieren.
Beginnen wir mit den Interna und Implementierungsdetails und betrachten gleichzeitig einfache Codefragmente. Danach wird klar, wie es funktioniert und wie es verwendet werden sollte. Bei der Arbeit mit System.IO.Pipelines ist zu beachten, dass das Grundkonzept darin besteht, dass alle Lese- / Schreibvorgänge ohne zusätzliche Zuordnungen erfolgen sollten. Einige auf den ersten Blick attraktive Methoden widersprechen dieser Regel. Dementsprechend beginnt der Code, den Sie so stark beschleunigen möchten, Speicher für neue und neue Daten zuzuweisen, wodurch der Garbage Collector geladen wird.
Die Interna der Bibliothek nutzen die umfangreichsten Möglichkeiten der neuesten Versionen der Sprache und Laufzeit - Span, Memory, Objektpools, ValueTask usw. Es lohnt sich, dort zumindest nach einem großartigen Beispiel für die Verwendung dieser Funktionen in der Produktion zu suchen.
Zu einer Zeit waren einige Entwickler mit der Implementierung von Streams in C # nicht zufrieden, da eine Klasse sowohl zum Lesen als auch zum Schreiben verwendet wurde. Aber wie sie sagen, können Sie keine Methoden aus einer Klasse werfen. Auch wenn der Stream das Lesen / Schreiben / Suchen nicht unterstützt, werden die Eigenschaften CanRead, CanWrite und CanSeek verwendet. Es sieht aus wie eine kleine Krücke. Aber jetzt werden die Dinge anders.
Für die Arbeit mit Pipelines werden zwei Klassen verwendet:
PipeWriter und
PipeReader . Diese Klassen enthalten ungefähr 50 Codezeilen und sind Pseudofassaden (nicht die klassischste ihrer Inkarnationen, da sie eine einzelne Klasse verbergen, nicht viel) für die Klasse
Pipe , die die gesamte grundlegende Logik für die Arbeit mit Daten enthält. Diese Klasse enthält 5 öffentliche Mitglieder: 2 Konstruktoren, 2 Nur-Get-Eigenschaften - Reader und Writer, die Reset () -Methode, die interne Felder auf ihren Ausgangszustand zurücksetzt, damit die Klasse wiederverwendet werden kann. Die übrigen Arbeitsmethoden sind intern und werden über Pseudofassaden aufgerufen.
Beginnen wir mit der Pipe-Klasse
Die Klasseninstanz belegt 320 Bytes, was ziemlich viel ist (fast ein Drittel eines Kilobytes, 2 solcher Objekte konnten nicht in den Speicher von Manchester Mark I passen). Daher ist es eine schlechte Idee, eine große Menge seiner Instanzen zuzuweisen. Darüber hinaus ist das Objekt für den Langzeitgebrauch vorgesehen. Die Verwendung von Pools liefert auch ein Argument für diese Aussage. Die im Pool verwendeten Objekte bleiben für immer erhalten (für die Standard-Pool-Implementierung).
Beachten Sie, dass die Klasse als versiegelt markiert und threadsicher ist - viele Abschnitte des Codes sind ein kritischer Abschnitt und in Sperren eingeschlossen.
Um diese Klasse zu verwenden, sollten Sie eine Instanz der Pipe-Klasse erstellen und die Objekte PipeReader und PipeWriter mit den genannten Eigenschaften abrufen.
Einfache Initialisierungvar pipe = new Pipe(); PipeWriter pipeWriter = pipe.Writer; PipeReader pipeReader = pipe.Reader;
Betrachten Sie die Methoden zum Arbeiten mit Rohren:
Schreiben mit PipeWriter - WriteAsync, GetMemory / GetSpan, Advance, FlushAsync, Complete, CancelPendingFlush, OnReaderCompleted.
Lesen mit PipeReader - AdvanceTo, ReadAsync, TryRead, Complete, CancelPendingRead, OnWriterCompleted.
Wie im
erwähnten Beitrag angegeben , verwendet die Klasse eine einfach verknüpfte Liste von Puffern. Offensichtlich werden sie jedoch nicht zwischen PipeReader und PipeWriter übertragen - die gesamte Logik befindet sich in einer Klasse. Diese Liste wird sowohl zum Lesen als auch zum Schreiben verwendet. Darüber hinaus werden die zurückgegebenen Daten in dieser Liste gespeichert (daher wird kein Kopieren durchgeführt).
Es gibt auch Objekte, die den Beginn der zu lesenden Daten (ReadHead und Index), das Ende der zu lesenden Daten (ReadTail und Index) und den Beginn des zu schreibenden Speicherplatzes (WriteHead und die Anzahl der geschriebenen gepufferten Bytes) angeben. Hier sind ReadHead, ReadTail und WriteHead bestimmte Elemente (Segmente) der internen Liste von Segmenten, und der Index gibt eine bestimmte Position innerhalb des Segments an. Somit kann die Aufzeichnung von der Mitte eines Segments aus beginnen, ein ganzes nächstes Segment erfassen und in der Mitte des dritten Segments enden. Diese Zeiger werden auf verschiedene Arten verschoben.
Erste Schritte mit PipeWriter-Methoden
Das wird auf den ersten Blick als attraktiv bezeichnet. Es hat eine sehr geeignete und modische Signatur - akzeptiert ReadOnlyMemory, asynchron. Und viele mögen versucht sein, sich besonders daran zu erinnern, dass Span und Memory so schnell und cool sind. Aber schmeichel dir nicht. Diese Methode kopiert lediglich das an sie übergebene ReadOnlyMemory in die interne Liste. Mit "Kopieren" ist ein Aufruf der CopyTo () -Methode gemeint, bei dem nicht nur das Objekt selbst kopiert wird. Alle Daten, die wir aufzeichnen möchten, werden kopiert, wodurch der Speicher geladen wird. Diese Methode sollte nur erwähnt werden, um sicherzustellen, dass es besser ist, sie nicht zu verwenden. Nun, und vielleicht für einige seltene Situationen ist dieses Verhalten angemessen.
Der Hauptteil der Methode ist ein kritischer Abschnitt. Der Zugriff darauf wird über einen Monitor synchronisiert.
Dann kann sich die Frage stellen, wie man etwas schreibt, wenn nicht mit der naheliegendsten und einzig geeigneten Methode
Die Methode verwendet einen Parameter eines Integer-Typs. Darin müssen wir angeben, wie viele Bytes wir in die Pipeline schreiben möchten (welche Größe des Puffers wir wollen). Diese Methode prüft, ob in dem in _writingHeadMemory gespeicherten aktuellen Speicherfragment genügend Speicherplatz zum Schreiben vorhanden ist. Wenn dies ausreicht, wird _writingHeadMemory als Speicher zurückgegeben. Andernfalls wird für die in den Puffer geschriebenen Daten, für die die FlushAsync-Methode nicht aufgerufen wurde, ein anderes BufferSegment aufgerufen, das mit dem vorherigen verbunden ist (hier ist unsere interne Liste). Wenn _writingHeadMemory null ist, wird es mit einem neuen BufferSegment initialisiert. Die Zuweisung des Puffers ist ein kritischer Abschnitt und erfolgt unter Verriegelung.
Ich schlage vor, ein solches Beispiel anzuschauen. Auf den ersten Blick scheint es, dass der Compiler (oder die Laufzeit) den Dämon verführt hat.
Teufelei var pipeNoOptions = new Pipe(); Memory<byte> memoryOne = pipeNoOptions.Writer.GetMemory(2); Console.WriteLine(memoryOne.Length);
Aber alles in diesem Beispiel ist verständlich und einfach.
Beim Erstellen einer Pipe-Instanz können wir das
PipeOptions- Objekt im Konstruktor mit Optionen zum Erstellen übergeben.
PipeOptions verfügt über ein Standardfeld für die minimale Segmentgröße. Vor nicht allzu langer Zeit war es 2048, aber
dieses Commit hat diesen Wert auf 4096 aktualisiert. Zum Zeitpunkt des Schreibens dieses Artikels befand sich die 4096-Version im Nuget-Paket vor der Veröffentlichung, die letzte Release-Version hatte einen Wert von 2048. Dies erklärt den Verhalten des ersten Beispiels. Wenn Sie eine kleinere Größe für den Standardpuffer verwenden möchten, können Sie diese in einer Instanz des Typs PipeOptions angeben.
Im zweiten Beispiel, in dem die Mindestgröße angegeben ist, stimmt die Länge ohnehin nicht überein. Dies geschieht, weil die Erstellung eines neuen BufferSegments mithilfe von Pools erfolgt. Eine der Optionen in PipeOptions ist der Speicherpool. Danach wird der angegebene Pool verwendet, um ein neues Segment zu erstellen. Wenn Sie keinen Speicherpool angegeben haben, wird der Standard-ArrayPool verwendet, der, wie Sie wissen, mehrere Buckets für Arrays unterschiedlicher Größe enthält (jeder nächste ist zweimal größer als der vorherige) und wenn er für einen bestimmten angefordert wird Größe sucht es nach einem Eimer mit Arrays geeigneter Größe (dh dem nächstgrößeren oder gleich großen). Dementsprechend ist der neue Puffer mit ziemlicher Sicherheit größer als von Ihnen angefordert. Die minimale Arraygröße im Standard-ArrayPool (System.Buffers.TlsOverPerCoreLockedStacksArrayPool) beträgt 16. Aber keine Sorge, dies ist ein Pool von Arrays. Dementsprechend übt das Array in den allermeisten Fällen keinen Druck auf den Garbage Collector aus und wird später wiederverwendet.
Es funktioniert ähnlich und gibt Span from Memory.
Daher sind GetMemory () oder GetSpan () die Hauptmethoden zum Schreiben. Sie geben uns ein Objekt, an das wir schreiben können. Dazu müssen wir keinen Speicher für neue Arrays von Werten zuweisen, sondern können direkt in die Pipe schreiben. Welche verwendet werden soll, hängt hauptsächlich von der verwendeten API und der Methodenasynchronität ab. In Anbetracht des Vorstehenden stellt sich jedoch eine Frage. Woher weiß der Leser, wie viel wir geschrieben haben? Wenn wir immer eine bestimmte Implementierung des Pools verwenden würden, die ein Array mit genau der gewünschten Größe ergibt, könnte der Leser den gesamten Puffer auf einmal lesen. Wie wir bereits gesagt haben, wird uns jedoch ein Puffer mit einer hohen Wahrscheinlichkeit einer größeren Größe zugewiesen. Dies führt zu der folgenden für den Betrieb erforderlichen Methode.
Eine schrecklich einfache Methode. Es wird die Anzahl der als Argument geschriebenen Bytes verwendet. Sie erhöhen die internen Zähler - _unflushedBytes und _writingHeadBytesBuffered, deren Namen für sich selbst sprechen. Außerdem wird _writingHeadMemory genau auf die Anzahl der geschriebenen Bytes gekürzt (Slices) (mithilfe der Slice-Methode). Daher müssen Sie nach dem Aufrufen dieser Methode einen neuen Speicherblock in Form von Memory oder Span anfordern. Sie können nicht in den vorherigen schreiben. Und der gesamte Körper der Methode ist ein kritischer Abschnitt und läuft unter einem Schloss.
Es scheint, dass der Leser danach Daten empfangen kann. Es ist jedoch noch ein Schritt erforderlich.
Die Methode wird aufgerufen, nachdem wir die erforderlichen Daten in den empfangenen Speicher (GetMemory) geschrieben und angegeben haben, wie viel wir dort geschrieben haben (Advance). Die Methode gibt ValueTask zurück, ist jedoch nicht asynchron (im Gegensatz zu ihrem Nachkommen StreamPipeWriter). ValueTask ist ein spezieller Typ (schreibgeschützte Struktur), der verwendet wird, wenn die meisten Aufrufe nicht asynchron sind. Das heißt, alle erforderlichen Daten sind zum Zeitpunkt des Aufrufs verfügbar und die Methode wird synchron beendet. In sich selbst enthält es entweder Daten oder Task (falls es nicht synchron funktioniert hat). Dies hängt von der Eigenschaft _writerAwaitable.IsCompleted ab. Wenn wir nach Änderungen am Status dieses _writerAwaitable suchen, werden wir feststellen, dass dies geschieht, wenn die Menge der nicht verbrauchten Daten (dies ist nicht genau die gleiche wie die später untersuchten nicht erläuterten Daten) einen bestimmten Schwellenwert überschreitet (_pauseWriterThreshold). Der Standardwert ist 16 Segmentgrößen. Falls gewünscht, kann der Wert in PipeOptions geändert werden. Diese Methode startet auch die Fortsetzung der ReadAsync-Methode, falls eine blockiert wurde.
Gibt ein FlushResult zurück, das zwei Eigenschaften enthält - IsCanceled und IsCompleted. IsCanceled gibt an, ob Flush abgebrochen wurde (CancelPendingFlush () -Aufruf). IsCompleted gibt an, ob der PipeReader abgeschlossen wurde (durch Aufrufen der Methoden Complete () oder CompleteAsync ()).
Der Hauptteil der Methode wird unter dem Schloss ausgeführt.
Andere Methoden von PipeWriter sind aus Sicht der Implementierung nicht interessant und werden viel seltener verwendet, daher wird nur eine kurze Beschreibung gegeben.
# 5 void Complete (Ausnahme Ausnahme = null) oder ValueTask CompleteAsync (Ausnahme Ausnahme = null)
Markiert das zum Schreiben geschlossene Rohr. Eine Ausnahme wird ausgelöst, wenn versucht wird, die Schreibmethoden nach Abschluss zu verwenden. Wenn PipeReader bereits abgeschlossen wurde, wird auch die gesamte Pipe-Instanz abgeschlossen. Die meiste Arbeit wird unter dem Schloss erledigt.
# 6 void CancelPendingFlush ()
Wie der Name schon sagt, wird die aktuelle FlushAsync () -Operation abgebrochen. Es gibt ein Schloss.
# 7 void OnReaderCompleted (Aktion <Ausnahme, Objekt> Rückruf, Objektstatus)
Führt den übergebenen Delegaten aus, wenn der Reader abgeschlossen ist. Es gibt auch ein Schloss.
In der
Dokumentation wird derzeit geschrieben, dass diese Methode bei einigen PipeWriter-Implementierungen möglicherweise nicht aufgerufen wird und in Zukunft entfernt wird. Daher sollten Sie keine Logik an diese Methoden binden.
Es ist Zeit für PipeReader
Hier wird wie in FlushAsync () ValueTask zurückgegeben, was darauf hindeutet, dass die Methode größtenteils synchron ist, aber nicht immer. Hängt vom Status von _readerAwaitable ab. Wie bei FlushAsync müssen Sie feststellen, wann _readerAwaitable auf unvollständig gesetzt ist. Dies geschieht, wenn PipeReader alles aus der internen Liste gelesen hat (oder Daten enthält, die als geprüft markiert wurden und Sie weitere Daten benötigen, um fortzufahren). Was in der Tat offensichtlich ist. Dementsprechend können wir den Schluss ziehen, dass es wünschenswert ist, Pipe genau auf Ihre Arbeit abzustimmen und alle Optionen sorgfältig auf der Grundlage empirisch identifizierter Statistiken festzulegen. Durch eine ordnungsgemäße Konfiguration wird die Wahrscheinlichkeit eines asynchronen Ausführungszweigs verringert und eine effizientere Datenverarbeitung ermöglicht. Fast der gesamte Code in der gesamten Methode ist von einer Sperre umgeben.
Gibt ein mysteriöses
ReadResult zurück . Tatsächlich ist es nur ein Puffer + Flags, der den Status des Vorgangs anzeigt (IsCanceled - ob ReadAsync abgebrochen wurde und IsCompleted angibt, ob der PipeWriter geschlossen wurde). IsCompleted ist ein Wert, der angibt, ob die Methoden PipeWriter Complete () oder CompleteAsync () aufgerufen wurden. Wenn diese Methoden mit einer übergebenen Ausnahme aufgerufen wurden, wird sie beim Versuch des Lesens ausgelöst.
Und wieder hat der Puffer einen mysteriösen Typ -
ReadOnlySequence . Dies ist wiederum das Objekt für den Inhalt von Segmenten
(ReadOnlySequenceSegment) des Anfangs- und des End + Start- und Endindex innerhalb der entsprechenden Segmente. Was eigentlich der Struktur der Pipe-Klasse selbst ähnelt. BufferSegment wird übrigens von ReadOnlySequenceSegment geerbt, was darauf hindeutet, dass BufferSegment in dieser Sequenz verwendet wird. Dank dieser Funktion können Sie unnötige Speicherzuweisungen für die Datenübertragung vom Schreiber zum Leser vermeiden.
ReadOnlySpan kann zur weiteren Verarbeitung aus dem Puffer abgerufen werden. Um das Bild zu vervollständigen, können Sie überprüfen, ob der Puffer einen einzelnen ReadOnlySpan enthält. Wenn es enthält, müssen wir die Auflistung nicht von einem Element aus durchlaufen, und wir können sie mit der First-Eigenschaft abrufen. Andernfalls müssen alle Segmente im Puffer überprüft und jeweils ReadOnlySpan verarbeitet werden.
Diskussionsthema - in der ReadOnlySequence-Klasse werden nullfähige Referenztypen aktiv verwendet und es gibt goto (nicht für die Verschachtelung tiefer Schleifen und nicht im generierten Code) - insbesondere
hier .
Nach der Verarbeitung müssen Sie der Pipe-Instanz signalisieren, dass wir die Daten gelesen haben.
Synchrone Version. Ermöglicht das Abrufen des Ergebnisses, falls vorhanden. Andernfalls wird im Gegensatz zu ReadAsync nicht blockiert und false zurückgegeben. Auch der Code dieser Methode befindet sich im Schloss.
Bei dieser Methode können Sie angeben, wie viele Bytes wir untersuchen und verbrauchen. Daten, die untersucht, aber nicht verbraucht wurden, werden beim nächsten Lesen zurückgegeben. Diese Funktion mag auf den ersten Blick seltsam erscheinen, aber bei der Verarbeitung eines Bytestroms ist es selten erforderlich, jedes Byte einzeln zu verarbeiten. Normalerweise werden Daten mithilfe von Nachrichten ausgetauscht. Es kann vorkommen, dass der Leser beim Lesen eine ganze Nachricht und einen Teil der zweiten Nachricht erhalten hat. Das Ganze muss verarbeitet werden, und ein Teil des zweiten sollte für die Zukunft übrig bleiben, damit er mit dem verbleibenden Teil einhergeht. Die AdvanceTo-Methode verwendet eine SequencePosition, bei der es sich tatsächlich um ein Segment + Index handelt. Wenn Sie alles verarbeiten, was ReadAsync gelesen hat, können Sie buffer.End angeben. Andernfalls müssen Sie explizit eine Position erstellen, die das Segment und den Index angibt, an dem die Verarbeitung gestoppt wurde. Schloss ist unter der Haube.
Wenn die Menge der nicht verbrauchten Informationen unter dem angegebenen Schwellenwert (_resumeWriterThreshold) liegt, wird die Fortsetzung von PipeWriter gestartet, wenn diese blockiert wurde. Standardmäßig beträgt dieser Schwellenwert 8 Segmentvolumes (die Hälfte des Blockierungsschwellenwerts).
# 4 void Complete (Ausnahme Ausnahme = null)
Vervollständigt den PipeReader. Wenn der PipeWriter zu diesem Zeitpunkt abgeschlossen ist, wird die gesamte Pipe-Instanz abgeschlossen. Innen verschließen.
# 5 void CancelPendingRead ()
Ermöglicht das Abbrechen des derzeit ausstehenden Messwerts. Sperren
# 6 void OnWriterCompleted (Aktion <Ausnahme, Objekt> Rückruf, Objektstatus)
Hier können Sie den Delegaten angeben, der nach Abschluss des PipeWriter ausgeführt werden soll.
Wie bei der ähnlichen Methode von PipeWriter gibt es in der
Dokumentation dasselbe Tag, das entfernt wird. Schloss ist unter der Haube.
Beispiel
Die folgende Auflistung zeigt ein Beispiel für die Arbeit mit Rohren.
Seit der Einführung von .NET Core Span und Memory wurden viele Klassen für die Arbeit mit Daten durch Überladungen mit diesen Typen ergänzt. Das allgemeine Interaktionsschema ist also ungefähr das gleiche. In meinem Beispiel habe ich Pipelines verwendet, um mit Pipes zu arbeiten (ich mag ähnliche Wörter) - Betriebssystemobjekte für die Interprozesskommunikation. Die Pipes-API wurde gerade entsprechend erweitert, um Daten in Span und Memory zu lesen. Die asynchrone Version verwendet Speicher, da die asynchrone Methode mithilfe einer automatisch generierten Finite-State-Maschine, in der alle lokalen Variablen und Methodenparameter gespeichert sind, in eine Vorlagenmethode konvertiert wird. Da Span ref readonly struct ist, kann sie nicht platziert werden Der Heap, der Span in einer asynchronen Methode verwendet, ist nicht möglich. Es gibt aber auch eine synchrone Version der Methode, mit der Sie Span verwenden können. In meinem Beispiel habe ich beide ausprobiert und es stellte sich heraus, dass sich die synchrone Version in dieser Situation besser zeigt. Bei Verwendung wird weniger Speicherbereinigung durchgeführt und die Datenverarbeitung ist schneller. Dies lag jedoch nur daran, dass sich viele Daten in der Pipe befanden (die Daten waren immer verfügbar). In der Situation, in der es zum Zeitpunkt der Beantragung des nächsten Stapels wahrscheinlich keine Daten gibt, sollten Sie die asynchrone Version verwenden, um den Prozessorleerlauf nicht zu belasten.
Das Beispiel enthält Kommentare, die einige Punkte erläutern. Ich mache Sie darauf aufmerksam, dass trotz der Tatsache, dass die Fragmente des Programms, die für das Lesen aus der Pipe und die Verarbeitung verantwortlich sind, beim Schreiben in eine Datei getrennt werden, die Daten genau von der Stelle gelesen werden, an der sie beim Lesen von der geschrieben wurden Rohr.
Jahre der Evolution für eine leistungsstarke Funktion - asynchrone Hauptleitung class Program { static async Task Main(string args) { var pipe = new Pipe(); var dataWriter = new PipeDataWriter(pipe.Writer, "testpipe"); var dataProcessor = new DataProcessor(new ConsoleBytesProcessor(), pipe.Reader); var cts = new CancellationTokenSource(); await Task.WhenAll(dataWriter.ReadFromPipeAsync(cts.Token), dataProcessor.StartProcessingDataAsync(cts.Token)); } }
Pipepatawriter public class PipeDataWriter { private readonly NamedPipeClientStream _namedPipe; private readonly PipeWriter _pipeWriter; private const string Servername = "."; public PipeDataWriter(PipeWriter pipeWriter, string pipeName) { _pipeWriter = pipeWriter ?? throw new ArgumentNullException(nameof(pipeWriter)); _namedPipe = new NamedPipeClientStream(Servername, pipeName, PipeDirection.In); } public async Task ReadFromPipeAsync(CancellationToken token) { await _namedPipe.ConnectAsync(token); while (true) { token.ThrowIfCancellationRequested();
Datenprozessor public class DataProcessor { private readonly IBytesProcessor _bytesProcessor; private readonly PipeReader _pipeReader; public DataProcessor(IBytesProcessor bytesProcessor, PipeReader pipeReader) { _bytesProcessor = bytesProcessor ?? throw new ArgumentNullException(nameof(bytesProcessor)); _pipeReader = pipeReader ?? throw new ArgumentNullException(nameof(pipeReader)); } public async Task StartProcessingDataAsync(CancellationToken token) { while (true) { token.ThrowIfCancellationRequested();
Bytesprozessor public interface IBytesProcessor { Task ProcessBytesAsync(ReadOnlySequence<byte> bytesSequence, CancellationToken token); } public class ConsoleBytesProcessor : IBytesProcessor {