Skip to content

Commit

Permalink
Merge pull request #4405 from neos/feature/4289-overhaul-catchup
Browse files Browse the repository at this point in the history
!!! FEATURE: Overhaul catch up implementation
  • Loading branch information
bwaidelich authored Aug 26, 2023
2 parents 047806b + 0c97437 commit bb8bc83
Show file tree
Hide file tree
Showing 39 changed files with 469 additions and 676 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,18 @@
use Neos\ContentGraph\DoctrineDbalAdapter\Domain\Repository\ContentGraph;
use Neos\ContentGraph\DoctrineDbalAdapter\Domain\Repository\NodeFactory;
use Neos\ContentGraph\DoctrineDbalAdapter\Domain\Repository\ProjectionContentGraph;
use Neos\ContentRepository\Core\ContentRepository;
use Neos\ContentRepository\Core\DimensionSpace\DimensionSpacePoint;
use Neos\ContentRepository\Core\DimensionSpace\DimensionSpacePointSet;
use Neos\ContentRepository\Core\EventStore\EventNormalizer;
use Neos\ContentRepository\Core\Feature\NodeModification\Dto\SerializedPropertyValues;
use Neos\ContentRepository\Core\DimensionSpace\OriginDimensionSpacePoint;
use Neos\ContentRepository\Core\EventStore\EventInterface;
use Neos\ContentRepository\Core\Feature\ContentStreamForking\Event\ContentStreamWasForked;
use Neos\ContentRepository\Core\Feature\ContentStreamRemoval\Event\ContentStreamWasRemoved;
use Neos\ContentRepository\Core\Feature\DimensionSpaceAdjustment\Event\DimensionShineThroughWasAdded;
use Neos\ContentRepository\Core\Feature\DimensionSpaceAdjustment\Event\DimensionSpacePointWasMoved;
use Neos\ContentRepository\Core\Feature\NodeCreation\Event\NodeAggregateWithNodeWasCreated;
use Neos\ContentRepository\Core\Feature\NodeDisabling\Event\NodeAggregateWasDisabled;
use Neos\ContentRepository\Core\Feature\NodeDisabling\Event\NodeAggregateWasEnabled;
use Neos\ContentRepository\Core\Feature\NodeModification\Dto\SerializedPropertyValues;
use Neos\ContentRepository\Core\Feature\NodeModification\Event\NodePropertiesWereSet;
use Neos\ContentRepository\Core\Feature\NodeMove\Event\NodeAggregateWasMoved;
use Neos\ContentRepository\Core\Feature\NodeReferencing\Dto\SerializedNodeReference;
Expand All @@ -44,25 +44,21 @@
use Neos\ContentRepository\Core\Feature\RootNodeCreation\Event\RootNodeAggregateDimensionsWereUpdated;
use Neos\ContentRepository\Core\Feature\RootNodeCreation\Event\RootNodeAggregateWithNodeWasCreated;
use Neos\ContentRepository\Core\Infrastructure\DbalClientInterface;
use Neos\ContentRepository\Core\NodeType\NodeTypeManager;
use Neos\ContentRepository\Core\NodeType\NodeTypeName;
use Neos\ContentRepository\Core\Projection\CatchUpHookFactoryInterface;
use Neos\ContentRepository\Core\Projection\CatchUpHookInterface;
use Neos\ContentRepository\Core\Projection\ContentGraph\Timestamps;
use Neos\ContentRepository\Core\Projection\ProjectionInterface;
use Neos\ContentRepository\Core\Projection\WithMarkStaleInterface;
use Neos\ContentRepository\Core\SharedModel\Node\NodeAggregateClassification;
use Neos\ContentRepository\Core\SharedModel\Node\NodeAggregateId;
use Neos\ContentRepository\Core\SharedModel\Node\NodeName;
use Neos\ContentRepository\Core\DimensionSpace\OriginDimensionSpacePoint;
use Neos\ContentRepository\Core\NodeType\NodeTypeManager;
use Neos\ContentRepository\Core\NodeType\NodeTypeName;
use Neos\ContentRepository\Core\SharedModel\Workspace\ContentStreamId;
use Neos\EventStore\CatchUp\CatchUp;
use Neos\EventStore\CatchUp\CheckpointStorageInterface;
use Neos\EventStore\DoctrineAdapter\DoctrineCheckpointStorage;
use Neos\EventStore\Model\Event;
use Neos\EventStore\Model\Event\SequenceNumber;
use Neos\EventStore\Model\EventEnvelope;
use Neos\EventStore\Model\EventStore\SetupResult;
use Neos\EventStore\Model\EventStream\EventStreamInterface;
use Neos\EventStore\Model\Event\SequenceNumber;

