From eb0d792d38a3b9fec8628258f19bf8a3a42baed6 Mon Sep 17 00:00:00 2001 From: mhsdesign <85400359+mhsdesign@users.noreply.github.com> Date: Wed, 27 Nov 2024 21:21:24 +0100 Subject: [PATCH] FEATURE: Implement `reactivateSubscription` Reverts a8f246be1bbffa5259cd135b27bdfa4cb9446ba9 and 73e10975cb872a2013a7acf43edd8f498bee8e0e as setup doesnt do that task anymore. Similar to: https://github.com/patchlevel/event-sourcing/blob/b8591c56b21b049f46bead8e7ab424fd2afe9917/src/Subscription/Engine/DefaultSubscriptionEngine.php#L624 --- .../Subscription/ProjectionErrorTest.php | 153 ++++++++--- .../SubscriptionDetachedStatusTest.php | 10 +- .../Service/ContentRepositoryMaintainer.php | 24 +- .../Engine/SubscriptionEngine.php | 254 ++++++++++-------- 4 files changed, 272 insertions(+), 169 deletions(-) diff --git a/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/ProjectionErrorTest.php b/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/ProjectionErrorTest.php index 53ded4f33c..78d3054c6b 100644 --- a/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/ProjectionErrorTest.php +++ b/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/ProjectionErrorTest.php @@ -12,13 +12,14 @@ use Neos\ContentRepository\Core\Subscription\Engine\Error; use Neos\ContentRepository\Core\Subscription\Engine\Errors; use Neos\ContentRepository\Core\Subscription\Engine\ProcessedResult; +use Neos\ContentRepository\Core\Subscription\Engine\Result; +use Neos\ContentRepository\Core\Subscription\Engine\SubscriptionEngineCriteria; use Neos\ContentRepository\Core\Subscription\ProjectionSubscriptionStatus; use Neos\ContentRepository\Core\Subscription\SubscriptionError; use Neos\ContentRepository\Core\Subscription\SubscriptionId; use Neos\ContentRepository\Core\Subscription\SubscriptionStatus; use Neos\EventStore\Model\Event\SequenceNumber; use Neos\EventStore\Model\EventEnvelope; -use PHPUnit\Framework\MockObject\Stub\Exception as WillThrowException; final class ProjectionErrorTest extends AbstractSubscriptionEngineTestCase { @@ -76,11 +77,13 @@ public function projectionWithError() } /** @test */ - public function fixFailedProjection() + public function fixFailedProjectionViaReset() { $this->eventStore->setup(); $this->fakeProjection->expects(self::once())->method('setUp'); $this->fakeProjection->expects(self::any())->method('status')->willReturn(ProjectionStatus::ok()); + $this->fakeProjection->expects(self::once())->method('resetState'); + $this->fakeProjection->expects(self::exactly(2))->method('apply'); $this->subscriptionEngine->setup(); $this->subscriptionEngine->boot(); @@ -88,13 +91,15 @@ public function fixFailedProjection() $this->commitExampleContentStreamEvent(); // catchup active tries to apply the commited event - $this->fakeProjection->expects(self::exactly(2))->method('apply')->with(self::isInstanceOf(ContentStreamWasCreated::class))->willReturnOnConsecutiveCalls( - new WillThrowException($exception = new \RuntimeException('This projection is kaputt.')), - null // okay again - ); + $exception = new \RuntimeException('This projection is kaputt.'); + $this->secondFakeProjection->injectSaboteur(function (EventEnvelope $eventEnvelope) use ($exception) { + self::assertEquals(SequenceNumber::fromInteger(1), $eventEnvelope->sequenceNumber); + self::assertEquals('ContentStreamWasCreated', $eventEnvelope->event->type->value); + throw $exception; + }); $expectedFailure = ProjectionSubscriptionStatus::create( - subscriptionId: SubscriptionId::fromString('Vendor.Package:FakeProjection'), + subscriptionId: SubscriptionId::fromString('Vendor.Package:SecondFakeProjection'), subscriptionStatus: SubscriptionStatus::ERROR, subscriptionPosition: SequenceNumber::none(), subscriptionError: SubscriptionError::fromPreviousStatusAndException(SubscriptionStatus::ACTIVE, $exception), @@ -106,40 +111,121 @@ public function fixFailedProjection() self::assertEquals( $expectedFailure, - $this->subscriptionStatus('Vendor.Package:FakeProjection') + $this->subscriptionStatus('Vendor.Package:SecondFakeProjection') ); - // catchup active does not change anything + $this->secondFakeProjection->killSaboteur(); + + $result = $this->subscriptionEngine->reset(); + self::assertNull($result->errors); + + // expect the subscriptionError to be reset to null + $this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::BOOTING, SequenceNumber::none()); + + $result = $this->subscriptionEngine->boot(); + self::assertNull($result->errors); + + $this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::ACTIVE, SequenceNumber::fromInteger(1)); + } + + /** @test */ + public function irreparableProjection() + { + // test ways NOT to fix a projection :) + $this->eventStore->setup(); + $this->fakeProjection->expects(self::exactly(2))->method('setUp'); + $this->fakeProjection->expects(self::any())->method('status')->willReturn(ProjectionStatus::ok()); + $this->fakeProjection->expects(self::exactly(2))->method('apply'); + $this->fakeProjection->expects(self::once())->method('resetState'); + $this->subscriptionEngine->setup(); + $this->subscriptionEngine->boot(); + + // commit an event + $this->commitExampleContentStreamEvent(); + + $exception = new \RuntimeException('This projection is kaputt.'); + $this->secondFakeProjection->injectSaboteur(function (EventEnvelope $eventEnvelope) use ($exception) { + self::assertEquals(SequenceNumber::fromInteger(1), $eventEnvelope->sequenceNumber); + self::assertEquals('ContentStreamWasCreated', $eventEnvelope->event->type->value); + throw $exception; + }); + + $expectedFailure = ProjectionSubscriptionStatus::create( + subscriptionId: SubscriptionId::fromString('Vendor.Package:SecondFakeProjection'), + subscriptionStatus: SubscriptionStatus::ERROR, + subscriptionPosition: SequenceNumber::none(), + subscriptionError: SubscriptionError::fromPreviousStatusAndException(SubscriptionStatus::ACTIVE, $exception), + setupStatus: ProjectionStatus::ok(), + ); + + // catchup active tries to apply the commited event + $result = $this->subscriptionEngine->catchUpActive(); + // but fails + self::assertTrue($result->hasFailed()); + self::assertEquals($expectedFailure, $this->subscriptionStatus('Vendor.Package:SecondFakeProjection')); + + // a second catchup active does not change anything $result = $this->subscriptionEngine->catchUpActive(); self::assertEquals(ProcessedResult::success(0), $result); + self::assertEquals($expectedFailure, $this->subscriptionStatus('Vendor.Package:SecondFakeProjection')); + // boot neither $result = $this->subscriptionEngine->boot(); self::assertEquals(ProcessedResult::success(0), $result); - // still the same state + self::assertEquals($expectedFailure, $this->subscriptionStatus('Vendor.Package:SecondFakeProjection')); + + // setup neither + $result = $this->subscriptionEngine->setup(); + self::assertEquals(Result::success(), $result); + self::assertEquals($expectedFailure, $this->subscriptionStatus('Vendor.Package:SecondFakeProjection')); + + // reactivation will attempt to retry fix this, but can only work if the projection is repaired and will lead to an error otherwise: + $result = $this->subscriptionEngine->reactivate(); self::assertEquals( - $expectedFailure, - $this->subscriptionStatus('Vendor.Package:FakeProjection') + ProcessedResult::failed(1, Errors::fromArray([ + Error::fromSubscriptionIdAndException(SubscriptionId::fromString('Vendor.Package:SecondFakeProjection'), $exception) + ])), + $result ); - $this->fakeProjection->expects(self::once())->method('resetState'); + self::assertEquals( + ProjectionSubscriptionStatus::create( + subscriptionId: SubscriptionId::fromString('Vendor.Package:SecondFakeProjection'), + subscriptionStatus: SubscriptionStatus::ERROR, + subscriptionPosition: SequenceNumber::none(), + // previous state is now an error too also error: + subscriptionError: SubscriptionError::fromPreviousStatusAndException(SubscriptionStatus::ERROR, $exception), + setupStatus: ProjectionStatus::ok(), + ), + $this->subscriptionStatus('Vendor.Package:SecondFakeProjection') + ); + // expect the subscriptionError to be reset to null $result = $this->subscriptionEngine->reset(); self::assertNull($result->errors); + $this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::BOOTING, SequenceNumber::none()); - // expect the subscriptionError to be reset to null - $this->expectOkayStatus('Vendor.Package:FakeProjection', SubscriptionStatus::BOOTING, SequenceNumber::none()); - + // but booting will rethrow that error :D $result = $this->subscriptionEngine->boot(); - self::assertNull($result->errors); - - $this->expectOkayStatus('Vendor.Package:FakeProjection', SubscriptionStatus::ACTIVE, SequenceNumber::fromInteger(1)); + self::assertTrue($result->hasFailed()); + self::assertEquals( + ProjectionSubscriptionStatus::create( + subscriptionId: SubscriptionId::fromString('Vendor.Package:SecondFakeProjection'), + subscriptionStatus: SubscriptionStatus::ERROR, + subscriptionPosition: SequenceNumber::none(), + // previous state is now booting + subscriptionError: SubscriptionError::fromPreviousStatusAndException(SubscriptionStatus::BOOTING, $exception), + setupStatus: ProjectionStatus::ok(), + ), + $this->subscriptionStatus('Vendor.Package:SecondFakeProjection') + ); } /** @test */ public function projectionIsRolledBackAfterError() { $this->eventStore->setup(); - $this->fakeProjection->expects(self::exactly(2))->method('setUp'); + $this->fakeProjection->expects(self::once())->method('setUp'); $this->fakeProjection->expects(self::once())->method('apply'); $result = $this->subscriptionEngine->setup(); self::assertNull($result->errors); @@ -184,16 +270,11 @@ public function projectionIsRolledBackAfterError() $this->secondFakeProjection->killSaboteur(); - $result = $this->subscriptionEngine->setup(); - self::assertNull($result->errors); - - // subscriptionError is reset - $this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::BOOTING, SequenceNumber::none()); - - // catchup after fix - $result = $this->subscriptionEngine->boot(); + // reactivate and catchup + $result = $this->subscriptionEngine->reactivate(SubscriptionEngineCriteria::create([SubscriptionId::fromString('Vendor.Package:SecondFakeProjection')])); self::assertNull($result->errors); + $this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::ACTIVE, SequenceNumber::fromInteger(1)); self::assertEquals( [SequenceNumber::fromInteger(1)], $this->secondFakeProjection->getState()->findAppliedSequenceNumbers() @@ -204,7 +285,7 @@ public function projectionIsRolledBackAfterError() public function projectionIsRolledBackAfterErrorButKeepsSuccessFullEvents() { $this->eventStore->setup(); - $this->fakeProjection->expects(self::exactly(2))->method('setUp'); + $this->fakeProjection->expects(self::once())->method('setUp'); $this->fakeProjection->expects(self::exactly(2))->method('apply'); $this->subscriptionEngine->setup(); $this->subscriptionEngine->boot(); @@ -255,20 +336,12 @@ public function projectionIsRolledBackAfterErrorButKeepsSuccessFullEvents() $this->secondFakeProjection->killSaboteur(); - $result = $this->subscriptionEngine->setup(); - self::assertNull($result->errors); - - // subscriptionError is reset, but the position is preserved - $this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::BOOTING, SequenceNumber::fromInteger(1)); - self::assertEquals( - [SequenceNumber::fromInteger(1)], - $this->secondFakeProjection->getState()->findAppliedSequenceNumbers() - ); - // catchup after fix - $result = $this->subscriptionEngine->boot(); + $result = $this->subscriptionEngine->reactivate(SubscriptionEngineCriteria::create([SubscriptionId::fromString('Vendor.Package:SecondFakeProjection')])); self::assertNull($result->errors); + // subscriptionError is reset, and the position is advanced if there were events + $this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::ACTIVE, SequenceNumber::fromInteger(2)); self::assertEquals( [SequenceNumber::fromInteger(1), SequenceNumber::fromInteger(2)], $this->secondFakeProjection->getState()->findAppliedSequenceNumbers() diff --git a/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/SubscriptionDetachedStatusTest.php b/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/SubscriptionDetachedStatusTest.php index dabdcce9ca..5b073a4db9 100644 --- a/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/SubscriptionDetachedStatusTest.php +++ b/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/SubscriptionDetachedStatusTest.php @@ -7,6 +7,7 @@ use Neos\ContentRepository\Core\Projection\ProjectionStatus; use Neos\ContentRepository\Core\Subscription\DetachedSubscriptionStatus; use Neos\ContentRepository\Core\Subscription\Engine\ProcessedResult; +use Neos\ContentRepository\Core\Subscription\Engine\SubscriptionEngineCriteria; use Neos\ContentRepository\Core\Subscription\ProjectionSubscriptionStatus; use Neos\ContentRepository\Core\Subscription\SubscriptionId; use Neos\ContentRepository\Core\Subscription\SubscriptionStatus; @@ -97,7 +98,7 @@ public function projectionIsDetachedOnCatchupActive() /** @test */ public function projectionIsDetachedOnSetupAndReattachedIfPossible() { - $this->fakeProjection->expects(self::exactly(2))->method('setUp'); + $this->fakeProjection->expects(self::once())->method('setUp'); $this->fakeProjection->expects(self::once())->method('apply'); $this->fakeProjection->expects(self::any())->method('status')->willReturn(ProjectionStatus::ok()); @@ -160,9 +161,10 @@ public function projectionIsDetachedOnSetupAndReattachedIfPossible() $this->subscriptionStatus('Vendor.Package:FakeProjection') ); - // setup does re-attach as the projection is found again - $this->subscriptionEngine->setup(); + // reactivate does re-attach as the projection if its found again + $result = $this->subscriptionEngine->reactivate(SubscriptionEngineCriteria::create([SubscriptionId::fromString('Vendor.Package:FakeProjection')])); + self::assertNull($result->errors); - $this->expectOkayStatus('Vendor.Package:FakeProjection', SubscriptionStatus::BOOTING, SequenceNumber::fromInteger(1)); + $this->expectOkayStatus('Vendor.Package:FakeProjection', SubscriptionStatus::ACTIVE, SequenceNumber::fromInteger(1)); } } diff --git a/Neos.ContentRepository.Core/Classes/Service/ContentRepositoryMaintainer.php b/Neos.ContentRepository.Core/Classes/Service/ContentRepositoryMaintainer.php index a8e725dc3b..9b879a814c 100644 --- a/Neos.ContentRepository.Core/Classes/Service/ContentRepositoryMaintainer.php +++ b/Neos.ContentRepository.Core/Classes/Service/ContentRepositoryMaintainer.php @@ -85,14 +85,14 @@ public function setUp(): Error|null $eventStoreIsEmpty = iterator_count($this->eventStore->load(VirtualStreamName::all())->limit(1)) === 0; $setupResult = $this->subscriptionEngine->setup(); if ($setupResult->errors !== null) { - return self::createErrorForReason('setup', $setupResult->errors); + return self::createErrorForReason('Setup failed:', $setupResult->errors); } if ($eventStoreIsEmpty) { // note: possibly introduce $skipBooting flag instead // see https://github.com/patchlevel/event-sourcing/blob/b8591c56b21b049f46bead8e7ab424fd2afe9917/src/Subscription/Engine/DefaultSubscriptionEngine.php#L42 $bootResult = $this->subscriptionEngine->boot(); if ($bootResult->errors !== null) { - return self::createErrorForReason('initial catchup', $bootResult->errors); + return self::createErrorForReason('Initial catchup failed:', $bootResult->errors); } } return null; @@ -125,11 +125,11 @@ public function replaySubscription(SubscriptionId $subscriptionId, \Closure|null } $resetResult = $this->subscriptionEngine->reset(SubscriptionEngineCriteria::create([$subscriptionId])); if ($resetResult->errors !== null) { - return self::createErrorForReason('reset', $resetResult->errors); + return self::createErrorForReason('Reset failed:', $resetResult->errors); } $bootResult = $this->subscriptionEngine->boot(SubscriptionEngineCriteria::create([$subscriptionId]), $progressCallback); if ($bootResult->errors !== null) { - return self::createErrorForReason('catchup', $bootResult->errors); + return self::createErrorForReason('Catchup failed:', $bootResult->errors); } return null; } @@ -138,11 +138,11 @@ public function replayAllSubscriptions(\Closure|null $progressCallback = null): { $resetResult = $this->subscriptionEngine->reset(); if ($resetResult->errors !== null) { - return self::createErrorForReason('reset', $resetResult->errors); + return self::createErrorForReason('Reset failed:', $resetResult->errors); } $bootResult = $this->subscriptionEngine->boot(progressCallback: $progressCallback); if ($bootResult->errors !== null) { - return self::createErrorForReason('catchup', $bootResult->errors); + return self::createErrorForReason('Catchup failed:', $bootResult->errors); } return null; } @@ -163,8 +163,10 @@ public function reactivateSubscription(SubscriptionId $subscriptionId, \Closure| if ($subscriptionStatus->subscriptionStatus === SubscriptionStatus::NEW) { return new Error(sprintf('Subscription "%s" is not setup and cannot be reactivated.', $subscriptionId->value)); } - - // todo implement https://github.com/patchlevel/event-sourcing/blob/b8591c56b21b049f46bead8e7ab424fd2afe9917/src/Subscription/Engine/DefaultSubscriptionEngine.php#L624 + $reactivateResult = $this->subscriptionEngine->reactivate(SubscriptionEngineCriteria::create([$subscriptionId]), $progressCallback); + if ($reactivateResult->errors !== null) { + return self::createErrorForReason('Could not reactivate subscriber:', $reactivateResult->errors); + } return null; } @@ -183,12 +185,12 @@ public function prune(): Error|null } $resetResult = $this->subscriptionEngine->reset(); if ($resetResult->errors !== null) { - return self::createErrorForReason('reset', $resetResult->errors); + return self::createErrorForReason('Reset failed:', $resetResult->errors); } // note: possibly introduce $skipBooting flag like for setup $bootResult = $this->subscriptionEngine->boot(); if ($bootResult->errors !== null) { - return self::createErrorForReason('booting', $bootResult->errors); + return self::createErrorForReason('Catchup failed:', $bootResult->errors); } return null; } @@ -196,7 +198,7 @@ public function prune(): Error|null private static function createErrorForReason(string $method, Errors $errors): Error { $message = []; - $message[] = sprintf('%s produced the following error%s', $method, $errors->count() === 1 ? '' : 's'); + $message[] = sprintf('%s: Following error%s', $method, $errors->count() === 1 ? '' : 's'); foreach ($errors as $error) { $message[] = sprintf(' Subscription "%s": %s', $error->subscriptionId->value, $error->message); } diff --git a/Neos.ContentRepository.Core/Classes/Subscription/Engine/SubscriptionEngine.php b/Neos.ContentRepository.Core/Classes/Subscription/Engine/SubscriptionEngine.php index 0038935ec7..8d556ec801 100644 --- a/Neos.ContentRepository.Core/Classes/Subscription/Engine/SubscriptionEngine.php +++ b/Neos.ContentRepository.Core/Classes/Subscription/Engine/SubscriptionEngine.php @@ -19,6 +19,7 @@ use Neos\ContentRepository\Core\Subscription\Subscription; use Neos\ContentRepository\Core\Subscription\SubscriptionError; use Neos\ContentRepository\Core\Subscription\SubscriptionId; +use Neos\ContentRepository\Core\Subscription\Subscriptions; use Neos\ContentRepository\Core\Subscription\SubscriptionStatus; use Neos\ContentRepository\Core\Subscription\SubscriptionStatusCollection; use Neos\ContentRepository\Core\Subscription\SubscriptionStatusFilter; @@ -60,9 +61,7 @@ public function setup(SubscriptionEngineCriteria|null $criteria = null): Result $subscriptions = $this->subscriptionStore->findByCriteriaForUpdate(SubscriptionCriteria::forEngineCriteriaAndStatus($criteria, SubscriptionStatusFilter::fromArray([ SubscriptionStatus::NEW, SubscriptionStatus::BOOTING, - SubscriptionStatus::ACTIVE, - SubscriptionStatus::DETACHED, - SubscriptionStatus::ERROR, + SubscriptionStatus::ACTIVE ]))); if ($subscriptions->isEmpty()) { $this->logger?->info('Subscription Engine: No subscriptions found.'); // todo not happy? Because there must be at least the content graph?!! @@ -80,12 +79,47 @@ public function setup(SubscriptionEngineCriteria|null $criteria = null): Result public function boot(SubscriptionEngineCriteria|null $criteria = null, \Closure $progressCallback = null): ProcessedResult { - return $this->processExclusively(fn () => $this->catchUpSubscriptions($criteria ?? SubscriptionEngineCriteria::noConstraints(), SubscriptionStatus::BOOTING, $progressCallback)); + $criteria ??= SubscriptionEngineCriteria::noConstraints(); + return $this->processExclusively(fn () => $this->subscriptionStore->transactional( + function () use ($criteria, $progressCallback) { + $this->logger?->info('Subscription Engine: Start catching up subscriptions in state "BOOTING".'); + $subscriptionsToCatchup = $this->subscriptionStore->findByCriteriaForUpdate( + SubscriptionCriteria::forEngineCriteriaAndStatus($criteria, SubscriptionStatus::BOOTING) + ); + return $this->catchUpSubscriptions($subscriptionsToCatchup, $progressCallback); + }) + ); } public function catchUpActive(SubscriptionEngineCriteria|null $criteria = null, \Closure $progressCallback = null): ProcessedResult { - return $this->processExclusively(fn () => $this->catchUpSubscriptions($criteria ?? SubscriptionEngineCriteria::noConstraints(), SubscriptionStatus::ACTIVE, $progressCallback)); + $criteria ??= SubscriptionEngineCriteria::noConstraints(); + return $this->processExclusively(fn () => $this->subscriptionStore->transactional( + function () use ($criteria, $progressCallback) { + $this->logger?->info('Subscription Engine: Start catching up subscriptions in state "ACTIVE".'); + $subscriptionsToCatchup = $this->subscriptionStore->findByCriteriaForUpdate( + SubscriptionCriteria::forEngineCriteriaAndStatus($criteria, SubscriptionStatus::ACTIVE) + ); + return $this->catchUpSubscriptions($subscriptionsToCatchup, $progressCallback); + }) + ); + } + + public function reactivate(SubscriptionEngineCriteria|null $criteria = null, \Closure $progressCallback = null): ProcessedResult + { + $criteria ??= SubscriptionEngineCriteria::noConstraints(); + return $this->processExclusively(fn () => $this->subscriptionStore->transactional( + function () use ($criteria, $progressCallback) { + $this->logger?->info('Subscription Engine: Start catching up subscriptions in state "ACTIVE".'); + $subscriptionsToCatchup = $this->subscriptionStore->findByCriteriaForUpdate( + SubscriptionCriteria::forEngineCriteriaAndStatus($criteria, SubscriptionStatusFilter::fromArray([ + SubscriptionStatus::ERROR, + SubscriptionStatus::DETACHED, + ])) + ); + return $this->catchUpSubscriptions($subscriptionsToCatchup, $progressCallback); + }) + ); } public function reset(SubscriptionEngineCriteria|null $criteria = null): Result @@ -260,129 +294,121 @@ private function resetSubscription(Subscription $subscription): ?Error return null; } - private function catchUpSubscriptions(SubscriptionEngineCriteria $criteria, SubscriptionStatus $subscriptionStatus, \Closure $progressClosure = null): ProcessedResult + private function catchUpSubscriptions(Subscriptions $subscriptionsToCatchup, \Closure $progressClosure = null): ProcessedResult { - $this->logger?->info(sprintf('Subscription Engine: Start catching up subscriptions in state "%s".', $subscriptionStatus->value)); - - $subscriptionEngineCriteria = SubscriptionCriteria::forEngineCriteriaAndStatus($criteria, $subscriptionStatus); - return $this->subscriptionStore->transactional( - function () use ($subscriptionEngineCriteria, $subscriptionStatus, $progressClosure) { - $subscriptionsToCatchup = $this->subscriptionStore->findByCriteriaForUpdate($subscriptionEngineCriteria); - foreach ($subscriptionsToCatchup as $subscription) { - if (!$this->subscribers->contain($subscription->id)) { - // mark detached subscriptions as we cannot handle them and exclude them from catchup - $this->subscriptionStore->update( - $subscription->id, - status: SubscriptionStatus::DETACHED, - position: $subscription->position, - subscriptionError: null, - ); - $this->logger?->info(sprintf('Subscription Engine: Subscriber for "%s" not found and has been marked as detached.', $subscription->id->value)); - $subscriptionsToCatchup = $subscriptionsToCatchup->without($subscription->id); - } - } + foreach ($subscriptionsToCatchup as $subscription) { + if (!$this->subscribers->contain($subscription->id)) { + // mark detached subscriptions as we cannot handle them and exclude them from catchup + $this->subscriptionStore->update( + $subscription->id, + status: SubscriptionStatus::DETACHED, + position: $subscription->position, + subscriptionError: null, + ); + $this->logger?->info(sprintf('Subscription Engine: Subscriber for "%s" not found and has been marked as detached.', $subscription->id->value)); + $subscriptionsToCatchup = $subscriptionsToCatchup->without($subscription->id); + } + } - if ($subscriptionsToCatchup->isEmpty()) { - $this->logger?->info(sprintf('Subscription Engine: No subscriptions in state "%s". Finishing catch up', $subscriptionStatus->value)); - return ProcessedResult::success(0); - } + if ($subscriptionsToCatchup->isEmpty()) { + $this->logger?->info('Subscription Engine: No subscriptions matched criteria. Finishing catch up.'); + return ProcessedResult::success(0); + } - foreach ($subscriptionsToCatchup as $subscription) { - try { - $this->subscribers->get($subscription->id)->onBeforeCatchUp($subscription->status); - } catch (\Throwable $e) { - // analog to onAfterCatchUp, we tolerate no exceptions here and consider it a critical developer error. - $message = sprintf('Subscriber "%s" failed onBeforeCatchUp: %s', $subscription->id->value, $e->getMessage()); - $this->logger?->critical($message); - throw new CatchUpFailed($message, 1732374000, $e); - } - } - $startSequenceNumber = $subscriptionsToCatchup->lowestPosition()?->next() ?? SequenceNumber::none(); - $this->logger?->debug(sprintf('Subscription Engine: Event stream is processed from position %s.', $startSequenceNumber->value)); + foreach ($subscriptionsToCatchup as $subscription) { + try { + $this->subscribers->get($subscription->id)->onBeforeCatchUp($subscription->status); + } catch (\Throwable $e) { + // analog to onAfterCatchUp, we tolerate no exceptions here and consider it a critical developer error. + $message = sprintf('Subscriber "%s" failed onBeforeCatchUp: %s', $subscription->id->value, $e->getMessage()); + $this->logger?->critical($message); + throw new CatchUpFailed($message, 1732374000, $e); + } + } + $startSequenceNumber = $subscriptionsToCatchup->lowestPosition()?->next() ?? SequenceNumber::none(); + $this->logger?->debug(sprintf('Subscription Engine: Event stream is processed from position %s.', $startSequenceNumber->value)); - /** @var array $errors */ - $errors = []; - $numberOfProcessedEvents = 0; - /** @var array $highestSequenceNumberForSubscriber */ - $highestSequenceNumberForSubscriber = []; + /** @var array $errors */ + $errors = []; + $numberOfProcessedEvents = 0; + /** @var array $highestSequenceNumberForSubscriber */ + $highestSequenceNumberForSubscriber = []; - $eventStream = $this->eventStore->load(VirtualStreamName::all())->withMinimumSequenceNumber($startSequenceNumber); - foreach ($eventStream as $eventEnvelope) { - $sequenceNumber = $eventEnvelope->sequenceNumber; - if ($numberOfProcessedEvents > 0) { - $this->logger?->debug(sprintf('Subscription Engine: Current event stream position: %s', $sequenceNumber->value)); - } - if ($progressClosure !== null) { - $progressClosure($eventEnvelope); - } - $domainEvent = $this->eventNormalizer->denormalize($eventEnvelope->event); - foreach ($subscriptionsToCatchup as $subscription) { - if ($subscription->position->value >= $sequenceNumber->value) { - $this->logger?->debug(sprintf('Subscription Engine: Subscription "%s" is farther than the current position (%d >= %d), continue catch up.', $subscription->id->value, $subscription->position->value, $sequenceNumber->value)); - continue; - } - $this->subscriptionStore->createSavepoint(); - $error = $this->handleEvent($eventEnvelope, $domainEvent, $subscription->id); - if ($error !== null) { - // ERROR Case: - // 1.) roll back the partially applied event on the subscriber - $this->subscriptionStore->rollbackSavepoint(); - // 2.) for the leftover events we are not including this failed subscription for catchup - $subscriptionsToCatchup = $subscriptionsToCatchup->without($subscription->id); - // 3.) update the subscription error state on either its unchanged or new position (if some events worked) - $this->subscriptionStore->update( - $subscription->id, - status: SubscriptionStatus::ERROR, - position: $highestSequenceNumberForSubscriber[$subscription->id->value] ?? $subscription->position, - subscriptionError: SubscriptionError::fromPreviousStatusAndException( - $subscription->status, - $error->throwable - ), - ); - // 4.) invoke onAfterCatchUp, as onBeforeCatchUp was invoked already and to be consistent we want to "shutdown" this catchup iteration event though we know it failed - // todo put the ERROR $subscriptionStatus into the after hook, so it can properly be reacted upon - try { - $this->subscribers->get($subscription->id)->onAfterCatchUp(); - } catch (\Throwable $e) { - // analog to onBeforeCatchUp, we tolerate no exceptions here and consider it a critical developer error. - $message = sprintf('Subscriber "%s" had an error and also failed onAfterCatchUp: %s', $subscription->id->value, $e->getMessage()); - $this->logger?->critical($message); - throw new CatchUpFailed($message, 1732733740, $e); - } - $errors[] = $error; - continue; - } - // HAPPY Case: - $this->subscriptionStore->releaseSavepoint(); - $highestSequenceNumberForSubscriber[$subscription->id->value] = $eventEnvelope->sequenceNumber; - } - $numberOfProcessedEvents++; + $eventStream = $this->eventStore->load(VirtualStreamName::all())->withMinimumSequenceNumber($startSequenceNumber); + foreach ($eventStream as $eventEnvelope) { + $sequenceNumber = $eventEnvelope->sequenceNumber; + if ($numberOfProcessedEvents > 0) { + $this->logger?->debug(sprintf('Subscription Engine: Current event stream position: %s', $sequenceNumber->value)); + } + if ($progressClosure !== null) { + $progressClosure($eventEnvelope); + } + $domainEvent = $this->eventNormalizer->denormalize($eventEnvelope->event); + foreach ($subscriptionsToCatchup as $subscription) { + if ($subscription->position->value >= $sequenceNumber->value) { + $this->logger?->debug(sprintf('Subscription Engine: Subscription "%s" is farther than the current position (%d >= %d), continue catch up.', $subscription->id->value, $subscription->position->value, $sequenceNumber->value)); + continue; } - foreach ($subscriptionsToCatchup as $subscription) { + $this->subscriptionStore->createSavepoint(); + $error = $this->handleEvent($eventEnvelope, $domainEvent, $subscription->id); + if ($error !== null) { + // ERROR Case: + // 1.) roll back the partially applied event on the subscriber + $this->subscriptionStore->rollbackSavepoint(); + // 2.) for the leftover events we are not including this failed subscription for catchup + $subscriptionsToCatchup = $subscriptionsToCatchup->without($subscription->id); + // 3.) update the subscription error state on either its unchanged or new position (if some events worked) + $this->subscriptionStore->update( + $subscription->id, + status: SubscriptionStatus::ERROR, + position: $highestSequenceNumberForSubscriber[$subscription->id->value] ?? $subscription->position, + subscriptionError: SubscriptionError::fromPreviousStatusAndException( + $subscription->status, + $error->throwable + ), + ); + // 4.) invoke onAfterCatchUp, as onBeforeCatchUp was invoked already and to be consistent we want to "shutdown" this catchup iteration event though we know it failed + // todo put the ERROR $subscriptionStatus into the after hook, so it can properly be reacted upon try { $this->subscribers->get($subscription->id)->onAfterCatchUp(); } catch (\Throwable $e) { // analog to onBeforeCatchUp, we tolerate no exceptions here and consider it a critical developer error. - $message = sprintf('Subscriber "%s" failed onAfterCatchUp: %s', $subscription->id->value, $e->getMessage()); + $message = sprintf('Subscriber "%s" had an error and also failed onAfterCatchUp: %s', $subscription->id->value, $e->getMessage()); $this->logger?->critical($message); - throw new CatchUpFailed($message, 1732374000, $e); - } - // after catchup mark all subscriptions as active, so they are triggered automatically now. - // The position will be set to the one the subscriber handled last, or if no events were in the stream, and we booted we keep the persisted position - $this->subscriptionStore->update( - $subscription->id, - status: SubscriptionStatus::ACTIVE, - position: $highestSequenceNumberForSubscriber[$subscription->id->value] ?? $subscription->position, - subscriptionError: null, - ); - if ($subscription->status !== SubscriptionStatus::ACTIVE) { - $this->logger?->info(sprintf('Subscription Engine: Subscription "%s" has been set to active after booting.', $subscription->id->value)); + throw new CatchUpFailed($message, 1732733740, $e); } + $errors[] = $error; + continue; } - $this->logger?->info(sprintf('Subscription Engine: Finish catch up. %d processed events %d errors.', $numberOfProcessedEvents, count($errors))); - return $errors === [] ? ProcessedResult::success($numberOfProcessedEvents) : ProcessedResult::failed($numberOfProcessedEvents, Errors::fromArray($errors)); + // HAPPY Case: + $this->subscriptionStore->releaseSavepoint(); + $highestSequenceNumberForSubscriber[$subscription->id->value] = $eventEnvelope->sequenceNumber; } - ); + $numberOfProcessedEvents++; + } + foreach ($subscriptionsToCatchup as $subscription) { + try { + $this->subscribers->get($subscription->id)->onAfterCatchUp(); + } catch (\Throwable $e) { + // analog to onBeforeCatchUp, we tolerate no exceptions here and consider it a critical developer error. + $message = sprintf('Subscriber "%s" failed onAfterCatchUp: %s', $subscription->id->value, $e->getMessage()); + $this->logger?->critical($message); + throw new CatchUpFailed($message, 1732374000, $e); + } + // after catchup mark all subscriptions as active, so they are triggered automatically now. + // The position will be set to the one the subscriber handled last, or if no events were in the stream, and we booted we keep the persisted position + $this->subscriptionStore->update( + $subscription->id, + status: SubscriptionStatus::ACTIVE, + position: $highestSequenceNumberForSubscriber[$subscription->id->value] ?? $subscription->position, + subscriptionError: null, + ); + if ($subscription->status !== SubscriptionStatus::ACTIVE) { + $this->logger?->info(sprintf('Subscription Engine: Subscription "%s" has been set to active after booting.', $subscription->id->value)); + } + } + $this->logger?->info(sprintf('Subscription Engine: Finish catch up. %d processed events %d errors.', $numberOfProcessedEvents, count($errors))); + return $errors === [] ? ProcessedResult::success($numberOfProcessedEvents) : ProcessedResult::failed($numberOfProcessedEvents, Errors::fromArray($errors)); } /**