System.IO.Pipelines - un outil peu connu pour les amateurs de hautes performances

Bonjour lecteur. Beaucoup de temps s'est écoulé depuis la sortie de .NET Core 2.1. Et des innovations intéressantes comme Span et Memory ont déjà été largement prises en compte, vous pouvez les lire, les voir et en entendre beaucoup parler. Cependant, malheureusement, une bibliothèque appelée System.IO.Pipelines n'a pas reçu la même attention. Presque tout ce qui est sur ce sujet est le seul message que beaucoup ont traduit et publié à la maison. Il devrait certainement y avoir plus d'informations pour que les personnes intéressées puissent regarder la technologie sous différents angles.



Présentation


Ainsi, cette bibliothèque vise à accélérer le travail avec le traitement des données en streaming. Il a été initialement créé et utilisé par l'équipe de développement de Kestrel (un serveur Web multiplateforme pour ASP.NET Core), mais est actuellement fourni via un package de nuget distinct.
Avant de nous plonger dans le sujet, nous pouvons imaginer le mécanisme de bibliothèque comme un analogue amélioré de MemoryStream. Le problème avec le MemoryStream d'origine est un nombre excessif de copies, ce qui est évident si vous vous souvenez qu'un tableau d'octets privé est utilisé à l'intérieur comme tampon. Par exemple, dans les méthodes de lecture et d' écriture , la copie est clairement visible. Ainsi, pour l'objet que nous voulons écrire dans le flux, une copie sera créée dans le tampon interne, et lors de la lecture, une copie de la copie interne sera remise au consommateur. Cela ne ressemble pas à l'utilisation la plus rationnelle de l'espace.
System.IO.Pipelines n'a pas pour objectif de remplacer tous les flux, c'est un outil supplémentaire dans l'arsenal d'un développeur écrivant du code performant. Je vous suggère de vous familiariser avec les méthodes et classes de base, de voir comment elles sont organisées à l'intérieur et d'analyser des exemples de base.

Commençons par le périphérique interne, en examinant en même temps de simples fragments de code. Après cela, il deviendra clair quoi et comment cela fonctionne, et comment il doit être utilisé. Lorsque vous travaillez avec System.IO.Pipelines, il convient de se rappeler que le concept de base est que toutes les opérations de lecture-écriture doivent avoir lieu sans allocations supplémentaires. Mais certaines méthodes séduisantes à première vue contredisent cette règle. Par conséquent, le code que vous essayez si fort d'accélérer commence à allouer de la mémoire pour les nouvelles et nouvelles données, en chargeant le garbage collector.

La bibliothèque interne de la bibliothèque utilise les possibilités les plus étendues des dernières versions de la langue et de la durée, de l’étendue, de la mémoire, des pools d’objets, de ValueTask, etc. Cela vaut au moins le coup d’œil pour un excellent exemple d’utilisation de ces fonctionnalités en production.
À un moment donné, certains n'étaient pas satisfaits de l'implémentation de flux en C #, car une classe était utilisée à la fois pour la lecture et l'écriture. Mais, comme on dit, vous ne pouvez pas jeter des méthodes hors d'une classe. Même si le flux ne prenait pas en charge la lecture / l'écriture / le déplacement du pointeur, les propriétés CanRead, CanWrite et CanSeek prenaient effet, ce qui ressemblait à une petite béquille. Ici, les choses sont différentes.
Pour travailler avec des tuyaux, 2 classes sont utilisées: PipeWriter et PipeReader . Ces classes contiennent environ 50 lignes chacune et sont des pseudo-façades (pas ses incarnations les plus classiques, car il y a une seule classe cachée derrière elles, et pas beaucoup) pour la classe Pipe , qui contient toute la logique de base pour travailler avec des données. Parmi les membres publics - 2 constructeurs, 2 propriétés get-only - Reader et Writer, la méthode Reset (), qui réinitialise les champs internes à leur état initial afin que la classe puisse être réutilisée. D'autres méthodes de travail sont appelées à l'aide de pseudo-façades.

Pour commencer sur la classe Pipe


L'instance de classe occupe 320 octets, ce qui est beaucoup (presque un tiers de kilo-octet, 2 de ces objets ne pouvaient pas tenir dans la mémoire de Manchester Mark I). Donc, l'allouer en grande quantité est une mauvaise idée. De plus, la signification de l'objet est destinée à une utilisation à long terme. L'utilisation de pools crée également un argument pour cette déclaration. Après tout, les objets utilisés dans la piscine vivront pour toujours (dans tous les cas, dans la norme).
Notez que la classe est marquée comme scellée et qu'elle est thread-safe - de nombreuses sections du code sont une section critique et sont entourées de verrous.
Pour commencer, créez une instance de la classe Pipe et obtenez les objets PipeReader et PipeWriter en utilisant les propriétés mentionnées.

Initialisation facile
var pipe = new Pipe(); PipeWriter pipeWriter = pipe.Writer; PipeReader pipeReader = pipe.Reader; 


Considérez les méthodes de travail avec les tuyaux:
Pour l'enregistrement via PipeWriter - WriteAsync, GetMemory / GetSpan, Advance, FlushAsync, Complete, CancelPendingFlush, OnReaderCompleted.
Pour lire via PipeReader - AdvanceTo, ReadAsync, TryRead, Complete, CancelPendingRead, OnWriterCompleted.

Comme indiqué dans l'article , la classe utilise une liste de tampons liés individuellement. Mais, évidemment, ils ne sont pas transmis entre PipeReader et PipeWriter - toute la logique est dans une classe. Cette liste est utilisée à la fois pour la lecture et l'écriture. De plus, les données retournées sont stockées dans cette liste.
Il existe également des objets qui indiquent le début des données à lire (ReadHead et index), la fin des données à lire (ReadTail et index) et le début de l'emplacement à écrire (WriteHead et le nombre d'octets mis en mémoire tampon écrits). Ici, ReadHead, ReadTail et WriteHead sont un segment spécifique de la liste et l'index indique une position spécifique dans le segment. Ainsi, l'enregistrement peut commencer au milieu d'un segment, capturer tout le segment suivant et se terminer au milieu du troisième. Ces pointeurs se déplacent de différentes manières.

Prise en main des méthodes PipeWriter


# 1 ValueTask <FlushResult> WriteAsync (source ReadOnlyMemory <byte>, CancellationToken cancelToken)


Juste cette méthode tentante. A une signature très appropriée et à la mode - accepte ReadOnlyMemory, asynchrone. Et beaucoup peuvent être tentés, surtout en se souvenant que la portée et la mémoire sont si rapides et cool. Mais ne vous flattez pas. Tout ce que cette méthode fait est de copier le ReadOnlyMemory qui lui est passé dans la liste interne. Et «copier» signifie un appel à la méthode CopyTo, et non la copie de l'objet lui-même. Autrement dit, toutes les données que nous voulons enregistrer seront copiées, chargeant ainsi la mémoire. Cette méthode ne doit être étudiée que pour s'assurer qu'il vaut mieux ne pas l'utiliser. Eh bien, et peut-être pour certaines situations rares, ce comportement est approprié.
Le corps de la méthode est une section critique dont l'accès est synchronisé via un moniteur.

Ensuite, la question peut se poser, comment écrire quelque chose, sinon par la méthode la plus évidente et la seule appropriée.

# 2 Mémoire <octet> GetMemory (int sizeHint)


La méthode prend un paramètre d'un type entier. Nous devons y indiquer combien d'octets nous voulons écrire (ou plus, mais en aucun cas moins). Cette méthode vérifie s'il y a suffisamment d'espace pour écrire dans le fragment de mémoire actuel stocké dans _writingHeadMemory. Si suffisant, _writingHeadMemory est retourné en tant que mémoire. Sinon, pour les données écrites dans le tampon, mais pour lesquelles la méthode FlushAsync n'a pas été appelée, elle est appelée et un autre BufferSegment est sélectionné, qui est connecté au précédent (voici la liste). En l'absence de _writingHeadMemory, il est initialisé avec un nouveau BufferSegment. Et l'allocation du prochain tampon est une section critique et se fait sous le verrou.
Je suggère de regarder un tel exemple. À première vue, il peut sembler que le compilateur (ou runtime) a séduit le démon.

