Skip to content

Commit

Permalink
FEATURE: Implement reactivateSubscription
Browse files Browse the repository at this point in the history
  • Loading branch information
mhsdesign committed Nov 27, 2024
1 parent ac425ff commit eb0d792
Show file tree
Hide file tree
Showing 4 changed files with 272 additions and 169 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@
use Neos\ContentRepository\Core\Subscription\Engine\Error;
use Neos\ContentRepository\Core\Subscription\Engine\Errors;
use Neos\ContentRepository\Core\Subscription\Engine\ProcessedResult;
use Neos\ContentRepository\Core\Subscription\Engine\Result;
use Neos\ContentRepository\Core\Subscription\Engine\SubscriptionEngineCriteria;
use Neos\ContentRepository\Core\Subscription\ProjectionSubscriptionStatus;
use Neos\ContentRepository\Core\Subscription\SubscriptionError;
use Neos\ContentRepository\Core\Subscription\SubscriptionId;
use Neos\ContentRepository\Core\Subscription\SubscriptionStatus;
use Neos\EventStore\Model\Event\SequenceNumber;
use Neos\EventStore\Model\EventEnvelope;
use PHPUnit\Framework\MockObject\Stub\Exception as WillThrowException;

final class ProjectionErrorTest extends AbstractSubscriptionEngineTestCase
{
Expand Down Expand Up @@ -76,25 +77,29 @@ public function projectionWithError()
}

/** @test */
public function fixFailedProjection()
public function fixFailedProjectionViaReset()
{
$this->eventStore->setup();
$this->fakeProjection->expects(self::once())->method('setUp');
$this->fakeProjection->expects(self::any())->method('status')->willReturn(ProjectionStatus::ok());
$this->fakeProjection->expects(self::once())->method('resetState');
$this->fakeProjection->expects(self::exactly(2))->method('apply');
$this->subscriptionEngine->setup();
$this->subscriptionEngine->boot();

// commit an event
$this->commitExampleContentStreamEvent();

// catchup active tries to apply the commited event
$this->fakeProjection->expects(self::exactly(2))->method('apply')->with(self::isInstanceOf(ContentStreamWasCreated::class))->willReturnOnConsecutiveCalls(
new WillThrowException($exception = new \RuntimeException('This projection is kaputt.')),
null // okay again
);
$exception = new \RuntimeException('This projection is kaputt.');
$this->secondFakeProjection->injectSaboteur(function (EventEnvelope $eventEnvelope) use ($exception) {
self::assertEquals(SequenceNumber::fromInteger(1), $eventEnvelope->sequenceNumber);
self::assertEquals('ContentStreamWasCreated', $eventEnvelope->event->type->value);
throw $exception;
});

$expectedFailure = ProjectionSubscriptionStatus::create(
subscriptionId: SubscriptionId::fromString('Vendor.Package:FakeProjection'),
subscriptionId: SubscriptionId::fromString('Vendor.Package:SecondFakeProjection'),
subscriptionStatus: SubscriptionStatus::ERROR,
subscriptionPosition: SequenceNumber::none(),
subscriptionError: SubscriptionError::fromPreviousStatusAndException(SubscriptionStatus::ACTIVE, $exception),
Expand All @@ -106,40 +111,121 @@ public function fixFailedProjection()

self::assertEquals(
$expectedFailure,
$this->subscriptionStatus('Vendor.Package:FakeProjection')
$this->subscriptionStatus('Vendor.Package:SecondFakeProjection')
);

// catchup active does not change anything
$this->secondFakeProjection->killSaboteur();

$result = $this->subscriptionEngine->reset();
self::assertNull($result->errors);

// expect the subscriptionError to be reset to null
$this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::BOOTING, SequenceNumber::none());

$result = $this->subscriptionEngine->boot();
self::assertNull($result->errors);

$this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::ACTIVE, SequenceNumber::fromInteger(1));
}

