System.IO.Pipelines: una herramienta poco conocida para los amantes del alto rendimiento

Hola lector Ha pasado mucho tiempo desde el lanzamiento de .NET Core 2.1. Y las innovaciones tan geniales como Span y Memory ya se han considerado ampliamente, puede leer, ver y escuchar mucho sobre ellas. Sin embargo, desafortunadamente, una biblioteca llamada System.IO.Pipelines no recibió la misma atención. Casi todo lo que se trata sobre este tema es la única publicación que muchos han traducido y publicado en casa. Definitivamente debería haber más información para que los interesados ​​puedan ver la tecnología desde diferentes ángulos.



Introduccion


Por lo tanto, esta biblioteca tiene como objetivo acelerar el trabajo con el procesamiento de datos de transmisión. Originalmente fue creado y utilizado por el equipo de desarrollo de Kestrel (un servidor web multiplataforma para ASP.NET Core), pero actualmente se entrega a través de un paquete nuget separado.
Antes de profundizar en el tema, podemos imaginar el mecanismo de la biblioteca como un análogo mejorado de MemoryStream. El problema con el MemoryStream original es un número excesivo de copias, lo cual es obvio si recuerda que una matriz de bytes privada se usa dentro como un búfer. Por ejemplo, en los métodos de lectura y escritura , la copia es claramente visible. Por lo tanto, para el objeto que queremos escribir en la secuencia, se creará una copia en el búfer interno y, durante la lectura, se entregará una copia de la copia interna al consumidor. Parece que no es el uso más racional del espacio.
System.IO.Pipelines no tiene como objetivo reemplazar todas las transmisiones, es una herramienta adicional en el arsenal de un desarrollador que escribe código de alto rendimiento. Le sugiero que se familiarice con los métodos y clases básicos, vea cómo están organizados en su interior y analice ejemplos básicos.

Comencemos con el dispositivo interno, al mismo tiempo examinando fragmentos de código simples. Después de eso, quedará claro qué y cómo funciona, y cómo debe usarse. Cuando trabaje con System.IO.Pipelines, vale la pena recordar que el concepto básico es que todas las operaciones de lectura-escritura deben realizarse sin asignaciones adicionales. Pero algunos métodos que son atractivos a primera vista contradicen esta regla. En consecuencia, el código que está tratando de acelerar comienza a asignar memoria para datos nuevos y nuevos, cargando el recolector de basura.

La biblioteca interna de la biblioteca utiliza las posibilidades más amplias de las últimas versiones del idioma y el intervalo de tiempo, Span, Memoria, agrupaciones de objetos, ValueTask, etc. Vale la pena ver, al menos, un gran ejemplo del uso de estas funciones en producción.
Hubo un tiempo en que algunos no estaban contentos con la implementación de secuencias en C #, porque una clase se usaba tanto para leer como para escribir. Pero, como dicen, no puedes tirar métodos fuera de una clase. Incluso si la transmisión no admitía leer / escribir / mover el puntero, las propiedades CanRead, CanWrite y CanSeek entraron en vigencia, lo que parecía una pequeña muleta. Aquí las cosas son diferentes.
Para trabajar con tuberías, se utilizan 2 clases: PipeWriter y PipeReader . Estas clases contienen aproximadamente 50 líneas cada una y son pseudo-fachadas (no sus encarnaciones más clásicas, ya que hay una sola clase oculta detrás de ellas, y no muchas) para la clase Pipe , que contiene toda la lógica básica para trabajar con datos. De los miembros públicos, 2 constructores, 2 propiedades de solo obtención: Reader y Writer, el método Reset (), que restablece los campos internos a su estado inicial para que la clase pueda reutilizarse. Otros métodos de trabajo se llaman utilizando pseudo-fachadas.

Para comenzar en la clase Pipe


