Skip to content

Commit

Permalink
Merge pull request #5451 from mhsdesign/task/add-debug-metadata-to-ev…
Browse files Browse the repository at this point in the history
…ents

TASK: Add correlation id and debug metadata to events
  • Loading branch information
mhsdesign authored Jan 28, 2025
2 parents ac7a598 + 2647207 commit 82b5a18
Show file tree
Hide file tree
Showing 9 changed files with 135 additions and 86 deletions.
37 changes: 22 additions & 15 deletions Neos.ContentRepository.Core/Classes/ContentRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
use Neos\ContentRepository\Core\Dimension\ContentDimensionSourceInterface;
use Neos\ContentRepository\Core\DimensionSpace\DimensionSpacePoint;
use Neos\ContentRepository\Core\DimensionSpace\InterDimensionalVariationGraph;
use Neos\ContentRepository\Core\EventStore\DecoratedEvent;
use Neos\ContentRepository\Core\EventStore\EventInterface;
use Neos\ContentRepository\Core\EventStore\EventNormalizer;
use Neos\ContentRepository\Core\EventStore\Events as DomainEvents;
use Neos\ContentRepository\Core\EventStore\EventsToPublish;
use Neos\ContentRepository\Core\EventStore\InitiatingEventMetadata;
use Neos\ContentRepository\Core\Feature\Security\AuthProviderInterface;
Expand All @@ -34,13 +37,17 @@
use Neos\ContentRepository\Core\Projection\ProjectionStates;
use Neos\ContentRepository\Core\SharedModel\ContentRepository\ContentRepositoryId;
use Neos\ContentRepository\Core\SharedModel\Exception\WorkspaceDoesNotExist;
use Neos\ContentRepository\Core\SharedModel\Id\UuidFactory;
use Neos\ContentRepository\Core\SharedModel\Workspace\ContentStreams;
use Neos\ContentRepository\Core\SharedModel\Workspace\Workspace;
use Neos\ContentRepository\Core\SharedModel\Workspace\WorkspaceName;
use Neos\ContentRepository\Core\SharedModel\Workspace\Workspaces;
use Neos\ContentRepository\Core\Subscription\Engine\SubscriptionEngine;
use Neos\ContentRepository\Core\Subscription\Exception\CatchUpHadErrors;
use Neos\EventStore\EventStoreInterface;
use Neos\EventStore\Exception\ConcurrencyException;
use Neos\EventStore\Model\Event\CorrelationId;
use Neos\EventStore\Model\Events;
use Psr\Clock\ClockInterface;

