Skip to content

Commit

Permalink
Merge pull request #5374 from mhsdesign/task/improve-events-exporter
Browse files Browse the repository at this point in the history
BUGFIX: Harden events exporter
  • Loading branch information
mhsdesign authored Nov 28, 2024
2 parents 8ebf99c + 9d45d27 commit 516ca6c
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,13 +132,9 @@ public function iExpectTheFollowingJsonL(PyStringNode $string): void
$eventsWithoutRandomIds = [];

foreach ($exportedEvents as $exportedEvent) {
// we have to remove the event id and initiatingTimestamp to make the events diff able
// we have to remove the event ids to make the events diff-able
$eventsWithoutRandomIds[] = $exportedEvent
->withIdentifier('random-event-uuid')
->processMetadata(function (array $metadata) {
$metadata[InitiatingEventMetadata::INITIATING_TIMESTAMP] = 'random-time';
return $metadata;
});
->withIdentifier('random-event-uuid');
}

Assert::assertSame($string->getRaw(), ExportedEvents::fromIterable($eventsWithoutRandomIds)->toJsonl());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Feature: As a user of the CR I want to export the event stream using the EventExportProcessor

Background:
Given the current date and time is "2023-03-16T12:00:00+01:00"
Given using the following content dimensions:
| Identifier | Values | Generalizations |
| language | de, gsw, fr | gsw->de |
Expand All @@ -20,14 +21,12 @@ Feature: As a user of the CR I want to export the event stream using the EventEx
| Key | Value |
| nodeAggregateId | "lady-eleonode-rootford" |
| nodeTypeName | "Neos.ContentRepository:Root" |
And the event NodeAggregateWithNodeWasCreated was published with payload:
And the command CreateNodeAggregateWithNode is executed with payload:
| Key | Value |
| workspaceName | "live" |
| contentStreamId | "cs-identifier" |
| nodeAggregateId | "nody-mc-nodeface" |
| nodeTypeName | "Neos.ContentRepository.Testing:Document" |
| originDimensionSpacePoint | {"language":"de"} |
| coveredDimensionSpacePoints | [{"language":"de"},{"language":"gsw"},{"language":"fr"}] |
| parentNodeAggregateId | "lady-eleonode-rootford" |
| nodeName | "child-document" |
| nodeAggregateClassification | "regular" |
Expand All @@ -37,7 +36,7 @@ Feature: As a user of the CR I want to export the event stream using the EventEx
When the events are exported
Then I expect the following jsonl:
"""
{"identifier":"random-event-uuid","type":"RootNodeAggregateWithNodeWasCreated","payload":{"nodeAggregateId":"lady-eleonode-rootford","nodeTypeName":"Neos.ContentRepository:Root","coveredDimensionSpacePoints":[{"language":"de"},{"language":"gsw"},{"language":"fr"}],"nodeAggregateClassification":"root"},"metadata":{"commandClass":"Neos\\ContentRepository\\Core\\Feature\\RootNodeCreation\\Command\\CreateRootNodeAggregateWithNode","commandPayload":{"workspaceName":"live","nodeAggregateId":"lady-eleonode-rootford","nodeTypeName":"Neos.ContentRepository:Root","tetheredDescendantNodeAggregateIds":[]},"initiatingUserId":"system","initiatingTimestamp":"random-time"}}
{"identifier":"random-event-uuid","type":"NodeAggregateWithNodeWasCreated","payload":{"nodeAggregateId":"nody-mc-nodeface","nodeTypeName":"Neos.ContentRepository.Testing:Document","originDimensionSpacePoint":{"language":"de"},"succeedingSiblingsForCoverage":[{"dimensionSpacePoint":{"language":"de"},"nodeAggregateId":null},{"dimensionSpacePoint":{"language":"gsw"},"nodeAggregateId":null},{"dimensionSpacePoint":{"language":"fr"},"nodeAggregateId":null}],"parentNodeAggregateId":"lady-eleonode-rootford","nodeName":"child-document","initialPropertyValues":[],"nodeAggregateClassification":"regular","nodeReferences":[]},"metadata":{"initiatingTimestamp":"random-time"}}
{"identifier":"random-event-uuid","type":"RootNodeAggregateWithNodeWasCreated","payload":{"nodeAggregateId":"lady-eleonode-rootford","nodeTypeName":"Neos.ContentRepository:Root","coveredDimensionSpacePoints":[{"language":"de"},{"language":"gsw"},{"language":"fr"}],"nodeAggregateClassification":"root"},"metadata":{"initiatingUserId":"system","initiatingTimestamp":"2023-03-16T12:00:00+01:00"}}
{"identifier":"random-event-uuid","type":"NodeAggregateWithNodeWasCreated","payload":{"nodeAggregateId":"nody-mc-nodeface","nodeTypeName":"Neos.ContentRepository.Testing:Document","originDimensionSpacePoint":{"language":"de"},"succeedingSiblingsForCoverage":[{"dimensionSpacePoint":{"language":"de"},"nodeAggregateId":null},{"dimensionSpacePoint":{"language":"gsw"},"nodeAggregateId":null}],"parentNodeAggregateId":"lady-eleonode-rootford","nodeName":"child-document","initialPropertyValues":[],"nodeAggregateClassification":"regular","nodeReferences":[]},"metadata":{"initiatingUserId":"system","initiatingTimestamp":"2023-03-16T12:00:00+01:00"}}
"""
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,14 @@ public static function fromRawEvent(Event $event): self
// unset content stream id as this is overwritten during import
unset($payload['contentStreamId'], $payload['workspaceName']);

