diff --git a/src/EventBus.RabbitMQ.csproj b/src/EventBus.RabbitMQ.csproj index 05a2839..ca8a17d 100644 --- a/src/EventBus.RabbitMQ.csproj +++ b/src/EventBus.RabbitMQ.csproj @@ -18,7 +18,7 @@ - + diff --git a/src/Extensions/RabbitMqExtensions.cs b/src/Extensions/RabbitMqExtensions.cs index 91a0e31..9a510ac 100644 --- a/src/Extensions/RabbitMqExtensions.cs +++ b/src/Extensions/RabbitMqExtensions.cs @@ -10,6 +10,7 @@ using EventBus.RabbitMQ.Subscribers.Options; using EventStorage.Configurations; using EventStorage.Extensions; +using EventStorage.Inbox.EventArgs; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; @@ -28,19 +29,33 @@ public static class RabbitMqExtensions /// Options to register subscriber with the settings. It will overwrite existing subscriber setting if exists /// Options to overwrite default settings of Inbox and Outbox. /// Assemblies to find and load publisher and subscribers + /// The event for subscribing to the executing subscribed event of MessageBroker + /// The event for subscribing to the executing received event of Inbox public static void AddRabbitMqEventBus(this IServiceCollection services, IConfiguration configuration, Assembly[] assemblies, Action defaultOptions = null, Action> virtualHostSettingsOptions = null, Action eventPublisherManagerOptions = null, Action eventSubscriberManagerOptions = null, - Action eventStoreOptions = null) + Action eventStoreOptions = null, + EventHandler executingSubscribedEvent = null, + EventHandler executingReceivedEvent = null) { - services.AddEventStore(configuration, assemblies: assemblies, options: eventStoreOptions); + var eventsToSubscribe = new List>(); + if (executingReceivedEvent is not null) + eventsToSubscribe.Add(executingReceivedEvent); + + if (executingSubscribedEvent is not null) + { + EventSubscriberManager.ExecutingSubscribedEvent += executingSubscribedEvent; + eventsToSubscribe.Add(EventSubscriberManager.HandleExecutingReceivedEvent); + } + services.AddEventStore(configuration, assemblies: assemblies, options: eventStoreOptions, executingReceivedEvents: eventsToSubscribe.ToArray()); var settings = configuration.GetSection(nameof(RabbitMqSettings)).Get() ?? new RabbitMqSettings(); LoadDefaultRabbitMqOptions(settings, defaultOptions); + if (!settings.DefaultSettings.IsEnabled) return; services.AddSingleton(settings.DefaultSettings); @@ -144,14 +159,15 @@ private static void RegisterAllPublishers(EventPublisherManager publisherManager /// /// Main settings from configuration /// Settings option to overwrite the default settings - private static void LoadDefaultRabbitMqOptions(RabbitMqSettings settingsFromConfig, Action defaultOptions) + private static void LoadDefaultRabbitMqOptions(RabbitMqSettings settingsFromConfig, + Action defaultOptions) { var defaultSettings = RabbitMqOptionsConstant.CreateDefaultRabbitMqOptions(); if (settingsFromConfig.DefaultSettings is null) settingsFromConfig.DefaultSettings = defaultSettings; else settingsFromConfig.DefaultSettings.CopyNotAssignedSettingsFrom(defaultSettings); - + defaultOptions?.Invoke(settingsFromConfig.DefaultSettings); } @@ -173,13 +189,14 @@ private static void RegisterAllSubscribers(EventSubscriberManager subscriberMana } } - private static void RegisterAllSubscriberReceiversToDependencyInjection(IServiceCollection services, Assembly[] assemblies) + private static void RegisterAllSubscriberReceiversToDependencyInjection(IServiceCollection services, + Assembly[] assemblies) { var subscriberReceiverTypes = GetSubscriberReceiverTypes(assemblies); foreach (var (_, handlerType) in subscriberReceiverTypes) services.AddTransient(handlerType); } - + static readonly Type SubscriberReceiverType = typeof(IEventSubscriber<>); internal static List<(Type eventType, Type handlerType)> GetSubscriberReceiverTypes(Assembly[] assemblies) diff --git a/src/Subscribers/Consumers/EventConsumerService.cs b/src/Subscribers/Consumers/EventConsumerService.cs index a6827e1..7c00ba2 100644 --- a/src/Subscribers/Consumers/EventConsumerService.cs +++ b/src/Subscribers/Consumers/EventConsumerService.cs @@ -3,6 +3,7 @@ using System.Text.Json; using EventBus.RabbitMQ.Connections; using EventBus.RabbitMQ.Instrumentation.Trace; +using EventBus.RabbitMQ.Subscribers.Managers; using EventBus.RabbitMQ.Subscribers.Models; using EventBus.RabbitMQ.Subscribers.Options; using EventStorage.Exceptions; @@ -138,7 +139,8 @@ private async Task Consumer_Received(object sender, BasicDeliverEventArgs eventA _logger.LogTrace("Received RabbitMQ event, Type is {EventType} and EventId is {EventId}", eventType, eventArgs.BasicProperties.MessageId); var eventId = Guid.TryParse(eventArgs.BasicProperties.MessageId, out Guid messageId) - ? messageId : Guid.NewGuid(); + ? messageId + : Guid.NewGuid(); using var scope = _serviceProvider.CreateScope(); if (_useInbox) @@ -150,16 +152,21 @@ private async Task Consumer_Received(object sender, BasicDeliverEventArgs eventA $"The RabbitMQ is configured to use the Inbox for received events, but the Inbox functionality of the EventStorage is not enabled. So, the {info.eventHandlerType.Name} event subscriber of an event will be executed immediately for the event id: {eventId};"); _ = eventReceiverManager.Received(eventId, info.eventType.Name, eventArgs.RoutingKey, - EventProviderType.MessageBroker, payload: eventPayload, headers: headersAsJson, namingPolicyType: info.eventSettings.PropertyNamingPolicy ?? NamingPolicyType.PascalCase); + EventProviderType.MessageBroker, payload: eventPayload, headers: headersAsJson, + namingPolicyType: info.eventSettings.PropertyNamingPolicy ?? NamingPolicyType.PascalCase); } else { var jsonSerializerSetting = info.eventSettings.GetJsonSerializer(); var receivedEvent = - JsonSerializer.Deserialize(eventPayload, info.eventType, jsonSerializerSetting) as ISubscribeEvent; + JsonSerializer.Deserialize(eventPayload, info.eventType, jsonSerializerSetting) as + ISubscribeEvent; receivedEvent!.EventId = eventId; receivedEvent!.Headers = headers; + + EventSubscriberManager.OnExecutingSubscribedEvent(receivedEvent, _connectionOptions.VirtualHostSettings.VirtualHost); + var eventHandlerSubscriber = scope.ServiceProvider.GetRequiredService(info.eventHandlerType); var handleMethod = info.eventHandlerType.GetMethod(HandlerMethodName); await ((Task)handleMethod!.Invoke(eventHandlerSubscriber, [receivedEvent]))!; diff --git a/src/Subscribers/Managers/EventSubscriberManager.cs b/src/Subscribers/Managers/EventSubscriberManager.cs index 1a10ed1..ef4fa13 100644 --- a/src/Subscribers/Managers/EventSubscriberManager.cs +++ b/src/Subscribers/Managers/EventSubscriberManager.cs @@ -2,6 +2,8 @@ using EventBus.RabbitMQ.Subscribers.Consumers; using EventBus.RabbitMQ.Subscribers.Models; using EventBus.RabbitMQ.Subscribers.Options; +using EventStorage.Inbox.EventArgs; +using EventStorage.Models; using Microsoft.Extensions.DependencyInjection; namespace EventBus.RabbitMQ.Subscribers.Managers; @@ -9,11 +11,16 @@ namespace EventBus.RabbitMQ.Subscribers.Managers; internal class EventSubscriberManager(RabbitMqOptions defaultSettings, IServiceProvider serviceProvider) : IEventSubscriberManager { + /// + /// The event to be executed before executing the subscriber of the received event. + /// + public static event EventHandler ExecutingSubscribedEvent; + /// /// Dictionary collection to store all event and event handler information /// - private readonly Dictionary - _subscribers = new(); + private static readonly Dictionary + Subscribers = new(); /// /// List of consumers for each unique a queue for different virtual host @@ -25,7 +32,7 @@ public void AddSubscriber(Action where TEventHandler : class, IEventSubscriber { var eventType = typeof(TEvent); - if (_subscribers.TryGetValue(eventType.Name, out var info)) + if (Subscribers.TryGetValue(eventType.Name, out var info)) { options?.Invoke(info.eventSettings); } @@ -35,7 +42,7 @@ public void AddSubscriber(Action options?.Invoke(settings); var handlerType = typeof(TEventHandler); - _subscribers.Add(eventType.Name, (eventType, handlerType, settings)); + Subscribers.Add(eventType.Name, (eventType, handlerType, settings)); } } @@ -47,10 +54,10 @@ public void AddSubscriber(Action /// Settings of subscriber public void AddSubscriber(Type typeOfSubscriber, Type typeOfHandler, EventSubscriberOptions subscriberSettings) { - if (_subscribers.TryGetValue(typeOfSubscriber.Name, out var info)) - _subscribers[typeOfSubscriber.Name] = (info.eventType, info.eventHandlerType, subscriberSettings); + if (Subscribers.TryGetValue(typeOfSubscriber.Name, out var info)) + Subscribers[typeOfSubscriber.Name] = (info.eventType, info.eventHandlerType, subscriberSettings); else - _subscribers.Add(typeOfSubscriber.Name, (typeOfSubscriber, typeOfHandler, subscriberSettings)); + Subscribers.Add(typeOfSubscriber.Name, (typeOfSubscriber, typeOfHandler, subscriberSettings)); } /// @@ -58,9 +65,11 @@ public void AddSubscriber(Type typeOfSubscriber, Type typeOfHandler, EventSubscr /// public void SetVirtualHostAndOwnSettingsOfSubscribers(Dictionary virtualHostsSettings) { - foreach (var (eventTypeName, (_, _, eventSettings)) in _subscribers) + foreach (var (eventTypeName, (_, _, eventSettings)) in Subscribers) { - var virtualHostSettings = string.IsNullOrEmpty(eventSettings.VirtualHostKey) ? defaultSettings : virtualHostsSettings.GetValueOrDefault(eventSettings.VirtualHostKey, defaultSettings); + var virtualHostSettings = string.IsNullOrEmpty(eventSettings.VirtualHostKey) + ? defaultSettings + : virtualHostsSettings.GetValueOrDefault(eventSettings.VirtualHostKey, defaultSettings); eventSettings.SetVirtualHostAndUnassignedSettings(virtualHostSettings, eventTypeName); } } @@ -68,12 +77,14 @@ public void SetVirtualHostAndOwnSettingsOfSubscribers(Dictionary(); - foreach (var (_, eventInfo) in _subscribers) + foreach (var (_, eventInfo) in Subscribers) { - var consumerId = $"{eventInfo.eventSettings.VirtualHostSettings.VirtualHost}-{eventInfo.eventSettings.QueueName}"; - if (!_eventConsumers.TryGetValue(consumerId, value: out IEventConsumerService eventConsumer)) + var consumerId = + $"{eventInfo.eventSettings.VirtualHostSettings.VirtualHost}-{eventInfo.eventSettings.QueueName}"; + if (!_eventConsumers.TryGetValue(consumerId, value: out var eventConsumer)) { - eventConsumer = eventConsumerCreator.Create(eventInfo.eventSettings, serviceProvider, defaultSettings.UseInbox); + eventConsumer = + eventConsumerCreator.Create(eventInfo.eventSettings, serviceProvider, defaultSettings.UseInbox); _eventConsumers.Add(consumerId, eventConsumer); } @@ -83,4 +94,43 @@ public void CreateConsumerForEachQueueAndStartReceivingEvents() foreach (var consumer in _eventConsumers) consumer.Value.StartAndSubscribeReceiver(); } + + /// + /// Invokes the ExecutingReceivedEvent event to be able to execute the event before the subscriber. + /// + /// Executing an event + /// The name of virtual host to being able to get a system name that the event published by it. + public static void OnExecutingSubscribedEvent(ISubscribeEvent @event, string virtualHostName) + { + if (ExecutingSubscribedEvent is null) + return; + + var systemName = virtualHostName.TrimStart('/'); + var eventArgs = new SubscribedMessageBrokerEventArgs + { + Event = @event, + SystemName = systemName + }; + + ExecutingSubscribedEvent.Invoke(null, eventArgs); + } + + /// + /// For handling the ExecutingReceivedEvent event and execute the ExecutingSubscribedEvent event if the + /// + public static void HandleExecutingReceivedEvent(object sender, ReceivedEventArgs e) + { + if (e.ProviderType == EventProviderType.MessageBroker) + { + if (e.Event is not ISubscribeEvent @event) + return; + + var eventTypeName = @event.GetType().Name; + var virtualHostName = Subscribers.TryGetValue(eventTypeName, out var info) + ? info.eventSettings.VirtualHostSettings.VirtualHost + : string.Empty; + + OnExecutingSubscribedEvent(@event, virtualHostName); + } + } } \ No newline at end of file diff --git a/src/Subscribers/Models/SubscribedMessageBrokerEventArgs.cs b/src/Subscribers/Models/SubscribedMessageBrokerEventArgs.cs new file mode 100644 index 0000000..e787666 --- /dev/null +++ b/src/Subscribers/Models/SubscribedMessageBrokerEventArgs.cs @@ -0,0 +1,14 @@ +namespace EventBus.RabbitMQ.Subscribers.Models; + +public class SubscribedMessageBrokerEventArgs : EventArgs +{ + /// + /// Executing received event. + /// + public required ISubscribeEvent Event { get; init; } + + /// + /// The name of system that the event published by it. + /// + public required string SystemName { get; init; } +} \ No newline at end of file diff --git a/tests/UnitTests/Subscribers/EventSubscriberManagerTests.cs b/tests/UnitTests/Subscribers/EventSubscriberManagerTests.cs index eb23211..8fe5559 100644 --- a/tests/UnitTests/Subscribers/EventSubscriberManagerTests.cs +++ b/tests/UnitTests/Subscribers/EventSubscriberManagerTests.cs @@ -33,17 +33,9 @@ public void AddSubscriber_CallingWithGenericAndWithOption_ShouldAdded() { var options = new Action(x => { x.QueueName = "TestQueue"; }); - // Act _subscriberManager.AddSubscriber(options); - // Assert - var field = _subscriberManager.GetType() - .GetField("_subscribers", BindingFlags.NonPublic | BindingFlags.Instance); - field.Should().NotBeNull(); - var subscribers = - (Dictionary) - field?.GetValue(_subscriberManager)!; - + var subscribers = GetSubscribers(); subscribers.Should().ContainKey(nameof(SimpleSubscribeEvent)); subscribers!.First().Value.eventSettings.QueueName.Should().Be("TestQueue"); } @@ -53,21 +45,13 @@ public void AddSubscriber_AddingExistingEventWithNewOptions_ShouldUpdateEventOpt { var options = new Action(x => { x.QueueName = "TestQueue"; }); - // Act _subscriberManager.AddSubscriber(options); _subscriberManager.AddSubscriber(x => { x.QueueName = "TestQueueUpdated"; }); - // Assert - var field = _subscriberManager.GetType() - .GetField("_subscribers", BindingFlags.NonPublic | BindingFlags.Instance); - field.Should().NotBeNull(); - var subscribers = - (Dictionary) - field?.GetValue(_subscriberManager)!; - + var subscribers = GetSubscribers(); subscribers.Should().ContainKey(nameof(SimpleSubscribeEvent)); subscribers!.First().Value.eventSettings.QueueName.Should().Be("TestQueueUpdated"); } @@ -80,17 +64,9 @@ public void AddSubscriber_CallingWithTypesOfEventAndTypeOfHandlerAndWithOptions_ QueueName = "TestQueue", }; - // Act _subscriberManager.AddSubscriber(typeof(SimpleSubscribeEvent), typeof(SimpleEventSubscriberHandler), options); - // Assert - var field = _subscriberManager.GetType() - .GetField("_subscribers", BindingFlags.NonPublic | BindingFlags.Instance); - field.Should().NotBeNull(); - var subscribers = - (Dictionary) - field?.GetValue(_subscriberManager)!; - + var subscribers = GetSubscribers(); subscribers.Should().ContainKey(nameof(SimpleSubscribeEvent)); subscribers!.First().Value.eventSettings.QueueName.Should().Be("TestQueue"); } @@ -103,7 +79,6 @@ public void AddSubscriber_CallingWithTypesAddingExistingEventWithNewOptions_Shou QueueName = "TestQueue", }; - // Act _subscriberManager.AddSubscriber(typeof(SimpleSubscribeEvent), typeof(SimpleEventSubscriberHandler), options); _subscriberManager.AddSubscriber(typeof(SimpleSubscribeEvent), typeof(SimpleEventSubscriberHandler), new EventSubscriberOptions @@ -111,14 +86,7 @@ public void AddSubscriber_CallingWithTypesAddingExistingEventWithNewOptions_Shou QueueName = "TestQueueUpdated" }); - // Assert - var field = _subscriberManager.GetType() - .GetField("_subscribers", BindingFlags.NonPublic | BindingFlags.Instance); - field.Should().NotBeNull(); - var subscribers = - (Dictionary) - field?.GetValue(_subscriberManager)!; - + var subscribers = GetSubscribers(); subscribers.Should().ContainKey(nameof(SimpleSubscribeEvent)); subscribers!.First().Value.eventSettings.QueueName.Should().Be("TestQueueUpdated"); } @@ -126,18 +94,10 @@ public void AddSubscriber_CallingWithTypesAddingExistingEventWithNewOptions_Shou [Test] public void AddSubscriber_CallingWithTypesAndWithDefaultSettings_ShouldAdded() { - // Act _subscriberManager.AddSubscriber(typeof(SimpleSubscribeEvent), typeof(SimpleEventSubscriberHandler), new EventSubscriberOptions()); - - // Assert - var field = _subscriberManager.GetType() - .GetField("_subscribers", BindingFlags.NonPublic | BindingFlags.Instance); - field.Should().NotBeNull(); - var subscribers = - (Dictionary) - field?.GetValue(_subscriberManager)!; - + + var subscribers = GetSubscribers(); subscribers.Should().ContainKey(nameof(SimpleSubscribeEvent)); subscribers!.First().Value.eventSettings.QueueName.Should().BeNull(); } @@ -161,20 +121,11 @@ public void SetVirtualHostAndOwnSettingsOfSubscribers_WithVirtualHostSettings_Sh }; var options = new Action(x => { x.VirtualHostKey = "TestVirtualHostKey"; }); - _subscriberManager.AddSubscriber(options); - // Act _subscriberManager.SetVirtualHostAndOwnSettingsOfSubscribers(virtualHostsSettings); - // Assert - var field = _subscriberManager.GetType() - .GetField("_subscribers", BindingFlags.NonPublic | BindingFlags.Instance); - field.Should().NotBeNull(); - var subscribers = - (Dictionary) - field?.GetValue(_subscriberManager)!; - + var subscribers = GetSubscribers(); subscribers.Should().ContainKey(nameof(SimpleSubscribeEvent)); subscribers!.First().Value.eventSettings.VirtualHostSettings.VirtualHost.Should().Be("TestVirtualHost"); } @@ -186,7 +137,6 @@ public void SetVirtualHostAndOwnSettingsOfSubscribers_WithVirtualHostSettings_Sh [Test] public void CreateConsumerForEachQueueAndStartReceivingEvents_WithSubscribers_ShouldCreateConsumer() { - // Arrange var options = new Action(x => { x.VirtualHostKey = "TestVirtualHostKey"; @@ -218,10 +168,8 @@ public void CreateConsumerForEachQueueAndStartReceivingEvents_WithSubscribers_Sh _subscriberManager.AddSubscriber(options); _subscriberManager.SetVirtualHostAndOwnSettingsOfSubscribers(virtualHostsSettings); - // Act _subscriberManager.CreateConsumerForEachQueueAndStartReceivingEvents(); - // Assert var field = _subscriberManager.GetType() .GetField("_eventConsumers", BindingFlags.NonPublic | BindingFlags.Instance); field.Should().NotBeNull(); @@ -239,4 +187,21 @@ public void CreateConsumerForEachQueueAndStartReceivingEvents_WithSubscribers_Sh } #endregion + + #region Helper method + + private static readonly FieldInfo SubscribersProperty = typeof(EventSubscriberManager) + .GetField("Subscribers", BindingFlags.NonPublic | BindingFlags.Static); + + Dictionary GetSubscribers() + { + var eventSubscriberManager = typeof(EventSubscriberManager); + var subscribers = + (Dictionary) + SubscribersProperty?.GetValue(eventSubscriberManager)!; + + return subscribers; + } + + #endregion } \ No newline at end of file