System.IO.Pipelines-面向高性能爱好者的鲜为人知的工具

您好读者。 自.NET Core 2.1发布以来,已经过去了很多时间。 诸如Span和Memory之类的很酷的创新已经被广泛地考虑,您可以阅读,看到和听到很多关于它们的信息。 但是,不幸的是,名为System.IO.Pipelines的库没有得到同样的关注。 关于此主题的几乎所有内容都是许多人在家翻译和发布的唯一帖子 。 绝对应该有更多的信息,以便有兴趣的人可以从不同的角度看技术。



引言


因此,该库旨在加快流数据处理的速度。 它最初由Kestrel(用于ASP.NET Core的跨平台Web服务器)的开发团队创建和使用,但目前通过单独的nuget软件包提供
在深入探讨该主题之前,我们可以将库机制想象为MemoryStream的改进模拟。 原始MemoryStream的问题是拷贝数过多,如果您记得内部使用私有字节数组作为缓冲区,则很明显。 例如,在ReadWrite方法中,复制清晰可见。 因此,对于我们要写入流的对象,将在内部缓冲区中创建一个副本,并且在读取期间,将内部副本的副本传递给使用者。 听起来好像不是最合理地利用空间。
System.IO.Pipelines并非旨在取代所有流,它是开发人员编写高性能代码时的附加工具。 我建议您熟悉基本方法和类,了解它们的内部排列方式,并分析基本示例。

让我们从内部设备开始,同时检查简单的代码片段。 之后,将清楚其工作方式,工作方式以及应如何使用。 使用System.IO.Pipelines时,应记住基本概念是所有读写操作都应在没有附加分配的情况下进行。 但是乍看之下有些吸引人的方法与此规则相矛盾。 因此,您正在努力加速的代码将开始为新数据和新数据分配内存,从而加载垃圾回收器。

图书馆的内部图书馆使用了最新版本的语言和跨度,跨度,内存,对象池,ValueTask等功能,为您提供了最广泛的选择。 至少值得一看,这是在生产中使用这些功能的一个很好的例子。
一次,有些人对用C#实现流不满意,因为一个类用于读取和写入。 但是,正如他们所说,您不能将方法抛出类。 即使该流不支持读/写/移动指针,CanRead,CanWrite和CanSeek属性仍然生效,这看起来像一个小拐杖。 这里的情况有所不同。
要使用管道,需要使用2个类: PipeWriterPipeReader 。 这些类每个都包含约50行,并且是Pipe类的伪门面(不是其最经典的化身,因为在它们后面隐藏着一个类,并且没有很多),它包含处理数据的所有基本逻辑。 在公共成员(2个构造函数,2个仅获得属性)中,Reader和Writer是Reset()方法,该方法将内部字段重置为其初始状态,以便可以重用该类。 使用伪门面称为其他工作方法。

开始上Pipe类


该类实例占用320个字节,这是一个很大的字节(几乎三分之一的字节,其中2个这样的对象无法容纳在曼彻斯特马克一世的记忆中)。 因此,大量分配它不是一个好主意。 此外,该对象的含义旨在长期使用。 使用池也使该语句成为一个参数。 毕竟,池中使用的对象将永远存在(无论如何,在标准中)。
请注意,该类被标记为密封的,并且是线程安全的-代码的许多部分是关键部分,并包装在锁中。
首先,创建Pipe类的实例,并使用上述属性获取PipeReader和PipeWriter对象。

容易初始化
var pipe = new Pipe(); PipeWriter pipeWriter = pipe.Writer; PipeReader pipeReader = pipe.Reader; 


考虑使用管道的方法:
通过PipeWriter进行记录-WriteAsync,GetMemory / GetSpan,Advance,FlushAsync,Complete,CancelPendingFlush,OnReaderCompleted。
用于通过PipeReader进行读取-AdvanceTo,ReadAsync,TryRead,Complete,CancelPendingRead,OnWriterCompleted。

