-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
4 changed files
with
53 additions
and
85 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,83 +1,66 @@ | ||
using Microsoft.Extensions.Logging; | ||
using ZirconNet.Core.Extensions; | ||
using System.Collections.Concurrent; | ||
using System.Threading; | ||
using System.Threading.Tasks; | ||
|
||
namespace ZirconNet.Core.Async; | ||
|
||
public sealed class QueueAsync | ||
{ | ||
private readonly SemaphoreSlim _semaphoreSlim; | ||
private readonly SemaphoreSlim _lockSemaphore = new(1, 1); | ||
private readonly IList<Func<Task>> _queuedActions = new List<Func<Task>>(); | ||
private readonly ILogger? _logger; | ||
private readonly ConcurrentQueue<Func<Task>> _queuedActions = new(); | ||
|
||
public QueueAsync(int maximumThreads = -1, ILogger? logger = null) | ||
public QueueAsync(int maximumThreads = -1) | ||
{ | ||
if (maximumThreads <= 0) | ||
{ | ||
maximumThreads = Environment.ProcessorCount; | ||
} | ||
|
||
_semaphoreSlim = new(maximumThreads); | ||
_logger = logger; | ||
_semaphoreSlim = new SemaphoreSlim(maximumThreads); | ||
} | ||
|
||
private async Task RunAction(Func<Task> actionToRun) | ||
private async Task RunAction(Func<Task> actionToRun, CancellationToken cancellationToken) | ||
{ | ||
if (_queuedActions.CountThreadSafe() == 0) | ||
{ | ||
await _lockSemaphore.WaitAsync(); | ||
} | ||
|
||
_queuedActions.AddThreadSafe(actionToRun); | ||
await _semaphoreSlim.WaitAsync(); | ||
_queuedActions.Enqueue(actionToRun); | ||
await _semaphoreSlim.WaitAsync(cancellationToken); | ||
|
||
_ = Task.Run(async () => | ||
{ | ||
try | ||
{ | ||
await actionToRun.Invoke(); | ||
} | ||
finally | ||
if (!cancellationToken.IsCancellationRequested) | ||
{ | ||
_ = _semaphoreSlim.Release(); | ||
_queuedActions.RemoveThreadSafe(actionToRun); | ||
|
||
if (_queuedActions.CountThreadSafe() == 0) | ||
try | ||
{ | ||
await actionToRun.Invoke(); | ||
} | ||
finally | ||
{ | ||
_ = _lockSemaphore.Release(); | ||
_semaphoreSlim.Release(); | ||
} | ||
} | ||
}); | ||
}, cancellationToken); | ||
} | ||
|
||
/// <summary> | ||
/// Add a task to the running queue. | ||
/// </summary> | ||
/// <param name="actionToRun">The current task to run.</param> | ||
/// <param name="cancellationToken">Cancellation token</param> | ||
/// <returns>The current running action</returns> | ||
public async Task AddTaskAsync(Func<Task> actionToRun, CancellationToken cancellationToken = default) | ||
{ | ||
if (!cancellationToken.IsCancellationRequested) | ||
{ | ||
await RunAction(actionToRun); | ||
await RunAction(actionToRun, cancellationToken); | ||
} | ||
} | ||
|
||
/// <summary> | ||
/// Wait for the current queued items to reach 0. | ||
/// </summary> | ||
/// <returns>The current task that wait the queue to end.</returns> | ||
/// <exception cref="SemaphoreFullException"></exception> | ||
public async Task WaitForQueueEnd() | ||
{ | ||
try | ||
{ | ||
await _lockSemaphore.WaitAsync(); | ||
_ = _lockSemaphore.Release(); | ||
} | ||
catch (SemaphoreFullException e) | ||
while (_queuedActions.TryPeek(out _)) | ||
{ | ||
_logger?.LogWarning(e, "Error when waiting for semaphoreslim to end"); | ||
await Task.Delay(10); | ||
} | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters