Skip to content

Commit

Permalink
FEATURE: Introduce batching in subscription engine
Browse files Browse the repository at this point in the history
  • Loading branch information
mhsdesign committed Dec 7, 2024
1 parent 21318b2 commit 47fc20b
Show file tree
Hide file tree
Showing 7 changed files with 394 additions and 117 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,18 @@ public function __construct(
public function findAppliedSequenceNumbers(): iterable
{
return array_map(
fn ($value) => SequenceNumber::fromInteger((int)$value['sequenceNumber']),
fn (int $value) => SequenceNumber::fromInteger($value),
$this->findAppliedSequenceNumberValues()
);
}

/**
* @return iterable<int>
*/
public function findAppliedSequenceNumberValues(): iterable
{
return array_map(
fn ($value) => (int)$value['sequenceNumber'],
$this->dbal->fetchAllAssociative("SELECT sequenceNumber from {$this->tableNamePrefix}")
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,4 +307,57 @@ public function error_onAfterCatchUp_isIgnoredAndCollected_withProjectionError()
$this->secondFakeProjection->getState()->findAppliedSequenceNumbers()
);
}

/** @test */
public function error_onAfterEvent_stopsEngineAfterFirstBatch()
{
$this->eventStore->setup();
$this->fakeProjection->expects(self::once())->method('setUp');
$this->fakeProjection->expects(self::once())->method('apply');
$this->subscriptionEngine->setup();

// commit two events. we expect that the hook will throw the first event and due to the batching its halted
$this->commitExampleContentStreamEvent();
$this->commitExampleContentStreamEvent();

$this->catchupHookForFakeProjection->expects(self::once())->method('onBeforeCatchUp')->with(SubscriptionStatus::BOOTING);
$this->catchupHookForFakeProjection->expects(self::once())->method('onBeforeEvent')->with(self::isInstanceOf(ContentStreamWasCreated::class));
$exception = new \RuntimeException('This catchup hook is kaputt.');
$this->catchupHookForFakeProjection->expects(self::once())->method('onAfterEvent')->willThrowException(
$exception
);
$this->catchupHookForFakeProjection->expects(self::once())->method('onAfterCatchUp');

self::assertEmpty(
$this->secondFakeProjection->getState()->findAppliedSequenceNumbers()
);

$this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::BOOTING, SequenceNumber::fromInteger(0));

$expectedWrappedException = new CatchUpHookFailed(
'Hook "" failed "onAfterEvent": This catchup hook is kaputt.',
1733243960,
$exception,
[]
);

// one error
$result = $this->subscriptionEngine->boot(batchSize: 1);
self::assertEquals(
ProcessedResult::failed(
1,
Errors::fromArray([
Error::forSubscription(SubscriptionId::fromString('Vendor.Package:SecondFakeProjection'), $expectedWrappedException),
])
),
$result
);

// only one event is applied
$this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::BOOTING, SequenceNumber::fromInteger(1));
self::assertEquals(
[SequenceNumber::fromInteger(1)],
$this->secondFakeProjection->getState()->findAppliedSequenceNumbers()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Neos\ContentRepository\Core\Feature\ContentStreamCreation\Event\ContentStreamWasCreated;
use Neos\ContentRepository\Core\Subscription\SubscriptionStatus;
use Neos\EventStore\Model\Event\SequenceNumber;
use Neos\EventStore\Model\EventEnvelope;

final class CatchUpHookTest extends AbstractSubscriptionEngineTestCase
{
Expand Down Expand Up @@ -47,4 +48,123 @@ public function catchUpHooksAreExecutedAndCanAccessTheCorrectProjectionsState()

$expectOneHandledEvent();
}

/** @test */
public function catchUpBeforeAndAfterCatchupAreRunForZeroEvents()
{
$this->eventStore->setup();
$this->fakeProjection->expects(self::once())->method('setUp');
$this->fakeProjection->expects(self::never())->method('apply');
$this->subscriptionEngine->setup();

$this->catchupHookForFakeProjection->expects(self::once())->method('onBeforeCatchUp')->with(SubscriptionStatus::BOOTING);
$this->catchupHookForFakeProjection->expects(self::never())->method('onBeforeEvent');
$this->catchupHookForFakeProjection->expects(self::never())->method('onAfterEvent');
$this->catchupHookForFakeProjection->expects(self::once())->method('onAfterCatchUp');

$result = $this->subscriptionEngine->boot();
self::assertNull($result->errors);

$this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::ACTIVE, SequenceNumber::fromInteger(0));
self::assertEmpty($this->secondFakeProjection->getState()->findAppliedSequenceNumberValues());
}

