From e1ce308fbb3635cde5916c513611d52ff2e29c27 Mon Sep 17 00:00:00 2001 From: Mayuki Sawatari Date: Fri, 28 Jun 2024 15:58:55 +0900 Subject: [PATCH] Fix incorrect use of Payload within HeartbeatManager --- .../Internal.Shared/StreamingHubPayload.cs | 66 ++++++++++---- .../StreamingHubPayloadPool.BuiltIn.cs | 8 +- .../StreamingHubPayloadPool.ObjectPool.cs | 8 +- .../StreamingHubPayload.cs | 66 ++++++++++---- .../StreamingHubPayloadPool.BuiltIn.cs | 8 +- .../StreamingHubPayloadPool.ObjectPool.cs | 8 +- .../Hubs/StreamingHubHeartbeatManager.cs | 3 +- .../StreamingHubStressTest.cs | 90 +++++++++++++++++++ 8 files changed, 205 insertions(+), 52 deletions(-) create mode 100644 tests/MagicOnion.Integration.Tests/StreamingHubStressTest.cs diff --git a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubPayload.cs b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubPayload.cs index 3bebc7d94..c4be586dd 100644 --- a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubPayload.cs +++ b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubPayload.cs @@ -1,20 +1,52 @@ #nullable enable using System; using System.Buffers; +using System.Diagnostics; using System.Diagnostics.CodeAnalysis; +using System.Threading; namespace MagicOnion.Internal { - internal class StreamingHubPayload : IStreamingHubPayload + internal class StreamingHubPayload { byte[]? buffer; ReadOnlyMemory? memory; +#if DEBUG + public int Length + { + get + { + ThrowIfUninitialized(); + return memory!.Value.Length; + } + } + + public ReadOnlySpan Span + { + get + { + ThrowIfUninitialized(); + return memory!.Value.Span; + } + } + + public ReadOnlyMemory Memory + { + get + { + ThrowIfUninitialized(); + return memory!.Value; + } + } + +#else public int Length => memory!.Value.Length; public ReadOnlySpan Span => memory!.Value.Span; public ReadOnlyMemory Memory => memory!.Value; +#endif - void IStreamingHubPayload.Initialize(ReadOnlySpan data) + public void Initialize(ReadOnlySpan data) { ThrowIfUsing(); @@ -23,7 +55,7 @@ void IStreamingHubPayload.Initialize(ReadOnlySpan data) memory = buffer.AsMemory(0, (int)data.Length); } - void IStreamingHubPayload.Initialize(ReadOnlySequence data) + public void Initialize(ReadOnlySequence data) { ThrowIfUsing(); if (data.Length > int.MaxValue) throw new InvalidOperationException("A body size of StreamingHubPayload must be less than int.MaxValue"); @@ -33,7 +65,7 @@ void IStreamingHubPayload.Initialize(ReadOnlySequence data) memory = buffer.AsMemory(0, (int)data.Length); } - void IStreamingHubPayload.Initialize(ReadOnlyMemory data) + public void Initialize(ReadOnlyMemory data) { ThrowIfUsing(); @@ -41,9 +73,9 @@ void IStreamingHubPayload.Initialize(ReadOnlyMemory data) memory = data; } - void IStreamingHubPayload.Uninitialize() + public void Uninitialize() { - ThrowIfDisposed(); + ThrowIfUninitialized(); if (buffer != null) { @@ -60,22 +92,22 @@ void IStreamingHubPayload.Uninitialize() #if NON_UNITY && !NETSTANDARD2_0 && !NETSTANDARD2_1 [MemberNotNull(nameof(memory))] #endif - void ThrowIfDisposed() + void ThrowIfUninitialized() { - if (memory is null) throw new ObjectDisposedException(nameof(StreamingHubPayload)); + //Debug.Assert(memory is not null); + if (memory is null) + { + throw new InvalidOperationException("A StreamingHubPayload has been already uninitialized."); + } } void ThrowIfUsing() { - if (memory is not null) throw new InvalidOperationException(nameof(StreamingHubPayload)); + //Debug.Assert(memory is null); + if (memory is not null) + { + throw new InvalidOperationException("A StreamingHubPayload is currently used by other caller."); + } } } - - internal interface IStreamingHubPayload - { - void Initialize(ReadOnlySpan data); - void Initialize(ReadOnlySequence data); - void Initialize(ReadOnlyMemory data); - void Uninitialize(); - } } diff --git a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubPayloadPool.BuiltIn.cs b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubPayloadPool.BuiltIn.cs index b352a9de1..d4e036091 100644 --- a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubPayloadPool.BuiltIn.cs +++ b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubPayloadPool.BuiltIn.cs @@ -61,28 +61,28 @@ internal class StreamingHubPayloadPool : ObjectPool public void Return(StreamingHubPayload payload) { - ((IStreamingHubPayload)payload).Uninitialize(); + payload.Uninitialize(); ReturnCore(payload); } public StreamingHubPayload RentOrCreate(ReadOnlySequence data) { var payload = RentOrCreateCore(static () => new StreamingHubPayload()); - ((IStreamingHubPayload)payload).Initialize(data); + payload.Initialize(data); return payload; } public StreamingHubPayload RentOrCreate(ReadOnlySpan data) { var payload = RentOrCreateCore(static () => new StreamingHubPayload()); - ((IStreamingHubPayload)payload).Initialize(data); + payload.Initialize(data); return payload; } public StreamingHubPayload RentOrCreate(ReadOnlyMemory data) { var payload = RentOrCreateCore(static () => new StreamingHubPayload()); - ((IStreamingHubPayload)payload).Initialize(data); + payload.Initialize(data); return payload; } } diff --git a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubPayloadPool.ObjectPool.cs b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubPayloadPool.ObjectPool.cs index ae7abf115..9a357378b 100644 --- a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubPayloadPool.ObjectPool.cs +++ b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubPayloadPool.ObjectPool.cs @@ -15,21 +15,21 @@ internal class StreamingHubPayloadPool public StreamingHubPayload RentOrCreate(ReadOnlySequence data) { var payload = pool.Get(); - ((IStreamingHubPayload)payload).Initialize(data); + payload.Initialize(data); return payload; } public StreamingHubPayload RentOrCreate(ReadOnlySpan data) { var payload = pool.Get(); - ((IStreamingHubPayload)payload).Initialize(data); + payload.Initialize(data); return payload; } public StreamingHubPayload RentOrCreate(ReadOnlyMemory data) { var payload = pool.Get(); - ((IStreamingHubPayload)payload).Initialize(data); + payload.Initialize(data); return payload; } @@ -47,7 +47,7 @@ public StreamingHubPayload Create() public bool Return(StreamingHubPayload obj) { - ((IStreamingHubPayload)obj).Uninitialize(); + obj.Uninitialize(); return true; } } diff --git a/src/MagicOnion.Internal/StreamingHubPayload.cs b/src/MagicOnion.Internal/StreamingHubPayload.cs index 3bebc7d94..c4be586dd 100644 --- a/src/MagicOnion.Internal/StreamingHubPayload.cs +++ b/src/MagicOnion.Internal/StreamingHubPayload.cs @@ -1,20 +1,52 @@ #nullable enable using System; using System.Buffers; +using System.Diagnostics; using System.Diagnostics.CodeAnalysis; +using System.Threading; namespace MagicOnion.Internal { - internal class StreamingHubPayload : IStreamingHubPayload + internal class StreamingHubPayload { byte[]? buffer; ReadOnlyMemory? memory; +#if DEBUG + public int Length + { + get + { + ThrowIfUninitialized(); + return memory!.Value.Length; + } + } + + public ReadOnlySpan Span + { + get + { + ThrowIfUninitialized(); + return memory!.Value.Span; + } + } + + public ReadOnlyMemory Memory + { + get + { + ThrowIfUninitialized(); + return memory!.Value; + } + } + +#else public int Length => memory!.Value.Length; public ReadOnlySpan Span => memory!.Value.Span; public ReadOnlyMemory Memory => memory!.Value; +#endif - void IStreamingHubPayload.Initialize(ReadOnlySpan data) + public void Initialize(ReadOnlySpan data) { ThrowIfUsing(); @@ -23,7 +55,7 @@ void IStreamingHubPayload.Initialize(ReadOnlySpan data) memory = buffer.AsMemory(0, (int)data.Length); } - void IStreamingHubPayload.Initialize(ReadOnlySequence data) + public void Initialize(ReadOnlySequence data) { ThrowIfUsing(); if (data.Length > int.MaxValue) throw new InvalidOperationException("A body size of StreamingHubPayload must be less than int.MaxValue"); @@ -33,7 +65,7 @@ void IStreamingHubPayload.Initialize(ReadOnlySequence data) memory = buffer.AsMemory(0, (int)data.Length); } - void IStreamingHubPayload.Initialize(ReadOnlyMemory data) + public void Initialize(ReadOnlyMemory data) { ThrowIfUsing(); @@ -41,9 +73,9 @@ void IStreamingHubPayload.Initialize(ReadOnlyMemory data) memory = data; } - void IStreamingHubPayload.Uninitialize() + public void Uninitialize() { - ThrowIfDisposed(); + ThrowIfUninitialized(); if (buffer != null) { @@ -60,22 +92,22 @@ void IStreamingHubPayload.Uninitialize() #if NON_UNITY && !NETSTANDARD2_0 && !NETSTANDARD2_1 [MemberNotNull(nameof(memory))] #endif - void ThrowIfDisposed() + void ThrowIfUninitialized() { - if (memory is null) throw new ObjectDisposedException(nameof(StreamingHubPayload)); + //Debug.Assert(memory is not null); + if (memory is null) + { + throw new InvalidOperationException("A StreamingHubPayload has been already uninitialized."); + } } void ThrowIfUsing() { - if (memory is not null) throw new InvalidOperationException(nameof(StreamingHubPayload)); + //Debug.Assert(memory is null); + if (memory is not null) + { + throw new InvalidOperationException("A StreamingHubPayload is currently used by other caller."); + } } } - - internal interface IStreamingHubPayload - { - void Initialize(ReadOnlySpan data); - void Initialize(ReadOnlySequence data); - void Initialize(ReadOnlyMemory data); - void Uninitialize(); - } } diff --git a/src/MagicOnion.Internal/StreamingHubPayloadPool.BuiltIn.cs b/src/MagicOnion.Internal/StreamingHubPayloadPool.BuiltIn.cs index b352a9de1..d4e036091 100644 --- a/src/MagicOnion.Internal/StreamingHubPayloadPool.BuiltIn.cs +++ b/src/MagicOnion.Internal/StreamingHubPayloadPool.BuiltIn.cs @@ -61,28 +61,28 @@ internal class StreamingHubPayloadPool : ObjectPool public void Return(StreamingHubPayload payload) { - ((IStreamingHubPayload)payload).Uninitialize(); + payload.Uninitialize(); ReturnCore(payload); } public StreamingHubPayload RentOrCreate(ReadOnlySequence data) { var payload = RentOrCreateCore(static () => new StreamingHubPayload()); - ((IStreamingHubPayload)payload).Initialize(data); + payload.Initialize(data); return payload; } public StreamingHubPayload RentOrCreate(ReadOnlySpan data) { var payload = RentOrCreateCore(static () => new StreamingHubPayload()); - ((IStreamingHubPayload)payload).Initialize(data); + payload.Initialize(data); return payload; } public StreamingHubPayload RentOrCreate(ReadOnlyMemory data) { var payload = RentOrCreateCore(static () => new StreamingHubPayload()); - ((IStreamingHubPayload)payload).Initialize(data); + payload.Initialize(data); return payload; } } diff --git a/src/MagicOnion.Internal/StreamingHubPayloadPool.ObjectPool.cs b/src/MagicOnion.Internal/StreamingHubPayloadPool.ObjectPool.cs index ae7abf115..9a357378b 100644 --- a/src/MagicOnion.Internal/StreamingHubPayloadPool.ObjectPool.cs +++ b/src/MagicOnion.Internal/StreamingHubPayloadPool.ObjectPool.cs @@ -15,21 +15,21 @@ internal class StreamingHubPayloadPool public StreamingHubPayload RentOrCreate(ReadOnlySequence data) { var payload = pool.Get(); - ((IStreamingHubPayload)payload).Initialize(data); + payload.Initialize(data); return payload; } public StreamingHubPayload RentOrCreate(ReadOnlySpan data) { var payload = pool.Get(); - ((IStreamingHubPayload)payload).Initialize(data); + payload.Initialize(data); return payload; } public StreamingHubPayload RentOrCreate(ReadOnlyMemory data) { var payload = pool.Get(); - ((IStreamingHubPayload)payload).Initialize(data); + payload.Initialize(data); return payload; } @@ -47,7 +47,7 @@ public StreamingHubPayload Create() public bool Return(StreamingHubPayload obj) { - ((IStreamingHubPayload)obj).Uninitialize(); + obj.Uninitialize(); return true; } } diff --git a/src/MagicOnion.Server/Hubs/StreamingHubHeartbeatManager.cs b/src/MagicOnion.Server/Hubs/StreamingHubHeartbeatManager.cs index 3fb2eaa0c..d42853f6d 100644 --- a/src/MagicOnion.Server/Hubs/StreamingHubHeartbeatManager.cs +++ b/src/MagicOnion.Server/Hubs/StreamingHubHeartbeatManager.cs @@ -125,7 +125,6 @@ async Task StartHeartbeatAsync(PeriodicTimer runningTimer, string method) writer.Write(Nil); } - var payload = StreamingHubPayloadPool.Shared.RentOrCreate(writer.WrittenSpan); MagicOnionServerLog.SendHeartbeat(this.logger, method); try @@ -133,7 +132,7 @@ async Task StartHeartbeatAsync(PeriodicTimer runningTimer, string method) foreach (var (contextId, handle) in contexts) { handle.RestartTimeoutTimer(); - handle.ServiceContext.QueueResponseStreamWrite(payload); + handle.ServiceContext.QueueResponseStreamWrite(StreamingHubPayloadPool.Shared.RentOrCreate(writer.WrittenSpan)); } } catch { /* Ignore */ } diff --git a/tests/MagicOnion.Integration.Tests/StreamingHubStressTest.cs b/tests/MagicOnion.Integration.Tests/StreamingHubStressTest.cs new file mode 100644 index 000000000..24f3abe63 --- /dev/null +++ b/tests/MagicOnion.Integration.Tests/StreamingHubStressTest.cs @@ -0,0 +1,90 @@ +using System.Reflection; +using Grpc.Net.Client; +using MagicOnion.Client; +using MagicOnion.Client.DynamicClient; +using MagicOnion.Server.Hubs; + +namespace MagicOnion.Integration.Tests; + +public class StreamingHubStressTest : IClassFixture> +{ + readonly MagicOnionApplicationFactory factory; + + public StreamingHubStressTest(MagicOnionApplicationFactory factory) + { + this.factory = factory; + } + + public static IEnumerable EnumerateStreamingHubClientFactory() + { + yield return new[] { new TestStreamingHubClientFactory("Dynamic", DynamicStreamingHubClientFactoryProvider.Instance) }; + yield return new[] { new TestStreamingHubClientFactory("Static", MagicOnionGeneratedClientInitializer.StreamingHubClientFactoryProvider) }; + } + + [Theory] + [MemberData(nameof(EnumerateStreamingHubClientFactory))] + public async Task Short(TestStreamingHubClientFactory clientFactory) + { + // Arrange + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var cts = new CancellationTokenSource(); + var count = 0; + + var tasks = Enumerable.Range(0, Environment.ProcessorCount * 10).Select(async x => + { + var httpClient = factory.CreateDefaultClient(); + var channel = GrpcChannel.ForAddress("http://localhost", new GrpcChannelOptions() { HttpClient = httpClient }); + var receiver = Substitute.For(); + var client = await clientFactory.CreateAndConnectAsync(channel, receiver); + + await tcs.Task; + + while (!cts.IsCancellationRequested) + { + var response = await client.Method((x * 100) + count, $"Task{x}-{count}", x % 2 == 0).AsTask().WaitAsync(cts.Token); + Interlocked.Increment(ref count); + } + + await Task.Delay(500); + }); + + // Act + tcs.TrySetResult(); + cts.CancelAfter(TimeSpan.FromSeconds(10)); + try + { + await Task.WhenAll(tasks); + } + catch (OperationCanceledException e) when (e.CancellationToken == cts.Token) + { + } + } +} + + +public interface IStreamingHubStressTestHub : IStreamingHub +{ + ValueTask<(int, string, bool)> Method(int arg0, string arg1, bool arg2); +} + +public interface IStreamingHubStressTestHubReceiver +{ + void OnMessage(); +} + +[Heartbeat(Enable = true, Interval = 100, Timeout = 1000)] +public class StreamingHubStressTestHub : StreamingHubBase, IStreamingHubStressTestHub +{ + IGroup defaultGroup = default!; + + protected override async ValueTask OnConnected() + { + defaultGroup = await Group.AddAsync("_"); + } + + public ValueTask<(int, string, bool)> Method(int arg0, string arg1, bool arg2) + { + defaultGroup.All.OnMessage(); + return new ValueTask<(int, string, bool)>((arg0, arg1, arg2)); + } +}