Devilry
  var pipeNoOptions = new Pipe(); Memory<byte> memoryOne = pipeNoOptions.Writer.GetMemory(2); Console.WriteLine(memoryOne.Length); //2048  4096 var pipeWithOptions = new Pipe(new PipeOptions(minimumSegmentSize: 5)); Memory<byte> memoryTwo = pipeWithOptions.Writer.GetMemory(2); Console.WriteLine(memoryTwo.Length); //16 


Mais tout dans cet exemple est compréhensible et simple.
Lors de la création d'une instance Pipe, nous pouvons passer un objet PipeOptions avec des options pour le créer au constructeur.

PipeOptions a un champ de taille de segment minimum par défaut. Il n'y a pas si longtemps, c'était 2048, mais ce commit a tout changé, maintenant 4096. Au moment d'écrire ces lignes, la version avec 4096 était un package préliminaire, dans la dernière version, c'était 2048. Cela explique le comportement du premier exemple. Si vous êtes critique quant à l'utilisation d'une taille plus petite pour le tampon standard, vous pouvez la spécifier dans une instance du type PipeOptions.

Mais dans le deuxième exemple, où la taille minimale est indiquée, la longueur ne correspond pas de toute façon. Et cela se produit déjà car la création d'un nouveau BufferSegment se produit à l'aide de pools. L'une des options de PipeOptions est le pool de mémoire. Après cela, le pool spécifié sera utilisé pour créer un nouveau segment. Si vous n'avez pas spécifié votre pool de mémoire, le ArrayPool standard sera utilisé, qui, comme vous le savez, a plusieurs compartiments pour différentes tailles de tableaux (chacun suivant est 2 fois plus grand que le précédent) et, lorsqu'on lui demande une taille spécifique, il recherche un compartiment avec des tableaux de taille appropriée (puis il y a le plus grand ou égal le plus proche). Par conséquent, le nouveau tampon sera certainement plus volumineux que ce que vous avez demandé. La taille minimale du tableau dans le ArrayPool standard (System.Buffers.TlsOverPerCoreLockedStacksArrayPool) est de 16. Mais ne vous inquiétez pas, c'est un pool de tableaux. Par conséquent, dans la grande majorité des cas, la baie ne met pas à rude épreuve le ramasse-miettes et sera réutilisée.

# 2.5 Span <byte> GetSpan (int sizeHint)


Il fonctionne de la même façon, offrant une étendue de la mémoire.

Ainsi GetMemory () ou GetSpan () sont les principales méthodes d'écriture. Ils nous donnent un objet sur lequel nous pouvons écrire. Pour ce faire, nous n'avons pas besoin d'allouer de mémoire pour de nouveaux tableaux de valeurs, nous pouvons écrire directement dans la structure interne. Laquelle utiliser dépendra principalement de l'API que vous utilisez et de la méthode asynchrone. Cependant, au vu de ce qui précède, une question se pose. Comment le lecteur saura-t-il combien nous avons écrit? Si nous avons toujours utilisé une implémentation spécifique du pool, qui donne un tableau de la même taille que celle demandée, alors le lecteur pourrait lire la totalité du tampon à la fois. Cependant, comme nous l'avons déjà dit, on nous alloue un tampon avec une forte probabilité d'une taille plus grande. Cela conduit à la méthode suivante requise pour le fonctionnement.

# 3 void Advance (int octets)


Une terrible méthode simple. Il prend le nombre d'octets écrits comme argument. Ils incrémentent les compteurs internes - _unflushedBytes et _writingHeadBytesBuffered, dont les noms parlent d'eux-mêmes. Il tronque également _writingHeadMemory exactement au nombre d'octets écrits (à l'aide de la méthode Slice). Par conséquent, après avoir appelé cette méthode, vous devez demander un nouveau bloc de mémoire sous forme de mémoire ou d'étendue, vous ne pouvez pas écrire sur le précédent. Et le corps entier de la méthode est une section critique et s'exécute sous un verrou.

Il semblerait qu'après cela, le lecteur puisse recevoir des données. Mais une étape supplémentaire est nécessaire.

# 4 ValueTask <FlushResult> FlushAsync (CancellationToken cancelToken)