/** @test */
public function catchUpBeforeAndAfterCatchupAreNotRunIfNoSubscriberMatches()
{
$this->eventStore->setup();
$this->fakeProjection->expects(self::once())->method('setUp');
$this->fakeProjection->expects(self::never())->method('apply');
$this->subscriptionEngine->setup();

$this->catchupHookForFakeProjection->expects(self::never())->method('onBeforeCatchUp');
$this->catchupHookForFakeProjection->expects(self::never())->method('onBeforeEvent');
$this->catchupHookForFakeProjection->expects(self::never())->method('onAfterEvent');
$this->catchupHookForFakeProjection->expects(self::never())->method('onAfterCatchUp');

$result = $this->subscriptionEngine->catchUpActive();
self::assertNull($result->errors);
self::assertEquals(0, $result->numberOfProcessedEvents);

$this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::BOOTING, SequenceNumber::fromInteger(0));
self::assertEmpty($this->secondFakeProjection->getState()->findAppliedSequenceNumberValues());
}

public function provideValidBatchSizes(): iterable
{
yield 'none' => [null];
yield 'one' => [1];
yield 'two' => [2];
yield 'four' => [4];
yield 'ten' => [10];
}

/**
* @dataProvider provideValidBatchSizes
* @test
*/
public function catchUpHooksWithBatching(int|null $batchSize)
{
$this->eventStore->setup();
$this->fakeProjection->expects(self::once())->method('setUp');
$this->fakeProjection->expects(self::exactly(4))->method('apply');
$this->subscriptionEngine->setup();

// commit events (will be batched in chunks of two)
$this->commitExampleContentStreamEvent();
$this->commitExampleContentStreamEvent();
$this->commitExampleContentStreamEvent();
$this->commitExampleContentStreamEvent();

$this->catchupHookForFakeProjection->expects(self::once())->method('onBeforeCatchUp')->with(SubscriptionStatus::BOOTING);
$this->catchupHookForFakeProjection->expects($i = self::exactly(4))->method('onBeforeEvent')->willReturnCallback(function ($_, EventEnvelope $eventEnvelope) use ($i) {
match($i->getInvocationCount()) {
1 => [
self::assertEquals(1, $eventEnvelope->sequenceNumber->value),
self::assertEquals([], $this->secondFakeProjection->getState()->findAppliedSequenceNumberValues())
],
2 => [
self::assertEquals(2, $eventEnvelope->sequenceNumber->value),
self::assertEquals([1], $this->secondFakeProjection->getState()->findAppliedSequenceNumberValues())
],
3 => [
self::assertEquals(3, $eventEnvelope->sequenceNumber->value),
self::assertEquals([1,2], $this->secondFakeProjection->getState()->findAppliedSequenceNumberValues())
],
4 => [
self::assertEquals(4, $eventEnvelope->sequenceNumber->value),
self::assertEquals([1,2,3], $this->secondFakeProjection->getState()->findAppliedSequenceNumberValues())
],
};
});
$this->catchupHookForFakeProjection->expects($i = self::exactly(4))->method('onAfterEvent')->willReturnCallback(function ($_, EventEnvelope $eventEnvelope) use ($i) {
match($i->getInvocationCount()) {
1 => [
self::assertEquals(1, $eventEnvelope->sequenceNumber->value),
self::assertEquals([1], $this->secondFakeProjection->getState()->findAppliedSequenceNumberValues())
],
2 => [
self::assertEquals(2, $eventEnvelope->sequenceNumber->value),
self::assertEquals([1,2], $this->secondFakeProjection->getState()->findAppliedSequenceNumberValues())
],
3 => [
self::assertEquals(3, $eventEnvelope->sequenceNumber->value),
self::assertEquals([1,2,3], $this->secondFakeProjection->getState()->findAppliedSequenceNumberValues())
],
4 => [
self::assertEquals(4, $eventEnvelope->sequenceNumber->value),
self::assertEquals([1,2,3,4], $this->secondFakeProjection->getState()->findAppliedSequenceNumberValues())
],
};
});
$this->catchupHookForFakeProjection->expects(self::once())->method('onAfterCatchUp');

self::assertEmpty($this->secondFakeProjection->getState()->findAppliedSequenceNumberValues());

$result = $this->subscriptionEngine->boot(batchSize: $batchSize);
self::assertNull($result->errors);

$this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::ACTIVE, SequenceNumber::fromInteger(4));
self::assertEquals([1,2,3,4], $this->secondFakeProjection->getState()->findAppliedSequenceNumberValues());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
<?php

