Skip to content

Commit

Permalink
Merge pull request #4845 from neos/task/eventstore-1.0
Browse files Browse the repository at this point in the history
TASK: Update neos/eventstore packages to 1.0
  • Loading branch information
bwaidelich authored Jan 22, 2024
2 parents 9beb48c + 129072d commit 4ce6cec
Show file tree
Hide file tree
Showing 25 changed files with 469 additions and 151 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
use Neos\ContentRepository\Core\Feature\NodeVariation\Event\NodeSpecializationVariantWasCreated;
use Neos\ContentRepository\Core\Feature\RootNodeCreation\Event\RootNodeAggregateDimensionsWereUpdated;
use Neos\ContentRepository\Core\Feature\RootNodeCreation\Event\RootNodeAggregateWithNodeWasCreated;
use Neos\ContentRepository\Core\Infrastructure\DbalCheckpointStorage;
use Neos\ContentRepository\Core\Infrastructure\DbalClientInterface;
use Neos\ContentRepository\Core\NodeType\NodeTypeManager;
use Neos\ContentRepository\Core\NodeType\NodeTypeName;
Expand All @@ -54,11 +55,8 @@
use Neos\ContentRepository\Core\SharedModel\Node\NodeAggregateId;
use Neos\ContentRepository\Core\SharedModel\Node\NodeName;
use Neos\ContentRepository\Core\SharedModel\Workspace\ContentStreamId;
use Neos\EventStore\CatchUp\CheckpointStorageInterface;
use Neos\EventStore\DoctrineAdapter\DoctrineCheckpointStorage;
use Neos\EventStore\Model\Event\SequenceNumber;
use Neos\EventStore\Model\EventEnvelope;
use Neos\EventStore\Model\EventStore\SetupResult;

/**
* @implements ProjectionInterface<ContentGraph>
Expand All @@ -81,7 +79,7 @@ final class DoctrineDbalContentGraphProjection implements ProjectionInterface, W
*/
private ?ContentGraph $contentGraph = null;

private DoctrineCheckpointStorage $checkpointStorage;
private DbalCheckpointStorage $checkpointStorage;

