System.IO。管道-高性能爱好者的鲜为人知的工具

您好读者。 自.NET Core 2.1发布以来,已经过去了很多时间。 诸如Span和Memory之类的很酷的创新已经广为人知,您可以阅读,看到和听到很多关于它们的信息。 但是,不幸的是,名为System.IO。Pipeslines的库没有得到同样的关注。 关于该主题的几乎所有内容都是唯一已翻译并复制到许多资源上的帖子 。 应该有更多关于该技术的信息,以便从不同角度对其进行研究。



引言


因此,该库旨在加速流数据的处理。 它最初由Kestrel(用于ASP.NET Core的跨平台Web服务器)的开发团队创建和使用,但目前可通过nuget包供凡人使用。

在深入探讨该主题之前,我们可以将库机制想象为MemoryStream的改进模拟。 原始MemoryStream的问题是副本数量过多,如果您记得在MemoryStream内部使用了专用字节数组作为缓冲区,则很明显。 例如,在ReadWrite方法中,您可以清楚地看到数据复制。 因此,对于我们要写入流的对象,将在内部缓冲区中创建一个副本,并在读取期间将内部副本的副本返回给使用者。 听起来不是最合理的内存使用方式。

System.IO.Pipelines并非旨在取代所有流,它是开发人员编写高性能代码时的附加工具。 我建议您熟悉基本方法和类,查看其实现细节并分析基本示例。

让我们从内部和实现细节开始,同时查看简单的代码片段。 在那之后,将变得清楚如何工作以及如何使用它。 使用System.IO.Pipelines时,应记住基本概念是所有读写操作都应在没有附加分配的情况下进行。 但是乍看之下有些吸引人的方法与此规则相矛盾。 因此,您试图如此加速的代码开始为新数据和新数据分配内存,从而加载垃圾回收器。

该库的内部使用了该语言和运行时的最新版本的最大可能性-跨度,内存,对象池,ValueTask等。 值得一看,至少是在生产中使用这些功能的一个很好的例子。

一次,一些开发人员对C#中的流实现不满意,因为一个类用于读取和写入。 但是正如他们所说,您不能将方法扔出类。 即使该流不支持读取/写入/查找,也会使用CanRead,CanWrite和CanSeek属性。 看起来像个小拐杖。 但是现在情况变得不同了。

要使用管道,需要使用2个类: PipeWriterPipeReader 。 这些类包含大约50行代码,并且是Pipe类的伪门面(不是其最经典的体现,因为它们隐藏了一个类,不是很多),它包含处理数据的所有基本逻辑。 此类包含5个公共成员:2个构造函数,2个仅获取属性-Reader和Writer,以及Reset()方法,该方法将内部字段重置为其初始状态,以便可以重用该类。 其余的工作方法是内部的,并使用伪门面进行调用。

让我们开始管道类


该类实例占用320个字节,这是一个很大的字节(几乎三分之一的字节,其中2个这样的对象无法容纳在曼彻斯特马克一世的记忆中)。 因此,分配大量实例是一个坏主意。 此外,该物体旨在长期使用。 使用池也使该语句成为一个参数。 池中使用的对象将永久存在(对于默认池实现)。
请注意,该类被标记为密封的,并且是线程安全的-代码的许多部分都是关键部分,并包装在锁中。

要开始使用此类,您应该创建Pipe类的实例,并使用上述属性获取PipeReader和PipeWriter对象。

简单的初始化
var pipe = new Pipe(); PipeWriter pipeWriter = pipe.Writer; PipeReader pipeReader = pipe.Reader; 


考虑使用管道的方法:
使用PipeWriter编写-WriteAsync,GetMemory / GetSpan,Advance,FlushAsync,Complete,CancelPendingFlush,OnReaderCompleted。

使用PipeReader进行阅读-AdvanceTo,ReadAsync,TryRead,Complete,CancelPendingRead,OnWriterCompleted。

如前所述,该类使用缓冲区的单链接列表。 但是,显然,它们不是在PipeReader和PipeWriter之间传递的-所有逻辑都在一个类中。 此列表用于阅读和写作。 而且,返回的数据存储在此列表中(因此不执行复制)。

