diff --git a/extensions/SQLServer/SQLServer/SqlServerMemory.cs b/extensions/SQLServer/SQLServer/SqlServerMemory.cs index 5adb98866..2dfa3a9e0 100644 --- a/extensions/SQLServer/SQLServer/SqlServerMemory.cs +++ b/extensions/SQLServer/SQLServer/SqlServerMemory.cs @@ -396,10 +396,12 @@ private async Task InitAsync(CancellationToken cancellationToken) { if (this._isReady) { return; } - await this._initSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); - + var lockAcquired = false; try { + await this._initSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + lockAcquired = true; + if (this._isReady) { return; } await this.CacheSqlServerMajorVersionNumberAsync(cancellationToken).ConfigureAwait(false); @@ -408,7 +410,12 @@ private async Task InitAsync(CancellationToken cancellationToken) } finally { - this._initSemaphore.Release(); + // Decrease the internal counter only it the lock was acquired, + // e.g. not when WaitAsync times out or throws some exception + if (lockAcquired) + { + this._initSemaphore.Release(); + } } } diff --git a/service/Core/Pipeline/Queue/DevTools/SimpleQueues.cs b/service/Core/Pipeline/Queue/DevTools/SimpleQueues.cs index a2935dcef..71667e29a 100644 --- a/service/Core/Pipeline/Queue/DevTools/SimpleQueues.cs +++ b/service/Core/Pipeline/Queue/DevTools/SimpleQueues.cs @@ -270,11 +270,13 @@ private void PopulateQueue(object? sender, ElapsedEventArgs elapsedEventArgs) { Task.Run(async () => { + var lockAcquired = false; try { if (this._queue.Count >= this._config.FetchBatchSize) { return; } await s_lock.WaitAsync(this._cancellation.Token).ConfigureAwait(false); + lockAcquired = true; // Loop through all messages on storage var messagesOnStorage = (await this._fileSystem.GetAllFileNamesAsync(this._queueName, "", this._cancellation.Token).ConfigureAwait(false)).ToList(); @@ -340,7 +342,12 @@ private void PopulateQueue(object? sender, ElapsedEventArgs elapsedEventArgs) } finally { - s_lock.Release(); + // Decrease the internal counter only it the lock was acquired, + // e.g. not when WaitAsync times out or throws some exception + if (lockAcquired) + { + s_lock.Release(); + } } }, this._cancellation.Token); } @@ -354,11 +361,13 @@ private void DispatchMessage(object? sender, ElapsedEventArgs e) { Task.Run(async () => { + var lockAcquired = false; try { if (this._queue.IsEmpty) { return; } await s_lock.WaitAsync(this._cancellation.Token).ConfigureAwait(false); + lockAcquired = true; this._log.LogTrace("Dispatching {MessageCount} messages", this._queue.Count); @@ -373,7 +382,12 @@ private void DispatchMessage(object? sender, ElapsedEventArgs e) } finally { - s_lock.Release(); + // Decrease the internal counter only it the lock was acquired, + // e.g. not when WaitAsync times out or throws some exception + if (lockAcquired) + { + s_lock.Release(); + } } }, this._cancellation.Token); }