La méthode est appelée après que nous ayons écrit les données nécessaires dans la mémoire reçue et indiqué combien nous y avons écrit. La méthode renvoie une ValueTask, mais elle n'est pas asynchrone (contrairement à son descendant StreamPipeWriter). ValueTask est un type spécial (structure en lecture seule) utilisé dans le cas où la plupart des appels n'utiliseront pas l'asynchronie, c'est-à-dire que toutes les données nécessaires seront disponibles au moment de son appel et la méthode se terminera de manière synchrone. À l'intérieur, il contient des données ou une tâche (au cas où cela ne fonctionnerait pas de manière synchrone). Cela dépend de l'état de la propriété _writerAwaitable.IsCompleted. Si nous recherchons ce qui change l'état de cet objet en attente, nous verrons que cela se produit à la condition que la quantité de données non traitées (non consommées) (ce n'est pas exactement la même chose que non lues (non examinées), sera expliqué plus loin) dépasse un certain seuil (_pauseWriterThreshold). La valeur par défaut est 16 tailles de segment. Si vous le souhaitez, la valeur peut être modifiée dans PipeOptions. En outre, cette méthode démarre la continuation de la méthode ReadAsync, si une a été bloquée.

Renvoie un FlushResult contenant 2 propriétés - IsCanceled et IsCompleted. IsCanceled indique si le vidage a été annulé (appel CancelPendingFlush). IsCompleted indique si le PipeReader s'est terminé (en appelant les méthodes Complete () ou CompleteAsync ()).
La partie principale de la méthode est réalisée sous Locke Skywalker.

D'autres méthodes de PipeWriter ne présentent pas d'intérêt du point de vue de la mise en œuvre et sont utilisées beaucoup moins souvent, par conséquent, seule une brève description sera donnée.

# 5 void Complete (Exception exception = null) ou ValueTask CompleteAsync (Exception exception = null)


Marque le tuyau fermé pour écriture. Une fois terminé, une exception sera levée lors de la tentative d'utilisation des méthodes d'écriture. Si PipeReader est déjà terminé, l'instance de tuyau entière est également terminée. La plupart du travail se fait sous la serrure.

# 6 void CancelPendingFlush ()


Comme son nom l'indique, il termine l'opération FlushAsync () en cours. Il y a un lok.

# 7 void OnReaderCompleted (Action <Exception, objet> rappel, état de l'objet)


Exécute le délégué délégué à la fin du lecteur. Il y a aussi une serrure.
La documentation indique actuellement que cette méthode peut ne pas être appelée sur certains descendants de PipeWriter et sera supprimée à l'avenir. Par conséquent, vous ne devez pas lier la logique à ces méthodes.

Allez sur PipeReader


# 1 ValueTask <ReadResult> ReadAsync (jeton CancellationToken)


Ici, comme FlushAsync, une ValueTask est retournée, ce qui indique que la méthode est principalement synchrone, mais pas toujours. Dépend de l'état de _readerAwaitable. Comme avec FlushAsync, vous devez trouver quand _readerAwaitable est défini sur incomplet. Cela se produit lorsque PipeReader lit tout dans la liste (ou qu'il contient des données qui ont été marquées comme examinées et ont besoin de plus de données pour continuer). Ce qui, en fait, est logique. En conséquence, nous pouvons conclure qu'il est souhaitable d'affiner Pipe à votre travail, de définir toutes ses options de manière réfléchie, sur la base de statistiques identifiées empiriquement. Une configuration appropriée réduira la probabilité d'une branche d'exécution asynchrone et permettra un traitement plus efficace des données. Presque toute la méthode est entourée d'un verrou.

Renvoie un mystérieux ReadResult . En fait, il s'agit simplement d'un tampon + indicateurs indiquant l'état de l'opération (IsCanceled - si ReadAsync a été annulé et IsCompleted indiquant si le PipeWriter a été fermé). Dans ce cas, IsCompleted est une valeur indiquant si les méthodes PipeWriter Complete () ou CompleteAsync () ont été appelées. Si ces méthodes ont été appelées avec une exception, elles seront levées lors de la tentative de lecture.