文章所述,该类使用缓冲区的单链接列表。 但是,显然,它们没有在PipeReader和PipeWriter之间传递-所有逻辑都在一个类中。 此列表用于阅读和写作。 此外,返回的数据存储在此列表中。
还有一些对象指示要读取的数据的开头(ReadHead和索引),要读取的数据的末尾(ReadTail和索引)以及要写入的位置的开头(WriteHead和写入的缓冲字节数)。 在这里,ReadHead,ReadTail和WriteHead是列表中的特定段,而索引指示该段内的特定位置。 因此,记录可以从片段的中间开始,捕获整个下一个片段,然后在第三个片段的中间结束。 这些指针以各种方法移动。

PipeWriter方法入门


#1 ValueTask <FlushResult> WriteAsync(ReadOnlyMemory <byte>源,CancellationToken cancelToken)


就是那种诱人的方法。 具有非常合适和流行的签名-接受ReadOnlyMemory,异步。 许多人可能会受到诱惑,尤其要记住Span和Memory是如此之快和酷炫。 但是不要自欺欺人。 该方法所做的只是将传递给它的ReadOnlyMemory复制到内部列表中。 “复制”是指对CopyTo方法的调用,而不是复制对象本身。 也就是说,我们要记录的所有数据都将被复制,从而加载内存。 应该仅研究此方法,以确保最好不要使用它。 好吧,也许对于一些罕见的情况,这种行为是适当的。
该方法的主体是关键部分,它的访问通过监视器进行同步。

然后可能会出现一个问题,如果不通过最明显,最合适的方法,该如何写东西。

#2 内存<byte> GetMemory(int sizeHint)


该方法采用一个整数类型的参数。 在其中,我们必须指出要写入多少个字节(或更多,但绝不更少)。 此方法检查是否有足够的空间可以写入_writingHeadMemory中存储的当前内存片段。 如果足够,则_writingHeadMemory作为Memory返回。 如果不是,则对于写入缓冲区但未调用FlushAsync方法的数据,将调用该数据,并选择另一个BufferSegment,该数据连接到前一个(这里是列表)。 如果没有_writingHeadMemory,则会使用新的BufferSegment对其进行初始化。 而下一个缓冲区的分配是关键部分,它是在锁定下完成的。
我建议看一个这样的例子。 乍一看,似乎编译器(或运行时)已经欺骗了该恶魔。

魔鬼
  var pipeNoOptions = new Pipe(); Memory<byte> memoryOne = pipeNoOptions.Writer.GetMemory(2); Console.WriteLine(memoryOne.Length); //2048  4096 var pipeWithOptions = new Pipe(new PipeOptions(minimumSegmentSize: 5)); Memory<byte> memoryTwo = pipeWithOptions.Writer.GetMemory(2); Console.WriteLine(memoryTwo.Length); //16 


但是此示例中的所有内容都是可以理解和简单的。
创建Pipe实例时,我们可以将带有创建选项的PipeOptions对象传递给构造函数。

PipeOptions具有默认的最小线段尺寸字段。 不久前它是2048,但是此提交更改了所有内容,现在是4096。在撰写本文时,具有4096的版本是预发行软件包,在最新发行版本中是2048。这说明了第一个示例的行为。 如果您要求为标准缓冲区使用较小的大小,则可以在PipeOptions类型的实例中指定它。

但是在第二个示例中,在指示了最小大小的情况下,长度无论如何都不匹配。 这已经发生了,因为使用缓冲池创建了一个新的BufferSegment。 PipeOptions中的选项之一是内存池。 之后,将使用指定的池创建一个新的段。 如果您未指定内存池,则将使用标准ArrayPool,如您所知,该存储池具有多个存储区,用于存储不同大小的数组(每个存储区的大小是前一个存储区的2倍),并且在询问特定大小时,它将查找具有适当大小的数组的存储区(然后有最接近的更大或相等)。 因此,新缓冲区几乎肯定会比您要求的大。 标准ArrayPool(System.Buffers.TlsOverPerCoreLockedStacksArrayPool)中数组的最小大小为16。但是不用担心,它是一个数组池。 因此,在大多数情况下,该阵列不会对垃圾收集器造成负担,而是可以重复使用。

#2.5 跨度<byte> GetSpan(int sizeHint)


它的工作原理类似,从内存获得跨度。

因此,GetMemory()或GetSpan()是主要的写入方法。 它们给了我们可以写入的对象。 为此,我们不需要为新的值数组分配内存,我们可以直接写入内部结构。 使用哪一个主要取决于您使用的API和异步方法。 但是,鉴于上述情况,产生了问题。 读者将如何知道我们写了多少书? 如果我们始终使用该池的特定实现,该实现给出的数组大小与请求的大小完全相同,则读取器可以一次读取整个缓冲区。 但是,正如我们已经说过的,我们很可能分配了一个较大大小的缓冲区。 这导致操作需要以下方法。

#3 void Advance(整数字节)


一种可怕的简单方法。 它以写入的字节数作为参数。 它们增加内部计数器-_unflushedBytes和_writingHeadBytesBuffered,它们的名称不言而喻。 它还将_writingHeadMemory精确地截断为写入的字节数(使用Slice方法)。 因此,在调用此方法后,您需要以“内存”或“跨度”的形式请求一个新的内存块,而无法写入前一个。 该方法的整体是关键部分,并处于锁定状态。

在此之后,阅读器似乎可以接收数据。 但是还需要进一步的步骤。

#4 ValueTask <FlushResult> FlushAsync(CancellationToken cancelledToken)


在将必要的数据写入接收到的内存并指出在其中写入了多少之后,将调用该方法。 该方法返回一个ValueTask,但是它不是异步的(不同于其后代StreamPipeWriter)。 ValueTask是一种特殊类型(只读结构),用于大多数调用不使用异步的情况,即,在调用时所有必需的数据将可用,并且该方法将同步结束。 内部包含数据或任务(以防无法同步处理)。 它取决于_writerAwaitable.IsCompleted属性的状态。 如果寻找什么改变了该等待对象的状态,我们将看到这种情况发生在未处理(未使用)数据量(与未读取(未检查)不完全相同,将在后面解释)的数据量超过某个阈值的情况下(_pauseWriterThreshold)。 默认值为16段大小。 如果需要,可以在PipeOptions中更改该值。 另外,如果一个方法被阻止,则该方法将启动ReadAsync方法的继续。

返回一个FlushResult,其中包含2个属性-IsCanceled和IsCompleted。 IsCanceled指示是否已清除刷新(CancelPendingFlush调用)。 IsCompleted指示PipeReader是否已完成(通过调用Complete()或CompleteAsync()方法)。
该方法的主要部分在Locke Skywalker下执行。

从实现的角度来看,PipeWriter的其他方法并不令人感兴趣,并且使用的频率要低得多,因此,仅给出简要说明。

#5 void Complete(异常异常= null)或ValueTask CompleteAsync(异常异常= null)


标记管已关闭以进行写入。 完成后,尝试使用该方法进行写入时将引发异常。 如果PipeReader已经完成,则整个Pipe实例也将完成。 大多数工作是在锁下完成的。

#6 void CancelPendingFlush()


顾名思义,它完成了当前的FlushAsync()操作。 有个乐。

#7 void OnReaderCompleted(操作<异常,对象>回调,对象状态)


在阅读器完成时执行委托。 还有一个锁。
文档当前说,在某些PipeWriter后代上可能不会调用此方法,以后会删除该方法。 因此,您不应将逻辑与这些方法联系在一起。

转到PipeReader


#1 ValueTask <ReadResult> ReadAsync(CancellationToken令牌)


在这里,像FlushAsync一样,将返回ValueTask,这表明该方法主要是同步的,但并不总是同步的。 取决于_readerAwaitable的状态。 与FlushAsync一样,您需要查找_readerAwaitable设置为不完整的时间。 当PipeReader从列表中读取了所有内容(或者它包含已标记为已检查的数据并且需要更多数据才能继续)时,就会发生这种情况。 实际上,这是合乎逻辑的。 因此,我们可以得出结论,最好根据经验确定的统计数据对Pipe进行微调,以合理地设置其所有选项。 正确的配置将减少异步执行分支的可能性,并使数据的处理效率更高。 几乎整个方法都被锁包围。