/** @test */
public function irreparableProjection()
{
// test ways NOT to fix a projection :)
$this->eventStore->setup();
$this->fakeProjection->expects(self::exactly(2))->method('setUp');
$this->fakeProjection->expects(self::any())->method('status')->willReturn(ProjectionStatus::ok());
$this->fakeProjection->expects(self::exactly(2))->method('apply');
$this->fakeProjection->expects(self::once())->method('resetState');
$this->subscriptionEngine->setup();
$this->subscriptionEngine->boot();

// commit an event
$this->commitExampleContentStreamEvent();

$exception = new \RuntimeException('This projection is kaputt.');
$this->secondFakeProjection->injectSaboteur(function (EventEnvelope $eventEnvelope) use ($exception) {
self::assertEquals(SequenceNumber::fromInteger(1), $eventEnvelope->sequenceNumber);
self::assertEquals('ContentStreamWasCreated', $eventEnvelope->event->type->value);
throw $exception;
});

$expectedFailure = ProjectionSubscriptionStatus::create(
subscriptionId: SubscriptionId::fromString('Vendor.Package:SecondFakeProjection'),
subscriptionStatus: SubscriptionStatus::ERROR,
subscriptionPosition: SequenceNumber::none(),
subscriptionError: SubscriptionError::fromPreviousStatusAndException(SubscriptionStatus::ACTIVE, $exception),
setupStatus: ProjectionStatus::ok(),
);

// catchup active tries to apply the commited event
$result = $this->subscriptionEngine->catchUpActive();
// but fails
self::assertTrue($result->hasFailed());
self::assertEquals($expectedFailure, $this->subscriptionStatus('Vendor.Package:SecondFakeProjection'));

// a second catchup active does not change anything
$result = $this->subscriptionEngine->catchUpActive();
self::assertEquals(ProcessedResult::success(0), $result);
self::assertEquals($expectedFailure, $this->subscriptionStatus('Vendor.Package:SecondFakeProjection'));

// boot neither
$result = $this->subscriptionEngine->boot();
self::assertEquals(ProcessedResult::success(0), $result);
// still the same state
self::assertEquals($expectedFailure, $this->subscriptionStatus('Vendor.Package:SecondFakeProjection'));

// setup neither
$result = $this->subscriptionEngine->setup();
self::assertEquals(Result::success(), $result);
self::assertEquals($expectedFailure, $this->subscriptionStatus('Vendor.Package:SecondFakeProjection'));

// reactivation will attempt to retry fix this, but can only work if the projection is repaired and will lead to an error otherwise:
$result = $this->subscriptionEngine->reactivate();
self::assertEquals(
$expectedFailure,
$this->subscriptionStatus('Vendor.Package:FakeProjection')
ProcessedResult::failed(1, Errors::fromArray([
Error::fromSubscriptionIdAndException(SubscriptionId::fromString('Vendor.Package:SecondFakeProjection'), $exception)
])),
$result
);

$this->fakeProjection->expects(self::once())->method('resetState');
self::assertEquals(
ProjectionSubscriptionStatus::create(
subscriptionId: SubscriptionId::fromString('Vendor.Package:SecondFakeProjection'),
subscriptionStatus: SubscriptionStatus::ERROR,
subscriptionPosition: SequenceNumber::none(),
// previous state is now an error too also error:
subscriptionError: SubscriptionError::fromPreviousStatusAndException(SubscriptionStatus::ERROR, $exception),
setupStatus: ProjectionStatus::ok(),
),
$this->subscriptionStatus('Vendor.Package:SecondFakeProjection')
);

// expect the subscriptionError to be reset to null
$result = $this->subscriptionEngine->reset();
self::assertNull($result->errors);
$this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::BOOTING, SequenceNumber::none());

// expect the subscriptionError to be reset to null
$this->expectOkayStatus('Vendor.Package:FakeProjection', SubscriptionStatus::BOOTING, SequenceNumber::none());

// but booting will rethrow that error :D
$result = $this->subscriptionEngine->boot();
self::assertNull($result->errors);

$this->expectOkayStatus('Vendor.Package:FakeProjection', SubscriptionStatus::ACTIVE, SequenceNumber::fromInteger(1));
self::assertTrue($result->hasFailed());
self::assertEquals(
ProjectionSubscriptionStatus::create(
subscriptionId: SubscriptionId::fromString('Vendor.Package:SecondFakeProjection'),
subscriptionStatus: SubscriptionStatus::ERROR,
subscriptionPosition: SequenceNumber::none(),
// previous state is now booting
subscriptionError: SubscriptionError::fromPreviousStatusAndException(SubscriptionStatus::BOOTING, $exception),
setupStatus: ProjectionStatus::ok(),
),
$this->subscriptionStatus('Vendor.Package:SecondFakeProjection')
);
}