/**
Expand Down Expand Up @@ -90,11 +97,11 @@ public function handle(CommandInterface $command): void
}

$toPublish = $this->commandBus->handle($command);
$correlationId = CorrelationId::fromString(sprintf('%s_%s', substr($command::class, strrpos($command::class, '\\') + 1, 20), bin2hex(random_bytes(9))));

// simple case
if ($toPublish instanceof EventsToPublish) {
$eventsToPublish = $this->enrichEventsToPublishWithMetadata($toPublish);
$this->eventStore->commit($eventsToPublish->streamName, $this->eventNormalizer->normalizeEvents($eventsToPublish->events), $eventsToPublish->expectedVersion);
$this->eventStore->commit($toPublish->streamName, $this->enrichAndNormalizeEvents($toPublish->events, $correlationId), $toPublish->expectedVersion);
$fullCatchUpResult = $this->subscriptionEngine->catchUpActive(); // NOTE: we don't batch here, to ensure the catchup is run completely and any errors don't stop it.
if ($fullCatchUpResult->hadErrors()) {
throw CatchUpHadErrors::createFromErrors($fullCatchUpResult->errors);
Expand All @@ -104,10 +111,9 @@ public function handle(CommandInterface $command): void

// control-flow aware command handling via generator
try {
foreach ($toPublish as $yieldedEventsToPublish) {
$eventsToPublish = $this->enrichEventsToPublishWithMetadata($yieldedEventsToPublish);
foreach ($toPublish as $eventsToPublish) {
try {
$this->eventStore->commit($eventsToPublish->streamName, $this->eventNormalizer->normalizeEvents($eventsToPublish->events), $eventsToPublish->expectedVersion);
$this->eventStore->commit($eventsToPublish->streamName, $this->enrichAndNormalizeEvents($eventsToPublish->events, $correlationId), $eventsToPublish->expectedVersion);
} catch (ConcurrencyException $concurrencyException) {
// we pass the exception into the generator (->throw), so it could be try-caught and reacted upon:
//
Expand All @@ -119,7 +125,7 @@ public function handle(CommandInterface $command): void
// }
$yieldedErrorStrategy = $toPublish->throw($concurrencyException);
if ($yieldedErrorStrategy instanceof EventsToPublish) {
$this->eventStore->commit($yieldedErrorStrategy->streamName, $this->eventNormalizer->normalizeEvents($yieldedErrorStrategy->events), $yieldedErrorStrategy->expectedVersion);
$this->eventStore->commit($yieldedErrorStrategy->streamName, $this->enrichAndNormalizeEvents($yieldedErrorStrategy->events, $correlationId), $yieldedErrorStrategy->expectedVersion);
}
throw $concurrencyException;
}
Expand Down Expand Up @@ -204,19 +210,20 @@ public function getContentDimensionSource(): ContentDimensionSourceInterface
return $this->contentDimensionSource;
}

private function enrichEventsToPublishWithMetadata(EventsToPublish $eventsToPublish): EventsToPublish
private function enrichAndNormalizeEvents(DomainEvents $events, CorrelationId $correlationId): Events
{
$initiatingUserId = $this->authProvider->getAuthenticatedUserId() ?? UserId::forSystemUser();
$initiatingTimestamp = $this->clock->now();

return new EventsToPublish(
$eventsToPublish->streamName,
InitiatingEventMetadata::enrichEventsWithInitiatingMetadata(
$eventsToPublish->events,
$initiatingUserId,
$initiatingTimestamp
),
$eventsToPublish->expectedVersion,
$eventsWithMetaData = InitiatingEventMetadata::enrichEventsWithInitiatingMetadata(
$events,
$initiatingUserId,
$initiatingTimestamp
);

return Events::fromArray($eventsWithMetaData->map(function (EventInterface|DecoratedEvent $event) use ($correlationId) {
$decoratedEvent = DecoratedEvent::create($event, correlationId: $correlationId);
return $this->eventNormalizer->normalize($decoratedEvent);
}));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

namespace Neos\ContentRepository\Core\EventStore;

use Neos\ContentRepository\Core\EventStore\Events as DomainEvents;
use Neos\ContentRepository\Core\Feature\ContentStreamClosing\Event\ContentStreamWasClosed;
use Neos\ContentRepository\Core\Feature\ContentStreamClosing\Event\ContentStreamWasReopened;
use Neos\ContentRepository\Core\Feature\ContentStreamCreation\Event\ContentStreamWasCreated;
Expand All @@ -26,7 +25,6 @@
use Neos\ContentRepository\Core\Feature\NodeVariation\Event\NodeSpecializationVariantWasCreated;
use Neos\ContentRepository\Core\Feature\RootNodeCreation\Event\RootNodeAggregateDimensionsWereUpdated;
use Neos\ContentRepository\Core\Feature\RootNodeCreation\Event\RootNodeAggregateWithNodeWasCreated;
use Neos\ContentRepository\Core\Feature\SubtreeTagging\Dto\SubtreeTag;
use Neos\ContentRepository\Core\Feature\SubtreeTagging\Event\SubtreeWasTagged;
use Neos\ContentRepository\Core\Feature\SubtreeTagging\Event\SubtreeWasUntagged;
use Neos\ContentRepository\Core\Feature\WorkspaceCreation\Event\RootWorkspaceWasCreated;
Expand All @@ -45,7 +43,6 @@
use Neos\EventStore\Model\Event\EventData;
use Neos\EventStore\Model\Event\EventId;
use Neos\EventStore\Model\Event\EventType;
use Neos\EventStore\Model\Events;

/**
* Central authority to convert Content Repository domain events to Event Store EventData and EventType, vice versa.
Expand Down Expand Up @@ -156,11 +153,6 @@ public function normalize(EventInterface|DecoratedEvent $event): Event
);
}

public function normalizeEvents(DomainEvents $events): Events
{
return Events::fromArray($events->map($this->normalize(...)));
}

public function denormalize(Event $event): EventInterface
{
$eventClassName = $this->getEventClassName($event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Neos\ContentRepository\Core\Feature;

use Neos\ContentRepository\Core\CommandHandler\CommandHandlingDependencies;
use Neos\ContentRepository\Core\EventStore\DecoratedEvent;
use Neos\ContentRepository\Core\EventStore\Events;
use Neos\ContentRepository\Core\EventStore\EventsToPublish;
use Neos\ContentRepository\Core\Feature\ContentStreamClosing\Event\ContentStreamWasClosed;
Expand Down Expand Up @@ -50,16 +51,20 @@ private function closeContentStream(
*/
private function reopenContentStreamWithoutConstraintChecks(
ContentStreamId $contentStreamId,
string $debugReason
): EventsToPublish {
return new EventsToPublish(
ContentStreamEventStreamName::fromContentStreamId($contentStreamId)->getEventStreamName(),
Events::with(
new ContentStreamWasReopened(
$contentStreamId
),
DecoratedEvent::create(
new ContentStreamWasReopened(
$contentStreamId
),
metadata: array_filter(['debug_reason' => $debugReason])
)
),
// We operate here without constraints on purpose to ensure this can be commited.
//Constraints have been checked beforehand and its expected that the content stream is closed.
// Constraints have been checked beforehand and its expected that the content stream is closed.
ExpectedVersion::ANY()
);
}
Expand All @@ -74,16 +79,20 @@ private function reopenContentStreamWithoutConstraintChecks(
private function forkContentStream(
ContentStreamId $newContentStreamId,
ContentStreamId $sourceContentStreamId,
Version $sourceContentStreamVersion
Version $sourceContentStreamVersion,
string $debugReason
): EventsToPublish {
return new EventsToPublish(
ContentStreamEventStreamName::fromContentStreamId($newContentStreamId)->getEventStreamName(),
Events::with(
new ContentStreamWasForked(
$newContentStreamId,
$sourceContentStreamId,
$sourceContentStreamVersion,
),
DecoratedEvent::create(
event: new ContentStreamWasForked(
$newContentStreamId,
$sourceContentStreamId,
$sourceContentStreamVersion,
),
metadata: ['debug_reason' => $debugReason]
)
),
// NO_STREAM to ensure the "fork" happens as the first event of the new content stream
ExpectedVersion::NO_STREAM()
Expand Down
Loading

0 comments on commit 82b5a18

Please sign in to comment.