返回一些神秘的ReadResult 。 实际上,它只是显示操作状态的缓冲区+标志(IsCanceled-ReadAsync是否已取消,IsCompleted表明是否已关闭PipeWriter)。 在这种情况下,IsCompleted是一个值,该值指示是否调用了PipeWriter Complete()或CompleteAsync()方法。 如果这些方法被异常调用,则在尝试读取时将引发该异常。

缓冲区再次具有一个神秘的类型-ReadOnlySequence 。 反过来,这是一个对象,用于在相应段内包含开始和结束的段(ReadOnlySequenceSegment) +开始和结束索引。 实际上类似于Pipe类本身的结构。 顺便说一句,BufferSegment是ReadOnlySequenceSegment的后继者,这表明在此使用它。 因此,您可以摆脱不必要的内存分配,以便将数据从写入器传输到读取器。
可以从缓冲区中获取ReadOnlySpan以进行进一步处理。 要完成图片,您可以检查缓冲区是否包含单个ReadOnlySpan。 如果包含,则不需要迭代一个元素的集合,可以使用First属性来获取它。 否则,您需要遍历缓冲区中的所有段并处理每个ReadOnlySpan。

讨论主题-在ReadOnlySequence类中,可空引用类型被积极使用,并且存在goto(不用于退出嵌套且不在生成的代码中)-特别是这里

处理之后,您需要向Pipe实例明确表明我们已经读取了数据。

#2 bool TryRead(输出ReadResult结果)


同步版本。 如果可以得到结果。 如果它还不存在,则与ReadAsync不同,它不会阻塞,但会返回false。 也处于锁定状态。

#3 void AdvanceTo(已消耗SequencePosition,已检查SequencePosition)


在此方法中,您可以指定读取的字节数和处理的字节数。 已读取但未处理的数据将在下次读取时返回。 乍一看,此功能可能看起来很奇怪,但是在处理字节流时,很少需要单独处理每个字节。 通常,使用消息交换数据。 读者可能会在阅读时收到一条完整的消息,而收到第二部分的一部分。 整个过程必须进行处理,第二部分的一部分应在下一次保留,以便与其余部分一起出现。 AdvanceTo方法接受SequencePosition,它实际上是其中的一个段+索引。 处理ReadAsync已读取的所有内容时,可以指定buffer.End。 否则,您将必须显式创建一个位置,以指示停止处理的段和索引。 引擎盖下的乐
另外,如果原始信息量少于已安装的缺陷(_resumeWriterThreshold),则如果它被阻止,它将开始继续PipeWriter。 默认情况下,此阈值为8个分区卷(阻塞阈值的一半)。

#4 void Complete(异常exception = null)


完成PipeReader。 如果此时PipeWriter完成,则整个Pipe实例终止。 锁在里面

#5 void CancelPendingRead()


允许您取消当前预期的读数。 洛克

#6 void OnWriterCompleted(操作<异常,对象>回调,对象状态)


允许您指定在PipeWriter完成时执行的委托。
与PipeWriter的类似方法一样, 文档中的注释也将被删除。 锁在引擎盖下。

例子



下面的清单显示了使用管道的示例。
自引入.NET Core Span和Memory以来,使用这些类型的重载已为许多用于数据处理的类提供了补充。 因此,一般的交互方案将大致相同。 在我的示例中,我使用管道来处理管道(我喜欢词根词),即 通道-用于进程间通信的OS对象。 通道API刚刚进行了相应的扩展,以读取Span和Memory中的数据。 异步版本使用内存,因为异步方法将使用自动生成的有限状态机转换为模板方法,在该方法中存储所有局部变量和方法参数,并且由于Span是ref只读结构,因此使用Span不能分别位于堆上在异步方法中是不可能的。 但是,该方法还有一个同步版本,可让您使用Span。 在我的示例中,我尝试了两者,结果表明在这种情况下,同步版本显示得更好。 使用它时,垃圾收集较少,并且数据处理速度更快。 但这仅仅是因为有大量数据。 如果很可能在申请下一个批处理时没有数据,则应使用异步版本,以免使处理器空闲。
该示例具有注释,解释了一些要点。 我要提请您注意的事实是,尽管负责从管道读取和处理的程序片段是分开的,但是在写入文件时,还是从管道读取数据时完全从写入位置读取数据。