La instancia de clase ocupa 320 bytes, que es bastante (casi un tercio de kilobyte, 2 de esos objetos no cabían en la memoria de Manchester Mark I). Por lo tanto, asignarlo en grandes cantidades es una mala idea. Además, el significado del objeto está destinado a un uso a largo plazo. El uso de grupos también hace un argumento para esta declaración. Después de todo, los objetos utilizados en el grupo vivirán para siempre (en cualquier caso, en el estándar).
Tenga en cuenta que la clase está marcada como sellada y que es segura para subprocesos: muchas secciones del código son una sección crítica y están envueltas en bloqueos.
Para comenzar, debe crear una instancia de la clase Pipe y obtener los objetos PipeReader y PipeWriter utilizando las propiedades mencionadas.

Fácil inicialización
var pipe = new Pipe(); PipeWriter pipeWriter = pipe.Writer; PipeReader pipeReader = pipe.Reader; 


Considere los métodos para trabajar con tuberías:
Para grabar a través de PipeWriter: WriteAsync, GetMemory / GetSpan, Advance, FlushAsync, Complete, CancelPendingFlush, OnReaderCompleted.
Para leer a través de PipeReader: AdvanceTo, ReadAsync, TryRead, Complete, CancelPendingRead, OnWriterCompleted.

Como se indica en la publicación , la clase utiliza una lista de búferes vinculada individualmente. Pero, obviamente, no se pasan entre PipeReader y PipeWriter: toda la lógica está en una clase. Esta lista se usa tanto para leer como para escribir. Además, los datos devueltos se almacenan en esta lista.
También hay objetos que indican el comienzo de los datos para leer (ReadHead e index), el final de los datos para leer (ReadTail e index) y el comienzo del lugar para escribir (WriteHead y la cantidad de bytes almacenados en buffer). Aquí ReadHead, ReadTail y WriteHead son un segmento específico de la lista, y el índice indica una posición específica dentro del segmento. Por lo tanto, la grabación puede comenzar desde el medio de un segmento, capturar todo el siguiente segmento y terminar en el medio del tercero. Estos punteros se mueven en varios métodos.

Comenzando con los métodos PipeWriter


# 1 ValueTask <FlushResult> WriteAsync (fuente ReadOnlyMemory <byte>, CancellationToken cancellationToken)


Solo ese método tentador. Tiene una firma muy adecuada y moderna: acepta ReadOnlyMemory, asíncrono. Y muchos pueden sentirse tentados, especialmente recordando que Span y Memory son tan rápidos y geniales. Pero no te hagas ilusiones. Todo lo que este método hace es copiar el ReadOnlyMemory pasado a la lista interna. Y "copiar" significa una llamada al método CopyTo, y no copiar el objeto en sí. Es decir, todos los datos que queremos grabar se copiarán, cargando así la memoria. Este método debe estudiarse solo para asegurarse de que es mejor no usarlo. Bueno, y tal vez para algunas situaciones raras, este comportamiento es apropiado.
El cuerpo del método es una sección crítica, el acceso al mismo se sincroniza a través de un monitor.

Entonces puede surgir la pregunta, cómo escribir algo, si no a través del método más obvio y único adecuado.

# 2 Memoria <byte> GetMemory (int sizeHint)


El método toma un parámetro de un tipo entero. En él debemos indicar cuántos bytes queremos escribir (o más, pero en ningún caso menos). Este método verifica si hay suficiente espacio para escribir en el fragmento de memoria actual almacenado en _writingHeadMemory. Si es suficiente, _writingHeadMemory se devuelve como Memoria. De lo contrario, para los datos escritos en el búfer, pero para los cuales no se llamó al método FlushAsync, se llama y se selecciona otro BufferSegment, que está conectado al anterior (aquí está la lista). En ausencia de _writingHeadMemory, se inicializa con un nuevo BufferSegment. Y la asignación del próximo búfer es una sección crítica y se realiza bajo el candado.
Sugiero un vistazo a tal ejemplo. A primera vista, puede parecer que el compilador (o tiempo de ejecución) ha engañado al demonio.