此外,还有一些对象指示要读取的数据的开头(ReadHead和索引),要读取的数据的末尾(ReadTail和索引)以及要写入的空间的开头(WriteHead和写入的缓冲字节数)。 在这里,ReadHead,ReadTail和WriteHead是内部段列表的特定成员(段),而索引指示段内的特定位置。 因此,记录可以从片段的中间开始,捕获一个完整的下一个片段,然后在第三个片段的中间结束。 这些指针以各种方法移动。

PipeWriter方法入门


#1 ValueTask <FlushResult> WriteAsync(ReadOnlyMemory <byte>源,CancellationToken cancelToken)


乍一看这就是有吸引力的方法。 它具有非常合适且时尚的签名-接受ReadOnlyMemory,异步。 许多人可能会受到诱惑,尤其要记住Span和Memory是如此之快和酷炫。 但是不要自欺欺人。 该方法所做的只是将传递给它的ReadOnlyMemory复制到内部列表中。 “复制”是指对CopyTo()方法的调用,而不是仅复制对象本身。 我们要记录的所有数据将被复制,从而加载内存。 应该仅提及此方法,以确保最好不要使用它。 好吧,也许对于一些罕见的情况,这种行为是适当的。
该方法的主体是关键部分,它的访问通过监视器进行同步。

然后可能会出现一个问题,如果不通过最明显,唯一合适的方法,该如何写一些东西

#2 内存<byte> GetMemory(int sizeHint)


该方法采用一个整数类型的参数。 在其中,我们必须指出要写入管道的字节数(所需缓冲区的大小)。 此方法检查_writingHeadMemory中存储的当前内存片段中是否有足够的写入空间。 如果足够,则将_writingHeadMemory作为内存返回。 否则,对于写入缓冲区但未调用FlushAsync方法的数据,将调用该数据并分配另一个BufferSegment,该数据与上一个缓冲区连接(这是我们的内部列表)。 如果_writingHeadMemory为null,则使用新的BufferSegment对其进行初始化。 缓冲区的分配是关键部分,并在锁定下完成。

我建议看一个这样的例子。 乍一看,似乎编译器(或运行时)已经欺骗了该恶魔。

魔鬼
  var pipeNoOptions = new Pipe(); Memory<byte> memoryOne = pipeNoOptions.Writer.GetMemory(2); Console.WriteLine(memoryOne.Length); //2048 or 4096 var pipeWithOptions = new Pipe(new PipeOptions(minimumSegmentSize: 5)); Memory<byte> memoryTwo = pipeWithOptions.Writer.GetMemory(2); Console.WriteLine(memoryTwo.Length); //16 


但是此示例中的所有内容都是可以理解和简单的。
在创建Pipe实例时,我们可以在构造函数中将PipeOptions对象与创建选项一起传递给它。

PipeOptions具有默认的最小线段尺寸字段。 不久前,它是2048,但是此提交将其值更新为4096。在撰写本文时,4096版本位于prerelease nuget-package中,最后一个发行版本的值为2048。第一个示例的行为。 如果您对默认缓冲区使用较小的大小很挑剔,则可以在PipeOptions类型的实例中指定它。

但是在第二个示例中,在指定最小大小的情况下,长度无论如何都不匹配。 之所以发生这种情况,是因为使用缓冲池创建了一个新的BufferSegment。 PipeOptions中的选项之一是内存池。 之后,将使用指定的池创建一个新的段。 如果您未指定内存池,则将使用默认的ArrayPool,如您所知,该存储池具有多个存储桶,用于存储不同大小的数组(每个存储桶的大小是前一个存储桶的2倍),并且在需要特定存储空间时使用大小,它将搜索具有适当大小(即最接近的较大或相等)数组的存储桶。 因此,新缓冲区几乎肯定会比您要求的大。 默认ArrayPool(System.Buffers.TlsOverPerCoreLockedStacksArrayPool)中的最小数组大小为16。但是不用担心,这是一个数组池。 因此,在绝大多数情况下,该阵列不会对垃圾收集器施加压力,并将在以后重新使用。

#2.5 跨度<byte> GetSpan(int sizeHint)


它的工作原理类似,从内存获得跨度。

