一些使用 System.IO.Pipelines 管道做网络编程的 备忘
第一步,先从Nuget安装 System.IO.Pipelines
创建使用
C#
//1,从Pipe创建
var pipe = new Pipe(new PipeOptions(pauseWriterThreshold: 1 * 1024 * 1024, resumeWriterThreshold: 128 * 1024));
var writer = pipe.Writer;
var reader = pipe.Reader;
//一个写入线程
while(true)
{
//从writer申请一段内存,用来给socket接收数据
Memory<byte> buffer = writer.GetMemory(8 * 1024);
int length = await Socket.ReadAsync(buffer);
if (length == 0)
{
break;
}
//告诉writer,收到了多少数据
writer.Advance(length);
//通知reader,数据可以读了
//当writer里的数据超过了 pauseWriterThreshold 就会阻塞,当数据少于resumeWriterThreshold时又被放开
FlushResult result = await writer.FlushAsync();
if (result.IsCanceled || result.IsCompleted)
{
break;
}
}
writer.Complete();
//一个读取线程
while(true)
{
//从reader读取数据
ReadResult readResult = await reader.ReadAsync().ConfigureAwait(false);
ReadOnlySequence<byte> buffer = readResult.Buffer;
if (buffer.Length == 0 || readResult.IsCompleted || readResult.IsCanceled)
{
break;
}
SequencePosition end = await ReadPacket(buffer).ConfigureAwait(false);
//告诉reader,哪些数据已经被处理了
reader.AdvanceTo(end);
}
reader.Complete();
//粘包
private async Task<SequencePosition> ReadPacket(ReadOnlySequence<byte> buffer)
{
//分包
while (buffer.Length > 4)
{
//读取头
int length = ReaderHead(buffer);
if (buffer.Length < length + 4)
{
break;
}
//拼接数据
using MemoryStream memoryStream = new MemoryStream();
ReadOnlySequence<byte> cache = buffer.Slice(4, length);
SequencePosition position = cache.Start;
while (cache.TryGet(ref position, out ReadOnlyMemory<byte> memory))
{
memoryStream.write(memory.Span);
}
//给业务去处理数据
//memoryStream.ToArray();
//分割去掉已使用的数据
SequencePosition endPosition = buffer.GetPosition(4 + length);
buffer = buffer.Slice(endPosition);
}
return buffer.Start;
}