Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TASK: Cleanup projection catch-up trigger extensibility #5288

Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public function createNodesForPerformanceTest(int $nodesPerLevel, int $levels):
NodeAggregateClassification::CLASSIFICATION_ROOT,
);

$this->eventPersister->publishEvents(new EventsToPublish(
$this->eventPersister->publishEvents($this->contentRepository, new EventsToPublish(
$this->contentStreamEventStream->getEventStreamName(),
Events::with($rootNodeAggregateWasCreated),
ExpectedVersion::ANY()
Expand All @@ -102,7 +102,7 @@ public function createNodesForPerformanceTest(int $nodesPerLevel, int $levels):
$sumSoFar = 0;
$events = [];
$this->createHierarchy($rootNodeAggregateId, 1, $levels, $nodesPerLevel, $sumSoFar, $events);
$this->eventPersister->publishEvents(new EventsToPublish(
$this->eventPersister->publishEvents($this->contentRepository, new EventsToPublish(
$this->contentStreamEventStream->getEventStreamName(),
Events::fromArray($events),
ExpectedVersion::ANY()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
use Neos\ContentRepository\BehavioralTests\ProjectionRaceConditionTester\Dto\TraceEntryType;
use Neos\ContentRepository\Core\EventStore\EventInterface;
use Neos\ContentRepository\Core\Projection\CatchUpHookInterface;
use Neos\ContentRepositoryRegistry\Factory\ProjectionCatchUpTrigger\SubprocessProjectionCatchUpTrigger;
use Neos\EventStore\Model\EventEnvelope;
use Neos\Flow\Annotations as Flow;

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

use Neos\ContentRepository\Core\CommandHandler\CommandInterface;
use Neos\ContentRepository\Core\CommandHandler\CommandResult;
use Neos\ContentRepository\Core\EventStore\EventPersister;
use Neos\ContentRepository\Core\EventStore\EventsToPublish;
use Neos\ContentRepository\Core\Projection\ContentGraph\ContentGraphInterface;
use Neos\ContentRepository\Core\SharedModel\Workspace\Workspace;
use Neos\ContentRepository\Core\SharedModel\Exception\WorkspaceDoesNotExist;
Expand All @@ -38,15 +40,22 @@ final class CommandHandlingDependencies
*/
private array $overriddenContentGraphInstances = [];

public function __construct(private readonly ContentRepository $contentRepository)
{
public function __construct(
private readonly ContentRepository $contentRepository,
private readonly EventPersister $eventPersister
) {
}

public function handle(CommandInterface $command): CommandResult
{
return $this->contentRepository->handle($command);
}

public function publishEvents(EventsToPublish $eventsToPublish): void
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the weirdest API, why would we have this here? CommandHandlingDependencies::publishEvents makes no sense to me.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the problem is that for publishing events we need a catchup afterwards and the catchup hook extensibility require the content repository to be build.

The CommandHandlingDependencies are a purely internal thing and the handle() method on it already is a code smell due to the workspace command handler "doing to much" besides just publishing events for one command as we issue sub commands in the middle of handling another command.

For publishing an event stream, we need low level access to publish all events which was previously done via $this->eventPersister->publishEvents($events).
This leveraged the odd dependency flow of refetching the content repository from the Neos registry (see comment above) but now we need to pass the content repository for the said reasons explicitly.

Instead of exposing the content repository from $commandHandlingDependencies and using it like $this->eventPersister->publishEvents($events, $commandHandlingDependencies->contentRepository) i propose to keep the contentRepository instance private and introduce the new method: $commandHandlingDependencies->publishEvents($events)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather expose the CR here instead of adding this method, but maybe an abstraction around projection catchup is not bad, I do like the current api better than what happens in this change.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we discussed having a factory like so, but the catchup logic is still placed ON the content repository and we would have to move the codeblock to the event persister ...

final readonly class CatchUpHookFactoryBuilder
{
    public function __construct(
        private ContentRepository $contentRepository
    ) {
    }

    public function build(CatchUpHookFactoryInterface $catchUpHookFactory): CatchUpHookInterface
    {
        return $catchUpHookFactory->build($this->contentRepository);
    }
}

Also introducing a read site content repository or just passing the content repository read model to the catchup hooks would certainly be promising as it would prevent recursive handling via handle but nothing for now.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

haha @kitsunet with #5301 on the rise i guess i can revert d3119b8 as well have so much logic on the poor object that an additional method persistEvents wouldn't hurt :D

I just experimented with returning the events instead of handling them in between, which is much cleaner imo and would allow us to actually get rid of that thing :D https://neos-project.slack.com/archives/C0L41E268/p1729546247272549

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still like it better this way 😆

{
$this->eventPersister->publishEvents($this->contentRepository, $eventsToPublish);
}

public function getContentStreamVersion(ContentStreamId $contentStreamId): Version
{
$contentStream = $this->contentRepository->findContentStreamById($contentStreamId);
Expand Down
13 changes: 11 additions & 2 deletions Neos.ContentRepository.Core/Classes/ContentRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public function __construct(
private readonly UserIdProviderInterface $userIdProvider,
private readonly ClockInterface $clock,
) {
$this->commandHandlingDependencies = new CommandHandlingDependencies($this);
$this->commandHandlingDependencies = new CommandHandlingDependencies($this, $eventPersister);
}

/**
Expand Down Expand Up @@ -132,7 +132,7 @@ public function handle(CommandInterface $command): CommandResult
$eventsToPublish->expectedVersion,
);

return $this->eventPersister->publishEvents($eventsToPublish);
return $this->eventPersister->publishEvents($this, $eventsToPublish);
}


Expand Down Expand Up @@ -201,6 +201,15 @@ public function catchUpProjection(string $projectionClassName, CatchUpOptions $o
$catchUpHook?->onAfterCatchUp();
}

public function catchupProjections(): void
{
foreach ($this->projectionsAndCatchUpHooks->projections as $projection) {
// FIXME optimise by only loading required events once and not per projection
// see https://github.com/neos/neos-development-collection/pull/4988/
$this->catchUpProjection($projection::class, CatchUpOptions::create());
}
}

public function setUp(): void
{
$this->eventStore->setup();
Expand Down
21 changes: 4 additions & 17 deletions Neos.ContentRepository.Core/Classes/EventStore/EventPersister.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@
namespace Neos\ContentRepository\Core\EventStore;

use Neos\ContentRepository\Core\CommandHandler\CommandResult;
use Neos\ContentRepository\Core\CommandHandler\PendingProjections;
use Neos\ContentRepository\Core\Projection\ProjectionCatchUpTriggerInterface;
use Neos\ContentRepository\Core\Projection\Projections;
use Neos\ContentRepository\Core\Projection\WithMarkStaleInterface;
use Neos\ContentRepository\Core\ContentRepository;
use Neos\EventStore\EventStoreInterface;
use Neos\EventStore\Exception\ConcurrencyException;
use Neos\EventStore\Model\Events;
Expand All @@ -23,9 +20,7 @@
{
public function __construct(
private EventStoreInterface $eventStore,
private ProjectionCatchUpTriggerInterface $projectionCatchUpTrigger,
private EventNormalizer $eventNormalizer,
private Projections $projections,
) {
}

Expand All @@ -34,7 +29,7 @@ public function __construct(
* @return CommandResult
* @throws ConcurrencyException in case the expectedVersion does not match
*/
public function publishEvents(EventsToPublish $eventsToPublish): CommandResult
public function publishEvents(ContentRepository $contentRepository, EventsToPublish $eventsToPublish): CommandResult
{
if ($eventsToPublish->events->isEmpty()) {
return new CommandResult();
Expand All @@ -44,21 +39,13 @@ public function publishEvents(EventsToPublish $eventsToPublish): CommandResult
$normalizedEvents = Events::fromArray(
$eventsToPublish->events->map($this->eventNormalizer->normalize(...))
);
$commitResult = $this->eventStore->commit(
$this->eventStore->commit(
$eventsToPublish->streamName,
$normalizedEvents,
$eventsToPublish->expectedVersion
);
// for performance reasons, we do not want to update ALL projections all the time; but instead only
// the projections which are interested in the events from above.
// Further details can be found in the docs of PendingProjections.
$pendingProjections = PendingProjections::fromProjectionsAndEventsAndSequenceNumber(
mhsdesign marked this conversation as resolved.
Show resolved Hide resolved
$this->projections,
$eventsToPublish->events,
$commitResult->highestCommittedSequenceNumber
);

$this->projectionCatchUpTrigger->triggerCatchUp($pendingProjections->projections);
$contentRepository->catchUpProjections();
return new CommandResult();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
use Neos\ContentRepository\Core\Feature\WorkspaceCommandHandler;
use Neos\ContentRepository\Core\Infrastructure\Property\PropertyConverter;
use Neos\ContentRepository\Core\NodeType\NodeTypeManager;
use Neos\ContentRepository\Core\Projection\ProjectionCatchUpTriggerInterface;
use Neos\ContentRepository\Core\Projection\ProjectionsAndCatchUpHooks;
use Neos\ContentRepository\Core\SharedModel\ContentRepository\ContentRepositoryId;
use Neos\ContentRepository\Core\SharedModel\User\UserIdProviderInterface;
Expand All @@ -53,7 +52,6 @@ public function __construct(
ContentDimensionSourceInterface $contentDimensionSource,
Serializer $propertySerializer,
ProjectionsAndCatchUpHooksFactory $projectionsAndCatchUpHooksFactory,
private readonly ProjectionCatchUpTriggerInterface $projectionCatchUpTrigger,
private readonly UserIdProviderInterface $userIdProvider,
private readonly ClockInterface $clock,
) {
Expand Down Expand Up @@ -137,7 +135,6 @@ private function buildCommandBus(): CommandBus
new ContentStreamCommandHandler(
),
new WorkspaceCommandHandler(
$this->buildEventPersister(),
$this->projectionFactoryDependencies->eventStore,
$this->projectionFactoryDependencies->eventNormalizer,
),
Expand Down Expand Up @@ -166,9 +163,7 @@ private function buildEventPersister(): EventPersister
if (!$this->eventPersister) {
$this->eventPersister = new EventPersister(
$this->projectionFactoryDependencies->eventStore,
$this->projectionCatchUpTrigger,
$this->projectionFactoryDependencies->eventNormalizer,
$this->projectionsAndCatchUpHooks->projections,
);
}
return $this->eventPersister;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@
final readonly class WorkspaceCommandHandler implements CommandHandlerInterface
{
public function __construct(
private EventPersister $eventPersister,
private EventStoreInterface $eventStore,
private EventNormalizer $eventNormalizer,
) {
Expand Down Expand Up @@ -202,6 +201,7 @@ private function handlePublishWorkspace(
$baseWorkspace = $this->requireBaseWorkspace($workspace, $commandHandlingDependencies);

$this->publishContentStream(
$commandHandlingDependencies,
$workspace->currentContentStreamId,
$baseWorkspace->workspaceName,
$baseWorkspace->currentContentStreamId
Expand Down Expand Up @@ -238,10 +238,11 @@ private function handlePublishWorkspace(
* @throws \Exception
*/
private function publishContentStream(
CommandHandlingDependencies $commandHandlingDependencies,
ContentStreamId $contentStreamId,
WorkspaceName $baseWorkspaceName,
ContentStreamId $baseContentStreamId,
): ?CommandResult {
): void {
$baseWorkspaceContentStreamName = ContentStreamEventStreamName::fromContentStreamId(
$baseContentStreamId
);
Expand Down Expand Up @@ -286,10 +287,10 @@ private function publishContentStream(
}

if (count($events) === 0) {
return null;
return;
}
try {
return $this->eventPersister->publishEvents(
$commandHandlingDependencies->publishEvents(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so this seems to be the single use of this weird API, why don't we keep the EventPersister here?

new EventsToPublish(
$baseWorkspaceContentStreamName->getEventStreamName(),
Events::fromArray($events),
Expand Down Expand Up @@ -501,6 +502,7 @@ function () use ($matchingCommands, $commandHandlingDependencies, $baseWorkspace

// 5) take EVENTS(MATCHING) and apply them to base WS.
$this->publishContentStream(
$commandHandlingDependencies,
$command->contentStreamIdForMatchingPart,
$baseWorkspace->workspaceName,
$baseWorkspace->currentContentStreamId
Expand Down

This file was deleted.

Loading
Loading