Skip to content

Commit

Permalink
Implement CatchUpDeduplication with locks
Browse files Browse the repository at this point in the history
Using symfony/lock ensures atomic locking, which
should really prevent duplication even under load.
  • Loading branch information
kitsunet committed Dec 6, 2023
1 parent 5e31584 commit 5299fcc
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
use Neos\Utility\Arrays;
use Neos\Utility\PositionalArraySorter;
use Psr\Clock\ClockInterface;
use Symfony\Component\Lock\LockFactory;
use Symfony\Component\Lock\PersistingStoreInterface;
use Symfony\Component\Lock\Store\DoctrineDbalStore;
use Symfony\Component\Serializer\Normalizer\DenormalizerInterface;
use Symfony\Component\Serializer\Normalizer\NormalizerInterface;
use Symfony\Component\Serializer\Serializer;
Expand Down Expand Up @@ -238,14 +241,20 @@ private function buildCatchUpDeduplicationQueue(ContentRepositoryId $contentRepo
}
$projectionCatchUpTrigger = $projectionCatchUpTriggerFactory->build($contentRepositoryId, $contentRepositorySettings['projectionCatchUpTrigger']['options'] ?? []);

$catchUpStateCache = $this->objectManager->get('Neos.ContentRepositoryRegistry:CacheCatchUpStates');
if (!$catchUpStateCache instanceof FrontendInterface) {
throw InvalidConfigurationException::fromMessage('The virtual object "Neos.ContentRepositoryRegistry:CacheCatchUpStates" must provide a Cache Frontend, but is "%s".', get_debug_type($catchUpStateCache));
$catchUpStateLockStorage = $this->objectManager->get('Neos.ContentRepositoryRegistry:QueueLockStorage');
if (!$catchUpStateLockStorage instanceof PersistingStoreInterface) {
throw InvalidConfigurationException::fromMessage('The virtual object "Neos.ContentRepositoryRegistry:QueueLockStorage" must provide a \Symfony\Component\Lock\PersistingStoreInterface, but is "%s".', get_debug_type($catchUpStateLockStorage));
}
if ($catchUpStateLockStorage instanceof DoctrineDbalStore) {
try {
// hack to ensure tables exist for Dbal
$catchUpStateLockStorage->createTable();
} catch (\Doctrine\DBAL\Exception\TableExistsException $_) {}
}

return new CatchUpDeduplicationQueue(
$contentRepositoryId,
$catchUpStateCache,
new LockFactory($catchUpStateLockStorage),
$projectionCatchUpTrigger
);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
<?php
namespace Neos\ContentRepositoryRegistry\Service;

use Neos\Cache\Frontend\FrontendInterface;
use Neos\ContentRepository\Core\Factory\ContentRepositoryId;
use Neos\ContentRepository\Core\Projection\ProjectionCatchUpTriggerInterface;
use Neos\ContentRepository\Core\Projection\Projections;
use Symfony\Component\Lock\LockFactory;

/**
* This encapsulates logic to provide exactly once catchUps
Expand All @@ -15,7 +15,7 @@
{
public function __construct(
private ContentRepositoryId $contentRepositoryId,
private FrontendInterface $catchUpLock,
private LockFactory $lockFactoy,
private ProjectionCatchUpTriggerInterface $projectionCatchUpTrigger
) {}

Expand Down Expand Up @@ -46,24 +46,25 @@ public function requestCatchUp(Projections $projections): void
*/
public function releaseCatchUpLock(string $projectionClassName): void
{
$this->catchUpLock->remove($this->cacheKeyRunning($projectionClassName));
$runningLock = $this->lockFactoy->createLock($this->cacheKeyRunning($projectionClassName));
$runningLock->isAcquired() && $runningLock->release();
}

private function triggerCatchUpAndReturnQueued(Projections $projections): Projections
{
$projectionsToCatchUp = [];
$queuedProjections = [];
foreach ($projections as $projection) {
if (!$this->isRunning($projection::class)) {
$this->run($projection::class);
$runningLock = $this->lockFactoy->createLock($this->cacheKeyRunning($projection::class));
$queuedLock = $this->lockFactoy->createLock($this->cacheKeyQueued($projection::class));
if ($runningLock->acquire()) {
// We are about to start a catchUp and can therefore discard any queue that exists right now, apparently someone else is waiting for it.
$this->dequeue($projection::class);
$queuedLock->release();
$projectionsToCatchUp[] = $projection;
continue;
}

if (!$this->isQueued($projection::class)) {
$this->queue($projection::class);
if ($queuedLock->acquire()) {
$queuedProjections[] = $projection;
}
}
Expand All @@ -77,16 +78,19 @@ private function retryQueued(Projections $queuedProjections): Projections
{
$passToCatchUp = [];
$stillQueuedProjections = [];

foreach ($queuedProjections as $projection) {
if (!$this->isQueued($projection::class)) {
$runningLock = $this->lockFactoy->createLock($this->cacheKeyRunning($projection::class));
$queuedLock = $this->lockFactoy->createLock($this->cacheKeyQueued($projection::class));

if (!$queuedLock->isAcquired()) {
// was dequeued, we can drop it
continue;
}

if (!$this->isRunning($projection::class)) {
$this->run($projection::class);
if ($runningLock->acquire()) {
// We are about to start a catchUp and can therefore discard any queue that exists right now, apparently someone else is waiting for it.
$this->dequeue($projection::class);
$queuedLock->release();
$passToCatchUp[] = $projection;
continue;
}
Expand All @@ -99,51 +103,6 @@ private function retryQueued(Projections $queuedProjections): Projections
return Projections::fromArray($stillQueuedProjections);
}

/**
* @param class-string $projectionClassName
* @return bool
*/
private function isRunning(string $projectionClassName): bool
{
return $this->catchUpLock->has($this->cacheKeyRunning($projectionClassName));
}

/**
* @param class-string $projectionClassName
* @return void
*/
private function run(string $projectionClassName): void
{
$this->catchUpLock->set($this->cacheKeyRunning($projectionClassName), 1);
}

/**
* @param class-string $projectionClassName
* @return bool
*/
private function isQueued(string $projectionClassName): bool
{
return $this->catchUpLock->has($this->cacheKeyQueued($projectionClassName));
}

/**
* @param class-string $projectionClassName
* @return void
*/
private function queue(string $projectionClassName): void
{
$this->catchUpLock->set($this->cacheKeyQueued($projectionClassName), 1);
}

/**
* @param class-string $projectionClassName
* @return void
*/
private function dequeue(string $projectionClassName): void
{
$this->catchUpLock->remove($this->cacheKeyQueued($projectionClassName));
}

/**
* @param class-string $projectionClassName
* @return string
Expand Down
17 changes: 17 additions & 0 deletions Neos.ContentRepositoryRegistry/Configuration/Objects.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,20 @@ Neos\ContentRepository\Core\Projection\NodeHiddenState\NodeHiddenStateProjection
arguments:
1:
value: Neos_ContentRepositoryRegistry_CatchUpStates

'Neos.ContentRepositoryRegistry:DbalConnection':
className: Doctrine\DBAL\Connection
factoryObjectName: Neos\ContentRepositoryRegistry\DoctrineDbalClient\DoctrineDbalClient
factoryMethodName: getConnection

'Neos.ContentRepositoryRegistry:QueueLockStorage':
className: Symfony\Component\Lock\Store\DoctrineDbalStore
arguments:
1:
object: 'Neos.ContentRepositoryRegistry:DbalConnection'

'Neos.ContentRepositoryRegistry:QueueLockFactory':
className: Symfony\Component\Lock\LockFactory
arguments:
1:
object: 'Neos.ContentRepositoryRegistry:QueueLockStorage'
3 changes: 2 additions & 1 deletion Neos.ContentRepositoryRegistry/composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
"neos/contentrepository-core": "self.version",
"neos/contentrepositoryregistry-storageclient": "self.version",
"symfony/property-access": "^5.4|^6.0",
"psr/clock": "^1"
"psr/clock": "^1",
"symfony/lock": "^6.0.0"
},
"autoload": {
"psr-4": {
Expand Down
1 change: 1 addition & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"neos/behat": "^9.0",
"neos/contentrepositoryregistry-storageclient": "self.version",
"symfony/property-access": "^5.4|^6.0",
"symfony/lock": "^6.0.0",
"neos/fluid-adaptor": "*",
"neos/cache": "*",
"neos/eel": "*",
Expand Down

0 comments on commit 5299fcc

Please sign in to comment.