Le tampon a de nouveau un type mystérieux - ReadOnlySequence . Ceci, à son tour, est un objet pour contenir des segments (ReadOnlySequenceSegment) des indices de début et de fin + début et fin à l'intérieur des segments correspondants. Ce qui ressemble en fait à la structure de la classe Pipe elle-même. Soit dit en passant, BufferSegment est le successeur de ReadOnlySequenceSegment, ce qui suggère qu'il y est utilisé. Grâce à cela, vous pouvez simplement vous débarrasser des allocations de mémoire inutiles pour le transfert de données de l'écrivain au lecteur.
ReadOnlySpan peut être obtenu à partir du tampon pour un traitement ultérieur. Pour terminer l'image, vous pouvez vérifier si le tampon contient un seul ReadOnlySpan. S'il contient, nous n'avons pas besoin d'itérer sur la collection d'un élément et nous pouvons l'obtenir en utilisant la propriété First. Sinon, vous devez parcourir tous les segments du tampon et traiter chaque ReadOnlySpan.

Sujet de discussion - dans la classe ReadOnlySequence, les types de référence nullables sont activement utilisés et il y a goto (pas pour quitter l'imbrication et pas dans le code généré) - en particulier

Après le traitement, vous devez indiquer clairement à l'instance Pipe que nous avons lu les données.

# 2 bool TryRead (résultat ReadResult)


Version synchrone. Vous permet d'obtenir le résultat s'il l'est. S'il n'est pas déjà là, contrairement à ReadAsync, il ne bloque pas, mais renvoie false. Aussi dans la serrure.

# 3 void AdvanceTo (SequencePosition consommé, SequencePosition examiné)


Dans cette méthode, vous pouvez spécifier combien d'octets nous lisons et combien traités. Les données lues mais non traitées seront renvoyées lors de la prochaine lecture. Cette fonctionnalité peut sembler étrange à première vue, mais lors du traitement d'un flux d'octets, il est rarement nécessaire de traiter chaque octet individuellement. En règle générale, les données sont échangées à l'aide de messages. Il peut arriver que le lecteur, lors de la lecture, reçoive un message entier et une partie du second. Le tout doit être traité, et une partie du second doit être laissée la prochaine fois afin qu'elle accompagne la partie restante. La méthode AdvanceTo accepte une SequencePosition, qui est en fait un segment + index. Lors du traitement de tout ce que ReadAsync a lu, vous pouvez spécifier buffer.End. Sinon, vous devrez créer explicitement une position, indiquant le segment et l'index auquel le traitement a été arrêté. Sous le capot lok.
De plus, si la quantité d'informations brutes est inférieure à la faille installée (_resumeWriterThreshold), il démarre la poursuite de PipeWriter s'il était bloqué. Par défaut, ce seuil est de 8 volumes de segment (la moitié du seuil de blocage).

# 4 void Complete (Exception exception = null)


Termine PipeReader. Si PipeWriter est terminé à ce stade, l'instance Pipe entière se termine. Verrouillez à l'intérieur.

# 5 void CancelPendingRead ()


Vous permet d'annuler la lecture actuellement attendue. Locke.

# 6 void OnWriterCompleted (Action <Exception, objet> rappel, état de l'objet)


Vous permet de spécifier le délégué à exécuter à la fin du PipeWriter.
Comme la méthode similaire pour PipeWriter, la documentation a la même note qui sera supprimée. Serrure sous le capot.

Exemple



La liste ci-dessous montre un exemple de travail avec des tuyaux.
Depuis l'introduction de .NET Core Span and Memory, de nombreuses classes pour travailler avec des données ont été complétées par des surcharges utilisant ces types. Le schéma d'interaction général sera donc à peu près le même. Dans mon exemple, j'ai utilisé des pipelines pour travailler avec des tuyaux (j'aime les mots racines), c'est-à-dire canaux - objets OS pour la communication interprocessus. L'API de canal vient d'être étendue en conséquence pour lire les données dans la portée et la mémoire. La version asynchrone utilise la mémoire, car la méthode asynchrone sera convertie en méthode de modèle à l'aide d'une machine à états finis générée automatiquement, dans laquelle toutes les variables locales et les paramètres de méthode sont stockés, et puisque Span est une structure en lecture seule ref, elle ne peut pas être sur le tas, respectivement, en utilisant Span dans une méthode asynchrone n'est pas possible. Mais il existe également une version synchrone de la méthode qui vous permet d'utiliser Span. Dans mon exemple, j'ai essayé les deux et il s'est avéré que la version synchrone dans cette situation se montre mieux. Lorsque vous l'utilisez, moins de récupération de place se produit et le traitement des données est plus rapide. Mais c'était uniquement parce qu'il y avait beaucoup de données. Dans le cas où une situation est probable dans laquelle il n'y aura pas de données au moment de la demande pour le prochain lot, vous devez utiliser la version asynchrone afin de ne pas forcer le processeur au repos.
L'exemple contient des commentaires qui expliquent certains points. J'attire votre attention sur le fait que malgré le fait que les fragments du programme responsable de la lecture du tube et du traitement soient séparés, lors de l'écriture dans un fichier, les données sont lues exactement de l'endroit où elles sont écrites lors de la lecture du tube.