declare(strict_types=1);

namespace Neos\ContentRepository\BehavioralTests\Tests\Functional\Subscription;

use Neos\ContentRepository\Core\Projection\ProjectionStatus;
use Neos\ContentRepository\Core\Subscription\Engine\ProcessedResult;
use Neos\ContentRepository\Core\Subscription\SubscriptionStatus;
use Neos\EventStore\Model\Event\SequenceNumber;

final class SubscriptionBatchingTest extends AbstractSubscriptionEngineTestCase
{
/** @test */
public function singleBatchSize()
{
$this->eventStore->setup();
// commit three events
$this->commitExampleContentStreamEvent();
$this->commitExampleContentStreamEvent();

$this->fakeProjection->expects(self::once())->method('setUp');
$this->fakeProjection->expects(self::any())->method('status')->willReturn(ProjectionStatus::ok());
$this->fakeProjection->expects(self::exactly(2))->method('apply');
$this->subscriptionEngine->setup();

$this->expectOkayStatus('contentGraph', SubscriptionStatus::BOOTING, SequenceNumber::none());
$this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::BOOTING, SequenceNumber::none());

$result = $this->subscriptionEngine->boot(batchSize: 1);
self::assertEquals(ProcessedResult::success(2), $result);

$this->expectOkayStatus('contentGraph', SubscriptionStatus::ACTIVE, SequenceNumber::fromInteger(2));
$this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::ACTIVE, SequenceNumber::fromInteger(2));

self::assertEquals(
[SequenceNumber::fromInteger(1), SequenceNumber::fromInteger(2)],
$this->secondFakeProjection->getState()->findAppliedSequenceNumbers()
);
}

