Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

!!! FEATURE: Overhaul catch up implementation #4405

Merged
merged 16 commits into from
Aug 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,18 @@
use Neos\ContentGraph\DoctrineDbalAdapter\Domain\Repository\ContentGraph;
use Neos\ContentGraph\DoctrineDbalAdapter\Domain\Repository\NodeFactory;
use Neos\ContentGraph\DoctrineDbalAdapter\Domain\Repository\ProjectionContentGraph;
use Neos\ContentRepository\Core\ContentRepository;
use Neos\ContentRepository\Core\DimensionSpace\DimensionSpacePoint;
use Neos\ContentRepository\Core\DimensionSpace\DimensionSpacePointSet;
use Neos\ContentRepository\Core\EventStore\EventNormalizer;
use Neos\ContentRepository\Core\Feature\NodeModification\Dto\SerializedPropertyValues;
use Neos\ContentRepository\Core\DimensionSpace\OriginDimensionSpacePoint;
use Neos\ContentRepository\Core\EventStore\EventInterface;
use Neos\ContentRepository\Core\Feature\ContentStreamForking\Event\ContentStreamWasForked;
use Neos\ContentRepository\Core\Feature\ContentStreamRemoval\Event\ContentStreamWasRemoved;
use Neos\ContentRepository\Core\Feature\DimensionSpaceAdjustment\Event\DimensionShineThroughWasAdded;
use Neos\ContentRepository\Core\Feature\DimensionSpaceAdjustment\Event\DimensionSpacePointWasMoved;
use Neos\ContentRepository\Core\Feature\NodeCreation\Event\NodeAggregateWithNodeWasCreated;
use Neos\ContentRepository\Core\Feature\NodeDisabling\Event\NodeAggregateWasDisabled;
use Neos\ContentRepository\Core\Feature\NodeDisabling\Event\NodeAggregateWasEnabled;
use Neos\ContentRepository\Core\Feature\NodeModification\Dto\SerializedPropertyValues;
use Neos\ContentRepository\Core\Feature\NodeModification\Event\NodePropertiesWereSet;
use Neos\ContentRepository\Core\Feature\NodeMove\Event\NodeAggregateWasMoved;
use Neos\ContentRepository\Core\Feature\NodeReferencing\Dto\SerializedNodeReference;
Expand All @@ -44,25 +44,21 @@
use Neos\ContentRepository\Core\Feature\RootNodeCreation\Event\RootNodeAggregateDimensionsWereUpdated;
use Neos\ContentRepository\Core\Feature\RootNodeCreation\Event\RootNodeAggregateWithNodeWasCreated;
use Neos\ContentRepository\Core\Infrastructure\DbalClientInterface;
use Neos\ContentRepository\Core\NodeType\NodeTypeManager;
use Neos\ContentRepository\Core\NodeType\NodeTypeName;
use Neos\ContentRepository\Core\Projection\CatchUpHookFactoryInterface;
use Neos\ContentRepository\Core\Projection\CatchUpHookInterface;
use Neos\ContentRepository\Core\Projection\ContentGraph\Timestamps;
use Neos\ContentRepository\Core\Projection\ProjectionInterface;
use Neos\ContentRepository\Core\Projection\WithMarkStaleInterface;
use Neos\ContentRepository\Core\SharedModel\Node\NodeAggregateClassification;
use Neos\ContentRepository\Core\SharedModel\Node\NodeAggregateId;
use Neos\ContentRepository\Core\SharedModel\Node\NodeName;
use Neos\ContentRepository\Core\DimensionSpace\OriginDimensionSpacePoint;
use Neos\ContentRepository\Core\NodeType\NodeTypeManager;
use Neos\ContentRepository\Core\NodeType\NodeTypeName;
use Neos\ContentRepository\Core\SharedModel\Workspace\ContentStreamId;
use Neos\EventStore\CatchUp\CatchUp;
use Neos\EventStore\CatchUp\CheckpointStorageInterface;
use Neos\EventStore\DoctrineAdapter\DoctrineCheckpointStorage;
use Neos\EventStore\Model\Event;
use Neos\EventStore\Model\Event\SequenceNumber;
use Neos\EventStore\Model\EventEnvelope;
use Neos\EventStore\Model\EventStore\SetupResult;
use Neos\EventStore\Model\EventStream\EventStreamInterface;
use Neos\EventStore\Model\Event\SequenceNumber;