Diablo
  var pipeNoOptions = new Pipe(); Memory<byte> memoryOne = pipeNoOptions.Writer.GetMemory(2); Console.WriteLine(memoryOne.Length); //2048  4096 var pipeWithOptions = new Pipe(new PipeOptions(minimumSegmentSize: 5)); Memory<byte> memoryTwo = pipeWithOptions.Writer.GetMemory(2); Console.WriteLine(memoryTwo.Length); //16 


Pero todo en este ejemplo es comprensible y simple.
Al crear una instancia de Pipe, podemos pasar un objeto PipeOptions con opciones para crearlo al constructor.

PipeOptions tiene un campo de tamaño mínimo de segmento predeterminado. No hace mucho tiempo era 2048, pero esta confirmación cambió todo, ahora 4096. Al momento de escribir, la versión con 4096 era un paquete de prelanzamiento, en la última versión de lanzamiento era 2048. Esto explica el comportamiento del primer ejemplo. Si es crítico de usar un tamaño más pequeño para el búfer estándar, puede especificarlo en una instancia del tipo PipeOptions.

Pero en el segundo ejemplo, donde se indica el tamaño mínimo, la longitud no coincide de todos modos. Y esto ya está sucediendo porque la creación de un nuevo BufferSegment se produce mediante grupos. Una de las opciones en PipeOptions es el grupo de memoria. Después de eso, el grupo especificado se usará para crear un nuevo segmento. Si no especificó su grupo de memoria, se utilizará el ArrayPool estándar, que, como sabe, tiene varios cubos para diferentes tamaños de matrices (cada uno es 2 veces más grande que el anterior) y, cuando se le solicita un tamaño específico, busca un cubo con matrices de tamaño adecuado (luego existe el mayor o igual más cercano). En consecuencia, el nuevo búfer seguramente será más grande de lo que solicitó. El tamaño mínimo de matriz en el ArrayPool estándar (System.Buffers.TlsOverPerCoreLockedStacksArrayPool) es 16. Pero no se preocupe, porque se trata de un conjunto de matrices. En consecuencia, en la gran mayoría de los casos, la matriz no ejerce presión sobre el recolector de basura y se reutilizará.

# 2.5 Span <byte> GetSpan (int sizeHint)


Funciona de manera similar, dando Span from Memory.

Por lo tanto, GetMemory () o GetSpan () son los principales métodos para escribir. Nos dan un objeto en el que podemos escribir. Para hacer esto, no necesitamos asignar memoria para nuevas matrices de valores, podemos escribir directamente en la estructura interna. Cuál usar dependerá principalmente de la API que esté utilizando y del método asincrónico. Sin embargo, en vista de lo anterior, surge una pregunta. ¿Cómo sabrá el lector cuánto escribimos? Si siempre usamos una implementación específica del grupo, que proporciona una matriz del mismo tamaño que el solicitado, el lector podría leer todo el búfer de una vez. Sin embargo, como ya hemos dicho, se nos asigna un búfer con una alta probabilidad de un tamaño mayor. Esto lleva al siguiente método requerido para la operación.

# 3 void Advance (int bytes)


Un método simple y terrible. Toma el número de bytes escritos como argumento. Incrementan los contadores internos: _unflushedBytes y _writingHeadBytesBuffered, cuyos nombres hablan por sí mismos. También trunca _writingHeadMemory exactamente al número de bytes escritos (usando el método Slice). Por lo tanto, después de llamar a este método, debe solicitar un nuevo bloque de memoria en forma de Memoria o Span, no puede escribir en el anterior. Y todo el cuerpo del método es una sección crítica y se ejecuta bajo un candado.

Parece que después de esto el lector puede recibir datos. Pero se necesita un paso más.

# 4 ValueTask <FlushResult> FlushAsync (CancellationToken cancellationToken)


