Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Working on message bus subscriptions options #278

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Foundatio.TestHarness/Messaging/MessageBusTestBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@ await messageBus.SubscribeAsync<SimpleMessageA>(msg => {
Interlocked.Increment(ref messageCount);
cancellationTokenSource.Cancel();
countdown.Signal();
}, cancellationTokenSource.Token);
}, cancellationToken: cancellationTokenSource.Token);

await messageBus.SubscribeAsync<object>(msg => countdown.Signal());

Expand Down
7 changes: 5 additions & 2 deletions src/Foundatio.TestHarness/Queue/QueueTestBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,7 @@ public virtual async Task WillWaitForItemAsync() {
return;

try {
Log.MinimumLevel = LogLevel.Trace;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably should revert this and leave this up to the caller.

await queue.DeleteQueueAsync();
await AssertEmptyQueueAsync(queue);

Expand All @@ -564,14 +565,16 @@ public virtual async Task WillWaitForItemAsync() {
Assert.InRange(sw.Elapsed, TimeSpan.FromMilliseconds(50), TimeSpan.FromMilliseconds(5000));

_ = Task.Run(async () => {
await SystemClock.SleepAsync(500);
await Task.Delay(500);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SystemClock.SleepAsync(500) should be doing this behind the scenes

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I know that, but in this case we never want the sleep to be virtual.

_logger.LogInformation("Enqueuing async message");
await queue.EnqueueAsync(new SimpleWorkItem {
Data = "Hello"
});
});

sw.Restart();
workItem = await queue.DequeueAsync(TimeSpan.FromSeconds(1));
_logger.LogInformation("Dequeuing message with timeout");
workItem = await queue.DequeueAsync(TimeSpan.FromSeconds(10));
sw.Stop();
if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Time {Elapsed:g}", sw.Elapsed);
Assert.True(sw.Elapsed > TimeSpan.FromMilliseconds(400));
Expand Down
1 change: 1 addition & 0 deletions src/Foundatio/Foundatio.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="6.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="6.0" />
<PackageReference Include="System.Diagnostics.DiagnosticSource" Version="6.0" />
<PackageReference Include="System.Threading.Channels" Version="6.0.0" />
</ItemGroup>
</Project>
2 changes: 2 additions & 0 deletions src/Foundatio/Messaging/IMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ public interface IMessageBus : IMessagePublisher, IMessageSubscriber, IDisposabl
public class MessageOptions {
public string UniqueId { get; set; }
public string CorrelationId { get; set; }
public string Topic { get; set; }
public string MessageType { get; set; }
public TimeSpan? DeliveryDelay { get; set; }
public IDictionary<string, string> Properties { get; set; } = new Dictionary<string, string>();
}
Expand Down
53 changes: 42 additions & 11 deletions src/Foundatio/Messaging/IMessageSubscriber.cs
Original file line number Diff line number Diff line change
@@ -1,37 +1,68 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace Foundatio.Messaging {
public interface IMessageSubscriber {
Task SubscribeAsync<T>(Func<T, CancellationToken, Task> handler, CancellationToken cancellationToken = default) where T : class;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering if we should create extension method overloads for this that just set a token on options. Will users know there is an implicit conversion happening here for cancellation token?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe, I am worried about us having overload conflicts since there are already a bunch of overloads for this method. I pretty much did this just for compatibility.

Task<IMessageSubscription> SubscribeAsync<T>(Func<T, CancellationToken, Task> handler, MessageSubscriptionOptions options = null, CancellationToken cancellationToken = default) where T : class;
}

public static class MessageBusExtensions {
public static Task SubscribeAsync<T>(this IMessageSubscriber subscriber, Func<T, Task> handler, CancellationToken cancellationToken = default) where T : class {
return subscriber.SubscribeAsync<T>((msg, token) => handler(msg), cancellationToken);
public static Task<IMessageSubscription> SubscribeAsync<T>(this IMessageSubscriber subscriber, Func<T, Task> handler, MessageSubscriptionOptions options = null, CancellationToken cancellationToken = default) where T : class {
return subscriber.SubscribeAsync<T>((msg, token) => handler(msg), options, cancellationToken);
}

public static Task SubscribeAsync<T>(this IMessageSubscriber subscriber, Action<T> handler, CancellationToken cancellationToken = default) where T : class {
public static Task<IMessageSubscription> SubscribeAsync<T>(this IMessageSubscriber subscriber, Action<T> handler, MessageSubscriptionOptions options = null, CancellationToken cancellationToken = default) where T : class {
return subscriber.SubscribeAsync<T>((msg, token) => {
handler(msg);
return Task.CompletedTask;
}, cancellationToken);
}, options, cancellationToken);
}

public static Task SubscribeAsync(this IMessageSubscriber subscriber, Func<IMessage, CancellationToken, Task> handler, CancellationToken cancellationToken = default) {
return subscriber.SubscribeAsync<IMessage>((msg, token) => handler(msg, token), cancellationToken);
public static Task<IMessageSubscription> SubscribeAsync(this IMessageSubscriber subscriber, Func<IMessage, CancellationToken, Task> handler, MessageSubscriptionOptions options = null, CancellationToken cancellationToken = default) {
return subscriber.SubscribeAsync<IMessage>((msg, token) => handler(msg, token), options, cancellationToken);
}

public static Task SubscribeAsync(this IMessageSubscriber subscriber, Func<IMessage, Task> handler, CancellationToken cancellationToken = default) {
return subscriber.SubscribeAsync((msg, token) => handler(msg), cancellationToken);
public static Task<IMessageSubscription> SubscribeAsync(this IMessageSubscriber subscriber, Func<IMessage, Task> handler, MessageSubscriptionOptions options = null, CancellationToken cancellationToken = default) {
return subscriber.SubscribeAsync((msg, token) => handler(msg), options, cancellationToken);
}

public static Task SubscribeAsync(this IMessageSubscriber subscriber, Action<IMessage> handler, CancellationToken cancellationToken = default) {
public static Task<IMessageSubscription> SubscribeAsync(this IMessageSubscriber subscriber, Action<IMessage> handler, MessageSubscriptionOptions options = null, CancellationToken cancellationToken = default) {
return subscriber.SubscribeAsync((msg, token) => {
handler(msg);
return Task.CompletedTask;
}, cancellationToken);
}, options, cancellationToken);
}
}

public interface IMessageSubscription : IAsyncDisposable {
string SubscriptionId { get; }
IDictionary<string, string> Properties { get; }
}

public class MessageSubscription : IMessageSubscription {
private readonly Func<ValueTask> _disposeSubscriptionFunc;

public MessageSubscription(string subscriptionId, Func<ValueTask> disposeSubscriptionFunc) {
SubscriptionId = subscriptionId;
_disposeSubscriptionFunc = disposeSubscriptionFunc;
}

public string SubscriptionId { get; }
public IDictionary<string, string> Properties { get; set; } = new Dictionary<string, string>();

public ValueTask DisposeAsync() {
return _disposeSubscriptionFunc?.Invoke() ?? new ValueTask();
}
}

public class MessageSubscriptionOptions {
/// <summary>
/// The topic name
/// </summary>
public string Topic { get; set; }

public IDictionary<string, string> Properties { get; set; } = new Dictionary<string, string>();
}
}
39 changes: 31 additions & 8 deletions src/Foundatio/Messaging/Message.cs
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using Foundatio.Serializer;
using System.Threading.Tasks;

namespace Foundatio.Messaging {
public interface IMessage {
public interface IMessage : IAsyncDisposable {
string UniqueId { get; }
string CorrelationId { get; }
string Topic { get; }
string Type { get; }
Type ClrType { get; }
byte[] Data { get; }
object GetBody();
IDictionary<string, string> Properties { get; }

Task RenewLockAsync();
Task AbandonAsync();
Task CompleteAsync();
}

public interface IMessage<T> : IMessage where T: class {
Expand All @@ -29,11 +34,29 @@ public Message(byte[] data, Func<IMessage, object> getBody) {

public string UniqueId { get; set; }
public string CorrelationId { get; set; }
public string Topic { get; set; }
public string Type { get; set; }
public Type ClrType { get; set; }
public IDictionary<string, string> Properties { get; set; } = new Dictionary<string, string>();
public byte[] Data { get; set; }

public object GetBody() => _getBody(this);

public virtual Task AbandonAsync() {
return Task.CompletedTask;
}

public virtual Task CompleteAsync() {
return Task.CompletedTask;
}

public virtual Task RenewLockAsync() {
return Task.CompletedTask;
}

public virtual ValueTask DisposeAsync() {
return new ValueTask(AbandonAsync());
}
}

public class Message<T> : IMessage<T> where T : class {
Expand All @@ -44,19 +67,19 @@ public Message(IMessage message) {
}

public byte[] Data => _message.Data;

public T Body => (T)GetBody();

public string UniqueId => _message.UniqueId;

public string CorrelationId => _message.CorrelationId;

public string Topic => _message.Topic;
public string Type => _message.Type;

public Type ClrType => _message.ClrType;

public IDictionary<string, string> Properties => _message.Properties;

public object GetBody() => _message.GetBody();

public Task AbandonAsync() => _message.AbandonAsync();
public Task CompleteAsync() => _message.CompleteAsync();
public Task RenewLockAsync() => _message.RenewLockAsync();
public ValueTask DisposeAsync() => _message.DisposeAsync();
}
}
40 changes: 24 additions & 16 deletions src/Foundatio/Messaging/MessageBusBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Channels;
using Foundatio.Serializer;
using Foundatio.Utility;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;

namespace Foundatio.Messaging {
public abstract class MessageBusBase<TOptions> : IMessageBus, IDisposable where TOptions : SharedMessageBusOptions {
protected readonly ConcurrentDictionary<string, Channel<Message>> _topics = new();
private readonly CancellationTokenSource _messageBusDisposedCancellationTokenSource;
protected readonly ConcurrentDictionary<string, Subscriber> _subscribers = new();
protected readonly TOptions _options;
Expand All @@ -25,7 +27,7 @@ public MessageBusBase(TOptions options) {
var loggerFactory = options?.LoggerFactory ?? NullLoggerFactory.Instance;
_logger = loggerFactory.CreateLogger(GetType());
_serializer = options.Serializer ?? DefaultSerializer.Instance;
MessageBusId = _options.Topic + Guid.NewGuid().ToString("N").Substring(10);
MessageBusId = _options.DefaultTopic + Guid.NewGuid().ToString("N").Substring(10);
_messageBusDisposedCancellationTokenSource = new CancellationTokenSource();
}

Expand Down Expand Up @@ -59,11 +61,11 @@ protected string GetMappedMessageType(Type messageType) {
}

private readonly ConcurrentDictionary<string, Type> _knownMessageTypesCache = new();
protected virtual Type GetMappedMessageType(string messageType) {
if (String.IsNullOrEmpty(messageType))
protected virtual Type GetMappedMessageType(IConsumeMessageContext context) {
if (context == null || String.IsNullOrEmpty(context.MessageType))
return null;

return _knownMessageTypesCache.GetOrAdd(messageType, type => {
return _knownMessageTypesCache.GetOrAdd(context.MessageType, type => {
if (_options.MessageTypeMappings != null && _options.MessageTypeMappings.ContainsKey(type))
return _options.MessageTypeMappings[type];

Expand All @@ -86,11 +88,14 @@ protected virtual Type GetMappedMessageType(string messageType) {
}
});
}
protected Type GetMappedMessageType(string messageType) {
return GetMappedMessageType(new ConsumeMessageContext { MessageType = messageType });
}

protected virtual Task RemoveTopicSubscriptionAsync() => Task.CompletedTask;
protected virtual Task EnsureTopicSubscriptionAsync(CancellationToken cancellationToken) => Task.CompletedTask;

protected virtual Task SubscribeImplAsync<T>(Func<T, CancellationToken, Task> handler, CancellationToken cancellationToken) where T : class {
protected virtual Task<IMessageSubscription> SubscribeImplAsync<T>(Func<T, CancellationToken, Task> handler, MessageSubscriptionOptions options, CancellationToken cancellationToken = default) where T : class {
var subscriber = new Subscriber {
CancellationToken = cancellationToken,
Type = typeof(T),
Expand All @@ -105,14 +110,6 @@ protected virtual Task SubscribeImplAsync<T>(Func<T, CancellationToken, Task> ha
}
};

if (cancellationToken != CancellationToken.None) {
cancellationToken.Register(() => {
_subscribers.TryRemove(subscriber.Id, out _);
if (_subscribers.Count == 0)
RemoveTopicSubscriptionAsync().GetAwaiter().GetResult();
});
}

if (subscriber.Type.Name == "IMessage`1" && subscriber.Type.GenericTypeArguments.Length == 1) {
var modelType = subscriber.Type.GenericTypeArguments.Single();
subscriber.GenericType = typeof(Message<>).MakeGenericType(modelType);
Expand All @@ -121,15 +118,26 @@ protected virtual Task SubscribeImplAsync<T>(Func<T, CancellationToken, Task> ha
if (!_subscribers.TryAdd(subscriber.Id, subscriber) && _logger.IsEnabled(LogLevel.Error))
_logger.LogError("Unable to add subscriber {SubscriberId}", subscriber.Id);

return Task.CompletedTask;
return Task.FromResult<IMessageSubscription>(new MessageSubscription(Guid.NewGuid().ToString("N"), async () => {
_subscribers.TryRemove(subscriber.Id, out _);
if (_subscribers.Count == 0)
await RemoveTopicSubscriptionAsync().AnyContext();
}));
}

public async Task SubscribeAsync<T>(Func<T, CancellationToken, Task> handler, CancellationToken cancellationToken = default) where T : class {
public async Task<IMessageSubscription> SubscribeAsync<T>(Func<T, CancellationToken, Task> handler, MessageSubscriptionOptions options = null, CancellationToken cancellationToken = default) where T : class {
if (_logger.IsEnabled(LogLevel.Trace))
_logger.LogTrace("Adding subscriber for {MessageType}.", typeof(T).FullName);

await SubscribeImplAsync(handler, cancellationToken).AnyContext();
options ??= new MessageSubscriptionOptions();

var sub = await SubscribeImplAsync(handler, options).AnyContext();
await EnsureTopicSubscriptionAsync(cancellationToken).AnyContext();

if (cancellationToken != CancellationToken.None)
cancellationToken.Register(() => sub.DisposeAsync());

return sub;
}

protected List<Subscriber> GetMessageSubscribers(IMessage message) {
Expand Down
4 changes: 2 additions & 2 deletions src/Foundatio/Messaging/NullMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ public Task PublishAsync(Type messageType, object message, MessageOptions option
return Task.CompletedTask;
}

public Task SubscribeAsync<T>(Func<T, CancellationToken, Task> handler, CancellationToken cancellationToken = default) where T : class {
return Task.CompletedTask;
public Task<IMessageSubscription> SubscribeAsync<T>(Func<T, CancellationToken, Task> handler, MessageSubscriptionOptions options = null, CancellationToken cancellationToken = default) where T : class {
return Task.FromResult<IMessageSubscription>(new MessageSubscription(Guid.NewGuid().ToString(), () => new ValueTask()));
}

public void Dispose() {}
Expand Down
42 changes: 37 additions & 5 deletions src/Foundatio/Messaging/SharedMessageBusOptions.cs
Original file line number Diff line number Diff line change
@@ -1,26 +1,58 @@
using System;
using System;
using System.Collections.Generic;

namespace Foundatio.Messaging {
public class SharedMessageBusOptions : SharedOptions {
/// <summary>
/// The topic name
/// The default topic name
/// </summary>
public string Topic { get; set; } = "messages";
public string DefaultTopic { get; set; } = "messages";

/// <summary>
/// Controls which types messages are mapped to.
/// Resolves message types
/// </summary>
public IMessageRouter Router { get; set; }

/// <summary>
/// Statically configured message type mappings. <see cref="Router"/> will be run first and then this dictionary will be checked.
/// </summary>
public Dictionary<string, Type> MessageTypeMappings { get; set; } = new Dictionary<string, Type>();
}

public interface IMessageRouter {
// get topic from bus options, message and message options
// get message type from message and options
// get .net type from topic, message type and properties (headers)
IConsumeMessageContext ToMessageType(Type messageType);
Type ToClrType(IConsumeMessageContext context);
}

public interface IConsumeMessageContext {
string Topic { get; set; }
string MessageType { get; set; }
IDictionary<string, string> Properties { get; }
Comment on lines +32 to +33
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would this work for kafka? I'm dealing with public Func<ConsumeResult<string, byte[]>, string> ResolveMessageType { get; set; } so I can look at specific headers and not have to deserialize every single header to process the message.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well it can't be kafka specific. It has access to properties which should include the headers from the kafka message.

}

public class ConsumeMessageContext : IConsumeMessageContext {
public string Topic { get; set; }
public string MessageType { get; set; }
public IDictionary<string, string> Properties { get; set; } = new Dictionary<string, string>();
}

public class SharedMessageBusOptionsBuilder<TOptions, TBuilder> : SharedOptionsBuilder<TOptions, TBuilder>
where TOptions : SharedMessageBusOptions, new()
where TBuilder : SharedMessageBusOptionsBuilder<TOptions, TBuilder> {
public TBuilder Topic(string topic) {
if (string.IsNullOrEmpty(topic))
throw new ArgumentNullException(nameof(topic));
Target.Topic = topic;
Target.DefaultTopic = topic;
return (TBuilder)this;
}

public TBuilder MessageTypeResolver(IMessageRouter resolver) {
if (resolver == null)
throw new ArgumentNullException(nameof(resolver));
Target.Router = resolver;
return (TBuilder)this;
}

Expand Down
Loading