因此,GetMemory()或GetSpan()是主要的写入方法。 它们给了我们可以写入的对象。 为此,我们不需要为新的值数组分配内存,我们可以直接写入管道。 使用哪一个主要取决于您使用的API和异步方法。 但是,鉴于上述情况,产生了问题。 读者将如何知道我们写了多少书? 如果我们始终使用该池的特定实现,该实现提供了与请求的大小完全相同的数组,那么读取器可以一次读取整个缓冲区。 但是,正如我们已经说过的,我们很可能分配了一个较大大小的缓冲区。 这导致操作需要以下方法。

#3 void Advance(整数字节)


一种非常简单的方法。 它以写入的字节数作为参数。 它们增加内部计数器-_unflushedBytes和_writingHeadBytesBuffered,它们的名称不言而喻。 它还将_writingHeadMemory完全截断(切片)为写入的字节数(使用Slice方法)。 因此,在调用此方法后,您需要以“内存”或“跨度”的形式请求一个新的内存块,而无法写入前一个。 该方法的整体是关键部分,并处于锁定状态。

在此之后,阅读器似乎可以接收数据了。 但是还需要进一步的步骤。

#4 ValueTask <FlushResult> FlushAsync(CancellationToken cancelledToken)


在将必要的数据写入接收到的内存(GetMemory)并指出我们在其中写入了多少之后(Advance),将调用该方法。 该方法返回ValueTask,但是它不是异步的(不同于其后代StreamPipeWriter)。 ValueTask是一种特殊类型(只读结构),用于大多数调用不是异步的情况,即所有必要的数据在调用时都可用,并且该方法将同步结束。 它本身包含数据或任务(以防无法同步工作)。 它取决于_writerAwaitable.IsCompleted属性。 如果寻找导致_writerAwaitable状态改变的内容,我们将看到,如果未使用的数据量(与未检查的数据不完全相同,将在后面解释)不超过某个阈值(_pauseWriterThreshold),则会发生这种情况。 默认值为16段大小。 如果需要,可以在PipeOptions中更改该值。 另外,如果一个方法被阻止,则该方法将启动ReadAsync方法的继续。

返回一个FlushResult,其中包含2个属性-IsCanceled和IsCompleted。 IsCanceled指示是否已取消刷新(CancelPendingFlush()调用)。 IsCompleted指示PipeReader是否已完成(通过调用Complete()或CompleteAsync()方法)。
该方法的主要部分是在锁下执行的。

从实现的角度来看,PipeWriter的其他方法并不令人感兴趣,并且使用频率较低,因此仅给出简要说明。

#5 void Complete(异常异常= null)或ValueTask CompleteAsync(异常异常= null)


标记管已关闭以进行写入。 完成后尝试使用write方法时将引发异常。 如果PipeReader已经完成,则整个Pipe实例也将完成。 大多数工作是在锁下完成的。

#6 void CancelPendingFlush()


顾名思义,它取消了当前的FlushAsync()操作。 有一把锁。

#7 void OnReaderCompleted(操作<异常,对象>回调,对象状态)


读者完成后,执行传递的委托。 还有一个锁。
当前在文档中写道,在某些PipeWriter实现中可能不会调用此方法,以后会删除该方法。 因此,您不应将逻辑与这些方法联系在一起。

是时候使用PipeReader


#1 ValueTask <ReadResult> ReadAsync(CancellationToken令牌)


在这里,就像在FlushAsync()中一样,将返回ValueTask,这表明该方法主要是同步的,但并不总是同步的。 取决于_readerAwaitable的状态。 与FlushAsync一样,您需要查找_readerAwaitable设置为不完整的时间。 当PipeReader从内部列表中读取了所有内容(或者它包含标记为已检查的数据,并且您需要更多数据才能继续)时,就会发生这种情况。 实际上,这是显而易见的。 因此,我们可以得出结论,最好根据经验确定的统计数据对Pipe进行微调,以合理地设置其所有选项。 正确的配置将减少异步执行分支的机会,并使数据的处理效率更高。 整个方法中几乎所有代码都被锁包围。

返回一些神秘的ReadResult 。 实际上,它只是一个缓冲区+标志,用于显示操作的状态(IsCanceled-ReadAsync是否已取消,IsCompleted指示PipeWriter是否已关闭)。 IsCompleted是一个值,该值指示是否调用了PipeWriter Complete()或CompleteAsync()方法。 如果调用这些方法并传递了异常,则在尝试读取时将引发该异常。