Se llama al método después de que escribimos los datos necesarios en la Memoria recibida e indicamos cuánto escribimos allí. El método devuelve una ValueTask, sin embargo, no es asíncrona (a diferencia de su descendiente StreamPipeWriter). ValueTask es un tipo especial (estructura de solo lectura) que se usa en el caso de que la mayoría de las llamadas no usen la asincronía, es decir, todos los datos necesarios estarán disponibles en el momento de su llamada y el método finalizará de forma sincrónica. En el interior, contiene datos o Tarea (en caso de que no funcionó sincrónicamente). Depende del estado de la propiedad _writerAwaitable.IsCompleted. Si buscamos qué cambia el estado de este objeto en espera, veremos que esto sucede bajo la condición de que la cantidad de datos no procesados ​​(no consumidos) (esto no es exactamente lo mismo que no leídos (no examinados), se explicará más adelante) excede un cierto umbral (_pauseWriterThreshold). El valor predeterminado es 16 tamaños de segmento. Si lo desea, el valor se puede cambiar en PipeOptions. Además, este método inicia la continuación del método ReadAsync, si uno fue bloqueado.

Devuelve un FlushResult que contiene 2 propiedades: IsCanceled e IsCompleted. IsCanceled indica si Flush ha sido cancelado (llamada CancelPendingFlush). IsCompleted indica si PipeReader se completó (llamando a los métodos Complete () o CompleteAsync ()).
La parte principal del método se realiza bajo Locke Skywalker.

Otros métodos de PipeWriter no son interesantes desde el punto de vista de la implementación y se usan con mucha menos frecuencia, por lo tanto, solo se proporcionará una breve descripción.

# 5 void Complete (excepción de excepción = nulo) o ValueTask CompleteAsync (excepción de excepción = nulo)


Tubo de marcas cerrado para escritura. Al finalizar, se lanzará una excepción al intentar utilizar los métodos para escribir. Si PipeReader ya se ha completado, también se completará toda la instancia de Pipe. La mayor parte del trabajo se realiza bajo la cerradura.

# 6 nulo CancelPendingFlush ()


Como su nombre lo indica, completa la operación actual de FlushAsync (). Hay un lok.

# 7 void OnReaderCompleted (acción <excepción, objeto> devolución de llamada, estado del objeto)


Ejecuta el delegado delegado cuando el lector completa. También hay una cerradura.
La documentación actualmente dice que este método no se puede invocar en algunos descendientes de PipeWriter y se eliminará en el futuro. Por lo tanto, no debe vincular la lógica a estos métodos.

Ir a PipeReader


# 1 ValueTask <ReadResult> ReadAsync (token CancellationToken)


Aquí, como FlushAsync, se devuelve una ValueTask, que sugiere que el método es principalmente sincrónico, pero no siempre. Depende del estado de _readerAwaitable. Al igual que con FlushAsync, debe encontrar cuándo _readerAwaitable está configurado como incompleto. Esto sucede cuando PipeReader lee todo de la lista (o contiene datos que se marcaron como examinados y necesita más datos para continuar). Lo cual, de hecho, es lógico. En consecuencia, podemos concluir que es deseable ajustar Pipe a su trabajo, establecer todas sus opciones cuidadosamente, en base a estadísticas identificadas empíricamente. La configuración adecuada reducirá la probabilidad de una bifurcación de ejecución asincrónica y permitirá un procesamiento de datos más eficiente. Casi todo el método está rodeado por una cerradura.

Devuelve un misterioso ReadResult . De hecho, es solo un búfer + indicadores que muestran el estado de la operación (IsCanceled: si ReadAsync se canceló e IsCompleted indicando si PipeWriter estaba cerrado). En este caso, IsCompleted es un valor que indica si se llamó a los métodos PipeWriter Complete () o CompleteAsync (). Si se llamaron a estos métodos con una excepción, se lanzarán al intentar leer.

El búfer nuevamente tiene un tipo misterioso: ReadOnlySequence . Esto, a su vez, es un objeto para contener segmentos (ReadOnlySequenceSegment) de los índices de inicio y fin + inicio y fin dentro de los segmentos correspondientes. Que en realidad se asemeja a la estructura de la propia clase Pipe. Por cierto, BufferSegment es el sucesor de ReadOnlySequenceSegment, lo que sugiere que se usa allí. Gracias a esto, puede deshacerse de las asignaciones de memoria innecesarias para la transferencia de datos del escritor al lector.
ReadOnlySpan se puede obtener del búfer para su posterior procesamiento. Para completar la imagen, puede verificar si el búfer contiene un solo ReadOnlySpan. Si contiene, no necesitamos iterar sobre la colección de un elemento y podemos obtenerla usando la propiedad First. De lo contrario, debe revisar todos los segmentos en el búfer y procesar cada ReadOnlySpan.

