Skip to content

Commit

Permalink
Merge pull request #5392 from mhsdesign/task/subscription-engine-save…
Browse files Browse the repository at this point in the history
…point-simplication

TASK: Overhaul CatchUpHook error behaviour; At least once delivery for ERROR projections
  • Loading branch information
mhsdesign authored Dec 11, 2024
2 parents 79d4ec7 + 8f55975 commit 6423dc6
Show file tree
Hide file tree
Showing 37 changed files with 1,335 additions and 466 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
use Neos\ContentRepository\Core\Feature\WorkspaceRebase\Event\WorkspaceRebaseFailed;
use Neos\ContentRepository\Core\Feature\WorkspaceRebase\Event\WorkspaceWasRebased;
use Neos\ContentRepository\Core\Infrastructure\DbalSchemaDiff;
use Neos\ContentRepository\Core\Infrastructure\ProjectionTransactionTrait;
use Neos\ContentRepository\Core\NodeType\NodeTypeName;
use Neos\ContentRepository\Core\Projection\ContentGraph\ContentGraphProjectionInterface;
use Neos\ContentRepository\Core\Projection\ContentGraph\ContentGraphReadModelInterface;
Expand All @@ -81,8 +80,6 @@
*/
final class DoctrineDbalContentGraphProjection implements ContentGraphProjectionInterface
{
use ProjectionTransactionTrait;

use ContentStream;
use NodeMove;
use NodeRemoval;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
use Neos\ContentRepository\Core\Feature\SubtreeTagging\Event\SubtreeWasTagged;
use Neos\ContentRepository\Core\Feature\SubtreeTagging\Event\SubtreeWasUntagged;
use Neos\ContentRepository\Core\Infrastructure\DbalSchemaDiff;
use Neos\ContentRepository\Core\Infrastructure\ProjectionTransactionTrait;
use Neos\ContentRepository\Core\Projection\ContentGraph\ContentGraphProjectionInterface;
use Neos\ContentRepository\Core\Projection\ContentGraph\ContentGraphReadModelInterface;
use Neos\ContentRepository\Core\Projection\ProjectionStatus;
Expand All @@ -53,8 +52,6 @@
*/
final class HypergraphProjection implements ContentGraphProjectionInterface
{
use ProjectionTransactionTrait;

use ContentStreamForking;
use NodeCreation;
use SubtreeTagging;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ public function onAfterEvent(EventInterface $eventInstance, EventEnvelope $event
{
}

public function onAfterBatchCompleted(): void
{
}

public function onAfterCatchUp(): void
{
// we only want to track relevant lock release calls (i.e. if we were in the event processing loop before)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
use Neos\ContentRepository\Core\EventStore\EventInterface;
use Neos\ContentRepository\Core\Infrastructure\DbalSchemaDiff;
use Neos\ContentRepository\Core\Infrastructure\DbalSchemaFactory;
use Neos\ContentRepository\Core\Infrastructure\ProjectionTransactionTrait;
use Neos\ContentRepository\Core\Projection\ProjectionInterface;
use Neos\ContentRepository\Core\Projection\ProjectionStateInterface;
use Neos\ContentRepository\Core\Projection\ProjectionStatus;
Expand All @@ -30,8 +29,6 @@
*/
final class DebugEventProjection implements ProjectionInterface
{
use ProjectionTransactionTrait;

private DebugEventProjectionState $state;

private \Closure|null $saboteur = null;
Expand All @@ -53,6 +50,9 @@ public function setUp(): void
foreach ($this->determineRequiredSqlStatements() as $statement) {
$this->dbal->executeStatement($statement);
}
if ($this->saboteur) {
($this->saboteur)();
}
}

public function status(): ProjectionStatus
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,23 @@ public function __construct(
}

/**
* @return iterable<SequenceNumber>
* @return array<SequenceNumber>
*/
public function findAppliedSequenceNumbers(): iterable
public function findAppliedSequenceNumbers(): array
{
return array_map(
fn ($value) => SequenceNumber::fromInteger((int)$value['sequenceNumber']),
fn (int $value) => SequenceNumber::fromInteger($value),
$this->findAppliedSequenceNumberValues()
);
}

/**
* @return array<int>
*/
public function findAppliedSequenceNumberValues(): array
{
return array_map(
fn ($value) => (int)$value['sequenceNumber'],
$this->dbal->fetchAllAssociative("SELECT sequenceNumber from {$this->tableNamePrefix}")
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
*/
abstract class AbstractSubscriptionEngineTestCase extends TestCase // we don't use Flows functional test case as it would reset the database afterwards
{
protected static ContentRepositoryId $contentRepositoryId;

protected ContentRepository $contentRepository;

protected SubscriptionEngine $subscriptionEngine;
Expand All @@ -56,32 +58,37 @@ abstract class AbstractSubscriptionEngineTestCase extends TestCase // we don't u

protected CatchUpHookInterface&MockObject $catchupHookForFakeProjection;

public static function setUpBeforeClass(): void
{
static::$contentRepositoryId = ContentRepositoryId::fromString('t_subscription');
}

public function setUp(): void
{
if ($this->getObject(Connection::class)->getDatabasePlatform() instanceof PostgreSQLPlatform) {
$this->markTestSkipped('TODO: The content graph is not available in postgres currently: https://github.com/neos/neos-development-collection/issues/3855');
}
$contentRepositoryId = ContentRepositoryId::fromString('t_subscription');

$this->resetDatabase(
$this->getObject(Connection::class),
$contentRepositoryId,
self::$contentRepositoryId,
keepSchema: true
);

$this->fakeProjection = $this->getMockBuilder(ProjectionInterface::class)->disableAutoReturnValueGeneration()->getMock();
$this->fakeProjection->method('getState')->willReturn(new class implements ProjectionStateInterface {});
$this->fakeProjection->expects(self::any())->method('transactional')->willReturnCallback(fn ($fn) => $fn())->willReturnCallback(fn ($fn) => $fn());

FakeProjectionFactory::setProjection(
'default',
$this->fakeProjection
);

$this->secondFakeProjection = new DebugEventProjection(
sprintf('cr_%s_debug_projection', $contentRepositoryId->value),
$this->getObject(Connection::class)
);
if (!isset($this->secondFakeProjection)) {
$this->secondFakeProjection = new DebugEventProjection(
sprintf('cr_%s_debug_projection', self::$contentRepositoryId->value),
$this->getObject(Connection::class)
);
}

FakeProjectionFactory::setProjection(
'second',
Expand All @@ -98,9 +105,9 @@ public function setUp(): void
FakeNodeTypeManagerFactory::setConfiguration([]);
FakeContentDimensionSourceFactory::setWithoutDimensions();

$this->getObject(ContentRepositoryRegistry::class)->resetFactoryInstance($contentRepositoryId);
$this->getObject(ContentRepositoryRegistry::class)->resetFactoryInstance(self::$contentRepositoryId);

$this->setupContentRepositoryDependencies($contentRepositoryId);
$this->setupContentRepositoryDependencies(self::$contentRepositoryId);
}

final protected function setupContentRepositoryDependencies(ContentRepositoryId $contentRepositoryId)
Expand Down Expand Up @@ -180,7 +187,7 @@ final protected function commitExampleContentStreamEvent(): void
);
}

final protected function expectOkayStatus($subscriptionId, SubscriptionStatus $status, SequenceNumber $sequenceNumber): void
final protected function expectOkayStatus(string $subscriptionId, SubscriptionStatus $status, SequenceNumber $sequenceNumber): void
{
$actual = $this->subscriptionStatus($subscriptionId);
self::assertEquals(
Expand Down
Loading

0 comments on commit 6423dc6

Please sign in to comment.