Skip to content

Commit

Permalink
Merge pull request #4846 from neos/feature/4468-contentrepository-sta…
Browse files Browse the repository at this point in the history
…tus-2

FEATURE: Content Repository status
  • Loading branch information
bwaidelich authored Jan 22, 2024
2 parents 4ce6cec + 543afbf commit 06904c3
Show file tree
Hide file tree
Showing 23 changed files with 708 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Schema\AbstractSchemaManager;
use Doctrine\DBAL\Schema\Comparator;
use Doctrine\DBAL\Types\Types;
use Neos\ContentGraph\DoctrineDbalAdapter\Domain\Projection\Feature\NodeDisabling;
use Neos\ContentGraph\DoctrineDbalAdapter\Domain\Projection\Feature\NodeMove;
Expand Down Expand Up @@ -46,10 +45,13 @@
use Neos\ContentRepository\Core\Feature\RootNodeCreation\Event\RootNodeAggregateWithNodeWasCreated;
use Neos\ContentRepository\Core\Infrastructure\DbalCheckpointStorage;
use Neos\ContentRepository\Core\Infrastructure\DbalClientInterface;
use Neos\ContentRepository\Core\Infrastructure\DbalSchemaDiff;
use Neos\ContentRepository\Core\NodeType\NodeTypeManager;
use Neos\ContentRepository\Core\NodeType\NodeTypeName;
use Neos\ContentRepository\Core\Projection\CheckpointStorageStatusType;
use Neos\ContentRepository\Core\Projection\ContentGraph\Timestamps;
use Neos\ContentRepository\Core\Projection\ProjectionInterface;
use Neos\ContentRepository\Core\Projection\ProjectionStatus;
use Neos\ContentRepository\Core\Projection\WithMarkStaleInterface;
use Neos\ContentRepository\Core\SharedModel\Node\NodeAggregateClassification;
use Neos\ContentRepository\Core\SharedModel\Node\NodeAggregateId;
Expand Down Expand Up @@ -108,24 +110,49 @@ protected function getTableNamePrefix(): string

public function setUp(): void
{
$this->setupTables();
foreach ($this->determineRequiredSqlStatements() as $statement) {
$this->getDatabaseConnection()->executeStatement($statement);
}
$this->checkpointStorage->setUp();
}

private function setupTables(): void
/**
* @return array<string>
*/
private function determineRequiredSqlStatements(): array
{
$connection = $this->dbalClient->getConnection();
$schemaManager = $connection->getSchemaManager();
if (!$schemaManager instanceof AbstractSchemaManager) {
throw new \RuntimeException('Failed to retrieve Schema Manager', 1625653914);
}

$schema = (new DoctrineDbalContentGraphSchemaBuilder($this->tableNamePrefix))->buildSchema($schemaManager);
return DbalSchemaDiff::determineRequiredSqlStatements($connection, $schema);
}

