Skip to content

Commit

Permalink
TASK: Prepare content repository to set correlation ids
Browse files Browse the repository at this point in the history
  • Loading branch information
mhsdesign committed Jan 26, 2025
1 parent e62ac1f commit be0ba17
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 23 deletions.
28 changes: 13 additions & 15 deletions Neos.ContentRepository.Core/Classes/ContentRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
use Neos\ContentRepository\Core\DimensionSpace\DimensionSpacePoint;
use Neos\ContentRepository\Core\DimensionSpace\InterDimensionalVariationGraph;
use Neos\ContentRepository\Core\EventStore\EventNormalizer;
use Neos\ContentRepository\Core\EventStore\Events as DomainEvents;
use Neos\ContentRepository\Core\EventStore\EventsToPublish;
use Neos\ContentRepository\Core\EventStore\InitiatingEventMetadata;
use Neos\ContentRepository\Core\Feature\Security\AuthProviderInterface;
Expand All @@ -44,6 +45,7 @@
use Neos\ContentRepository\Core\Subscription\Exception\CatchUpHadErrors;
use Neos\EventStore\EventStoreInterface;
use Neos\EventStore\Exception\ConcurrencyException;
use Neos\EventStore\Model\Events;
use Psr\Clock\ClockInterface;

/**
Expand Down Expand Up @@ -96,8 +98,7 @@ public function handle(CommandInterface $command): void

// simple case
if ($toPublish instanceof EventsToPublish) {
$eventsToPublish = $this->enrichEventsToPublishWithMetadata($toPublish);
$this->eventStore->commit($eventsToPublish->streamName, $this->eventNormalizer->normalizeEvents($eventsToPublish->events), $eventsToPublish->expectedVersion);
$this->eventStore->commit($toPublish->streamName, $this->enrichAndNormaliseEvents($toPublish->events), $toPublish->expectedVersion);
$fullCatchUpResult = $this->subscriptionEngine->catchUpActive(); // NOTE: we don't batch here, to ensure the catchup is run completely and any errors don't stop it.
if ($fullCatchUpResult->hadErrors()) {
throw CatchUpHadErrors::createFromErrors($fullCatchUpResult->errors);
Expand All @@ -107,10 +108,9 @@ public function handle(CommandInterface $command): void

// control-flow aware command handling via generator
try {
foreach ($toPublish as $yieldedEventsToPublish) {
$eventsToPublish = $this->enrichEventsToPublishWithMetadata($yieldedEventsToPublish);
foreach ($toPublish as $eventsToPublish) {
try {
$this->eventStore->commit($eventsToPublish->streamName, $this->eventNormalizer->normalizeEvents($eventsToPublish->events), $eventsToPublish->expectedVersion);
$this->eventStore->commit($eventsToPublish->streamName, $this->enrichAndNormaliseEvents($eventsToPublish->events), $eventsToPublish->expectedVersion);
} catch (ConcurrencyException $concurrencyException) {
// we pass the exception into the generator (->throw), so it could be try-caught and reacted upon:
//
Expand All @@ -122,7 +122,7 @@ public function handle(CommandInterface $command): void
// }
$yieldedErrorStrategy = $toPublish->throw($concurrencyException);
if ($yieldedErrorStrategy instanceof EventsToPublish) {
$this->eventStore->commit($yieldedErrorStrategy->streamName, $this->eventNormalizer->normalizeEvents($yieldedErrorStrategy->events), $yieldedErrorStrategy->expectedVersion);
$this->eventStore->commit($yieldedErrorStrategy->streamName, $this->enrichAndNormaliseEvents($yieldedErrorStrategy->events), $yieldedErrorStrategy->expectedVersion);
}
throw $concurrencyException;
}
Expand Down Expand Up @@ -217,19 +217,17 @@ public function getContentDimensionSource(): ContentDimensionSourceInterface
return $this->contentDimensionSource;
}

private function enrichEventsToPublishWithMetadata(EventsToPublish $eventsToPublish): EventsToPublish
private function enrichAndNormaliseEvents(DomainEvents $events): Events
{
$initiatingUserId = $this->authProvider->getAuthenticatedUserId() ?? UserId::forSystemUser();
$initiatingTimestamp = $this->clock->now();

return new EventsToPublish(
$eventsToPublish->streamName,
InitiatingEventMetadata::enrichEventsWithInitiatingMetadata(
$eventsToPublish->events,
$initiatingUserId,
$initiatingTimestamp
),
$eventsToPublish->expectedVersion,
$events = InitiatingEventMetadata::enrichEventsWithInitiatingMetadata(
$events,
$initiatingUserId,
$initiatingTimestamp
);

return Events::fromArray($events->map($this->eventNormalizer->normalize(...)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

namespace Neos\ContentRepository\Core\EventStore;

use Neos\ContentRepository\Core\EventStore\Events as DomainEvents;
use Neos\ContentRepository\Core\Feature\ContentStreamClosing\Event\ContentStreamWasClosed;
use Neos\ContentRepository\Core\Feature\ContentStreamClosing\Event\ContentStreamWasReopened;
use Neos\ContentRepository\Core\Feature\ContentStreamCreation\Event\ContentStreamWasCreated;
Expand All @@ -26,7 +25,6 @@
use Neos\ContentRepository\Core\Feature\NodeVariation\Event\NodeSpecializationVariantWasCreated;
use Neos\ContentRepository\Core\Feature\RootNodeCreation\Event\RootNodeAggregateDimensionsWereUpdated;
use Neos\ContentRepository\Core\Feature\RootNodeCreation\Event\RootNodeAggregateWithNodeWasCreated;
use Neos\ContentRepository\Core\Feature\SubtreeTagging\Dto\SubtreeTag;
use Neos\ContentRepository\Core\Feature\SubtreeTagging\Event\SubtreeWasTagged;
use Neos\ContentRepository\Core\Feature\SubtreeTagging\Event\SubtreeWasUntagged;
use Neos\ContentRepository\Core\Feature\WorkspaceCreation\Event\RootWorkspaceWasCreated;
Expand All @@ -45,7 +43,6 @@
use Neos\EventStore\Model\Event\EventData;
use Neos\EventStore\Model\Event\EventId;
use Neos\EventStore\Model\Event\EventType;
use Neos\EventStore\Model\Events;

/**
* Central authority to convert Content Repository domain events to Event Store EventData and EventType, vice versa.
Expand Down Expand Up @@ -156,11 +153,6 @@ public function normalize(EventInterface|DecoratedEvent $event): Event
);
}

public function normalizeEvents(DomainEvents $events): Events
{
return Events::fromArray($events->map($this->normalize(...)));
}

public function denormalize(Event $event): EventInterface
{
$eventClassName = $this->getEventClassName($event);
Expand Down

0 comments on commit be0ba17

Please sign in to comment.