同样,缓冲区具有一个神秘的类型-ReadOnlySequence 。 反过来,这是开始和结束的分段(ReadOnlySequenceSegment)的内容的对象+相应分段内的start和end索引。 实际上类似于Pipe类本身的结构。 顺便说一句,BufferSegment继承自ReadOnlySequenceSegment,这表明在此序列中使用了BufferSegment。 因此,您可以摆脱不必要的内存分配,以便将数据从写入器传输到读取器。
可以从缓冲区中获取ReadOnlySpan以进行进一步处理。 要完成图片,您可以检查缓冲区是否包含单个ReadOnlySpan。 如果包含,则不需要迭代一个元素的集合,可以使用First属性来获取它。 否则,有必要遍历缓冲区中的所有段并处理每个段的ReadOnlySpan。

讨论主题-在ReadOnlySequence类中,可空引用类型被积极使用,并且在这里特别是goto(不适用于深层循环嵌套和生成的代码)。

处理后,您需要通知Pipe实例我们已读取数据。

#2 bool TryRead(输出ReadResult结果)


同步版本。 允许您获得结果(如果存在)。 否则,与ReadAsync不同,它不会阻塞并返回false。 该方法的代码也在锁中。

#3 void AdvanceTo(已消耗SequencePosition,已检查SequencePosition)


在这种方法中,您可以指定我们检查和使用多少字节。 已检查但尚未使用的数据将在下次读取时返回。 乍一看,此功能可能看起来很奇怪,但是在处理字节流时,很少需要单独处理每个字节。 通常,数据是使用消息交换的。 读者可能会在阅读时收到一条完整的消息,而收到第二部分的一部分。 整个必须进行处理,第二部分的一部分应留待将来使用,以便与其余部分一起使用。 AdvanceTo方法采用SequencePosition,它实际上是一个段+索引。 处理ReadAsync已读取的所有内容时,可以指定buffer.End。 否则,您必须显式创建一个位置,以指示停止处理的段和索引。 锁在引擎盖下。
另外,如果未使用的信息量小于指定的阈值(_resumeWriterThreshold),则如果PipeWriter被阻止,它将开始继续PipeWriter。 默认情况下,此阈值为8个分区卷(阻塞阈值的一半)。

#4 void Complete(异常exception = null)


完成PipeReader。 如果此时PipeWriter完成,则整个Pipe实例完成。 锁在里面

#5 void CancelPendingRead()


允许您取消当前处于待处理状态的读数。 锁扣

#6 void OnWriterCompleted(操作<异常,对象>回调,对象状态)


允许您指定在PipeWriter完成时执行的委托。
与PipeWriter的类似方法一样,在文档中也有将被删除的相同标签。 锁在引擎盖下。

例子


下面的清单显示了使用管道的示例。
自引入.NET Core Span和Memory以来,使用这些类型的重载已为许多用于数据处理的类提供了补充。 因此,一般的交互方案将大致相同。 在我的示例中,我使用管道来处理管道(我喜欢类似的词)-用于进程间通信的OS对象。 管道API刚刚进行了相应的扩展,以读取Span和Memory中的数据。 异步版本使用内存,因为异步方法将使用自动生成的有限状态机转换为模板方法,该方法将存储所有局部变量和方法参数,并且由于Span是ref只读结构,因此无法将其放置在分别在异步方法中使用Span是不可能的。 但是,该方法还有一个同步版本,可让您使用Span。 在我的示例中,我尝试了两者,结果表明在这种情况下,同步版本显示得更好。 使用它时,垃圾收集较少,并且数据处理速度更快。 但这仅仅是因为管道中有很多数据(数据始终可用)。 在很有可能在申请下一个批处理时没有数据的情况下,您应该使用异步版本,以免使处理器处于空闲状态。
该示例具有注释,解释了一些要点。 我提请您注意以下事实:尽管负责从管道读取和处理的程序片段是分开的,但在写入文件时,数据是从读取时准确地从写入位置读取的。管道。

