Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Framing - Variable Size Length Fielded Protocol #143

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions samples/ClientApplication/ClientApplication.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,15 @@
</PropertyGroup>

<ItemGroup>
<Compile Remove="Framing\VariableSizeLengthFielded\**" />
<EmbeddedResource Remove="Framing\VariableSizeLengthFielded\**" />
<None Remove="Framing\VariableSizeLengthFielded\**" />
</ItemGroup>

<ItemGroup>
<Compile Include="..\ServerApplication\Framing\VariableSized\LengthFielded\Header.cs" Link="Framing\VariableSized\LengthFielded\Header.cs" />
<Compile Include="..\ServerApplication\Framing\VariableSized\LengthFielded\HeaderFactory.cs" Link="Framing\VariableSized\LengthFielded\HeaderFactory.cs" />
<Compile Include="..\ServerApplication\Framing\VariableSized\LengthFielded\Helper.cs" Link="Framing\VariableSized\LengthFielded\Helper.cs" />
<Compile Include="..\ServerApplication\LengthPrefixedProtocol.cs" Link="LengthPrefixedProtocol.cs" />
</ItemGroup>

Expand All @@ -21,5 +30,9 @@
<ItemGroup>
<ProjectReference Include="..\..\src\Bedrock.Framework.Experimental\Bedrock.Framework.Experimental.csproj" />
</ItemGroup>

<ItemGroup>
<Folder Include="Framing\VariableSized\LengthFielded\" />
</ItemGroup>

</Project>
62 changes: 57 additions & 5 deletions samples/ClientApplication/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
using Microsoft.Extensions.Logging;
using Protocols;
using Bedrock.Framework.Experimental.Protocols.RabbitMQ.Methods;
using ServerApplication.Framing.VariableSized.LengthFielded;
using Bedrock.Framework.Experimental.Protocols.Framing.VariableSized.LengthFielded;

namespace ClientApplication
{
Expand All @@ -39,9 +41,10 @@ static async Task Main(string[] args)
Console.WriteLine("4. Echo Server With TLS enabled");
Console.WriteLine("5. In Memory Transport Echo Server and client");
Console.WriteLine("6. Length prefixed custom binary protocol");
Console.WriteLine("7. Talk to local docker dameon");
Console.WriteLine("8. Memcached protocol");
Console.WriteLine("9. RebbitMQ protocol");
Console.WriteLine("7. Header prefixed protocol");
Console.WriteLine("8. Talk to local docker daemon");
Console.WriteLine("9. Memcached protocol");
Console.WriteLine("0. RabbitMQ protocol");

while (true)
{
Expand Down Expand Up @@ -78,16 +81,21 @@ static async Task Main(string[] args)
await CustomProtocol();
}
else if (keyInfo.Key == ConsoleKey.D7)
{
Console.WriteLine("Variable size length fielded protocol.");
await VariableSizedLengthFieldedProtocol();
}
else if (keyInfo.Key == ConsoleKey.D8)
{
Console.WriteLine("Talk to local docker daemon");
await DockerDaemon(serviceProvider);
}
else if (keyInfo.Key == ConsoleKey.D8)
else if (keyInfo.Key == ConsoleKey.D9)
{
Console.WriteLine("Send Request To Memcached");
await MemcachedProtocol(serviceProvider);
}
else if (keyInfo.Key == ConsoleKey.D9)
else if (keyInfo.Key == ConsoleKey.D0)
{
Console.WriteLine("RabbitMQ test");
await RabbitMQProtocol(serviceProvider);
Expand Down Expand Up @@ -353,6 +361,50 @@ private static async Task CustomProtocol()
}
}

private static async Task VariableSizedLengthFieldedProtocol()
{
using var loggerFactory = LoggerFactory.Create(builder =>
{
builder.SetMinimumLevel(LogLevel.Debug);
builder.AddConsole();
});

var client = new ClientBuilder()
.UseSockets()
.UseConnectionLogging(loggerFactory: loggerFactory)
.Build();

await using var connection = await client.ConnectAsync(new IPEndPoint(IPAddress.Loopback, 5006));
Console.WriteLine($"Connected to {connection.RemoteEndPoint}");
Console.WriteLine("Enter 'c' to close the connection.");

var headerFactory = new HeaderFactory();
var protocol = new LengthFieldedProtocol(Helper.HeaderLength, (headerSequence) => headerFactory.CreateHeader(headerSequence));
await using var writer = connection.CreateWriter();

while (true)
{
Console.WriteLine("Enter the text: ");
var line = Console.ReadLine();
if (line.Equals("c", StringComparison.Ordinal))
{
break;
}

Console.WriteLine("Enter a number as custom data: ");
int someCustomData = int.Parse(Console.ReadLine());

var payload = Encoding.UTF8.GetBytes(line);
var header = headerFactory.CreateHeader(payload.Length, someCustomData);
var frame = new Frame(header, payload);

await writer.WriteAsync(protocol, frame);
}

connection.Abort();
await connection.DisposeAsync();
}

