Skip to content

Commit

Permalink
Changed the logic to store an original payload of incoming event
Browse files Browse the repository at this point in the history
  • Loading branch information
MirolimMajidov committed Dec 12, 2024
1 parent 82375f4 commit f064c63
Showing 1 changed file with 22 additions and 25 deletions.
47 changes: 22 additions & 25 deletions src/Subscribers/Consumers/EventConsumerService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using EventBus.RabbitMQ.Instrumentation.Trace;
using EventBus.RabbitMQ.Subscribers.Models;
using EventBus.RabbitMQ.Subscribers.Options;
using EventStorage.Exceptions;
using EventStorage.Inbox.Managers;
using EventStorage.Models;
using Microsoft.Extensions.DependencyInjection;
Expand Down Expand Up @@ -136,37 +137,33 @@ private async Task Consumer_Received(object sender, BasicDeliverEventArgs eventA
{
_logger.LogTrace("Received RabbitMQ event, Type is {EventType} and EventId is {EventId}", eventType,
eventArgs.BasicProperties.MessageId);
var jsonSerializerSetting = info.eventSettings.GetJsonSerializer();
var receivedEvent =
JsonSerializer.Deserialize(eventPayload, info.eventType, jsonSerializerSetting) as ISubscribeEvent;

receivedEvent!.EventId = Guid.TryParse(eventArgs.BasicProperties.MessageId, out Guid messageId)
? messageId
: Guid.NewGuid();
var eventId = Guid.TryParse(eventArgs.BasicProperties.MessageId, out Guid messageId)
? messageId : Guid.NewGuid();

using var scope = _serviceProvider.CreateScope();
if (_useInbox)
{
IEventReceiverManager eventReceiverManager =
scope.ServiceProvider.GetService<IEventReceiverManager>();
if (eventReceiverManager is not null)
{
_ = eventReceiverManager.Received(receivedEvent, eventArgs.RoutingKey,
EventProviderType.MessageBroker, headersAsJson);
MarkEventIsDelivered();

return;
}

_logger.LogWarning(
"The RabbitMQ is configured to use the Inbox for received events, but the Inbox functionality of the EventStorage is not enabled. So, the {EventSubscriber} event subscriber of an event will be executed immediately for the event id: {EventId};",
info.eventHandlerType.Name, receivedEvent!.EventId);
}
if (eventReceiverManager is null)
throw new EventStoreException(
$"The RabbitMQ is configured to use the Inbox for received events, but the Inbox functionality of the EventStorage is not enabled. So, the {info.eventHandlerType.Name} event subscriber of an event will be executed immediately for the event id: {eventId};");

receivedEvent!.Headers = headers;
var eventHandlerSubscriber = scope.ServiceProvider.GetRequiredService(info.eventHandlerType);
var handleMethod = info.eventHandlerType.GetMethod(HandlerMethodName);
await ((Task)handleMethod!.Invoke(eventHandlerSubscriber, [receivedEvent]))!;
_ = eventReceiverManager.Received(eventId, info.eventType.Name, eventArgs.RoutingKey,
EventProviderType.MessageBroker, payload: eventPayload, headers: headersAsJson);
}
else
{
var jsonSerializerSetting = info.eventSettings.GetJsonSerializer();
var receivedEvent =
JsonSerializer.Deserialize(eventPayload, info.eventType, jsonSerializerSetting) as ISubscribeEvent;

receivedEvent!.EventId = eventId;
receivedEvent!.Headers = headers;
var eventHandlerSubscriber = scope.ServiceProvider.GetRequiredService(info.eventHandlerType);
var handleMethod = info.eventHandlerType.GetMethod(HandlerMethodName);
await ((Task)handleMethod!.Invoke(eventHandlerSubscriber, [receivedEvent]))!;
}

MarkEventIsDelivered();
}
Expand All @@ -185,7 +182,7 @@ private async Task Consumer_Received(object sender, BasicDeliverEventArgs eventA
}

#region Helper methods

void MarkEventIsDelivered()
{
_consumerChannel.BasicAck(eventArgs.DeliveryTag, multiple: false);
Expand Down

0 comments on commit f064c63

Please sign in to comment.