Skip to content

Commit

Permalink
TASK: Add CorrelationId to imported events
Browse files Browse the repository at this point in the history
  • Loading branch information
mhsdesign committed Jan 27, 2025
1 parent 4a8be69 commit 2e0c7b7
Showing 1 changed file with 8 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ public function __construct(

public function run(ProcessingContext $context): void
{
/** @var array<Event> $domainEvents */
$domainEvents = [];
/** @var array<Event> $events */
$events = [];
$eventFileResource = $context->files->readStream('events.jsonl');

/** @var array<string, string> $eventIdMap */
Expand All @@ -68,6 +68,7 @@ public function run(ProcessingContext $context): void
throw new \InvalidArgumentException(sprintf('Workspace "%s" does not exist or is not a root workspace', $this->targetWorkspaceName), 1729530978);
}

$correlationId = Event\CorrelationId::fromString(sprintf('EventStoreImporter_%s', bin2hex(random_bytes(9))));
while (($line = fgets($eventFileResource)) !== false) {
$event =
ExportedEvent::fromJson(trim($line))
Expand All @@ -83,16 +84,12 @@ public function run(ProcessingContext $context): void
->withIdentifier($newEventId)
->processMetadata(static function (array $metadata) use ($eventIdMap) {
$processedMetadata = $metadata;
// todo the causationId is NOT written to the EventMetadata anymore but a dedicated field and must be exported separately
/** @var string|null $causationId */
$causationId = $processedMetadata['causationId'] ?? null;
if ($causationId !== null && array_key_exists($causationId, $eventIdMap)) {
$processedMetadata['causationId'] = $eventIdMap[$causationId];
}
/** @var string|null $correlationId */
$correlationId = $processedMetadata['correlationId'] ?? null;
if ($correlationId !== null && array_key_exists($correlationId, $eventIdMap)) {
$processedMetadata['correlationId'] = $eventIdMap[$correlationId];
}
return $processedMetadata;
});
}
Expand All @@ -107,15 +104,15 @@ public function run(ProcessingContext $context): void
if (in_array($domainEvent::class, [ContentStreamWasCreated::class, ContentStreamWasForked::class, ContentStreamWasRemoved::class], true)) {
throw new \RuntimeException(sprintf('Failed to read events. %s is not expected in imported event stream.', $event->type), 1729506757);
}
$domainEvent = DecoratedEvent::create($domainEvent, eventId: EventId::fromString($event->identifier), metadata: $event->metadata);
$domainEvents[] = $this->eventNormalizer->normalize($domainEvent);
$domainEvent = DecoratedEvent::create($domainEvent, eventId: EventId::fromString($event->identifier), metadata: $event->metadata, correlationId: $correlationId);
$events[] = $this->eventNormalizer->normalize($domainEvent);
}

$contentStreamStreamName = ContentStreamEventStreamName::fromContentStreamId($rootWorkspaceContentStreamId)->getEventStreamName();
try {
$this->eventStore->commit($contentStreamStreamName, Events::fromArray($domainEvents), ExpectedVersion::fromVersion(Version::first()));
$this->eventStore->commit($contentStreamStreamName, Events::fromArray($events), ExpectedVersion::fromVersion(Version::first()));
} catch (ConcurrencyException $e) {
throw new \RuntimeException(sprintf('Failed to publish %d events because the content stream "%s" for workspace "%s" already contains events. Please consider to prune the content repository first via `./flow site:pruneAll`.', count($domainEvents), $contentStreamStreamName->value, $this->targetWorkspaceName->value), 1729506818, $e);
throw new \RuntimeException(sprintf('Failed to publish %d events because the content stream "%s" for workspace "%s" already contains events. Please consider to prune the content repository first via `./flow site:pruneAll`.', count($events), $contentStreamStreamName->value, $this->targetWorkspaceName->value), 1729506818, $e);
}
}
}

0 comments on commit 2e0c7b7

Please sign in to comment.