/**
* @implements ProjectionInterface<ContentGraph>
Expand All @@ -88,12 +84,10 @@ final class DoctrineDbalContentGraphProjection implements ProjectionInterface, W
private DoctrineCheckpointStorage $checkpointStorage;

public function __construct(
private readonly EventNormalizer $eventNormalizer,
private readonly DbalClientInterface $dbalClient,
private readonly NodeFactory $nodeFactory,
private readonly NodeTypeManager $nodeTypeManager,
private readonly ProjectionContentGraph $projectionContentGraph,
private readonly CatchUpHookFactoryInterface $catchUpHookFactory,
private readonly string $tableNamePrefix,
) {
$this->checkpointStorage = new DoctrineCheckpointStorage(
Expand Down Expand Up @@ -158,10 +152,9 @@ private function truncateDatabaseTables(): void
$connection->executeQuery('TRUNCATE table ' . $this->tableNamePrefix . '_restrictionrelation');
}

public function canHandle(Event $event): bool
public function canHandle(EventInterface $event): bool
{
$eventClassName = $this->eventNormalizer->getEventClassName($event);
return in_array($eventClassName, [
return in_array($event::class, [
RootNodeAggregateWithNodeWasCreated::class,
RootNodeAggregateDimensionsWereUpdated::class,
NodeAggregateWithNodeWasCreated::class,
Expand All @@ -183,75 +176,34 @@ public function canHandle(Event $event): bool
]);
}

public function catchUp(EventStreamInterface $eventStream, ContentRepository $contentRepository): void
public function apply(EventInterface $event, EventEnvelope $eventEnvelope): void
{
$catchUpHook = $this->catchUpHookFactory->build($contentRepository);
$catchUpHook->onBeforeCatchUp();
$catchUp = CatchUp::create(
fn(EventEnvelope $eventEnvelope) => $this->apply($eventEnvelope, $catchUpHook),
$this->checkpointStorage
);
$catchUp = $catchUp->withOnBeforeBatchCompleted(fn() => $catchUpHook->onBeforeBatchCompleted());
$catchUp->run($eventStream);
$catchUpHook->onAfterCatchUp();
}

private function apply(EventEnvelope $eventEnvelope, CatchUpHookInterface $catchUpHook): void
{
if (!$this->canHandle($eventEnvelope->event)) {
return;
}

$eventInstance = $this->eventNormalizer->denormalize($eventEnvelope->event);

$catchUpHook->onBeforeEvent($eventInstance, $eventEnvelope);

if ($eventInstance instanceof RootNodeAggregateWithNodeWasCreated) {
$this->whenRootNodeAggregateWithNodeWasCreated($eventInstance, $eventEnvelope);
} elseif ($eventInstance instanceof RootNodeAggregateDimensionsWereUpdated) {
$this->whenRootNodeAggregateDimensionsWereUpdated($eventInstance);
} elseif ($eventInstance instanceof NodeAggregateWithNodeWasCreated) {
$this->whenNodeAggregateWithNodeWasCreated($eventInstance, $eventEnvelope);
} elseif ($eventInstance instanceof NodeAggregateNameWasChanged) {
$this->whenNodeAggregateNameWasChanged($eventInstance, $eventEnvelope);
} elseif ($eventInstance instanceof ContentStreamWasForked) {
$this->whenContentStreamWasForked($eventInstance);
} elseif ($eventInstance instanceof ContentStreamWasRemoved) {
$this->whenContentStreamWasRemoved($eventInstance);
} elseif ($eventInstance instanceof NodePropertiesWereSet) {
$this->whenNodePropertiesWereSet($eventInstance, $eventEnvelope);
} elseif ($eventInstance instanceof NodeReferencesWereSet) {
$this->whenNodeReferencesWereSet($eventInstance, $eventEnvelope);
} elseif ($eventInstance instanceof NodeAggregateWasEnabled) {
$this->whenNodeAggregateWasEnabled($eventInstance);
} elseif ($eventInstance instanceof NodeAggregateTypeWasChanged) {
$this->whenNodeAggregateTypeWasChanged($eventInstance, $eventEnvelope);
} elseif ($eventInstance instanceof DimensionSpacePointWasMoved) {
$this->whenDimensionSpacePointWasMoved($eventInstance);
} elseif ($eventInstance instanceof DimensionShineThroughWasAdded) {
$this->whenDimensionShineThroughWasAdded($eventInstance);
} elseif ($eventInstance instanceof NodeAggregateWasRemoved) {
$this->whenNodeAggregateWasRemoved($eventInstance);
} elseif ($eventInstance instanceof NodeAggregateWasMoved) {
$this->whenNodeAggregateWasMoved($eventInstance);
} elseif ($eventInstance instanceof NodeSpecializationVariantWasCreated) {
$this->whenNodeSpecializationVariantWasCreated($eventInstance, $eventEnvelope);
} elseif ($eventInstance instanceof NodeGeneralizationVariantWasCreated) {
$this->whenNodeGeneralizationVariantWasCreated($eventInstance, $eventEnvelope);
} elseif ($eventInstance instanceof NodePeerVariantWasCreated) {
$this->whenNodePeerVariantWasCreated($eventInstance, $eventEnvelope);
} elseif ($eventInstance instanceof NodeAggregateWasDisabled) {
$this->whenNodeAggregateWasDisabled($eventInstance);
} else {
throw new \RuntimeException('Not supported: ' . get_class($eventInstance));
}

$catchUpHook->onAfterEvent($eventInstance, $eventEnvelope);
match ($event::class) {
RootNodeAggregateWithNodeWasCreated::class => $this->whenRootNodeAggregateWithNodeWasCreated($event, $eventEnvelope),
RootNodeAggregateDimensionsWereUpdated::class => $this->whenRootNodeAggregateDimensionsWereUpdated($event),
NodeAggregateWithNodeWasCreated::class => $this->whenNodeAggregateWithNodeWasCreated($event, $eventEnvelope),
NodeAggregateNameWasChanged::class => $this->whenNodeAggregateNameWasChanged($event, $eventEnvelope),
ContentStreamWasForked::class => $this->whenContentStreamWasForked($event),
ContentStreamWasRemoved::class => $this->whenContentStreamWasRemoved($event),
NodePropertiesWereSet::class => $this->whenNodePropertiesWereSet($event, $eventEnvelope),
NodeReferencesWereSet::class => $this->whenNodeReferencesWereSet($event, $eventEnvelope),
NodeAggregateWasEnabled::class => $this->whenNodeAggregateWasEnabled($event),
NodeAggregateTypeWasChanged::class => $this->whenNodeAggregateTypeWasChanged($event, $eventEnvelope),
DimensionSpacePointWasMoved::class => $this->whenDimensionSpacePointWasMoved($event),
DimensionShineThroughWasAdded::class => $this->whenDimensionShineThroughWasAdded($event),
NodeAggregateWasRemoved::class => $this->whenNodeAggregateWasRemoved($event),
NodeAggregateWasMoved::class => $this->whenNodeAggregateWasMoved($event),
NodeSpecializationVariantWasCreated::class => $this->whenNodeSpecializationVariantWasCreated($event, $eventEnvelope),
NodeGeneralizationVariantWasCreated::class => $this->whenNodeGeneralizationVariantWasCreated($event, $eventEnvelope),
NodePeerVariantWasCreated::class => $this->whenNodePeerVariantWasCreated($event, $eventEnvelope),
NodeAggregateWasDisabled::class => $this->whenNodeAggregateWasDisabled($event),
default => throw new \InvalidArgumentException(sprintf('Unsupported event %s', get_debug_type($event))),
};
}

public function getSequenceNumber(): SequenceNumber
public function getCheckpointStorage(): CheckpointStorageInterface
{
return $this->checkpointStorage->getHighestAppliedSequenceNumber();
return $this->checkpointStorage;
}

public function getState(): ContentGraph
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,8 @@
use Neos\ContentRepository\Core\Factory\ContentRepositoryId;
use Neos\ContentRepository\Core\Factory\ProjectionFactoryDependencies;
use Neos\ContentRepository\Core\Infrastructure\DbalClientInterface;
use Neos\ContentRepository\Core\Projection\CatchUpHookFactoryInterface;
use Neos\ContentRepository\Core\Projection\ContentGraph\ContentGraphProjection;
use Neos\ContentRepository\Core\Projection\ProjectionFactoryInterface;
use Neos\ContentRepository\Core\Projection\ProjectionInterface;
use Neos\ContentRepository\Core\Projection\Projections;

/**
* Use this class as ProjectionFactory in your configuration to construct a content graph
Expand All @@ -38,8 +35,6 @@ public static function graphProjectionTableNamePrefix(
public function build(
ProjectionFactoryDependencies $projectionFactoryDependencies,
array $options,
CatchUpHookFactoryInterface $catchUpHookFactory,
Projections $projectionsSoFar
): ContentGraphProjection {
$tableNamePrefix = self::graphProjectionTableNamePrefix(
$projectionFactoryDependencies->contentRepositoryId
Expand All @@ -48,7 +43,6 @@ public function build(
return new ContentGraphProjection(
// @phpstan-ignore-next-line
new DoctrineDbalContentGraphProjection(
$projectionFactoryDependencies->eventNormalizer,
$this->dbalClient,
new NodeFactory(
$projectionFactoryDependencies->contentRepositoryId,
Expand All @@ -60,7 +54,6 @@ public function build(
$this->dbalClient,
$tableNamePrefix
),
$catchUpHookFactory,
$tableNamePrefix
)
);
Expand Down
Loading