From e8ac5fc217162a67df1ec79d859eb46abb96cd91 Mon Sep 17 00:00:00 2001 From: Devis Lucato Date: Thu, 12 Dec 2024 11:22:13 -0800 Subject: [PATCH] Fix SemaphoreSlim release logic (#941) Under some circumstances, SemaphoreSlim.WaitAsync() can fail, e.g. throwing an OperationCanceledException. In these cases, the internal semaphore counter didn't change, so SemaphoreSlim.Release() should not be called, to avoid exceptions. Classes affected: - SqlServerMemory (initialization) - SimpleQueue (message dispatching) --- .../SQLServer/SQLServer/SqlServerMemory.cs | 13 ++++++++++--- .../Pipeline/Queue/DevTools/SimpleQueues.cs | 18 ++++++++++++++++-- 2 files changed, 26 insertions(+), 5 deletions(-) diff --git a/extensions/SQLServer/SQLServer/SqlServerMemory.cs b/extensions/SQLServer/SQLServer/SqlServerMemory.cs index 5adb98866..2dfa3a9e0 100644 --- a/extensions/SQLServer/SQLServer/SqlServerMemory.cs +++ b/extensions/SQLServer/SQLServer/SqlServerMemory.cs @@ -396,10 +396,12 @@ private async Task InitAsync(CancellationToken cancellationToken) { if (this._isReady) { return; } - await this._initSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); - + var lockAcquired = false; try { + await this._initSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + lockAcquired = true; + if (this._isReady) { return; } await this.CacheSqlServerMajorVersionNumberAsync(cancellationToken).ConfigureAwait(false); @@ -408,7 +410,12 @@ private async Task InitAsync(CancellationToken cancellationToken) } finally { - this._initSemaphore.Release(); + // Decrease the internal counter only it the lock was acquired, + // e.g. not when WaitAsync times out or throws some exception + if (lockAcquired) + { + this._initSemaphore.Release(); + } } } diff --git a/service/Core/Pipeline/Queue/DevTools/SimpleQueues.cs b/service/Core/Pipeline/Queue/DevTools/SimpleQueues.cs index a2935dcef..71667e29a 100644 --- a/service/Core/Pipeline/Queue/DevTools/SimpleQueues.cs +++ b/service/Core/Pipeline/Queue/DevTools/SimpleQueues.cs @@ -270,11 +270,13 @@ private void PopulateQueue(object? sender, ElapsedEventArgs elapsedEventArgs) { Task.Run(async () => { + var lockAcquired = false; try { if (this._queue.Count >= this._config.FetchBatchSize) { return; } await s_lock.WaitAsync(this._cancellation.Token).ConfigureAwait(false); + lockAcquired = true; // Loop through all messages on storage var messagesOnStorage = (await this._fileSystem.GetAllFileNamesAsync(this._queueName, "", this._cancellation.Token).ConfigureAwait(false)).ToList(); @@ -340,7 +342,12 @@ private void PopulateQueue(object? sender, ElapsedEventArgs elapsedEventArgs) } finally { - s_lock.Release(); + // Decrease the internal counter only it the lock was acquired, + // e.g. not when WaitAsync times out or throws some exception + if (lockAcquired) + { + s_lock.Release(); + } } }, this._cancellation.Token); } @@ -354,11 +361,13 @@ private void DispatchMessage(object? sender, ElapsedEventArgs e) { Task.Run(async () => { + var lockAcquired = false; try { if (this._queue.IsEmpty) { return; } await s_lock.WaitAsync(this._cancellation.Token).ConfigureAwait(false); + lockAcquired = true; this._log.LogTrace("Dispatching {MessageCount} messages", this._queue.Count); @@ -373,7 +382,12 @@ private void DispatchMessage(object? sender, ElapsedEventArgs e) } finally { - s_lock.Release(); + // Decrease the internal counter only it the lock was acquired, + // e.g. not when WaitAsync times out or throws some exception + if (lockAcquired) + { + s_lock.Release(); + } } }, this._cancellation.Token); }