/** @test */
public function invalidBatchSizes()
{
$this->fakeProjection->expects(self::once())->method('setUp');
$this->subscriptionEngine->setup();

$e = null;
try {
$this->subscriptionEngine->boot(batchSize: 0);
} catch (\Throwable $e) {
}
self::assertInstanceOf(\InvalidArgumentException::class, $e);
self::assertEquals(1733597950, $e->getCode());

try {
$this->subscriptionEngine->catchUpActive(batchSize: -1);
} catch (\Throwable $e) {
}

self::assertInstanceOf(\InvalidArgumentException::class, $e);
self::assertEquals(1733597950, $e->getCode());
}
}
14 changes: 7 additions & 7 deletions Neos.ContentRepository.Core/Classes/ContentRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,9 @@ public function handle(CommandInterface $command): void
if ($toPublish instanceof EventsToPublish) {
$eventsToPublish = $this->enrichEventsToPublishWithMetadata($toPublish);
$this->eventStore->commit($eventsToPublish->streamName, $this->eventNormalizer->normalizeEvents($eventsToPublish->events), $eventsToPublish->expectedVersion);
$catchUpResult = $this->subscriptionEngine->catchUpActive();
if ($catchUpResult->hadErrors()) {
throw CatchUpHadErrors::createFromErrors($catchUpResult->errors);
$fullCatchUpResult = $this->subscriptionEngine->catchUpActive(); // NOTE: we don't batch here, to ensure the catchup is run completely and any errors don't stop it.
if ($fullCatchUpResult->hadErrors()) {
throw CatchUpHadErrors::createFromErrors($fullCatchUpResult->errors);
}
return;
}
Expand All @@ -115,7 +115,7 @@ public function handle(CommandInterface $command): void
// we pass the exception into the generator (->throw), so it could be try-caught and reacted upon:
//
// try {
// yield EventsToPublish(...);
// yield new EventsToPublish(...);
// } catch (ConcurrencyException $e) {
// yield $this->reopenContentStream();
// throw $e;
Expand All @@ -130,9 +130,9 @@ public function handle(CommandInterface $command): void
} finally {
// We always NEED to catchup even if there was an unexpected ConcurrencyException to make sure previous commits are handled.
// Technically it would be acceptable for the catchup to fail here (due to hook errors) because all the events are already persisted.
$catchUpResult = $this->subscriptionEngine->catchUpActive();
if ($catchUpResult->hadErrors()) {
throw CatchUpHadErrors::createFromErrors($catchUpResult->errors);
$fullCatchUpResult = $this->subscriptionEngine->catchUpActive(); // NOTE: we don't batch here, to ensure the catchup is run completely and any errors don't stop it.
if ($fullCatchUpResult->hadErrors()) {
throw CatchUpHadErrors::createFromErrors($fullCatchUpResult->errors);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@
*/
final readonly class ContentRepositoryMaintainer implements ContentRepositoryServiceInterface
{
private const REPLAY_BATCH_SIZE = 500;

/**
* @internal please use the {@see ContentRepositoryMaintainerFactory} instead!
*/
Expand Down Expand Up @@ -127,7 +129,7 @@ public function replaySubscription(SubscriptionId $subscriptionId, \Closure|null
if ($resetResult->errors !== null) {
return self::createErrorForReason('Reset failed:', $resetResult->errors);
}
$bootResult = $this->subscriptionEngine->boot(SubscriptionEngineCriteria::create([$subscriptionId]), $progressCallback);
$bootResult = $this->subscriptionEngine->boot(SubscriptionEngineCriteria::create([$subscriptionId]), progressCallback: $progressCallback, batchSize: self::REPLAY_BATCH_SIZE);
if ($bootResult->errors !== null) {
return self::createErrorForReason('Catchup failed:', $bootResult->errors);
}
Expand All @@ -140,7 +142,7 @@ public function replayAllSubscriptions(\Closure|null $progressCallback = null):
if ($resetResult->errors !== null) {
return self::createErrorForReason('Reset failed:', $resetResult->errors);
}
$bootResult = $this->subscriptionEngine->boot(progressCallback: $progressCallback);
$bootResult = $this->subscriptionEngine->boot(progressCallback: $progressCallback, batchSize: self::REPLAY_BATCH_SIZE);
if ($bootResult->errors !== null) {
return self::createErrorForReason('Catchup failed:', $bootResult->errors);
}
Expand All @@ -163,7 +165,7 @@ public function reactivateSubscription(SubscriptionId $subscriptionId, \Closure|
if ($subscriptionStatus->subscriptionStatus === SubscriptionStatus::NEW) {
return new Error(sprintf('Subscription "%s" is not setup and cannot be reactivated.', $subscriptionId->value));
}
$reactivateResult = $this->subscriptionEngine->reactivate(SubscriptionEngineCriteria::create([$subscriptionId]), $progressCallback);
$reactivateResult = $this->subscriptionEngine->reactivate(SubscriptionEngineCriteria::create([$subscriptionId]), progressCallback: $progressCallback, batchSize: self::REPLAY_BATCH_SIZE);
if ($reactivateResult->errors !== null) {
return self::createErrorForReason('Could not reactivate subscriber:', $reactivateResult->errors);
}
Expand Down
Loading

0 comments on commit 47fc20b

Please sign in to comment.