Skip to content

Commit

Permalink
Move savepoint creation back on the subscription store
Browse files Browse the repository at this point in the history
This reverts commit dc5ff10.

We discussed that we dont want to ensure exactly once delivery for external projections. Adding transaction logic via traits for the projections increases logic there. And while with alot of effort we could bring exactly once delivery to work for most cases php could still decide to die in the small timeframe where we commit the one transaction and then the second. So there is no gurantee except when using a dedicated store: neos#5377
  • Loading branch information
mhsdesign committed Dec 3, 2024
1 parent fd9faa4 commit 175ab4c
Show file tree
Hide file tree
Showing 12 changed files with 31 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
use Neos\ContentRepository\Core\Feature\WorkspaceRebase\Event\WorkspaceRebaseFailed;
use Neos\ContentRepository\Core\Feature\WorkspaceRebase\Event\WorkspaceWasRebased;
use Neos\ContentRepository\Core\Infrastructure\DbalSchemaDiff;
use Neos\ContentRepository\Core\Infrastructure\ProjectionTransactionTrait;
use Neos\ContentRepository\Core\NodeType\NodeTypeName;
use Neos\ContentRepository\Core\Projection\ContentGraph\ContentGraphProjectionInterface;
use Neos\ContentRepository\Core\Projection\ContentGraph\ContentGraphReadModelInterface;
Expand All @@ -81,8 +80,6 @@
*/
final class DoctrineDbalContentGraphProjection implements ContentGraphProjectionInterface
{
use ProjectionTransactionTrait;

use ContentStream;
use NodeMove;
use NodeRemoval;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
use Neos\ContentRepository\Core\Feature\SubtreeTagging\Event\SubtreeWasTagged;
use Neos\ContentRepository\Core\Feature\SubtreeTagging\Event\SubtreeWasUntagged;
use Neos\ContentRepository\Core\Infrastructure\DbalSchemaDiff;
use Neos\ContentRepository\Core\Infrastructure\ProjectionTransactionTrait;
use Neos\ContentRepository\Core\Projection\ContentGraph\ContentGraphProjectionInterface;
use Neos\ContentRepository\Core\Projection\ContentGraph\ContentGraphReadModelInterface;
use Neos\ContentRepository\Core\Projection\ProjectionStatus;
Expand All @@ -53,8 +52,6 @@
*/
final class HypergraphProjection implements ContentGraphProjectionInterface
{
use ProjectionTransactionTrait;

use ContentStreamForking;
use NodeCreation;
use SubtreeTagging;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
use Neos\ContentRepository\Core\EventStore\EventInterface;
use Neos\ContentRepository\Core\Infrastructure\DbalSchemaDiff;
use Neos\ContentRepository\Core\Infrastructure\DbalSchemaFactory;
use Neos\ContentRepository\Core\Infrastructure\ProjectionTransactionTrait;
use Neos\ContentRepository\Core\Projection\ProjectionInterface;
use Neos\ContentRepository\Core\Projection\ProjectionStateInterface;
use Neos\ContentRepository\Core\Projection\ProjectionStatus;
Expand All @@ -30,8 +29,6 @@
*/
final class DebugEventProjection implements ProjectionInterface
{
use ProjectionTransactionTrait;

private DebugEventProjectionState $state;

private \Closure|null $saboteur = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ public function setUp(): void

$this->fakeProjection = $this->getMockBuilder(ProjectionInterface::class)->disableAutoReturnValueGeneration()->getMock();
$this->fakeProjection->method('getState')->willReturn(new class implements ProjectionStateInterface {});
$this->fakeProjection->expects(self::any())->method('transactional')->willReturnCallback(fn ($fn) => $fn())->willReturnCallback(fn ($fn) => $fn());

FakeProjectionFactory::setProjection(
'default',
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,6 @@ public function setUp(): void;
*/
public function status(): ProjectionStatus;

/**
* Must invoke the closure which will update the catchup hooks and {@see apply}.
* Additionally, to guarantee exactly once delivery and also to behave correct during exceptions (even fatal ones),
* a database transaction should be started, or if a transaction is already active on the same connection save points
* must be used and rolled back on error.
*
* @param-immediately-invoked-callable $closure
*/
public function transactional(\Closure $closure): void;

public function apply(EventInterface $event, EventEnvelope $eventEnvelope): void;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,12 +337,16 @@ private function catchUpSubscriptions(SubscriptionEngineCriteria $criteria, Subs
$this->logger?->debug(sprintf('Subscription Engine: Subscription "%s" is farther than the current position (%d >= %d), continue catch up.', $subscription->id->value, $subscription->position->value, $sequenceNumber->value));
continue;
}

$this->subscriptionStore->createSavepoint();
$error = $this->handleEvent($eventEnvelope, $domainEvent, $subscription->id);
if ($error !== null) {
// ERROR Case:
// 1.) for the leftover events we are not including this failed subscription for catchup
// 1.) roll back the partially applied event on the subscriber
$this->subscriptionStore->rollbackSavepoint();
// 2.) for the leftover events we are not including this failed subscription for catchup
$subscriptionsToCatchup = $subscriptionsToCatchup->without($subscription->id);
// 2.) update the subscription error state on either its unchanged or new position (if some events worked)
// 3.) update the subscription error state on either its unchanged or new position (if some events worked)
$this->subscriptionStore->update(
$subscription->id,
status: SubscriptionStatus::ERROR,
Expand All @@ -356,6 +360,7 @@ private function catchUpSubscriptions(SubscriptionEngineCriteria $criteria, Subs
continue;
}
// HAPPY Case:
$this->subscriptionStore->releaseSavepoint();
$highestSequenceNumberForSubscriber[$subscription->id->value] = $eventEnvelope->sequenceNumber;
}
$numberOfProcessedEvents++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,10 @@ public function update(
* @return T
*/
public function transactional(\Closure $closure): mixed;

public function createSavepoint(): void;

public function releaseSavepoint(): void;

public function rollbackSavepoint(): void;
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,9 @@ public function onBeforeCatchUp(SubscriptionStatus $subscriptionStatus): void

public function handle(EventInterface $event, EventEnvelope $eventEnvelope): void
{
$this->projection->transactional(function () use ($event, $eventEnvelope) {
$this->catchUpHook?->onBeforeEvent($event, $eventEnvelope);
$this->projection->apply($event, $eventEnvelope);
$this->catchUpHook?->onAfterEvent($event, $eventEnvelope);
});
$this->catchUpHook?->onBeforeEvent($event, $eventEnvelope);
$this->projection->apply($event, $eventEnvelope);
$this->catchUpHook?->onAfterEvent($event, $eventEnvelope);
}

public function onAfterCatchUp(): void
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,4 +169,19 @@ public function transactional(\Closure $closure): mixed
{
return $this->dbal->transactional($closure);
}

public function createSavepoint(): void
{
$this->dbal->createSavepoint('SUBSCRIBER');
}

public function releaseSavepoint(): void
{
$this->dbal->releaseSavepoint('SUBSCRIBER');
}

public function rollbackSavepoint(): void
{
$this->dbal->rollbackSavepoint('SUBSCRIBER');
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
use Neos\ContentRepository\Core\Feature\SubtreeTagging\Event\SubtreeWasTagged;
use Neos\ContentRepository\Core\Feature\SubtreeTagging\Event\SubtreeWasUntagged;
use Neos\ContentRepository\Core\Infrastructure\DbalSchemaDiff;
use Neos\ContentRepository\Core\Infrastructure\ProjectionTransactionTrait;
use Neos\ContentRepository\Core\NodeType\NodeTypeManager;
use Neos\ContentRepository\Core\NodeType\NodeTypeName;
use Neos\ContentRepository\Core\Projection\ProjectionInterface;
Expand All @@ -41,8 +40,6 @@
*/
final class DocumentUriPathProjection implements ProjectionInterface, WithMarkStaleInterface
{
use ProjectionTransactionTrait;

public const COLUMN_TYPES_DOCUMENT_URIS = [
'shortcutTarget' => Types::JSON,
];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
use Neos\ContentRepository\Core\Feature\SubtreeTagging\Event\SubtreeWasUntagged;
use Neos\ContentRepository\Core\Infrastructure\DbalSchemaDiff;
use Neos\ContentRepository\Core\Infrastructure\DbalSchemaFactory;
use Neos\ContentRepository\Core\Infrastructure\ProjectionTransactionTrait;
use Neos\ContentRepository\Core\Projection\ProjectionInterface;
use Neos\ContentRepository\Core\Projection\ProjectionStatus;
use Neos\ContentRepository\Core\SharedModel\Node\NodeAggregateId;
Expand All @@ -53,8 +52,6 @@
*/
class ChangeProjection implements ProjectionInterface
{
use ProjectionTransactionTrait;

/**
* @var ChangeFinder|null Cache for the ChangeFinder returned by {@see getState()},
* so that always the same instance is returned
Expand Down

0 comments on commit 175ab4c

Please sign in to comment.