From 2d22808ad029a5568a0c450c8238fc5e007db95e Mon Sep 17 00:00:00 2001 From: mhsdesign <85400359+mhsdesign@users.noreply.github.com> Date: Sat, 30 Mar 2024 11:57:23 +0100 Subject: [PATCH 01/12] WIP: Set causationId and meta data for structure adjustments --- .../src/StructureAdjustmentService.php | 38 +++++++++++++++++-- 1 file changed, 35 insertions(+), 3 deletions(-) diff --git a/Neos.ContentRepository.StructureAdjustment/src/StructureAdjustmentService.php b/Neos.ContentRepository.StructureAdjustment/src/StructureAdjustmentService.php index bf18eaea3b..51cdaeade4 100644 --- a/Neos.ContentRepository.StructureAdjustment/src/StructureAdjustmentService.php +++ b/Neos.ContentRepository.StructureAdjustment/src/StructureAdjustmentService.php @@ -6,7 +6,10 @@ use Neos\ContentRepository\Core\ContentRepository; use Neos\ContentRepository\Core\DimensionSpace\InterDimensionalVariationGraph; +use Neos\ContentRepository\Core\EventStore\DecoratedEvent; +use Neos\ContentRepository\Core\EventStore\EventInterface; use Neos\ContentRepository\Core\EventStore\EventNormalizer; +use Neos\ContentRepository\Core\EventStore\Events; use Neos\ContentRepository\Core\EventStore\EventsToPublish; use Neos\ContentRepository\Core\Factory\ContentRepositoryServiceInterface; use Neos\ContentRepository\Core\Infrastructure\Property\PropertyConverter; @@ -22,7 +25,8 @@ use Neos\ContentRepository\StructureAdjustment\Adjustment\TetheredNodeAdjustments; use Neos\ContentRepository\StructureAdjustment\Adjustment\UnknownNodeTypeAdjustment; use Neos\EventStore\EventStoreInterface; -use Neos\EventStore\Model\Events; +use Neos\EventStore\Model\Event\EventId; +use Neos\EventStore\Model\Event\EventMetadata; class StructureAdjustmentService implements ContentRepositoryServiceInterface { @@ -109,8 +113,16 @@ public function fixError(StructureAdjustment $adjustment): void $remediation = $adjustment->remediation; $eventsToPublish = $remediation(); assert($eventsToPublish instanceof EventsToPublish); - $normalizedEvents = Events::fromArray( - $eventsToPublish->events->map($this->eventNormalizer->normalize(...)) + + $eventsWithMetaData = self::eventsWithCausationOfFirstEventAndAdditionalMetaData( + $eventsToPublish->events, + EventMetadata::fromArray([ + 'structureAdjustment' => mb_strimwidth($adjustment->render() , 0, 250, '…') + ]) + ); + + $normalizedEvents = \Neos\EventStore\Model\Events::fromArray( + $eventsWithMetaData->map($this->eventNormalizer->normalize(...)) ); $this->eventStore->commit( $eventsToPublish->streamName, @@ -119,4 +131,24 @@ public function fixError(StructureAdjustment $adjustment): void ); $this->subscriptionEngine->catchUpActive(); } + + private static function eventsWithCausationOfFirstEventAndAdditionalMetaData(Events $events, EventMetadata $metadata): Events + { + /** @var non-empty-list $restEvents */ + $restEvents = iterator_to_array($events); + $firstEvent = array_shift($restEvents); + + if ($firstEvent instanceof DecoratedEvent && $firstEvent->eventMetadata) { + $metadata = EventMetadata::fromArray(array_merge($firstEvent->eventMetadata->value, $metadata->value)); + } + + $decoratedFirstEvent = DecoratedEvent::create($firstEvent, eventId: EventId::create(), metadata: $metadata); + + $decoratedRestEvents = []; + foreach ($restEvents as $event) { + $decoratedRestEvents[] = DecoratedEvent::create($event, causationId: $decoratedFirstEvent->eventId); + } + + return Events::fromArray([$decoratedFirstEvent, ...$decoratedRestEvents]); + } } From 4c64a017422c2abf1f48c9923d05e80ef091641a Mon Sep 17 00:00:00 2001 From: mhsdesign <85400359+mhsdesign@users.noreply.github.com> Date: Sun, 26 Jan 2025 21:45:12 +0100 Subject: [PATCH 02/12] TASK: Use correlation id instead of causation id to group adjustment events Applies suggestion originally posted in https://github.com/neos/neos-development-collection/pull/4969#discussion_r1581825304 the resulting event log would be similar to: | event | correlation id | metadata | |---------------------------------|----------------|--------------------------------------------------------------------------------------------------------------------------------------------| | NodeAggregateWithNodeWasCreated | 123456 | `{structureAdjustment: 'Content Stream: %s; Dimension Space Point: %s, Node Aggregate: %s --- The tethered child node "bar" is missing.'}` | | NodePropertiesWereSet | 123456 | | --- .../src/StructureAdjustmentService.php | 54 ++++++++----------- 1 file changed, 22 insertions(+), 32 deletions(-) diff --git a/Neos.ContentRepository.StructureAdjustment/src/StructureAdjustmentService.php b/Neos.ContentRepository.StructureAdjustment/src/StructureAdjustmentService.php index 51cdaeade4..31d17b56a6 100644 --- a/Neos.ContentRepository.StructureAdjustment/src/StructureAdjustmentService.php +++ b/Neos.ContentRepository.StructureAdjustment/src/StructureAdjustmentService.php @@ -9,13 +9,13 @@ use Neos\ContentRepository\Core\EventStore\DecoratedEvent; use Neos\ContentRepository\Core\EventStore\EventInterface; use Neos\ContentRepository\Core\EventStore\EventNormalizer; -use Neos\ContentRepository\Core\EventStore\Events; use Neos\ContentRepository\Core\EventStore\EventsToPublish; use Neos\ContentRepository\Core\Factory\ContentRepositoryServiceInterface; use Neos\ContentRepository\Core\Infrastructure\Property\PropertyConverter; use Neos\ContentRepository\Core\NodeType\NodeTypeManager; use Neos\ContentRepository\Core\NodeType\NodeTypeName; use Neos\ContentRepository\Core\Projection\ContentGraph\ContentGraphInterface; +use Neos\ContentRepository\Core\SharedModel\Id\UuidFactory; use Neos\ContentRepository\Core\SharedModel\Workspace\WorkspaceName; use Neos\ContentRepository\Core\Subscription\Engine\SubscriptionEngine; use Neos\ContentRepository\StructureAdjustment\Adjustment\DimensionAdjustment; @@ -25,8 +25,8 @@ use Neos\ContentRepository\StructureAdjustment\Adjustment\TetheredNodeAdjustments; use Neos\ContentRepository\StructureAdjustment\Adjustment\UnknownNodeTypeAdjustment; use Neos\EventStore\EventStoreInterface; -use Neos\EventStore\Model\Event\EventId; -use Neos\EventStore\Model\Event\EventMetadata; +use Neos\EventStore\Model\Event\CorrelationId; +use Neos\EventStore\Model\Events; class StructureAdjustmentService implements ContentRepositoryServiceInterface { @@ -114,16 +114,26 @@ public function fixError(StructureAdjustment $adjustment): void $eventsToPublish = $remediation(); assert($eventsToPublish instanceof EventsToPublish); - $eventsWithMetaData = self::eventsWithCausationOfFirstEventAndAdditionalMetaData( - $eventsToPublish->events, - EventMetadata::fromArray([ - 'structureAdjustment' => mb_strimwidth($adjustment->render() , 0, 250, '…') - ]) - ); + // set correlation id and add debug metadata + $correlationId = CorrelationId::fromString(UuidFactory::create()); + $isFirstEvent = true; + $normalizedEvents = Events::fromArray($eventsToPublish->events->map(function (EventInterface|DecoratedEvent $event) use ( + &$isFirstEvent, $correlationId, $adjustment + ) { + $metadata = $event instanceof DecoratedEvent ? $event->eventMetadata?->value ?? [] : []; + if ($isFirstEvent) { + $metadata['debug_structureAdjustment'] = mb_strimwidth($adjustment->render() , 0, 250, '…'); + $isFirstEvent = false; + } + $decoratedEvent = DecoratedEvent::create( + event: $event, + metadata: $metadata, + correlationId: $correlationId, + ); + + return $this->eventNormalizer->normalize($decoratedEvent); + })); - $normalizedEvents = \Neos\EventStore\Model\Events::fromArray( - $eventsWithMetaData->map($this->eventNormalizer->normalize(...)) - ); $this->eventStore->commit( $eventsToPublish->streamName, $normalizedEvents, @@ -131,24 +141,4 @@ public function fixError(StructureAdjustment $adjustment): void ); $this->subscriptionEngine->catchUpActive(); } - - private static function eventsWithCausationOfFirstEventAndAdditionalMetaData(Events $events, EventMetadata $metadata): Events - { - /** @var non-empty-list $restEvents */ - $restEvents = iterator_to_array($events); - $firstEvent = array_shift($restEvents); - - if ($firstEvent instanceof DecoratedEvent && $firstEvent->eventMetadata) { - $metadata = EventMetadata::fromArray(array_merge($firstEvent->eventMetadata->value, $metadata->value)); - } - - $decoratedFirstEvent = DecoratedEvent::create($firstEvent, eventId: EventId::create(), metadata: $metadata); - - $decoratedRestEvents = []; - foreach ($restEvents as $event) { - $decoratedRestEvents[] = DecoratedEvent::create($event, causationId: $decoratedFirstEvent->eventId); - } - - return Events::fromArray([$decoratedFirstEvent, ...$decoratedRestEvents]); - } } From 21ecddc6ae437a49997d6513a3e38630fa5ffd22 Mon Sep 17 00:00:00 2001 From: mhsdesign <85400359+mhsdesign@users.noreply.github.com> Date: Sun, 26 Jan 2025 21:51:38 +0100 Subject: [PATCH 03/12] TASK: Add debug event metadata why a content stream was closed --- .../Classes/Feature/ContentStreamHandling.php | 11 +++++--- .../Feature/WorkspaceCommandHandler.php | 25 +++++++++++++------ 2 files changed, 25 insertions(+), 11 deletions(-) diff --git a/Neos.ContentRepository.Core/Classes/Feature/ContentStreamHandling.php b/Neos.ContentRepository.Core/Classes/Feature/ContentStreamHandling.php index a16aff28aa..83a83e2091 100644 --- a/Neos.ContentRepository.Core/Classes/Feature/ContentStreamHandling.php +++ b/Neos.ContentRepository.Core/Classes/Feature/ContentStreamHandling.php @@ -5,6 +5,7 @@ namespace Neos\ContentRepository\Core\Feature; use Neos\ContentRepository\Core\CommandHandler\CommandHandlingDependencies; +use Neos\ContentRepository\Core\EventStore\DecoratedEvent; use Neos\ContentRepository\Core\EventStore\Events; use Neos\ContentRepository\Core\EventStore\EventsToPublish; use Neos\ContentRepository\Core\Feature\ContentStreamClosing\Event\ContentStreamWasClosed; @@ -30,15 +31,19 @@ trait ContentStreamHandling private function closeContentStream( ContentStreamId $contentStreamId, Version $contentStreamVersion, + string $causationCommandClassName ): EventsToPublish { $streamName = ContentStreamEventStreamName::fromContentStreamId($contentStreamId)->getEventStreamName(); return new EventsToPublish( $streamName, Events::with( - new ContentStreamWasClosed( - $contentStreamId, - ), + DecoratedEvent::create( + new ContentStreamWasClosed( + $contentStreamId, + ), + metadata: array_filter(['debug_causationCommand' => substr($causationCommandClassName, strrpos($causationCommandClassName, '\\') + 1)]) + ) ), ExpectedVersion::fromVersion($contentStreamVersion) ); diff --git a/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php b/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php index ff3b6c5fac..a1253f3390 100644 --- a/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php +++ b/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php @@ -206,7 +206,8 @@ private function handlePublishWorkspace( yield $this->closeContentStream( $workspace->currentContentStreamId, - $workspaceContentStreamVersion + $workspaceContentStreamVersion, + $command::class ); $commandSimulator = $this->commandSimulatorFactory->createSimulatorForWorkspace($baseWorkspace->workspaceName); @@ -242,7 +243,8 @@ static function ($handle) use ($rebaseableCommands): void { ); } catch (ConcurrencyException $concurrencyException) { yield $this->reopenContentStreamWithoutConstraintChecks( - $workspace->currentContentStreamId + $workspace->currentContentStreamId, + sprintf('concurrency %d: %s', $concurrencyException->getCode(), $concurrencyException->getMessage()) ); throw $concurrencyException; } @@ -350,7 +352,8 @@ private function handleRebaseWorkspace( // if we have no changes in the workspace we can fork from the base directly yield $this->closeContentStream( $workspace->currentContentStreamId, - $workspaceContentStreamVersion + $workspaceContentStreamVersion, + $command::class ); yield from $this->rebaseWorkspaceWithoutChanges( @@ -371,7 +374,8 @@ private function handleRebaseWorkspace( yield $this->closeContentStream( $workspace->currentContentStreamId, - $workspaceContentStreamVersion + $workspaceContentStreamVersion, + $command::class ); $commandSimulator = $this->commandSimulatorFactory->createSimulatorForWorkspace($baseWorkspace->workspaceName); @@ -458,7 +462,8 @@ private function handlePublishIndividualNodesFromWorkspace( yield $this->closeContentStream( $workspace->currentContentStreamId, - $workspaceContentStreamVersion + $workspaceContentStreamVersion, + $command::class ); $commandSimulator = $this->commandSimulatorFactory->createSimulatorForWorkspace($baseWorkspace->workspaceName); @@ -579,7 +584,8 @@ private function handleDiscardIndividualNodesFromWorkspace( yield $this->closeContentStream( $workspace->currentContentStreamId, - $workspaceContentStreamVersion + $workspaceContentStreamVersion, + $command::class ); if ($commandsToKeep->isEmpty()) { @@ -790,8 +796,11 @@ private function forkNewContentStreamAndApplyEvents( $sourceContentStreamId, $sourceContentStreamVersion )->withAppendedEvents(Events::with( - new ContentStreamWasClosed( - $newContentStreamId + DecoratedEvent::create( + new ContentStreamWasClosed( + $newContentStreamId + ), + metadata: ['debug_reason' => sprintf('Forking %s from %s to publish %d events', $newContentStreamId->value, $sourceContentStreamId->value, $eventsToApplyOnNewContentStream?->count() ?? 0)] ) )); From 182e20498e8ccc9d07e43ce6f14997d9897bf93d Mon Sep 17 00:00:00 2001 From: mhsdesign <85400359+mhsdesign@users.noreply.github.com> Date: Sun, 26 Jan 2025 21:55:53 +0100 Subject: [PATCH 04/12] TASK: Add debug event metadata why a content stream was reopened (using the exception message) While before #4965 a even `WorkspaceRebaseFailed` would contain bloats of data which grew a lot. A small message in the closed even is not a problem though. --- .../Classes/Feature/ContentStreamHandling.php | 12 +++-- .../Feature/WorkspaceCommandHandler.php | 46 +++++++++++-------- 2 files changed, 35 insertions(+), 23 deletions(-) diff --git a/Neos.ContentRepository.Core/Classes/Feature/ContentStreamHandling.php b/Neos.ContentRepository.Core/Classes/Feature/ContentStreamHandling.php index 83a83e2091..053352f98e 100644 --- a/Neos.ContentRepository.Core/Classes/Feature/ContentStreamHandling.php +++ b/Neos.ContentRepository.Core/Classes/Feature/ContentStreamHandling.php @@ -55,16 +55,20 @@ private function closeContentStream( */ private function reopenContentStreamWithoutConstraintChecks( ContentStreamId $contentStreamId, + string $debugReason ): EventsToPublish { return new EventsToPublish( ContentStreamEventStreamName::fromContentStreamId($contentStreamId)->getEventStreamName(), Events::with( - new ContentStreamWasReopened( - $contentStreamId - ), + DecoratedEvent::create( + new ContentStreamWasReopened( + $contentStreamId + ), + metadata: array_filter(['debug_reason' => $debugReason]) + ) ), // We operate here without constraints on purpose to ensure this can be commited. - //Constraints have been checked beforehand and its expected that the content stream is closed. + // Constraints have been checked beforehand and its expected that the content stream is closed. ExpectedVersion::ANY() ); } diff --git a/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php b/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php index a1253f3390..1f206d1d7e 100644 --- a/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php +++ b/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php @@ -221,10 +221,12 @@ static function ($handle) use ($rebaseableCommands): void { ); if ($commandSimulator->hasConflicts()) { + $workspaceRebaseFailed = WorkspaceRebaseFailed::duringPublish($commandSimulator->getConflictingEvents()); yield $this->reopenContentStreamWithoutConstraintChecks( - $workspace->currentContentStreamId + $workspace->currentContentStreamId, + sprintf('conflicts %d: %s', $workspaceRebaseFailed->getCode(), $workspaceRebaseFailed->getMessage()) ); - throw WorkspaceRebaseFailed::duringPublish($commandSimulator->getConflictingEvents()); + throw $workspaceRebaseFailed; } $eventsOfWorkspaceToPublish = $this->getCopiedEventsOfEventStream( @@ -392,12 +394,13 @@ static function ($handle) use ($rebaseableCommands): void { $command->rebaseErrorHandlingStrategy === RebaseErrorHandlingStrategy::STRATEGY_FAIL && $commandSimulator->hasConflicts() ) { + // throw an exception that contains all the information about what exactly failed + $workspaceRebaseFailed = WorkspaceRebaseFailed::duringRebase($commandSimulator->getConflictingEvents()); yield $this->reopenContentStreamWithoutConstraintChecks( - $workspace->currentContentStreamId + $workspace->currentContentStreamId, + sprintf('conflicts %d: %s', $workspaceRebaseFailed->getCode(), $workspaceRebaseFailed->getMessage()) ); - - // throw an exception that contains all the information about what exactly failed - throw WorkspaceRebaseFailed::duringRebase($commandSimulator->getConflictingEvents()); + throw $workspaceRebaseFailed; } // if we got so far without an exception (or if we don't care), we can switch the workspace's active content stream. @@ -482,17 +485,19 @@ static function ($handle) use ($commandSimulator, $matchingCommands, $remainingC ); if ($commandSimulator->hasConflicts()) { - yield $this->reopenContentStreamWithoutConstraintChecks( - $workspace->currentContentStreamId - ); - match ($workspace->status) { + $workspaceRebaseFailed = match ($workspace->status) { // If the workspace is up-to-date it must be a problem regarding that the order of events cannot be changed WorkspaceStatus::UP_TO_DATE => - throw PartialWorkspaceRebaseFailed::duringPartialPublish($commandSimulator->getConflictingEvents()), + PartialWorkspaceRebaseFailed::duringPartialPublish($commandSimulator->getConflictingEvents()), // If the workspace is outdated we cannot know for sure but suspect that the conflict arose due to changes in the base workspace. WorkspaceStatus::OUTDATED => - throw WorkspaceRebaseFailed::duringPublish($commandSimulator->getConflictingEvents()) + WorkspaceRebaseFailed::duringPublish($commandSimulator->getConflictingEvents()) }; + yield $this->reopenContentStreamWithoutConstraintChecks( + $workspace->currentContentStreamId, + sprintf('conflicts %d: %s', $workspaceRebaseFailed->getCode(), $workspaceRebaseFailed->getMessage()) + ); + throw $workspaceRebaseFailed; } $selectedEventsOfWorkspaceToPublish = $this->getCopiedEventsOfEventStream( @@ -511,7 +516,8 @@ static function ($handle) use ($commandSimulator, $matchingCommands, $remainingC ); } catch (ConcurrencyException $concurrencyException) { yield $this->reopenContentStreamWithoutConstraintChecks( - $workspace->currentContentStreamId + $workspace->currentContentStreamId, + sprintf('concurrency %d: %s', $concurrencyException->getCode(), $concurrencyException->getMessage()) ); throw $concurrencyException; } @@ -610,17 +616,19 @@ static function ($handle) use ($commandsToKeep): void { ); if ($commandSimulator->hasConflicts()) { - yield $this->reopenContentStreamWithoutConstraintChecks( - $workspace->currentContentStreamId - ); - match ($workspace->status) { + $workspaceRebaseFailed = match ($workspace->status) { // If the workspace is up-to-date it must be a problem regarding that the order of events cannot be changed WorkspaceStatus::UP_TO_DATE => - throw PartialWorkspaceRebaseFailed::duringPartialDiscard($commandSimulator->getConflictingEvents()), + PartialWorkspaceRebaseFailed::duringPartialDiscard($commandSimulator->getConflictingEvents()), // If the workspace is outdated we cannot know for sure but suspect that the conflict arose due to changes in the base workspace. WorkspaceStatus::OUTDATED => - throw WorkspaceRebaseFailed::duringDiscard($commandSimulator->getConflictingEvents()) + WorkspaceRebaseFailed::duringDiscard($commandSimulator->getConflictingEvents()) }; + yield $this->reopenContentStreamWithoutConstraintChecks( + $workspace->currentContentStreamId, + sprintf('conflicts %d: %s', $workspaceRebaseFailed->getCode(), $workspaceRebaseFailed->getMessage()) + ); + throw $workspaceRebaseFailed; } yield from $this->forkNewContentStreamAndApplyEvents( From bde53552a3f8410f0c78ebcdca9c923fd5029a6a Mon Sep 17 00:00:00 2001 From: mhsdesign <85400359+mhsdesign@users.noreply.github.com> Date: Sun, 26 Jan 2025 21:57:51 +0100 Subject: [PATCH 05/12] TASK: Remove obsolete 'and 0 further conflicts' --- .../Feature/WorkspaceRebase/Exception/WorkspaceRebaseFailed.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Neos.ContentRepository.Core/Classes/Feature/WorkspaceRebase/Exception/WorkspaceRebaseFailed.php b/Neos.ContentRepository.Core/Classes/Feature/WorkspaceRebase/Exception/WorkspaceRebaseFailed.php index 8f6c8ae416..51ec5c8077 100644 --- a/Neos.ContentRepository.Core/Classes/Feature/WorkspaceRebase/Exception/WorkspaceRebaseFailed.php +++ b/Neos.ContentRepository.Core/Classes/Feature/WorkspaceRebase/Exception/WorkspaceRebaseFailed.php @@ -86,6 +86,6 @@ public static function duringDiscard(ConflictingEvents $conflictingEvents): self private static function renderMessage(ConflictingEvents $conflictingEvents): string { $firstConflict = $conflictingEvents->first(); - return sprintf('"%s" and %d further conflicts', $firstConflict?->getException()->getMessage(), count($conflictingEvents) - 1); + return sprintf('"%s"%s', $firstConflict?->getException()->getMessage(), count($conflictingEvents) > 1 ? sprintf(' and %d further conflicts', count($conflictingEvents) - 1) : ''); } } From 0d12062775a380f65b3eb3929441516559e3e245 Mon Sep 17 00:00:00 2001 From: mhsdesign <85400359+mhsdesign@users.noreply.github.com> Date: Sun, 26 Jan 2025 22:14:09 +0100 Subject: [PATCH 06/12] TASK: Prepare content repository to set correlation ids --- .../Classes/ContentRepository.php | 28 +++++++++---------- .../Classes/EventStore/EventNormalizer.php | 8 ------ 2 files changed, 13 insertions(+), 23 deletions(-) diff --git a/Neos.ContentRepository.Core/Classes/ContentRepository.php b/Neos.ContentRepository.Core/Classes/ContentRepository.php index 1658d1d045..b9f920e668 100644 --- a/Neos.ContentRepository.Core/Classes/ContentRepository.php +++ b/Neos.ContentRepository.Core/Classes/ContentRepository.php @@ -21,6 +21,7 @@ use Neos\ContentRepository\Core\DimensionSpace\DimensionSpacePoint; use Neos\ContentRepository\Core\DimensionSpace\InterDimensionalVariationGraph; use Neos\ContentRepository\Core\EventStore\EventNormalizer; +use Neos\ContentRepository\Core\EventStore\Events as DomainEvents; use Neos\ContentRepository\Core\EventStore\EventsToPublish; use Neos\ContentRepository\Core\EventStore\InitiatingEventMetadata; use Neos\ContentRepository\Core\Feature\Security\AuthProviderInterface; @@ -41,6 +42,7 @@ use Neos\ContentRepository\Core\Subscription\Exception\CatchUpHadErrors; use Neos\EventStore\EventStoreInterface; use Neos\EventStore\Exception\ConcurrencyException; +use Neos\EventStore\Model\Events; use Psr\Clock\ClockInterface; /** @@ -93,8 +95,7 @@ public function handle(CommandInterface $command): void // simple case if ($toPublish instanceof EventsToPublish) { - $eventsToPublish = $this->enrichEventsToPublishWithMetadata($toPublish); - $this->eventStore->commit($eventsToPublish->streamName, $this->eventNormalizer->normalizeEvents($eventsToPublish->events), $eventsToPublish->expectedVersion); + $this->eventStore->commit($toPublish->streamName, $this->enrichAndNormaliseEvents($toPublish->events), $toPublish->expectedVersion); $fullCatchUpResult = $this->subscriptionEngine->catchUpActive(); // NOTE: we don't batch here, to ensure the catchup is run completely and any errors don't stop it. if ($fullCatchUpResult->hadErrors()) { throw CatchUpHadErrors::createFromErrors($fullCatchUpResult->errors); @@ -104,10 +105,9 @@ public function handle(CommandInterface $command): void // control-flow aware command handling via generator try { - foreach ($toPublish as $yieldedEventsToPublish) { - $eventsToPublish = $this->enrichEventsToPublishWithMetadata($yieldedEventsToPublish); + foreach ($toPublish as $eventsToPublish) { try { - $this->eventStore->commit($eventsToPublish->streamName, $this->eventNormalizer->normalizeEvents($eventsToPublish->events), $eventsToPublish->expectedVersion); + $this->eventStore->commit($eventsToPublish->streamName, $this->enrichAndNormaliseEvents($eventsToPublish->events), $eventsToPublish->expectedVersion); } catch (ConcurrencyException $concurrencyException) { // we pass the exception into the generator (->throw), so it could be try-caught and reacted upon: // @@ -119,7 +119,7 @@ public function handle(CommandInterface $command): void // } $yieldedErrorStrategy = $toPublish->throw($concurrencyException); if ($yieldedErrorStrategy instanceof EventsToPublish) { - $this->eventStore->commit($yieldedErrorStrategy->streamName, $this->eventNormalizer->normalizeEvents($yieldedErrorStrategy->events), $yieldedErrorStrategy->expectedVersion); + $this->eventStore->commit($yieldedErrorStrategy->streamName, $this->enrichAndNormaliseEvents($yieldedErrorStrategy->events), $yieldedErrorStrategy->expectedVersion); } throw $concurrencyException; } @@ -204,19 +204,17 @@ public function getContentDimensionSource(): ContentDimensionSourceInterface return $this->contentDimensionSource; } - private function enrichEventsToPublishWithMetadata(EventsToPublish $eventsToPublish): EventsToPublish + private function enrichAndNormaliseEvents(DomainEvents $events): Events { $initiatingUserId = $this->authProvider->getAuthenticatedUserId() ?? UserId::forSystemUser(); $initiatingTimestamp = $this->clock->now(); - return new EventsToPublish( - $eventsToPublish->streamName, - InitiatingEventMetadata::enrichEventsWithInitiatingMetadata( - $eventsToPublish->events, - $initiatingUserId, - $initiatingTimestamp - ), - $eventsToPublish->expectedVersion, + $events = InitiatingEventMetadata::enrichEventsWithInitiatingMetadata( + $events, + $initiatingUserId, + $initiatingTimestamp ); + + return Events::fromArray($events->map($this->eventNormalizer->normalize(...))); } } diff --git a/Neos.ContentRepository.Core/Classes/EventStore/EventNormalizer.php b/Neos.ContentRepository.Core/Classes/EventStore/EventNormalizer.php index 6cb9bdd474..028700f972 100644 --- a/Neos.ContentRepository.Core/Classes/EventStore/EventNormalizer.php +++ b/Neos.ContentRepository.Core/Classes/EventStore/EventNormalizer.php @@ -4,7 +4,6 @@ namespace Neos\ContentRepository\Core\EventStore; -use Neos\ContentRepository\Core\EventStore\Events as DomainEvents; use Neos\ContentRepository\Core\Feature\ContentStreamClosing\Event\ContentStreamWasClosed; use Neos\ContentRepository\Core\Feature\ContentStreamClosing\Event\ContentStreamWasReopened; use Neos\ContentRepository\Core\Feature\ContentStreamCreation\Event\ContentStreamWasCreated; @@ -26,7 +25,6 @@ use Neos\ContentRepository\Core\Feature\NodeVariation\Event\NodeSpecializationVariantWasCreated; use Neos\ContentRepository\Core\Feature\RootNodeCreation\Event\RootNodeAggregateDimensionsWereUpdated; use Neos\ContentRepository\Core\Feature\RootNodeCreation\Event\RootNodeAggregateWithNodeWasCreated; -use Neos\ContentRepository\Core\Feature\SubtreeTagging\Dto\SubtreeTag; use Neos\ContentRepository\Core\Feature\SubtreeTagging\Event\SubtreeWasTagged; use Neos\ContentRepository\Core\Feature\SubtreeTagging\Event\SubtreeWasUntagged; use Neos\ContentRepository\Core\Feature\WorkspaceCreation\Event\RootWorkspaceWasCreated; @@ -45,7 +43,6 @@ use Neos\EventStore\Model\Event\EventData; use Neos\EventStore\Model\Event\EventId; use Neos\EventStore\Model\Event\EventType; -use Neos\EventStore\Model\Events; /** * Central authority to convert Content Repository domain events to Event Store EventData and EventType, vice versa. @@ -156,11 +153,6 @@ public function normalize(EventInterface|DecoratedEvent $event): Event ); } - public function normalizeEvents(DomainEvents $events): Events - { - return Events::fromArray($events->map($this->normalize(...))); - } - public function denormalize(Event $event): EventInterface { $eventClassName = $this->getEventClassName($event); From cc6fcb37d0af41e64c00597b60874aa1579f784b Mon Sep 17 00:00:00 2001 From: mhsdesign <85400359+mhsdesign@users.noreply.github.com> Date: Sun, 26 Jan 2025 22:26:27 +0100 Subject: [PATCH 07/12] TASK: Set correlation ids #3887 in ContentRepository::handle() The refactored publishing V3 opens another now used advantage: For commands with multiple events like workspace publishing, we can now centrally add metadata like a simple correlation id. Note that for 'simple' commands we dont need to do this as `RebaseableCommand::enrichWithCommand` will already group them with a causationId which is what we decided to use for tethered nodes for example. The id never changes for causation ids. --- .../Classes/ContentRepository.php | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/Neos.ContentRepository.Core/Classes/ContentRepository.php b/Neos.ContentRepository.Core/Classes/ContentRepository.php index b9f920e668..91469755d7 100644 --- a/Neos.ContentRepository.Core/Classes/ContentRepository.php +++ b/Neos.ContentRepository.Core/Classes/ContentRepository.php @@ -20,6 +20,8 @@ use Neos\ContentRepository\Core\Dimension\ContentDimensionSourceInterface; use Neos\ContentRepository\Core\DimensionSpace\DimensionSpacePoint; use Neos\ContentRepository\Core\DimensionSpace\InterDimensionalVariationGraph; +use Neos\ContentRepository\Core\EventStore\DecoratedEvent; +use Neos\ContentRepository\Core\EventStore\EventInterface; use Neos\ContentRepository\Core\EventStore\EventNormalizer; use Neos\ContentRepository\Core\EventStore\Events as DomainEvents; use Neos\ContentRepository\Core\EventStore\EventsToPublish; @@ -35,6 +37,8 @@ use Neos\ContentRepository\Core\Projection\ProjectionStates; use Neos\ContentRepository\Core\SharedModel\ContentRepository\ContentRepositoryId; use Neos\ContentRepository\Core\SharedModel\Exception\WorkspaceDoesNotExist; +use Neos\ContentRepository\Core\SharedModel\Id\UuidFactory; +use Neos\ContentRepository\Core\SharedModel\Workspace\ContentStreams; use Neos\ContentRepository\Core\SharedModel\Workspace\Workspace; use Neos\ContentRepository\Core\SharedModel\Workspace\WorkspaceName; use Neos\ContentRepository\Core\SharedModel\Workspace\Workspaces; @@ -42,6 +46,7 @@ use Neos\ContentRepository\Core\Subscription\Exception\CatchUpHadErrors; use Neos\EventStore\EventStoreInterface; use Neos\EventStore\Exception\ConcurrencyException; +use Neos\EventStore\Model\Event\CorrelationId; use Neos\EventStore\Model\Events; use Psr\Clock\ClockInterface; @@ -95,7 +100,7 @@ public function handle(CommandInterface $command): void // simple case if ($toPublish instanceof EventsToPublish) { - $this->eventStore->commit($toPublish->streamName, $this->enrichAndNormaliseEvents($toPublish->events), $toPublish->expectedVersion); + $this->eventStore->commit($toPublish->streamName, $this->enrichAndNormalizeEvents($toPublish->events, correlationId: null), $toPublish->expectedVersion); $fullCatchUpResult = $this->subscriptionEngine->catchUpActive(); // NOTE: we don't batch here, to ensure the catchup is run completely and any errors don't stop it. if ($fullCatchUpResult->hadErrors()) { throw CatchUpHadErrors::createFromErrors($fullCatchUpResult->errors); @@ -104,10 +109,11 @@ public function handle(CommandInterface $command): void } // control-flow aware command handling via generator + $correlationId = CorrelationId::fromString(UuidFactory::create()); try { foreach ($toPublish as $eventsToPublish) { try { - $this->eventStore->commit($eventsToPublish->streamName, $this->enrichAndNormaliseEvents($eventsToPublish->events), $eventsToPublish->expectedVersion); + $this->eventStore->commit($eventsToPublish->streamName, $this->enrichAndNormalizeEvents($eventsToPublish->events, $correlationId), $eventsToPublish->expectedVersion); } catch (ConcurrencyException $concurrencyException) { // we pass the exception into the generator (->throw), so it could be try-caught and reacted upon: // @@ -119,7 +125,7 @@ public function handle(CommandInterface $command): void // } $yieldedErrorStrategy = $toPublish->throw($concurrencyException); if ($yieldedErrorStrategy instanceof EventsToPublish) { - $this->eventStore->commit($yieldedErrorStrategy->streamName, $this->enrichAndNormaliseEvents($yieldedErrorStrategy->events), $yieldedErrorStrategy->expectedVersion); + $this->eventStore->commit($yieldedErrorStrategy->streamName, $this->enrichAndNormalizeEvents($yieldedErrorStrategy->events, $correlationId), $yieldedErrorStrategy->expectedVersion); } throw $concurrencyException; } @@ -204,7 +210,7 @@ public function getContentDimensionSource(): ContentDimensionSourceInterface return $this->contentDimensionSource; } - private function enrichAndNormaliseEvents(DomainEvents $events): Events + private function enrichAndNormalizeEvents(DomainEvents $events, CorrelationId|null $correlationId): Events { $initiatingUserId = $this->authProvider->getAuthenticatedUserId() ?? UserId::forSystemUser(); $initiatingTimestamp = $this->clock->now(); @@ -215,6 +221,9 @@ private function enrichAndNormaliseEvents(DomainEvents $events): Events $initiatingTimestamp ); - return Events::fromArray($events->map($this->eventNormalizer->normalize(...))); + return Events::fromArray($events->map(function (EventInterface|DecoratedEvent $event) use ($correlationId) { + $decoratedEvent = DecoratedEvent::create($event, correlationId: $correlationId); + return $this->eventNormalizer->normalize($decoratedEvent); + })); } } From 5b75ccb3c6ec35494f8c0170c219e9afabd24610 Mon Sep 17 00:00:00 2001 From: mhsdesign <85400359+mhsdesign@users.noreply.github.com> Date: Sun, 26 Jan 2025 22:44:03 +0100 Subject: [PATCH 08/12] TASK: Add metadata which command class caused publish / workspace operation to first event For commands with multiple events like workspace publishing, we can now add to the event metadata like the causation command short class name. Via the correlation id they are grouped so we only add this information to the first event, Note that for 'simple' commands we dont need to do this as `RebaseableCommand::enrichWithCommand` will actually fully serialize the command into commandName and payload --- .../Classes/ContentRepository.php | 26 ++++++++++++++----- .../Classes/Feature/ContentStreamHandling.php | 10 +++---- .../Feature/WorkspaceCommandHandler.php | 15 ++++------- 3 files changed, 27 insertions(+), 24 deletions(-) diff --git a/Neos.ContentRepository.Core/Classes/ContentRepository.php b/Neos.ContentRepository.Core/Classes/ContentRepository.php index 91469755d7..dcb84d3e6d 100644 --- a/Neos.ContentRepository.Core/Classes/ContentRepository.php +++ b/Neos.ContentRepository.Core/Classes/ContentRepository.php @@ -100,7 +100,7 @@ public function handle(CommandInterface $command): void // simple case if ($toPublish instanceof EventsToPublish) { - $this->eventStore->commit($toPublish->streamName, $this->enrichAndNormalizeEvents($toPublish->events, correlationId: null), $toPublish->expectedVersion); + $this->eventStore->commit($toPublish->streamName, Events::fromArray($this->enrichEventsWithInitiatingMetadata($toPublish->events)->map($this->eventNormalizer->normalize(...))), $toPublish->expectedVersion); $fullCatchUpResult = $this->subscriptionEngine->catchUpActive(); // NOTE: we don't batch here, to ensure the catchup is run completely and any errors don't stop it. if ($fullCatchUpResult->hadErrors()) { throw CatchUpHadErrors::createFromErrors($fullCatchUpResult->errors); @@ -109,11 +109,13 @@ public function handle(CommandInterface $command): void } // control-flow aware command handling via generator + $isFirstEvent = true; + $causationCommandClassName = $command::class; $correlationId = CorrelationId::fromString(UuidFactory::create()); try { foreach ($toPublish as $eventsToPublish) { try { - $this->eventStore->commit($eventsToPublish->streamName, $this->enrichAndNormalizeEvents($eventsToPublish->events, $correlationId), $eventsToPublish->expectedVersion); + $this->eventStore->commit($eventsToPublish->streamName, $this->enrichAndNormalizeEvents($eventsToPublish->events, $correlationId, $isFirstEvent, $causationCommandClassName), $eventsToPublish->expectedVersion); } catch (ConcurrencyException $concurrencyException) { // we pass the exception into the generator (->throw), so it could be try-caught and reacted upon: // @@ -125,7 +127,7 @@ public function handle(CommandInterface $command): void // } $yieldedErrorStrategy = $toPublish->throw($concurrencyException); if ($yieldedErrorStrategy instanceof EventsToPublish) { - $this->eventStore->commit($yieldedErrorStrategy->streamName, $this->enrichAndNormalizeEvents($yieldedErrorStrategy->events, $correlationId), $yieldedErrorStrategy->expectedVersion); + $this->eventStore->commit($yieldedErrorStrategy->streamName, $this->enrichAndNormalizeEvents($yieldedErrorStrategy->events, $correlationId, $isFirstEvent, $causationCommandClassName), $yieldedErrorStrategy->expectedVersion); } throw $concurrencyException; } @@ -210,19 +212,29 @@ public function getContentDimensionSource(): ContentDimensionSourceInterface return $this->contentDimensionSource; } - private function enrichAndNormalizeEvents(DomainEvents $events, CorrelationId|null $correlationId): Events + private function enrichEventsWithInitiatingMetadata(DomainEvents $events): DomainEvents { $initiatingUserId = $this->authProvider->getAuthenticatedUserId() ?? UserId::forSystemUser(); $initiatingTimestamp = $this->clock->now(); - $events = InitiatingEventMetadata::enrichEventsWithInitiatingMetadata( + return InitiatingEventMetadata::enrichEventsWithInitiatingMetadata( $events, $initiatingUserId, $initiatingTimestamp ); + } + + private function enrichAndNormalizeEvents(DomainEvents $events, CorrelationId|null $correlationId, bool &$isFirstEvent, string $causationCommandClassName): Events + { + $events = $this->enrichEventsWithInitiatingMetadata($events); - return Events::fromArray($events->map(function (EventInterface|DecoratedEvent $event) use ($correlationId) { - $decoratedEvent = DecoratedEvent::create($event, correlationId: $correlationId); + return Events::fromArray($events->map(function (EventInterface|DecoratedEvent $event) use ($correlationId, $causationCommandClassName, &$isFirstEvent) { + $metadata = $event instanceof DecoratedEvent ? $event->eventMetadata?->value ?? [] : []; + if ($isFirstEvent) { + $metadata['debug_causationCommand'] = substr($causationCommandClassName, strrpos($causationCommandClassName, '\\') + 1); + $isFirstEvent = false; + } + $decoratedEvent = DecoratedEvent::create($event, metadata: $metadata, correlationId: $correlationId); return $this->eventNormalizer->normalize($decoratedEvent); })); } diff --git a/Neos.ContentRepository.Core/Classes/Feature/ContentStreamHandling.php b/Neos.ContentRepository.Core/Classes/Feature/ContentStreamHandling.php index 053352f98e..8432cd74c3 100644 --- a/Neos.ContentRepository.Core/Classes/Feature/ContentStreamHandling.php +++ b/Neos.ContentRepository.Core/Classes/Feature/ContentStreamHandling.php @@ -31,19 +31,15 @@ trait ContentStreamHandling private function closeContentStream( ContentStreamId $contentStreamId, Version $contentStreamVersion, - string $causationCommandClassName ): EventsToPublish { $streamName = ContentStreamEventStreamName::fromContentStreamId($contentStreamId)->getEventStreamName(); return new EventsToPublish( $streamName, Events::with( - DecoratedEvent::create( - new ContentStreamWasClosed( - $contentStreamId, - ), - metadata: array_filter(['debug_causationCommand' => substr($causationCommandClassName, strrpos($causationCommandClassName, '\\') + 1)]) - ) + new ContentStreamWasClosed( + $contentStreamId, + ), ), ExpectedVersion::fromVersion($contentStreamVersion) ); diff --git a/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php b/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php index 1f206d1d7e..e3c47cdb38 100644 --- a/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php +++ b/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php @@ -206,8 +206,7 @@ private function handlePublishWorkspace( yield $this->closeContentStream( $workspace->currentContentStreamId, - $workspaceContentStreamVersion, - $command::class + $workspaceContentStreamVersion ); $commandSimulator = $this->commandSimulatorFactory->createSimulatorForWorkspace($baseWorkspace->workspaceName); @@ -354,8 +353,7 @@ private function handleRebaseWorkspace( // if we have no changes in the workspace we can fork from the base directly yield $this->closeContentStream( $workspace->currentContentStreamId, - $workspaceContentStreamVersion, - $command::class + $workspaceContentStreamVersion ); yield from $this->rebaseWorkspaceWithoutChanges( @@ -376,8 +374,7 @@ private function handleRebaseWorkspace( yield $this->closeContentStream( $workspace->currentContentStreamId, - $workspaceContentStreamVersion, - $command::class + $workspaceContentStreamVersion ); $commandSimulator = $this->commandSimulatorFactory->createSimulatorForWorkspace($baseWorkspace->workspaceName); @@ -465,8 +462,7 @@ private function handlePublishIndividualNodesFromWorkspace( yield $this->closeContentStream( $workspace->currentContentStreamId, - $workspaceContentStreamVersion, - $command::class + $workspaceContentStreamVersion ); $commandSimulator = $this->commandSimulatorFactory->createSimulatorForWorkspace($baseWorkspace->workspaceName); @@ -590,8 +586,7 @@ private function handleDiscardIndividualNodesFromWorkspace( yield $this->closeContentStream( $workspace->currentContentStreamId, - $workspaceContentStreamVersion, - $command::class + $workspaceContentStreamVersion ); if ($commandsToKeep->isEmpty()) { From c522ffe161714b544e6f1ca90a0a926b2bc7fa29 Mon Sep 17 00:00:00 2001 From: mhsdesign <85400359+mhsdesign@users.noreply.github.com> Date: Mon, 27 Jan 2025 10:43:18 +0100 Subject: [PATCH 09/12] TASK: Base correlation id on command class name ... instead of encoding the command into `debug_causationCommand` --- .../Classes/ContentRepository.php | 28 ++++++------------- .../src/StructureAdjustmentService.php | 4 +-- 2 files changed, 10 insertions(+), 22 deletions(-) diff --git a/Neos.ContentRepository.Core/Classes/ContentRepository.php b/Neos.ContentRepository.Core/Classes/ContentRepository.php index dcb84d3e6d..1d2ece621e 100644 --- a/Neos.ContentRepository.Core/Classes/ContentRepository.php +++ b/Neos.ContentRepository.Core/Classes/ContentRepository.php @@ -97,10 +97,11 @@ public function handle(CommandInterface $command): void } $toPublish = $this->commandBus->handle($command); + $correlationId = CorrelationId::fromString(sprintf('%s_%s', substr($command::class, strrpos($command::class, '\\') + 1, 20), bin2hex(random_bytes(9)))); // simple case if ($toPublish instanceof EventsToPublish) { - $this->eventStore->commit($toPublish->streamName, Events::fromArray($this->enrichEventsWithInitiatingMetadata($toPublish->events)->map($this->eventNormalizer->normalize(...))), $toPublish->expectedVersion); + $this->eventStore->commit($toPublish->streamName, $this->enrichAndNormalizeEvents($toPublish->events, $correlationId), $toPublish->expectedVersion); $fullCatchUpResult = $this->subscriptionEngine->catchUpActive(); // NOTE: we don't batch here, to ensure the catchup is run completely and any errors don't stop it. if ($fullCatchUpResult->hadErrors()) { throw CatchUpHadErrors::createFromErrors($fullCatchUpResult->errors); @@ -109,13 +110,10 @@ public function handle(CommandInterface $command): void } // control-flow aware command handling via generator - $isFirstEvent = true; - $causationCommandClassName = $command::class; - $correlationId = CorrelationId::fromString(UuidFactory::create()); try { foreach ($toPublish as $eventsToPublish) { try { - $this->eventStore->commit($eventsToPublish->streamName, $this->enrichAndNormalizeEvents($eventsToPublish->events, $correlationId, $isFirstEvent, $causationCommandClassName), $eventsToPublish->expectedVersion); + $this->eventStore->commit($eventsToPublish->streamName, $this->enrichAndNormalizeEvents($eventsToPublish->events, $correlationId), $eventsToPublish->expectedVersion); } catch (ConcurrencyException $concurrencyException) { // we pass the exception into the generator (->throw), so it could be try-caught and reacted upon: // @@ -127,7 +125,7 @@ public function handle(CommandInterface $command): void // } $yieldedErrorStrategy = $toPublish->throw($concurrencyException); if ($yieldedErrorStrategy instanceof EventsToPublish) { - $this->eventStore->commit($yieldedErrorStrategy->streamName, $this->enrichAndNormalizeEvents($yieldedErrorStrategy->events, $correlationId, $isFirstEvent, $causationCommandClassName), $yieldedErrorStrategy->expectedVersion); + $this->eventStore->commit($yieldedErrorStrategy->streamName, $this->enrichAndNormalizeEvents($yieldedErrorStrategy->events, $correlationId), $yieldedErrorStrategy->expectedVersion); } throw $concurrencyException; } @@ -212,29 +210,19 @@ public function getContentDimensionSource(): ContentDimensionSourceInterface return $this->contentDimensionSource; } - private function enrichEventsWithInitiatingMetadata(DomainEvents $events): DomainEvents + private function enrichAndNormalizeEvents(DomainEvents $events, CorrelationId $correlationId): Events { $initiatingUserId = $this->authProvider->getAuthenticatedUserId() ?? UserId::forSystemUser(); $initiatingTimestamp = $this->clock->now(); - return InitiatingEventMetadata::enrichEventsWithInitiatingMetadata( + $eventsWithMetaData = InitiatingEventMetadata::enrichEventsWithInitiatingMetadata( $events, $initiatingUserId, $initiatingTimestamp ); - } - - private function enrichAndNormalizeEvents(DomainEvents $events, CorrelationId|null $correlationId, bool &$isFirstEvent, string $causationCommandClassName): Events - { - $events = $this->enrichEventsWithInitiatingMetadata($events); - return Events::fromArray($events->map(function (EventInterface|DecoratedEvent $event) use ($correlationId, $causationCommandClassName, &$isFirstEvent) { - $metadata = $event instanceof DecoratedEvent ? $event->eventMetadata?->value ?? [] : []; - if ($isFirstEvent) { - $metadata['debug_causationCommand'] = substr($causationCommandClassName, strrpos($causationCommandClassName, '\\') + 1); - $isFirstEvent = false; - } - $decoratedEvent = DecoratedEvent::create($event, metadata: $metadata, correlationId: $correlationId); + return Events::fromArray($eventsWithMetaData->map(function (EventInterface|DecoratedEvent $event) use ($correlationId) { + $decoratedEvent = DecoratedEvent::create($event, correlationId: $correlationId); return $this->eventNormalizer->normalize($decoratedEvent); })); } diff --git a/Neos.ContentRepository.StructureAdjustment/src/StructureAdjustmentService.php b/Neos.ContentRepository.StructureAdjustment/src/StructureAdjustmentService.php index 31d17b56a6..4987fcfa72 100644 --- a/Neos.ContentRepository.StructureAdjustment/src/StructureAdjustmentService.php +++ b/Neos.ContentRepository.StructureAdjustment/src/StructureAdjustmentService.php @@ -115,14 +115,14 @@ public function fixError(StructureAdjustment $adjustment): void assert($eventsToPublish instanceof EventsToPublish); // set correlation id and add debug metadata - $correlationId = CorrelationId::fromString(UuidFactory::create()); + $correlationId = CorrelationId::fromString(sprintf('StructureAdjustment_%s', bin2hex(random_bytes(9)))); $isFirstEvent = true; $normalizedEvents = Events::fromArray($eventsToPublish->events->map(function (EventInterface|DecoratedEvent $event) use ( &$isFirstEvent, $correlationId, $adjustment ) { $metadata = $event instanceof DecoratedEvent ? $event->eventMetadata?->value ?? [] : []; if ($isFirstEvent) { - $metadata['debug_structureAdjustment'] = mb_strimwidth($adjustment->render() , 0, 250, '…'); + $metadata['debug_reason'] = mb_strimwidth($adjustment->render() , 0, 250, '…'); $isFirstEvent = false; } $decoratedEvent = DecoratedEvent::create( From 4a8be69a0db4e4ed0214426b351c39ff516ae83f Mon Sep 17 00:00:00 2001 From: mhsdesign <85400359+mhsdesign@users.noreply.github.com> Date: Mon, 27 Jan 2025 16:49:38 +0100 Subject: [PATCH 10/12] TASK: Add debug metadata to `ContentStreamWasForked` #4758 --- .../Classes/Feature/ContentStreamHandling.php | 16 +++++---- .../Feature/WorkspaceCommandHandler.php | 35 +++++++++++-------- 2 files changed, 31 insertions(+), 20 deletions(-) diff --git a/Neos.ContentRepository.Core/Classes/Feature/ContentStreamHandling.php b/Neos.ContentRepository.Core/Classes/Feature/ContentStreamHandling.php index 8432cd74c3..9f4ccdfa71 100644 --- a/Neos.ContentRepository.Core/Classes/Feature/ContentStreamHandling.php +++ b/Neos.ContentRepository.Core/Classes/Feature/ContentStreamHandling.php @@ -79,16 +79,20 @@ private function reopenContentStreamWithoutConstraintChecks( private function forkContentStream( ContentStreamId $newContentStreamId, ContentStreamId $sourceContentStreamId, - Version $sourceContentStreamVersion + Version $sourceContentStreamVersion, + string $debugReason ): EventsToPublish { return new EventsToPublish( ContentStreamEventStreamName::fromContentStreamId($newContentStreamId)->getEventStreamName(), Events::with( - new ContentStreamWasForked( - $newContentStreamId, - $sourceContentStreamId, - $sourceContentStreamVersion, - ), + DecoratedEvent::create( + event: new ContentStreamWasForked( + $newContentStreamId, + $sourceContentStreamId, + $sourceContentStreamVersion, + ), + metadata: ['debug_reason' => $debugReason] + ) ), // NO_STREAM to ensure the "fork" happens as the first event of the new content stream ExpectedVersion::NO_STREAM() diff --git a/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php b/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php index e3c47cdb38..f06de074a3 100644 --- a/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php +++ b/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php @@ -135,7 +135,8 @@ private function handleCreateWorkspace( yield $this->forkContentStream( $command->newContentStreamId, $baseWorkspace->currentContentStreamId, - $sourceContentStreamVersion + $sourceContentStreamVersion, + sprintf('Create workspace %s with base %s', $command->workspaceName->value, $baseWorkspace->workspaceName->value) ); yield new EventsToPublish( @@ -254,7 +255,8 @@ static function ($handle) use ($rebaseableCommands): void { yield $this->forkContentStream( $command->newContentStreamId, $baseWorkspace->currentContentStreamId, - Version::fromInteger($baseWorkspaceContentStreamVersion->value + ($eventsOfWorkspaceToPublish?->count() ?? 0)) + Version::fromInteger($baseWorkspaceContentStreamVersion->value + ($eventsOfWorkspaceToPublish?->count() ?? 0)), + sprintf('Publish workspace %s and fork base %s', $workspace->workspaceName->value, $baseWorkspace->workspaceName->value) ); yield new EventsToPublish( @@ -283,7 +285,8 @@ private function rebaseWorkspaceWithoutChanges( yield $this->forkContentStream( $newContentStreamId, $baseWorkspace->currentContentStreamId, - $baseWorkspaceContentStreamVersion + $baseWorkspaceContentStreamVersion, + sprintf('Rebase empty workspace %s and fork base %s', $workspace->workspaceName->value, $baseWorkspace->workspaceName->value) ); yield new EventsToPublish( @@ -422,7 +425,8 @@ static function ($handle) use ($rebaseableCommands): void { $command->workspaceName, $command->rebasedContentStreamId, $commandSimulator->eventStream(), - ) + ), + sprintf('Rebase %s and fork base %s', $command->workspaceName->value, $baseWorkspace->workspaceName->value) ); yield $this->removeContentStreamWithoutConstraintChecks($workspace->currentContentStreamId); @@ -540,7 +544,8 @@ static function ($handle) use ($commandSimulator, $matchingCommands, $remainingC $command->workspaceName, $command->contentStreamIdForRemainingPart, $commandSimulator->eventStream()->withMinimumSequenceNumber($highestSequenceNumberForMatching->next()) - ) + ), + sprintf('Partial publish workspace %s and fork base %s', $command->workspaceName->value, $baseWorkspace->workspaceName->value) ); yield $this->removeContentStreamWithoutConstraintChecks($workspace->currentContentStreamId); @@ -646,7 +651,8 @@ static function ($handle) use ($commandsToKeep): void { $command->workspaceName, $command->newContentStreamId, $commandSimulator->eventStream(), - ) + ), + sprintf('Partial discard workspace %s and fork base %s', $command->workspaceName->value, $baseWorkspace->workspaceName->value) ); yield $this->removeContentStreamWithoutConstraintChecks($workspace->currentContentStreamId); @@ -691,7 +697,8 @@ private function discardWorkspace( yield $this->forkContentStream( $newContentStream, $baseWorkspace->currentContentStreamId, - $baseWorkspaceContentStreamVersion + $baseWorkspaceContentStreamVersion, + sprintf('Discard %s and fork base %s', $workspace->workspaceName->value, $baseWorkspace->workspaceName->value) ); yield new EventsToPublish( @@ -740,7 +747,8 @@ private function handleChangeBaseWorkspace( yield $this->forkContentStream( $command->newContentStreamId, $newBaseWorkspace->currentContentStreamId, - $newBaseWorkspaceContentStreamVersion + $newBaseWorkspaceContentStreamVersion, + sprintf('Change base workspace of %s to %s', $workspace->workspaceName->value, $newBaseWorkspace->workspaceName->value) ); yield new EventsToPublish( @@ -793,17 +801,16 @@ private function forkNewContentStreamAndApplyEvents( Version $sourceContentStreamVersion, EventsToPublish $pointWorkspaceToNewContentStream, Events|null $eventsToApplyOnNewContentStream, + string $debugReasonForFork ): \Generator { yield $this->forkContentStream( $newContentStreamId, $sourceContentStreamId, - $sourceContentStreamVersion + $sourceContentStreamVersion, + $debugReasonForFork . sprintf('; Apply %d events on new (temporary closed) content stream', $eventsToApplyOnNewContentStream?->count() ?? 0) )->withAppendedEvents(Events::with( - DecoratedEvent::create( - new ContentStreamWasClosed( - $newContentStreamId - ), - metadata: ['debug_reason' => sprintf('Forking %s from %s to publish %d events', $newContentStreamId->value, $sourceContentStreamId->value, $eventsToApplyOnNewContentStream?->count() ?? 0)] + new ContentStreamWasClosed( + $newContentStreamId ) )); From 2e0c7b7164846d8e9ead338cebecb41d8bd5c9c0 Mon Sep 17 00:00:00 2001 From: mhsdesign <85400359+mhsdesign@users.noreply.github.com> Date: Mon, 27 Jan 2025 20:30:16 +0100 Subject: [PATCH 11/12] TASK: Add CorrelationId to imported events --- .../Processors/EventStoreImportProcessor.php | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/Neos.ContentRepository.Export/src/Processors/EventStoreImportProcessor.php b/Neos.ContentRepository.Export/src/Processors/EventStoreImportProcessor.php index 6a98fe988a..931a91c024 100644 --- a/Neos.ContentRepository.Export/src/Processors/EventStoreImportProcessor.php +++ b/Neos.ContentRepository.Export/src/Processors/EventStoreImportProcessor.php @@ -44,8 +44,8 @@ public function __construct( public function run(ProcessingContext $context): void { - /** @var array $domainEvents */ - $domainEvents = []; + /** @var array $events */ + $events = []; $eventFileResource = $context->files->readStream('events.jsonl'); /** @var array $eventIdMap */ @@ -68,6 +68,7 @@ public function run(ProcessingContext $context): void throw new \InvalidArgumentException(sprintf('Workspace "%s" does not exist or is not a root workspace', $this->targetWorkspaceName), 1729530978); } + $correlationId = Event\CorrelationId::fromString(sprintf('EventStoreImporter_%s', bin2hex(random_bytes(9)))); while (($line = fgets($eventFileResource)) !== false) { $event = ExportedEvent::fromJson(trim($line)) @@ -83,16 +84,12 @@ public function run(ProcessingContext $context): void ->withIdentifier($newEventId) ->processMetadata(static function (array $metadata) use ($eventIdMap) { $processedMetadata = $metadata; + // todo the causationId is NOT written to the EventMetadata anymore but a dedicated field and must be exported separately /** @var string|null $causationId */ $causationId = $processedMetadata['causationId'] ?? null; if ($causationId !== null && array_key_exists($causationId, $eventIdMap)) { $processedMetadata['causationId'] = $eventIdMap[$causationId]; } - /** @var string|null $correlationId */ - $correlationId = $processedMetadata['correlationId'] ?? null; - if ($correlationId !== null && array_key_exists($correlationId, $eventIdMap)) { - $processedMetadata['correlationId'] = $eventIdMap[$correlationId]; - } return $processedMetadata; }); } @@ -107,15 +104,15 @@ public function run(ProcessingContext $context): void if (in_array($domainEvent::class, [ContentStreamWasCreated::class, ContentStreamWasForked::class, ContentStreamWasRemoved::class], true)) { throw new \RuntimeException(sprintf('Failed to read events. %s is not expected in imported event stream.', $event->type), 1729506757); } - $domainEvent = DecoratedEvent::create($domainEvent, eventId: EventId::fromString($event->identifier), metadata: $event->metadata); - $domainEvents[] = $this->eventNormalizer->normalize($domainEvent); + $domainEvent = DecoratedEvent::create($domainEvent, eventId: EventId::fromString($event->identifier), metadata: $event->metadata, correlationId: $correlationId); + $events[] = $this->eventNormalizer->normalize($domainEvent); } $contentStreamStreamName = ContentStreamEventStreamName::fromContentStreamId($rootWorkspaceContentStreamId)->getEventStreamName(); try { - $this->eventStore->commit($contentStreamStreamName, Events::fromArray($domainEvents), ExpectedVersion::fromVersion(Version::first())); + $this->eventStore->commit($contentStreamStreamName, Events::fromArray($events), ExpectedVersion::fromVersion(Version::first())); } catch (ConcurrencyException $e) { - throw new \RuntimeException(sprintf('Failed to publish %d events because the content stream "%s" for workspace "%s" already contains events. Please consider to prune the content repository first via `./flow site:pruneAll`.', count($domainEvents), $contentStreamStreamName->value, $this->targetWorkspaceName->value), 1729506818, $e); + throw new \RuntimeException(sprintf('Failed to publish %d events because the content stream "%s" for workspace "%s" already contains events. Please consider to prune the content repository first via `./flow site:pruneAll`.', count($events), $contentStreamStreamName->value, $this->targetWorkspaceName->value), 1729506818, $e); } } } From 2647207382391a5750ba0d16dce788324b95a49c Mon Sep 17 00:00:00 2001 From: mhsdesign <85400359+mhsdesign@users.noreply.github.com> Date: Mon, 27 Jan 2025 20:30:56 +0100 Subject: [PATCH 12/12] TASK: Add CorrelationId to removed events from cs pruner --- .../Classes/Service/ContentStreamPruner.php | 11 +++++++++-- .../ContentStreamPruner/ContentStreamForPruning.php | 4 ++-- .../src/StructureAdjustmentService.php | 6 +----- 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/Neos.ContentRepository.Core/Classes/Service/ContentStreamPruner.php b/Neos.ContentRepository.Core/Classes/Service/ContentStreamPruner.php index 482586ab62..21ca4c6598 100644 --- a/Neos.ContentRepository.Core/Classes/Service/ContentStreamPruner.php +++ b/Neos.ContentRepository.Core/Classes/Service/ContentStreamPruner.php @@ -4,6 +4,7 @@ namespace Neos\ContentRepository\Core\Service; +use Neos\ContentRepository\Core\EventStore\DecoratedEvent; use Neos\ContentRepository\Core\EventStore\EventNormalizer; use Neos\ContentRepository\Core\Factory\ContentRepositoryServiceInterface; use Neos\ContentRepository\Core\Feature\ContentStreamCreation\Event\ContentStreamWasCreated; @@ -22,6 +23,7 @@ use Neos\ContentRepository\Core\SharedModel\Workspace\ContentStreamId; use Neos\ContentRepository\Core\Subscription\Engine\SubscriptionEngine; use Neos\EventStore\EventStoreInterface; +use Neos\EventStore\Model\Event\CorrelationId; use Neos\EventStore\Model\Event\EventType; use Neos\EventStore\Model\Event\EventTypes; use Neos\EventStore\Model\EventStream\EventStreamFilter; @@ -129,6 +131,7 @@ public function removeDanglingContentStreams(\Closure $outputFn, \DateTimeImmuta { $allContentStreams = $this->findAllContentStreams(); + $correlationId = CorrelationId::fromString(sprintf('ContentStreamPruner_%s', bin2hex(random_bytes(9)))); $danglingContentStreamsPresent = false; foreach ($allContentStreams as $contentStream) { if (!$contentStream->isDangling()) { @@ -145,8 +148,12 @@ public function removeDanglingContentStreams(\Closure $outputFn, \DateTimeImmuta $this->eventStore->commit( ContentStreamEventStreamName::fromContentStreamId($contentStream->id)->getEventStreamName(), $this->eventNormalizer->normalize( - new ContentStreamWasRemoved( - $contentStream->id + DecoratedEvent::create( + new ContentStreamWasRemoved( + $contentStream->id + ), + metadata: ['debug_reason' => sprintf('Removed dangling content stream with status %s', $contentStream->status->value)], + correlationId: $correlationId ) ), ExpectedVersion::STREAM_EXISTS() diff --git a/Neos.ContentRepository.Core/Classes/Service/ContentStreamPruner/ContentStreamForPruning.php b/Neos.ContentRepository.Core/Classes/Service/ContentStreamPruner/ContentStreamForPruning.php index 9264fc8b9f..bac98a0fe8 100644 --- a/Neos.ContentRepository.Core/Classes/Service/ContentStreamPruner/ContentStreamForPruning.php +++ b/Neos.ContentRepository.Core/Classes/Service/ContentStreamPruner/ContentStreamForPruning.php @@ -71,13 +71,13 @@ public static function create( ContentStreamId $id, ContentStreamStatus $status, ?ContentStreamId $sourceContentStreamId, - \DateTimeImmutable $create, + \DateTimeImmutable $created, ): self { return new self( $id, $status, $sourceContentStreamId, - $create, + $created, false ); } diff --git a/Neos.ContentRepository.StructureAdjustment/src/StructureAdjustmentService.php b/Neos.ContentRepository.StructureAdjustment/src/StructureAdjustmentService.php index 4987fcfa72..66922b31ad 100644 --- a/Neos.ContentRepository.StructureAdjustment/src/StructureAdjustmentService.php +++ b/Neos.ContentRepository.StructureAdjustment/src/StructureAdjustmentService.php @@ -134,11 +134,7 @@ public function fixError(StructureAdjustment $adjustment): void return $this->eventNormalizer->normalize($decoratedEvent); })); - $this->eventStore->commit( - $eventsToPublish->streamName, - $normalizedEvents, - $eventsToPublish->expectedVersion - ); + $this->eventStore->commit($eventsToPublish->streamName, $normalizedEvents, $eventsToPublish->expectedVersion); $this->subscriptionEngine->catchUpActive(); } }