Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update custom protocol #130

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
6 changes: 3 additions & 3 deletions samples/ClientApplication/ClientApplication.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
<TargetFramework>net6.0</TargetFramework>
</PropertyGroup>

<ItemGroup>
Expand All @@ -14,8 +14,8 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.SignalR.Client" Version="3.0.0" />
<PackageReference Include="MQTTnet.AspNetCore" Version="3.0.8" />
<PackageReference Include="Microsoft.AspNetCore.SignalR.Client" Version="6.0.0" />
<PackageReference Include="MQTTnet.AspNetCore" Version="3.1.0" />
</ItemGroup>

<ItemGroup>
Expand Down
63 changes: 39 additions & 24 deletions samples/ClientApplication/Program.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using System.Collections.Generic;
using System;
using System.IO;
using System.IO.Pipelines;
using System.Net;
Expand Down Expand Up @@ -32,20 +31,21 @@ static async Task Main(string[] args)
})
.BuildServiceProvider();

Console.WriteLine("Samples: ");
Console.WriteLine("1. Echo Server");
Console.WriteLine("2. HttpClient");
Console.WriteLine("3. SignalR");
Console.WriteLine("4. Echo Server With TLS enabled");
Console.WriteLine("5. In Memory Transport Echo Server and client");
Console.WriteLine("6. Length prefixed custom binary protocol");
Console.WriteLine("7. Talk to local docker dameon");
Console.WriteLine("8. Memcached protocol");
Console.WriteLine("9. RebbitMQ protocol");

while (true)
{
Console.WriteLine("Samples: ");
Console.WriteLine("1. Echo Server");
Console.WriteLine("2. HttpClient");
Console.WriteLine("3. SignalR");
Console.WriteLine("4. Echo Server With TLS enabled");
Console.WriteLine("5. In Memory Transport Echo Server and client");
Console.WriteLine("6. Length prefixed custom binary protocol");
Console.WriteLine("7. Talk to local docker dameon");
Console.WriteLine("8. Memcached protocol");
Console.WriteLine("9. RebbitMQ protocol");

var keyInfo = Console.ReadKey();
Console.Clear();

if (keyInfo.Key == ConsoleKey.D1)
{
Expand Down Expand Up @@ -92,12 +92,14 @@ static async Task Main(string[] args)
Console.WriteLine("RabbitMQ test");
await RabbitMQProtocol(serviceProvider);
}

Console.Clear();
}
}

