Skip to content

Commit

Permalink
TASK: Set correlation ids #3887 in ContentRepository::handle()
Browse files Browse the repository at this point in the history
The refactored publishing V3 opens another now used advantage: For commands with multiple events like workspace publishing, we can now centrally add metadata like a simple correlation id.

Note that for 'simple' commands we dont need to do this as `RebaseableCommand::enrichWithCommand` will already group them with a causationId which is what we decided to use for tethered nodes for example. The id never changes for causation ids.
  • Loading branch information
mhsdesign committed Jan 26, 2025
1 parent be0ba17 commit bc79d6e
Showing 1 changed file with 13 additions and 5 deletions.
18 changes: 13 additions & 5 deletions Neos.ContentRepository.Core/Classes/ContentRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
use Neos\ContentRepository\Core\Dimension\ContentDimensionSourceInterface;
use Neos\ContentRepository\Core\DimensionSpace\DimensionSpacePoint;
use Neos\ContentRepository\Core\DimensionSpace\InterDimensionalVariationGraph;
use Neos\ContentRepository\Core\EventStore\DecoratedEvent;
use Neos\ContentRepository\Core\EventStore\EventInterface;
use Neos\ContentRepository\Core\EventStore\EventNormalizer;
use Neos\ContentRepository\Core\EventStore\Events as DomainEvents;
use Neos\ContentRepository\Core\EventStore\EventsToPublish;
Expand All @@ -35,6 +37,7 @@
use Neos\ContentRepository\Core\Projection\ProjectionStates;
use Neos\ContentRepository\Core\SharedModel\ContentRepository\ContentRepositoryId;
use Neos\ContentRepository\Core\SharedModel\Exception\WorkspaceDoesNotExist;
use Neos\ContentRepository\Core\SharedModel\Id\UuidFactory;
use Neos\ContentRepository\Core\SharedModel\Workspace\ContentStream;
use Neos\ContentRepository\Core\SharedModel\Workspace\ContentStreamId;
use Neos\ContentRepository\Core\SharedModel\Workspace\ContentStreams;
Expand All @@ -45,6 +48,7 @@
use Neos\ContentRepository\Core\Subscription\Exception\CatchUpHadErrors;
use Neos\EventStore\EventStoreInterface;
use Neos\EventStore\Exception\ConcurrencyException;
use Neos\EventStore\Model\Event\CorrelationId;
use Neos\EventStore\Model\Events;
use Psr\Clock\ClockInterface;

Expand Down Expand Up @@ -98,7 +102,7 @@ public function handle(CommandInterface $command): void

// simple case
if ($toPublish instanceof EventsToPublish) {
$this->eventStore->commit($toPublish->streamName, $this->enrichAndNormaliseEvents($toPublish->events), $toPublish->expectedVersion);
$this->eventStore->commit($toPublish->streamName, $this->enrichAndNormalizeEvents($toPublish->events, correlationId: null), $toPublish->expectedVersion);
$fullCatchUpResult = $this->subscriptionEngine->catchUpActive(); // NOTE: we don't batch here, to ensure the catchup is run completely and any errors don't stop it.
if ($fullCatchUpResult->hadErrors()) {
throw CatchUpHadErrors::createFromErrors($fullCatchUpResult->errors);
Expand All @@ -107,10 +111,11 @@ public function handle(CommandInterface $command): void
}

// control-flow aware command handling via generator
$correlationId = CorrelationId::fromString(UuidFactory::create());
try {
foreach ($toPublish as $eventsToPublish) {
try {
$this->eventStore->commit($eventsToPublish->streamName, $this->enrichAndNormaliseEvents($eventsToPublish->events), $eventsToPublish->expectedVersion);
$this->eventStore->commit($eventsToPublish->streamName, $this->enrichAndNormalizeEvents($eventsToPublish->events, $correlationId), $eventsToPublish->expectedVersion);
} catch (ConcurrencyException $concurrencyException) {
// we pass the exception into the generator (->throw), so it could be try-caught and reacted upon:
//
Expand All @@ -122,7 +127,7 @@ public function handle(CommandInterface $command): void
// }
$yieldedErrorStrategy = $toPublish->throw($concurrencyException);
if ($yieldedErrorStrategy instanceof EventsToPublish) {
$this->eventStore->commit($yieldedErrorStrategy->streamName, $this->enrichAndNormaliseEvents($yieldedErrorStrategy->events), $yieldedErrorStrategy->expectedVersion);
$this->eventStore->commit($yieldedErrorStrategy->streamName, $this->enrichAndNormalizeEvents($yieldedErrorStrategy->events, $correlationId), $yieldedErrorStrategy->expectedVersion);
}
throw $concurrencyException;
}
Expand Down Expand Up @@ -217,7 +222,7 @@ public function getContentDimensionSource(): ContentDimensionSourceInterface
return $this->contentDimensionSource;
}

private function enrichAndNormaliseEvents(DomainEvents $events): Events
private function enrichAndNormalizeEvents(DomainEvents $events, CorrelationId|null $correlationId): Events
{
$initiatingUserId = $this->authProvider->getAuthenticatedUserId() ?? UserId::forSystemUser();
$initiatingTimestamp = $this->clock->now();
Expand All @@ -228,6 +233,9 @@ private function enrichAndNormaliseEvents(DomainEvents $events): Events
$initiatingTimestamp
);

return Events::fromArray($events->map($this->eventNormalizer->normalize(...)));
return Events::fromArray($events->map(function (EventInterface|DecoratedEvent $event) use ($correlationId) {
$decoratedEvent = DecoratedEvent::create($event, correlationId: $correlationId);
return $this->eventNormalizer->normalize($decoratedEvent);
}));
}
}

0 comments on commit bc79d6e

Please sign in to comment.