Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FEATURE: Content Repository status #4846

Merged
merged 5 commits into from
Jan 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Schema\AbstractSchemaManager;
use Doctrine\DBAL\Schema\Comparator;
use Doctrine\DBAL\Types\Types;
use Neos\ContentGraph\DoctrineDbalAdapter\Domain\Projection\Feature\NodeDisabling;
use Neos\ContentGraph\DoctrineDbalAdapter\Domain\Projection\Feature\NodeMove;
Expand Down Expand Up @@ -46,10 +45,13 @@
use Neos\ContentRepository\Core\Feature\RootNodeCreation\Event\RootNodeAggregateWithNodeWasCreated;
use Neos\ContentRepository\Core\Infrastructure\DbalCheckpointStorage;
use Neos\ContentRepository\Core\Infrastructure\DbalClientInterface;
use Neos\ContentRepository\Core\Infrastructure\DbalSchemaDiff;
use Neos\ContentRepository\Core\NodeType\NodeTypeManager;
use Neos\ContentRepository\Core\NodeType\NodeTypeName;
use Neos\ContentRepository\Core\Projection\CheckpointStorageStatusType;
use Neos\ContentRepository\Core\Projection\ContentGraph\Timestamps;
use Neos\ContentRepository\Core\Projection\ProjectionInterface;
use Neos\ContentRepository\Core\Projection\ProjectionStatus;
use Neos\ContentRepository\Core\Projection\WithMarkStaleInterface;
use Neos\ContentRepository\Core\SharedModel\Node\NodeAggregateClassification;
use Neos\ContentRepository\Core\SharedModel\Node\NodeAggregateId;
Expand Down Expand Up @@ -108,24 +110,49 @@ protected function getTableNamePrefix(): string

public function setUp(): void
{
$this->setupTables();
foreach ($this->determineRequiredSqlStatements() as $statement) {
$this->getDatabaseConnection()->executeStatement($statement);
}
$this->checkpointStorage->setUp();
}

private function setupTables(): void
/**
* @return array<string>
*/
private function determineRequiredSqlStatements(): array
{
$connection = $this->dbalClient->getConnection();
$schemaManager = $connection->getSchemaManager();
if (!$schemaManager instanceof AbstractSchemaManager) {
throw new \RuntimeException('Failed to retrieve Schema Manager', 1625653914);
}

$schema = (new DoctrineDbalContentGraphSchemaBuilder($this->tableNamePrefix))->buildSchema($schemaManager);
return DbalSchemaDiff::determineRequiredSqlStatements($connection, $schema);
}

