From bc79d6e1f53cf35584b0006e17c8a72f4b67f150 Mon Sep 17 00:00:00 2001 From: mhsdesign <85400359+mhsdesign@users.noreply.github.com> Date: Sun, 26 Jan 2025 22:26:27 +0100 Subject: [PATCH] TASK: Set correlation ids #3887 in ContentRepository::handle() The refactored publishing V3 opens another now used advantage: For commands with multiple events like workspace publishing, we can now centrally add metadata like a simple correlation id. Note that for 'simple' commands we dont need to do this as `RebaseableCommand::enrichWithCommand` will already group them with a causationId which is what we decided to use for tethered nodes for example. The id never changes for causation ids. --- .../Classes/ContentRepository.php | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/Neos.ContentRepository.Core/Classes/ContentRepository.php b/Neos.ContentRepository.Core/Classes/ContentRepository.php index 7278f4ce9b..8f60277680 100644 --- a/Neos.ContentRepository.Core/Classes/ContentRepository.php +++ b/Neos.ContentRepository.Core/Classes/ContentRepository.php @@ -20,6 +20,8 @@ use Neos\ContentRepository\Core\Dimension\ContentDimensionSourceInterface; use Neos\ContentRepository\Core\DimensionSpace\DimensionSpacePoint; use Neos\ContentRepository\Core\DimensionSpace\InterDimensionalVariationGraph; +use Neos\ContentRepository\Core\EventStore\DecoratedEvent; +use Neos\ContentRepository\Core\EventStore\EventInterface; use Neos\ContentRepository\Core\EventStore\EventNormalizer; use Neos\ContentRepository\Core\EventStore\Events as DomainEvents; use Neos\ContentRepository\Core\EventStore\EventsToPublish; @@ -35,6 +37,7 @@ use Neos\ContentRepository\Core\Projection\ProjectionStates; use Neos\ContentRepository\Core\SharedModel\ContentRepository\ContentRepositoryId; use Neos\ContentRepository\Core\SharedModel\Exception\WorkspaceDoesNotExist; +use Neos\ContentRepository\Core\SharedModel\Id\UuidFactory; use Neos\ContentRepository\Core\SharedModel\Workspace\ContentStream; use Neos\ContentRepository\Core\SharedModel\Workspace\ContentStreamId; use Neos\ContentRepository\Core\SharedModel\Workspace\ContentStreams; @@ -45,6 +48,7 @@ use Neos\ContentRepository\Core\Subscription\Exception\CatchUpHadErrors; use Neos\EventStore\EventStoreInterface; use Neos\EventStore\Exception\ConcurrencyException; +use Neos\EventStore\Model\Event\CorrelationId; use Neos\EventStore\Model\Events; use Psr\Clock\ClockInterface; @@ -98,7 +102,7 @@ public function handle(CommandInterface $command): void // simple case if ($toPublish instanceof EventsToPublish) { - $this->eventStore->commit($toPublish->streamName, $this->enrichAndNormaliseEvents($toPublish->events), $toPublish->expectedVersion); + $this->eventStore->commit($toPublish->streamName, $this->enrichAndNormalizeEvents($toPublish->events, correlationId: null), $toPublish->expectedVersion); $fullCatchUpResult = $this->subscriptionEngine->catchUpActive(); // NOTE: we don't batch here, to ensure the catchup is run completely and any errors don't stop it. if ($fullCatchUpResult->hadErrors()) { throw CatchUpHadErrors::createFromErrors($fullCatchUpResult->errors); @@ -107,10 +111,11 @@ public function handle(CommandInterface $command): void } // control-flow aware command handling via generator + $correlationId = CorrelationId::fromString(UuidFactory::create()); try { foreach ($toPublish as $eventsToPublish) { try { - $this->eventStore->commit($eventsToPublish->streamName, $this->enrichAndNormaliseEvents($eventsToPublish->events), $eventsToPublish->expectedVersion); + $this->eventStore->commit($eventsToPublish->streamName, $this->enrichAndNormalizeEvents($eventsToPublish->events, $correlationId), $eventsToPublish->expectedVersion); } catch (ConcurrencyException $concurrencyException) { // we pass the exception into the generator (->throw), so it could be try-caught and reacted upon: // @@ -122,7 +127,7 @@ public function handle(CommandInterface $command): void // } $yieldedErrorStrategy = $toPublish->throw($concurrencyException); if ($yieldedErrorStrategy instanceof EventsToPublish) { - $this->eventStore->commit($yieldedErrorStrategy->streamName, $this->enrichAndNormaliseEvents($yieldedErrorStrategy->events), $yieldedErrorStrategy->expectedVersion); + $this->eventStore->commit($yieldedErrorStrategy->streamName, $this->enrichAndNormalizeEvents($yieldedErrorStrategy->events, $correlationId), $yieldedErrorStrategy->expectedVersion); } throw $concurrencyException; } @@ -217,7 +222,7 @@ public function getContentDimensionSource(): ContentDimensionSourceInterface return $this->contentDimensionSource; } - private function enrichAndNormaliseEvents(DomainEvents $events): Events + private function enrichAndNormalizeEvents(DomainEvents $events, CorrelationId|null $correlationId): Events { $initiatingUserId = $this->authProvider->getAuthenticatedUserId() ?? UserId::forSystemUser(); $initiatingTimestamp = $this->clock->now(); @@ -228,6 +233,9 @@ private function enrichAndNormaliseEvents(DomainEvents $events): Events $initiatingTimestamp ); - return Events::fromArray($events->map($this->eventNormalizer->normalize(...))); + return Events::fromArray($events->map(function (EventInterface|DecoratedEvent $event) use ($correlationId) { + $decoratedEvent = DecoratedEvent::create($event, correlationId: $correlationId); + return $this->eventNormalizer->normalize($decoratedEvent); + })); } }