Skip to content

Commit

Permalink
Optimized the queue further.
Browse files Browse the repository at this point in the history
  • Loading branch information
Simnico99 committed Jan 26, 2023
1 parent fa13636 commit 35540e9
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 21 deletions.
40 changes: 20 additions & 20 deletions src/ZirconNet.Core/Async/QueueAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ namespace ZirconNet.Core.Async;

public sealed class QueueAsync
{
private readonly SemaphoreSlim _semaphoreSlim;
private readonly ConcurrentQueue<Func<Task>> _queuedActions = new();
private readonly SemaphoreSlim _taskSemaphore;
private readonly SemaphoreSlim _queueSemaphore;
private int _tasksInQueue = 0;

public QueueAsync(int maximumThreads = -1)
{
Expand All @@ -16,13 +17,14 @@ public QueueAsync(int maximumThreads = -1)
maximumThreads = Environment.ProcessorCount;
}

_semaphoreSlim = new SemaphoreSlim(maximumThreads);
_taskSemaphore = new SemaphoreSlim(maximumThreads);
_queueSemaphore = new SemaphoreSlim(1,1);
}

private async Task RunAction(Func<Task> actionToRun, CancellationToken cancellationToken)
{
_queuedActions.Enqueue(actionToRun);
await _semaphoreSlim.WaitAsync(cancellationToken);
Interlocked.Increment(ref _tasksInQueue);
await _taskSemaphore.WaitAsync(cancellationToken);

_ = Task.Run(async () =>
{
Expand All @@ -32,19 +34,23 @@ private async Task RunAction(Func<Task> actionToRun, CancellationToken cancellat
{
await actionToRun.Invoke();
}
catch (Exception)
{
// Log or handle the exception as appropriate
}
finally
{
_semaphoreSlim.Release();
Interlocked.Decrement(ref _tasksInQueue);
if (_tasksInQueue == 0)
{
_queueSemaphore.Release();
}
_taskSemaphore.Release();
}
}
}, cancellationToken);
}

/// <summary>
/// Add a task to the running queue.
/// </summary>
/// <param name="actionToRun">The current task to run.</param>
/// <param name="cancellationToken">Cancellation token</param>
public async Task AddTaskAsync(Func<Task> actionToRun, CancellationToken cancellationToken = default)
{
if (!cancellationToken.IsCancellationRequested)
Expand All @@ -53,14 +59,8 @@ public async Task AddTaskAsync(Func<Task> actionToRun, CancellationToken cancell
}
}

/// <summary>
/// Wait for the current queued items to reach 0.
/// </summary>
public async Task WaitForQueueEnd()
public async Task WaitForQueueEnd(CancellationToken cancellationToken = default)
{
while (_queuedActions.TryPeek(out _))
{
await Task.Delay(10);
}
await _queueSemaphore.WaitAsync(cancellationToken);
}
}
}
4 changes: 3 additions & 1 deletion src/ZirconNet.WPF/Dispatcher/BufferedThreadDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ public sealed class BufferedThreadDispatcher
{
public static BufferedThreadDispatcher Current { get; } = new();

//Delay to wait between the screen refresh.
/// <summary>
/// Delay to wait between the screen refresh.
/// </summary>
public int Delay { get; set; } = 40;

private readonly int _mainThreadId;
Expand Down

0 comments on commit 35540e9

Please sign in to comment.