diff --git a/Neos.ContentRepository.Core/Classes/Service/ContentRepositoryMaintainer.php b/Neos.ContentRepository.Core/Classes/Service/ContentRepositoryMaintainer.php index 4adb2301ca..a8e725dc3b 100644 --- a/Neos.ContentRepository.Core/Classes/Service/ContentRepositoryMaintainer.php +++ b/Neos.ContentRepository.Core/Classes/Service/ContentRepositoryMaintainer.php @@ -35,8 +35,8 @@ * * Resetting a content repository with {@see prune} method will purge the event stream and reset all subscription states. * - * Staus information - * ----------------- + * Status information + * ------------------ * The status of the content repository e.g. if a setup is required or if all subscriptions are active and their position * can be examined with {@see status} * diff --git a/Neos.ContentRepository.Core/Classes/Subscription/Engine/Errors.php b/Neos.ContentRepository.Core/Classes/Subscription/Engine/Errors.php index 11b9fffc3c..8e8d3b3853 100644 --- a/Neos.ContentRepository.Core/Classes/Subscription/Engine/Errors.php +++ b/Neos.ContentRepository.Core/Classes/Subscription/Engine/Errors.php @@ -21,7 +21,7 @@ private function __construct( if ($errors === []) { throw new \InvalidArgumentException('Errors must not be empty.', 1731612542); } - $this->errors = $errors; + $this->errors = array_values($errors); } /** diff --git a/Neos.ContentRepository.Core/Classes/Subscription/Engine/SubscriptionEngine.php b/Neos.ContentRepository.Core/Classes/Subscription/Engine/SubscriptionEngine.php index 7f8e1f43aa..0038935ec7 100644 --- a/Neos.ContentRepository.Core/Classes/Subscription/Engine/SubscriptionEngine.php +++ b/Neos.ContentRepository.Core/Classes/Subscription/Engine/SubscriptionEngine.php @@ -13,6 +13,13 @@ use Neos\ContentRepository\Core\Subscription\Exception\CatchUpFailed; use Neos\ContentRepository\Core\Subscription\Exception\SubscriptionEngineAlreadyProcessingException; use Neos\ContentRepository\Core\Subscription\ProjectionSubscriptionStatus; +use Neos\ContentRepository\Core\Subscription\Store\SubscriptionCriteria; +use Neos\ContentRepository\Core\Subscription\Store\SubscriptionStoreInterface; +use Neos\ContentRepository\Core\Subscription\Subscriber\Subscribers; +use Neos\ContentRepository\Core\Subscription\Subscription; +use Neos\ContentRepository\Core\Subscription\SubscriptionError; +use Neos\ContentRepository\Core\Subscription\SubscriptionId; +use Neos\ContentRepository\Core\Subscription\SubscriptionStatus; use Neos\ContentRepository\Core\Subscription\SubscriptionStatusCollection; use Neos\ContentRepository\Core\Subscription\SubscriptionStatusFilter; use Neos\EventStore\EventStoreInterface; @@ -20,12 +27,6 @@ use Neos\EventStore\Model\EventEnvelope; use Neos\EventStore\Model\EventStream\VirtualStreamName; use Psr\Log\LoggerInterface; -use Neos\ContentRepository\Core\Subscription\SubscriptionStatus; -use Neos\ContentRepository\Core\Subscription\Store\SubscriptionCriteria; -use Neos\ContentRepository\Core\Subscription\Store\SubscriptionStoreInterface; -use Neos\ContentRepository\Core\Subscription\Subscriber\Subscribers; -use Neos\ContentRepository\Core\Subscription\Subscription; -use Neos\ContentRepository\Core\Subscription\Subscriptions; /** * This is the internal core for the catchup @@ -38,7 +39,6 @@ final class SubscriptionEngine { private bool $processing = false; - private readonly SubscriptionManager $subscriptionManager; public function __construct( private readonly EventStoreInterface $eventStore, @@ -47,7 +47,6 @@ public function __construct( private readonly EventNormalizer $eventNormalizer, private readonly LoggerInterface|null $logger = null, ) { - $this->subscriptionManager = new SubscriptionManager($this->subscriptionStore); } public function setup(SubscriptionEngineCriteria|null $criteria = null): Result @@ -58,7 +57,7 @@ public function setup(SubscriptionEngineCriteria|null $criteria = null): Result $this->subscriptionStore->setup(); $this->discoverNewSubscriptions(); - $subscriptions = $this->subscriptionStore->findByCriteria(SubscriptionCriteria::forEngineCriteriaAndStatus($criteria, SubscriptionStatusFilter::fromArray([ + $subscriptions = $this->subscriptionStore->findByCriteriaForUpdate(SubscriptionCriteria::forEngineCriteriaAndStatus($criteria, SubscriptionStatusFilter::fromArray([ SubscriptionStatus::NEW, SubscriptionStatus::BOOTING, SubscriptionStatus::ACTIVE, @@ -76,7 +75,6 @@ public function setup(SubscriptionEngineCriteria|null $criteria = null): Result $errors[] = $error; } } - $this->subscriptionManager->flush(); return $errors === [] ? Result::success() : Result::failed(Errors::fromArray($errors)); } @@ -95,7 +93,7 @@ public function reset(SubscriptionEngineCriteria|null $criteria = null): Result $criteria ??= SubscriptionEngineCriteria::noConstraints(); $this->logger?->info('Subscription Engine: Start to reset.'); - $subscriptions = $this->subscriptionStore->findByCriteria(SubscriptionCriteria::forEngineCriteriaAndStatus($criteria, SubscriptionStatusFilter::any())); + $subscriptions = $this->subscriptionStore->findByCriteriaForUpdate(SubscriptionCriteria::forEngineCriteriaAndStatus($criteria, SubscriptionStatusFilter::any())); if ($subscriptions->isEmpty()) { $this->logger?->info('Subscription Engine: No subscriptions to reset.'); return Result::success(); @@ -107,7 +105,6 @@ public function reset(SubscriptionEngineCriteria|null $criteria = null): Result $errors[] = $error; } } - $this->subscriptionManager->flush(); return $errors === [] ? Result::success() : Result::failed(Errors::fromArray($errors)); } @@ -115,7 +112,7 @@ public function subscriptionStatus(SubscriptionEngineCriteria|null $criteria = n { $statuses = []; try { - $subscriptions = $this->subscriptionStore->findByCriteria(SubscriptionCriteria::create(ids: $criteria?->ids)); + $subscriptions = $this->subscriptionStore->findByCriteriaForUpdate(SubscriptionCriteria::create(ids: $criteria?->ids)); } catch (TableNotFoundException) { // the schema is not setup - thus there are no subscribers return SubscriptionStatusCollection::createEmpty(); @@ -158,21 +155,16 @@ public function subscriptionStatus(SubscriptionEngineCriteria|null $criteria = n return SubscriptionStatusCollection::fromArray($statuses); } - private function handleEvent(EventEnvelope $eventEnvelope, EventInterface $domainEvent, Subscription $subscription): Error|null + private function handleEvent(EventEnvelope $eventEnvelope, EventInterface $domainEvent, SubscriptionId $subscriptionId): Error|null { - $subscriber = $this->subscribers->get($subscription->id); + $subscriber = $this->subscribers->get($subscriptionId); try { $subscriber->handle($domainEvent, $eventEnvelope); } catch (\Throwable $e) { - $this->logger?->error(sprintf('Subscription Engine: Subscriber "%s" for "%s" could not process the event "%s" (sequence number: %d): %s', $subscriber::class, $subscription->id->value, $eventEnvelope->event->type->value, $eventEnvelope->sequenceNumber->value, $e->getMessage())); - $subscription->fail($e); - $this->subscriptionManager->update($subscription); - return Error::fromSubscriptionIdAndException($subscription->id, $e); + $this->logger?->error(sprintf('Subscription Engine: Subscriber "%s" for "%s" could not process the event "%s" (sequence number: %d): %s', $subscriber::class, $subscriptionId->value, $eventEnvelope->event->type->value, $eventEnvelope->sequenceNumber->value, $e->getMessage())); + return Error::fromSubscriptionIdAndException($subscriptionId, $e); } - $this->logger?->debug(sprintf('Subscription Engine: Subscriber "%s" for "%s" processed the event "%s" (sequence number: %d).', substr(strrchr($subscriber::class, '\\') ?: '', 1), $subscription->id->value, $eventEnvelope->event->type->value, $eventEnvelope->sequenceNumber->value)); - $subscription->set( - position: $eventEnvelope->sequenceNumber - ); + $this->logger?->debug(sprintf('Subscription Engine: Subscriber "%s" for "%s" processed the event "%s" (sequence number: %d).', substr(strrchr($subscriber::class, '\\') ?: '', 1), $subscriptionId->value, $eventEnvelope->event->type->value, $eventEnvelope->sequenceNumber->value)); return null; } @@ -184,19 +176,21 @@ private function handleEvent(EventEnvelope $eventEnvelope, EventInterface $domai */ private function discoverNewSubscriptions(): void { - $this->subscriptionManager->findForAndUpdate( - SubscriptionCriteria::noConstraints(), - function (Subscriptions $subscriptions) { - foreach ($this->subscribers as $subscriber) { - if ($subscriptions->contain($subscriber->id)) { - continue; - } - $subscription = Subscription::createFromSubscriber($subscriber); - $this->subscriptionManager->add($subscription); - $this->logger?->info(sprintf('Subscription Engine: New Subscriber "%s" was found and added to the subscription store.', $subscriber->id->value)); - } + $subscriptions = $this->subscriptionStore->findByCriteriaForUpdate(SubscriptionCriteria::noConstraints()); + foreach ($this->subscribers as $subscriber) { + if ($subscriptions->contain($subscriber->id)) { + continue; } - ); + $subscription = new Subscription( + $subscriber->id, + SubscriptionStatus::NEW, + SequenceNumber::fromInteger(0), + null, + null + ); + $this->subscriptionStore->add($subscription); + $this->logger?->info(sprintf('Subscription Engine: New Subscriber "%s" was found and added to the subscription store.', $subscriber->id->value)); + } } /** @@ -207,10 +201,12 @@ private function setupSubscription(Subscription $subscription): ?Error { if (!$this->subscribers->contain($subscription->id)) { // mark detached subscriptions as we cannot set up - $subscription->set( + $this->subscriptionStore->update( + $subscription->id, status: SubscriptionStatus::DETACHED, + position: $subscription->position, + subscriptionError: $subscription->error ); - $this->subscriptionManager->update($subscription); $this->logger?->info(sprintf('Subscription Engine: Subscriber for "%s" not found and has been marked as detached.', $subscription->id->value)); return null; } @@ -221,35 +217,30 @@ private function setupSubscription(Subscription $subscription): ?Error } catch (\Throwable $e) { // todo wrap in savepoint to ensure error do not mess up the projection? $this->logger?->error(sprintf('Subscription Engine: Subscriber "%s" for "%s" has an error in the setup method: %s', $subscriber::class, $subscription->id->value, $e->getMessage())); - $subscription->fail($e); - $this->subscriptionManager->update($subscription); + $this->subscriptionStore->update( + $subscription->id, + SubscriptionStatus::ERROR, + $subscription->position, + SubscriptionError::fromPreviousStatusAndException($subscription->status, $e) + ); return Error::fromSubscriptionIdAndException($subscription->id, $e); } if ($subscription->status === SubscriptionStatus::ACTIVE) { $this->logger?->debug(sprintf('Subscription Engine: Active subscriber "%s" for "%s" has been re-setup.', $subscriber::class, $subscription->id->value)); return null; - } - if ($subscription->status === SubscriptionStatus::ERROR) { - $this->logger?->debug(sprintf('Subscription Engine: Failed subscriber "%s" for "%s" has been re-setup, set to %s. Previous error: %s.', $subscriber::class, $subscription->id->value, SubscriptionStatus::BOOTING->name, $subscription->error?->errorMessage)); - $subscription->set( - status: SubscriptionStatus::BOOTING + } else { + $this->subscriptionStore->update( + $subscription->id, + SubscriptionStatus::BOOTING, + $subscription->position, + null ); - $subscription->unsetError(); - $this->subscriptionManager->update($subscription); - return null; } $this->logger?->debug(sprintf('Subscription Engine: Subscriber "%s" for "%s" has been setup, set to %s from previous %s.', $subscriber::class, $subscription->id->value, SubscriptionStatus::BOOTING->name, $subscription->status->name)); - $subscription->set( - status: SubscriptionStatus::BOOTING - ); - $this->subscriptionManager->update($subscription); return null; } - /** - * TODO - */ private function resetSubscription(Subscription $subscription): ?Error { $subscriber = $this->subscribers->get($subscription->id); @@ -259,12 +250,12 @@ private function resetSubscription(Subscription $subscription): ?Error $this->logger?->error(sprintf('Subscription Engine: Subscriber "%s" for "%s" has an error in the resetState method: %s', $subscriber::class, $subscription->id->value, $e->getMessage())); return Error::fromSubscriptionIdAndException($subscription->id, $e); } - $subscription->set( - status: SubscriptionStatus::BOOTING, - position: SequenceNumber::none() + $this->subscriptionStore->update( + $subscription->id, + SubscriptionStatus::BOOTING, + position: SequenceNumber::none(), + subscriptionError: null ); - $subscription->unsetError(); - $this->subscriptionManager->update($subscription); $this->logger?->debug(sprintf('Subscription Engine: For Subscriber "%s" for "%s" the resetState method has been executed.', $subscriber::class, $subscription->id->value)); return null; } @@ -273,26 +264,30 @@ private function catchUpSubscriptions(SubscriptionEngineCriteria $criteria, Subs { $this->logger?->info(sprintf('Subscription Engine: Start catching up subscriptions in state "%s".', $subscriptionStatus->value)); - return $this->subscriptionManager->findForAndUpdate( - SubscriptionCriteria::forEngineCriteriaAndStatus($criteria, $subscriptionStatus), - function (Subscriptions $subscriptions) use ($subscriptionStatus, $progressClosure) { - foreach ($subscriptions as $subscription) { + $subscriptionEngineCriteria = SubscriptionCriteria::forEngineCriteriaAndStatus($criteria, $subscriptionStatus); + return $this->subscriptionStore->transactional( + function () use ($subscriptionEngineCriteria, $subscriptionStatus, $progressClosure) { + $subscriptionsToCatchup = $this->subscriptionStore->findByCriteriaForUpdate($subscriptionEngineCriteria); + foreach ($subscriptionsToCatchup as $subscription) { if (!$this->subscribers->contain($subscription->id)) { // mark detached subscriptions as we cannot handle them and exclude them from catchup - $subscription->set( + $this->subscriptionStore->update( + $subscription->id, status: SubscriptionStatus::DETACHED, + position: $subscription->position, + subscriptionError: null, ); - $this->subscriptionManager->update($subscription); $this->logger?->info(sprintf('Subscription Engine: Subscriber for "%s" not found and has been marked as detached.', $subscription->id->value)); - $subscriptions = $subscriptions->without($subscription->id); + $subscriptionsToCatchup = $subscriptionsToCatchup->without($subscription->id); } } - if ($subscriptions->isEmpty()) { - $this->logger?->info(sprintf('Subscription Engine: No subscriptions in state "%s". Finishing catch up', $subscriptionStatus->value)); + if ($subscriptionsToCatchup->isEmpty()) { + $this->logger?->info(sprintf('Subscription Engine: No subscriptions in state "%s". Finishing catch up', $subscriptionStatus->value)); return ProcessedResult::success(0); } - foreach ($subscriptions as $subscription) { + + foreach ($subscriptionsToCatchup as $subscription) { try { $this->subscribers->get($subscription->id)->onBeforeCatchUp($subscription->status); } catch (\Throwable $e) { @@ -302,48 +297,68 @@ function (Subscriptions $subscriptions) use ($subscriptionStatus, $progressClosu throw new CatchUpFailed($message, 1732374000, $e); } } - $startSequenceNumber = $subscriptions->lowestPosition()?->next() ?? SequenceNumber::none(); + $startSequenceNumber = $subscriptionsToCatchup->lowestPosition()?->next() ?? SequenceNumber::none(); $this->logger?->debug(sprintf('Subscription Engine: Event stream is processed from position %s.', $startSequenceNumber->value)); - /** @var list $errors */ + /** @var array $errors */ $errors = []; $numberOfProcessedEvents = 0; - try { - $eventStream = $this->eventStore->load(VirtualStreamName::all())->withMinimumSequenceNumber($startSequenceNumber); - foreach ($eventStream as $eventEnvelope) { - $sequenceNumber = $eventEnvelope->sequenceNumber; - if ($numberOfProcessedEvents > 0) { - $this->logger?->debug(sprintf('Subscription Engine: Current event stream position: %s', $sequenceNumber->value)); - } - if ($progressClosure !== null) { - $progressClosure($eventEnvelope); + /** @var array $highestSequenceNumberForSubscriber */ + $highestSequenceNumberForSubscriber = []; + + $eventStream = $this->eventStore->load(VirtualStreamName::all())->withMinimumSequenceNumber($startSequenceNumber); + foreach ($eventStream as $eventEnvelope) { + $sequenceNumber = $eventEnvelope->sequenceNumber; + if ($numberOfProcessedEvents > 0) { + $this->logger?->debug(sprintf('Subscription Engine: Current event stream position: %s', $sequenceNumber->value)); + } + if ($progressClosure !== null) { + $progressClosure($eventEnvelope); + } + $domainEvent = $this->eventNormalizer->denormalize($eventEnvelope->event); + foreach ($subscriptionsToCatchup as $subscription) { + if ($subscription->position->value >= $sequenceNumber->value) { + $this->logger?->debug(sprintf('Subscription Engine: Subscription "%s" is farther than the current position (%d >= %d), continue catch up.', $subscription->id->value, $subscription->position->value, $sequenceNumber->value)); + continue; } - $domainEvent = $this->eventNormalizer->denormalize($eventEnvelope->event); - foreach ($subscriptions as $subscription) { - if ($subscription->status !== $subscriptionStatus) { - continue; - } - if ($subscription->position->value >= $sequenceNumber->value) { - $this->logger?->debug(sprintf('Subscription Engine: Subscription "%s" is farther than the current position (%d >= %d), continue catch up.', $subscription->id->value, $subscription->position->value, $sequenceNumber->value)); - continue; - } - $this->subscriptionStore->createSavepoint(); - $error = $this->handleEvent($eventEnvelope, $domainEvent, $subscription); - if (!$error) { - $this->subscriptionStore->releaseSavepoint(); - continue; - } + $this->subscriptionStore->createSavepoint(); + $error = $this->handleEvent($eventEnvelope, $domainEvent, $subscription->id); + if ($error !== null) { + // ERROR Case: + // 1.) roll back the partially applied event on the subscriber $this->subscriptionStore->rollbackSavepoint(); + // 2.) for the leftover events we are not including this failed subscription for catchup + $subscriptionsToCatchup = $subscriptionsToCatchup->without($subscription->id); + // 3.) update the subscription error state on either its unchanged or new position (if some events worked) + $this->subscriptionStore->update( + $subscription->id, + status: SubscriptionStatus::ERROR, + position: $highestSequenceNumberForSubscriber[$subscription->id->value] ?? $subscription->position, + subscriptionError: SubscriptionError::fromPreviousStatusAndException( + $subscription->status, + $error->throwable + ), + ); + // 4.) invoke onAfterCatchUp, as onBeforeCatchUp was invoked already and to be consistent we want to "shutdown" this catchup iteration event though we know it failed + // todo put the ERROR $subscriptionStatus into the after hook, so it can properly be reacted upon + try { + $this->subscribers->get($subscription->id)->onAfterCatchUp(); + } catch (\Throwable $e) { + // analog to onBeforeCatchUp, we tolerate no exceptions here and consider it a critical developer error. + $message = sprintf('Subscriber "%s" had an error and also failed onAfterCatchUp: %s', $subscription->id->value, $e->getMessage()); + $this->logger?->critical($message); + throw new CatchUpFailed($message, 1732733740, $e); + } $errors[] = $error; + continue; } - $numberOfProcessedEvents++; - } - } finally { - foreach ($subscriptions as $subscription) { - $this->subscriptionManager->update($subscription); + // HAPPY Case: + $this->subscriptionStore->releaseSavepoint(); + $highestSequenceNumberForSubscriber[$subscription->id->value] = $eventEnvelope->sequenceNumber; } + $numberOfProcessedEvents++; } - foreach ($subscriptions as $subscription) { + foreach ($subscriptionsToCatchup as $subscription) { try { $this->subscribers->get($subscription->id)->onAfterCatchUp(); } catch (\Throwable $e) { @@ -352,15 +367,15 @@ function (Subscriptions $subscriptions) use ($subscriptionStatus, $progressClosu $this->logger?->critical($message); throw new CatchUpFailed($message, 1732374000, $e); } - if ($subscription->status !== $subscriptionStatus) { - continue; - } - + // after catchup mark all subscriptions as active, so they are triggered automatically now. + // The position will be set to the one the subscriber handled last, or if no events were in the stream, and we booted we keep the persisted position + $this->subscriptionStore->update( + $subscription->id, + status: SubscriptionStatus::ACTIVE, + position: $highestSequenceNumberForSubscriber[$subscription->id->value] ?? $subscription->position, + subscriptionError: null, + ); if ($subscription->status !== SubscriptionStatus::ACTIVE) { - $subscription->set( - status: SubscriptionStatus::ACTIVE, - ); - $this->subscriptionManager->update($subscription); $this->logger?->info(sprintf('Subscription Engine: Subscription "%s" has been set to active after booting.', $subscription->id->value)); } } diff --git a/Neos.ContentRepository.Core/Classes/Subscription/Engine/SubscriptionManager.php b/Neos.ContentRepository.Core/Classes/Subscription/Engine/SubscriptionManager.php deleted file mode 100644 index 2daa7e9db3..0000000000 --- a/Neos.ContentRepository.Core/Classes/Subscription/Engine/SubscriptionManager.php +++ /dev/null @@ -1,76 +0,0 @@ - */ - private \SplObjectStorage $forAdd; - - /** @var \SplObjectStorage */ - private \SplObjectStorage $forUpdate; - - public function __construct( - private readonly SubscriptionStoreInterface $subscriptionStore, - ) { - $this->forAdd = new \SplObjectStorage(); - $this->forUpdate = new \SplObjectStorage(); - } - - /** - * @template T - * @param \Closure(Subscriptions):T $closure - * @return T - */ - public function findForAndUpdate(SubscriptionCriteria $criteria, \Closure $closure): mixed - { - return $this->subscriptionStore->transactional( - /** @return T */ - function () use ($closure, $criteria): mixed { - try { - return $closure($this->subscriptionStore->findByCriteria($criteria)); - } finally { - $this->flush(); - } - }, - ); - } - - public function add(Subscription $subscription): void - { - $this->forAdd->attach($subscription); - } - - public function update(Subscription $subscription): void - { - $this->forUpdate->attach($subscription); - } - - public function flush(): void - { - foreach ($this->forAdd as $subscription) { - $this->subscriptionStore->add($subscription); - } - - foreach ($this->forUpdate as $subscription) { - if ($this->forAdd->contains($subscription)) { - continue; - } - - $this->subscriptionStore->update($subscription); - } - - $this->forAdd = new \SplObjectStorage(); - $this->forUpdate = new \SplObjectStorage(); - } -} diff --git a/Neos.ContentRepository.Core/Classes/Subscription/Store/SubscriptionStoreInterface.php b/Neos.ContentRepository.Core/Classes/Subscription/Store/SubscriptionStoreInterface.php index 0127769fde..85f3107271 100644 --- a/Neos.ContentRepository.Core/Classes/Subscription/Store/SubscriptionStoreInterface.php +++ b/Neos.ContentRepository.Core/Classes/Subscription/Store/SubscriptionStoreInterface.php @@ -5,7 +5,11 @@ namespace Neos\ContentRepository\Core\Subscription\Store; use Neos\ContentRepository\Core\Subscription\Subscription; +use Neos\ContentRepository\Core\Subscription\SubscriptionError; +use Neos\ContentRepository\Core\Subscription\SubscriptionId; use Neos\ContentRepository\Core\Subscription\Subscriptions; +use Neos\ContentRepository\Core\Subscription\SubscriptionStatus; +use Neos\EventStore\Model\Event\SequenceNumber; /** * @internal only API for custom content repository integrations @@ -14,11 +18,16 @@ interface SubscriptionStoreInterface { public function setup(): void; - public function findByCriteria(SubscriptionCriteria $criteria): Subscriptions; + public function findByCriteriaForUpdate(SubscriptionCriteria $criteria): Subscriptions; public function add(Subscription $subscription): void; - public function update(Subscription $subscription): void; + public function update( + SubscriptionId $subscriptionId, + SubscriptionStatus $status, + SequenceNumber $position, + SubscriptionError|null $subscriptionError, + ): void; /** * @template T diff --git a/Neos.ContentRepository.Core/Classes/Subscription/Subscription.php b/Neos.ContentRepository.Core/Classes/Subscription/Subscription.php index 902f7210db..ccfb08f07e 100644 --- a/Neos.ContentRepository.Core/Classes/Subscription/Subscription.php +++ b/Neos.ContentRepository.Core/Classes/Subscription/Subscription.php @@ -4,60 +4,19 @@ namespace Neos\ContentRepository\Core\Subscription; -use Neos\ContentRepository\Core\Subscription\Engine\SubscriptionEngine; -use Neos\ContentRepository\Core\Subscription\Subscriber\ProjectionSubscriber; use Neos\EventStore\Model\Event\SequenceNumber; /** - * Note: This class is mutable by design! - * * @internal implementation detail of the catchup */ -final class Subscription +final readonly class Subscription { public function __construct( - public readonly SubscriptionId $id, + public SubscriptionId $id, public SubscriptionStatus $status, public SequenceNumber $position, - public SubscriptionError|null $error = null, - public readonly \DateTimeImmutable|null $lastSavedAt = null, + public SubscriptionError|null $error, + public \DateTimeImmutable|null $lastSavedAt, ) { } - - /** - * @internal Only the {@see SubscriptionEngine} is supposed to instantiate subscriptions - */ - public static function createFromSubscriber(ProjectionSubscriber $subscriber): self - { - return new self( - $subscriber->id, - SubscriptionStatus::NEW, - SequenceNumber::fromInteger(0), - ); - } - - /** - * @internal Only the {@see SubscriptionEngine} is supposed to mutate subscriptions - */ - public function set( - SubscriptionStatus $status = null, - SequenceNumber $position = null - ): void { - $this->status = $status ?? $this->status; - $this->position = $position ?? $this->position; - } - - public function unsetError(): void - { - $this->error = null; - } - - /** - * @internal Only the {@see SubscriptionEngine} is supposed to mutate subscriptions - */ - public function fail(\Throwable $exception): void - { - $this->error = SubscriptionError::fromPreviousStatusAndException($this->status, $exception); - $this->status = SubscriptionStatus::ERROR; - } } diff --git a/Neos.ContentRepositoryRegistry/Classes/Factory/SubscriptionStore/DoctrineSubscriptionStore.php b/Neos.ContentRepositoryRegistry/Classes/Factory/SubscriptionStore/DoctrineSubscriptionStore.php index 234f1b4483..9c7c9e9200 100644 --- a/Neos.ContentRepositoryRegistry/Classes/Factory/SubscriptionStore/DoctrineSubscriptionStore.php +++ b/Neos.ContentRepositoryRegistry/Classes/Factory/SubscriptionStore/DoctrineSubscriptionStore.php @@ -64,7 +64,7 @@ public function setup(): void } } - public function findByCriteria(SubscriptionCriteria $criteria): Subscriptions + public function findByCriteriaForUpdate(SubscriptionCriteria $criteria): Subscriptions { $queryBuilder = $this->dbal->createQueryBuilder() ->select('*') @@ -107,15 +107,24 @@ public function add(Subscription $subscription): void ); } - public function update(Subscription $subscription): void - { - $row = self::toDatabase($subscription); + public function update( + SubscriptionId $subscriptionId, + SubscriptionStatus $status, + SequenceNumber $position, + SubscriptionError|null $subscriptionError, + ): void { + $row = []; $row['last_saved_at'] = $this->clock->now()->format('Y-m-d H:i:s'); + $row['status'] = $status->name; + $row['position'] = $position->value; + $row['error_message'] = $subscriptionError?->errorMessage; + $row['error_previous_status'] = $subscriptionError?->previousStatus?->name; + $row['error_trace'] = $subscriptionError?->errorTrace; $this->dbal->update( $this->tableName, $row, [ - 'id' => $subscription->id->value, + 'id' => $subscriptionId->value, ] ); }