diff --git a/samples/ClientApplication/ClientApplication.csproj b/samples/ClientApplication/ClientApplication.csproj index 9f2037a7..4f3a11a7 100644 --- a/samples/ClientApplication/ClientApplication.csproj +++ b/samples/ClientApplication/ClientApplication.csproj @@ -6,6 +6,15 @@ + + + + + + + + + @@ -21,5 +30,9 @@ + + + + diff --git a/samples/ClientApplication/Program.cs b/samples/ClientApplication/Program.cs index c84f0f68..3421354d 100644 --- a/samples/ClientApplication/Program.cs +++ b/samples/ClientApplication/Program.cs @@ -18,6 +18,8 @@ using Microsoft.Extensions.Logging; using Protocols; using Bedrock.Framework.Experimental.Protocols.RabbitMQ.Methods; +using ServerApplication.Framing.VariableSized.LengthFielded; +using Bedrock.Framework.Experimental.Protocols.Framing.VariableSized.LengthFielded; namespace ClientApplication { @@ -39,9 +41,10 @@ static async Task Main(string[] args) Console.WriteLine("4. Echo Server With TLS enabled"); Console.WriteLine("5. In Memory Transport Echo Server and client"); Console.WriteLine("6. Length prefixed custom binary protocol"); - Console.WriteLine("7. Talk to local docker dameon"); - Console.WriteLine("8. Memcached protocol"); - Console.WriteLine("9. RebbitMQ protocol"); + Console.WriteLine("7. Header prefixed protocol"); + Console.WriteLine("8. Talk to local docker daemon"); + Console.WriteLine("9. Memcached protocol"); + Console.WriteLine("0. RabbitMQ protocol"); while (true) { @@ -78,16 +81,21 @@ static async Task Main(string[] args) await CustomProtocol(); } else if (keyInfo.Key == ConsoleKey.D7) + { + Console.WriteLine("Variable size length fielded protocol."); + await VariableSizedLengthFieldedProtocol(); + } + else if (keyInfo.Key == ConsoleKey.D8) { Console.WriteLine("Talk to local docker daemon"); await DockerDaemon(serviceProvider); } - else if (keyInfo.Key == ConsoleKey.D8) + else if (keyInfo.Key == ConsoleKey.D9) { Console.WriteLine("Send Request To Memcached"); await MemcachedProtocol(serviceProvider); } - else if (keyInfo.Key == ConsoleKey.D9) + else if (keyInfo.Key == ConsoleKey.D0) { Console.WriteLine("RabbitMQ test"); await RabbitMQProtocol(serviceProvider); @@ -353,6 +361,50 @@ private static async Task CustomProtocol() } } + private static async Task VariableSizedLengthFieldedProtocol() + { + using var loggerFactory = LoggerFactory.Create(builder => + { + builder.SetMinimumLevel(LogLevel.Debug); + builder.AddConsole(); + }); + + var client = new ClientBuilder() + .UseSockets() + .UseConnectionLogging(loggerFactory: loggerFactory) + .Build(); + + await using var connection = await client.ConnectAsync(new IPEndPoint(IPAddress.Loopback, 5006)); + Console.WriteLine($"Connected to {connection.RemoteEndPoint}"); + Console.WriteLine("Enter 'c' to close the connection."); + + var headerFactory = new HeaderFactory(); + var protocol = new LengthFieldedProtocol(Helper.HeaderLength, (headerSequence) => headerFactory.CreateHeader(headerSequence)); + await using var writer = connection.CreateWriter(); + + while (true) + { + Console.WriteLine("Enter the text: "); + var line = Console.ReadLine(); + if (line.Equals("c", StringComparison.Ordinal)) + { + break; + } + + Console.WriteLine("Enter a number as custom data: "); + int someCustomData = int.Parse(Console.ReadLine()); + + var payload = Encoding.UTF8.GetBytes(line); + var header = headerFactory.CreateHeader(payload.Length, someCustomData); + var frame = new Frame(header, payload); + + await writer.WriteAsync(protocol, frame); + } + + connection.Abort(); + await connection.DisposeAsync(); + } + private static async Task DockerDaemon(IServiceProvider serviceProvider) { var client = new ClientBuilder(serviceProvider) diff --git a/samples/ServerApplication/Framing/VariableSized/LengthFielded/Header.cs b/samples/ServerApplication/Framing/VariableSized/LengthFielded/Header.cs new file mode 100644 index 00000000..6d3de218 --- /dev/null +++ b/samples/ServerApplication/Framing/VariableSized/LengthFielded/Header.cs @@ -0,0 +1,61 @@ +using System; +using Bedrock.Framework.Experimental.Protocols.Framing.VariableSized.LengthFielded; + +namespace ServerApplication.Framing.VariableSized.LengthFielded +{ + internal class Header : IHeader, IEquatable
+ { + private byte[] _headerAsArray; + + public int PayloadLength { get; } + public int SomeCustomData { get; } + + public Header(int payloadLength, int someCustomData) + { + PayloadLength = payloadLength; + SomeCustomData = someCustomData; + } + + public Header(ReadOnlySpan headerAsSpan) + { + PayloadLength = BitConverter.ToInt32(headerAsSpan.Slice(0, 4)); + SomeCustomData = BitConverter.ToInt32(headerAsSpan.Slice(4)); + } + + public ReadOnlySpan AsSpan() + { + // Lazy creating the array. + if (_headerAsArray is null) + { + var payloadLengthAsArray = BitConverter.GetBytes(PayloadLength); + var someCustomDataAsArray = BitConverter.GetBytes(SomeCustomData); + + _headerAsArray = new byte[Helper.HeaderLength]; + _headerAsArray[0] = payloadLengthAsArray[0]; + _headerAsArray[1] = payloadLengthAsArray[1]; + _headerAsArray[2] = payloadLengthAsArray[2]; + _headerAsArray[3] = payloadLengthAsArray[3]; + _headerAsArray[4] = someCustomDataAsArray[0]; + _headerAsArray[5] = someCustomDataAsArray[1]; + _headerAsArray[6] = someCustomDataAsArray[2]; + _headerAsArray[7] = someCustomDataAsArray[3]; + } + + return _headerAsArray.AsSpan(); + } + + public override string ToString() => $"Payload length: {PayloadLength} - Some custom data: {SomeCustomData}"; + + #region IEquatable + public override bool Equals(object obj) => Equals((Header)obj); + + public override int GetHashCode() => HashCode.Combine(PayloadLength, SomeCustomData); + + public bool Equals(Header other) => PayloadLength == other.PayloadLength && SomeCustomData.Equals(other.SomeCustomData); + + public static bool operator ==(Header left, Header right) => left.Equals(right); + + public static bool operator !=(Header left, Header right) => !(left == right); + #endregion + } +} diff --git a/samples/ServerApplication/Framing/VariableSized/LengthFielded/HeaderFactory.cs b/samples/ServerApplication/Framing/VariableSized/LengthFielded/HeaderFactory.cs new file mode 100644 index 00000000..44af9642 --- /dev/null +++ b/samples/ServerApplication/Framing/VariableSized/LengthFielded/HeaderFactory.cs @@ -0,0 +1,26 @@ +using System; +using System.Buffers; + +namespace ServerApplication.Framing.VariableSized.LengthFielded +{ + internal class HeaderFactory + { + public Header CreateHeader(int payloadLength, int someCustomData) => new Header(payloadLength, someCustomData); + + public Header CreateHeader(in ReadOnlySequence headerSequence) + { + if (headerSequence.IsSingleSegment) + { + return CreateHeader(headerSequence.FirstSpan); + } + else + { + Span headerData = stackalloc byte[Helper.HeaderLength]; + headerSequence.CopyTo(headerData); + return CreateHeader(headerData); + } + } + + public Header CreateHeader(in ReadOnlySpan headerData) => new Header(headerData); + } +} diff --git a/samples/ServerApplication/Framing/VariableSized/LengthFielded/HeaderPrefixedApplication.cs b/samples/ServerApplication/Framing/VariableSized/LengthFielded/HeaderPrefixedApplication.cs new file mode 100644 index 00000000..a0a45f07 --- /dev/null +++ b/samples/ServerApplication/Framing/VariableSized/LengthFielded/HeaderPrefixedApplication.cs @@ -0,0 +1,88 @@ +using System; +using System.Buffers; +using System.Text; +using System.Threading.Tasks; +using Bedrock.Framework.Experimental.Protocols.Framing.VariableSized.LengthFielded; +using Bedrock.Framework.Protocols; +using Microsoft.AspNetCore.Connections; +using Microsoft.Extensions.Logging; + +namespace ServerApplication.Framing.VariableSized.LengthFielded +{ + internal partial class HeaderPrefixedApplication : ConnectionHandler + { + private readonly ILogger _logger; + private readonly HeaderFactory _headerFactory; + + #region Logs +#if NETCOREAPP3_1 + private static readonly Action _logConnected = + LoggerMessage.Define(logLevel: LogLevel.Information, eventId: 0, formatString: "{ConnectionId} connected."); + + private static readonly Action _logMessageReceived = + LoggerMessage.Define(logLevel: LogLevel.Information, eventId: 0, formatString: "Message received - Header: {Header} - Payload: {Payload}"); + + private static readonly Action _logDisconnected = + LoggerMessage.Define(logLevel: LogLevel.Information, eventId: 0, formatString: "{ConnectionId} disconnected."); +#elif NET6_0_OR_GREATER + [LoggerMessage(0, LogLevel.Information, "{ConnectionId} connected.")] + partial void LogConnected(string connectionId); + + [LoggerMessage(0, LogLevel.Information, "Message received - Header: {Header} - Payload: {Payload}")] + partial void LogMessageReceived(IHeader header, string payload); + + [LoggerMessage(0, LogLevel.Information, "{ConnectionId} disconnected.")] + partial void LogDisconnected(string connectionId); +#endif + #endregion + + public HeaderPrefixedApplication(ILogger logger, HeaderFactory headerFactory) + { + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + _headerFactory = headerFactory ?? throw new ArgumentNullException(nameof(headerFactory)); + } + + public override async Task OnConnectedAsync(ConnectionContext connection) + { +#if NETCOREAPP3_1 + _logConnected(_logger, connection.ConnectionId, null); +#elif NET6_0_OR_GREATER + LogConnected(connection.ConnectionId); +#endif + // Use the header prefixed protocol + var headerFactory = _headerFactory; // Capturing members in anonymous methods results memory leak, that's why we introduce a local variable. + var protocol = new LengthFieldedProtocol(Helper.HeaderLength, (headerSequence) => headerFactory.CreateHeader(headerSequence)); + var reader = connection.CreateReader(); + var writer = connection.CreateWriter(); + + while (true) + { + try + { + var result = await reader.ReadAsync(protocol); + var message = result.Message; + +#if NETCOREAPP3_1 + _logMessageReceived(_logger, message.Header, Encoding.UTF8.GetString(message.Payload.ToArray()), null); +#elif NET6_0_OR_GREATER + LogMessageReceived(message.Header, Encoding.UTF8.GetString(message.Payload)); +#endif + if (result.IsCompleted) + { + break; + } + } + finally + { + reader.Advance(); + } + } + +#if NETCOREAPP3_1 + _logDisconnected(_logger, connection.ConnectionId, null); +#elif NET6_0_OR_GREATER + LogDisconnected(connection.ConnectionId); +#endif + } + } +} diff --git a/samples/ServerApplication/Framing/VariableSized/LengthFielded/Helper.cs b/samples/ServerApplication/Framing/VariableSized/LengthFielded/Helper.cs new file mode 100644 index 00000000..9ad822f8 --- /dev/null +++ b/samples/ServerApplication/Framing/VariableSized/LengthFielded/Helper.cs @@ -0,0 +1,8 @@ +namespace ServerApplication.Framing.VariableSized.LengthFielded +{ + internal static class Helper + { + // Size of the Header. In this case 2 * int -> 2 * 4. + public static int HeaderLength => 8; + } +} diff --git a/samples/ServerApplication/Program.cs b/samples/ServerApplication/Program.cs index 359c2bed..88e47cc4 100644 --- a/samples/ServerApplication/Program.cs +++ b/samples/ServerApplication/Program.cs @@ -7,6 +7,7 @@ using Microsoft.AspNetCore.SignalR; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; +using ServerApplication.Framing.VariableSized.LengthFielded; namespace ServerApplication { @@ -23,6 +24,7 @@ public static async Task Main(string[] args) }); services.AddSignalR(); + services.AddSingleton(); var serviceProvider = services.BuildServiceProvider(); @@ -56,8 +58,13 @@ public static async Task Main(string[] args) }) .UseConnectionLogging().UseConnectionHandler()); + // Length prefixed server sockets.Listen(IPAddress.Loopback, 5005, builder => builder.UseConnectionLogging().UseConnectionHandler()); + + // Header prefixed server + sockets.Listen(IPAddress.Loopback, 5006, + builder => builder.UseConnectionLogging().UseConnectionHandler()); }) .Build(); diff --git a/src/Bedrock.Framework.Experimental/Protocols/Framing/VariableSized/LengthFielded/Frame.cs b/src/Bedrock.Framework.Experimental/Protocols/Framing/VariableSized/LengthFielded/Frame.cs new file mode 100644 index 00000000..4d9df07a --- /dev/null +++ b/src/Bedrock.Framework.Experimental/Protocols/Framing/VariableSized/LengthFielded/Frame.cs @@ -0,0 +1,20 @@ +using System.Buffers; + +namespace Bedrock.Framework.Experimental.Protocols.Framing.VariableSized.LengthFielded +{ + public readonly struct Frame + { + public IHeader Header { get; } + public ReadOnlySequence Payload { get; } + + public Frame(IHeader header, byte[] payload) : this(header, new ReadOnlySequence(payload)) + { + } + + public Frame(IHeader header, ReadOnlySequence payload) + { + Header = header; + Payload = payload; + } + } +} diff --git a/src/Bedrock.Framework.Experimental/Protocols/Framing/VariableSized/LengthFielded/IHeader.cs b/src/Bedrock.Framework.Experimental/Protocols/Framing/VariableSized/LengthFielded/IHeader.cs new file mode 100644 index 00000000..2f7caf3e --- /dev/null +++ b/src/Bedrock.Framework.Experimental/Protocols/Framing/VariableSized/LengthFielded/IHeader.cs @@ -0,0 +1,11 @@ +using System; + +namespace Bedrock.Framework.Experimental.Protocols.Framing.VariableSized.LengthFielded +{ + public interface IHeader + { + public int PayloadLength { get; } + + public ReadOnlySpan AsSpan(); + } +} diff --git a/src/Bedrock.Framework.Experimental/Protocols/Framing/VariableSized/LengthFielded/LengthFieldedProtocol.cs b/src/Bedrock.Framework.Experimental/Protocols/Framing/VariableSized/LengthFielded/LengthFieldedProtocol.cs new file mode 100644 index 00000000..bb47e1ba --- /dev/null +++ b/src/Bedrock.Framework.Experimental/Protocols/Framing/VariableSized/LengthFielded/LengthFieldedProtocol.cs @@ -0,0 +1,93 @@ +using System; +using System.Buffers; +using Bedrock.Framework.Protocols; + +namespace Bedrock.Framework.Experimental.Protocols.Framing.VariableSized.LengthFielded +{ + public class LengthFieldedProtocol : IMessageReader, IMessageWriter + { + private readonly int _headerLength; + private readonly Func, IHeader> _createHeader; + + private IHeader _header; + + public LengthFieldedProtocol(int headerLength, Func, IHeader> createHeader) + { + if (headerLength <= 0) + { + throw new ArgumentOutOfRangeException(nameof(headerLength), "Header length must be greater than 0."); + } + + _headerLength = headerLength; + _createHeader = createHeader ?? throw new ArgumentNullException(nameof(createHeader)); + } + + #region IMessageReader + public bool TryParseMessage(in ReadOnlySequence input, ref SequencePosition consumed, ref SequencePosition examined, out Frame message) + { + // Header + if (_header == null) + { + if (!TryParseHeader(input, out var headerSequence)) + { + message = default; + return false; + } + + _header = _createHeader(headerSequence); + } + + if (!TryParsePayload(input, out var payloadSequence)) + { + message = default; + return false; + } + + consumed = payloadSequence.End; + examined = consumed; + message = new Frame(_header, payloadSequence); + _header = null; + return true; + } + + private bool TryParseHeader(in ReadOnlySequence input, out ReadOnlySequence headerSequence) + { + if (input.Length < _headerLength) + { + headerSequence = default; + return false; + } + + headerSequence = input.Slice(0, _headerLength); + return true; + } + + private bool TryParsePayload(in ReadOnlySequence input, out ReadOnlySequence payloadSequence) + { + int messageLength = _headerLength + _header.PayloadLength; + if (input.Length < messageLength) + { + payloadSequence = default; + return false; + } + + payloadSequence = input.Slice(_headerLength, _header.PayloadLength); + return true; + } + #endregion + + #region IMessageWriter + public void WriteMessage(Frame message, IBufferWriter output) + { + // Header + output.Write(message.Header.AsSpan()); + + // Payload + foreach (var payloadSegment in message.Payload) + { + output.Write(payloadSegment.Span); + } + } + #endregion + } +} diff --git a/tests/Bedrock.Framework.Benchmarks/Bedrock.Framework.Benchmarks.csproj b/tests/Bedrock.Framework.Benchmarks/Bedrock.Framework.Benchmarks.csproj index 6d2c383a..38b846a1 100644 --- a/tests/Bedrock.Framework.Benchmarks/Bedrock.Framework.Benchmarks.csproj +++ b/tests/Bedrock.Framework.Benchmarks/Bedrock.Framework.Benchmarks.csproj @@ -5,6 +5,12 @@ netcoreapp3.1;net6.0 + + + + + + diff --git a/tests/Bedrock.Framework.Benchmarks/Framing/VariableSized/LengthFielded/LengthFieldedProtocolBenchmarks.cs b/tests/Bedrock.Framework.Benchmarks/Framing/VariableSized/LengthFielded/LengthFieldedProtocolBenchmarks.cs new file mode 100644 index 00000000..3d612611 --- /dev/null +++ b/tests/Bedrock.Framework.Benchmarks/Framing/VariableSized/LengthFielded/LengthFieldedProtocolBenchmarks.cs @@ -0,0 +1,97 @@ +using Bedrock.Framework.Experimental.Protocols.Framing.VariableSized.LengthFielded; +using Bedrock.Framework.Protocols; +using BenchmarkDotNet.Attributes; +using ServerApplication.Framing.VariableSized.LengthFielded; +using System; +using System.Buffers; +using System.IO.Pipelines; +using System.Threading.Tasks; + +namespace Bedrock.Framework.Benchmarks.Framing.VariableSized.LengthFielded +{ + /// + /// Benchmarks for . + /// + [MemoryDiagnoser] + public class LengthFieldedProtocolBenchmarks + { + private readonly Pipe _dataPipe; + private readonly PipeWriter _writer; + private readonly HeaderFactory _headerFactory; + private readonly int _customHeaderData; + private readonly Header _header; + private readonly ProtocolReader _protocolReader; + private readonly LengthFieldedProtocol _lengthFieldedProtocol; + private readonly byte[] _data; + private readonly int _halfDataSize; + + [Params(10, 100, 1000)] + public int MessageSize { get; set; } + + public LengthFieldedProtocolBenchmarks() + { + _dataPipe = new Pipe(); + _writer = _dataPipe.Writer; + _data = new byte[MessageSize]; + _data.AsSpan().Fill(1); + _halfDataSize = MessageSize / 2; + + _headerFactory = new HeaderFactory(); + _customHeaderData = 0; + _header = _headerFactory.CreateHeader(_data.Length, _customHeaderData); + _lengthFieldedProtocol = new LengthFieldedProtocol(Helper.HeaderLength, (headerSequence) => _headerFactory.CreateHeader(headerSequence)); + _protocolReader = new ProtocolReader(_dataPipe.Reader); + } + + /// + /// Baseline writing the data to the pipe with out Bedrock; used to compare the cost of adding the Lenght Fielded Protocol. + /// + [Benchmark(Baseline = true)] + public async ValueTask PipeOnly() + { + _writer.Write(_header.AsSpan()); + _writer.Write(_data.AsSpan()); + await _writer.FlushAsync(); + + var result = await _dataPipe.Reader.ReadAsync(); + + _dataPipe.Reader.AdvanceTo(result.Buffer.End); + } + + + /// + /// Benchmark reading a stream where the entire message is always available. + /// + [Benchmark] + public async ValueTask ReadProtocolWithWholeMessageAvailable() + { + _writer.Write(_header.AsSpan()); + _writer.Write(_data.AsSpan()); + await _writer.FlushAsync(); + + await _protocolReader.ReadAsync(_lengthFieldedProtocol); + + _protocolReader.Advance(); + } + + /// + /// Benchmark reading a stream where the entire message is never available. + /// + [Benchmark] + public async ValueTask ReadProtocolWithPartialMessageAvailable() + { + _writer.Write(_header.AsSpan()); + _writer.Write(_data.AsSpan(0, _halfDataSize)); + await _writer.FlushAsync(); + + var readTask = _protocolReader.ReadAsync(_lengthFieldedProtocol); + + _writer.Write(_data.AsSpan(_halfDataSize)); + await _writer.FlushAsync(); + + await readTask; + + _protocolReader.Advance(); + } + } +} diff --git a/tests/Bedrock.Framework.Benchmarks/Program.cs b/tests/Bedrock.Framework.Benchmarks/Program.cs index 7e74e95a..3c0108fb 100644 --- a/tests/Bedrock.Framework.Benchmarks/Program.cs +++ b/tests/Bedrock.Framework.Benchmarks/Program.cs @@ -1,4 +1,5 @@ -using BenchmarkDotNet.Running; +using Bedrock.Framework.Benchmarks.Framing.VariableSized.LengthFielded; +using BenchmarkDotNet.Running; using System; namespace Bedrock.Framework.Benchmarks @@ -13,6 +14,7 @@ static void Main(string[] args) private static readonly Type[] AllBenchmarks = new[] { typeof(ProtocolReaderBenchmarks), + typeof(LengthFieldedProtocolBenchmarks), typeof(MessagePipeReaderBenchmarks), typeof(HttpHeaderReaderBenchmarks) }; diff --git a/tests/Bedrock.Framework.Tests/Bedrock.Framework.Tests.csproj b/tests/Bedrock.Framework.Tests/Bedrock.Framework.Tests.csproj index ca947092..1fe8a61f 100644 --- a/tests/Bedrock.Framework.Tests/Bedrock.Framework.Tests.csproj +++ b/tests/Bedrock.Framework.Tests/Bedrock.Framework.Tests.csproj @@ -6,6 +6,12 @@ false + + + + + + diff --git a/tests/Bedrock.Framework.Tests/Protocols/Framing/VariableSized/LengthFielded/LengthFieldedProtocolTests.cs b/tests/Bedrock.Framework.Tests/Protocols/Framing/VariableSized/LengthFielded/LengthFieldedProtocolTests.cs new file mode 100644 index 00000000..ea8339b3 --- /dev/null +++ b/tests/Bedrock.Framework.Tests/Protocols/Framing/VariableSized/LengthFielded/LengthFieldedProtocolTests.cs @@ -0,0 +1,101 @@ +using System; +using System.Text; +using System.Threading.Tasks; +using Xunit; +using Bedrock.Framework.Experimental.Protocols.Framing.VariableSized.LengthFielded; +using ServerApplication.Framing.VariableSized.LengthFielded; +using Bedrock.Framework.Protocols; +using System.IO; +using System.IO.Pipelines; +using System.Buffers; + +namespace Bedrock.Framework.Tests.Protocols.Framing.VariableSized.LengthFielded +{ + public class LengthFieldedProtocolTests + { + private static ProtocolReader CreateReader(LengthFieldedProtocol protocol, out Func writeFunc) + { + var stream = new MemoryStream(); + var writer = PipeWriter.Create(stream); + var reader = new ProtocolReader(PipeReader.Create(stream)); + + long written = 0; + writeFunc = async frame => + { + var position = stream.Position; + stream.Position = written; + protocol.WriteMessage(frame, writer); + await writer.FlushAsync().ConfigureAwait(false); + written = stream.Position; + stream.Position = position; + }; + return reader; + } + + [Fact] + public async Task SingleMessageWorks() + { + // Arrange + var headerFactory = new HeaderFactory(); + var protocol = new LengthFieldedProtocol(Helper.HeaderLength, (headerSequence) => headerFactory.CreateHeader(headerSequence)); + var reader = CreateReader(protocol, out var writeFunc); + + string payload = "This is a test payload."; + int customHeaderData = 123; + var payloadAsArray = Encoding.ASCII.GetBytes(payload); + var header = headerFactory.CreateHeader(payloadAsArray.Length, customHeaderData); + var frame = new Frame(header, payloadAsArray); + + // Act + await writeFunc(frame); + var readResult = await reader.ReadAsync(protocol); + var result = readResult.Message; + reader.Advance(); + + // Assert + Assert.Equal(header, result.Header); +#if NETCOREAPP3_1 + Assert.Equal(payload, Encoding.ASCII.GetString(result.Payload.ToArray())); +#elif NET6_0_OR_GREATER + Assert.Equal(payload, Encoding.ASCII.GetString(result.Payload)); +#endif + } + + [Fact] + public async Task MultipleMessagesWorks() + { + // Arrange + var headerFactory = new HeaderFactory(); + var protocol = new LengthFieldedProtocol(Helper.HeaderLength, (headerSequence) => headerFactory.CreateHeader(headerSequence)); + var reader = CreateReader(protocol, out var writeFunc); + + string payload = "This is a test payload."; + int customHeaderData = 123; + var payloadAsArray = Encoding.ASCII.GetBytes(payload); + var header = headerFactory.CreateHeader(payloadAsArray.Length, customHeaderData); + var frame = new Frame(header, payloadAsArray); + + // Act + for (var i = 0; i < 5; i++) + { + await writeFunc(frame); + } + + // Assert + for (var i = 0; i < 5; i++) + { + var readResult = await reader.ReadAsync(protocol); + var result = readResult.Message; + reader.Advance(); + + Assert.Equal(header, result.Header); +#if NETCOREAPP3_1 + Assert.Equal(payload, Encoding.ASCII.GetString(result.Payload.ToArray())); +#elif NET6_0_OR_GREATER + Assert.Equal(payload, Encoding.ASCII.GetString(result.Payload)); +#endif + } + } + + } +}