为了强大的功能,经过多年的发展-异步缅因州
  class Program { static async Task Main(string args) { var pipe = new Pipe(); var dataWriter = new PipeDataWriter(pipe.Writer, "testpipe"); var dataProcessor = new DataProcessor(new ConsoleBytesProcessor(), pipe.Reader); var cts = new CancellationTokenSource(); await Task.WhenAll(dataWriter.ReadFromPipeAsync(cts.Token), dataProcessor.StartProcessingDataAsync(cts.Token)); } } 


Pipepatawriter
  public class PipeDataWriter { private readonly NamedPipeClientStream _namedPipe; private readonly PipeWriter _pipeWriter; private const string Servername = "."; public PipeDataWriter(PipeWriter pipeWriter, string pipeName) { _pipeWriter = pipeWriter ?? throw new ArgumentNullException(nameof(pipeWriter)); _namedPipe = new NamedPipeClientStream(Servername, pipeName, PipeDirection.In); } public async Task ReadFromPipeAsync(CancellationToken token) { await _namedPipe.ConnectAsync(token); while (true) { token.ThrowIfCancellationRequested(); ////       Memory<T> //Memory<byte> buffer = _pipeWriter.GetMemory(); ////       Memory<T> ////         -       . //int readBytes = await _namedPipe.ReadAsync(buffer, token); //         PipeWriter Span //         -       . int readBytes = _namedPipe.Read(_pipeWriter.GetSpan()); //      ,        //         if (readBytes == 0) { await Task.Delay(500, token); continue; } // ,       _pipeWriter.Advance(readBytes); //  ,      PipeReader FlushResult result = await _pipeWriter.FlushAsync(token); //  PipeReader  ,       //        ,      if (result.IsCompleted) { break; } } //  _pipeWriter     Pipe _pipeWriter.Complete(); } } 


数据处理器
  public class DataProcessor { private readonly IBytesProcessor _bytesProcessor; private readonly PipeReader _pipeReader; public DataProcessor(IBytesProcessor bytesProcessor, PipeReader pipeReader) { _bytesProcessor = bytesProcessor ?? throw new ArgumentNullException(nameof(bytesProcessor)); _pipeReader = pipeReader ?? throw new ArgumentNullException(nameof(pipeReader)); } public async Task StartProcessingDataAsync(CancellationToken token) { while (true) { token.ThrowIfCancellationRequested(); //     Pipe ReadResult result = await _pipeReader.ReadAsync(token); ReadOnlySequence<byte> buffer = result.Buffer; //      await _bytesProcessor.ProcessBytesAsync(buffer, token); // ,      .       ,   //  ,               //    IBytesProcessor.ProcessBytesAsync   ,    _pipeReader.AdvanceTo(buffer.End); //  PipeWriter  ,      //      ,      if (result.IsCompleted) { break; } } //  _pipeReader     Pipe _pipeReader.Complete(); } } 


字节处理器
  public interface IBytesProcessor { Task ProcessBytesAsync(ReadOnlySequence<byte> bytesSequence, CancellationToken token); } public class ConsoleBytesProcessor : IBytesProcessor { //,         IDisposable readonly FileStream _fileStream = new FileStream("buffer", FileMode.Create); public Task ProcessBytesAsync(ReadOnlySequence<byte> bytesSequence, CancellationToken token) { if (bytesSequence.IsSingleSegment) { ProcessSingle(bytesSequence.First.Span); } else { foreach (var segment in bytesSequence) { ProcessSingle(segment.Span); } } return Task.CompletedTask; } private void ProcessSingle(ReadOnlySpan<byte> span) { _fileStream.Write(span); } } 

Source: https://habr.com/ru/post/zh-CN464921/


All Articles