Tema de discusión: en la clase ReadOnlySequence, los tipos de referencia anulables se usan activamente y hay goto (no para salir de la anidación y no en el código generado), en particular, aquí

Después del procesamiento, debe dejar en claro a la instancia de Pipe que hemos leído los datos.

# 2 bool TryRead (resultado de ReadResult)


Versión sincrónica. Le permite obtener el resultado si es así. Si aún no está allí, a diferencia de ReadAsync, no bloquea, pero devuelve falso. También en la cerradura.

# 3 void AdvanceTo (SequencePosition consumido, SequencePosition examinado)


En este método, puede especificar cuántos bytes leemos y cuántos procesamos. Los datos que hayan sido leídos pero no procesados ​​se devolverán la próxima vez que se lean. Esta característica puede parecer extraña a primera vista, pero cuando se procesa una secuencia de bytes, rara vez es necesario procesar cada byte individualmente. Por lo general, los datos se intercambian mediante mensajes. Puede surgir una situación en la que el lector, al leer, recibió un mensaje completo y parte del segundo. El todo debe ser procesado, y parte del segundo debe dejarse la próxima vez para que venga junto con la parte restante. El método AdvanceTo acepta una SequencePosition, que en realidad es un segmento + índice. Al procesar todo lo que ReadAsync ha leído, puede especificar el búfer. Fin. De lo contrario, tendrá que crear explícitamente una posición, indicando el segmento y el índice en el que se detuvo el procesamiento. Debajo del capó lok.
Además, si la cantidad de información en bruto es menor que la falla instalada (_resumeWriterThreshold), se inicia la continuación de PipeWriter si se bloqueó. Por defecto, este umbral es de 8 volúmenes de segmento (la mitad del umbral de bloqueo).

# 4 vacío completo (excepción de excepción = nulo)


Completa PipeReader. Si PipeWriter está completo en este punto, la instancia completa de Pipe finaliza. Bloquear adentro.

# 5 nulo CancelPendingRead ()


Le permite cancelar la lectura que se espera actualmente. Locke

# 6 void OnWriterCompleted (Acción <Excepción, objeto> devolución de llamada, estado del objeto)


Le permite especificar el delegado que se ejecutará cuando PipeWriter se complete.
Al igual que el método similar para PipeWriter, la documentación tiene la misma nota que se eliminará. Bloqueo debajo del capó.

Ejemplo



El listado a continuación muestra un ejemplo de trabajo con tuberías.
Desde la introducción de .NET Core Span y Memory, muchas clases para trabajar con datos se han complementado con sobrecargas que utilizan estos tipos. Entonces, el esquema general de interacción será aproximadamente el mismo. En mi ejemplo, usé tuberías para trabajar con tuberías (me gustan las palabras raíz), es decir. canales: objetos del sistema operativo para la comunicación entre procesos. El canal API se ha ampliado en consecuencia para leer datos en Span y Memory. La versión asincrónica usa memoria, ya que el método asincrónico se convertirá en un método de plantilla usando una máquina de estados finitos autogenerada, en la que se almacenan todas las variables locales y los parámetros del método, y dado que Span es una estructura de solo lectura, no puede estar en el montón, respectivamente, usando Span en un método asincrónico no es posible. Pero también hay una versión sincrónica del método que le permite utilizar Span. En mi ejemplo, probé ambos y resultó que la versión síncrona en esta situación se muestra mejor. Al usarlo, se produce menos recolección de basura y el procesamiento de datos es más rápido. Pero esto fue solo porque había muchos datos. En el caso de que sea probable una situación en la que no habrá datos al momento de solicitar el siguiente lote, debe usar la versión asincrónica para no forzar el procesador inactivo.
El ejemplo tiene comentarios que explican algunos puntos. Le llamo la atención sobre el hecho de que, a pesar de que los fragmentos del programa responsable de la lectura desde la tubería y el procesamiento están separados, al escribir en un archivo, los datos se leen exactamente desde el lugar donde se escriben al leer desde la tubería.