/**
* @implements ProjectionInterface<ContentGraph>
Expand All @@ -88,12 +84,10 @@ final class DoctrineDbalContentGraphProjection implements ProjectionInterface, W
private DoctrineCheckpointStorage $checkpointStorage;

public function __construct(
private readonly EventNormalizer $eventNormalizer,
private readonly DbalClientInterface $dbalClient,
private readonly NodeFactory $nodeFactory,
private readonly NodeTypeManager $nodeTypeManager,
private readonly ProjectionContentGraph $projectionContentGraph,
private readonly CatchUpHookFactoryInterface $catchUpHookFactory,
private readonly string $tableNamePrefix,
) {
$this->checkpointStorage = new DoctrineCheckpointStorage(
Expand Down Expand Up @@ -158,10 +152,9 @@ private function truncateDatabaseTables(): void
$connection->executeQuery('TRUNCATE table ' . $this->tableNamePrefix . '_restrictionrelation');
}

public function canHandle(Event $event): bool
public function canHandle(EventInterface $event): bool
{
$eventClassName = $this->eventNormalizer->getEventClassName($event);
return in_array($eventClassName, [
return in_array($event::class, [
RootNodeAggregateWithNodeWasCreated::class,
RootNodeAggregateDimensionsWereUpdated::class,
NodeAggregateWithNodeWasCreated::class,
Expand All @@ -183,75 +176,34 @@ public function canHandle(Event $event): bool
]);
}

public function catchUp(EventStreamInterface $eventStream, ContentRepository $contentRepository): void
public function apply(EventInterface $event, EventEnvelope $eventEnvelope): void
{
$catchUpHook = $this->catchUpHookFactory->build($contentRepository);
$catchUpHook->onBeforeCatchUp();
$catchUp = CatchUp::create(
fn(EventEnvelope $eventEnvelope) => $this->apply($eventEnvelope, $catchUpHook),
$this->checkpointStorage
);
$catchUp = $catchUp->withOnBeforeBatchCompleted(fn() => $catchUpHook->onBeforeBatchCompleted());
$catchUp->run($eventStream);
$catchUpHook->onAfterCatchUp();
}

private function apply(EventEnvelope $eventEnvelope, CatchUpHookInterface $catchUpHook): void
{
if (!$this->canHandle($eventEnvelope->event)) {
return;
}

$eventInstance = $this->eventNormalizer->denormalize($eventEnvelope->event);

$catchUpHook->onBeforeEvent($eventInstance, $eventEnvelope);

if ($eventInstance instanceof RootNodeAggregateWithNodeWasCreated) {
$this->whenRootNodeAggregateWithNodeWasCreated($eventInstance, $eventEnvelope);
} elseif ($eventInstance instanceof RootNodeAggregateDimensionsWereUpdated) {
$this->whenRootNodeAggregateDimensionsWereUpdated($eventInstance);
} elseif ($eventInstance instanceof NodeAggregateWithNodeWasCreated) {
$this->whenNodeAggregateWithNodeWasCreated($eventInstance, $eventEnvelope);
} elseif ($eventInstance instanceof NodeAggregateNameWasChanged) {
$this->whenNodeAggregateNameWasChanged($eventInstance, $eventEnvelope);
} elseif ($eventInstance instanceof ContentStreamWasForked) {
$this->whenContentStreamWasForked($eventInstance);
} elseif ($eventInstance instanceof ContentStreamWasRemoved) {
$this->whenContentStreamWasRemoved($eventInstance);
} elseif ($eventInstance instanceof NodePropertiesWereSet) {
$this->whenNodePropertiesWereSet($eventInstance, $eventEnvelope);
} elseif ($eventInstance instanceof NodeReferencesWereSet) {
$this->whenNodeReferencesWereSet($eventInstance, $eventEnvelope);
} elseif ($eventInstance instanceof NodeAggregateWasEnabled) {
$this->whenNodeAggregateWasEnabled($eventInstance);
} elseif ($eventInstance instanceof NodeAggregateTypeWasChanged) {
$this->whenNodeAggregateTypeWasChanged($eventInstance, $eventEnvelope);
} elseif ($eventInstance instanceof DimensionSpacePointWasMoved) {
$this->whenDimensionSpacePointWasMoved($eventInstance);
} elseif ($eventInstance instanceof DimensionShineThroughWasAdded) {
$this->whenDimensionShineThroughWasAdded($eventInstance);
} elseif ($eventInstance instanceof NodeAggregateWasRemoved) {
$this->whenNodeAggregateWasRemoved($eventInstance);
} elseif ($eventInstance instanceof NodeAggregateWasMoved) {
$this->whenNodeAggregateWasMoved($eventInstance);
} elseif ($eventInstance instanceof NodeSpecializationVariantWasCreated) {
$this->whenNodeSpecializationVariantWasCreated($eventInstance, $eventEnvelope);
} elseif ($eventInstance instanceof NodeGeneralizationVariantWasCreated) {
$this->whenNodeGeneralizationVariantWasCreated($eventInstance, $eventEnvelope);
} elseif ($eventInstance instanceof NodePeerVariantWasCreated) {
$this->whenNodePeerVariantWasCreated($eventInstance, $eventEnvelope);
} elseif ($eventInstance instanceof NodeAggregateWasDisabled) {
$this->whenNodeAggregateWasDisabled($eventInstance);
} else {
throw new \RuntimeException('Not supported: ' . get_class($eventInstance));
}

$catchUpHook->onAfterEvent($eventInstance, $eventEnvelope);
match ($event::class) {
RootNodeAggregateWithNodeWasCreated::class => $this->whenRootNodeAggregateWithNodeWasCreated($event, $eventEnvelope),
RootNodeAggregateDimensionsWereUpdated::class => $this->whenRootNodeAggregateDimensionsWereUpdated($event),
NodeAggregateWithNodeWasCreated::class => $this->whenNodeAggregateWithNodeWasCreated($event, $eventEnvelope),
NodeAggregateNameWasChanged::class => $this->whenNodeAggregateNameWasChanged($event, $eventEnvelope),
ContentStreamWasForked::class => $this->whenContentStreamWasForked($event),
ContentStreamWasRemoved::class => $this->whenContentStreamWasRemoved($event),
NodePropertiesWereSet::class => $this->whenNodePropertiesWereSet($event, $eventEnvelope),
NodeReferencesWereSet::class => $this->whenNodeReferencesWereSet($event, $eventEnvelope),
NodeAggregateWasEnabled::class => $this->whenNodeAggregateWasEnabled($event),
NodeAggregateTypeWasChanged::class => $this->whenNodeAggregateTypeWasChanged($event, $eventEnvelope),
DimensionSpacePointWasMoved::class => $this->whenDimensionSpacePointWasMoved($event),
DimensionShineThroughWasAdded::class => $this->whenDimensionShineThroughWasAdded($event),
NodeAggregateWasRemoved::class => $this->whenNodeAggregateWasRemoved($event),
NodeAggregateWasMoved::class => $this->whenNodeAggregateWasMoved($event),
NodeSpecializationVariantWasCreated::class => $this->whenNodeSpecializationVariantWasCreated($event, $eventEnvelope),
NodeGeneralizationVariantWasCreated::class => $this->whenNodeGeneralizationVariantWasCreated($event, $eventEnvelope),
NodePeerVariantWasCreated::class => $this->whenNodePeerVariantWasCreated($event, $eventEnvelope),
NodeAggregateWasDisabled::class => $this->whenNodeAggregateWasDisabled($event),
default => throw new \InvalidArgumentException(sprintf('Unsupported event %s', get_debug_type($event))),
};
}

public function getSequenceNumber(): SequenceNumber
public function getCheckpointStorage(): CheckpointStorageInterface
{
return $this->checkpointStorage->getHighestAppliedSequenceNumber();
return $this->checkpointStorage;
}

public function getState(): ContentGraph
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,8 @@
use Neos\ContentRepository\Core\Factory\ContentRepositoryId;
use Neos\ContentRepository\Core\Factory\ProjectionFactoryDependencies;
use Neos\ContentRepository\Core\Infrastructure\DbalClientInterface;
use Neos\ContentRepository\Core\Projection\CatchUpHookFactoryInterface;
use Neos\ContentRepository\Core\Projection\ContentGraph\ContentGraphProjection;
use Neos\ContentRepository\Core\Projection\ProjectionFactoryInterface;
use Neos\ContentRepository\Core\Projection\ProjectionInterface;
use Neos\ContentRepository\Core\Projection\Projections;

/**
* Use this class as ProjectionFactory in your configuration to construct a content graph
Expand All @@ -38,8 +35,6 @@ public static function graphProjectionTableNamePrefix(
public function build(
ProjectionFactoryDependencies $projectionFactoryDependencies,
array $options,
CatchUpHookFactoryInterface $catchUpHookFactory,
Projections $projectionsSoFar
): ContentGraphProjection {
$tableNamePrefix = self::graphProjectionTableNamePrefix(
$projectionFactoryDependencies->contentRepositoryId
Expand All @@ -48,7 +43,6 @@ public function build(
return new ContentGraphProjection(
// @phpstan-ignore-next-line
new DoctrineDbalContentGraphProjection(
$projectionFactoryDependencies->eventNormalizer,
$this->dbalClient,
new NodeFactory(
$projectionFactoryDependencies->contentRepositoryId,
Expand All @@ -60,7 +54,6 @@ public function build(
$this->dbalClient,
$tableNamePrefix
),
$catchUpHookFactory,
$tableNamePrefix
)
);
Expand Down
Loading

0 comments on commit bb8bc83

Please sign in to comment.