Skip to content

Commit

Permalink
TASK: Remove SubscriptionManager and make subscriptions immutable
Browse files Browse the repository at this point in the history
This removes any magic from the code flow and makes transactions, locking, and update more explicit and easier to follow.
  • Loading branch information
mhsdesign committed Nov 27, 2024
1 parent fd768da commit ac425ff
Show file tree
Hide file tree
Showing 7 changed files with 156 additions and 240 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
*
* Resetting a content repository with {@see prune} method will purge the event stream and reset all subscription states.
*
* Staus information
* -----------------
* Status information
* ------------------
* The status of the content repository e.g. if a setup is required or if all subscriptions are active and their position
* can be examined with {@see status}
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ private function __construct(
if ($errors === []) {
throw new \InvalidArgumentException('Errors must not be empty.', 1731612542);
}
$this->errors = $errors;
$this->errors = array_values($errors);
}

/**
Expand Down

Large diffs are not rendered by default.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@
namespace Neos\ContentRepository\Core\Subscription\Store;

use Neos\ContentRepository\Core\Subscription\Subscription;
use Neos\ContentRepository\Core\Subscription\SubscriptionError;
use Neos\ContentRepository\Core\Subscription\SubscriptionId;
use Neos\ContentRepository\Core\Subscription\Subscriptions;
use Neos\ContentRepository\Core\Subscription\SubscriptionStatus;
use Neos\EventStore\Model\Event\SequenceNumber;

/**
* @internal only API for custom content repository integrations
Expand All @@ -14,11 +18,16 @@ interface SubscriptionStoreInterface
{
public function setup(): void;

public function findByCriteria(SubscriptionCriteria $criteria): Subscriptions;
public function findByCriteriaForUpdate(SubscriptionCriteria $criteria): Subscriptions;

public function add(Subscription $subscription): void;

public function update(Subscription $subscription): void;
public function update(
SubscriptionId $subscriptionId,
SubscriptionStatus $status,
SequenceNumber $position,
SubscriptionError|null $subscriptionError,
): void;

/**
* @template T
Expand Down
49 changes: 4 additions & 45 deletions Neos.ContentRepository.Core/Classes/Subscription/Subscription.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,60 +4,19 @@

namespace Neos\ContentRepository\Core\Subscription;

use Neos\ContentRepository\Core\Subscription\Engine\SubscriptionEngine;
use Neos\ContentRepository\Core\Subscription\Subscriber\ProjectionSubscriber;
use Neos\EventStore\Model\Event\SequenceNumber;

/**
* Note: This class is mutable by design!
*
* @internal implementation detail of the catchup
*/
final class Subscription
final readonly class Subscription
{
public function __construct(
public readonly SubscriptionId $id,
public SubscriptionId $id,
public SubscriptionStatus $status,
public SequenceNumber $position,
public SubscriptionError|null $error = null,
public readonly \DateTimeImmutable|null $lastSavedAt = null,
public SubscriptionError|null $error,
public \DateTimeImmutable|null $lastSavedAt,
) {
}

/**
* @internal Only the {@see SubscriptionEngine} is supposed to instantiate subscriptions
*/
public static function createFromSubscriber(ProjectionSubscriber $subscriber): self
{
return new self(
$subscriber->id,
SubscriptionStatus::NEW,
SequenceNumber::fromInteger(0),
);
}

/**
* @internal Only the {@see SubscriptionEngine} is supposed to mutate subscriptions
*/
public function set(
SubscriptionStatus $status = null,
SequenceNumber $position = null
): void {
$this->status = $status ?? $this->status;
$this->position = $position ?? $this->position;
}

public function unsetError(): void
{
$this->error = null;
}

/**
* @internal Only the {@see SubscriptionEngine} is supposed to mutate subscriptions
*/
public function fail(\Throwable $exception): void
{
$this->error = SubscriptionError::fromPreviousStatusAndException($this->status, $exception);
$this->status = SubscriptionStatus::ERROR;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public function setup(): void
}
}

public function findByCriteria(SubscriptionCriteria $criteria): Subscriptions
public function findByCriteriaForUpdate(SubscriptionCriteria $criteria): Subscriptions
{
$queryBuilder = $this->dbal->createQueryBuilder()
->select('*')
Expand Down Expand Up @@ -107,15 +107,24 @@ public function add(Subscription $subscription): void
);
}

public function update(Subscription $subscription): void
{
$row = self::toDatabase($subscription);
public function update(
SubscriptionId $subscriptionId,
SubscriptionStatus $status,
SequenceNumber $position,
SubscriptionError|null $subscriptionError,
): void {
$row = [];
$row['last_saved_at'] = $this->clock->now()->format('Y-m-d H:i:s');
$row['status'] = $status->name;
$row['position'] = $position->value;
$row['error_message'] = $subscriptionError?->errorMessage;
$row['error_previous_status'] = $subscriptionError?->previousStatus?->name;
$row['error_trace'] = $subscriptionError?->errorTrace;
$this->dbal->update(
$this->tableName,
$row,
[
'id' => $subscription->id->value,
'id' => $subscriptionId->value,
]
);
}
Expand Down

0 comments on commit ac425ff

Please sign in to comment.