Olá leitor. Muito tempo se passou desde o lançamento do .NET Core 2.1. E inovações legais como Span e Memory já são amplamente conhecidas, você pode ler, ver e ouvir muito sobre elas. No entanto, infelizmente, a biblioteca chamada System.IO Pipeslines não recebeu a mesma atenção. Quase tudo o que existe neste tópico é
a única publicação que foi traduzida e copiada em muitos recursos. Deve haver mais informações sobre essa tecnologia para analisá-la de diferentes ângulos.

1. Introdução
Portanto, esta biblioteca tem como objetivo acelerar o processamento de dados de streaming. Ele foi originalmente criado e usado pela equipe de desenvolvimento do Kestrel (um servidor Web de plataforma cruzada para o ASP.NET Core), mas atualmente está disponível para mortais através de um
pacote nuget .
Antes de nos aprofundarmos no tópico, podemos imaginar o mecanismo da biblioteca como um análogo aprimorado do MemoryStream. O problema com o MemoryStream original é um número excessivo de cópias, o que é óbvio se você se lembrar de que uma matriz de bytes privada é usada dentro do MemoryStream como um buffer. Por exemplo, nos métodos de
leitura e
gravação , você pode ver claramente a cópia dos dados. Assim, para o objeto que queremos gravar no fluxo, uma cópia será criada no buffer interno e, durante a leitura, uma cópia da cópia interna será devolvida ao consumidor. Parece que não é o uso mais racional da memória.
O System.IO.Pipelines não tem como objetivo substituir todos os fluxos, é uma ferramenta adicional no arsenal de um desenvolvedor que escreve código de alto desempenho. Sugiro que você se familiarize com os métodos e classes básicos, veja os detalhes de implementação e analise exemplos básicos.
Vamos começar com os detalhes internos e de implementação, ao mesmo tempo analisando fragmentos de código simples. Depois disso, ficará claro como funciona e como deve ser usado. Ao trabalhar com System.IO.Pipelines, vale lembrar que o conceito básico é que todas as operações de leitura e gravação devem ocorrer sem alocações adicionais. Mas alguns métodos atraentes à primeira vista contradizem essa regra. Assim, o código que você está tentando acelerar com tanta intensidade começa a alocar memória para dados novos e novos, carregando o coletor de lixo.
Os internos da biblioteca usam as mais amplas possibilidades das versões mais recentes do idioma e do tempo de execução - Span, Memória, pools de objetos, ValueTask e assim por diante. Vale a pena procurar pelo menos um ótimo exemplo de uso desses recursos na produção.
Ao mesmo tempo, alguns desenvolvedores não estavam satisfeitos com a implementação de fluxos em C #, porque uma classe era usada para leitura e gravação. Mas como eles dizem, você não pode jogar métodos fora de uma classe. Mesmo se o fluxo não suportar leitura / gravação / busca, as propriedades CanRead, CanWrite e CanSeek serão usadas. Parece uma pequena muleta. Mas agora as coisas se tornam diferentes.
Para trabalhar com pipelines, são usadas 2 classes:
PipeWriter e
PipeReader . Essas classes contêm aproximadamente 50 linhas de código e são pseudo-fachadas (não a mais clássica de suas encarnações, pois ocultam uma única classe, não muito) para a classe
Pipe , que contém toda a lógica básica para trabalhar com dados. Esta classe contém 5 membros públicos: 2 construtores, 2 propriedades get-only - Reader e Writer, o método Reset (), que redefine os campos internos para seu estado inicial para que a classe possa ser reutilizada. Os demais métodos de trabalho são internos e denominados usando pseudo-fachadas.
Vamos começar com a classe de pipe
A instância da classe ocupa 320 bytes, o que é bastante (quase um terço de um kilobyte, dois desses objetos não cabiam na memória do Manchester Mark I). Portanto, alocar uma grande quantidade de instâncias é uma má ideia. Além disso, o objeto é destinado ao uso a longo prazo. O uso de pools também cria um argumento para esta declaração. Os objetos usados no pool permanecerão para sempre (para a implementação do pool padrão).
Observe que a classe está marcada como lacrada e é segura para threads - muitas seções do código são críticas e estão envolvidas em bloqueios.
Para começar a usar essa classe, você deve criar uma instância da classe Pipe e obter os objetos PipeReader e PipeWriter usando as propriedades mencionadas.
Inicialização simplesvar pipe = new Pipe(); PipeWriter pipeWriter = pipe.Writer; PipeReader pipeReader = pipe.Reader;
Considere os métodos para trabalhar com tubos:
Gravando com PipeWriter - WriteAsync, GetMemory / GetSpan, Advance, FlushAsync, Complete, CancelPendingFlush, OnReaderCompleted.
Leitura com PipeReader - AdvanceTo, ReadAsync, TryRead, Complete, CancelPendingRead, OnWriterCompleted.
Conforme declarado no
post mencionado , a classe usa uma lista de buffers vinculada individualmente. Mas, obviamente, eles não são transferidos entre o PipeReader e o PipeWriter - toda a lógica está em uma classe. Esta lista é usada para leitura e escrita. Além disso, os dados retornados são armazenados nesta lista (para que nenhuma cópia seja executada).
Além disso, existem objetos indicando o início dos dados a serem lidos (ReadHead e índice), o final dos dados a serem lidos (ReadTail e índice) e o início do espaço a ser gravado (WriteHead e o número de bytes armazenados em buffer gravados). Aqui, ReadHead, ReadTail e WriteHead são membros (segmentos) específicos da lista interna de segmentos e o índice indica uma posição específica dentro do segmento. Assim, a gravação pode começar no meio de um segmento, capturar um próximo segmento inteiro e terminar no meio do terceiro. Esses ponteiros são movidos em vários métodos.
Introdução aos métodos PipeWriter
Isso é mencionado atraente à primeira vista. Possui uma assinatura muito adequada e moderna - aceita ReadOnlyMemory, assíncrono. E muitos podem ficar tentados, principalmente lembrando que Span e Memory são tão rápidos e legais. Mas não se iluda. Tudo o que esse método faz é copiar o ReadOnlyMemory passado para ele na lista interna. E por "cópia" entende-se uma chamada para o método CopyTo (), e não copiar apenas o próprio objeto. Todos os dados que queremos gravar serão copiados, carregando assim a memória. Este método deve ser mencionado apenas para garantir que é melhor não usá-lo. Bem, e talvez para algumas situações raras, esse comportamento é apropriado.
O corpo do método é uma seção crítica, o acesso a ele é sincronizado através de um monitor.
Em seguida, pode surgir a pergunta: como escrever algo, se não através do método mais óbvio e único adequado
O método usa um parâmetro de um tipo inteiro. Nele, devemos indicar quantos bytes queremos gravar no pipeline (qual o tamanho do buffer que queremos). Este método verifica se há espaço suficiente para gravação no fragmento de memória atual armazenado em _writingHeadMemory. Se suficiente, _writingHeadMemory é retornado como a memória. Caso contrário, para os dados gravados no buffer, mas para os quais o método FlushAsync não foi chamado, ele será chamado e outro BufferSegment será alocado, que será conectado ao anterior (aqui está nossa lista interna). Se _writingHeadMemory for nulo, ele será inicializado com um novo BufferSegment. E a alocação do buffer é uma seção crítica e é feita sob o bloqueio.
Eu sugiro olhar para um exemplo. À primeira vista, pode parecer que o compilador (ou tempo de execução) tenha enganado o demônio.
Devilry var pipeNoOptions = new Pipe(); Memory<byte> memoryOne = pipeNoOptions.Writer.GetMemory(2); Console.WriteLine(memoryOne.Length);
Mas tudo neste exemplo é compreensível e simples.
Ao criar uma instância de Pipe, podemos passar o objeto
PipeOptions para ele no construtor com opções para criação.
PipeOptions possui um campo de tamanho de segmento mínimo padrão. Há pouco tempo, era 2048, mas
esse commit atualizou esse valor para 4096. No momento da redação deste artigo, a versão 4096 estava no nuget-package de pré-lançamento, a última versão possuía o valor 2048. Isso explica o comportamento do primeiro exemplo. Se você for crítico ao usar um tamanho menor para o buffer padrão, poderá especificá-lo em uma instância do tipo PipeOptions.
Mas no segundo exemplo, onde o tamanho mínimo é especificado, o comprimento não corresponde a ele. E isso está acontecendo porque a criação de um novo BufferSegment ocorre usando pools. Uma das opções no PipeOptions é o pool de memória. Depois disso, o pool especificado será usado para criar um novo segmento. Se você não especificou o conjunto de memórias, será utilizado o ArrayPool padrão, que, como você sabe, possui vários buckets para tamanhos diferentes de arrays (cada um deles é 2 vezes maior que o anterior) e quando é solicitado para um determinado tamanho, ele procura um balde com matrizes de tamanho adequado (ou seja, o maior ou maior o mais próximo). Consequentemente, o novo buffer quase certamente será maior do que o solicitado. O tamanho mínimo da matriz no ArrayPool padrão (System.Buffers.TlsOverPerCoreLockedStacksArrayPool) é 16. Mas não se preocupe, esse é um conjunto de matrizes. Dessa forma, na grande maioria dos casos, a matriz não exerce pressão sobre o coletor de lixo e será reutilizada posteriormente.
Funciona da mesma forma, fornecendo Span from Memory.
Portanto, GetMemory () ou GetSpan () são os principais métodos de gravação. Eles nos dão um objeto para o qual podemos escrever. Para fazer isso, não precisamos alocar memória para novas matrizes de valores, podemos escrever diretamente no canal. Qual usar dependerá principalmente da API que você está usando e do método de assincronia. No entanto, tendo em conta o que precede, surge uma questão. Como o leitor saberá quanto escrevemos? Se sempre usamos uma implementação específica do pool, que fornece uma matriz exatamente do mesmo tamanho que o solicitado, o leitor pode ler o buffer inteiro de uma só vez. No entanto, como já dissemos, temos um buffer com alta probabilidade de tamanho maior. Isso leva ao seguinte método necessário para a operação.
Um método terrivelmente simples. Leva o número de bytes escritos como argumento. Eles incrementam os contadores internos - _unflushedBytes e _writingHeadBytesBuffered, cujos nomes falam por si. Ele também trunca (fatias) _writingHeadMemory exatamente o número de bytes gravados (usando o método Slice). Portanto, depois de chamar esse método, você precisará solicitar um novo bloco de memória na forma de Memória ou Extensão, não poderá gravar no anterior. E todo o corpo do método é uma seção crítica e funciona sob um bloqueio.
Parece que depois disso o leitor pode receber dados. Mas é necessário mais um passo.
O método é chamado depois que escrevemos os dados necessários na Memória recebida (GetMemory) e indica quanto escrevemos lá (Avançado). O método retorna ValueTask, no entanto, não é assíncrono (diferente do StreamPipeWriter descendente). ValueTask é um tipo especial (estrutura somente leitura) usado no caso em que a maioria das chamadas não será assíncrona, ou seja, todos os dados necessários estarão disponíveis no momento da chamada e o método terminará de forma síncrona. Dentro de si, ele contém dados ou Tarefas (caso não funcione de forma síncrona). Depende da propriedade _writerAwaitable.IsCompleted. Se procurarmos o que muda o estado deste _writerAwaitable, veremos que isso acontece se a quantidade de dados não consumidos (isso não é exatamente o mesmo que os dados não examinados serão explicados posteriormente) exceder um determinado limite (_pauseWriterThreshold). O valor padrão é 16 tamanhos de segmento. Se desejado, o valor pode ser alterado em PipeOptions. Além disso, esse método inicia a continuação do método ReadAsync, se um foi bloqueado.
Retorna um FlushResult contendo 2 propriedades - IsCanceled e IsCompleted. IsCanceled indica se o Flush foi cancelado (chamada CancelPendingFlush ()). IsCompleted indica se o PipeReader foi concluído (chamando os métodos Complete () ou CompleteAsync ()).
A parte principal do método é realizada sob o bloqueio.
Outros métodos do PipeWriter não são interessantes do ponto de vista da implementação e são usados com muito menos frequência; portanto, apenas uma breve descrição será fornecida.
# 5 void Complete (exceção de exceção = null) ou ValueTask CompleteAsync (exceção de exceção = null)
Marca o tubo fechado para escrever. Uma exceção será lançada ao tentar usar os métodos de gravação após a conclusão. Se o PipeReader já tiver sido concluído, toda a instância do Pipe também será concluída. A maior parte do trabalho é feita sob o bloqueio.
# 6 void CancelPendingFlush ()
Como o nome indica, ele cancela a operação atual FlushAsync (). Há uma fechadura.
# 7 void OnReaderCompleted (ação <exceção, objeto> retorno de chamada, estado do objeto)
Executa o delegado passado quando o leitor é concluído. Há também uma fechadura.
Na
documentação , está atualmente escrito que esse método não pode ser chamado em algumas implementações do PipeWriter e será removido no futuro. Portanto, você não deve vincular a lógica a esses métodos.
Está na hora do PipeReader
Aqui, como em FlushAsync (), ValueTask é retornado, o que sugere que o método é principalmente síncrono, mas nem sempre. Depende do estado de _readerAwaitable. Assim como no FlushAsync, você precisa descobrir quando _readerAwaitable está definido como incompleto. Isso acontece quando o PipeReader lê tudo da lista interna (ou contém dados que foram marcados como examinados e você precisa de mais dados para continuar). O que, de fato, é óbvio. Assim, podemos concluir que é desejável ajustar o Pipe ao seu trabalho, definir todas as suas opções cuidadosamente, com base em estatísticas empiricamente identificadas. A configuração adequada reduzirá a chance de uma ramificação de execução assíncrona e permitirá um processamento mais eficiente dos dados. Quase todo o código em todo o método é cercado por um bloqueio.
Retorna alguns
ReadResult misteriosos. De fato, é apenas um buffer + sinalizadores mostrando o status da operação (IsCanceled - se o ReadAsync foi cancelado e IsCompleted indicando se o PipeWriter foi fechado). IsCompleted é um valor que indica se os métodos PipeWriter Complete () ou CompleteAsync () foram chamados. Se esses métodos foram chamados com uma exceção passada, ela será lançada ao tentar ler.
E, novamente, o buffer tem um tipo misterioso -
ReadOnlySequence . Este, por sua vez, é o objeto do conteúdo dos segmentos
(ReadOnlySequenceSegment) do início e dos índices end + start e end dentro dos segmentos correspondentes. O que na verdade se parece com a estrutura da própria classe Pipe. A propósito, BufferSegment é herdado de ReadOnlySequenceSegment, o que sugere que o BufferSegment é usado nessa sequência. Graças a isso, você pode se livrar de alocações de memória desnecessárias para transferência de dados do gravador para o leitor.
O ReadOnlySpan pode ser obtido no buffer para processamento adicional. Para concluir a imagem, você pode verificar se o buffer contém um único ReadOnlySpan. Se ele contiver, não precisamos iterar a coleção de um elemento e podemos obtê-la usando a propriedade First. Caso contrário, é necessário passar por todos os segmentos no buffer e processar ReadOnlySpan de cada um.
Tópico de discussão - na classe ReadOnlySequence, os tipos de referência anuláveis são usados ativamente e existe o goto (não para aninhamento de loop profundo e não no código gerado) - em particular
aqui .
Após o processamento, você precisa sinalizar para a instância do Pipe que lemos os dados.
Versão síncrona. Permite obter o resultado, se existir. Caso contrário, ao contrário do ReadAsync, ele não bloqueia e retorna false. Além disso, o código deste método está na fechadura.
Nesse método, você pode especificar quantos bytes examinamos e consumimos. Os dados que foram examinados, mas não consumidos, serão retornados na próxima vez que forem lidos. Esse recurso pode parecer estranho à primeira vista, mas ao processar um fluxo de bytes, raramente é necessário processar cada byte individualmente. Normalmente, os dados são trocados usando mensagens. Pode surgir uma situação em que o leitor, ao ler, recebeu uma mensagem inteira e parte da segunda. O todo deve ser processado e parte do segundo deve ser deixada para o futuro, para que ele venha junto com a parte restante. O método AdvanceTo usa uma SequencePosition, que na verdade é um segmento + índice. Ao processar tudo o que o ReadAsync leu, você pode especificar buffer.End. Caso contrário, você precisará criar explicitamente uma posição, indicando o segmento e o índice em que o processamento foi interrompido. A fechadura está embaixo do capô.
Além disso, se a quantidade de informações não consumidas for menor que o limite especificado (_resumeWriterThreshold), ele iniciará a continuação do PipeWriter se tiver sido bloqueado. Por padrão, esse limite é de 8 volumes de segmento (metade do limite de bloqueio).
# 4 vazio (exceção exceção = nulo)
Conclui o PipeReader. Se o PipeWriter for concluído nesse momento, a instância inteira do Pipe será concluída. Trave para dentro.
# 5 void CancelPendingRead ()
Permite cancelar a leitura que está atualmente no estado pendente. Bloquear
# 6 void OnWriterCompleted (ação <exceção, objeto> retorno de chamada, estado do objeto)
Permite especificar o delegado a ser executado após a conclusão do PipeWriter.
Como o método semelhante do PipeWriter, na
documentação há a mesma tag que será removida. A fechadura está embaixo do capô.
Exemplo
A lista abaixo mostra um exemplo de trabalho com tubos.
Desde a introdução do .NET Core Span e Memory, muitas classes para trabalhar com dados foram complementadas por sobrecargas usando esses tipos. Portanto, o esquema geral de interação será aproximadamente o mesmo. No meu exemplo, usei pipelines para trabalhar com pipes (gosto de palavras semelhantes) - objetos do SO para comunicação entre processos. A API de pipes acaba de ser expandida de acordo para ler dados no Span e na Memory. A versão assíncrona usa Memória, já que o método assíncrono será convertido em um método de modelo usando uma máquina de estado finito gerada automaticamente, na qual todas as variáveis locais e parâmetros de método são armazenados e, como Span é ref read-only struct, ele não pode ser colocado em a pilha, respectivamente, usando Span em um método assíncrono é impossível. Mas há também uma versão síncrona do método que permite usar o Span. No meu exemplo, eu tentei os dois e verificou-se que a versão síncrona nessa situação se mostra melhor. Ao usá-lo, ocorre menos coleta de lixo e o processamento de dados é mais rápido. Mas isso ocorreu apenas porque havia muitos dados no canal (os dados estavam sempre disponíveis). Na situação em que é provável que não haja dados no momento da inscrição para o próximo lote, você deve usar a versão assíncrona para não sobrecarregar o processador.
O exemplo tem comentários que explicam alguns pontos. Chamo a atenção para o fato de que, apesar de os fragmentos do programa responsável pela leitura do pipe e do processamento serem separados, ao gravar em um arquivo, os dados são lidos exatamente no local em que foram gravados ao ler no cachimbo.
Anos de evolução em prol de um recurso poderoso - principal assíncrono class Program { static async Task Main(string args) { var pipe = new Pipe(); var dataWriter = new PipeDataWriter(pipe.Writer, "testpipe"); var dataProcessor = new DataProcessor(new ConsoleBytesProcessor(), pipe.Reader); var cts = new CancellationTokenSource(); await Task.WhenAll(dataWriter.ReadFromPipeAsync(cts.Token), dataProcessor.StartProcessingDataAsync(cts.Token)); } }
Pipepatawriter public class PipeDataWriter { private readonly NamedPipeClientStream _namedPipe; private readonly PipeWriter _pipeWriter; private const string Servername = "."; public PipeDataWriter(PipeWriter pipeWriter, string pipeName) { _pipeWriter = pipeWriter ?? throw new ArgumentNullException(nameof(pipeWriter)); _namedPipe = new NamedPipeClientStream(Servername, pipeName, PipeDirection.In); } public async Task ReadFromPipeAsync(CancellationToken token) { await _namedPipe.ConnectAsync(token); while (true) { token.ThrowIfCancellationRequested();
Processador de dados public class DataProcessor { private readonly IBytesProcessor _bytesProcessor; private readonly PipeReader _pipeReader; public DataProcessor(IBytesProcessor bytesProcessor, PipeReader pipeReader) { _bytesProcessor = bytesProcessor ?? throw new ArgumentNullException(nameof(bytesProcessor)); _pipeReader = pipeReader ?? throw new ArgumentNullException(nameof(pipeReader)); } public async Task StartProcessingDataAsync(CancellationToken token) { while (true) { token.ThrowIfCancellationRequested();
Bytesprocessor public interface IBytesProcessor { Task ProcessBytesAsync(ReadOnlySequence<byte> bytesSequence, CancellationToken token); } public class ConsoleBytesProcessor : IBytesProcessor {