Bonjour lecteur. Beaucoup de temps s'est écoulé depuis la sortie de .NET Core 2.1. Et des innovations intéressantes comme Span et Memory sont déjà largement connues, vous pouvez les lire, les voir et en entendre beaucoup parler. Cependant, malheureusement, la bibliothèque appelée System.IO. Pipeslines n'a pas reçu la même attention. Presque tout ce qu'il y a sur ce sujet est
le seul article qui a été traduit et copié sur de nombreuses ressources. Il devrait y avoir plus d'informations sur cette technologie pour la voir sous différents angles.

Présentation
Ainsi, cette bibliothèque vise à accélérer le traitement des données en streaming. Il a été initialement créé et utilisé par l'équipe de développement de Kestrel (un serveur Web multiplateforme pour ASP.NET Core), mais il est actuellement disponible pour les mortels via un
package nuget .
Avant de nous plonger dans le sujet, nous pouvons imaginer le mécanisme de bibliothèque comme un analogue amélioré de MemoryStream. Le problème avec le MemoryStream d'origine est un nombre excessif de copies, ce qui est évident si vous vous souvenez qu'un tableau d'octets privé est utilisé dans MemoryStream comme tampon. Par exemple, dans les méthodes de
lecture et d'
écriture , vous pouvez clairement voir la copie des données. Ainsi, pour l'objet que nous voulons écrire dans le flux, une copie sera créée dans le tampon interne, et lors de la lecture, une copie de la copie interne sera retournée au consommateur. Cela ne ressemble pas à l'utilisation la plus rationnelle de la mémoire.
System.IO.Pipelines n'a pas pour objectif de remplacer tous les flux, c'est un outil supplémentaire dans l'arsenal d'un développeur écrivant du code performant. Je vous suggère de vous familiariser avec les méthodes et classes de base, de voir leurs détails d'implémentation et d'analyser des exemples de base.
Commençons par les éléments internes et les détails de l'implémentation, en regardant en même temps de simples fragments de code. Après cela, il deviendra clair comment cela fonctionne et comment il doit être utilisé. Lorsque vous travaillez avec System.IO.Pipelines, il convient de se rappeler que le concept de base est que toutes les opérations de lecture-écriture doivent avoir lieu sans allocations supplémentaires. Mais certaines méthodes séduisantes à première vue contredisent cette règle. En conséquence, le code que vous essayez d'accélérer si fort commence à allouer de la mémoire pour les nouvelles et nouvelles données, en chargeant le garbage collector.
Les composants internes de la bibliothèque utilisent les possibilités les plus larges des dernières versions du langage et du runtime - étendue, mémoire, pools d'objets, ValueTask et ainsi de suite. Il vaut la peine d'y chercher au moins un excellent exemple d'utilisation de ces fonctionnalités en production.
À un moment donné, certains développeurs n'étaient pas satisfaits de l'implémentation de flux en C #, car une classe était utilisée à la fois pour la lecture et l'écriture. Mais comme on dit, vous ne pouvez pas jeter des méthodes hors d'une classe. Même si le flux ne prend pas en charge la lecture / écriture / recherche, les propriétés CanRead, CanWrite et CanSeek sont utilisées. Cela ressemble à une petite béquille. Mais maintenant, les choses deviennent différentes.
Pour travailler avec des pipelines, 2 classes sont utilisées:
PipeWriter et
PipeReader . Ces classes contiennent environ 50 lignes de code et sont des pseudo-façades (pas la plus classique de ses incarnations, car elles cachent une seule classe, pas beaucoup) pour la classe
Pipe , qui contient toute la logique de base pour travailler avec des données. Cette classe contient 5 membres publics: 2 constructeurs, 2 propriétés get-only - Reader et Writer, la méthode Reset (), qui réinitialise les champs internes à leur état initial afin que la classe puisse être réutilisée. Les autres méthodes de travail sont internes et appelées à l'aide de pseudo-façades.
Commençons par la classe pipe
L'instance de classe occupe 320 octets, ce qui est beaucoup (presque un tiers de kilo-octet, 2 de ces objets ne pouvaient pas tenir dans la mémoire de Manchester Mark I). L'allocation d'une grande quantité de ses instances est donc une mauvaise idée. De plus, l'objet est destiné à une utilisation à long terme. L'utilisation de pools crée également un argument pour cette déclaration. Les objets utilisés dans le pool vivront pour toujours (pour l'implémentation du pool par défaut).
Notez que la classe est marquée comme scellée et qu'elle est thread-safe - de nombreuses sections du code sont une section critique et sont entourées de verrous.
Pour commencer à utiliser cette classe, vous devez créer une instance de la classe Pipe et obtenir les objets PipeReader et PipeWriter en utilisant les propriétés mentionnées.
Initialisation simplevar pipe = new Pipe(); PipeWriter pipeWriter = pipe.Writer; PipeReader pipeReader = pipe.Reader;
Considérez les méthodes de travail avec les tuyaux:
Écriture avec PipeWriter - WriteAsync, GetMemory / GetSpan, Advance, FlushAsync, Complete, CancelPendingFlush, OnReaderCompleted.
Lecture avec PipeReader - AdvanceTo, ReadAsync, TryRead, Complete, CancelPendingRead, OnWriterCompleted.
Comme indiqué dans l'article
mentionné , la classe utilise une liste de tampons liés individuellement. Mais, évidemment, ils ne sont pas transférés entre PipeReader et PipeWriter - toute la logique est dans une classe. Cette liste est utilisée à la fois pour la lecture et l'écriture. De plus, les données retournées sont stockées dans cette liste (donc aucune copie n'est effectuée).
En outre, il existe des objets indiquant le début des données à lire (ReadHead et index), la fin des données à lire (ReadTail et index) et le début de l'espace à écrire (WriteHead et le nombre d'octets mis en mémoire tampon écrits). Ici, ReadHead, ReadTail et WriteHead sont des membres (segments) spécifiques de la liste interne de segments, et l'index indique une position spécifique dans le segment. Ainsi, l'enregistrement peut commencer au milieu d'un segment, capturer un segment entier suivant et se terminer au milieu du troisième. Ces pointeurs sont déplacés de différentes manières.
Premiers pas avec les méthodes PipeWriter
Cela est mentionné comme une méthode attrayante à première vue. Il a une signature très appropriée et à la mode - accepte ReadOnlyMemory, asynchrone. Et beaucoup peuvent être tentés, surtout en se souvenant que la portée et la mémoire sont si rapides et cool. Mais ne vous flattez pas. Tout ce que cette méthode fait est de copier le ReadOnlyMemory qui lui est passé dans la liste interne. Et par «copier», on entend un appel à la méthode CopyTo (), et non pas copier uniquement l'objet lui-même. Toutes les données que nous voulons enregistrer seront copiées, chargeant ainsi la mémoire. Cette méthode ne doit être mentionnée que pour s'assurer qu'il vaut mieux ne pas l'utiliser. Eh bien, et peut-être pour certaines situations rares, ce comportement est approprié.
Le corps de la méthode est une section critique dont l'accès est synchronisé via un moniteur.
Ensuite, la question peut se poser, comment écrire quelque chose, sinon par la méthode la plus évidente et la seule appropriée
La méthode prend un paramètre d'un type entier. Dans celui-ci, nous devons indiquer combien d'octets nous voulons écrire dans le pipeline (quelle taille de tampon nous voulons). Cette méthode vérifie s'il y a suffisamment d'espace pour écrire dans le fragment de mémoire actuel stocké dans _writingHeadMemory. Si cela est suffisant, _writingHeadMemory est renvoyé en tant que mémoire. Sinon, pour les données écrites dans le tampon, mais pour lesquelles la méthode FlushAsync n'a pas été appelée, elle est appelée et un autre BufferSegment est alloué, qui est connecté au précédent (voici notre liste interne). Si _writingHeadMemory est null, il est initialisé avec un nouveau BufferSegment. Et l'allocation du tampon est une section critique et se fait sous le verrou.
Je suggère de regarder un tel exemple. À première vue, il peut sembler que le compilateur (ou runtime) a séduit le démon.
Devilry var pipeNoOptions = new Pipe(); Memory<byte> memoryOne = pipeNoOptions.Writer.GetMemory(2); Console.WriteLine(memoryOne.Length);
Mais tout dans cet exemple est compréhensible et simple.
Lors de la création d'une instance Pipe, nous pouvons lui passer l'objet
PipeOptions dans le constructeur avec des options de création.
PipeOptions a un champ de taille de segment minimum par défaut. Il n'y a pas si longtemps, c'était 2048, mais
ce commit a mis à jour cette valeur à 4096. Au moment de la rédaction de cet article, la version 4096 était en version préliminaire nuget-package, la dernière version avait une valeur de 2048. Cela explique la comportement du premier exemple. Si vous êtes critique quant à l'utilisation d'une taille plus petite pour le tampon par défaut, vous pouvez le spécifier dans une instance du type PipeOptions.
Mais dans le deuxième exemple, où la taille minimale est spécifiée, la longueur ne correspond pas de toute façon. Et cela se produit car la création d'un nouveau BufferSegment se produit à l'aide de pools. L'une des options de PipeOptions est le pool de mémoire. Après cela, le pool spécifié sera utilisé pour créer un nouveau segment. Si vous n'avez pas spécifié de pool de mémoire, le ArrayPool par défaut sera utilisé, qui, comme vous le savez, a plusieurs compartiments pour différentes tailles de tableaux (chacun suivant est 2 fois plus grand que le précédent) et quand il est demandé pour un certain taille, il recherche un compartiment avec des tableaux de taille appropriée (c'est-à-dire, le plus grand ou égal le plus proche). Par conséquent, le nouveau tampon sera presque certainement plus volumineux que vous l'aviez demandé. La taille minimale d'un tableau dans le groupe de tableaux par défaut (System.Buffers.TlsOverPerCoreLockedStacksArrayPool) est de 16. Mais ne vous inquiétez pas, il s'agit d'un pool de tableaux. Par conséquent, dans la grande majorité des cas, la matrice n'exerce pas de pression sur le ramasse-miettes et sera réutilisée ultérieurement.
Il fonctionne de la même façon, offrant une étendue de la mémoire.
Ainsi GetMemory () ou GetSpan () sont les principales méthodes d'écriture. Ils nous donnent un objet sur lequel nous pouvons écrire. Pour ce faire, nous n'avons pas besoin d'allouer de mémoire pour de nouveaux tableaux de valeurs, nous pouvons écrire directement dans le canal. Laquelle utiliser dépendra principalement de l'API que vous utilisez et de la méthode d'asynchronie. Cependant, au vu de ce qui précède, une question se pose. Comment le lecteur saura-t-il combien nous avons écrit? Si nous avons toujours utilisé une implémentation spécifique du pool, qui donne un tableau de la même taille que celle demandée, alors le lecteur pourrait lire la totalité du tampon à la fois. Cependant, comme nous l'avons déjà dit, on nous alloue un tampon avec une forte probabilité d'une taille plus grande. Cela conduit à la méthode suivante requise pour le fonctionnement.
Une méthode terriblement simple. Il prend le nombre d'octets écrits comme argument. Ils incrémentent les compteurs internes - _unflushedBytes et _writingHeadBytesBuffered, dont les noms parlent d'eux-mêmes. Il tronque également (tranches) _writingHeadMemory exactement au nombre d'octets écrits (à l'aide de la méthode Slice). Par conséquent, après avoir appelé cette méthode, vous devez demander un nouveau bloc de mémoire sous forme de mémoire ou d'étendue, vous ne pouvez pas écrire sur le précédent. Et le corps entier de la méthode est une section critique et s'exécute sous un verrou.
Il semble qu'après cela, le lecteur puisse recevoir des données. Mais une étape supplémentaire est nécessaire.
La méthode est appelée après avoir écrit les données nécessaires dans la mémoire reçue (GetMemory) et indiqué combien nous y avons écrit (Advance). La méthode renvoie ValueTask, mais elle n'est pas asynchrone (contrairement à son descendant StreamPipeWriter). ValueTask est un type spécial (structure en lecture seule) utilisé dans le cas où la plupart des appels ne seront pas asynchrones, c'est-à-dire que toutes les données nécessaires seront disponibles au moment de son appel et que la méthode se terminera de manière synchrone. À l'intérieur de lui-même, il contient des données ou une tâche (au cas où cela ne fonctionnerait pas de manière synchrone). Cela dépend de la propriété _writerAwaitable.IsCompleted. Si nous recherchons ce qui change l'état de ce _writerAwaitable, nous verrons que cela se produit si la quantité de données non consommées (ce n'est pas exactement la même chose que les données non examinées seront expliquées plus tard) dépasse un certain seuil (_pauseWriterThreshold). La valeur par défaut est 16 tailles de segment. Si vous le souhaitez, la valeur peut être modifiée dans PipeOptions. En outre, cette méthode démarre la continuation de la méthode ReadAsync, si une a été bloquée.
Renvoie un FlushResult contenant 2 propriétés - IsCanceled et IsCompleted. IsCanceled indique si le vidage a été annulé (appel CancelPendingFlush ()). IsCompleted indique si le PipeReader s'est terminé (en appelant les méthodes Complete () ou CompleteAsync ()).
La partie principale de la méthode est effectuée sous la serrure.
D'autres méthodes de PipeWriter ne sont pas intéressantes du point de vue de la mise en œuvre et sont utilisées beaucoup moins souvent, par conséquent, seule une brève description sera donnée.
# 5 void Complete (Exception exception = null) ou ValueTask CompleteAsync (Exception exception = null)
Marque le tuyau fermé pour écriture. Une exception sera levée lors de la tentative d'utilisation des méthodes d'écriture une fois l'opération terminée. Si PipeReader est déjà terminé, l'instance de tuyau entière est également terminée. La plupart du travail se fait sous la serrure.
# 6 void CancelPendingFlush ()
Comme son nom l'indique, il annule l'opération FlushAsync () en cours. Il y a une serrure.
# 7 void OnReaderCompleted (Action <Exception, objet> rappel, état de l'objet)
Exécute le délégué passé lorsque le lecteur a terminé. Il y a aussi une serrure.
Dans la
documentation, il est actuellement écrit que cette méthode peut ne pas être appelée sur certaines implémentations PipeWriter et sera supprimée à l'avenir. Par conséquent, vous ne devez pas lier la logique à ces méthodes.
Il est temps pour PipeReader
Ici, comme dans FlushAsync (), ValueTask est retourné, ce qui indique que la méthode est principalement synchrone, mais pas toujours. Dépend de l'état de _readerAwaitable. Comme avec FlushAsync, vous devez trouver quand _readerAwaitable est défini sur incomplet. Cela se produit lorsque PipeReader a tout lu dans la liste interne (ou qu'il contient des données marquées comme examinées et que vous avez besoin de plus de données pour continuer). Ce qui, en fait, est évident. En conséquence, nous pouvons conclure qu'il est souhaitable d'affiner Pipe à votre travail, de définir toutes ses options de manière réfléchie, sur la base de statistiques identifiées empiriquement. Une configuration appropriée réduira les chances d'une branche d'exécution asynchrone et permettra un traitement plus efficace des données. Presque tout le code de la méthode entière est entouré d'un verrou.
Renvoie un mystérieux
ReadResult . En fait, c'est juste un tampon + drapeaux indiquant l'état de l'opération (IsCanceled - si ReadAsync a été annulé et IsCompleted indiquant si le PipeWriter a été fermé). IsCompleted est une valeur indiquant si les méthodes PipeWriter Complete () ou CompleteAsync () ont été appelées. Si ces méthodes ont été appelées avec une exception passée, elles seront levées lors de la tentative de lecture.
Et encore une fois, le tampon a un type mystérieux -
ReadOnlySequence . Ceci, à son tour, est l'objet du contenu des segments
(ReadOnlySequenceSegment) des index de début et de fin + début et fin à l'intérieur des segments correspondants. Ce qui ressemble en fait à la structure de la classe Pipe elle-même. Soit dit en passant, BufferSegment est hérité de ReadOnlySequenceSegment, qui indique que BufferSegment est utilisé dans cette séquence. Grâce à cela, vous pouvez simplement vous débarrasser des allocations de mémoire inutiles pour le transfert de données de l'écrivain au lecteur.
ReadOnlySpan peut être obtenu à partir du tampon pour un traitement ultérieur. Pour terminer l'image, vous pouvez vérifier si le tampon contient un seul ReadOnlySpan. S'il contient, nous n'avons pas besoin d'itérer sur la collection d'un élément et nous pouvons l'obtenir en utilisant la propriété First. Sinon, il est nécessaire de parcourir tous les segments du tampon et de traiter ReadOnlySpan de chacun.
Sujet de discussion - dans la classe ReadOnlySequence, les types de référence nullables sont activement utilisés et il y a goto (pas pour l'imbrication en boucle profonde et pas dans le code généré) - en particulier
ici .
Après le traitement, vous devez signaler à l'instance Pipe que nous lisons les données.
Version synchrone. Vous permet d'obtenir le résultat s'il existe. Sinon, contrairement à ReadAsync, il ne bloque pas et renvoie false. Le code de cette méthode se trouve également dans la serrure.
Dans cette méthode, vous pouvez spécifier le nombre d'octets que nous examinons et consommons. Les données examinées mais non consommées seront renvoyées lors de la prochaine lecture. Cette fonctionnalité peut sembler étrange à première vue, mais lors du traitement d'un flux d'octets, il est rarement nécessaire de traiter chaque octet individuellement. Habituellement, les données sont échangées à l'aide de messages. Il peut arriver que le lecteur, lors de la lecture, reçoive un message entier et une partie du second. Le tout doit être traité, et une partie du second doit être laissée pour l'avenir afin qu'elle accompagne la partie restante. La méthode AdvanceTo prend une SequencePosition, qui est en fait un segment + index. Lors du traitement de tout ce que ReadAsync a lu, vous pouvez spécifier buffer.End. Sinon, vous devez créer explicitement une position, en indiquant le segment et l'index auxquels le traitement a été arrêté. La serrure est sous le capot.
De plus, si la quantité d'informations non consommées est inférieure au seuil spécifié (_resumeWriterThreshold), il démarre la poursuite de PipeWriter s'il était bloqué. Par défaut, ce seuil est de 8 volumes de segment (la moitié du seuil de blocage).
# 4 void Complete (Exception exception = null)
Termine le PipeReader. Si le PipeWriter est terminé à ce stade, alors toute l'instance de Pipe se termine. Verrouillez à l'intérieur.
# 5 void CancelPendingRead ()
Vous permet d'annuler la lecture actuellement en attente. Verrouiller
# 6 void OnWriterCompleted (Action <Exception, objet> rappel, état de l'objet)
Vous permet de spécifier le délégué à exécuter à la fin du PipeWriter.
Comme la méthode similaire de PipeWriter, dans la
documentation il y a la même balise qui sera supprimée. La serrure est sous le capot.
Exemple
La liste ci-dessous montre un exemple de travail avec des tuyaux.
Depuis l'introduction de .NET Core Span and Memory, de nombreuses classes pour travailler avec des données ont été complétées par des surcharges utilisant ces types. Le schéma d'interaction général sera donc à peu près le même. Dans mon exemple, j'ai utilisé des pipelines pour travailler avec des tuyaux (j'aime des mots similaires) - des objets OS pour la communication interprocessus. L'API des canaux vient d'être développée en conséquence pour lire les données dans Span et Memory. La version asynchrone utilise la mémoire, car la méthode asynchrone sera convertie en méthode de modèle à l'aide d'une machine à états finis générée automatiquement, dans laquelle toutes les variables locales et les paramètres de méthode sont stockés, et puisque Span est une structure en lecture seule ref, elle ne peut pas être placée dans le tas, respectivement, en utilisant Span dans une méthode asynchrone est impossible. Mais il existe également une version synchrone de la méthode qui vous permet d'utiliser Span. Dans mon exemple, j'ai essayé les deux et il s'est avéré que la version synchrone dans cette situation se montre mieux. Lorsque vous l'utilisez, moins de récupération de place se produit et le traitement des données est plus rapide. Mais c'était uniquement parce qu'il y avait beaucoup de données dans le tuyau (les données étaient toujours disponibles). Dans la situation où il est plutôt probable qu'il n'y aura pas de données au moment de la demande pour le prochain lot, vous devez utiliser la version asynchrone afin de ne pas forcer le processeur au repos.
L'exemple contient des commentaires qui expliquent certains points. J'attire votre attention sur le fait que malgré le fait que les fragments du programme responsable de la lecture du pipe et du traitement soient séparés, lors de l'écriture dans un fichier, les données sont lues exactement de l'endroit où elles sont écrites lors de la lecture du tuyau.
Des années d'évolution pour une fonction puissante - principale asynchrone class Program { static async Task Main(string args) { var pipe = new Pipe(); var dataWriter = new PipeDataWriter(pipe.Writer, "testpipe"); var dataProcessor = new DataProcessor(new ConsoleBytesProcessor(), pipe.Reader); var cts = new CancellationTokenSource(); await Task.WhenAll(dataWriter.ReadFromPipeAsync(cts.Token), dataProcessor.StartProcessingDataAsync(cts.Token)); } }
Pipepatawriter public class PipeDataWriter { private readonly NamedPipeClientStream _namedPipe; private readonly PipeWriter _pipeWriter; private const string Servername = "."; public PipeDataWriter(PipeWriter pipeWriter, string pipeName) { _pipeWriter = pipeWriter ?? throw new ArgumentNullException(nameof(pipeWriter)); _namedPipe = new NamedPipeClientStream(Servername, pipeName, PipeDirection.In); } public async Task ReadFromPipeAsync(CancellationToken token) { await _namedPipe.ConnectAsync(token); while (true) { token.ThrowIfCancellationRequested();
Processeur de données public class DataProcessor { private readonly IBytesProcessor _bytesProcessor; private readonly PipeReader _pipeReader; public DataProcessor(IBytesProcessor bytesProcessor, PipeReader pipeReader) { _bytesProcessor = bytesProcessor ?? throw new ArgumentNullException(nameof(bytesProcessor)); _pipeReader = pipeReader ?? throw new ArgumentNullException(nameof(pipeReader)); } public async Task StartProcessingDataAsync(CancellationToken token) { while (true) { token.ThrowIfCancellationRequested();
Bytesprocessor public interface IBytesProcessor { Task ProcessBytesAsync(ReadOnlySequence<byte> bytesSequence, CancellationToken token); } public class ConsoleBytesProcessor : IBytesProcessor {