private static async Task RabbitMQProtocol(IServiceProvider serviceProvider)
{
var loggerFactory = LoggerFactory.Create(builder =>
using var loggerFactory = LoggerFactory.Create(builder =>
{
builder.SetMinimumLevel(LogLevel.Error);
builder.AddConsole();
Expand Down Expand Up @@ -139,7 +141,7 @@ await rabbitMqClientProtocol.SendAsync(new ConnectionOpen(new ReadOnlyMemory<byt

private static async Task MemcachedProtocol(IServiceProvider serviceProvider)
{
var loggerFactory = LoggerFactory.Create(builder =>
using var loggerFactory = LoggerFactory.Create(builder =>
{
builder.SetMinimumLevel(LogLevel.Error);
builder.AddConsole();
Expand Down Expand Up @@ -171,7 +173,7 @@ private static async Task EchoServer(IServiceProvider serviceProvider)
.Build();

var connection = await client.ConnectAsync(new IPEndPoint(IPAddress.Loopback, 5000));
Console.WriteLine($"Connected to {connection.LocalEndPoint}");
Console.WriteLine($"Connected to {connection.RemoteEndPoint}");

Console.WriteLine("Echo server running, type into the console");
var reads = Console.OpenStandardInput().CopyToAsync(connection.Transport.Output);
Expand Down Expand Up @@ -255,7 +257,6 @@ private static async Task SignalR()
}
}


private static async Task EchoServerWithTls(ServiceProvider serviceProvider)
{
var client = new ClientBuilder(serviceProvider)
Expand All @@ -276,7 +277,7 @@ private static async Task EchoServerWithTls(ServiceProvider serviceProvider)
.Build();

var connection = await client.ConnectAsync(new IPEndPoint(IPAddress.Loopback, 5004));
Console.WriteLine($"Connected to {connection.LocalEndPoint}");
Console.WriteLine($"Connected to {connection.RemoteEndPoint}");

Console.WriteLine("Echo server running, type into the console");
var reads = Console.OpenStandardInput().CopyToAsync(connection.Transport.Output);
Expand Down Expand Up @@ -306,7 +307,7 @@ private static async Task InMemoryEchoTransport(IServiceProvider serviceProvider
Console.WriteLine("Started Server");

var connection = await client.ConnectAsync(endpoint: null);
Console.WriteLine($"Connected to {connection.LocalEndPoint}");
Console.WriteLine($"Connected to {connection.RemoteEndPoint}");

Console.WriteLine("Echo server running, type into the console");
var reads = Console.OpenStandardInput().CopyToAsync(connection.Transport.Output);
Expand All @@ -320,7 +321,7 @@ private static async Task InMemoryEchoTransport(IServiceProvider serviceProvider

private static async Task CustomProtocol()
{
var loggerFactory = LoggerFactory.Create(builder =>
using var loggerFactory = LoggerFactory.Create(builder =>
{
builder.SetMinimumLevel(LogLevel.Debug);
builder.AddConsole();
Expand All @@ -332,25 +333,39 @@ private static async Task CustomProtocol()
.Build();

await using var connection = await client.ConnectAsync(new IPEndPoint(IPAddress.Loopback, 5005));
Console.WriteLine($"Connected to {connection.LocalEndPoint}");

Console.WriteLine($"Connected to {connection.RemoteEndPoint}");
Console.WriteLine("Enter 'c' to close the connection.");

var protocol = new LengthPrefixedProtocol();
var reader = connection.CreateReader();
var writer = connection.CreateWriter();
await using var reader = connection.CreateReader();
await using var writer = connection.CreateWriter();

while (true)
{
var line = Console.ReadLine();
if (line.Equals("c"))
{
await reader.CompleteAsync();
await writer.CompleteAsync();
break;
}

await writer.WriteAsync(protocol, new Message(Encoding.UTF8.GetBytes(line)));
var result = await reader.ReadAsync(protocol);

if (result.IsCompleted)
if (result.IsCompleted || result.IsCanceled)
{
break;
}

reader.Advance();
}

connection.Abort();

// If the DisposeAsync not called explicitly, the connection won't close.
await connection.DisposeAsync();
}

private static async Task DockerDaemon(IServiceProvider serviceProvider)
Expand Down
7 changes: 3 additions & 4 deletions samples/ServerApplication/MqttApplication.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ private async Task OnClientConnectedAsync(IMqttChannelAdapter adapter)
{
while (true)
{
var packet = await adapter.ReceivePacketAsync(Timeout.InfiniteTimeSpan, default);
var packet = await adapter.ReceivePacketAsync(default);

switch (packet)
{
Expand All @@ -29,8 +29,7 @@ await adapter.SendPacketAsync(new MqttConnAckPacket
ReturnCode = MqttConnectReturnCode.ConnectionAccepted,
ReasonCode = MqttConnectReasonCode.Success,
IsSessionPresent = false
}, Timeout.InfiniteTimeSpan,
default);
}, default);
break;
case MqttDisconnectPacket disconnectPacket:
break;
Expand All @@ -48,7 +47,7 @@ await adapter.SendPacketAsync(new MqttConnAckPacket
};
ack.ReasonCodes.Add(MqttSubscribeReasonCode.GrantedQoS0);

await adapter.SendPacketAsync(ack, Timeout.InfiniteTimeSpan, default);
await adapter.SendPacketAsync(ack, default);
break;
default:
break;
Expand Down
4 changes: 4 additions & 0 deletions samples/ServerApplication/MyCustomProtocol.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ public MyCustomProtocol(ILogger<MyCustomProtocol> logger)

public override async Task OnConnectedAsync(ConnectionContext connection)
{
_logger.LogInformation("{ConnectionId} connected.", connection.ConnectionId);

// Use a length prefixed protocol
var protocol = new LengthPrefixedProtocol();
var reader = connection.CreateReader();
Expand All @@ -43,6 +45,8 @@ public override async Task OnConnectedAsync(ConnectionContext connection)
reader.Advance();
}
}

_logger.LogInformation("{ConnectionId} disconnected.", connection.ConnectionId);
}
}
}
4 changes: 2 additions & 2 deletions samples/ServerApplication/ServerApplication.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
<TargetFramework>net6.0</TargetFramework>
</PropertyGroup>

<ItemGroup>
<Content Include="..\Certs\testcert.pfx" Link="testcert.pfx" CopyToOutputDirectory="PreserveNewest" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="MQTTnet.AspNetCore" Version="3.0.8" />
<PackageReference Include="MQTTnet.AspNetCore" Version="3.1.0" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netcoreapp3.1</TargetFramework>
<TargetFramework>net6.0</TargetFramework>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<PackageDescription>Experimental protocols and transports for Bedrock.Framework.</PackageDescription>
<Authors>David Fowler</Authors>
Expand All @@ -18,6 +18,10 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.Http.Connections.Client" Version="3.1.0" />
<PackageReference Include="Microsoft.AspNetCore.Http.Connections.Client" Version="6.0.0" />
</ItemGroup>

<ItemGroup>
<PackageReference Update="Nerdbank.GitVersioning" Version="3.4.244" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public bool TryParseMessage(in ReadOnlySequence<byte> input, ref SequencePositio
goto case State.Headers;

case State.Headers:
while (sequenceReader.TryReadTo(out var headerLine, NewLine))
while (sequenceReader.TryReadTo(out ReadOnlySequence<byte> headerLine, NewLine))
{
if (headerLine.Length == 0)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public bool TryParseMessage(in ReadOnlySequence<byte> input, ref SequencePositio
goto case State.Headers;

case State.Headers:
while (sequenceReader.TryReadTo(out var headerLine, NewLine))
while (sequenceReader.TryReadTo(out ReadOnlySequence<byte> headerLine, NewLine))
{
if (headerLine.Length == 0)
{
Expand Down
8 changes: 6 additions & 2 deletions src/Bedrock.Framework/Bedrock.Framework.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netcoreapp3.1</TargetFramework>
<TargetFramework>net6.0</TargetFramework>
<PackageDescription>High performance, low level networking APIs for building custom severs and clients.</PackageDescription>
<Authors>David Fowler</Authors>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
Expand All @@ -12,6 +12,10 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="System.IO.Pipelines" Version="4.7.2" />
<PackageReference Include="System.IO.Pipelines" Version="6.0.0" />
</ItemGroup>

<ItemGroup>
<PackageReference Update="Nerdbank.GitVersioning" Version="3.4.244" />
</ItemGroup>
</Project>
21 changes: 21 additions & 0 deletions src/Bedrock.Framework/Protocols/ProtocolReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,27 @@ public void Advance()
_hasMessage = false;
}

public void CancelPendingRead()
{
if (_disposed)
{
throw new ObjectDisposedException(GetType().Name);
}

_reader.CancelPendingRead();
_isCanceled = true;
}

public async ValueTask CompleteAsync()
{
if (_disposed)
{
throw new ObjectDisposedException(GetType().Name);
}

await _reader.CompleteAsync();
}

public ValueTask DisposeAsync()
{
_disposed = true;
Expand Down
20 changes: 20 additions & 0 deletions src/Bedrock.Framework/Protocols/ProtocolWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,26 @@ public async ValueTask WriteManyAsync<TWriteMessage>(IMessageWriter<TWriteMessag
}
}

public void CancelPendingFlush()
{
if (_disposed)
{
return;
}

_writer.CancelPendingFlush();
}

public async ValueTask CompleteAsync()
{
if (_disposed)
{
return;
}

await _writer.CompleteAsync();
}

public async ValueTask DisposeAsync()
{
await _semaphore.WaitAsync().ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
<TargetFramework>net6.0</TargetFramework>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="BenchmarkDotNet" Version="0.12.0" />
<PackageReference Include="BenchmarkDotNet" Version="0.13.1" />
</ItemGroup>

<ItemGroup>
Expand Down
16 changes: 11 additions & 5 deletions tests/Bedrock.Framework.Tests/Bedrock.Framework.Tests.csproj
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netcoreapp3.1</TargetFramework>
<TargetFramework>net6.0</TargetFramework>

<IsPackable>false</IsPackable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.2.0" />
<PackageReference Include="xunit" Version="2.4.0" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.0" />
<PackageReference Include="coverlet.collector" Version="1.0.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.0.0" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.3">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="coverlet.collector" Version="3.1.0">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
</ItemGroup>

<ItemGroup>
Expand Down
Loading