Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup memcached protocol #102

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
using Bedrock.Framework.Experimental.Protocols.Memcached;
using Bedrock.Framework.Protocols;

using Microsoft.AspNetCore.Connections;

using System;
using System.Buffers;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

using static Bedrock.Framework.Experimental.Protocols.Memcached.Enums;

namespace Bedrock.Framework.Experimental.Protocols.Memcached
{
public class MemcachedProtocol
{
{
private readonly ConnectionContext _connection;
private readonly MemcachedMessageWriter _memcachedMessageWriter;
private readonly MemcachedMessageReader _memcachedMessageReader;
Expand All @@ -40,6 +43,8 @@ public MemcachedProtocol(ConnectionContext connection)

private async Task<byte[]> CommandWithResult(MemcachedRequest request)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be ValueTask

{
await _semaphore.WaitAsync();

try
{
var result = await ExecuteCommand(request);
Expand All @@ -53,13 +58,15 @@ private async Task<byte[]> CommandWithResult(MemcachedRequest request)
}
}
finally
{
{
_semaphore.Release();
}
}

private async Task CommandWithNoResult(MemcachedRequest request)
{
await _semaphore.WaitAsync();

try
{
var result = await ExecuteCommand(request);
Expand All @@ -73,59 +80,44 @@ private async Task CommandWithNoResult(MemcachedRequest request)
_semaphore.Release();
}
}
public async Task<byte[]> Get(string key)

public Task<byte[]> Get(string key)
{
await _semaphore.WaitAsync();

var keyBytes = Encoding.UTF8.GetBytes(key);
var request = new MemcachedRequest(Enums.Opcode.Get, keyBytes, NextOpaque);
var request = new MemcachedRequest(Enums.Opcode.Get, key, NextOpaque);

return await CommandWithResult(request);
return CommandWithResult(request);
}

public async Task Delete(string key)
public Task Delete(string key)
{
await _semaphore.WaitAsync();

var keyBytes = Encoding.UTF8.GetBytes(key);
var request = new MemcachedRequest(Enums.Opcode.Delete, keyBytes, NextOpaque);
var request = new MemcachedRequest(Enums.Opcode.Delete, key, NextOpaque);

await CommandWithNoResult(request);
return CommandWithNoResult(request);
}

public async Task Set(string key, byte[] value, TimeSpan? expireIn)
public Task Set(string key, byte[] value, TimeSpan? expireIn)
{
await _semaphore.WaitAsync();
var request = new MemcachedRequest(Enums.Opcode.Set, key, NextOpaque, value, TypeCode.Object, expireIn);

var keyBytes = Encoding.UTF8.GetBytes(key);
var request = new MemcachedRequest(Enums.Opcode.Set, keyBytes, NextOpaque, value, TypeCode.Object, expireIn);

await CommandWithNoResult(request);
return CommandWithNoResult(request);
}

public async Task Add(string key, byte[] value, TimeSpan? expireIn)
public Task Add(string key, byte[] value, TimeSpan? expireIn)
{
await _semaphore.WaitAsync();
var request = new MemcachedRequest(Enums.Opcode.Add, key, NextOpaque, value, TypeCode.Object, expireIn);

var keyBytes = Encoding.UTF8.GetBytes(key);
var request = new MemcachedRequest(Enums.Opcode.Add, keyBytes, NextOpaque, value, TypeCode.Object, expireIn);

await CommandWithNoResult(request);
return CommandWithNoResult(request);
}

public async Task Replace(string key, byte[] value, TimeSpan? expireIn)
public Task Replace(string key, byte[] value, TimeSpan? expireIn)
{
await _semaphore.WaitAsync();
var request = new MemcachedRequest(Enums.Opcode.Replace, key, NextOpaque, value, TypeCode.Object, expireIn);

var keyBytes = Encoding.UTF8.GetBytes(key);
var request = new MemcachedRequest(Enums.Opcode.Replace, keyBytes, NextOpaque, value, TypeCode.Object, expireIn);

await CommandWithNoResult(request);
return CommandWithNoResult(request);
}

private async Task<MemcachedResponse> ExecuteCommand(MemcachedRequest request)
{
{
await _protocolWriter.WriteAsync(_memcachedMessageWriter, request);
var result = await _protocolReader.ReadAsync(_memcachedMessageReader);
_protocolReader.Advance();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,38 @@ public class MemcachedRequest
public TypeCode Flags { get; }
public TimeSpan? ExpireIn { get; }

public MemcachedRequest(Opcode opcode, byte[] key, uint opaque,byte[] value, TypeCode flags, TimeSpan ? expireIn=null)
public MemcachedRequest(Opcode opcode, string key, uint opaque, byte[] value, TypeCode flags, TimeSpan? expireIn = null)
{
Opcode = opcode;
Key = key;
Opaque = opaque;
Value = value;
Flags = flags;
ExpireIn = expireIn;
Opcode = opcode;
Key = Encoding.UTF8.GetBytes(key);
Opaque = opaque;
Value = value;
Flags = flags;
ExpireIn = expireIn;
}

public MemcachedRequest(Opcode opcode, byte[] key, uint opaque, byte[] value, TypeCode flags, TimeSpan? expireIn = null)
{
Opcode = opcode;
Key = key;
Opaque = opaque;
Value = value;
Flags = flags;
ExpireIn = expireIn;
}

public MemcachedRequest(Opcode opcode, byte[] key, uint opaque)
{
Opcode = opcode;
Key = key;
Key = key;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Fix the spacing

Opaque = opaque;
}

public MemcachedRequest(Opcode opcode, string key, uint opaque)
{
Opcode = opcode;
Key = Encoding.UTF8.GetBytes(key);
Opaque = opaque;
}
}
}