# c# System.IO.Pipelines 管道记录

一些使用 System.IO.Pipelines 管道做网络编程的 备忘

第一步,先从Nuget安装 System.IO.Pipelines

创建使用

C#
//1,从Pipe创建
var pipe = new Pipe(new PipeOptions(pauseWriterThreshold: 1 * 1024 * 1024, resumeWriterThreshold: 128 * 1024));

var writer = pipe.Writer;
var reader = pipe.Reader;

//一个写入线程
while(true)
{
    //从writer申请一段内存,用来给socket接收数据
    Memory<byte> buffer = writer.GetMemory(8 * 1024);
    int length = await Socket.ReadAsync(buffer);
    if (length  == 0)
    {
        break;
    }
    //告诉writer,收到了多少数据
    writer.Advance(length);
    //通知reader,数据可以读了
    //当writer里的数据超过了 pauseWriterThreshold 就会阻塞,当数据少于resumeWriterThreshold时又被放开
    FlushResult result = await writer.FlushAsync();
    if (result.IsCanceled || result.IsCompleted)
    {
        break;
    }
}
writer.Complete();

//一个读取线程
while(true)
{
    //从reader读取数据
    ReadResult readResult = await reader.ReadAsync().ConfigureAwait(false);
    ReadOnlySequence<byte> buffer = readResult.Buffer;
    if (buffer.Length == 0 || readResult.IsCompleted || readResult.IsCanceled)
    {
        break;
    }
    SequencePosition end = await ReadPacket(buffer).ConfigureAwait(false);
    //告诉reader,哪些数据已经被处理了
    reader.AdvanceTo(end);
}
reader.Complete();

//粘包
private async Task<SequencePosition> ReadPacket(ReadOnlySequence<byte> buffer)
{
    //分包
    while (buffer.Length > 4)
    {
        //读取头
        int length = ReaderHead(buffer);
        if (buffer.Length < length + 4)
        {
            break;
        }

        //拼接数据
        using MemoryStream memoryStream = new MemoryStream();
        ReadOnlySequence<byte> cache = buffer.Slice(4, length);
        SequencePosition position = cache.Start;
        while (cache.TryGet(ref position, out ReadOnlyMemory<byte> memory))
        {
            memoryStream.write(memory.Span);
        }
        //给业务去处理数据
        //memoryStream.ToArray();

        //分割去掉已使用的数据
        SequencePosition endPosition = buffer.GetPosition(4 + length);
        buffer = buffer.Slice(endPosition);
    }
    return buffer.Start;
}