/** @test */
public function projectionIsRolledBackAfterError()
{
$this->eventStore->setup();
$this->fakeProjection->expects(self::exactly(2))->method('setUp');
$this->fakeProjection->expects(self::once())->method('setUp');
$this->fakeProjection->expects(self::once())->method('apply');
$result = $this->subscriptionEngine->setup();
self::assertNull($result->errors);
Expand Down Expand Up @@ -184,16 +270,11 @@ public function projectionIsRolledBackAfterError()

$this->secondFakeProjection->killSaboteur();

$result = $this->subscriptionEngine->setup();
self::assertNull($result->errors);

// subscriptionError is reset
$this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::BOOTING, SequenceNumber::none());

// catchup after fix
$result = $this->subscriptionEngine->boot();
// reactivate and catchup
$result = $this->subscriptionEngine->reactivate(SubscriptionEngineCriteria::create([SubscriptionId::fromString('Vendor.Package:SecondFakeProjection')]));
self::assertNull($result->errors);

$this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::ACTIVE, SequenceNumber::fromInteger(1));
self::assertEquals(
[SequenceNumber::fromInteger(1)],
$this->secondFakeProjection->getState()->findAppliedSequenceNumbers()
Expand All @@ -204,7 +285,7 @@ public function projectionIsRolledBackAfterError()
public function projectionIsRolledBackAfterErrorButKeepsSuccessFullEvents()
{
$this->eventStore->setup();
$this->fakeProjection->expects(self::exactly(2))->method('setUp');
$this->fakeProjection->expects(self::once())->method('setUp');
$this->fakeProjection->expects(self::exactly(2))->method('apply');
$this->subscriptionEngine->setup();
$this->subscriptionEngine->boot();
Expand Down Expand Up @@ -255,20 +336,12 @@ public function projectionIsRolledBackAfterErrorButKeepsSuccessFullEvents()

$this->secondFakeProjection->killSaboteur();

$result = $this->subscriptionEngine->setup();
self::assertNull($result->errors);

// subscriptionError is reset, but the position is preserved
$this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::BOOTING, SequenceNumber::fromInteger(1));
self::assertEquals(
[SequenceNumber::fromInteger(1)],
$this->secondFakeProjection->getState()->findAppliedSequenceNumbers()
);

// catchup after fix
$result = $this->subscriptionEngine->boot();
$result = $this->subscriptionEngine->reactivate(SubscriptionEngineCriteria::create([SubscriptionId::fromString('Vendor.Package:SecondFakeProjection')]));
self::assertNull($result->errors);

// subscriptionError is reset, and the position is advanced if there were events
$this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::ACTIVE, SequenceNumber::fromInteger(2));
self::assertEquals(
[SequenceNumber::fromInteger(1), SequenceNumber::fromInteger(2)],
$this->secondFakeProjection->getState()->findAppliedSequenceNumbers()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Neos\ContentRepository\Core\Projection\ProjectionStatus;
use Neos\ContentRepository\Core\Subscription\DetachedSubscriptionStatus;
use Neos\ContentRepository\Core\Subscription\Engine\ProcessedResult;
use Neos\ContentRepository\Core\Subscription\Engine\SubscriptionEngineCriteria;
use Neos\ContentRepository\Core\Subscription\ProjectionSubscriptionStatus;
use Neos\ContentRepository\Core\Subscription\SubscriptionId;
use Neos\ContentRepository\Core\Subscription\SubscriptionStatus;
Expand Down Expand Up @@ -97,7 +98,7 @@ public function projectionIsDetachedOnCatchupActive()
/** @test */
public function projectionIsDetachedOnSetupAndReattachedIfPossible()
{
$this->fakeProjection->expects(self::exactly(2))->method('setUp');
$this->fakeProjection->expects(self::once())->method('setUp');
$this->fakeProjection->expects(self::once())->method('apply');
$this->fakeProjection->expects(self::any())->method('status')->willReturn(ProjectionStatus::ok());

Expand Down Expand Up @@ -160,9 +161,10 @@ public function projectionIsDetachedOnSetupAndReattachedIfPossible()
$this->subscriptionStatus('Vendor.Package:FakeProjection')
);

// setup does re-attach as the projection is found again
$this->subscriptionEngine->setup();
// reactivate does re-attach as the projection if its found again
$result = $this->subscriptionEngine->reactivate(SubscriptionEngineCriteria::create([SubscriptionId::fromString('Vendor.Package:FakeProjection')]));
self::assertNull($result->errors);

$this->expectOkayStatus('Vendor.Package:FakeProjection', SubscriptionStatus::BOOTING, SequenceNumber::fromInteger(1));
$this->expectOkayStatus('Vendor.Package:FakeProjection', SubscriptionStatus::ACTIVE, SequenceNumber::fromInteger(1));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,14 @@ public function setUp(): Error|null
$eventStoreIsEmpty = iterator_count($this->eventStore->load(VirtualStreamName::all())->limit(1)) === 0;
$setupResult = $this->subscriptionEngine->setup();
if ($setupResult->errors !== null) {
return self::createErrorForReason('setup', $setupResult->errors);
return self::createErrorForReason('Setup failed:', $setupResult->errors);
}
if ($eventStoreIsEmpty) {
// note: possibly introduce $skipBooting flag instead
// see https://github.com/patchlevel/event-sourcing/blob/b8591c56b21b049f46bead8e7ab424fd2afe9917/src/Subscription/Engine/DefaultSubscriptionEngine.php#L42
$bootResult = $this->subscriptionEngine->boot();
if ($bootResult->errors !== null) {
return self::createErrorForReason('initial catchup', $bootResult->errors);
return self::createErrorForReason('Initial catchup failed:', $bootResult->errors);
}
}
return null;
Expand Down Expand Up @@ -125,11 +125,11 @@ public function replaySubscription(SubscriptionId $subscriptionId, \Closure|null
}
$resetResult = $this->subscriptionEngine->reset(SubscriptionEngineCriteria::create([$subscriptionId]));
if ($resetResult->errors !== null) {
return self::createErrorForReason('reset', $resetResult->errors);
return self::createErrorForReason('Reset failed:', $resetResult->errors);
}
$bootResult = $this->subscriptionEngine->boot(SubscriptionEngineCriteria::create([$subscriptionId]), $progressCallback);
if ($bootResult->errors !== null) {
return self::createErrorForReason('catchup', $bootResult->errors);
return self::createErrorForReason('Catchup failed:', $bootResult->errors);
}
return null;
}
Expand All @@ -138,11 +138,11 @@ public function replayAllSubscriptions(\Closure|null $progressCallback = null):
{
$resetResult = $this->subscriptionEngine->reset();
if ($resetResult->errors !== null) {
return self::createErrorForReason('reset', $resetResult->errors);
return self::createErrorForReason('Reset failed:', $resetResult->errors);
}
$bootResult = $this->subscriptionEngine->boot(progressCallback: $progressCallback);
if ($bootResult->errors !== null) {
return self::createErrorForReason('catchup', $bootResult->errors);
return self::createErrorForReason('Catchup failed:', $bootResult->errors);
}
return null;
}
Expand All @@ -163,8 +163,10 @@ public function reactivateSubscription(SubscriptionId $subscriptionId, \Closure|
if ($subscriptionStatus->subscriptionStatus === SubscriptionStatus::NEW) {
return new Error(sprintf('Subscription "%s" is not setup and cannot be reactivated.', $subscriptionId->value));
}

// todo implement https://github.com/patchlevel/event-sourcing/blob/b8591c56b21b049f46bead8e7ab424fd2afe9917/src/Subscription/Engine/DefaultSubscriptionEngine.php#L624
$reactivateResult = $this->subscriptionEngine->reactivate(SubscriptionEngineCriteria::create([$subscriptionId]), $progressCallback);
if ($reactivateResult->errors !== null) {
return self::createErrorForReason('Could not reactivate subscriber:', $reactivateResult->errors);
}
return null;
}

Expand All @@ -183,20 +185,20 @@ public function prune(): Error|null
}
$resetResult = $this->subscriptionEngine->reset();
if ($resetResult->errors !== null) {
return self::createErrorForReason('reset', $resetResult->errors);
return self::createErrorForReason('Reset failed:', $resetResult->errors);
}
// note: possibly introduce $skipBooting flag like for setup
$bootResult = $this->subscriptionEngine->boot();
if ($bootResult->errors !== null) {
return self::createErrorForReason('booting', $bootResult->errors);
return self::createErrorForReason('Catchup failed:', $bootResult->errors);
}
return null;
}

private static function createErrorForReason(string $method, Errors $errors): Error
{
$message = [];
$message[] = sprintf('%s produced the following error%s', $method, $errors->count() === 1 ? '' : 's');
$message[] = sprintf('%s: Following error%s', $method, $errors->count() === 1 ? '' : 's');
foreach ($errors as $error) {
$message[] = sprintf(' Subscription "%s": %s', $error->subscriptionId->value, $error->message);
}
Expand Down
Loading

0 comments on commit eb0d792

Please sign in to comment.