diff --git a/src/Subscribers/Consumers/EventConsumerService.cs b/src/Subscribers/Consumers/EventConsumerService.cs index 951f315..c8f743c 100644 --- a/src/Subscribers/Consumers/EventConsumerService.cs +++ b/src/Subscribers/Consumers/EventConsumerService.cs @@ -5,6 +5,7 @@ using EventBus.RabbitMQ.Instrumentation.Trace; using EventBus.RabbitMQ.Subscribers.Models; using EventBus.RabbitMQ.Subscribers.Options; +using EventStorage.Exceptions; using EventStorage.Inbox.Managers; using EventStorage.Models; using Microsoft.Extensions.DependencyInjection; @@ -136,37 +137,33 @@ private async Task Consumer_Received(object sender, BasicDeliverEventArgs eventA { _logger.LogTrace("Received RabbitMQ event, Type is {EventType} and EventId is {EventId}", eventType, eventArgs.BasicProperties.MessageId); - var jsonSerializerSetting = info.eventSettings.GetJsonSerializer(); - var receivedEvent = - JsonSerializer.Deserialize(eventPayload, info.eventType, jsonSerializerSetting) as ISubscribeEvent; - - receivedEvent!.EventId = Guid.TryParse(eventArgs.BasicProperties.MessageId, out Guid messageId) - ? messageId - : Guid.NewGuid(); + var eventId = Guid.TryParse(eventArgs.BasicProperties.MessageId, out Guid messageId) + ? messageId : Guid.NewGuid(); using var scope = _serviceProvider.CreateScope(); if (_useInbox) { IEventReceiverManager eventReceiverManager = scope.ServiceProvider.GetService(); - if (eventReceiverManager is not null) - { - _ = eventReceiverManager.Received(receivedEvent, eventArgs.RoutingKey, - EventProviderType.MessageBroker, headersAsJson); - MarkEventIsDelivered(); - - return; - } - - _logger.LogWarning( - "The RabbitMQ is configured to use the Inbox for received events, but the Inbox functionality of the EventStorage is not enabled. So, the {EventSubscriber} event subscriber of an event will be executed immediately for the event id: {EventId};", - info.eventHandlerType.Name, receivedEvent!.EventId); - } + if (eventReceiverManager is null) + throw new EventStoreException( + $"The RabbitMQ is configured to use the Inbox for received events, but the Inbox functionality of the EventStorage is not enabled. So, the {info.eventHandlerType.Name} event subscriber of an event will be executed immediately for the event id: {eventId};"); - receivedEvent!.Headers = headers; - var eventHandlerSubscriber = scope.ServiceProvider.GetRequiredService(info.eventHandlerType); - var handleMethod = info.eventHandlerType.GetMethod(HandlerMethodName); - await ((Task)handleMethod!.Invoke(eventHandlerSubscriber, [receivedEvent]))!; + _ = eventReceiverManager.Received(eventId, info.eventType.Name, eventArgs.RoutingKey, + EventProviderType.MessageBroker, payload: eventPayload, headers: headersAsJson); + } + else + { + var jsonSerializerSetting = info.eventSettings.GetJsonSerializer(); + var receivedEvent = + JsonSerializer.Deserialize(eventPayload, info.eventType, jsonSerializerSetting) as ISubscribeEvent; + + receivedEvent!.EventId = eventId; + receivedEvent!.Headers = headers; + var eventHandlerSubscriber = scope.ServiceProvider.GetRequiredService(info.eventHandlerType); + var handleMethod = info.eventHandlerType.GetMethod(HandlerMethodName); + await ((Task)handleMethod!.Invoke(eventHandlerSubscriber, [receivedEvent]))!; + } MarkEventIsDelivered(); } @@ -185,7 +182,7 @@ private async Task Consumer_Received(object sender, BasicDeliverEventArgs eventA } #region Helper methods - + void MarkEventIsDelivered() { _consumerChannel.BasicAck(eventArgs.DeliveryTag, multiple: false);