Olá leitor. Muito tempo se passou desde o lançamento do .NET Core 2.1. E inovações legais como Span e Memory já foram consideradas amplamente, você pode ler, ver e ouvir muito sobre elas. No entanto, infelizmente, uma biblioteca chamada System.IO.Pipelines não recebeu a mesma atenção. Quase tudo o que há neste tópico é o
único post que muitos traduziram e publicaram em casa. Definitivamente, deve haver mais informações para que os interessados possam ver a tecnologia de diferentes ângulos.

1. Introdução
Portanto, esta biblioteca tem como objetivo acelerar o trabalho com o processamento de dados de streaming. Ele foi originalmente criado e usado pela equipe de desenvolvimento do Kestrel (um servidor Web de plataforma cruzada para o ASP.NET Core), mas atualmente é entregue por meio de um
pacote de nuget separado.
Antes de nos aprofundarmos no tópico, podemos imaginar o mecanismo da biblioteca como um análogo aprimorado do MemoryStream. O problema com o MemoryStream original é um número excessivo de cópias, o que é óbvio se você se lembrar de que uma matriz de bytes privada é usada dentro como um buffer. Por exemplo, nos métodos de
leitura e
gravação , a cópia é claramente visível. Assim, para o objeto que queremos gravar no fluxo, uma cópia será criada no buffer interno e, durante a leitura, uma cópia da cópia interna será entregue ao consumidor. Parece que não é o uso mais racional do espaço.
O System.IO.Pipelines não tem como objetivo substituir todos os fluxos, é uma ferramenta adicional no arsenal de um desenvolvedor que escreve código de alto desempenho. Sugiro que você se familiarize com os métodos e classes básicos, veja como eles são organizados por dentro e analise exemplos básicos.
Vamos começar com o dispositivo interno, ao mesmo tempo examinando fragmentos de código simples. Depois disso, ficará claro o que e como funciona e como deve ser usado. Ao trabalhar com System.IO.Pipelines, vale lembrar que o conceito básico é que todas as operações de leitura e gravação devem ocorrer sem alocações adicionais. Mas alguns métodos atraentes à primeira vista contradizem essa regra. Consequentemente, o código que você está tentando com tanta rapidez começa a alocar memória para dados novos e novos, carregando o coletor de lixo.
A biblioteca interna da biblioteca usa as mais amplas possibilidades das versões mais recentes do idioma e período, intervalo, memória, conjuntos de objetos, ValueTask etc. Vale a pena procurar pelo menos um ótimo exemplo de uso desses recursos na produção.
Ao mesmo tempo, alguns ficaram descontentes com a implementação de fluxos em C #, porque uma classe era usada para leitura e gravação. Mas, como eles dizem, você não pode jogar métodos fora de uma classe. Mesmo se o fluxo não suportasse a leitura / gravação / movimentação do ponteiro, as propriedades CanRead, CanWrite e CanSeek entrariam em vigor, o que parecia uma pequena muleta. Aqui as coisas são diferentes.
Para trabalhar com tubos, são usadas 2 classes:
PipeWriter e
PipeReader . Essas classes contêm cerca de 50 linhas cada e são pseudo-fachadas (não suas encarnações mais clássicas, pois há uma única classe oculta por trás delas, e não muitas) para a classe
Pipe , que contém toda a lógica básica para trabalhar com dados. Dos membros públicos - 2 construtores, 2 propriedades de obtenção apenas - Reader e Writer, o método Reset (), que redefine os campos internos para seu estado inicial para que a classe possa ser reutilizada. Outros métodos para o trabalho são chamados usando pseudo-fachadas.
Para começar na classe Pipe
A instância da classe ocupa 320 bytes, o que é bastante (quase um terço de um kilobyte, dois desses objetos não cabiam na memória do Manchester Mark I). Portanto, alocá-lo em grandes quantidades é uma má idéia. Além disso, o significado do objeto é destinado ao uso a longo prazo. O uso de pools também cria um argumento para esta declaração. Afinal, os objetos usados na piscina viverão para sempre (em qualquer caso, no padrão).
Observe que a classe está marcada como lacrada e é segura para threads - muitas seções do código são críticas e estão envolvidas em bloqueios.
Para começar, crie uma instância da classe Pipe e obtenha os objetos PipeReader e PipeWriter usando as propriedades mencionadas.
Inicialização fácilvar pipe = new Pipe(); PipeWriter pipeWriter = pipe.Writer; PipeReader pipeReader = pipe.Reader;
Considere os métodos para trabalhar com tubos:
Para gravação via PipeWriter - WriteAsync, GetMemory / GetSpan, Advance, FlushAsync, Complete, CancelPendingFlush, OnReaderCompleted.
Para ler através do PipeReader - AdvanceTo, ReadAsync, TryRead, Complete, CancelPendingRead, OnWriterCompleted.
Conforme declarado na
publicação , a classe usa uma lista de buffers vinculada individualmente. Mas, obviamente, eles não são passados entre o PipeReader e o PipeWriter - toda a lógica está em uma classe. Esta lista é usada para leitura e escrita. Além disso, os dados retornados são armazenados nesta lista.
Também existem objetos que indicam o início dos dados a serem lidos (ReadHead e índice), o final dos dados a serem lidos (ReadTail e índice) e o início do local a ser gravado (WriteHead e o número de bytes em buffer gravados). Aqui, ReadHead, ReadTail e WriteHead são um segmento específico da lista e o índice indica uma posição específica dentro do segmento. Assim, a gravação pode começar no meio de um segmento, capturar todo o próximo segmento e terminar no meio do terceiro. Esses ponteiros se movem em vários métodos.
Introdução aos métodos PipeWriter
Apenas esse método tentador. Tem uma assinatura muito adequada e moderna - aceita ReadOnlyMemory, assíncrono. E muitos podem ficar tentados, principalmente lembrando que Span e Memory são tão rápidos e legais. Mas não se iluda. Tudo o que esse método faz é copiar o ReadOnlyMemory passado para ele na lista interna. E "copiar" significa uma chamada para o método CopyTo, e não copiar o próprio objeto. Ou seja, todos os dados que queremos gravar serão copiados, carregando assim a memória. Este método deve ser estudado apenas para garantir que é melhor não usá-lo. Bem, e talvez para algumas situações raras, esse comportamento é apropriado.
O corpo do método é uma seção crítica, o acesso a ele é sincronizado através de um monitor.
Então pode surgir a questão de como escrever algo, se não através do método mais óbvio e único adequado.
O método usa um parâmetro de um tipo inteiro. Nele, devemos indicar quantos bytes queremos escrever (ou mais, mas em nenhum caso menos). Esse método verifica se há espaço suficiente para gravação no fragmento atual de memória armazenado em _writingHeadMemory. Se suficiente, _writingHeadMemory é retornado como memória. Caso contrário, para os dados gravados no buffer, mas para os quais o método FlushAsync não foi chamado, ele será chamado e outro BufferSegment será selecionado, que será conectado ao anterior (aqui está a lista). Na ausência de _writingHeadMemory, ele é inicializado com um novo BufferSegment. E a alocação do próximo buffer é uma seção crítica e é feita sob o bloqueio.
Sugiro uma olhada nesse exemplo. À primeira vista, pode parecer que o compilador (ou tempo de execução) tenha enganado o demônio.
Devilry var pipeNoOptions = new Pipe(); Memory<byte> memoryOne = pipeNoOptions.Writer.GetMemory(2); Console.WriteLine(memoryOne.Length);
Mas tudo neste exemplo é compreensível e simples.
Ao criar uma instância de Pipe, podemos passar um objeto
PipeOptions com opções para criá-lo ao construtor.
PipeOptions possui um campo de tamanho de segmento mínimo padrão. Há pouco tempo, era 2048, mas
esse commit mudou tudo, agora 4096. No momento da redação, a versão com 4096 era um pacote de pré-lançamento, na versão mais recente era 2048. Isso explica o comportamento do primeiro exemplo. Se você for crítico ao usar um tamanho menor para o buffer padrão, poderá especificá-lo em uma instância do tipo PipeOptions.
Mas no segundo exemplo, onde o tamanho mínimo é indicado, o comprimento não corresponde a ele. E isso já está acontecendo porque a criação de um novo BufferSegment ocorre usando pools. Uma das opções no PipeOptions é o pool de memória. Depois disso, o pool especificado será usado para criar um novo segmento. Se você não especificou seu conjunto de memórias, será utilizado o ArrayPool padrão, que, como você sabe, possui vários buckets para tamanhos diferentes de matrizes (cada um deles é duas vezes maior que o anterior) e, quando solicitado por um tamanho específico, procura um bucket com matrizes de tamanho adequado (então existe o maior ou igual mais próximo). Consequentemente, o novo buffer quase certamente será maior do que o solicitado. O tamanho mínimo da matriz no ArrayPool padrão (System.Buffers.TlsOverPerCoreLockedStacksArrayPool) é 16. Mas não se preocupe, é um conjunto de matrizes. Assim, na grande maioria dos casos, a matriz não pressiona o coletor de lixo e será reutilizada.
Funciona da mesma forma, fornecendo Span from Memory.
Portanto, GetMemory () ou GetSpan () são os principais métodos de gravação. Eles nos dão um objeto para o qual podemos escrever. Para fazer isso, não precisamos alocar memória para novas matrizes de valores, podemos escrever diretamente na estrutura interna. Qual usar dependerá principalmente da API que você está usando e do método assíncrono. No entanto, tendo em conta o que precede, surge uma questão. Como o leitor saberá quanto escrevemos? Se sempre usamos uma implementação específica do pool, que fornece uma matriz exatamente do mesmo tamanho que o solicitado, o leitor pode ler o buffer inteiro de uma só vez. No entanto, como já dissemos, temos um buffer com alta probabilidade de tamanho maior. Isso leva ao seguinte método necessário para a operação.
Um método terrível e simples. Leva o número de bytes escritos como argumento. Eles incrementam os contadores internos - _unflushedBytes e _writingHeadBytesBuffered, cujos nomes falam por si. Ele também trunca _writingHeadMemory exatamente para o número de bytes gravados (usando o método Slice). Portanto, depois de chamar esse método, você precisará solicitar um novo bloco de memória na forma de Memória ou Extensão, não poderá gravar no anterior. E todo o corpo do método é uma seção crítica e funciona sob um bloqueio.
Parece que depois disso o leitor pode receber dados. Mas é necessário mais um passo.
O método é chamado após escrevermos os dados necessários na memória recebida e indicar quanto escrevemos lá. O método retorna um ValueTask, no entanto, não é assíncrono (diferente do StreamPipeWriter descendente). ValueTask é um tipo especial (estrutura somente leitura) usado no caso em que a maioria das chamadas não usa assincronia, ou seja, todos os dados necessários estarão disponíveis no momento da chamada e o método terminará de forma síncrona. No interior, ele contém dados ou Tarefa (caso não funcione de forma síncrona). Depende do estado da propriedade _writerAwaitable.IsCompleted. Se procurarmos o que muda o estado desse objeto em espera, veremos que isso acontece sob a condição de que a quantidade de dados não processados (não consumidos) (isso não é exatamente o mesmo que não lido (não examinado), será explicado mais tarde) exceda um certo limite (_pauseWriterThreshold). O padrão é 16 tamanhos de segmento. Se desejado, o valor pode ser alterado em PipeOptions. Além disso, esse método inicia a continuação do método ReadAsync, se um foi bloqueado.
Retorna um FlushResult contendo 2 propriedades - IsCanceled e IsCompleted. IsCanceled indica se o Flush foi cancelado (chamada CancelPendingFlush). IsCompleted indica se o PipeReader foi concluído (chamando os métodos Complete () ou CompleteAsync ()).
A parte principal do método é realizada sob Locke Skywalker.
Outros métodos do PipeWriter não são interessantes do ponto de vista da implementação e são usados com muito menos frequência; portanto, apenas uma breve descrição será fornecida.
# 5 void Complete (exceção de exceção = null) ou ValueTask CompleteAsync (exceção de exceção = null)
Marca o tubo fechado para escrever. Após a conclusão, uma exceção será lançada ao tentar usar os métodos de gravação. Se o PipeReader já tiver sido concluído, toda a instância do Pipe também será concluída. A maior parte do trabalho é feita sob o bloqueio.
# 6 void CancelPendingFlush ()
Como o nome indica, ele conclui a operação atual FlushAsync (). Há um lok.
# 7 void OnReaderCompleted (ação <exceção, objeto> retorno de chamada, estado do objeto)
Executa o delegado delegado quando o leitor é concluído. Há também uma fechadura.
A
documentação atualmente diz que esse método pode não ser chamado em alguns descendentes de PipeWriter e será removido no futuro. Portanto, você não deve vincular a lógica a esses métodos.
Vá para PipeReader
Aqui, como FlushAsync, um ValueTask é retornado, o que sugere que o método é principalmente síncrono, mas nem sempre. Depende do estado de _readerAwaitable. Assim como no FlushAsync, você precisa descobrir quando _readerAwaitable está definido como incompleto. Isso acontece quando o PipeReader lê tudo da lista (ou contém dados que foram marcados como examinados e precisam de mais dados para continuar). O que, de fato, é lógico. Assim, podemos concluir que é desejável ajustar o Pipe ao seu trabalho, definir todas as suas opções cuidadosamente, com base em estatísticas empiricamente identificadas. A configuração adequada reduzirá a probabilidade de uma ramificação de execução assíncrona e permitirá um processamento mais eficiente dos dados. Quase todo o método é cercado por uma fechadura.
Retorna alguns
ReadResult misteriosos. Na verdade, é apenas um buffer + sinalizadores mostrando o status da operação (IsCanceled - se o ReadAsync foi cancelado e IsCompleted indicando se o PipeWriter foi fechado). Nesse caso, IsCompleted é um valor que indica se os métodos PipeWriter Complete () ou CompleteAsync () foram chamados. Se esses métodos foram chamados com uma exceção, eles serão lançados ao tentar ler.
O buffer novamente tem um tipo misterioso -
ReadOnlySequence . Este, por sua vez, é um objeto para conter
segmentos (ReadOnlySequenceSegment) dos índices
de início e fim + início e fim dentro dos segmentos correspondentes. O que na verdade se parece com a estrutura da própria classe Pipe. A propósito, BufferSegment é o sucessor de ReadOnlySequenceSegment, o que sugere que ele seja usado lá. Graças a isso, você pode se livrar de alocações de memória desnecessárias para transferência de dados do gravador para o leitor.
O ReadOnlySpan pode ser obtido no buffer para processamento adicional. Para concluir a imagem, você pode verificar se o buffer contém um único ReadOnlySpan. Se ele contiver, não precisamos iterar a coleção de um elemento e podemos obtê-la usando a propriedade First. Caso contrário, você precisará revisar todos os segmentos no buffer e processar cada ReadOnlySpan.
Tópico de discussão - na classe ReadOnlySequence, os tipos de referência anuláveis são usados ativamente e existe goto (não para sair do aninhamento e não no código gerado) - em particular,
aquiApós o processamento, você deve deixar claro para a instância do Pipe que lemos os dados.
Versão síncrona. Permite obter o resultado, se for o caso. Se ainda não estiver lá, ao contrário do ReadAsync, ele não bloqueia, mas retorna false. Também na fechadura.
Neste método, você pode especificar quantos bytes lemos e quantos processamos. Os dados que foram lidos, mas não processados, serão retornados na próxima vez que forem lidos. Esse recurso pode parecer estranho à primeira vista, mas ao processar um fluxo de bytes, raramente é necessário processar cada byte individualmente. Normalmente, os dados são trocados usando mensagens. Pode surgir uma situação em que o leitor, ao ler, recebeu uma mensagem inteira e parte da segunda. O todo deve ser processado, e parte do segundo deve ser deixada na próxima vez, para que venha com a parte restante. O método AdvanceTo aceita uma SequencePosition, que na verdade é um segmento + índice. Ao processar tudo o que o ReadAsync leu, você pode especificar buffer.End. Caso contrário, você precisará criar explicitamente uma posição, indicando o segmento e o índice em que o processamento foi interrompido. Sob o capô lok.
Além disso, se a quantidade de informações brutas for menor que a falha instalada (_resumeWriterThreshold), ela iniciará a continuação do PipeWriter se estiver bloqueada. Por padrão, esse limite é de 8 volumes de segmento (metade do limite de bloqueio).
# 4 vazio (exceção exceção = nulo)
Conclui o PipeReader. Se o PipeWriter estiver completo nesse momento, a instância do Pipe inteira será encerrada. Trave para dentro.
# 5 void CancelPendingRead ()
Permite cancelar a leitura atualmente esperada. Locke.
# 6 void OnWriterCompleted (ação <exceção, objeto> retorno de chamada, estado do objeto)
Permite especificar o delegado a ser executado após a conclusão do PipeWriter.
Como o método semelhante para o PipeWriter, a
documentação tem a mesma nota que será removida. Trave sob o capô.
Exemplo
A lista abaixo mostra um exemplo de trabalho com tubos.
Desde a introdução do .NET Core Span e Memory, muitas classes para trabalhar com dados foram complementadas por sobrecargas usando esses tipos. Portanto, o esquema geral de interação será aproximadamente o mesmo. No meu exemplo, usei pipelines para trabalhar com pipes (eu gosto de palavras raiz), ou seja, canais - objetos do SO para comunicação entre processos. A API do canal acaba de ser expandida de acordo para ler dados no Span e na Memory. A versão assíncrona usa Memória, pois o método assíncrono será convertido em um método de modelo usando uma máquina de estado finito gerada automaticamente, na qual todas as variáveis locais e parâmetros de método são armazenados e, como Span é ref read-only struct, ele não pode estar no heap, respectivamente, usando Span em um método assíncrono não é possível. Mas há também uma versão síncrona do método que permite usar o Span. No meu exemplo, eu tentei os dois e verificou-se que a versão síncrona nessa situação se mostra melhor. Ao usá-lo, ocorre menos coleta de lixo e o processamento de dados é mais rápido. Mas isso foi apenas porque havia muitos dados. No caso de uma situação provável em que não haverá dados no momento da inscrição para o próximo lote, você deverá usar a versão assíncrona para não sobrecarregar o processador.
O exemplo tem comentários que explicam alguns pontos. Chamo a atenção para o fato de que, apesar de os fragmentos do programa responsável pela leitura do pipe e do processamento serem separados, ao gravar em um arquivo, os dados são lidos exatamente no local em que foram gravados ao ler no pipe.
Anos de evolução em prol de um recurso poderoso - maine assíncrono class Program { static async Task Main(string args) { var pipe = new Pipe(); var dataWriter = new PipeDataWriter(pipe.Writer, "testpipe"); var dataProcessor = new DataProcessor(new ConsoleBytesProcessor(), pipe.Reader); var cts = new CancellationTokenSource(); await Task.WhenAll(dataWriter.ReadFromPipeAsync(cts.Token), dataProcessor.StartProcessingDataAsync(cts.Token)); } }
Pipepatawriter public class PipeDataWriter { private readonly NamedPipeClientStream _namedPipe; private readonly PipeWriter _pipeWriter; private const string Servername = "."; public PipeDataWriter(PipeWriter pipeWriter, string pipeName) { _pipeWriter = pipeWriter ?? throw new ArgumentNullException(nameof(pipeWriter)); _namedPipe = new NamedPipeClientStream(Servername, pipeName, PipeDirection.In); } public async Task ReadFromPipeAsync(CancellationToken token) { await _namedPipe.ConnectAsync(token); while (true) { token.ThrowIfCancellationRequested();
Processador de dados public class DataProcessor { private readonly IBytesProcessor _bytesProcessor; private readonly PipeReader _pipeReader; public DataProcessor(IBytesProcessor bytesProcessor, PipeReader pipeReader) { _bytesProcessor = bytesProcessor ?? throw new ArgumentNullException(nameof(bytesProcessor)); _pipeReader = pipeReader ?? throw new ArgumentNullException(nameof(pipeReader)); } public async Task StartProcessingDataAsync(CancellationToken token) { while (true) { token.ThrowIfCancellationRequested();
Bytesprocessor public interface IBytesProcessor { Task ProcessBytesAsync(ReadOnlySequence<byte> bytesSequence, CancellationToken token); } public class ConsoleBytesProcessor : IBytesProcessor {