Skip to content

Commit

Permalink
TASK: Harden event store importer by not using projections and checki…
Browse files Browse the repository at this point in the history
…ng that the workspace is a base workspace
  • Loading branch information
mhsdesign committed Nov 20, 2024
1 parent 9c12556 commit 9d45d27
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ public function build(ContentRepositoryServiceFactoryDependencies $serviceFactor
$this->targetWorkspaceName,
$this->keepEventIds,
$serviceFactoryDependencies->eventStore,
$serviceFactoryDependencies->eventNormalizer,
$serviceFactoryDependencies->contentRepository,
$serviceFactoryDependencies->eventNormalizer
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@

namespace Neos\ContentRepository\Export\Processors;

use Neos\ContentRepository\Core\ContentRepository;
use Neos\ContentRepository\Core\EventStore\DecoratedEvent;
use Neos\ContentRepository\Core\EventStore\EventNormalizer;
use Neos\ContentRepository\Core\Factory\ContentRepositoryServiceInterface;
use Neos\ContentRepository\Core\Feature\ContentStreamCreation\Event\ContentStreamWasCreated;
use Neos\ContentRepository\Core\Feature\ContentStreamEventStreamName;
use Neos\ContentRepository\Core\Feature\ContentStreamForking\Event\ContentStreamWasForked;
use Neos\ContentRepository\Core\Feature\ContentStreamRemoval\Event\ContentStreamWasRemoved;
use Neos\ContentRepository\Core\Feature\WorkspaceCreation\Event\RootWorkspaceWasCreated;
use Neos\ContentRepository\Core\Feature\WorkspaceEventStreamName;
use Neos\ContentRepository\Core\SharedModel\Workspace\WorkspaceName;
use Neos\ContentRepository\Export\Event\ValueObject\ExportedEvent;
use Neos\ContentRepository\Export\ProcessingContext;
Expand All @@ -20,8 +21,11 @@
use Neos\EventStore\Exception\ConcurrencyException;
use Neos\EventStore\Model\Event;
use Neos\EventStore\Model\Event\EventId;
use Neos\EventStore\Model\Event\EventType;
use Neos\EventStore\Model\Event\EventTypes;
use Neos\EventStore\Model\Event\Version;
use Neos\EventStore\Model\Events;
use Neos\EventStore\Model\EventStream\EventStreamFilter;
use Neos\EventStore\Model\EventStream\ExpectedVersion;
use Neos\Flow\Utility\Algorithms;

Expand All @@ -34,8 +38,7 @@ public function __construct(
private WorkspaceName $targetWorkspaceName,
private bool $keepEventIds,
private EventStoreInterface $eventStore,
private EventNormalizer $eventNormalizer,
private ContentRepository $contentRepository,
private EventNormalizer $eventNormalizer
) {
}

Expand All @@ -48,15 +51,27 @@ public function run(ProcessingContext $context): void
/** @var array<string, string> $eventIdMap */
$eventIdMap = [];

$workspace = $this->contentRepository->findWorkspaceByName($this->targetWorkspaceName);
if ($workspace === null) {
throw new \InvalidArgumentException("Workspace {$this->targetWorkspaceName} does not exist", 1729530978);
$rootWorkspaceContentStreamId = null;
foreach ($this->eventStore->load(
WorkspaceEventStreamName::fromWorkspaceName($this->targetWorkspaceName)->getEventStreamName(),
EventStreamFilter::create(EventTypes::create(EventType::fromString('RootWorkspaceWasCreated')))
) as $eventEnvelope) {
$rootWorkspaceWasCreatedEvent = $this->eventNormalizer->denormalize($eventEnvelope->event);
if (!$rootWorkspaceWasCreatedEvent instanceof RootWorkspaceWasCreated) {
throw new \RuntimeException(sprintf('Expected event of type %s got %s', RootWorkspaceWasCreated::class, $rootWorkspaceWasCreatedEvent::class), 1732109840);
}
$rootWorkspaceContentStreamId = $rootWorkspaceWasCreatedEvent->newContentStreamId;
break;
}

if ($rootWorkspaceContentStreamId === null) {
throw new \InvalidArgumentException(sprintf('Workspace "%s" does not exist or is not a root workspace', $this->targetWorkspaceName), 1729530978);
}

while (($line = fgets($eventFileResource)) !== false) {
$event =
ExportedEvent::fromJson(trim($line))
->processPayload(fn (array $payload) => [...$payload, 'contentStreamId' => $workspace->currentContentStreamId->value, 'workspaceName' => $this->targetWorkspaceName->value]);
->processPayload(fn (array $payload) => [...$payload, 'contentStreamId' => $rootWorkspaceContentStreamId->value, 'workspaceName' => $this->targetWorkspaceName->value]);
if (!$this->keepEventIds) {
try {
$newEventId = Algorithms::generateUUID();
Expand Down Expand Up @@ -96,11 +111,11 @@ public function run(ProcessingContext $context): void
$domainEvents[] = $this->eventNormalizer->normalize($domainEvent);
}

$contentStreamStreamName = ContentStreamEventStreamName::fromContentStreamId($workspace->currentContentStreamId)->getEventStreamName();
$contentStreamStreamName = ContentStreamEventStreamName::fromContentStreamId($rootWorkspaceContentStreamId)->getEventStreamName();
try {
$this->eventStore->commit($contentStreamStreamName, Events::fromArray($domainEvents), ExpectedVersion::fromVersion(Version::first()));
} catch (ConcurrencyException $e) {
throw new \RuntimeException(sprintf('Failed to publish %d events because the event stream "%s" for workspace "%s". Please prune the content repository first via `./flow site:pruneAll`.', count($domainEvents), $contentStreamStreamName->value, $workspace->workspaceName->value), 1729506818, $e);
throw new \RuntimeException(sprintf('Failed to publish %d events because the content stream "%s" for workspace "%s" already contains events. Please consider to prune the content repository first via `./flow site:pruneAll`.', count($domainEvents), $contentStreamStreamName->value, $this->targetWorkspaceName->value), 1729506818, $e);
}
}
}

0 comments on commit 9d45d27

Please sign in to comment.