Skip to content

Commit

Permalink
Merge pull request #12 from alifcapital/mirolimmajidov-acm1-382
Browse files Browse the repository at this point in the history
EventConsumerService: ERROR while reading the headers
  • Loading branch information
MirolimMajidov authored Dec 11, 2024
2 parents 5de7bae + 08124a9 commit 9d55aca
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 6 deletions.
4 changes: 3 additions & 1 deletion src/Publishers/Managers/EventPublisherManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ public void Publish<TEventPublisher>(TEventPublisher @event) where TEventPublish

var headers = new Dictionary<string, object>();
properties.Headers = headers;
headers.Add(EventBusTraceInstrumentation.TraceParentIdKey, activity?.Id);
if(activity is not null)
headers.Add(EventBusTraceInstrumentation.TraceParentIdKey, activity.Id);

if (@event.Headers?.Any() == true)
{
foreach (var item in @event.Headers)
Expand Down
18 changes: 13 additions & 5 deletions src/Subscribers/Consumers/EventConsumerService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,12 @@ private async Task Consumer_Received(object sender, BasicDeliverEventArgs eventA
catch (Exception e)
{
var eventPayloadData = $"{EventBusTraceInstrumentation.EventPayloadTag}: {eventPayload}";
var eventHeadersData = $"{EventBusTraceInstrumentation.EventHeadersTag}: {SerializeData(eventArgs.BasicProperties.Headers)}";
var eventHeadersData =
$"{EventBusTraceInstrumentation.EventHeadersTag}: {SerializeData(eventArgs.BasicProperties.Headers)}";
_logger.LogError(e,
"----- ERROR while reading the headers of '{EventType}' event type with the '{RoutingKey}' routing key and '{EventId}' event id. {EventPayload}, {Headers}.",
eventType, eventArgs.RoutingKey, eventArgs.BasicProperties.MessageId, eventPayloadData, eventHeadersData);
eventType, eventArgs.RoutingKey, eventArgs.BasicProperties.MessageId, eventPayloadData,
eventHeadersData);

return;
}
Expand All @@ -121,11 +123,13 @@ private async Task Consumer_Received(object sender, BasicDeliverEventArgs eventA
ActivityKind.Consumer, traceParentId);

if (EventBusTraceInstrumentation.ShouldAttachEventPayload)
activity?.AddEvent(new ActivityEvent($"{EventBusTraceInstrumentation.EventPayloadTag}: {eventPayload}"));
activity?.AddEvent(
new ActivityEvent($"{EventBusTraceInstrumentation.EventPayloadTag}: {eventPayload}"));

var headersAsJson = SerializeData(headers);
if (EventBusTraceInstrumentation.ShouldAttachEventHeaders)
activity?.AddEvent(new ActivityEvent($"{EventBusTraceInstrumentation.EventHeadersTag}: {headersAsJson}"));
activity?.AddEvent(
new ActivityEvent($"{EventBusTraceInstrumentation.EventHeadersTag}: {headersAsJson}"));

if (_subscribers.TryGetValue(eventType,
out (Type eventType, Type eventHandlerType, EventSubscriberOptions eventSettings) info))
Expand Down Expand Up @@ -180,6 +184,8 @@ private async Task Consumer_Received(object sender, BasicDeliverEventArgs eventA
eventType, eventArgs.RoutingKey, eventArgs.BasicProperties.MessageId);
}

#region Helper methods

void MarkEventIsDelivered()
{
_consumerChannel.BasicAck(eventArgs.DeliveryTag, multiple: false);
Expand All @@ -197,12 +203,14 @@ Dictionary<string, string> GetEventHeaders()
{
foreach (var header in eventArgs.BasicProperties.Headers)
{
var headerValue = Encoding.UTF8.GetString((byte[])header.Value);
var headerValue = header.Value is null ? null : Encoding.UTF8.GetString((byte[])header.Value);
eventHeaders.Add(header.Key, headerValue);
}
}

return eventHeaders;
}

#endregion
}
}

0 comments on commit 9d55aca

Please sign in to comment.