$metaData = $event->metadata?->value ?? [];
unset($metaData['commandClass'], $metaData['commandPayload']);

return new self(
$event->id->value,
$event->type->value,
$payload,
$event->metadata?->value ?? [],
$metaData,
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ public function build(ContentRepositoryServiceFactoryDependencies $serviceFactor
$this->targetWorkspaceName,
$this->keepEventIds,
$serviceFactoryDependencies->eventStore,
$serviceFactoryDependencies->eventNormalizer,
$serviceFactoryDependencies->contentRepository,
$serviceFactoryDependencies->eventNormalizer
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@

namespace Neos\ContentRepository\Export\Processors;

use Neos\ContentRepository\Core\ContentRepository;
use Neos\ContentRepository\Core\EventStore\DecoratedEvent;
use Neos\ContentRepository\Core\EventStore\EventNormalizer;
use Neos\ContentRepository\Core\Factory\ContentRepositoryServiceInterface;
use Neos\ContentRepository\Core\Feature\ContentStreamCreation\Event\ContentStreamWasCreated;
use Neos\ContentRepository\Core\Feature\ContentStreamEventStreamName;
use Neos\ContentRepository\Core\Feature\ContentStreamForking\Event\ContentStreamWasForked;
use Neos\ContentRepository\Core\Feature\ContentStreamRemoval\Event\ContentStreamWasRemoved;
use Neos\ContentRepository\Core\Feature\WorkspaceCreation\Event\RootWorkspaceWasCreated;
use Neos\ContentRepository\Core\Feature\WorkspaceEventStreamName;
use Neos\ContentRepository\Core\SharedModel\Workspace\WorkspaceName;
use Neos\ContentRepository\Export\Event\ValueObject\ExportedEvent;
use Neos\ContentRepository\Export\ProcessingContext;
Expand All @@ -20,8 +21,11 @@
use Neos\EventStore\Exception\ConcurrencyException;
use Neos\EventStore\Model\Event;
use Neos\EventStore\Model\Event\EventId;
use Neos\EventStore\Model\Event\EventType;
use Neos\EventStore\Model\Event\EventTypes;
use Neos\EventStore\Model\Event\Version;
use Neos\EventStore\Model\Events;
use Neos\EventStore\Model\EventStream\EventStreamFilter;
use Neos\EventStore\Model\EventStream\ExpectedVersion;
use Neos\Flow\Utility\Algorithms;

Expand All @@ -34,8 +38,7 @@ public function __construct(
private WorkspaceName $targetWorkspaceName,
private bool $keepEventIds,
private EventStoreInterface $eventStore,
private EventNormalizer $eventNormalizer,
private ContentRepository $contentRepository,
private EventNormalizer $eventNormalizer
) {
}

Expand All @@ -48,15 +51,27 @@ public function run(ProcessingContext $context): void
/** @var array<string, string> $eventIdMap */
$eventIdMap = [];

$workspace = $this->contentRepository->findWorkspaceByName($this->targetWorkspaceName);
if ($workspace === null) {
throw new \InvalidArgumentException("Workspace {$this->targetWorkspaceName} does not exist", 1729530978);
$rootWorkspaceContentStreamId = null;
foreach ($this->eventStore->load(
WorkspaceEventStreamName::fromWorkspaceName($this->targetWorkspaceName)->getEventStreamName(),
EventStreamFilter::create(EventTypes::create(EventType::fromString('RootWorkspaceWasCreated')))
) as $eventEnvelope) {
$rootWorkspaceWasCreatedEvent = $this->eventNormalizer->denormalize($eventEnvelope->event);
if (!$rootWorkspaceWasCreatedEvent instanceof RootWorkspaceWasCreated) {
throw new \RuntimeException(sprintf('Expected event of type %s got %s', RootWorkspaceWasCreated::class, $rootWorkspaceWasCreatedEvent::class), 1732109840);
}
$rootWorkspaceContentStreamId = $rootWorkspaceWasCreatedEvent->newContentStreamId;
break;
}

if ($rootWorkspaceContentStreamId === null) {
throw new \InvalidArgumentException(sprintf('Workspace "%s" does not exist or is not a root workspace', $this->targetWorkspaceName), 1729530978);
}

while (($line = fgets($eventFileResource)) !== false) {
$event =
ExportedEvent::fromJson(trim($line))
->processPayload(fn (array $payload) => [...$payload, 'contentStreamId' => $workspace->currentContentStreamId->value, 'workspaceName' => $this->targetWorkspaceName->value]);
->processPayload(fn (array $payload) => [...$payload, 'contentStreamId' => $rootWorkspaceContentStreamId->value, 'workspaceName' => $this->targetWorkspaceName->value]);
if (!$this->keepEventIds) {
try {
$newEventId = Algorithms::generateUUID();
Expand Down Expand Up @@ -96,11 +111,11 @@ public function run(ProcessingContext $context): void
$domainEvents[] = $this->eventNormalizer->normalize($domainEvent);
}

$contentStreamStreamName = ContentStreamEventStreamName::fromContentStreamId($workspace->currentContentStreamId)->getEventStreamName();
$contentStreamStreamName = ContentStreamEventStreamName::fromContentStreamId($rootWorkspaceContentStreamId)->getEventStreamName();
try {
$this->eventStore->commit($contentStreamStreamName, Events::fromArray($domainEvents), ExpectedVersion::fromVersion(Version::first()));
} catch (ConcurrencyException $e) {
throw new \RuntimeException(sprintf('Failed to publish %d events because the event stream "%s" for workspace "%s". Please prune the content repository first via `./flow site:pruneAll`.', count($domainEvents), $contentStreamStreamName->value, $workspace->workspaceName->value), 1729506818, $e);
throw new \RuntimeException(sprintf('Failed to publish %d events because the content stream "%s" for workspace "%s" already contains events. Please consider to prune the content repository first via `./flow site:pruneAll`.', count($domainEvents), $contentStreamStreamName->value, $this->targetWorkspaceName->value), 1729506818, $e);
}
}
}

0 comments on commit 516ca6c

Please sign in to comment.