为了强大的功能,经过多年的发展-异步主
  class Program { static async Task Main(string args) { var pipe = new Pipe(); var dataWriter = new PipeDataWriter(pipe.Writer, "testpipe"); var dataProcessor = new DataProcessor(new ConsoleBytesProcessor(), pipe.Reader); var cts = new CancellationTokenSource(); await Task.WhenAll(dataWriter.ReadFromPipeAsync(cts.Token), dataProcessor.StartProcessingDataAsync(cts.Token)); } } 


Pipepatawriter
  public class PipeDataWriter { private readonly NamedPipeClientStream _namedPipe; private readonly PipeWriter _pipeWriter; private const string Servername = "."; public PipeDataWriter(PipeWriter pipeWriter, string pipeName) { _pipeWriter = pipeWriter ?? throw new ArgumentNullException(nameof(pipeWriter)); _namedPipe = new NamedPipeClientStream(Servername, pipeName, PipeDirection.In); } public async Task ReadFromPipeAsync(CancellationToken token) { await _namedPipe.ConnectAsync(token); while (true) { token.ThrowIfCancellationRequested(); //// when working with the asynchronous method, use Memory <T> //Memory<byte> buffer = _pipeWriter.GetMemory(); //// asynchronous reading from a named pipe in Memory <T> //// there can be any operation to obtain data - from reading from a file to random generation. //int readBytes = await _namedPipe.ReadAsync(buffer, token); // synchronous reading from the named pipe to the requested from PipeWriter Span // there can be any operation to obtain data - from reading from a file to random generation. int readBytes = _namedPipe.Read(_pipeWriter.GetSpan()); // if there was nothing in the channel, release the thread for half a second and try again // in other cases we can break the loop, it's just example if (readBytes == 0) { await Task.Delay(500, token); continue; } // specify the amount of bytes read from the pipe _pipeWriter.Advance(readBytes); // flush data to make them available PipeReader FlushResult result = await _pipeWriter.FlushAsync(token); // if PipeReader has been completed, it no longer needs to write data // PS this behavior was chosen by me as an example, it depends on business logic if (result.IsCompleted) { break; } } // complete _pipeWriter to complete the entire instance of pipe _pipeWriter.Complete(); } } 


数据处理器
  public class DataProcessor { private readonly IBytesProcessor _bytesProcessor; private readonly PipeReader _pipeReader; public DataProcessor(IBytesProcessor bytesProcessor, PipeReader pipeReader) { _bytesProcessor = bytesProcessor ?? throw new ArgumentNullException(nameof(bytesProcessor)); _pipeReader = pipeReader ?? throw new ArgumentNullException(nameof(pipeReader)); } public async Task StartProcessingDataAsync(CancellationToken token) { while (true) { token.ThrowIfCancellationRequested(); // reading data from a pipe instance ReadResult result = await _pipeReader.ReadAsync(token); ReadOnlySequence<byte> buffer = result.Buffer; // We perform calculations with the data obtained. await _bytesProcessor.ProcessBytesAsync(buffer, token); // indicate to which position the data was processed. In this case, everything is written to the file. // in situations where not all data has been processed, you need to create a position manually using the buffer and index // in this situation, IBytesProcessor.ProcessBytesAsync can be supplemented by returning this position _pipeReader.AdvanceTo(buffer.End); // if PipeWriter has been completed, reading is no longer necessary // this behavior was chosen by me as an example, it depends on business logic if (result.IsCompleted) { break; } } // complete _pipeReader to complete the entire instance of pipe _pipeReader.Complete(); } } 


字节处理器
  public interface IBytesProcessor { Task ProcessBytesAsync(ReadOnlySequence<byte> bytesSequence, CancellationToken token); } public class ConsoleBytesProcessor : IBytesProcessor { //Let's imagine that in this class there is a normal constructor and IDisposable readonly FileStream _fileStream = new FileStream("buffer", FileMode.Create); public Task ProcessBytesAsync(ReadOnlySequence<byte> bytesSequence, CancellationToken token) { if (bytesSequence.IsSingleSegment) { ProcessSingle(bytesSequence.First.Span); } else { foreach (var segment in bytesSequence) { ProcessSingle(segment.Span); } } return Task.CompletedTask; } private void ProcessSingle(ReadOnlySpan<byte> span) { _fileStream.Write(span); } } 

Source: https://habr.com/ru/post/zh-CN466137/


All Articles