Skip to content

Commit

Permalink
Parallelize TS verification
Browse files Browse the repository at this point in the history
  • Loading branch information
ScrubN committed Apr 17, 2024
1 parent c930c28 commit 4488173
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 20 deletions.
35 changes: 22 additions & 13 deletions TwitchDownloaderCore/TsMerger.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Generic;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Runtime.InteropServices;
Expand Down Expand Up @@ -63,32 +64,40 @@ public async Task MergeAsync(CancellationToken cancellationToken)

private async Task VerifyVideoParts(IReadOnlyCollection<string> fileList, CancellationToken cancellationToken)
{
var resultLock = new object();
var resultCounts = new Dictionary<TsVerifyResult, int>();
var missingParts = new List<string>();
var failedParts = new List<string>();
var partCount = fileList.Count;
var doneCount = 0;

foreach (var part in fileList)
var parallelOptions = new ParallelOptions
{
MaxDegreeOfParallelism = Environment.ProcessorCount,
CancellationToken = cancellationToken
};

await Parallel.ForEachAsync(fileList, parallelOptions, async (part, token) =>
{
var result = await DownloadTools.VerifyTransportStream(part, _progress);

CollectionsMarshal.GetValueRefOrAddDefault(resultCounts, result, out _)++; // Gets or creates the value and increments it
if (result == TsVerifyResult.NotFound)
{
missingParts.Add(part);
}
else if (result != TsVerifyResult.Success)
lock (resultLock)
{
failedParts.Add(part);
CollectionsMarshal.GetValueRefOrAddDefault(resultCounts, result, out _)++; // Gets or creates the value and increments it
if (result == TsVerifyResult.NotFound)
{
missingParts.Add(part);
}
else if (result != TsVerifyResult.Success)
{
failedParts.Add(part);
}
}

doneCount++;
Interlocked.Add(ref doneCount, 1);
var percent = (int)(doneCount / (double)partCount * 100);
_progress.ReportProgress(percent);

cancellationToken.ThrowIfCancellationRequested();
}
});

_progress.LogVerbose($"TS verification results: {string.Join(", ", resultCounts.Select(x => $"{x.Key}: {x.Value}"))}");

Expand Down
22 changes: 15 additions & 7 deletions TwitchDownloaderCore/VideoDownloader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -275,28 +275,36 @@ private void LogDownloadThreadExceptions(IReadOnlyCollection<Exception> download

private async Task VerifyDownloadedParts(ICollection<M3U8.Stream> playlist, Range videoListCrop, Uri baseUrl, string downloadFolder, DateTimeOffset vodAirDate, CancellationToken cancellationToken)
{
var failLock = new object();
var failCounts = new Dictionary<TsVerifyResult, int>();
var failedParts = new List<M3U8.Stream>();
var partCount = videoListCrop.End.Value - videoListCrop.Start.Value;
var doneCount = 0;

foreach (var stream in playlist.Take(videoListCrop))
var parallelOptions = new ParallelOptions
{
MaxDegreeOfParallelism = Environment.ProcessorCount,
CancellationToken = cancellationToken
};

await Parallel.ForEachAsync(playlist.Take(videoListCrop), parallelOptions, async (stream, token) =>
{
var filePath = Path.Combine(downloadFolder, DownloadTools.RemoveQueryString(stream.Path));

var result = await DownloadTools.VerifyTransportStream(filePath, _progress);
if (result != TsVerifyResult.Success)
{
CollectionsMarshal.GetValueRefOrAddDefault(failCounts, result, out _)++; // Gets or creates the value and increments it
failedParts.Add(stream);
lock (failLock)
{
CollectionsMarshal.GetValueRefOrAddDefault(failCounts, result, out _)++; // Gets or creates the value and increments it
failedParts.Add(stream);
}
}

doneCount++;
Interlocked.Add(ref doneCount, 1);
var percent = (int)(doneCount / (double)partCount * 100);
_progress.ReportProgress(percent);

cancellationToken.ThrowIfCancellationRequested();
}
});

if (failCounts.Count > 0)
{
Expand Down

0 comments on commit 4488173

Please sign in to comment.