$schemaDiff = (new Comparator())->compare($schemaManager->createSchema(), $schema);
foreach ($schemaDiff->toSaveSql($connection->getDatabasePlatform()) as $statement) {
$connection->executeStatement($statement);
public function status(): ProjectionStatus
{
$checkpointStorageStatus = $this->checkpointStorage->status();
if ($checkpointStorageStatus->type === CheckpointStorageStatusType::ERROR) {
return ProjectionStatus::error($checkpointStorageStatus->details);
}
if ($checkpointStorageStatus->type === CheckpointStorageStatusType::SETUP_REQUIRED) {
return ProjectionStatus::setupRequired($checkpointStorageStatus->details);
}
try {
$this->getDatabaseConnection()->connect();
} catch (\Throwable $e) {
return ProjectionStatus::error(sprintf('Failed to connect to database: %s', $e->getMessage()));
}
try {
$requiredSqlStatements = $this->determineRequiredSqlStatements();
} catch (\Throwable $e) {
return ProjectionStatus::error(sprintf('Failed to determine required SQL statements: %s', $e->getMessage()));
}
if ($requiredSqlStatements !== []) {
return ProjectionStatus::setupRequired(sprintf('The following SQL statement%s required: %s', count($requiredSqlStatements) !== 1 ? 's are' : ' is', implode(chr(10), $requiredSqlStatements)));
}
return ProjectionStatus::ok();
}

public function reset(): void
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Schema\AbstractSchemaManager;
use Doctrine\DBAL\Schema\Comparator;
use Neos\ContentGraph\PostgreSQLAdapter\Domain\Projection\Feature\ContentStreamForking;
use Neos\ContentGraph\PostgreSQLAdapter\Domain\Projection\Feature\NodeCreation;
use Neos\ContentGraph\PostgreSQLAdapter\Domain\Projection\Feature\NodeDisabling;
Expand Down Expand Up @@ -46,8 +45,11 @@
use Neos\ContentRepository\Core\Feature\NodeVariation\Event\NodeSpecializationVariantWasCreated;
use Neos\ContentRepository\Core\Feature\RootNodeCreation\Event\RootNodeAggregateWithNodeWasCreated;
use Neos\ContentRepository\Core\Infrastructure\DbalCheckpointStorage;
use Neos\ContentRepository\Core\Infrastructure\DbalSchemaDiff;
use Neos\ContentRepository\Core\NodeType\NodeTypeManager;
use Neos\ContentRepository\Core\Projection\CheckpointStorageStatusType;
use Neos\ContentRepository\Core\Projection\ProjectionInterface;
use Neos\ContentRepository\Core\Projection\ProjectionStatus;
use Neos\EventStore\Model\Event\SequenceNumber;
use Neos\EventStore\Model\EventEnvelope;

Expand Down Expand Up @@ -95,11 +97,50 @@ public function __construct(

public function setUp(): void
{
$this->setupTables();
foreach ($this->determineRequiredSqlStatements() as $statement) {
$this->getDatabaseConnection()->executeStatement($statement);
}
$this->getDatabaseConnection()->executeStatement('
CREATE INDEX IF NOT EXISTS node_properties ON ' . $this->tableNamePrefix . '_node USING GIN(properties);
create index if not exists hierarchy_children
on ' . $this->tableNamePrefix . '_hierarchyhyperrelation using gin (childnodeanchors);
create index if not exists restriction_affected
on ' . $this->tableNamePrefix . '_restrictionhyperrelation using gin (affectednodeaggregateids);
');
$this->checkpointStorage->setUp();
}

private function setupTables(): void
public function status(): ProjectionStatus
{
$checkpointStorageStatus = $this->checkpointStorage->status();
if ($checkpointStorageStatus->type === CheckpointStorageStatusType::ERROR) {
return ProjectionStatus::error($checkpointStorageStatus->details);
}
if ($checkpointStorageStatus->type === CheckpointStorageStatusType::SETUP_REQUIRED) {
return ProjectionStatus::setupRequired($checkpointStorageStatus->details);
}
try {
$this->getDatabaseConnection()->connect();
} catch (\Throwable $e) {
return ProjectionStatus::error(sprintf('Failed to connect to database: %s', $e->getMessage()));
}
try {
$requiredSqlStatements = $this->determineRequiredSqlStatements();
} catch (\Throwable $e) {
return ProjectionStatus::error(sprintf('Failed to determine required SQL statements: %s', $e->getMessage()));
}
if ($requiredSqlStatements !== []) {
return ProjectionStatus::setupRequired(sprintf('The following SQL statement%s required: %s', count($requiredSqlStatements) !== 1 ? 's are' : ' is', implode(chr(10), $requiredSqlStatements)));
}
return ProjectionStatus::ok();
}

/**
* @return array<string>
*/
private function determineRequiredSqlStatements(): array
{
$connection = $this->databaseClient->getConnection();
HypergraphSchemaBuilder::registerTypes($connection->getDatabasePlatform());
Expand All @@ -109,19 +150,7 @@ private function setupTables(): void
}

$schema = (new HypergraphSchemaBuilder($this->tableNamePrefix))->buildSchema();
$schemaDiff = (new Comparator())->compare($schemaManager->createSchema(), $schema);
foreach ($schemaDiff->toSaveSql($connection->getDatabasePlatform()) as $statement) {
$connection->executeStatement($statement);
}
$connection->executeStatement('
CREATE INDEX IF NOT EXISTS node_properties ON ' . $this->tableNamePrefix . '_node USING GIN(properties);
create index if not exists hierarchy_children
on ' . $this->tableNamePrefix . '_hierarchyhyperrelation using gin (childnodeanchors);
create index if not exists restriction_affected
on ' . $this->tableNamePrefix . '_restrictionhyperrelation using gin (affectednodeaggregateids);
');
return DbalSchemaDiff::determineRequiredSqlStatements($connection, $schema);
}

public function reset(): void
Expand Down
14 changes: 14 additions & 0 deletions Neos.ContentRepository.Core/Classes/ContentRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@
use Neos\ContentRepository\Core\Projection\ProjectionInterface;
use Neos\ContentRepository\Core\Projection\ProjectionsAndCatchUpHooks;
use Neos\ContentRepository\Core\Projection\ProjectionStateInterface;
use Neos\ContentRepository\Core\Projection\ProjectionStatuses;
use Neos\ContentRepository\Core\Projection\Workspace\WorkspaceFinder;
use Neos\ContentRepository\Core\SharedModel\ContentRepository\ContentRepositoryStatus;
use Neos\ContentRepository\Core\SharedModel\User\UserIdProviderInterface;
use Neos\EventStore\EventStoreInterface;
use Neos\EventStore\Model\Event\EventMetadata;
Expand Down Expand Up @@ -199,6 +201,18 @@ public function setUp(): void
}
}

public function status(): ContentRepositoryStatus
{
$projectionStatuses = ProjectionStatuses::create();
foreach ($this->projectionsAndCatchUpHooks->projections as $projectionClassName => $projection) {
$projectionStatuses = $projectionStatuses->with($projectionClassName, $projection->status());
}
return new ContentRepositoryStatus(
$this->eventStore->status(),
$projectionStatuses,
);
}

public function resetProjectionStates(): void
{
foreach ($this->projectionsAndCatchUpHooks->projections as $projection) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
use Doctrine\DBAL\Platforms\MySqlPlatform;
use Doctrine\DBAL\Platforms\PostgreSqlPlatform;
use Doctrine\DBAL\Schema\AbstractSchemaManager;
use Doctrine\DBAL\Schema\Comparator;
use Doctrine\DBAL\Schema\Schema;
use Doctrine\DBAL\Schema\Column;
use Doctrine\DBAL\Schema\Table;
use Doctrine\DBAL\Types\Type;
use Doctrine\DBAL\Types\Types;
use Neos\ContentRepository\Core\Projection\CheckpointStorageInterface;
use Neos\ContentRepository\Core\Projection\CheckpointStorageStatus;
use Neos\EventStore\Model\Event\SequenceNumber;

/**
Expand Down Expand Up @@ -44,18 +46,7 @@ public function __construct(

public function setUp(): void
{
$schemaManager = $this->connection->getSchemaManager();
if (!$schemaManager instanceof AbstractSchemaManager) {
throw new \RuntimeException('Failed to retrieve Schema Manager', 1652269057);
}
$schema = new Schema();
$table = $schema->createTable($this->tableName);
$table->addColumn('subscriberid', Types::STRING, ['length' => 255]);
$table->addColumn('appliedsequencenumber', Types::INTEGER);
$table->setPrimaryKey(['subscriberid']);

$schemaDiff = (new Comparator())->compare($schemaManager->createSchema(), $schema);
foreach ($schemaDiff->toSaveSql($this->platform) as $statement) {
foreach ($this->determineRequiredSqlStatements() as $statement) {
$this->connection->executeStatement($statement);
}
try {
Expand All @@ -65,6 +56,32 @@ public function setUp(): void
}
}

public function status(): CheckpointStorageStatus
{
try {
$this->connection->connect();
} catch (\Throwable $e) {
return CheckpointStorageStatus::error(sprintf('Failed to connect to database for subscriber "%s": %s', $this->subscriberId, $e->getMessage()));
}
try {
$requiredSqlStatements = $this->determineRequiredSqlStatements();
} catch (\Throwable $e) {
return CheckpointStorageStatus::error(sprintf('Failed to compare database schema for subscriber "%s": %s', $this->subscriberId, $e->getMessage()));
}
if ($requiredSqlStatements !== []) {
return CheckpointStorageStatus::setupRequired(sprintf('The following SQL statement%s required for subscriber "%s": %s', count($requiredSqlStatements) !== 1 ? 's are' : ' is', $this->subscriberId, implode(chr(10), $requiredSqlStatements)));
}
try {
$appliedSequenceNumber = $this->connection->fetchOne('SELECT appliedsequencenumber FROM ' . $this->tableName . ' WHERE subscriberid = :subscriberId', ['subscriberId' => $this->subscriberId]);
} catch (\Throwable $e) {
return CheckpointStorageStatus::error(sprintf('Failed to determine initial applied sequence number for subscriber "%s": %s', $this->subscriberId, $e->getMessage()));
}
if ($appliedSequenceNumber === false) {
return CheckpointStorageStatus::setupRequired(sprintf('Initial initial applied sequence number not set for subscriber "%s"', $this->subscriberId));
}
return CheckpointStorageStatus::ok();
}

public function acquireLock(): SequenceNumber
{
if ($this->connection->isTransactionActive()) {
Expand Down Expand Up @@ -121,4 +138,27 @@ public function getHighestAppliedSequenceNumber(): SequenceNumber
}
return SequenceNumber::fromInteger((int)$highestAppliedSequenceNumber);
}

// --------------

/**
* @return array<string>
*/
private function determineRequiredSqlStatements(): array
{
$schemaManager = $this->connection->getSchemaManager();
if (!$schemaManager instanceof AbstractSchemaManager) {
throw new \RuntimeException('Failed to retrieve Schema Manager', 1705681161);
}
$tableSchema = new Table(
$this->tableName,
[
(new Column('subscriberid', Type::getType(Types::STRING)))->setLength(255),
(new Column('appliedsequencenumber', Type::getType(Types::INTEGER)))
]
);
$tableSchema->setPrimaryKey(['subscriberid']);
$schema = DbalSchemaFactory::createSchemaWithTables($schemaManager, [$tableSchema]);
return DbalSchemaDiff::determineRequiredSqlStatements($this->connection, $schema);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
<?php

declare(strict_types=1);

namespace Neos\ContentRepository\Core\Infrastructure;

use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Exception;
use Doctrine\DBAL\Schema\AbstractSchemaManager;
use Doctrine\DBAL\Schema\Comparator;
use Doctrine\DBAL\Schema\Schema;

/**
* @internal
*/
final class DbalSchemaDiff
{
// This class only contains static members and should not be constructed
private function __construct()
{
}

/**
* Compares the tables of the given $schema with existing tables for the given $connection
* and returns an array of required CREATE and ALTER TABLE statements if they don't match
*
* @return array<string> Array of SQL statements that have to be executed in order to create/adjust the tables
*/
public static function determineRequiredSqlStatements(Connection $connection, Schema $schema): array
{
$schemaManager = $connection->getSchemaManager();
if (!$schemaManager instanceof AbstractSchemaManager) {
throw new \RuntimeException('Failed to retrieve Schema Manager', 1705679142);
}
try {
$platform = $connection->getDatabasePlatform();
} catch (Exception $e) {
throw new \RuntimeException(sprintf('Failed to retrieve Database platform: %s', $e->getMessage()), 1705679144, $e);
}
if ($platform === null) { // @phpstan-ignore-line This is not possible according to doc types, but there is no corresponding type hint in DBAL 2.x
throw new \RuntimeException('Failed to retrieve Database platform', 1705679147);
}
$fromTableSchemas = [];
foreach ($schema->getTables() as $tableSchema) {
if ($schemaManager->tablesExist([$tableSchema->getName()])) {
$fromTableSchemas[] = $schemaManager->listTableDetails($tableSchema->getName());
}
}
$fromSchema = new Schema($fromTableSchemas, [], $schemaManager->createSchemaConfig());
return (new Comparator())->compare($fromSchema, $schema)->toSql($platform);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ interface CheckpointStorageInterface
*/
public function setUp(): void;

/**
* Retrieve the status of this checkpoint storage instance
*/
public function status(): CheckpointStorageStatus;

/**
* Obtain an exclusive lock (to prevent multiple instances from being executed simultaneously)
* and return the highest {@see SequenceNumber} that was processed by this checkpoint storage.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?php

namespace Neos\ContentRepository\Core\Projection;

/**
* @api
*/
final readonly class CheckpointStorageStatus
{
public function __construct(
public CheckpointStorageStatusType $type,
public string $details,
) {
}

public static function ok(string $details = ''): self
{
return new self(CheckpointStorageStatusType::OK, $details);
}

public static function error(string $details): self
{
return new self(CheckpointStorageStatusType::ERROR, $details);
}

public static function setupRequired(string $details = ''): self
{
return new self(CheckpointStorageStatusType::SETUP_REQUIRED, $details);
}
}
Loading

0 comments on commit 06904c3

Please sign in to comment.