Skip to content

Commit

Permalink
Added event named ExecutingSubscribedEvent to handled executing recei…
Browse files Browse the repository at this point in the history
…ved event
  • Loading branch information
MirolimMajidov committed Dec 27, 2024
1 parent 43ece38 commit 76157a4
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 23 deletions.
2 changes: 1 addition & 1 deletion src/EventBus.RabbitMQ.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="AlifCapital.EventStorage" Version="9.0.6" />
<PackageReference Include="AlifCapital.EventStorage" Version="9.0.8" />
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" Version="9.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="9.0.0" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="9.0.0" />
Expand Down
29 changes: 23 additions & 6 deletions src/Extensions/RabbitMqExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using EventBus.RabbitMQ.Subscribers.Options;
using EventStorage.Configurations;
using EventStorage.Extensions;
using EventStorage.Inbox.EventArgs;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;

Expand All @@ -28,19 +29,33 @@ public static class RabbitMqExtensions
/// <param name="eventSubscriberManagerOptions">Options to register subscriber with the settings. It will overwrite existing subscriber setting if exists</param>
/// <param name="eventStoreOptions">Options to overwrite default settings of Inbox and Outbox.</param>
/// <param name="assemblies">Assemblies to find and load publisher and subscribers</param>
/// <param name="executingSubscribedEvent">The event for subscribing to the executing subscribed event of MessageBroker</param>
/// <param name="executingReceivedEvent">The event for subscribing to the executing received event of Inbox</param>
public static void AddRabbitMqEventBus(this IServiceCollection services, IConfiguration configuration,
Assembly[] assemblies,
Action<RabbitMqOptions> defaultOptions = null,
Action<Dictionary<string, RabbitMqHostSettings>> virtualHostSettingsOptions = null,
Action<EventPublisherManagerOptions> eventPublisherManagerOptions = null,
Action<EventSubscriberManagerOptions> eventSubscriberManagerOptions = null,
Action<InboxAndOutboxOptions> eventStoreOptions = null)
Action<InboxAndOutboxOptions> eventStoreOptions = null,
EventHandler<SubscribedMessageBrokerEventArgs> executingSubscribedEvent = null,
EventHandler<ReceivedEventArgs> executingReceivedEvent = null)
{
services.AddEventStore(configuration, assemblies: assemblies, options: eventStoreOptions);
var eventsToSubscribe = new List<EventHandler<ReceivedEventArgs>>();
if (executingReceivedEvent is not null)
eventsToSubscribe.Add(executingReceivedEvent);

if (executingSubscribedEvent is not null)
{
EventSubscriberManager.ExecutingSubscribedEvent += executingSubscribedEvent;
eventsToSubscribe.Add(EventSubscriberManager.HandleExecutingReceivedEvent);
}
services.AddEventStore(configuration, assemblies: assemblies, options: eventStoreOptions, executingReceivedEvents: eventsToSubscribe.ToArray());

var settings = configuration.GetSection(nameof(RabbitMqSettings)).Get<RabbitMqSettings>() ??
new RabbitMqSettings();
LoadDefaultRabbitMqOptions(settings, defaultOptions);

if (!settings.DefaultSettings.IsEnabled) return;

services.AddSingleton(settings.DefaultSettings);
Expand Down Expand Up @@ -144,14 +159,15 @@ private static void RegisterAllPublishers(EventPublisherManager publisherManager
/// </summary>
/// <param name="settingsFromConfig">Main settings from configuration</param>
/// <param name="defaultOptions">Settings option to overwrite the default settings</param>
private static void LoadDefaultRabbitMqOptions(RabbitMqSettings settingsFromConfig, Action<RabbitMqOptions> defaultOptions)
private static void LoadDefaultRabbitMqOptions(RabbitMqSettings settingsFromConfig,
Action<RabbitMqOptions> defaultOptions)
{
var defaultSettings = RabbitMqOptionsConstant.CreateDefaultRabbitMqOptions();
if (settingsFromConfig.DefaultSettings is null)
settingsFromConfig.DefaultSettings = defaultSettings;
else
settingsFromConfig.DefaultSettings.CopyNotAssignedSettingsFrom(defaultSettings);

defaultOptions?.Invoke(settingsFromConfig.DefaultSettings);
}

Expand All @@ -173,13 +189,14 @@ private static void RegisterAllSubscribers(EventSubscriberManager subscriberMana
}
}

private static void RegisterAllSubscriberReceiversToDependencyInjection(IServiceCollection services, Assembly[] assemblies)
private static void RegisterAllSubscriberReceiversToDependencyInjection(IServiceCollection services,
Assembly[] assemblies)
{
var subscriberReceiverTypes = GetSubscriberReceiverTypes(assemblies);
foreach (var (_, handlerType) in subscriberReceiverTypes)
services.AddTransient(handlerType);
}

static readonly Type SubscriberReceiverType = typeof(IEventSubscriber<>);

internal static List<(Type eventType, Type handlerType)> GetSubscriberReceiverTypes(Assembly[] assemblies)
Expand Down
13 changes: 10 additions & 3 deletions src/Subscribers/Consumers/EventConsumerService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Text.Json;
using EventBus.RabbitMQ.Connections;
using EventBus.RabbitMQ.Instrumentation.Trace;
using EventBus.RabbitMQ.Subscribers.Managers;
using EventBus.RabbitMQ.Subscribers.Models;
using EventBus.RabbitMQ.Subscribers.Options;
using EventStorage.Exceptions;
Expand Down Expand Up @@ -138,7 +139,8 @@ private async Task Consumer_Received(object sender, BasicDeliverEventArgs eventA
_logger.LogTrace("Received RabbitMQ event, Type is {EventType} and EventId is {EventId}", eventType,
eventArgs.BasicProperties.MessageId);
var eventId = Guid.TryParse(eventArgs.BasicProperties.MessageId, out Guid messageId)
? messageId : Guid.NewGuid();
? messageId
: Guid.NewGuid();

using var scope = _serviceProvider.CreateScope();
if (_useInbox)
Expand All @@ -150,16 +152,21 @@ private async Task Consumer_Received(object sender, BasicDeliverEventArgs eventA
$"The RabbitMQ is configured to use the Inbox for received events, but the Inbox functionality of the EventStorage is not enabled. So, the {info.eventHandlerType.Name} event subscriber of an event will be executed immediately for the event id: {eventId};");

_ = eventReceiverManager.Received(eventId, info.eventType.Name, eventArgs.RoutingKey,
EventProviderType.MessageBroker, payload: eventPayload, headers: headersAsJson, namingPolicyType: info.eventSettings.PropertyNamingPolicy ?? NamingPolicyType.PascalCase);
EventProviderType.MessageBroker, payload: eventPayload, headers: headersAsJson,
namingPolicyType: info.eventSettings.PropertyNamingPolicy ?? NamingPolicyType.PascalCase);
}
else
{
var jsonSerializerSetting = info.eventSettings.GetJsonSerializer();
var receivedEvent =
JsonSerializer.Deserialize(eventPayload, info.eventType, jsonSerializerSetting) as ISubscribeEvent;
JsonSerializer.Deserialize(eventPayload, info.eventType, jsonSerializerSetting) as
ISubscribeEvent;

receivedEvent!.EventId = eventId;
receivedEvent!.Headers = headers;

EventSubscriberManager.OnExecutingSubscribedEvent(receivedEvent, _connectionOptions.VirtualHostSettings.VirtualHost);

var eventHandlerSubscriber = scope.ServiceProvider.GetRequiredService(info.eventHandlerType);
var handleMethod = info.eventHandlerType.GetMethod(HandlerMethodName);
await ((Task)handleMethod!.Invoke(eventHandlerSubscriber, [receivedEvent]))!;
Expand Down
76 changes: 63 additions & 13 deletions src/Subscribers/Managers/EventSubscriberManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,25 @@
using EventBus.RabbitMQ.Subscribers.Consumers;
using EventBus.RabbitMQ.Subscribers.Models;
using EventBus.RabbitMQ.Subscribers.Options;
using EventStorage.Inbox.EventArgs;
using EventStorage.Models;
using Microsoft.Extensions.DependencyInjection;

namespace EventBus.RabbitMQ.Subscribers.Managers;

internal class EventSubscriberManager(RabbitMqOptions defaultSettings, IServiceProvider serviceProvider)
: IEventSubscriberManager
{
/// <summary>
/// The event to be executed before executing the subscriber of the received event.
/// </summary>
public static event EventHandler<SubscribedMessageBrokerEventArgs> ExecutingSubscribedEvent;

/// <summary>
/// Dictionary collection to store all event and event handler information
/// </summary>
private readonly Dictionary<string, (Type eventType, Type eventHandlerType, EventSubscriberOptions eventSettings)>
_subscribers = new();
private static readonly Dictionary<string, (Type eventType, Type eventHandlerType, EventSubscriberOptions eventSettings)>
Subscribers = new();

/// <summary>
/// List of consumers for each unique a queue for different virtual host
Expand All @@ -25,7 +32,7 @@ public void AddSubscriber<TEvent, TEventHandler>(Action<EventSubscriberOptions>
where TEventHandler : class, IEventSubscriber<TEvent>
{
var eventType = typeof(TEvent);
if (_subscribers.TryGetValue(eventType.Name, out var info))
if (Subscribers.TryGetValue(eventType.Name, out var info))
{
options?.Invoke(info.eventSettings);
}
Expand All @@ -35,7 +42,7 @@ public void AddSubscriber<TEvent, TEventHandler>(Action<EventSubscriberOptions>
options?.Invoke(settings);

var handlerType = typeof(TEventHandler);
_subscribers.Add(eventType.Name, (eventType, handlerType, settings));
Subscribers.Add(eventType.Name, (eventType, handlerType, settings));
}
}

Expand All @@ -47,33 +54,37 @@ public void AddSubscriber<TEvent, TEventHandler>(Action<EventSubscriberOptions>
/// <param name="subscriberSettings">Settings of subscriber</param>
public void AddSubscriber(Type typeOfSubscriber, Type typeOfHandler, EventSubscriberOptions subscriberSettings)
{
if (_subscribers.TryGetValue(typeOfSubscriber.Name, out var info))
_subscribers[typeOfSubscriber.Name] = (info.eventType, info.eventHandlerType, subscriberSettings);
if (Subscribers.TryGetValue(typeOfSubscriber.Name, out var info))
Subscribers[typeOfSubscriber.Name] = (info.eventType, info.eventHandlerType, subscriberSettings);
else
_subscribers.Add(typeOfSubscriber.Name, (typeOfSubscriber, typeOfHandler, subscriberSettings));
Subscribers.Add(typeOfSubscriber.Name, (typeOfSubscriber, typeOfHandler, subscriberSettings));
}

/// <summary>
/// Setting the virtual host and other unassigned settings of subscribers
/// </summary>
public void SetVirtualHostAndOwnSettingsOfSubscribers(Dictionary<string, RabbitMqHostSettings> virtualHostsSettings)
{
foreach (var (eventTypeName, (_, _, eventSettings)) in _subscribers)
foreach (var (eventTypeName, (_, _, eventSettings)) in Subscribers)
{
var virtualHostSettings = string.IsNullOrEmpty(eventSettings.VirtualHostKey) ? defaultSettings : virtualHostsSettings.GetValueOrDefault(eventSettings.VirtualHostKey, defaultSettings);
var virtualHostSettings = string.IsNullOrEmpty(eventSettings.VirtualHostKey)
? defaultSettings
: virtualHostsSettings.GetValueOrDefault(eventSettings.VirtualHostKey, defaultSettings);
eventSettings.SetVirtualHostAndUnassignedSettings(virtualHostSettings, eventTypeName);
}
}

public void CreateConsumerForEachQueueAndStartReceivingEvents()
{
var eventConsumerCreator = serviceProvider.GetRequiredService<IEventConsumerServiceCreator>();
foreach (var (_, eventInfo) in _subscribers)
foreach (var (_, eventInfo) in Subscribers)
{
var consumerId = $"{eventInfo.eventSettings.VirtualHostSettings.VirtualHost}-{eventInfo.eventSettings.QueueName}";
if (!_eventConsumers.TryGetValue(consumerId, value: out IEventConsumerService eventConsumer))
var consumerId =
$"{eventInfo.eventSettings.VirtualHostSettings.VirtualHost}-{eventInfo.eventSettings.QueueName}";
if (!_eventConsumers.TryGetValue(consumerId, value: out var eventConsumer))
{
eventConsumer = eventConsumerCreator.Create(eventInfo.eventSettings, serviceProvider, defaultSettings.UseInbox);
eventConsumer =
eventConsumerCreator.Create(eventInfo.eventSettings, serviceProvider, defaultSettings.UseInbox);
_eventConsumers.Add(consumerId, eventConsumer);
}

Expand All @@ -83,4 +94,43 @@ public void CreateConsumerForEachQueueAndStartReceivingEvents()
foreach (var consumer in _eventConsumers)
consumer.Value.StartAndSubscribeReceiver();
}

/// <summary>
/// Invokes the ExecutingReceivedEvent event to be able to execute the event before the subscriber.
/// </summary>
/// <param name="event">Executing an event</param>
/// <param name="virtualHostName">The name of virtual host to being able to get a system name that the event published by it.</param>
public static void OnExecutingSubscribedEvent(ISubscribeEvent @event, string virtualHostName)
{
if (ExecutingSubscribedEvent is null)
return;

var systemName = virtualHostName.TrimStart('/');
var eventArgs = new SubscribedMessageBrokerEventArgs
{
Event = @event,
SystemName = systemName
};

ExecutingSubscribedEvent.Invoke(null, eventArgs);
}

/// <summary>
/// For handling the ExecutingReceivedEvent event and execute the ExecutingSubscribedEvent event if the
/// </summary>
public static void HandleExecutingReceivedEvent(object sender, ReceivedEventArgs e)
{
if (e.ProviderType == EventProviderType.MessageBroker)
{
if (e.Event is not ISubscribeEvent @event)
return;

var eventTypeName = @event.GetType().Name;
var virtualHostName = Subscribers.TryGetValue(eventTypeName, out var info)
? info.eventSettings.VirtualHostSettings.VirtualHost
: string.Empty;

OnExecutingSubscribedEvent(@event, virtualHostName);
}
}
}
14 changes: 14 additions & 0 deletions src/Subscribers/Models/SubscribedMessageBrokerEventArgs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
namespace EventBus.RabbitMQ.Subscribers.Models;

public class SubscribedMessageBrokerEventArgs : EventArgs
{
/// <summary>
/// Executing received event.
/// </summary>
public required ISubscribeEvent Event { get; init; }

/// <summary>
/// The name of system that the event published by it.
/// </summary>
public required string SystemName { get; init; }
}

0 comments on commit 76157a4

Please sign in to comment.