Des années d'évolution pour une fonction puissante - Maine asynchrone
  class Program { static async Task Main(string args) { var pipe = new Pipe(); var dataWriter = new PipeDataWriter(pipe.Writer, "testpipe"); var dataProcessor = new DataProcessor(new ConsoleBytesProcessor(), pipe.Reader); var cts = new CancellationTokenSource(); await Task.WhenAll(dataWriter.ReadFromPipeAsync(cts.Token), dataProcessor.StartProcessingDataAsync(cts.Token)); } } 


Pipepatawriter
  public class PipeDataWriter { private readonly NamedPipeClientStream _namedPipe; private readonly PipeWriter _pipeWriter; private const string Servername = "."; public PipeDataWriter(PipeWriter pipeWriter, string pipeName) { _pipeWriter = pipeWriter ?? throw new ArgumentNullException(nameof(pipeWriter)); _namedPipe = new NamedPipeClientStream(Servername, pipeName, PipeDirection.In); } public async Task ReadFromPipeAsync(CancellationToken token) { await _namedPipe.ConnectAsync(token); while (true) { token.ThrowIfCancellationRequested(); ////       Memory<T> //Memory<byte> buffer = _pipeWriter.GetMemory(); ////       Memory<T> ////         -       . //int readBytes = await _namedPipe.ReadAsync(buffer, token); //         PipeWriter Span //         -       . int readBytes = _namedPipe.Read(_pipeWriter.GetSpan()); //      ,        //         if (readBytes == 0) { await Task.Delay(500, token); continue; } // ,       _pipeWriter.Advance(readBytes); //  ,      PipeReader FlushResult result = await _pipeWriter.FlushAsync(token); //  PipeReader  ,       //        ,      if (result.IsCompleted) { break; } } //  _pipeWriter     Pipe _pipeWriter.Complete(); } } 


Processeur de données
  public class DataProcessor { private readonly IBytesProcessor _bytesProcessor; private readonly PipeReader _pipeReader; public DataProcessor(IBytesProcessor bytesProcessor, PipeReader pipeReader) { _bytesProcessor = bytesProcessor ?? throw new ArgumentNullException(nameof(bytesProcessor)); _pipeReader = pipeReader ?? throw new ArgumentNullException(nameof(pipeReader)); } public async Task StartProcessingDataAsync(CancellationToken token) { while (true) { token.ThrowIfCancellationRequested(); //     Pipe ReadResult result = await _pipeReader.ReadAsync(token); ReadOnlySequence<byte> buffer = result.Buffer; //      await _bytesProcessor.ProcessBytesAsync(buffer, token); // ,      .       ,   //  ,               //    IBytesProcessor.ProcessBytesAsync   ,    _pipeReader.AdvanceTo(buffer.End); //  PipeWriter  ,      //      ,      if (result.IsCompleted) { break; } } //  _pipeReader     Pipe _pipeReader.Complete(); } } 


Bytesprocessor
  public interface IBytesProcessor { Task ProcessBytesAsync(ReadOnlySequence<byte> bytesSequence, CancellationToken token); } public class ConsoleBytesProcessor : IBytesProcessor { //,         IDisposable readonly FileStream _fileStream = new FileStream("buffer", FileMode.Create); public Task ProcessBytesAsync(ReadOnlySequence<byte> bytesSequence, CancellationToken token) { if (bytesSequence.IsSingleSegment) { ProcessSingle(bytesSequence.First.Span); } else { foreach (var segment in bytesSequence) { ProcessSingle(segment.Span); } } return Task.CompletedTask; } private void ProcessSingle(ReadOnlySpan<byte> span) { _fileStream.Write(span); } } 

Source: https://habr.com/ru/post/fr464921/


All Articles