private static async Task DockerDaemon(IServiceProvider serviceProvider)
{
var client = new ClientBuilder(serviceProvider)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
using System;
using Bedrock.Framework.Experimental.Protocols.Framing.VariableSized.LengthFielded;

namespace ServerApplication.Framing.VariableSized.LengthFielded
{
internal class Header : IHeader, IEquatable<Header>
{
private byte[] _headerAsArray;

public int PayloadLength { get; }
public int SomeCustomData { get; }

public Header(int payloadLength, int someCustomData)
{
PayloadLength = payloadLength;
SomeCustomData = someCustomData;
}

public Header(ReadOnlySpan<byte> headerAsSpan)
{
PayloadLength = BitConverter.ToInt32(headerAsSpan.Slice(0, 4));
SomeCustomData = BitConverter.ToInt32(headerAsSpan.Slice(4));
}

public ReadOnlySpan<byte> AsSpan()
{
// Lazy creating the array.
if (_headerAsArray is null)
{
var payloadLengthAsArray = BitConverter.GetBytes(PayloadLength);
var someCustomDataAsArray = BitConverter.GetBytes(SomeCustomData);

_headerAsArray = new byte[Helper.HeaderLength];
_headerAsArray[0] = payloadLengthAsArray[0];
_headerAsArray[1] = payloadLengthAsArray[1];
_headerAsArray[2] = payloadLengthAsArray[2];
_headerAsArray[3] = payloadLengthAsArray[3];
_headerAsArray[4] = someCustomDataAsArray[0];
_headerAsArray[5] = someCustomDataAsArray[1];
_headerAsArray[6] = someCustomDataAsArray[2];
_headerAsArray[7] = someCustomDataAsArray[3];
}

return _headerAsArray.AsSpan();
}

public override string ToString() => $"Payload length: {PayloadLength} - Some custom data: {SomeCustomData}";

#region IEquatable
public override bool Equals(object obj) => Equals((Header)obj);

public override int GetHashCode() => HashCode.Combine(PayloadLength, SomeCustomData);

public bool Equals(Header other) => PayloadLength == other.PayloadLength && SomeCustomData.Equals(other.SomeCustomData);

public static bool operator ==(Header left, Header right) => left.Equals(right);

public static bool operator !=(Header left, Header right) => !(left == right);
#endregion
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
using System;
using System.Buffers;

namespace ServerApplication.Framing.VariableSized.LengthFielded
{
internal class HeaderFactory
{
public Header CreateHeader(int payloadLength, int someCustomData) => new Header(payloadLength, someCustomData);

public Header CreateHeader(in ReadOnlySequence<byte> headerSequence)
{
if (headerSequence.IsSingleSegment)
{
return CreateHeader(headerSequence.FirstSpan);
}
else
{
Span<byte> headerData = stackalloc byte[Helper.HeaderLength];
headerSequence.CopyTo(headerData);
return CreateHeader(headerData);
}
}

public Header CreateHeader(in ReadOnlySpan<byte> headerData) => new Header(headerData);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
using System;
using System.Buffers;
using System.Text;
using System.Threading.Tasks;
using Bedrock.Framework.Experimental.Protocols.Framing.VariableSized.LengthFielded;
using Bedrock.Framework.Protocols;
using Microsoft.AspNetCore.Connections;
using Microsoft.Extensions.Logging;

namespace ServerApplication.Framing.VariableSized.LengthFielded
{
internal partial class HeaderPrefixedApplication : ConnectionHandler
{
private readonly ILogger _logger;
private readonly HeaderFactory _headerFactory;

#region Logs
#if NETCOREAPP3_1
private static readonly Action<ILogger, string, Exception?> _logConnected =
LoggerMessage.Define<string>(logLevel: LogLevel.Information, eventId: 0, formatString: "{ConnectionId} connected.");

private static readonly Action<ILogger, IHeader, string, Exception?> _logMessageReceived =
LoggerMessage.Define<IHeader, string>(logLevel: LogLevel.Information, eventId: 0, formatString: "Message received - Header: {Header} - Payload: {Payload}");

private static readonly Action<ILogger, string, Exception?> _logDisconnected =
LoggerMessage.Define<string>(logLevel: LogLevel.Information, eventId: 0, formatString: "{ConnectionId} disconnected.");
#elif NET6_0_OR_GREATER
[LoggerMessage(0, LogLevel.Information, "{ConnectionId} connected.")]
partial void LogConnected(string connectionId);

[LoggerMessage(0, LogLevel.Information, "Message received - Header: {Header} - Payload: {Payload}")]
partial void LogMessageReceived(IHeader header, string payload);

[LoggerMessage(0, LogLevel.Information, "{ConnectionId} disconnected.")]
partial void LogDisconnected(string connectionId);
#endif
#endregion

public HeaderPrefixedApplication(ILogger<HeaderPrefixedApplication> logger, HeaderFactory headerFactory)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_headerFactory = headerFactory ?? throw new ArgumentNullException(nameof(headerFactory));
}

public override async Task OnConnectedAsync(ConnectionContext connection)
{
#if NETCOREAPP3_1
_logConnected(_logger, connection.ConnectionId, null);
#elif NET6_0_OR_GREATER
LogConnected(connection.ConnectionId);
#endif
// Use the header prefixed protocol
var headerFactory = _headerFactory; // Capturing members in anonymous methods results memory leak, that's why we introduce a local variable.
var protocol = new LengthFieldedProtocol(Helper.HeaderLength, (headerSequence) => headerFactory.CreateHeader(headerSequence));
var reader = connection.CreateReader();
var writer = connection.CreateWriter();

while (true)
{
try
{
var result = await reader.ReadAsync(protocol);
var message = result.Message;

#if NETCOREAPP3_1
_logMessageReceived(_logger, message.Header, Encoding.UTF8.GetString(message.Payload.ToArray()), null);
#elif NET6_0_OR_GREATER
LogMessageReceived(message.Header, Encoding.UTF8.GetString(message.Payload));
#endif
if (result.IsCompleted)
{
break;
}
}
finally
{
reader.Advance();
}
}

#if NETCOREAPP3_1
_logDisconnected(_logger, connection.ConnectionId, null);
#elif NET6_0_OR_GREATER
LogDisconnected(connection.ConnectionId);
#endif
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace ServerApplication.Framing.VariableSized.LengthFielded
{
internal static class Helper
{
// Size of the Header. In this case 2 * int -> 2 * 4.
public static int HeaderLength => 8;
}
}
7 changes: 7 additions & 0 deletions samples/ServerApplication/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using ServerApplication.Framing.VariableSized.LengthFielded;

namespace ServerApplication
{
Expand All @@ -23,6 +24,7 @@ public static async Task Main(string[] args)
});

services.AddSignalR();
services.AddSingleton<HeaderFactory>();

var serviceProvider = services.BuildServiceProvider();

Expand Down Expand Up @@ -56,8 +58,13 @@ public static async Task Main(string[] args)
})
.UseConnectionLogging().UseConnectionHandler<EchoServerApplication>());

// Length prefixed server
sockets.Listen(IPAddress.Loopback, 5005,
builder => builder.UseConnectionLogging().UseConnectionHandler<MyCustomProtocol>());

// Header prefixed server
sockets.Listen(IPAddress.Loopback, 5006,
builder => builder.UseConnectionLogging().UseConnectionHandler<HeaderPrefixedApplication>());
})
.Build();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using System.Buffers;

namespace Bedrock.Framework.Experimental.Protocols.Framing.VariableSized.LengthFielded
{
public readonly struct Frame
{
public IHeader Header { get; }
public ReadOnlySequence<byte> Payload { get; }

public Frame(IHeader header, byte[] payload) : this(header, new ReadOnlySequence<byte>(payload))
{
}

public Frame(IHeader header, ReadOnlySequence<byte> payload)
{
Header = header;
Payload = payload;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using System;

namespace Bedrock.Framework.Experimental.Protocols.Framing.VariableSized.LengthFielded
{
public interface IHeader
{
public int PayloadLength { get; }

public ReadOnlySpan<byte> AsSpan();
}
}
Loading