$schemaDiff = (new Comparator())->compare($schemaManager->createSchema(), $schema);
foreach ($schemaDiff->toSaveSql($connection->getDatabasePlatform()) as $statement) {
$connection->executeStatement($statement);
public function status(): ProjectionStatus
{
$checkpointStorageStatus = $this->checkpointStorage->status();
if ($checkpointStorageStatus->type === CheckpointStorageStatusType::ERROR) {
return ProjectionStatus::error($checkpointStorageStatus->details);
}
if ($checkpointStorageStatus->type === CheckpointStorageStatusType::SETUP_REQUIRED) {
return ProjectionStatus::setupRequired($checkpointStorageStatus->details);
}
try {
$this->getDatabaseConnection()->connect();
} catch (\Throwable $e) {
return ProjectionStatus::error(sprintf('Failed to connect to database: %s', $e->getMessage()));
}
try {
$requiredSqlStatements = $this->determineRequiredSqlStatements();
} catch (\Throwable $e) {
return ProjectionStatus::error(sprintf('Failed to determine required SQL statements: %s', $e->getMessage()));
}
if ($requiredSqlStatements !== []) {
return ProjectionStatus::setupRequired(sprintf('The following SQL statement%s required: %s', count($requiredSqlStatements) !== 1 ? 's are' : ' is', implode(chr(10), $requiredSqlStatements)));
}
return ProjectionStatus::ok();
}

public function reset(): void
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Schema\AbstractSchemaManager;
use Doctrine\DBAL\Schema\Comparator;
use Neos\ContentGraph\PostgreSQLAdapter\Domain\Projection\Feature\ContentStreamForking;
use Neos\ContentGraph\PostgreSQLAdapter\Domain\Projection\Feature\NodeCreation;
use Neos\ContentGraph\PostgreSQLAdapter\Domain\Projection\Feature\NodeDisabling;
Expand Down Expand Up @@ -46,8 +45,11 @@
use Neos\ContentRepository\Core\Feature\NodeVariation\Event\NodeSpecializationVariantWasCreated;
use Neos\ContentRepository\Core\Feature\RootNodeCreation\Event\RootNodeAggregateWithNodeWasCreated;
use Neos\ContentRepository\Core\Infrastructure\DbalCheckpointStorage;
use Neos\ContentRepository\Core\Infrastructure\DbalSchemaDiff;
use Neos\ContentRepository\Core\NodeType\NodeTypeManager;
use Neos\ContentRepository\Core\Projection\CheckpointStorageStatusType;
use Neos\ContentRepository\Core\Projection\ProjectionInterface;
use Neos\ContentRepository\Core\Projection\ProjectionStatus;
use Neos\EventStore\Model\Event\SequenceNumber;
use Neos\EventStore\Model\EventEnvelope;

Expand Down Expand Up @@ -95,11 +97,50 @@ public function __construct(

public function setUp(): void
{
$this->setupTables();
foreach ($this->determineRequiredSqlStatements() as $statement) {
$this->getDatabaseConnection()->executeStatement($statement);
}
$this->getDatabaseConnection()->executeStatement('
CREATE INDEX IF NOT EXISTS node_properties ON ' . $this->tableNamePrefix . '_node USING GIN(properties);

create index if not exists hierarchy_children
on ' . $this->tableNamePrefix . '_hierarchyhyperrelation using gin (childnodeanchors);

create index if not exists restriction_affected
on ' . $this->tableNamePrefix . '_restrictionhyperrelation using gin (affectednodeaggregateids);
');
$this->checkpointStorage->setUp();
}

private function setupTables(): void
public function status(): ProjectionStatus
{
$checkpointStorageStatus = $this->checkpointStorage->status();
if ($checkpointStorageStatus->type === CheckpointStorageStatusType::ERROR) {
return ProjectionStatus::error($checkpointStorageStatus->details);
}
if ($checkpointStorageStatus->type === CheckpointStorageStatusType::SETUP_REQUIRED) {
return ProjectionStatus::setupRequired($checkpointStorageStatus->details);
}
try {
$this->getDatabaseConnection()->connect();
} catch (\Throwable $e) {
return ProjectionStatus::error(sprintf('Failed to connect to database: %s', $e->getMessage()));
}
try {
$requiredSqlStatements = $this->determineRequiredSqlStatements();
} catch (\Throwable $e) {
return ProjectionStatus::error(sprintf('Failed to determine required SQL statements: %s', $e->getMessage()));
}
if ($requiredSqlStatements !== []) {
return ProjectionStatus::setupRequired(sprintf('The following SQL statement%s required: %s', count($requiredSqlStatements) !== 1 ? 's are' : ' is', implode(chr(10), $requiredSqlStatements)));
}
return ProjectionStatus::ok();
}

/**
* @return array<string>
*/
private function determineRequiredSqlStatements(): array
{
$connection = $this->databaseClient->getConnection();
HypergraphSchemaBuilder::registerTypes($connection->getDatabasePlatform());
Expand All @@ -109,19 +150,7 @@ private function setupTables(): void
}

$schema = (new HypergraphSchemaBuilder($this->tableNamePrefix))->buildSchema();
$schemaDiff = (new Comparator())->compare($schemaManager->createSchema(), $schema);
foreach ($schemaDiff->toSaveSql($connection->getDatabasePlatform()) as $statement) {
$connection->executeStatement($statement);
}
$connection->executeStatement('
CREATE INDEX IF NOT EXISTS node_properties ON ' . $this->tableNamePrefix . '_node USING GIN(properties);

create index if not exists hierarchy_children
on ' . $this->tableNamePrefix . '_hierarchyhyperrelation using gin (childnodeanchors);

create index if not exists restriction_affected
on ' . $this->tableNamePrefix . '_restrictionhyperrelation using gin (affectednodeaggregateids);
');
return DbalSchemaDiff::determineRequiredSqlStatements($connection, $schema);
}

public function reset(): void
Expand Down
14 changes: 14 additions & 0 deletions Neos.ContentRepository.Core/Classes/ContentRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@
use Neos\ContentRepository\Core\Projection\ProjectionInterface;
use Neos\ContentRepository\Core\Projection\ProjectionsAndCatchUpHooks;
use Neos\ContentRepository\Core\Projection\ProjectionStateInterface;
use Neos\ContentRepository\Core\Projection\ProjectionStatuses;
use Neos\ContentRepository\Core\Projection\Workspace\WorkspaceFinder;
use Neos\ContentRepository\Core\SharedModel\ContentRepository\ContentRepositoryStatus;
use Neos\ContentRepository\Core\SharedModel\User\UserIdProviderInterface;
use Neos\EventStore\EventStoreInterface;
use Neos\EventStore\Model\Event\EventMetadata;
Expand Down Expand Up @@ -199,6 +201,18 @@ public function setUp(): void
}
}

public function status(): ContentRepositoryStatus
{
$projectionStatuses = ProjectionStatuses::create();
foreach ($this->projectionsAndCatchUpHooks->projections as $projectionClassName => $projection) {
$projectionStatuses = $projectionStatuses->with($projectionClassName, $projection->status());
}
return new ContentRepositoryStatus(
$this->eventStore->status(),
$projectionStatuses,
);
}

public function resetProjectionStates(): void
{
foreach ($this->projectionsAndCatchUpHooks->projections as $projection) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
use Doctrine\DBAL\Platforms\MySqlPlatform;
use Doctrine\DBAL\Platforms\PostgreSqlPlatform;
use Doctrine\DBAL\Schema\AbstractSchemaManager;
use Doctrine\DBAL\Schema\Comparator;
use Doctrine\DBAL\Schema\Schema;
use Doctrine\DBAL\Schema\Column;
use Doctrine\DBAL\Schema\Table;
use Doctrine\DBAL\Types\Type;
use Doctrine\DBAL\Types\Types;
use Neos\ContentRepository\Core\Projection\CheckpointStorageInterface;
use Neos\ContentRepository\Core\Projection\CheckpointStorageStatus;
use Neos\EventStore\Model\Event\SequenceNumber;

/**
Expand Down Expand Up @@ -44,18 +46,7 @@ public function __construct(

public function setUp(): void
{
$schemaManager = $this->connection->getSchemaManager();
if (!$schemaManager instanceof AbstractSchemaManager) {
throw new \RuntimeException('Failed to retrieve Schema Manager', 1652269057);
}
$schema = new Schema();
$table = $schema->createTable($this->tableName);
$table->addColumn('subscriberid', Types::STRING, ['length' => 255]);
$table->addColumn('appliedsequencenumber', Types::INTEGER);
$table->setPrimaryKey(['subscriberid']);

$schemaDiff = (new Comparator())->compare($schemaManager->createSchema(), $schema);
foreach ($schemaDiff->toSaveSql($this->platform) as $statement) {
foreach ($this->determineRequiredSqlStatements() as $statement) {
$this->connection->executeStatement($statement);
}
try {
Expand All @@ -65,6 +56,32 @@ public function setUp(): void
}
}

public function status(): CheckpointStorageStatus
{
try {
$this->connection->connect();
} catch (\Throwable $e) {
return CheckpointStorageStatus::error(sprintf('Failed to connect to database for subscriber "%s": %s', $this->subscriberId, $e->getMessage()));
}
try {
$requiredSqlStatements = $this->determineRequiredSqlStatements();
} catch (\Throwable $e) {
return CheckpointStorageStatus::error(sprintf('Failed to compare database schema for subscriber "%s": %s', $this->subscriberId, $e->getMessage()));
}
if ($requiredSqlStatements !== []) {
return CheckpointStorageStatus::setupRequired(sprintf('The following SQL statement%s required for subscriber "%s": %s', count($requiredSqlStatements) !== 1 ? 's are' : ' is', $this->subscriberId, implode(chr(10), $requiredSqlStatements)));
}
try {
$appliedSequenceNumber = $this->connection->fetchOne('SELECT appliedsequencenumber FROM ' . $this->tableName . ' WHERE subscriberid = :subscriberId', ['subscriberId' => $this->subscriberId]);
} catch (\Throwable $e) {
return CheckpointStorageStatus::error(sprintf('Failed to determine initial applied sequence number for subscriber "%s": %s', $this->subscriberId, $e->getMessage()));
}
if ($appliedSequenceNumber === false) {
return CheckpointStorageStatus::setupRequired(sprintf('Initial initial applied sequence number not set for subscriber "%s"', $this->subscriberId));
}
return CheckpointStorageStatus::ok();
}

public function acquireLock(): SequenceNumber
{
if ($this->connection->isTransactionActive()) {
Expand Down Expand Up @@ -121,4 +138,27 @@ public function getHighestAppliedSequenceNumber(): SequenceNumber
}
return SequenceNumber::fromInteger((int)$highestAppliedSequenceNumber);
}

// --------------

/**
* @return array<string>
*/
private function determineRequiredSqlStatements(): array
{
$schemaManager = $this->connection->getSchemaManager();
if (!$schemaManager instanceof AbstractSchemaManager) {
throw new \RuntimeException('Failed to retrieve Schema Manager', 1705681161);
}
$tableSchema = new Table(
$this->tableName,
[
(new Column('subscriberid', Type::getType(Types::STRING)))->setLength(255),
(new Column('appliedsequencenumber', Type::getType(Types::INTEGER)))
]
);
$tableSchema->setPrimaryKey(['subscriberid']);
$schema = DbalSchemaFactory::createSchemaWithTables($schemaManager, [$tableSchema]);
return DbalSchemaDiff::determineRequiredSqlStatements($this->connection, $schema);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
<?php

declare(strict_types=1);

namespace Neos\ContentRepository\Core\Infrastructure;

use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Exception;
use Doctrine\DBAL\Schema\AbstractSchemaManager;
use Doctrine\DBAL\Schema\Comparator;
use Doctrine\DBAL\Schema\Schema;

/**
* @internal
*/
final class DbalSchemaDiff
{
// This class only contains static members and should not be constructed
private function __construct()
{
}

/**
* Compares the tables of the given $schema with existing tables for the given $connection
* and returns an array of required CREATE and ALTER TABLE statements if they don't match
*
* @return array<string> Array of SQL statements that have to be executed in order to create/adjust the tables
*/
public static function determineRequiredSqlStatements(Connection $connection, Schema $schema): array
{
$schemaManager = $connection->getSchemaManager();
if (!$schemaManager instanceof AbstractSchemaManager) {
throw new \RuntimeException('Failed to retrieve Schema Manager', 1705679142);
}
try {
$platform = $connection->getDatabasePlatform();
} catch (Exception $e) {
throw new \RuntimeException(sprintf('Failed to retrieve Database platform: %s', $e->getMessage()), 1705679144, $e);
}
if ($platform === null) { // @phpstan-ignore-line This is not possible according to doc types, but there is no corresponding type hint in DBAL 2.x
throw new \RuntimeException('Failed to retrieve Database platform', 1705679147);
}
$fromTableSchemas = [];
foreach ($schema->getTables() as $tableSchema) {
if ($schemaManager->tablesExist([$tableSchema->getName()])) {
$fromTableSchemas[] = $schemaManager->listTableDetails($tableSchema->getName());
}
}
$fromSchema = new Schema($fromTableSchemas, [], $schemaManager->createSchemaConfig());
return (new Comparator())->compare($fromSchema, $schema)->toSql($platform);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ interface CheckpointStorageInterface
*/
public function setUp(): void;

/**
* Retrieve the status of this checkpoint storage instance
*/
public function status(): CheckpointStorageStatus;

/**
* Obtain an exclusive lock (to prevent multiple instances from being executed simultaneously)
* and return the highest {@see SequenceNumber} that was processed by this checkpoint storage.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?php

namespace Neos\ContentRepository\Core\Projection;

/**
* @api
*/
final readonly class CheckpointStorageStatus
{
public function __construct(
public CheckpointStorageStatusType $type,
public string $details,
) {
}

public static function ok(string $details = ''): self
{
return new self(CheckpointStorageStatusType::OK, $details);
}

public static function error(string $details): self
{
return new self(CheckpointStorageStatusType::ERROR, $details);
}

public static function setupRequired(string $details = ''): self
{
return new self(CheckpointStorageStatusType::SETUP_REQUIRED, $details);
}
}
Loading
Loading