Hola lector Ha pasado mucho tiempo desde el lanzamiento de .NET Core 2.1. Y las innovaciones tan geniales como Span y Memory ya son ampliamente conocidas, puedes leer, ver y escuchar mucho sobre ellas. Sin embargo, desafortunadamente, la biblioteca llamada System.IO. Pipeslines no recibió la misma atención. Casi todo lo que hay sobre este tema es
la única publicación que se ha traducido y copiado en muchos recursos. Debería haber más información sobre esa tecnología para verla desde diferentes ángulos.

Introduccion
Entonces, esta biblioteca tiene como objetivo acelerar el procesamiento de la transmisión de datos. Originalmente fue creado y utilizado por el equipo de desarrollo de Kestrel (un servidor web multiplataforma para ASP.NET Core), pero actualmente está disponible para los mortales a través de un
paquete nuget .
Antes de profundizar en el tema, podemos imaginar el mecanismo de la biblioteca como un análogo mejorado de MemoryStream. El problema con el MemoryStream original es un número excesivo de copias, lo cual es obvio si recuerda que se utiliza una matriz de bytes privada dentro de MemoryStream como un búfer. Por ejemplo, en los métodos de
lectura y
escritura puede ver claramente la copia de datos. Por lo tanto, para el objeto que queremos escribir en la secuencia, se creará una copia en el búfer interno y, durante la lectura, se devolverá una copia de la copia interna al consumidor. Parece que no es el uso más racional de la memoria.
System.IO.Pipelines no tiene como objetivo reemplazar todas las transmisiones, es una herramienta adicional en el arsenal de un desarrollador que escribe código de alto rendimiento. Le sugiero que se familiarice con los métodos y clases básicos, vea los detalles de su implementación y analice ejemplos básicos.
Comencemos con los detalles internos y de implementación, al mismo tiempo mirando fragmentos de código simples. Después de eso, quedará claro cómo funciona y cómo debe usarse. Al trabajar con System.IO.Pipelines, vale la pena recordar que el concepto básico es que todas las operaciones de lectura-escritura deben realizarse sin asignaciones adicionales. Pero algunos métodos que son atractivos a primera vista contradicen esta regla. En consecuencia, el código que intenta acelerar tan rápidamente comienza a asignar memoria para datos nuevos y nuevos, cargando el recolector de basura.
Los componentes internos de la biblioteca utilizan las posibilidades más amplias de las últimas versiones del lenguaje y el tiempo de ejecución: Span, Memoria, agrupaciones de objetos, ValueTask, etc. Vale la pena buscar allí, al menos, un gran ejemplo del uso de estas características en la producción.
Hubo un tiempo en que algunos desarrolladores no estaban satisfechos con la implementación de secuencias en C #, porque una clase se usaba tanto para leer como para escribir. Pero como dicen, no puedes tirar métodos fuera de una clase. Incluso si la transmisión no admite lectura / escritura / búsqueda, se utilizan las propiedades CanRead, CanWrite y CanSeek. Parece una pequeña muleta. Pero ahora las cosas se vuelven diferentes.
Para trabajar con tuberías, se utilizan 2 clases:
PipeWriter y
PipeReader . Estas clases contienen aproximadamente 50 líneas de código y son pseudo-fachadas (no la más clásica de sus encarnaciones, ya que ocultan una sola clase, no muchas) para la clase
Pipe , que contiene toda la lógica básica para trabajar con datos. Esta clase contiene 5 miembros públicos: 2 constructores, 2 propiedades de solo obtención: Reader y Writer, el método Reset (), que restablece los campos internos a su estado inicial para que la clase pueda reutilizarse. Los métodos restantes para el trabajo son internos y se llaman utilizando pseudo-fachadas.
Comencemos con la clase de tubería
La instancia de clase ocupa 320 bytes, que es bastante (casi un tercio de kilobyte, 2 de esos objetos no cabían en la memoria de Manchester Mark I). Entonces, la asignación de una gran cantidad de sus instancias es una mala idea. Además, el objeto está destinado a un uso a largo plazo. El uso de grupos también hace un argumento para esta declaración. Los objetos utilizados en el grupo vivirán para siempre (para la implementación predeterminada del grupo).
Tenga en cuenta que la clase está marcada como sellada y que es segura para subprocesos: muchas secciones del código son una sección crítica y están envueltas en bloqueos.
Para comenzar a usar esta clase, debe crear una instancia de la clase Pipe y obtener los objetos PipeReader y PipeWriter utilizando las propiedades mencionadas.
Inicialización simplevar pipe = new Pipe(); PipeWriter pipeWriter = pipe.Writer; PipeReader pipeReader = pipe.Reader;
Considere los métodos para trabajar con tuberías:
Escribir con PipeWriter: WriteAsync, GetMemory / GetSpan, Advance, FlushAsync, Complete, CancelPendingFlush, OnReaderCompleted.
Lectura con PipeReader: AdvanceTo, ReadAsync, TryRead, Complete, CancelPendingRead, OnWriterCompleted.
Como se indicó en la
publicación mencionada , la clase utiliza una lista de búferes vinculada individualmente. Pero, obviamente, no se transfieren entre PipeReader y PipeWriter: toda la lógica está en una clase. Esta lista se usa tanto para leer como para escribir. Además, los datos devueltos se almacenan en esta lista (por lo que no se realiza ninguna copia).
Además, hay objetos que indican el comienzo de los datos para leer (ReadHead e index), el final de los datos para leer (ReadTail e index) y el comienzo del espacio para escribir (WriteHead y el número de bytes almacenados en búfer escritos). Aquí ReadHead, ReadTail y WriteHead son miembros específicos (segmentos) de la lista interna de segmentos, y el índice indica una posición específica dentro del segmento. Por lo tanto, la grabación puede comenzar desde el medio de un segmento, capturar un segmento siguiente completo y terminar en el medio del tercero. Estos punteros se mueven en varios métodos.
Comenzando con los métodos PipeWriter
Eso se menciona atractivo a primera vista método. Tiene una firma muy adecuada y de moda: acepta ReadOnlyMemory, asíncrono. Y muchos pueden sentirse tentados, especialmente recordando que Span y Memory son tan rápidos y geniales. Pero no te hagas ilusiones. Todo lo que hace este método es copiar el ReadOnlyMemory pasado a la lista interna. Y por "copiar" se entiende una llamada al método CopyTo (), y no copiar solo el objeto en sí. Todos los datos que queremos grabar se copiarán, cargando así la memoria. Este método debe mencionarse solo para asegurarse de que es mejor no usarlo. Bueno, y tal vez para algunas situaciones raras, este comportamiento es apropiado.
El cuerpo del método es una sección crítica, el acceso al mismo se sincroniza a través de un monitor.
Entonces puede surgir la pregunta, cómo escribir algo, si no es a través del método más obvio y único adecuado
El método toma un parámetro de un tipo entero. En él, debemos indicar cuántos bytes queremos escribir en la tubería (qué tamaño del búfer queremos). Este método verifica si hay suficiente espacio para escribir en el fragmento de memoria actual almacenado en _writingHeadMemory. Si es suficiente, _writingHeadMemory se devuelve como la Memoria. De lo contrario, para los datos escritos en el búfer, pero para los cuales no se llamó al método FlushAsync, se llama y se asigna otro BufferSegment, que está conectado al anterior (aquí está nuestra lista interna). Si _writingHeadMemory es nulo, se inicializa con un nuevo BufferSegment. Y la asignación del búfer es una sección crítica y se realiza bajo el candado.
Sugiero mirar este ejemplo. A primera vista, puede parecer que el compilador (o tiempo de ejecución) ha engañado al demonio.
Diablo var pipeNoOptions = new Pipe(); Memory<byte> memoryOne = pipeNoOptions.Writer.GetMemory(2); Console.WriteLine(memoryOne.Length);
Pero todo en este ejemplo es comprensible y simple.
Al crear una instancia de Pipe, podemos pasarle el objeto
PipeOptions en el constructor con opciones para crear.
PipeOptions tiene un campo de tamaño mínimo de segmento predeterminado. No hace mucho tiempo, era 2048, pero
esta confirmación ha actualizado este valor a 4096. Al momento de escribir este artículo, la versión 4096 estaba en prelanzamiento nuget-package, la última versión de lanzamiento tenía un valor de 2048. Esto explica el comportamiento del primer ejemplo. Si es crítico con el uso de un tamaño más pequeño para el búfer predeterminado, puede especificarlo en una instancia del tipo PipeOptions.
Pero en el segundo ejemplo, donde se especifica el tamaño mínimo, la longitud no coincide de todos modos. Y esto está sucediendo porque la creación de un nuevo BufferSegment se produce mediante grupos. Una de las opciones en PipeOptions es el grupo de memoria. Después de eso, el grupo especificado se usará para crear un nuevo segmento. Si no especificó el grupo de memoria, se utilizará el ArrayPool predeterminado, que, como ya sabe, tiene varios depósitos para diferentes tamaños de matrices (cada uno es 2 veces más grande que el anterior) y cuando se solicita para un determinado tamaño, busca un cubo con matrices de tamaño adecuado (es decir, el más grande o igual más cercano). En consecuencia, el nuevo búfer seguramente será más grande de lo que solicitó. El tamaño mínimo de matriz en el ArrayPool predeterminado (System.Buffers.TlsOverPerCoreLockedStacksArrayPool) es 16. Pero no se preocupe, este es un grupo de matrices. En consecuencia, en la gran mayoría de los casos, la matriz no ejerce presión sobre el recolector de basura y se reutilizará más tarde.
Funciona de manera similar, dando Span from Memory.
Por lo tanto, GetMemory () o GetSpan () son los principales métodos para escribir. Nos dan un objeto en el que podemos escribir. Para hacer esto, no necesitamos asignar memoria para nuevas matrices de valores, podemos escribir directamente en la tubería. Cuál usar dependerá principalmente de la API que esté utilizando y del método asincrónico. Sin embargo, en vista de lo anterior, surge una pregunta. ¿Cómo sabrá el lector cuánto escribimos? Si siempre usamos una implementación específica del grupo, que proporciona una matriz del mismo tamaño que el solicitado, el lector podría leer todo el búfer de una vez. Sin embargo, como ya hemos dicho, se nos asigna un búfer con una alta probabilidad de un tamaño mayor. Esto lleva al siguiente método requerido para la operación.
Un método terriblemente simple. Toma el número de bytes escritos como argumento. Incrementan los contadores internos: _unflushedBytes y _writingHeadBytesBuffered, cuyos nombres hablan por sí mismos. También trunca (rebanadas) _writingHeadMemory exactamente a la cantidad de bytes escritos (usando el método Slice). Por lo tanto, después de llamar a este método, debe solicitar un nuevo bloque de memoria en forma de Memoria o Span, no puede escribir en el anterior. Y todo el cuerpo del método es una sección crítica y se ejecuta bajo un candado.
Parece que después de esto el lector puede recibir datos. Pero se necesita un paso más.
Se llama al método después de que escribimos los datos necesarios en la Memoria recibida (GetMemory) e indicamos cuánto escribimos allí (Avance). El método devuelve ValueTask, sin embargo, no es asíncrono (a diferencia de su descendiente StreamPipeWriter). ValueTask es un tipo especial (estructura de solo lectura) que se usa en el caso en que la mayoría de las llamadas no serán asíncronas, es decir, todos los datos necesarios estarán disponibles en el momento de su llamada y el método finalizará sincrónicamente. Dentro de sí mismo contiene datos o Tarea (en caso de que no funcionó sincrónicamente). Depende de la propiedad _writerAwaitable.IsCompleted. Si buscamos qué cambios cambia el estado de este _writerAwaitable, veremos que esto sucede si la cantidad de datos no consumidos (esto no es exactamente lo mismo que los datos no examinados se explicarán más adelante) excede un cierto umbral (_pauseWriterThreshold). El valor predeterminado es 16 tamaños de segmento. Si lo desea, el valor se puede cambiar en PipeOptions. Además, este método inicia la continuación del método ReadAsync, si uno fue bloqueado.
Devuelve un FlushResult que contiene 2 propiedades: IsCanceled e IsCompleted. IsCanceled indica si se ha cancelado Flush (llamada CancelPendingFlush ()). IsCompleted indica si PipeReader se ha completado (llamando a los métodos Complete () o CompleteAsync ()).
La parte principal del método se realiza bajo el candado.
Otros métodos de PipeWriter no son interesantes desde el punto de vista de la implementación y se usan con mucha menos frecuencia, por lo tanto, solo se dará una breve descripción.
# 5 void Complete (excepción de excepción = nulo) o ValueTask CompleteAsync (excepción de excepción = nulo)
Tubo de marcas cerrado para escritura. Se generará una excepción al intentar utilizar los métodos de escritura después de la finalización. Si PipeReader ya se ha completado, también se completará toda la instancia de Pipe. La mayor parte del trabajo se realiza bajo la cerradura.
# 6 nulo CancelPendingFlush ()
Como su nombre lo indica, cancela la operación actual de FlushAsync (). Hay una cerradura
# 7 void OnReaderCompleted (acción <excepción, objeto> devolución de llamada, estado del objeto)
Ejecuta el delegado pasado cuando el lector completa. También hay una cerradura.
En la
documentación actualmente se escribe que este método no se puede invocar en algunas implementaciones de PipeWriter y se eliminará en el futuro. Por lo tanto, no debe vincular la lógica a estos métodos.
Es hora de PipeReader
Aquí, como en FlushAsync (), se devuelve ValueTask, lo que sugiere que el método es principalmente sincrónico, pero no siempre. Depende del estado de _readerAwaitable. Al igual que con FlushAsync, debe encontrar cuándo _readerAwaitable está configurado como incompleto. Esto sucede cuando PipeReader ha leído todo de la lista interna (o contiene datos que se marcaron como examinados y necesita más datos para continuar). Lo cual, de hecho, es obvio. En consecuencia, podemos concluir que es deseable ajustar Pipe a su trabajo, establecer todas sus opciones cuidadosamente, en base a estadísticas identificadas empíricamente. La configuración adecuada reducirá la posibilidad de una rama de ejecución asincrónica y permitirá un procesamiento de datos más eficiente. Casi todo el código en todo el método está rodeado por un candado.
Devuelve un misterioso
ReadResult . De hecho, es solo un búfer + indicadores que muestran el estado de la operación (IsCanceled: si ReadAsync se canceló e IsCompleted indicando si PipeWriter estaba cerrado). IsCompleted es un valor que indica si se llamó a los métodos PipeWriter Complete () o CompleteAsync (). Si se llamó a estos métodos con una excepción aprobada, se lanzará al intentar leer.
Y nuevamente, el búfer tiene un tipo misterioso:
ReadOnlySequence . Esto, a su vez, es el objeto del contenido de los segmentos
(ReadOnlySequenceSegment) del principio y los índices final + inicio y final dentro de los segmentos correspondientes. Que en realidad se asemeja a la estructura de la propia clase Pipe. Por cierto, BufferSegment se hereda de ReadOnlySequenceSegment, lo que sugiere que BufferSegment se usa en esta secuencia. Gracias a esto, puede deshacerse de las asignaciones de memoria innecesarias para la transferencia de datos del escritor al lector.
ReadOnlySpan se puede obtener del búfer para su posterior procesamiento. Para completar la imagen, puede verificar si el búfer contiene un solo ReadOnlySpan. Si contiene, no necesitamos iterar sobre la colección de un elemento y podemos obtenerla usando la propiedad First. De lo contrario, es necesario revisar todos los segmentos en el búfer y procesar ReadOnlySpan de cada uno.
Tema de discusión: en la clase ReadOnlySequence, los tipos de referencia anulables se usan activamente y hay goto (no para anidación de bucle profundo y no en el código generado), en particular,
aquí .
Después del procesamiento, debe indicar a la instancia de Pipe que leemos los datos.
Versión sincrónica. Le permite obtener el resultado si existe. De lo contrario, a diferencia de ReadAsync, no bloquea y devuelve falso. También el código de este método está en la cerradura.
En este método, puede especificar cuántos bytes examinamos y consumimos. Los datos que han sido examinados pero no consumidos se devolverán la próxima vez que se lean. Esta característica puede parecer extraña a primera vista, pero cuando se procesa una secuencia de bytes, rara vez es necesario procesar cada byte individualmente. Por lo general, los datos se intercambian mediante mensajes. Puede surgir una situación en la que el lector, al leer, recibió un mensaje completo y parte del segundo. El todo debe ser procesado, y parte del segundo debe dejarse para el futuro, de modo que venga junto con la parte restante. El método AdvanceTo toma una SequencePosition, que en realidad es un segmento + índice. Al procesar todo lo que ReadAsync ha leído, puede especificar el búfer. Fin. De lo contrario, debe crear explícitamente una posición, indicando el segmento y el índice en el que se detuvo el procesamiento. La cerradura está debajo del capó.
Además, si la cantidad de información no consumida es inferior al umbral especificado (_resumeWriterThreshold), se inicia la continuación de PipeWriter si se bloqueó. Por defecto, este umbral es de 8 volúmenes de segmento (la mitad del umbral de bloqueo).
# 4 vacío completo (excepción de excepción = nulo)
Completa el PipeReader. Si PipeWriter se completa en este punto, se completa toda la instancia de Pipe. Bloquear adentro.
# 5 nulo CancelPendingRead ()
Le permite cancelar la lectura que está actualmente en estado pendiente. Cerradura
# 6 void OnWriterCompleted (Acción <Excepción, objeto> devolución de llamada, estado del objeto)
Le permite especificar el delegado que se ejecutará al completar PipeWriter.
Al igual que el método similar de PipeWriter, en la
documentación hay la misma etiqueta que se eliminará. La cerradura está debajo del capó.
Ejemplo
El listado a continuación muestra un ejemplo de trabajo con tuberías.
Desde la introducción de .NET Core Span y Memory, muchas clases para trabajar con datos se han complementado con sobrecargas que utilizan estos tipos. Entonces, el esquema general de interacción será aproximadamente el mismo. En mi ejemplo, usé tuberías para trabajar con tuberías (me gustan las palabras similares): objetos del sistema operativo para la comunicación entre procesos. La API de canalizaciones se ha ampliado en consecuencia para leer datos en Span y Memory. La versión asincrónica usa memoria, ya que el método asincrónico se convertirá en un método de plantilla usando una máquina de estados finitos autogenerada, en la que se almacenan todas las variables locales y los parámetros del método, y dado que Span es una estructura de solo lectura, no se puede colocar en el montón, respectivamente, usando Span en un método asincrónico es imposible. Pero también hay una versión sincrónica del método que le permite utilizar Span. En mi ejemplo, probé ambos y resultó que la versión síncrona en esta situación se muestra mejor. Al usarlo, se produce menos recolección de basura y el procesamiento de datos es más rápido. Pero esto fue solo porque había una gran cantidad de datos en la tubería (los datos siempre estaban disponibles). En la situación en la que es bastante probable que no haya datos al momento de solicitar el próximo lote, debe usar la versión asincrónica para no forzar el procesador inactivo.
El ejemplo tiene comentarios que explican algunos puntos. Le llamo la atención sobre el hecho de que, a pesar de que los fragmentos del programa responsable de la lectura desde la tubería y el procesamiento están separados, al escribir en un archivo, los datos se leen exactamente desde el lugar donde se escriben al leer desde el tubo
Años de evolución por el bien de una característica poderosa - principal asíncrono class Program { static async Task Main(string args) { var pipe = new Pipe(); var dataWriter = new PipeDataWriter(pipe.Writer, "testpipe"); var dataProcessor = new DataProcessor(new ConsoleBytesProcessor(), pipe.Reader); var cts = new CancellationTokenSource(); await Task.WhenAll(dataWriter.ReadFromPipeAsync(cts.Token), dataProcessor.StartProcessingDataAsync(cts.Token)); } }
Pipepatawriter public class PipeDataWriter { private readonly NamedPipeClientStream _namedPipe; private readonly PipeWriter _pipeWriter; private const string Servername = "."; public PipeDataWriter(PipeWriter pipeWriter, string pipeName) { _pipeWriter = pipeWriter ?? throw new ArgumentNullException(nameof(pipeWriter)); _namedPipe = new NamedPipeClientStream(Servername, pipeName, PipeDirection.In); } public async Task ReadFromPipeAsync(CancellationToken token) { await _namedPipe.ConnectAsync(token); while (true) { token.ThrowIfCancellationRequested();
Procesador de datos public class DataProcessor { private readonly IBytesProcessor _bytesProcessor; private readonly PipeReader _pipeReader; public DataProcessor(IBytesProcessor bytesProcessor, PipeReader pipeReader) { _bytesProcessor = bytesProcessor ?? throw new ArgumentNullException(nameof(bytesProcessor)); _pipeReader = pipeReader ?? throw new ArgumentNullException(nameof(pipeReader)); } public async Task StartProcessingDataAsync(CancellationToken token) { while (true) { token.ThrowIfCancellationRequested();
Procesador de bytes public interface IBytesProcessor { Task ProcessBytesAsync(ReadOnlySequence<byte> bytesSequence, CancellationToken token); } public class ConsoleBytesProcessor : IBytesProcessor {