Años de evolución por el bien de una característica poderosa: Maine asíncrono
  class Program { static async Task Main(string args) { var pipe = new Pipe(); var dataWriter = new PipeDataWriter(pipe.Writer, "testpipe"); var dataProcessor = new DataProcessor(new ConsoleBytesProcessor(), pipe.Reader); var cts = new CancellationTokenSource(); await Task.WhenAll(dataWriter.ReadFromPipeAsync(cts.Token), dataProcessor.StartProcessingDataAsync(cts.Token)); } } 


Pipepatawriter
  public class PipeDataWriter { private readonly NamedPipeClientStream _namedPipe; private readonly PipeWriter _pipeWriter; private const string Servername = "."; public PipeDataWriter(PipeWriter pipeWriter, string pipeName) { _pipeWriter = pipeWriter ?? throw new ArgumentNullException(nameof(pipeWriter)); _namedPipe = new NamedPipeClientStream(Servername, pipeName, PipeDirection.In); } public async Task ReadFromPipeAsync(CancellationToken token) { await _namedPipe.ConnectAsync(token); while (true) { token.ThrowIfCancellationRequested(); ////       Memory<T> //Memory<byte> buffer = _pipeWriter.GetMemory(); ////       Memory<T> ////         -       . //int readBytes = await _namedPipe.ReadAsync(buffer, token); //         PipeWriter Span //         -       . int readBytes = _namedPipe.Read(_pipeWriter.GetSpan()); //      ,        //         if (readBytes == 0) { await Task.Delay(500, token); continue; } // ,       _pipeWriter.Advance(readBytes); //  ,      PipeReader FlushResult result = await _pipeWriter.FlushAsync(token); //  PipeReader  ,       //        ,      if (result.IsCompleted) { break; } } //  _pipeWriter     Pipe _pipeWriter.Complete(); } } 


Procesador de datos
  public class DataProcessor { private readonly IBytesProcessor _bytesProcessor; private readonly PipeReader _pipeReader; public DataProcessor(IBytesProcessor bytesProcessor, PipeReader pipeReader) { _bytesProcessor = bytesProcessor ?? throw new ArgumentNullException(nameof(bytesProcessor)); _pipeReader = pipeReader ?? throw new ArgumentNullException(nameof(pipeReader)); } public async Task StartProcessingDataAsync(CancellationToken token) { while (true) { token.ThrowIfCancellationRequested(); //     Pipe ReadResult result = await _pipeReader.ReadAsync(token); ReadOnlySequence<byte> buffer = result.Buffer; //      await _bytesProcessor.ProcessBytesAsync(buffer, token); // ,      .       ,   //  ,               //    IBytesProcessor.ProcessBytesAsync   ,    _pipeReader.AdvanceTo(buffer.End); //  PipeWriter  ,      //      ,      if (result.IsCompleted) { break; } } //  _pipeReader     Pipe _pipeReader.Complete(); } } 


Procesador de bytes
  public interface IBytesProcessor { Task ProcessBytesAsync(ReadOnlySequence<byte> bytesSequence, CancellationToken token); } public class ConsoleBytesProcessor : IBytesProcessor { //,         IDisposable readonly FileStream _fileStream = new FileStream("buffer", FileMode.Create); public Task ProcessBytesAsync(ReadOnlySequence<byte> bytesSequence, CancellationToken token) { if (bytesSequence.IsSingleSegment) { ProcessSingle(bytesSequence.First.Span); } else { foreach (var segment in bytesSequence) { ProcessSingle(segment.Span); } } return Task.CompletedTask; } private void ProcessSingle(ReadOnlySpan<byte> span) { _fileStream.Write(span); } } 

Source: https://habr.com/ru/post/464921/


All Articles