From 47fc20b6487c8038f41f66d28f1955b98e1b5e72 Mon Sep 17 00:00:00 2001 From: mhsdesign <85400359+mhsdesign@users.noreply.github.com> Date: Sat, 7 Dec 2024 20:23:13 +0100 Subject: [PATCH] FEATURE: Introduce batching in subscription engine --- .../TestSuite/DebugEventProjectionState.php | 13 +- .../Subscription/CatchUpHookErrorTest.php | 53 ++++ .../Subscription/CatchUpHookTest.php | 120 +++++++++ .../Subscription/SubscriptionBatchingTest.php | 64 +++++ .../Classes/ContentRepository.php | 14 +- .../Service/ContentRepositoryMaintainer.php | 8 +- .../Engine/SubscriptionEngine.php | 239 ++++++++++-------- 7 files changed, 394 insertions(+), 117 deletions(-) create mode 100644 Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/SubscriptionBatchingTest.php diff --git a/Neos.ContentRepository.BehavioralTests/Classes/TestSuite/DebugEventProjectionState.php b/Neos.ContentRepository.BehavioralTests/Classes/TestSuite/DebugEventProjectionState.php index 02dde323ad..da23036635 100644 --- a/Neos.ContentRepository.BehavioralTests/Classes/TestSuite/DebugEventProjectionState.php +++ b/Neos.ContentRepository.BehavioralTests/Classes/TestSuite/DebugEventProjectionState.php @@ -26,7 +26,18 @@ public function __construct( public function findAppliedSequenceNumbers(): iterable { return array_map( - fn ($value) => SequenceNumber::fromInteger((int)$value['sequenceNumber']), + fn (int $value) => SequenceNumber::fromInteger($value), + $this->findAppliedSequenceNumberValues() + ); + } + + /** + * @return iterable + */ + public function findAppliedSequenceNumberValues(): iterable + { + return array_map( + fn ($value) => (int)$value['sequenceNumber'], $this->dbal->fetchAllAssociative("SELECT sequenceNumber from {$this->tableNamePrefix}") ); } diff --git a/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/CatchUpHookErrorTest.php b/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/CatchUpHookErrorTest.php index f0f481002c..851428d14e 100644 --- a/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/CatchUpHookErrorTest.php +++ b/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/CatchUpHookErrorTest.php @@ -307,4 +307,57 @@ public function error_onAfterCatchUp_isIgnoredAndCollected_withProjectionError() $this->secondFakeProjection->getState()->findAppliedSequenceNumbers() ); } + + /** @test */ + public function error_onAfterEvent_stopsEngineAfterFirstBatch() + { + $this->eventStore->setup(); + $this->fakeProjection->expects(self::once())->method('setUp'); + $this->fakeProjection->expects(self::once())->method('apply'); + $this->subscriptionEngine->setup(); + + // commit two events. we expect that the hook will throw the first event and due to the batching its halted + $this->commitExampleContentStreamEvent(); + $this->commitExampleContentStreamEvent(); + + $this->catchupHookForFakeProjection->expects(self::once())->method('onBeforeCatchUp')->with(SubscriptionStatus::BOOTING); + $this->catchupHookForFakeProjection->expects(self::once())->method('onBeforeEvent')->with(self::isInstanceOf(ContentStreamWasCreated::class)); + $exception = new \RuntimeException('This catchup hook is kaputt.'); + $this->catchupHookForFakeProjection->expects(self::once())->method('onAfterEvent')->willThrowException( + $exception + ); + $this->catchupHookForFakeProjection->expects(self::once())->method('onAfterCatchUp'); + + self::assertEmpty( + $this->secondFakeProjection->getState()->findAppliedSequenceNumbers() + ); + + $this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::BOOTING, SequenceNumber::fromInteger(0)); + + $expectedWrappedException = new CatchUpHookFailed( + 'Hook "" failed "onAfterEvent": This catchup hook is kaputt.', + 1733243960, + $exception, + [] + ); + + // one error + $result = $this->subscriptionEngine->boot(batchSize: 1); + self::assertEquals( + ProcessedResult::failed( + 1, + Errors::fromArray([ + Error::forSubscription(SubscriptionId::fromString('Vendor.Package:SecondFakeProjection'), $expectedWrappedException), + ]) + ), + $result + ); + + // only one event is applied + $this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::BOOTING, SequenceNumber::fromInteger(1)); + self::assertEquals( + [SequenceNumber::fromInteger(1)], + $this->secondFakeProjection->getState()->findAppliedSequenceNumbers() + ); + } } diff --git a/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/CatchUpHookTest.php b/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/CatchUpHookTest.php index 56b9d63259..3450835390 100644 --- a/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/CatchUpHookTest.php +++ b/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/CatchUpHookTest.php @@ -7,6 +7,7 @@ use Neos\ContentRepository\Core\Feature\ContentStreamCreation\Event\ContentStreamWasCreated; use Neos\ContentRepository\Core\Subscription\SubscriptionStatus; use Neos\EventStore\Model\Event\SequenceNumber; +use Neos\EventStore\Model\EventEnvelope; final class CatchUpHookTest extends AbstractSubscriptionEngineTestCase { @@ -47,4 +48,123 @@ public function catchUpHooksAreExecutedAndCanAccessTheCorrectProjectionsState() $expectOneHandledEvent(); } + + /** @test */ + public function catchUpBeforeAndAfterCatchupAreRunForZeroEvents() + { + $this->eventStore->setup(); + $this->fakeProjection->expects(self::once())->method('setUp'); + $this->fakeProjection->expects(self::never())->method('apply'); + $this->subscriptionEngine->setup(); + + $this->catchupHookForFakeProjection->expects(self::once())->method('onBeforeCatchUp')->with(SubscriptionStatus::BOOTING); + $this->catchupHookForFakeProjection->expects(self::never())->method('onBeforeEvent'); + $this->catchupHookForFakeProjection->expects(self::never())->method('onAfterEvent'); + $this->catchupHookForFakeProjection->expects(self::once())->method('onAfterCatchUp'); + + $result = $this->subscriptionEngine->boot(); + self::assertNull($result->errors); + + $this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::ACTIVE, SequenceNumber::fromInteger(0)); + self::assertEmpty($this->secondFakeProjection->getState()->findAppliedSequenceNumberValues()); + } + + /** @test */ + public function catchUpBeforeAndAfterCatchupAreNotRunIfNoSubscriberMatches() + { + $this->eventStore->setup(); + $this->fakeProjection->expects(self::once())->method('setUp'); + $this->fakeProjection->expects(self::never())->method('apply'); + $this->subscriptionEngine->setup(); + + $this->catchupHookForFakeProjection->expects(self::never())->method('onBeforeCatchUp'); + $this->catchupHookForFakeProjection->expects(self::never())->method('onBeforeEvent'); + $this->catchupHookForFakeProjection->expects(self::never())->method('onAfterEvent'); + $this->catchupHookForFakeProjection->expects(self::never())->method('onAfterCatchUp'); + + $result = $this->subscriptionEngine->catchUpActive(); + self::assertNull($result->errors); + self::assertEquals(0, $result->numberOfProcessedEvents); + + $this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::BOOTING, SequenceNumber::fromInteger(0)); + self::assertEmpty($this->secondFakeProjection->getState()->findAppliedSequenceNumberValues()); + } + + public function provideValidBatchSizes(): iterable + { + yield 'none' => [null]; + yield 'one' => [1]; + yield 'two' => [2]; + yield 'four' => [4]; + yield 'ten' => [10]; + } + + /** + * @dataProvider provideValidBatchSizes + * @test + */ + public function catchUpHooksWithBatching(int|null $batchSize) + { + $this->eventStore->setup(); + $this->fakeProjection->expects(self::once())->method('setUp'); + $this->fakeProjection->expects(self::exactly(4))->method('apply'); + $this->subscriptionEngine->setup(); + + // commit events (will be batched in chunks of two) + $this->commitExampleContentStreamEvent(); + $this->commitExampleContentStreamEvent(); + $this->commitExampleContentStreamEvent(); + $this->commitExampleContentStreamEvent(); + + $this->catchupHookForFakeProjection->expects(self::once())->method('onBeforeCatchUp')->with(SubscriptionStatus::BOOTING); + $this->catchupHookForFakeProjection->expects($i = self::exactly(4))->method('onBeforeEvent')->willReturnCallback(function ($_, EventEnvelope $eventEnvelope) use ($i) { + match($i->getInvocationCount()) { + 1 => [ + self::assertEquals(1, $eventEnvelope->sequenceNumber->value), + self::assertEquals([], $this->secondFakeProjection->getState()->findAppliedSequenceNumberValues()) + ], + 2 => [ + self::assertEquals(2, $eventEnvelope->sequenceNumber->value), + self::assertEquals([1], $this->secondFakeProjection->getState()->findAppliedSequenceNumberValues()) + ], + 3 => [ + self::assertEquals(3, $eventEnvelope->sequenceNumber->value), + self::assertEquals([1,2], $this->secondFakeProjection->getState()->findAppliedSequenceNumberValues()) + ], + 4 => [ + self::assertEquals(4, $eventEnvelope->sequenceNumber->value), + self::assertEquals([1,2,3], $this->secondFakeProjection->getState()->findAppliedSequenceNumberValues()) + ], + }; + }); + $this->catchupHookForFakeProjection->expects($i = self::exactly(4))->method('onAfterEvent')->willReturnCallback(function ($_, EventEnvelope $eventEnvelope) use ($i) { + match($i->getInvocationCount()) { + 1 => [ + self::assertEquals(1, $eventEnvelope->sequenceNumber->value), + self::assertEquals([1], $this->secondFakeProjection->getState()->findAppliedSequenceNumberValues()) + ], + 2 => [ + self::assertEquals(2, $eventEnvelope->sequenceNumber->value), + self::assertEquals([1,2], $this->secondFakeProjection->getState()->findAppliedSequenceNumberValues()) + ], + 3 => [ + self::assertEquals(3, $eventEnvelope->sequenceNumber->value), + self::assertEquals([1,2,3], $this->secondFakeProjection->getState()->findAppliedSequenceNumberValues()) + ], + 4 => [ + self::assertEquals(4, $eventEnvelope->sequenceNumber->value), + self::assertEquals([1,2,3,4], $this->secondFakeProjection->getState()->findAppliedSequenceNumberValues()) + ], + }; + }); + $this->catchupHookForFakeProjection->expects(self::once())->method('onAfterCatchUp'); + + self::assertEmpty($this->secondFakeProjection->getState()->findAppliedSequenceNumberValues()); + + $result = $this->subscriptionEngine->boot(batchSize: $batchSize); + self::assertNull($result->errors); + + $this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::ACTIVE, SequenceNumber::fromInteger(4)); + self::assertEquals([1,2,3,4], $this->secondFakeProjection->getState()->findAppliedSequenceNumberValues()); + } } diff --git a/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/SubscriptionBatchingTest.php b/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/SubscriptionBatchingTest.php new file mode 100644 index 0000000000..960a755cc6 --- /dev/null +++ b/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/SubscriptionBatchingTest.php @@ -0,0 +1,64 @@ +eventStore->setup(); + // commit three events + $this->commitExampleContentStreamEvent(); + $this->commitExampleContentStreamEvent(); + + $this->fakeProjection->expects(self::once())->method('setUp'); + $this->fakeProjection->expects(self::any())->method('status')->willReturn(ProjectionStatus::ok()); + $this->fakeProjection->expects(self::exactly(2))->method('apply'); + $this->subscriptionEngine->setup(); + + $this->expectOkayStatus('contentGraph', SubscriptionStatus::BOOTING, SequenceNumber::none()); + $this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::BOOTING, SequenceNumber::none()); + + $result = $this->subscriptionEngine->boot(batchSize: 1); + self::assertEquals(ProcessedResult::success(2), $result); + + $this->expectOkayStatus('contentGraph', SubscriptionStatus::ACTIVE, SequenceNumber::fromInteger(2)); + $this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::ACTIVE, SequenceNumber::fromInteger(2)); + + self::assertEquals( + [SequenceNumber::fromInteger(1), SequenceNumber::fromInteger(2)], + $this->secondFakeProjection->getState()->findAppliedSequenceNumbers() + ); + } + + /** @test */ + public function invalidBatchSizes() + { + $this->fakeProjection->expects(self::once())->method('setUp'); + $this->subscriptionEngine->setup(); + + $e = null; + try { + $this->subscriptionEngine->boot(batchSize: 0); + } catch (\Throwable $e) { + } + self::assertInstanceOf(\InvalidArgumentException::class, $e); + self::assertEquals(1733597950, $e->getCode()); + + try { + $this->subscriptionEngine->catchUpActive(batchSize: -1); + } catch (\Throwable $e) { + } + + self::assertInstanceOf(\InvalidArgumentException::class, $e); + self::assertEquals(1733597950, $e->getCode()); + } +} diff --git a/Neos.ContentRepository.Core/Classes/ContentRepository.php b/Neos.ContentRepository.Core/Classes/ContentRepository.php index 6c5b4f239d..f308279ac4 100644 --- a/Neos.ContentRepository.Core/Classes/ContentRepository.php +++ b/Neos.ContentRepository.Core/Classes/ContentRepository.php @@ -98,9 +98,9 @@ public function handle(CommandInterface $command): void if ($toPublish instanceof EventsToPublish) { $eventsToPublish = $this->enrichEventsToPublishWithMetadata($toPublish); $this->eventStore->commit($eventsToPublish->streamName, $this->eventNormalizer->normalizeEvents($eventsToPublish->events), $eventsToPublish->expectedVersion); - $catchUpResult = $this->subscriptionEngine->catchUpActive(); - if ($catchUpResult->hadErrors()) { - throw CatchUpHadErrors::createFromErrors($catchUpResult->errors); + $fullCatchUpResult = $this->subscriptionEngine->catchUpActive(); // NOTE: we don't batch here, to ensure the catchup is run completely and any errors don't stop it. + if ($fullCatchUpResult->hadErrors()) { + throw CatchUpHadErrors::createFromErrors($fullCatchUpResult->errors); } return; } @@ -115,7 +115,7 @@ public function handle(CommandInterface $command): void // we pass the exception into the generator (->throw), so it could be try-caught and reacted upon: // // try { - // yield EventsToPublish(...); + // yield new EventsToPublish(...); // } catch (ConcurrencyException $e) { // yield $this->reopenContentStream(); // throw $e; @@ -130,9 +130,9 @@ public function handle(CommandInterface $command): void } finally { // We always NEED to catchup even if there was an unexpected ConcurrencyException to make sure previous commits are handled. // Technically it would be acceptable for the catchup to fail here (due to hook errors) because all the events are already persisted. - $catchUpResult = $this->subscriptionEngine->catchUpActive(); - if ($catchUpResult->hadErrors()) { - throw CatchUpHadErrors::createFromErrors($catchUpResult->errors); + $fullCatchUpResult = $this->subscriptionEngine->catchUpActive(); // NOTE: we don't batch here, to ensure the catchup is run completely and any errors don't stop it. + if ($fullCatchUpResult->hadErrors()) { + throw CatchUpHadErrors::createFromErrors($fullCatchUpResult->errors); } } } diff --git a/Neos.ContentRepository.Core/Classes/Service/ContentRepositoryMaintainer.php b/Neos.ContentRepository.Core/Classes/Service/ContentRepositoryMaintainer.php index 9b879a814c..8337dae565 100644 --- a/Neos.ContentRepository.Core/Classes/Service/ContentRepositoryMaintainer.php +++ b/Neos.ContentRepository.Core/Classes/Service/ContentRepositoryMaintainer.php @@ -70,6 +70,8 @@ */ final readonly class ContentRepositoryMaintainer implements ContentRepositoryServiceInterface { + private const REPLAY_BATCH_SIZE = 500; + /** * @internal please use the {@see ContentRepositoryMaintainerFactory} instead! */ @@ -127,7 +129,7 @@ public function replaySubscription(SubscriptionId $subscriptionId, \Closure|null if ($resetResult->errors !== null) { return self::createErrorForReason('Reset failed:', $resetResult->errors); } - $bootResult = $this->subscriptionEngine->boot(SubscriptionEngineCriteria::create([$subscriptionId]), $progressCallback); + $bootResult = $this->subscriptionEngine->boot(SubscriptionEngineCriteria::create([$subscriptionId]), progressCallback: $progressCallback, batchSize: self::REPLAY_BATCH_SIZE); if ($bootResult->errors !== null) { return self::createErrorForReason('Catchup failed:', $bootResult->errors); } @@ -140,7 +142,7 @@ public function replayAllSubscriptions(\Closure|null $progressCallback = null): if ($resetResult->errors !== null) { return self::createErrorForReason('Reset failed:', $resetResult->errors); } - $bootResult = $this->subscriptionEngine->boot(progressCallback: $progressCallback); + $bootResult = $this->subscriptionEngine->boot(progressCallback: $progressCallback, batchSize: self::REPLAY_BATCH_SIZE); if ($bootResult->errors !== null) { return self::createErrorForReason('Catchup failed:', $bootResult->errors); } @@ -163,7 +165,7 @@ public function reactivateSubscription(SubscriptionId $subscriptionId, \Closure| if ($subscriptionStatus->subscriptionStatus === SubscriptionStatus::NEW) { return new Error(sprintf('Subscription "%s" is not setup and cannot be reactivated.', $subscriptionId->value)); } - $reactivateResult = $this->subscriptionEngine->reactivate(SubscriptionEngineCriteria::create([$subscriptionId]), $progressCallback); + $reactivateResult = $this->subscriptionEngine->reactivate(SubscriptionEngineCriteria::create([$subscriptionId]), progressCallback: $progressCallback, batchSize: self::REPLAY_BATCH_SIZE); if ($reactivateResult->errors !== null) { return self::createErrorForReason('Could not reactivate subscriber:', $reactivateResult->errors); } diff --git a/Neos.ContentRepository.Core/Classes/Subscription/Engine/SubscriptionEngine.php b/Neos.ContentRepository.Core/Classes/Subscription/Engine/SubscriptionEngine.php index 7cbddd43cb..c146f8ecd3 100644 --- a/Neos.ContentRepository.Core/Classes/Subscription/Engine/SubscriptionEngine.php +++ b/Neos.ContentRepository.Core/Classes/Subscription/Engine/SubscriptionEngine.php @@ -73,27 +73,27 @@ public function setup(SubscriptionEngineCriteria|null $criteria = null): Result return $errors === [] ? Result::success() : Result::failed(Errors::fromArray($errors)); } - public function boot(SubscriptionEngineCriteria|null $criteria = null, \Closure $progressCallback = null): ProcessedResult + public function boot(SubscriptionEngineCriteria|null $criteria = null, \Closure $progressCallback = null, int $batchSize = null): ProcessedResult { $criteria ??= SubscriptionEngineCriteria::noConstraints(); return $this->processExclusively( - fn () => $this->catchUpSubscriptions($criteria, SubscriptionStatusFilter::fromArray([SubscriptionStatus::BOOTING]), $progressCallback) + fn () => $this->catchUpSubscriptions($criteria, SubscriptionStatusFilter::fromArray([SubscriptionStatus::BOOTING]), $progressCallback, $batchSize) ); } - public function catchUpActive(SubscriptionEngineCriteria|null $criteria = null, \Closure $progressCallback = null): ProcessedResult + public function catchUpActive(SubscriptionEngineCriteria|null $criteria = null, \Closure $progressCallback = null, int $batchSize = null): ProcessedResult { $criteria ??= SubscriptionEngineCriteria::noConstraints(); return $this->processExclusively( - fn () => $this->catchUpSubscriptions($criteria, SubscriptionStatusFilter::fromArray([SubscriptionStatus::ACTIVE]), $progressCallback) + fn () => $this->catchUpSubscriptions($criteria, SubscriptionStatusFilter::fromArray([SubscriptionStatus::ACTIVE]), $progressCallback, $batchSize) ); } - public function reactivate(SubscriptionEngineCriteria|null $criteria = null, \Closure $progressCallback = null): ProcessedResult + public function reactivate(SubscriptionEngineCriteria|null $criteria = null, \Closure $progressCallback = null, int $batchSize = null): ProcessedResult { $criteria ??= SubscriptionEngineCriteria::noConstraints(); return $this->processExclusively( - fn () => $this->catchUpSubscriptions($criteria, SubscriptionStatusFilter::fromArray([SubscriptionStatus::ERROR, SubscriptionStatus::DETACHED]), $progressCallback) + fn () => $this->catchUpSubscriptions($criteria, SubscriptionStatusFilter::fromArray([SubscriptionStatus::ERROR, SubscriptionStatus::DETACHED]), $progressCallback, $batchSize) ); } @@ -256,8 +256,16 @@ private function resetSubscription(Subscription $subscription): ?Error return null; } - private function catchUpSubscriptions(SubscriptionEngineCriteria $criteria, SubscriptionStatusFilter $status, \Closure $progressClosure = null): ProcessedResult + /** + * @param \Closure|null $progressCallback The callback that is invoked for every {@see EventEnvelope} that is processed per subscriber + * @param int|null $batchSize Number of events to process before the transaction is commited and reopened. (defaults to all events). + */ + private function catchUpSubscriptions(SubscriptionEngineCriteria $criteria, SubscriptionStatusFilter $status, \Closure|null $progressCallback, int|null $batchSize): ProcessedResult { + if ($batchSize !== null && $batchSize <= 0) { + throw new \InvalidArgumentException(sprintf('Invalid batchSize %d specified, must be either NULL or a positive integer.', $batchSize), 1733597950); + } + $this->logger?->info(sprintf('Subscription Engine: Start catching up subscriptions in states %s.', join(',', $status->toStringArray()))); $subscriptionsToInvokeBeforeAndAfterCatchUpHooks = Subscriptions::none(); @@ -267,118 +275,137 @@ private function catchUpSubscriptions(SubscriptionEngineCriteria $criteria, Subs /** @var array $errors */ $errors = []; - $this->subscriptionStore->transactional(function () use ($subscriptionCriteria, $progressClosure, &$subscriptionsToInvokeBeforeAndAfterCatchUpHooks, &$numberOfProcessedEvents, &$errors) { - $subscriptionsToCatchup = $this->subscriptionStore->findByCriteriaForUpdate($subscriptionCriteria); - foreach ($subscriptionsToCatchup as $subscription) { - if (!$this->subscribers->contain($subscription->id)) { - // mark detached subscriptions as we cannot handle them and exclude them from catchup - $this->subscriptionStore->update( - $subscription->id, - status: SubscriptionStatus::DETACHED, - position: $subscription->position, - subscriptionError: null, - ); - $this->logger?->info(sprintf('Subscription Engine: Subscriber for "%s" not found and has been marked as detached.', $subscription->id->value)); - $subscriptionsToCatchup = $subscriptionsToCatchup->without($subscription->id); - } - } + do { + /** + * If batching is enabled, the {@see $continueBatching} flag will indicate that the last run was stopped and continuation is necessary to handle the rest of the events. + * It's possible that batching stops at the last event, in that case the transaction is still reopened to set the active state correctly. + */ + $continueBatching = $this->subscriptionStore->transactional(function () use ($subscriptionCriteria, $progressCallback, $batchSize, &$subscriptionsToInvokeBeforeAndAfterCatchUpHooks, &$numberOfProcessedEvents, &$errors) { + $subscriptionsToCatchup = $this->subscriptionStore->findByCriteriaForUpdate($subscriptionCriteria); + if ($numberOfProcessedEvents === 0) { + // first batch + foreach ($subscriptionsToCatchup as $subscription) { + if (!$this->subscribers->contain($subscription->id)) { + // mark detached subscriptions as we cannot handle them and exclude them from catchup + $this->subscriptionStore->update( + $subscription->id, + status: SubscriptionStatus::DETACHED, + position: $subscription->position, + subscriptionError: null, + ); + $this->logger?->info(sprintf('Subscription Engine: Subscriber for "%s" not found and has been marked as detached.', $subscription->id->value)); + $subscriptionsToCatchup = $subscriptionsToCatchup->without($subscription->id); + } + } - if ($subscriptionsToCatchup->isEmpty()) { - $this->logger?->info('Subscription Engine: No subscriptions matched criteria. Finishing catch up.'); - return; - } + if ($subscriptionsToCatchup->isEmpty()) { + $this->logger?->info('Subscription Engine: No subscriptions matched criteria. Finishing catch up.'); + return false; + } - $subscriptionsToInvokeBeforeAndAfterCatchUpHooks = $subscriptionsToCatchup; - foreach ($subscriptionsToInvokeBeforeAndAfterCatchUpHooks as $subscription) { - try { - $this->subscribers->get($subscription->id)->catchUpHook?->onBeforeCatchUp($subscription->status); - } catch (\Throwable $e) { - $errors[] = Error::forSubscription($subscription->id, $e); + $subscriptionsToInvokeBeforeAndAfterCatchUpHooks = $subscriptionsToCatchup; + foreach ($subscriptionsToInvokeBeforeAndAfterCatchUpHooks as $subscription) { + try { + $this->subscribers->get($subscription->id)->catchUpHook?->onBeforeCatchUp($subscription->status); + } catch (\Throwable $e) { + $errors[] = Error::forSubscription($subscription->id, $e); + } + } } - } - $startSequenceNumber = $subscriptionsToCatchup->lowestPosition()?->next() ?? SequenceNumber::none(); - $this->logger?->debug(sprintf('Subscription Engine: Event stream is processed from position %s.', $startSequenceNumber->value)); + $startSequenceNumber = $subscriptionsToCatchup->lowestPosition()?->next() ?? SequenceNumber::none(); + $this->logger?->debug(sprintf('Subscription Engine: Event stream is processed from position %s.', $startSequenceNumber->value)); - /** @var array $highestSequenceNumberForSubscriber */ - $highestSequenceNumberForSubscriber = []; + /** @var array $highestSequenceNumberForSubscriber */ + $highestSequenceNumberForSubscriber = []; - $eventStream = $this->eventStore->load(VirtualStreamName::all())->withMinimumSequenceNumber($startSequenceNumber); - foreach ($eventStream as $eventEnvelope) { - $sequenceNumber = $eventEnvelope->sequenceNumber; - if ($numberOfProcessedEvents > 0) { - $this->logger?->debug(sprintf('Subscription Engine: Current event stream position: %s', $sequenceNumber->value)); - } - if ($progressClosure !== null) { - $progressClosure($eventEnvelope); - } - $domainEvent = $this->eventNormalizer->denormalize($eventEnvelope->event); - foreach ($subscriptionsToCatchup as $subscription) { - if ($subscription->position->value >= $sequenceNumber->value) { - $this->logger?->debug(sprintf('Subscription Engine: Subscription "%s" is farther than the current position (%d >= %d), continue catch up.', $subscription->id->value, $subscription->position->value, $sequenceNumber->value)); - continue; + $continueBatching = false; + $eventStream = $this->eventStore->load(VirtualStreamName::all())->withMinimumSequenceNumber($startSequenceNumber); + foreach ($eventStream as $eventEnvelope) { + $sequenceNumber = $eventEnvelope->sequenceNumber; + if ($numberOfProcessedEvents > 0) { + $this->logger?->debug(sprintf('Subscription Engine: Current event stream position: %s', $sequenceNumber->value)); } - $subscriber = $this->subscribers->get($subscription->id); - - try { - $subscriber->catchUpHook?->onBeforeEvent($domainEvent, $eventEnvelope); - } catch (\Throwable $e) { - $errors[] = Error::forSubscription($subscription->id, $e); + if ($progressCallback !== null) { + $progressCallback($eventEnvelope); } - - $this->subscriptionStore->createSavepoint(); - try { - $subscriber->projection->apply($domainEvent, $eventEnvelope); - } catch (\Throwable $e) { - // ERROR Case: - $this->logger?->error(sprintf('Subscription Engine: Subscriber "%s" for "%s" could not process the event "%s" (sequence number: %d): %s', $subscriber::class, $subscription->id->value, $eventEnvelope->event->type->value, $eventEnvelope->sequenceNumber->value, $e->getMessage())); - $error = Error::forSubscription($subscription->id, $e); - - // 1.) roll back the partially applied event on the subscriber - $this->subscriptionStore->rollbackSavepoint(); - // 2.) for the leftover events we are not including this failed subscription for catchup - $subscriptionsToCatchup = $subscriptionsToCatchup->without($subscription->id); - // 3.) update the subscription error state on either its unchanged or new position (if some events worked) - $this->subscriptionStore->update( - $subscription->id, - status: SubscriptionStatus::ERROR, - position: $highestSequenceNumberForSubscriber[$subscription->id->value] ?? $subscription->position, - subscriptionError: SubscriptionError::fromPreviousStatusAndException( - $subscription->status, - $error->throwable - ), - ); - $errors[] = $error; - continue; + $domainEvent = $this->eventNormalizer->denormalize($eventEnvelope->event); + foreach ($subscriptionsToCatchup as $subscription) { + if ($subscription->position->value >= $sequenceNumber->value) { + $this->logger?->debug(sprintf('Subscription Engine: Subscription "%s" is farther than the current position (%d >= %d), continue catch up.', $subscription->id->value, $subscription->position->value, $sequenceNumber->value)); + continue; + } + $subscriber = $this->subscribers->get($subscription->id); + + try { + $subscriber->catchUpHook?->onBeforeEvent($domainEvent, $eventEnvelope); + } catch (\Throwable $e) { + $errors[] = Error::forSubscription($subscription->id, $e); + } + + $this->subscriptionStore->createSavepoint(); + try { + $subscriber->projection->apply($domainEvent, $eventEnvelope); + } catch (\Throwable $e) { + // ERROR Case: + $this->logger?->error(sprintf('Subscription Engine: Subscriber "%s" for "%s" could not process the event "%s" (sequence number: %d): %s', $subscriber::class, $subscription->id->value, $eventEnvelope->event->type->value, $eventEnvelope->sequenceNumber->value, $e->getMessage())); + $error = Error::forSubscription($subscription->id, $e); + + // 1.) roll back the partially applied event on the subscriber + $this->subscriptionStore->rollbackSavepoint(); + // 2.) for the leftover events we are not including this failed subscription for catchup + $subscriptionsToCatchup = $subscriptionsToCatchup->without($subscription->id); + // 3.) update the subscription error state on either its unchanged or new position (if some events worked) + $this->subscriptionStore->update( + $subscription->id, + status: SubscriptionStatus::ERROR, + position: $highestSequenceNumberForSubscriber[$subscription->id->value] ?? $subscription->position, + subscriptionError: SubscriptionError::fromPreviousStatusAndException( + $subscription->status, + $error->throwable + ), + ); + $errors[] = $error; + continue; + } + // HAPPY Case: + $this->logger?->debug(sprintf('Subscription Engine: Subscriber "%s" for "%s" processed the event "%s" (sequence number: %d).', substr(strrchr($subscriber::class, '\\') ?: '', 1), $subscription->id->value, $eventEnvelope->event->type->value, $eventEnvelope->sequenceNumber->value)); + $this->subscriptionStore->releaseSavepoint(); + $highestSequenceNumberForSubscriber[$subscription->id->value] = $eventEnvelope->sequenceNumber; + + try { + $subscriber->catchUpHook?->onAfterEvent($domainEvent, $eventEnvelope); + } catch (\Throwable $e) { + $errors[] = Error::forSubscription($subscription->id, $e); + } } - // HAPPY Case: - $this->logger?->debug(sprintf('Subscription Engine: Subscriber "%s" for "%s" processed the event "%s" (sequence number: %d).', substr(strrchr($subscriber::class, '\\') ?: '', 1), $subscription->id->value, $eventEnvelope->event->type->value, $eventEnvelope->sequenceNumber->value)); - $this->subscriptionStore->releaseSavepoint(); - $highestSequenceNumberForSubscriber[$subscription->id->value] = $eventEnvelope->sequenceNumber; - - try { - $subscriber->catchUpHook?->onAfterEvent($domainEvent, $eventEnvelope); - } catch (\Throwable $e) { - $errors[] = Error::forSubscription($subscription->id, $e); + $numberOfProcessedEvents++; + if ($batchSize !== null && $numberOfProcessedEvents % $batchSize === 0) { + $continueBatching = true; + $this->logger?->info(sprintf('Subscription Engine: Batch completed with %d events', $numberOfProcessedEvents)); + break; } } - $numberOfProcessedEvents++; - } - foreach ($subscriptionsToCatchup as $subscription) { - // after catchup mark all subscriptions as active, so they are triggered automatically now. - // The position will be set to the one the subscriber handled last, or if no events were in the stream, and we booted we keep the persisted position - $this->subscriptionStore->update( - $subscription->id, - status: SubscriptionStatus::ACTIVE, - position: $highestSequenceNumberForSubscriber[$subscription->id->value] ?? $subscription->position, - subscriptionError: null, - ); - if ($subscription->status !== SubscriptionStatus::ACTIVE) { - $this->logger?->info(sprintf('Subscription Engine: Subscription "%s" has been set to active after booting.', $subscription->id->value)); + foreach ($subscriptionsToCatchup as $subscription) { + // after catchup mark all subscriptions as active, so they are triggered automatically now. + // The position will be set to the one the subscriber handled last, or if no events were in the stream, and we booted we keep the persisted position + $this->subscriptionStore->update( + $subscription->id, + status: $continueBatching === false ? SubscriptionStatus::ACTIVE : $subscription->status, + position: $highestSequenceNumberForSubscriber[$subscription->id->value] ?? $subscription->position, + subscriptionError: null, + ); + if ($continueBatching === false && $subscription->status !== SubscriptionStatus::ACTIVE) { + $this->logger?->info(sprintf('Subscription Engine: Subscription "%s" has been set to active after booting', $subscription->id->value)); + } } + $this->logger?->info(sprintf('Subscription Engine: Finish catch up. %d processed events %d errors.', $numberOfProcessedEvents, count($errors))); + return $continueBatching; + }); + if ($errors !== []) { + break; } - $this->logger?->info(sprintf('Subscription Engine: Finish catch up. %d processed events %d errors.', $numberOfProcessedEvents, count($errors))); - }); + } while ($continueBatching === true); // todo do we need want to invoke for failed projections onAfterCatchUp, as onBeforeCatchUp was invoked already and to be consistent to "shutdown" this catchup iteration? // note that a catchup error in onAfterEvent would bubble up directly and never invoke onAfterCatchUp