public function __construct(
private readonly DbalClientInterface $dbalClient,
Expand All @@ -91,7 +89,7 @@ public function __construct(
private readonly ProjectionContentGraph $projectionContentGraph,
private readonly string $tableNamePrefix,
) {
$this->checkpointStorage = new DoctrineCheckpointStorage(
$this->checkpointStorage = new DbalCheckpointStorage(
$this->dbalClient->getConnection(),
$this->tableNamePrefix . '_checkpoint',
self::class
Expand All @@ -111,10 +109,10 @@ protected function getTableNamePrefix(): string
public function setUp(): void
{
$this->setupTables();
$this->checkpointStorage->setup();
$this->checkpointStorage->setUp();
}

private function setupTables(): SetupResult
private function setupTables(): void
{
$connection = $this->dbalClient->getConnection();
$schemaManager = $connection->getSchemaManager();
Expand All @@ -128,7 +126,6 @@ private function setupTables(): SetupResult
foreach ($schemaDiff->toSaveSql($connection->getDatabasePlatform()) as $statement) {
$connection->executeStatement($statement);
}
return SetupResult::success('');
}

public function reset(): void
Expand Down Expand Up @@ -202,7 +199,7 @@ public function apply(EventInterface $event, EventEnvelope $eventEnvelope): void
};
}

public function getCheckpointStorage(): CheckpointStorageInterface
public function getCheckpointStorage(): DbalCheckpointStorage
{
return $this->checkpointStorage;
}
Expand Down Expand Up @@ -1239,7 +1236,8 @@ private function getDatabaseConnection(): Connection

private static function initiatingDateTime(EventEnvelope $eventEnvelope): \DateTimeImmutable
{
$result = $eventEnvelope->event->metadata->has('initiatingTimestamp') ? \DateTimeImmutable::createFromFormat(\DateTimeInterface::ATOM, $eventEnvelope->event->metadata->get('initiatingTimestamp')) : $eventEnvelope->recordedAt;
$initiatingTimestamp = $eventEnvelope->event->metadata?->get('initiatingTimestamp');
$result = $initiatingTimestamp !== null ? \DateTimeImmutable::createFromFormat(\DateTimeInterface::ATOM, $initiatingTimestamp) : $eventEnvelope->recordedAt;
if (!$result instanceof \DateTimeImmutable) {
throw new \RuntimeException(sprintf('Failed to extract initiating timestamp from event "%s"', $eventEnvelope->event->id->value), 1678902291);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,11 @@
use Neos\ContentRepository\Core\Feature\NodeVariation\Event\NodePeerVariantWasCreated;
use Neos\ContentRepository\Core\Feature\NodeVariation\Event\NodeSpecializationVariantWasCreated;
use Neos\ContentRepository\Core\Feature\RootNodeCreation\Event\RootNodeAggregateWithNodeWasCreated;
use Neos\ContentRepository\Core\Infrastructure\DbalCheckpointStorage;
use Neos\ContentRepository\Core\NodeType\NodeTypeManager;
use Neos\ContentRepository\Core\Projection\ProjectionInterface;
use Neos\EventStore\CatchUp\CheckpointStorageInterface;
use Neos\EventStore\DoctrineAdapter\DoctrineCheckpointStorage;
use Neos\EventStore\Model\Event\SequenceNumber;
use Neos\EventStore\Model\EventEnvelope;
use Neos\EventStore\Model\EventStore\SetupResult;

/**
* The alternate reality-aware hypergraph projector for the PostgreSQL backend via Doctrine DBAL
Expand All @@ -76,7 +74,7 @@ final class HypergraphProjection implements ProjectionInterface
* so that always the same instance is returned
*/
private ?ContentHypergraph $contentHypergraph = null;
private DoctrineCheckpointStorage $checkpointStorage;
private DbalCheckpointStorage $checkpointStorage;
private ProjectionHypergraph $projectionHypergraph;

public function __construct(
Expand All @@ -87,7 +85,7 @@ public function __construct(
private readonly string $tableNamePrefix,
) {
$this->projectionHypergraph = new ProjectionHypergraph($this->databaseClient, $this->tableNamePrefix);
$this->checkpointStorage = new DoctrineCheckpointStorage(
$this->checkpointStorage = new DbalCheckpointStorage(
$this->databaseClient->getConnection(),
$this->tableNamePrefix . '_checkpoint',
self::class
Expand All @@ -98,10 +96,10 @@ public function __construct(
public function setUp(): void
{
$this->setupTables();
$this->checkpointStorage->setup();
$this->checkpointStorage->setUp();
}

private function setupTables(): SetupResult
private function setupTables(): void
{
$connection = $this->databaseClient->getConnection();
HypergraphSchemaBuilder::registerTypes($connection->getDatabasePlatform());
Expand All @@ -124,8 +122,6 @@ private function setupTables(): SetupResult
create index if not exists restriction_affected
on ' . $this->tableNamePrefix . '_restrictionhyperrelation using gin (affectednodeaggregateids);
');

return SetupResult::success('');
}

public function reset(): void
Expand Down Expand Up @@ -212,7 +208,7 @@ public function apply(EventInterface $event, EventEnvelope $eventEnvelope): void
};
}

public function getCheckpointStorage(): CheckpointStorageInterface
public function getCheckpointStorage(): DbalCheckpointStorage
{
return $this->checkpointStorage;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,9 @@
use Neos\ContentRepository\Core\EventStore\EventInterface;
use Neos\ContentRepository\Core\Projection\CatchUpHookInterface;
use Neos\ContentRepositoryRegistry\Factory\ProjectionCatchUpTrigger\SubprocessProjectionCatchUpTrigger;
use Neos\EventStore\DoctrineAdapter\DoctrineCheckpointStorage;

/**
* We had some race conditions in projections, where {@see DoctrineCheckpointStorage} was not working properly.
* We had some race conditions in projections, where {@see \Neos\ContentRepository\Core\Infrastructure\DbalCheckpointStorage} was not working properly.
* We saw some non-deterministic, random errors when running the tests - unluckily only on Linux, not on OSX:
* On OSX, forking a new subprocess in {@see SubprocessProjectionCatchUpTrigger} is *WAY* slower than in Linux;
* and thus the race conditions which appears if two projector instances of the same class run concurrently
Expand Down
19 changes: 5 additions & 14 deletions Neos.ContentRepository.Core/Classes/ContentRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
use Neos\ContentRepository\Core\Factory\ContentRepositoryFactory;
use Neos\ContentRepository\Core\Factory\ContentRepositoryId;
use Neos\ContentRepository\Core\NodeType\NodeTypeManager;
use Neos\ContentRepository\Core\Projection\CatchUp;
use Neos\ContentRepository\Core\Projection\CatchUpOptions;
use Neos\ContentRepository\Core\Projection\ContentGraph\ContentGraphInterface;
use Neos\ContentRepository\Core\Projection\ContentStream\ContentStreamFinder;
Expand All @@ -36,13 +37,10 @@
use Neos\ContentRepository\Core\Projection\ProjectionStateInterface;
use Neos\ContentRepository\Core\Projection\Workspace\WorkspaceFinder;
use Neos\ContentRepository\Core\SharedModel\User\UserIdProviderInterface;
use Neos\EventStore\CatchUp\CatchUp;
use Neos\EventStore\EventStoreInterface;
use Neos\EventStore\Model\Event\EventMetadata;
use Neos\EventStore\Model\EventEnvelope;
use Neos\EventStore\Model\EventStore\SetupResult;
use Neos\EventStore\Model\EventStream\VirtualStreamName;
use Neos\EventStore\ProvidesSetupInterface;
use Psr\Clock\ClockInterface;

/**
Expand Down Expand Up @@ -118,10 +116,10 @@ public function handle(CommandInterface $command): CommandResult
$initiatingUserId,
$initiatingTimestamp
) {
$metadata = $event instanceof DecoratedEvent ? $event->eventMetadata->value : [];
$metadata = $event instanceof DecoratedEvent ? $event->eventMetadata?->value ?? [] : [];
$metadata['initiatingUserId'] ??= $initiatingUserId;
$metadata['initiatingTimestamp'] ??= $initiatingTimestamp;
return DecoratedEvent::withMetadata($event, EventMetadata::fromArray($metadata));
return DecoratedEvent::create($event, metadata: EventMetadata::fromArray($metadata));
})
),
$eventsToPublish->expectedVersion,
Expand Down Expand Up @@ -193,19 +191,12 @@ public function catchUpProjection(string $projectionClassName, CatchUpOptions $o
$catchUpHook?->onAfterCatchUp();
}

public function setUp(): SetupResult
public function setUp(): void
{
if ($this->eventStore instanceof ProvidesSetupInterface) {
$result = $this->eventStore->setup();
// TODO better result object
if ($result->errors !== []) {
return $result;
}
}
$this->eventStore->setup();
foreach ($this->projectionsAndCatchUpHooks->projections as $projection) {
$projection->setUp();
}
return SetupResult::success('done');
}

public function resetProjectionStates(): void
Expand Down
49 changes: 22 additions & 27 deletions Neos.ContentRepository.Core/Classes/EventStore/DecoratedEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

namespace Neos\ContentRepository\Core\EventStore;

use Neos\EventStore\Model\Event\CausationId;
use Neos\EventStore\Model\Event\CorrelationId;
use Neos\EventStore\Model\Event\EventId;
use Neos\EventStore\Model\Event\EventMetadata;

Expand All @@ -16,39 +18,32 @@ final class DecoratedEvent
{
private function __construct(
public readonly EventInterface $innerEvent,
public readonly EventId $eventId,
public readonly EventMetadata $eventMetadata,
public readonly ?EventId $eventId,
public readonly ?EventMetadata $eventMetadata,
public readonly ?CausationId $causationId,
public readonly ?CorrelationId $correlationId,
) {
}

public static function withMetadata(DecoratedEvent|EventInterface $event, EventMetadata $metadata): self
{
$event = self::wrapWithDecoratedEventIfNecessary($event);
return new self($event->innerEvent, $event->eventId, $metadata);
}

public static function withEventId(DecoratedEvent|EventInterface $event, EventId $eventId): self
{
$event = self::wrapWithDecoratedEventIfNecessary($event);
return new self($event->innerEvent, $eventId, $event->eventMetadata);
}

public static function withCausationId(
/**
* @param EventMetadata|array<string, mixed>|null $metadata
*/
public static function create(
DecoratedEvent|EventInterface $event,
EventId $causationId
EventId $eventId = null,
EventMetadata|array $metadata = null,
EventId|CausationId $causationId = null,
CorrelationId $correlationId = null,
): self {
$event = self::wrapWithDecoratedEventIfNecessary($event);
$eventMetadata = $event->eventMetadata->value;
$eventMetadata['causationId'] = $causationId->value;

return new self($event->innerEvent, $event->eventId, EventMetadata::fromArray($eventMetadata));
}

private static function wrapWithDecoratedEventIfNecessary(EventInterface|DecoratedEvent $event): DecoratedEvent
{
if ($event instanceof EventInterface) {
$event = new self($event, EventId::create(), EventMetadata::none());
$event = new self($event, null, null, null, null);
}
if ($causationId instanceof EventId) {
$causationId = CausationId::fromString($causationId->value);
}
if (is_array($metadata)) {
$metadata = EventMetadata::fromArray($metadata);
}
return $event;
return new self($event->innerEvent, $eventId ?? $event->eventId, $metadata ?? $event->eventMetadata, $causationId ?? $event->causationId, $correlationId ?? $event->correlationId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,19 +75,18 @@ public function publishEvents(EventsToPublish $eventsToPublish): CommandResult

private function normalizeEvent(EventInterface|DecoratedEvent $event): Event
{
if ($event instanceof DecoratedEvent) {
$eventId = $event->eventId;
$eventMetadata = $event->eventMetadata;
$event = $event->innerEvent;
} else {
$eventId = EventId::create();
$eventMetadata = EventMetadata::none();
}
$eventId = $event instanceof DecoratedEvent && $event->eventId !== null ? $event->eventId : EventId::create();
$eventMetadata = $event instanceof DecoratedEvent ? $event->eventMetadata : null;
$causationId = $event instanceof DecoratedEvent ? $event->causationId : null;
$correlationId = $event instanceof DecoratedEvent ? $event->correlationId : null;
$event = $event instanceof DecoratedEvent ? $event->innerEvent : $event;
return new Event(
$eventId,
$this->eventNormalizer->getEventType($event),
$this->eventNormalizer->getEventData($event),
$eventMetadata,
$causationId,
$correlationId,
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
use Neos\ContentRepository\Core\CommandHandler\CommandInterface;
use Neos\ContentRepository\Core\EventStore\DecoratedEvent;
use Neos\ContentRepository\Core\EventStore\Events;
use Neos\EventStore\Model\Event\EventId;
use Neos\EventStore\Model\Event\EventMetadata;

/**
Expand Down Expand Up @@ -71,13 +72,13 @@ public static function enrichWithCommand(
'commandClass' => get_class($command),
'commandPayload' => $commandPayload
]);
$event = DecoratedEvent::withMetadata($event, $metadata);
$event = DecoratedEvent::create($event, eventId: EventId::create(), metadata: $metadata);
// we remember the 1st event's identifier as causation identifier for all the others
$causationId = $event->eventId;
} else {
// event 2,3,4,...n get a causation identifier set, as they all originate from the 1st event.
if ($causationId !== null) {
$event = DecoratedEvent::withCausationId($event, $causationId);
$event = DecoratedEvent::create($event, causationId: $causationId);
}
}
$processedEvents[] = $event;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,10 +317,7 @@ private function publishContentStream(
$copiedEvent = $event->createCopyForContentStream($baseContentStreamId);
// We need to add the event metadata here for rebasing in nested workspace situations
// (and for exporting)
$events[] = DecoratedEvent::withMetadata(
$copiedEvent,
$eventEnvelope->event->metadata
);
$events[] = DecoratedEvent::create($copiedEvent, metadata: $eventEnvelope->event->metadata, causationId: $eventEnvelope->event->causationId, correlationId: $eventEnvelope->event->correlationId);
}
}

Expand Down Expand Up @@ -464,7 +461,7 @@ private function extractCommandsFromContentStreamMetadata(

$commands = [];
foreach ($workspaceContentStream as $eventEnvelope) {
$metadata = $eventEnvelope->event->metadata->value;
$metadata = $eventEnvelope->event->metadata?->value ?? [];
// TODO: Add this logic to the NodeAggregateCommandHandler;
// so that we can be sure these can be parsed again.
if (isset($metadata['commandClass'])) {
Expand Down
Loading

0 comments on commit 4ce6cec

Please sign in to comment.