Skip to content

Commit

Permalink
Merge pull request #799 from Cysharp/feature/StreamingHubPayloadPoolC…
Browse files Browse the repository at this point in the history
…heck

Check usage of StreamingHubPayload and PayloadPool in debug build
  • Loading branch information
mayuki authored Jul 1, 2024
2 parents 0c70646 + 77915a5 commit 28b11a7
Show file tree
Hide file tree
Showing 8 changed files with 260 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,44 +7,83 @@

namespace MagicOnion.Internal
{
#if DEBUG
internal class StreamingHubPayload
{
byte[]? buffer;
ReadOnlyMemory<byte>? memory;
readonly short version;

#if STREAMINGHUBPAYLOAD_TRACK_LOCATION
string? payloadCreatedLocation;
string? payloadReturnLocation;
#endif

internal StreamingHubPayloadCore Core { get; }

#if DEBUG
public int Length
{
get
{
ThrowIfUninitialized();
return memory!.Value.Length;
ThrowIfVersionHasChanged();
return Core.Length;
}
}

public ReadOnlySpan<byte> Span
public ReadOnlyMemory<byte> Memory
{
get
{
ThrowIfUninitialized();
return memory!.Value.Span;
ThrowIfVersionHasChanged();
return Core.Memory;
}
}

public ReadOnlyMemory<byte> Memory
public ReadOnlySpan<byte> Span
{
get
{
ThrowIfUninitialized();
return memory!.Value;
ThrowIfVersionHasChanged();
return Core.Span;
}
}

public StreamingHubPayload(StreamingHubPayloadCore core)
{
this.Core = core;
this.version = core.Version;
#if STREAMINGHUBPAYLOAD_TRACK_LOCATION
this.payloadCreatedLocation = Environment.StackTrace;
#endif
}

void ThrowIfVersionHasChanged()
{
if (Core.Version != version) throw new InvalidOperationException("StreamingHubPayload version is mismatched.");
}

public void MarkAsReturned()
{
#if STREAMINGHUBPAYLOAD_TRACK_LOCATION
payloadReturnLocation = Environment.StackTrace;
#endif
}
}
#else
internal class StreamingHubPayload : StreamingHubPayloadCore
{}
#endif

internal class StreamingHubPayloadCore
{
byte[]? buffer;
ReadOnlyMemory<byte>? memory;

#if DEBUG
public short Version { get; private set; }
#endif

public int Length => memory!.Value.Length;
public ReadOnlySpan<byte> Span => memory!.Value.Span;
public ReadOnlyMemory<byte> Memory => memory!.Value;
#endif

public void Initialize(ReadOnlySpan<byte> data)
{
Expand Down Expand Up @@ -87,6 +126,10 @@ public void Uninitialize()

memory = null;
buffer = null;

#if DEBUG
Version++;
#endif
}

#if NON_UNITY && !NETSTANDARD2_0 && !NETSTANDARD2_1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,19 @@ namespace MagicOnion.Internal
{
internal class ObjectPool<T> where T : class
{
readonly Func<T> factory;

T? item1;
T? item2;
T? item3;
T? item4;

protected T RentOrCreateCore(Func<T> factory)
public ObjectPool(Func<T> factory)
{
this.factory = factory;
}

public T RentOrCreateCore()
{
T? tmpItem;
if (!(TryGet(ref item1, out tmpItem) ||
Expand All @@ -29,7 +36,7 @@ protected T RentOrCreateCore(Func<T> factory)
return tmpItem;
}

protected void ReturnCore(T item)
public void ReturnCore(T item)
{
var pooled = TryReturn(ref item1, item) ||
TryReturn(ref item2, item) ||
Expand All @@ -55,35 +62,58 @@ bool TryGet(ref T? field, [NotNullWhen(true)] out T? item)
}
}

internal class StreamingHubPayloadPool : ObjectPool<StreamingHubPayload>
internal class StreamingHubPayloadPool
{
#if DEBUG
readonly ObjectPool<StreamingHubPayloadCore> pool = new(static () => new StreamingHubPayloadCore());
#else
readonly ObjectPool<StreamingHubPayloadCore> pool = new(static () => new StreamingHubPayload());
#endif

public static StreamingHubPayloadPool Shared { get; } = new();

public void Return(StreamingHubPayload payload)
{
#if DEBUG
payload.Core.Uninitialize();
pool.ReturnCore(payload.Core);
#else
payload.Uninitialize();
ReturnCore(payload);
pool.ReturnCore(payload);
#endif
}

public StreamingHubPayload RentOrCreate(ReadOnlySequence<byte> data)
{
var payload = RentOrCreateCore(static () => new StreamingHubPayload());
var payload = pool.RentOrCreateCore();
payload.Initialize(data);
return payload;
#if DEBUG
return new StreamingHubPayload(payload);
#else
return (StreamingHubPayload)payload;
#endif
}

public StreamingHubPayload RentOrCreate(ReadOnlySpan<byte> data)
{
var payload = RentOrCreateCore(static () => new StreamingHubPayload());
var payload = pool.RentOrCreateCore();
payload.Initialize(data);
return payload;
#if DEBUG
return new StreamingHubPayload(payload);
#else
return (StreamingHubPayload)payload;
#endif
}

public StreamingHubPayload RentOrCreate(ReadOnlyMemory<byte> data)
{
var payload = RentOrCreateCore(static () => new StreamingHubPayload());
var payload = pool.RentOrCreateCore();
payload.Initialize(data);
return payload;
#if DEBUG
return new StreamingHubPayload(payload);
#else
return (StreamingHubPayload)payload;
#endif
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,44 +8,65 @@ internal class StreamingHubPayloadPool
{
const int MaximumRetained = 2 << 7;

readonly ObjectPool<StreamingHubPayload> pool = new DefaultObjectPool<StreamingHubPayload>(new Policy(), MaximumRetained);
readonly ObjectPool<StreamingHubPayloadCore> pool = new DefaultObjectPool<StreamingHubPayloadCore>(new Policy(), MaximumRetained);

public static StreamingHubPayloadPool Shared { get; } = new StreamingHubPayloadPool();
public static StreamingHubPayloadPool Shared { get; } = new();

public StreamingHubPayload RentOrCreate(ReadOnlySequence<byte> data)
{
var payload = pool.Get();
payload.Initialize(data);
return payload;
#if DEBUG
return new StreamingHubPayload(payload);
#else
return (StreamingHubPayload)payload;
#endif
}

public StreamingHubPayload RentOrCreate(ReadOnlySpan<byte> data)
{
var payload = pool.Get();
payload.Initialize(data);
return payload;
#if DEBUG
return new StreamingHubPayload(payload);
#else
return (StreamingHubPayload)payload;
#endif
}

public StreamingHubPayload RentOrCreate(ReadOnlyMemory<byte> data)
{
var payload = pool.Get();
payload.Initialize(data);
return payload;
#if DEBUG
return new StreamingHubPayload(payload);
#else
return (StreamingHubPayload)payload;
#endif
}

public void Return(StreamingHubPayload payload)
{
#if DEBUG
payload.MarkAsReturned();
pool.Return(payload.Core);
#else
pool.Return(payload);
#endif
}

class Policy : IPooledObjectPolicy<StreamingHubPayload>
class Policy : IPooledObjectPolicy<StreamingHubPayloadCore>
{
public StreamingHubPayload Create()
public StreamingHubPayloadCore Create()
{
#if DEBUG
return new StreamingHubPayloadCore();
#else
return new StreamingHubPayload();
#endif
}

public bool Return(StreamingHubPayload obj)
public bool Return(StreamingHubPayloadCore obj)
{
obj.Uninitialize();
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,14 @@ namespace MagicOnion.Client
internal class StreamingHubResponseTaskSourcePool<T> : ObjectPool<StreamingHubResponseTaskSource<T>>
{
public static StreamingHubResponseTaskSourcePool<T> Shared { get; } = new();

public StreamingHubResponseTaskSourcePool()
: base(static () => new StreamingHubResponseTaskSource<T>())
{}

public StreamingHubResponseTaskSource<T> RentOrCreate()
{
var item = RentOrCreateCore(static () => new StreamingHubResponseTaskSource<T>());
var item = RentOrCreateCore();
item.Reset();
return item;
}
Expand Down
7 changes: 6 additions & 1 deletion src/MagicOnion.Client/StreamingHubResponseTaskSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,14 @@ namespace MagicOnion.Client
internal class StreamingHubResponseTaskSourcePool<T> : ObjectPool<StreamingHubResponseTaskSource<T>>
{
public static StreamingHubResponseTaskSourcePool<T> Shared { get; } = new();

public StreamingHubResponseTaskSourcePool()
: base(static () => new StreamingHubResponseTaskSource<T>())
{}

public StreamingHubResponseTaskSource<T> RentOrCreate()
{
var item = RentOrCreateCore(static () => new StreamingHubResponseTaskSource<T>());
var item = RentOrCreateCore();
item.Reset();
return item;
}
Expand Down
Loading

0 comments on commit 28b11a7

Please sign in to comment.