Skip to content

Commit

Permalink
Merge pull request #797 from Cysharp/feature/FixIncorrectUseOfPayload…
Browse files Browse the repository at this point in the history
…InHeartbeatManager

Fix incorrect use of Payload within HeartbeatManager
  • Loading branch information
mayuki authored Jun 28, 2024
2 parents c9d52f5 + e1ce308 commit 133d110
Show file tree
Hide file tree
Showing 8 changed files with 205 additions and 52 deletions.
Original file line number Diff line number Diff line change
@@ -1,20 +1,52 @@
#nullable enable
using System;
using System.Buffers;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Threading;

namespace MagicOnion.Internal
{
internal class StreamingHubPayload : IStreamingHubPayload
internal class StreamingHubPayload
{
byte[]? buffer;
ReadOnlyMemory<byte>? memory;

#if DEBUG
public int Length
{
get
{
ThrowIfUninitialized();
return memory!.Value.Length;
}
}

public ReadOnlySpan<byte> Span
{
get
{
ThrowIfUninitialized();
return memory!.Value.Span;
}
}

public ReadOnlyMemory<byte> Memory
{
get
{
ThrowIfUninitialized();
return memory!.Value;
}
}

#else
public int Length => memory!.Value.Length;
public ReadOnlySpan<byte> Span => memory!.Value.Span;
public ReadOnlyMemory<byte> Memory => memory!.Value;
#endif

void IStreamingHubPayload.Initialize(ReadOnlySpan<byte> data)
public void Initialize(ReadOnlySpan<byte> data)
{
ThrowIfUsing();

Expand All @@ -23,7 +55,7 @@ void IStreamingHubPayload.Initialize(ReadOnlySpan<byte> data)
memory = buffer.AsMemory(0, (int)data.Length);
}

void IStreamingHubPayload.Initialize(ReadOnlySequence<byte> data)
public void Initialize(ReadOnlySequence<byte> data)
{
ThrowIfUsing();
if (data.Length > int.MaxValue) throw new InvalidOperationException("A body size of StreamingHubPayload must be less than int.MaxValue");
Expand All @@ -33,17 +65,17 @@ void IStreamingHubPayload.Initialize(ReadOnlySequence<byte> data)
memory = buffer.AsMemory(0, (int)data.Length);
}

void IStreamingHubPayload.Initialize(ReadOnlyMemory<byte> data)
public void Initialize(ReadOnlyMemory<byte> data)
{
ThrowIfUsing();

buffer = null;
memory = data;
}

void IStreamingHubPayload.Uninitialize()
public void Uninitialize()
{
ThrowIfDisposed();
ThrowIfUninitialized();

if (buffer != null)
{
Expand All @@ -60,22 +92,22 @@ void IStreamingHubPayload.Uninitialize()
#if NON_UNITY && !NETSTANDARD2_0 && !NETSTANDARD2_1
[MemberNotNull(nameof(memory))]
#endif
void ThrowIfDisposed()
void ThrowIfUninitialized()
{
if (memory is null) throw new ObjectDisposedException(nameof(StreamingHubPayload));
//Debug.Assert(memory is not null);
if (memory is null)
{
throw new InvalidOperationException("A StreamingHubPayload has been already uninitialized.");
}
}

void ThrowIfUsing()
{
if (memory is not null) throw new InvalidOperationException(nameof(StreamingHubPayload));
//Debug.Assert(memory is null);
if (memory is not null)
{
throw new InvalidOperationException("A StreamingHubPayload is currently used by other caller.");
}
}
}

internal interface IStreamingHubPayload
{
void Initialize(ReadOnlySpan<byte> data);
void Initialize(ReadOnlySequence<byte> data);
void Initialize(ReadOnlyMemory<byte> data);
void Uninitialize();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,28 +61,28 @@ internal class StreamingHubPayloadPool : ObjectPool<StreamingHubPayload>

public void Return(StreamingHubPayload payload)
{
((IStreamingHubPayload)payload).Uninitialize();
payload.Uninitialize();
ReturnCore(payload);
}

public StreamingHubPayload RentOrCreate(ReadOnlySequence<byte> data)
{
var payload = RentOrCreateCore(static () => new StreamingHubPayload());
((IStreamingHubPayload)payload).Initialize(data);
payload.Initialize(data);
return payload;
}

public StreamingHubPayload RentOrCreate(ReadOnlySpan<byte> data)
{
var payload = RentOrCreateCore(static () => new StreamingHubPayload());
((IStreamingHubPayload)payload).Initialize(data);
payload.Initialize(data);
return payload;
}

public StreamingHubPayload RentOrCreate(ReadOnlyMemory<byte> data)
{
var payload = RentOrCreateCore(static () => new StreamingHubPayload());
((IStreamingHubPayload)payload).Initialize(data);
payload.Initialize(data);
return payload;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,21 @@ internal class StreamingHubPayloadPool
public StreamingHubPayload RentOrCreate(ReadOnlySequence<byte> data)
{
var payload = pool.Get();
((IStreamingHubPayload)payload).Initialize(data);
payload.Initialize(data);
return payload;
}

public StreamingHubPayload RentOrCreate(ReadOnlySpan<byte> data)
{
var payload = pool.Get();
((IStreamingHubPayload)payload).Initialize(data);
payload.Initialize(data);
return payload;
}

public StreamingHubPayload RentOrCreate(ReadOnlyMemory<byte> data)
{
var payload = pool.Get();
((IStreamingHubPayload)payload).Initialize(data);
payload.Initialize(data);
return payload;
}

Expand All @@ -47,7 +47,7 @@ public StreamingHubPayload Create()

public bool Return(StreamingHubPayload obj)
{
((IStreamingHubPayload)obj).Uninitialize();
obj.Uninitialize();
return true;
}
}
Expand Down
66 changes: 49 additions & 17 deletions src/MagicOnion.Internal/StreamingHubPayload.cs
Original file line number Diff line number Diff line change
@@ -1,20 +1,52 @@
#nullable enable
using System;
using System.Buffers;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Threading;

namespace MagicOnion.Internal
{
internal class StreamingHubPayload : IStreamingHubPayload
internal class StreamingHubPayload
{
byte[]? buffer;
ReadOnlyMemory<byte>? memory;

#if DEBUG
public int Length
{
get
{
ThrowIfUninitialized();
return memory!.Value.Length;
}
}

public ReadOnlySpan<byte> Span
{
get
{
ThrowIfUninitialized();
return memory!.Value.Span;
}
}

public ReadOnlyMemory<byte> Memory
{
get
{
ThrowIfUninitialized();
return memory!.Value;
}
}

#else
public int Length => memory!.Value.Length;
public ReadOnlySpan<byte> Span => memory!.Value.Span;
public ReadOnlyMemory<byte> Memory => memory!.Value;
#endif

void IStreamingHubPayload.Initialize(ReadOnlySpan<byte> data)
public void Initialize(ReadOnlySpan<byte> data)
{
ThrowIfUsing();

Expand All @@ -23,7 +55,7 @@ void IStreamingHubPayload.Initialize(ReadOnlySpan<byte> data)
memory = buffer.AsMemory(0, (int)data.Length);
}

void IStreamingHubPayload.Initialize(ReadOnlySequence<byte> data)
public void Initialize(ReadOnlySequence<byte> data)
{
ThrowIfUsing();
if (data.Length > int.MaxValue) throw new InvalidOperationException("A body size of StreamingHubPayload must be less than int.MaxValue");
Expand All @@ -33,17 +65,17 @@ void IStreamingHubPayload.Initialize(ReadOnlySequence<byte> data)
memory = buffer.AsMemory(0, (int)data.Length);
}

void IStreamingHubPayload.Initialize(ReadOnlyMemory<byte> data)
public void Initialize(ReadOnlyMemory<byte> data)
{
ThrowIfUsing();

buffer = null;
memory = data;
}

void IStreamingHubPayload.Uninitialize()
public void Uninitialize()
{
ThrowIfDisposed();
ThrowIfUninitialized();

if (buffer != null)
{
Expand All @@ -60,22 +92,22 @@ void IStreamingHubPayload.Uninitialize()
#if NON_UNITY && !NETSTANDARD2_0 && !NETSTANDARD2_1
[MemberNotNull(nameof(memory))]
#endif
void ThrowIfDisposed()
void ThrowIfUninitialized()
{
if (memory is null) throw new ObjectDisposedException(nameof(StreamingHubPayload));
//Debug.Assert(memory is not null);
if (memory is null)
{
throw new InvalidOperationException("A StreamingHubPayload has been already uninitialized.");
}
}

void ThrowIfUsing()
{
if (memory is not null) throw new InvalidOperationException(nameof(StreamingHubPayload));
//Debug.Assert(memory is null);
if (memory is not null)
{
throw new InvalidOperationException("A StreamingHubPayload is currently used by other caller.");
}
}
}

internal interface IStreamingHubPayload
{
void Initialize(ReadOnlySpan<byte> data);
void Initialize(ReadOnlySequence<byte> data);
void Initialize(ReadOnlyMemory<byte> data);
void Uninitialize();
}
}
8 changes: 4 additions & 4 deletions src/MagicOnion.Internal/StreamingHubPayloadPool.BuiltIn.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,28 +61,28 @@ internal class StreamingHubPayloadPool : ObjectPool<StreamingHubPayload>

public void Return(StreamingHubPayload payload)
{
((IStreamingHubPayload)payload).Uninitialize();
payload.Uninitialize();
ReturnCore(payload);
}

public StreamingHubPayload RentOrCreate(ReadOnlySequence<byte> data)
{
var payload = RentOrCreateCore(static () => new StreamingHubPayload());
((IStreamingHubPayload)payload).Initialize(data);
payload.Initialize(data);
return payload;
}

public StreamingHubPayload RentOrCreate(ReadOnlySpan<byte> data)
{
var payload = RentOrCreateCore(static () => new StreamingHubPayload());
((IStreamingHubPayload)payload).Initialize(data);
payload.Initialize(data);
return payload;
}

public StreamingHubPayload RentOrCreate(ReadOnlyMemory<byte> data)
{
var payload = RentOrCreateCore(static () => new StreamingHubPayload());
((IStreamingHubPayload)payload).Initialize(data);
payload.Initialize(data);
return payload;
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/MagicOnion.Internal/StreamingHubPayloadPool.ObjectPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,21 @@ internal class StreamingHubPayloadPool
public StreamingHubPayload RentOrCreate(ReadOnlySequence<byte> data)
{
var payload = pool.Get();
((IStreamingHubPayload)payload).Initialize(data);
payload.Initialize(data);
return payload;
}

public StreamingHubPayload RentOrCreate(ReadOnlySpan<byte> data)
{
var payload = pool.Get();
((IStreamingHubPayload)payload).Initialize(data);
payload.Initialize(data);
return payload;
}

public StreamingHubPayload RentOrCreate(ReadOnlyMemory<byte> data)
{
var payload = pool.Get();
((IStreamingHubPayload)payload).Initialize(data);
payload.Initialize(data);
return payload;
}

Expand All @@ -47,7 +47,7 @@ public StreamingHubPayload Create()

public bool Return(StreamingHubPayload obj)
{
((IStreamingHubPayload)obj).Uninitialize();
obj.Uninitialize();
return true;
}
}
Expand Down
3 changes: 1 addition & 2 deletions src/MagicOnion.Server/Hubs/StreamingHubHeartbeatManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -125,15 +125,14 @@ async Task StartHeartbeatAsync(PeriodicTimer runningTimer, string method)
writer.Write(Nil);
}

var payload = StreamingHubPayloadPool.Shared.RentOrCreate(writer.WrittenSpan);

MagicOnionServerLog.SendHeartbeat(this.logger, method);
try
{
foreach (var (contextId, handle) in contexts)
{
handle.RestartTimeoutTimer();
handle.ServiceContext.QueueResponseStreamWrite(payload);
handle.ServiceContext.QueueResponseStreamWrite(StreamingHubPayloadPool.Shared.RentOrCreate(writer.WrittenSpan));
}
}
catch { /* Ignore */ }
Expand Down
Loading

0 comments on commit 133d110

Please sign in to comment.