Skip to content

Commit

Permalink
TASK: Add metadata which command class caused publish / workspace ope…
Browse files Browse the repository at this point in the history
…ration to first event

For commands with multiple events like workspace publishing, we can now add to the event metadata like the causation command short class name. Via the correlation id they are grouped so we only add this information to the first event,

Note that for 'simple' commands we dont need to do this as `RebaseableCommand::enrichWithCommand` will actually fully serialize the command into commandName and payload
  • Loading branch information
mhsdesign committed Jan 26, 2025
1 parent bc79d6e commit ee59523
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 24 deletions.
26 changes: 19 additions & 7 deletions Neos.ContentRepository.Core/Classes/ContentRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public function handle(CommandInterface $command): void

// simple case
if ($toPublish instanceof EventsToPublish) {
$this->eventStore->commit($toPublish->streamName, $this->enrichAndNormalizeEvents($toPublish->events, correlationId: null), $toPublish->expectedVersion);
$this->eventStore->commit($toPublish->streamName, Events::fromArray($this->enrichEventsWithInitiatingMetadata($toPublish->events)->map($this->eventNormalizer->normalize(...))), $toPublish->expectedVersion);
$fullCatchUpResult = $this->subscriptionEngine->catchUpActive(); // NOTE: we don't batch here, to ensure the catchup is run completely and any errors don't stop it.
if ($fullCatchUpResult->hadErrors()) {
throw CatchUpHadErrors::createFromErrors($fullCatchUpResult->errors);
Expand All @@ -111,11 +111,13 @@ public function handle(CommandInterface $command): void
}

// control-flow aware command handling via generator
$isFirstEvent = true;
$causationCommandClassName = $command::class;
$correlationId = CorrelationId::fromString(UuidFactory::create());
try {
foreach ($toPublish as $eventsToPublish) {
try {
$this->eventStore->commit($eventsToPublish->streamName, $this->enrichAndNormalizeEvents($eventsToPublish->events, $correlationId), $eventsToPublish->expectedVersion);
$this->eventStore->commit($eventsToPublish->streamName, $this->enrichAndNormalizeEvents($eventsToPublish->events, $correlationId, $isFirstEvent, $causationCommandClassName), $eventsToPublish->expectedVersion);
} catch (ConcurrencyException $concurrencyException) {
// we pass the exception into the generator (->throw), so it could be try-caught and reacted upon:
//
Expand All @@ -127,7 +129,7 @@ public function handle(CommandInterface $command): void
// }
$yieldedErrorStrategy = $toPublish->throw($concurrencyException);
if ($yieldedErrorStrategy instanceof EventsToPublish) {
$this->eventStore->commit($yieldedErrorStrategy->streamName, $this->enrichAndNormalizeEvents($yieldedErrorStrategy->events, $correlationId), $yieldedErrorStrategy->expectedVersion);
$this->eventStore->commit($yieldedErrorStrategy->streamName, $this->enrichAndNormalizeEvents($yieldedErrorStrategy->events, $correlationId, $isFirstEvent, $causationCommandClassName), $yieldedErrorStrategy->expectedVersion);
}
throw $concurrencyException;
}
Expand Down Expand Up @@ -222,19 +224,29 @@ public function getContentDimensionSource(): ContentDimensionSourceInterface
return $this->contentDimensionSource;
}

private function enrichAndNormalizeEvents(DomainEvents $events, CorrelationId|null $correlationId): Events
private function enrichEventsWithInitiatingMetadata(DomainEvents $events): DomainEvents
{
$initiatingUserId = $this->authProvider->getAuthenticatedUserId() ?? UserId::forSystemUser();
$initiatingTimestamp = $this->clock->now();

$events = InitiatingEventMetadata::enrichEventsWithInitiatingMetadata(
return InitiatingEventMetadata::enrichEventsWithInitiatingMetadata(
$events,
$initiatingUserId,
$initiatingTimestamp
);
}

private function enrichAndNormalizeEvents(DomainEvents $events, CorrelationId|null $correlationId, bool &$isFirstEvent, string $causationCommandClassName): Events
{
$events = $this->enrichEventsWithInitiatingMetadata($events);

return Events::fromArray($events->map(function (EventInterface|DecoratedEvent $event) use ($correlationId) {
$decoratedEvent = DecoratedEvent::create($event, correlationId: $correlationId);
return Events::fromArray($events->map(function (EventInterface|DecoratedEvent $event) use ($correlationId, $causationCommandClassName, &$isFirstEvent) {
$metadata = $event instanceof DecoratedEvent ? $event->eventMetadata?->value ?? [] : [];
if ($isFirstEvent) {
$metadata['debug_causationCommand'] = substr($causationCommandClassName, strrpos($causationCommandClassName, '\\') + 1);
$isFirstEvent = false;
}
$decoratedEvent = DecoratedEvent::create($event, metadata: $metadata, correlationId: $correlationId);
return $this->eventNormalizer->normalize($decoratedEvent);
}));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,15 @@ trait ContentStreamHandling
private function closeContentStream(
ContentStreamId $contentStreamId,
Version $contentStreamVersion,
string $causationCommandClassName
): EventsToPublish {
$streamName = ContentStreamEventStreamName::fromContentStreamId($contentStreamId)->getEventStreamName();

return new EventsToPublish(
$streamName,
Events::with(
DecoratedEvent::create(
new ContentStreamWasClosed(
$contentStreamId,
),
metadata: array_filter(['debug_causationCommand' => substr($causationCommandClassName, strrpos($causationCommandClassName, '\\') + 1)])
)
new ContentStreamWasClosed(
$contentStreamId,
),
),
ExpectedVersion::fromVersion($contentStreamVersion)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,7 @@ private function handlePublishWorkspace(

yield $this->closeContentStream(
$workspace->currentContentStreamId,
$workspaceContentStreamVersion,
$command::class
$workspaceContentStreamVersion
);

$commandSimulator = $this->commandSimulatorFactory->createSimulatorForWorkspace($baseWorkspace->workspaceName);
Expand Down Expand Up @@ -354,8 +353,7 @@ private function handleRebaseWorkspace(
// if we have no changes in the workspace we can fork from the base directly
yield $this->closeContentStream(
$workspace->currentContentStreamId,
$workspaceContentStreamVersion,
$command::class
$workspaceContentStreamVersion
);

yield from $this->rebaseWorkspaceWithoutChanges(
Expand All @@ -376,8 +374,7 @@ private function handleRebaseWorkspace(

yield $this->closeContentStream(
$workspace->currentContentStreamId,
$workspaceContentStreamVersion,
$command::class
$workspaceContentStreamVersion
);

$commandSimulator = $this->commandSimulatorFactory->createSimulatorForWorkspace($baseWorkspace->workspaceName);
Expand Down Expand Up @@ -463,8 +460,7 @@ private function handlePublishIndividualNodesFromWorkspace(

yield $this->closeContentStream(
$workspace->currentContentStreamId,
$workspaceContentStreamVersion,
$command::class
$workspaceContentStreamVersion
);

$commandSimulator = $this->commandSimulatorFactory->createSimulatorForWorkspace($baseWorkspace->workspaceName);
Expand Down Expand Up @@ -588,8 +584,7 @@ private function handleDiscardIndividualNodesFromWorkspace(

yield $this->closeContentStream(
$workspace->currentContentStreamId,
$workspaceContentStreamVersion,
$command::class
$workspaceContentStreamVersion
);

if ($commandsToKeep->isEmpty()) {
Expand Down

0 comments on commit ee59523

Please sign in to comment.