diff --git a/src/Imazen.Routing/Promises/Pipelines/BlobCachingTestHarness.cs b/src/Imazen.Routing/Promises/Pipelines/BlobCachingTestHarness.cs new file mode 100644 index 0000000..fee2c29 --- /dev/null +++ b/src/Imazen.Routing/Promises/Pipelines/BlobCachingTestHarness.cs @@ -0,0 +1,120 @@ +using Imazen.Abstractions.BlobCache; +using Imazen.Abstractions.Blobs; +using Imazen.Abstractions.Logging; +using Imazen.Abstractions.Resulting; +using Imazen.Common.Concurrency.BoundedTaskCollection; +using Imazen.Routing.Caching; +using Imazen.Routing.Health; +using Imazen.Routing.Requests; +using Microsoft.Extensions.Hosting; + +namespace Imazen.Routing.Promises.Pipelines; + +public record BlobCachingTestHarnessOptions( + long? MaxUploadQueueBytes, + MemoryCacheOptions? MemoryCacheOptions, + bool DelayRequestUntilUploadsComplete, + List> SeriesOfCacheGroups, + List SaveToCaches, + Func>> BlobProvider, + LatencyTrackingZone BlobProviderLatencyZone, + IReLogger Logger, + bool LockByUniqueRequest, + bool ShutdownServices) +{ + public static BlobCachingTestHarnessOptions TestSingleCacheSync(IBlobCache cache, Func>> blobProvider, IReLogger logger) + { + return new BlobCachingTestHarnessOptions( + null, + null, + true, + new List> { new List { cache } }, + new List { cache }, + blobProvider, + new LatencyTrackingZone("TestBlobProvider", 10000,true), + logger, + false, + true + ); + } +} + + +public class BlobCachingTestHarness: IHostedService +{ + BlobCachingTestHarnessOptions options; + BlobPipelineHarness blobPipelineHarness; + BoundedTaskCollection? uploadQueue; + CacheHealthTracker cacheHealthTracker; + CancellationTokenSource CancellationTokenSource { get; } = new CancellationTokenSource(); + public BlobCachingTestHarness(BlobCachingTestHarnessOptions options) + { + this.options = options; + if (options.MaxUploadQueueBytes != null) + { + uploadQueue = new BoundedTaskCollection(options.MaxUploadQueueBytes.Value, CancellationTokenSource); + // Now ensure caches wait for uploads to write before shutting down. + foreach(var c in options.SaveToCaches) + c.Initialize(new BlobCacheSupportData(() => uploadQueue!.AwaitAllCurrentTasks())); + } + cacheHealthTracker = new CacheHealthTracker(options.Logger); + var cacheEngineOptions = new CacheEngineOptions + { + HealthTracker = cacheHealthTracker, + SeriesOfCacheGroups = options.SeriesOfCacheGroups, + SaveToCaches = options.SaveToCaches, + Logger = options.Logger, + UploadQueue = uploadQueue, + DelayRequestUntilUploadsComplete = options.DelayRequestUntilUploadsComplete, + LockByUniqueRequest = options.LockByUniqueRequest, + BlobFactory = new SimpleReusableBlobFactory() + }; + var cacheEngine = new CacheEngine(null, cacheEngineOptions); + blobPipelineHarness = new BlobPipelineHarness(new BlobPipelineHarnessOptions( + cacheEngine, + options.BlobProvider, + options.Logger, + options.BlobProviderLatencyZone)); + + } + + public async ValueTask> RequestBlobWrapper(string path, string query = "", + CancellationToken cancellationToken = default) + { + return await blobPipelineHarness.RequestBlobWrapper(path, query, cancellationToken); + } + + public Task StartAsync(CancellationToken cancellationToken) + { + return Task.CompletedTask; + } + + public async Task StopAsync(CancellationToken cancellationToken) + { + if (uploadQueue != null) + { + await uploadQueue.StopAsync(cancellationToken); + } + + await cacheHealthTracker.StopAsync(cancellationToken); + if (options.ShutdownServices) + { + var allCaches = options.SeriesOfCacheGroups.SelectMany(x => x).Concat(options.SaveToCaches); + foreach (var cache in allCaches) + { + if (cache is IHostedService service) + { + await service.StopAsync(cancellationToken); + } + } + } + } + + public async Task AwaitEnqueuedTasks() + { + if (uploadQueue != null) + { + await uploadQueue.AwaitAllCurrentTasks(); + } + } +} \ No newline at end of file diff --git a/src/Imazen.Routing/Promises/Pipelines/BlobPipelineHarness.cs b/src/Imazen.Routing/Promises/Pipelines/BlobPipelineHarness.cs new file mode 100644 index 0000000..d93287c --- /dev/null +++ b/src/Imazen.Routing/Promises/Pipelines/BlobPipelineHarness.cs @@ -0,0 +1,89 @@ +using Imazen.Abstractions.Blobs; +using Imazen.Abstractions.Logging; +using Imazen.Abstractions.Resulting; +using Imazen.Routing.Engine; +using Imazen.Routing.HttpAbstractions; +using Imazen.Routing.Layers; +using Imazen.Routing.Requests; +using Microsoft.Extensions.Logging; + +namespace Imazen.Routing.Promises.Pipelines; + +public record BlobPipelineHarnessOptions( + IBlobPromisePipeline Pipeline, + Func>> BlobProvider, + IReLogger Logger, LatencyTrackingZone BlobOriginLatencyZone); + +public class BlobPipelineHarness +{ + readonly RoutingEngine router; + IBlobPromisePipeline pipeline; + IReLogger logger; + + public BlobPipelineHarness(RoutingEngine router, IBlobPromisePipeline pipeline, IReLogger logger) + { + this.router = router; + this.pipeline = pipeline; + this.logger = logger; + } + public BlobPipelineHarness(BlobPipelineHarnessOptions options) + { + var routingBuilder = new RoutingBuilder().AddEndpointLayer( + new SimpleLayer("BlobEndpoint", req => + { + var endpoint = + new PromiseWrappingEndpoint( + new CacheableBlobPromise(req, options.BlobOriginLatencyZone, options.BlobProvider)); + return CodeResult.Ok(endpoint); + }, null)); + router = routingBuilder.Build(options.Logger); + pipeline = options.Pipeline; + logger = options.Logger; + + } + + public async ValueTask> RequestBlobWrapper(string path, string query = "", + CancellationToken cancellationToken = default) + { + var request = new EmptyHttpRequest(path, query); + var mutableRequest = MutableRequest.OriginalRequest(request); + return await Request(mutableRequest, cancellationToken); + } + + public async ValueTask> Request(MutableRequest mutableRequest, CancellationToken cancellationToken = default) + { + var result = await router.RouteToPromiseAsync(mutableRequest, cancellationToken); + if (result == null) + { + return CodeResult.Err((404, "No route found")); + } + if (result.IsError) + { + return CodeResult.Err(result.Error); + } + + var outerRequest = mutableRequest.OriginatingRequest ?? new EmptyHttpRequest(mutableRequest); + + var pipelineResult = await pipeline.GetFinalPromiseAsync( + result.Unwrap(),router, pipeline, outerRequest,cancellationToken); + + var finalPromise = pipelineResult.Unwrap(); + + if (finalPromise.HasDependencies) + { + var dependencyResult = await finalPromise.RouteDependenciesAsync(router, cancellationToken); + if (dependencyResult.IsError) + { + return CodeResult.Err(dependencyResult.Error); + } + } + var blobResult = + await finalPromise.TryGetBlobAsync(mutableRequest, router, pipeline, cancellationToken); + if (blobResult.IsError) + { + return CodeResult.Err(blobResult.Error); + } + return CodeResult.Ok(blobResult.Unwrap()); + } + +} \ No newline at end of file diff --git a/tests/Imazen.HybridCache.Benchmark/Imazen.HybridCache.Benchmark.csproj b/tests/Imazen.HybridCache.Benchmark/Imazen.HybridCache.Benchmark.csproj index e2b879e..c4f7f0a 100644 --- a/tests/Imazen.HybridCache.Benchmark/Imazen.HybridCache.Benchmark.csproj +++ b/tests/Imazen.HybridCache.Benchmark/Imazen.HybridCache.Benchmark.csproj @@ -15,4 +15,8 @@ + + + + diff --git a/tests/Imazen.HybridCache.Benchmark/Program.cs b/tests/Imazen.HybridCache.Benchmark/Program.cs index 28cf676..26fdd29 100644 --- a/tests/Imazen.HybridCache.Benchmark/Program.cs +++ b/tests/Imazen.HybridCache.Benchmark/Program.cs @@ -3,10 +3,16 @@ using System.Diagnostics; using System.IO; using System.Linq; +using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; +using Imazen.Abstractions.Blobs; +using Imazen.Abstractions.Logging; +using Imazen.Abstractions.Resulting; using Imazen.Common.Extensibility.StreamCache; using Imazen.HybridCache.MetaStore; +using Imazen.Routing.Promises.Pipelines; +using Imazen.Routing.Requests; using MELT; using Microsoft.Extensions.Logging; using Microsoft.IO; @@ -15,6 +21,7 @@ namespace Imazen.HybridCache.Benchmark { class Program { + private const string UniqueName = "HybridCache"; static async Task Main(string[] args) { var cts = new CancellationTokenSource(); @@ -41,30 +48,38 @@ static async Task Main(string[] args) } - - private static async Task TestAsyncMediumLimitedCacheWavesMetaStore(CancellationToken cancellationToken) + private static HybridCacheAdvancedOptions CreateHybridCacheAdvancedOptions( + [CallerMemberName] + string uniqueName = "HybridCache", + int subfolders = 1, + long maxCacheBytes = 0, + bool moveFilesIntoPlace = false, + TimeSpan minAgeToDelete = default, + long minCleanupBytes = 0) { - var options = new TestParams() + return new HybridCacheAdvancedOptions(uniqueName, "") { - CacheOptions = new HybridCacheAdvancedOptions(null) + Subfolders = subfolders, + AsyncCacheOptions = new AsyncCacheOptions() { - Subfolders = 2048, - AsyncCacheOptions = new AsyncCacheOptions() - { - MaxQueuedBytes = 100 * 100 * 1000, - FailRequestsOnEnqueueLockTimeout = true, - WriteSynchronouslyWhenQueueFull = true, - MoveFileOverwriteFunc = (from, to) => File.Move(from,to,true), - MoveFilesIntoPlace = false - }, - CleanupManagerOptions = new CleanupManagerOptions() - { - MaxCacheBytes = 409600000, // 1/2th the size of the files we are trying to write - MinAgeToDelete = TimeSpan.Zero, - MinCleanupBytes = 0, - }, + UniqueName = uniqueName, + MoveFilesIntoPlace = moveFilesIntoPlace }, - MetaStoreOptions = new MetaStoreOptions(null) + CleanupManagerOptions = new CleanupManagerOptions() + { + MaxCacheBytes = maxCacheBytes, + MinAgeToDelete = minAgeToDelete, + MinCleanupBytes = minCleanupBytes, + }, + }; + } + + private static async Task TestAsyncMediumLimitedCacheWavesMetaStore(CancellationToken cancellationToken) + { + var options = new TestParams() + { + CacheOptions = CreateHybridCacheAdvancedOptions(UniqueName, 2048, 409600000, moveFilesIntoPlace: false), + MetaStoreOptions = new MetaStoreOptions("") { Shards = 32 }, @@ -91,24 +106,8 @@ private static async Task TestSyncVeryLimitedCacheWavesMetaStore(CancellationTok { var options = new TestParams() { - CacheOptions = new HybridCacheAdvancedOptions("HybridCache",null) - { - Subfolders = 1, - AsyncCacheOptions = new AsyncCacheOptions() - { - MaxQueuedBytes = 0, //100 * 100 * 1000, - FailRequestsOnEnqueueLockTimeout = true, - WriteSynchronouslyWhenQueueFull = true, - MoveFileOverwriteFunc = (from, to) => File.Move(from,to,true) - }, - CleanupManagerOptions = new CleanupManagerOptions() - { - MaxCacheBytes = 8192000, // 1/50th the size of the files we are trying to write - MinAgeToDelete = TimeSpan.Zero, - MinCleanupBytes = 0, - }, - }, - MetaStoreOptions = new MetaStoreOptions(null) + CacheOptions = CreateHybridCacheAdvancedOptions("HybridCache", 1, 8192000), + MetaStoreOptions = new MetaStoreOptions("") { Shards = 1 }, @@ -133,24 +132,9 @@ private static async Task TestMassiveFileQuantityMetaStore(CancellationToken can { var options = new TestParams() { - CacheOptions = new HybridCacheAdvancedOptions(null) - { - AsyncCacheOptions = new AsyncCacheOptions() - { - MaxQueuedBytes = 0, - FailRequestsOnEnqueueLockTimeout = true, - WriteSynchronouslyWhenQueueFull = true, - }, - CleanupManagerOptions = new CleanupManagerOptions() - { - MaxCacheBytes = (long)4096 * 2 * 1000 * 1000, // 1/2th the size of the files we are trying to write - MinAgeToDelete = TimeSpan.Zero, - MinCleanupBytes = 0, //1 * 1000 * 1000, - - } - }, FileSize = 0, FileCount = 6000000, + CacheOptions = CreateHybridCacheAdvancedOptions(maxCacheBytes: (long)4096 * 2 * 1000 * 1000), RequestCountPerWave = 20000, RequestWaves = 1, UseMetaStore = true, @@ -169,22 +153,7 @@ private static async Task TestMassiveFileQuantity(CancellationToken cancellation { var options = new TestParams() { - CacheOptions = new HybridCacheAdvancedOptions(null) - { - AsyncCacheOptions = new AsyncCacheOptions() - { - MaxQueuedBytes = 0, - FailRequestsOnEnqueueLockTimeout = true, - WriteSynchronouslyWhenQueueFull = true, - }, - CleanupManagerOptions = new CleanupManagerOptions() - { - MaxCacheBytes = (long)4096 * 2 * 1000 * 1000, // 1/2th the size of the files we are trying to write - MinAgeToDelete = TimeSpan.Zero, - MinCleanupBytes = 0, //1 * 1000 * 1000, - - } - }, + CacheOptions = CreateHybridCacheAdvancedOptions(maxCacheBytes: (long)4096 * 2 * 1000 * 1000), FileSize = 64, FileCount = 60000, RequestCountPerWave = 2000, @@ -202,23 +171,7 @@ private static async Task TestSyncVeryLimitedCacheWaves(CancellationToken cancel { var options = new TestParams() { - CacheOptions = new HybridCacheAdvancedOptions(null) - { - AsyncCacheOptions = new AsyncCacheOptions() - { - MaxQueuedBytes = 0, - FailRequestsOnEnqueueLockTimeout = true, - WriteSynchronouslyWhenQueueFull = true, - }, - CleanupManagerOptions = new CleanupManagerOptions() - { - MaxCacheBytes = 8192000, // 1/5th the size of the files we are trying to write - MinAgeToDelete = TimeSpan.Zero, - MinCleanupBytes = 0, - - }, - Subfolders = 1 - }, + CacheOptions = CreateHybridCacheAdvancedOptions(maxCacheBytes: 8192000), FileSize = 81920, FileCount = 500, RequestCountPerWave = 1000, @@ -236,23 +189,8 @@ private static async Task TestRandomAsyncCache(CancellationToken cancellationTok { var options = new TestParams() { - CacheOptions = new HybridCacheAdvancedOptions(null) - { - AsyncCacheOptions = new AsyncCacheOptions() - { - MaxQueuedBytes = 5 * 1000 * 1000, - FailRequestsOnEnqueueLockTimeout = true, - WriteSynchronouslyWhenQueueFull = true, - }, - CleanupManagerOptions = new CleanupManagerOptions() - { - MaxCacheBytes = 4088000, // 1/2th the size of the files we are trying to write - MinAgeToDelete = TimeSpan.Zero, - MinCleanupBytes = 0, - - }, - Subfolders = 1 - }, + CacheOptions = CreateHybridCacheAdvancedOptions(maxCacheBytes: 4088000), + MaxQueuedBytes = 5 * 1000 * 1000, FileSize = 81920, FileCount = 100, RequestCountPerWave = 500, @@ -270,23 +208,8 @@ private static async Task TestRandomAsyncVeryLimitedCache(CancellationToken canc { var options = new TestParams() { - CacheOptions = new HybridCacheAdvancedOptions(null) - { - AsyncCacheOptions = new AsyncCacheOptions() - { - MaxQueuedBytes = 100 * 1000 * 1000, - FailRequestsOnEnqueueLockTimeout = true, - WriteSynchronouslyWhenQueueFull = true, - }, - CleanupManagerOptions = new CleanupManagerOptions() - { - MaxCacheBytes = 8192000, // 1/5th the size of the files we are trying to write - MinAgeToDelete = TimeSpan.Zero, - MinCleanupBytes = 0, - - }, - Subfolders = 1 - }, + CacheOptions = CreateHybridCacheAdvancedOptions(maxCacheBytes: 8192000), + MaxQueuedBytes = 100 * 1000 * 1000, FileSize = 81920, FileCount = 500, RequestCountPerWave = 2000, @@ -303,22 +226,7 @@ private static async Task TestRandomSynchronousVeryLimitedCache(CancellationToke { var options = new TestParams() { - CacheOptions = new HybridCacheAdvancedOptions("HybridCache", null) - { - AsyncCacheOptions = new AsyncCacheOptions() - { - MaxQueuedBytes = 0, - FailRequestsOnEnqueueLockTimeout = true, - WriteSynchronouslyWhenQueueFull = true, - }, - CleanupManagerOptions = new CleanupManagerOptions() - { - MaxCacheBytes = 8192000, // 1/5th the size of the files we are trying to write - MinAgeToDelete = TimeSpan.Zero, - - }, - Subfolders = 1 - }, + CacheOptions = CreateHybridCacheAdvancedOptions(maxCacheBytes: 8192000), FileSize = 81920, FileCount = 500, RequestCountPerWave = 2000, @@ -335,21 +243,7 @@ private static async Task TestRandomSynchronousLimitedCache(CancellationToken ca { var options = new TestParams() { - CacheOptions = new HybridCacheAdvancedOptions(null) - { - AsyncCacheOptions = new AsyncCacheOptions() - { - MaxQueuedBytes = 0, - FailRequestsOnEnqueueLockTimeout = true, - WriteSynchronouslyWhenQueueFull = true, - }, - CleanupManagerOptions = new CleanupManagerOptions() - { - MaxCacheBytes = 20000000, // Half the size of the files we are trying to write - MinAgeToDelete = TimeSpan.Zero, - - } - }, + CacheOptions = CreateHybridCacheAdvancedOptions(maxCacheBytes: 20000000), FileCount = 500, FileSize = 81920, RequestCountPerWave = 1000, @@ -365,19 +259,8 @@ private static async Task TestRandomSynchronousNoEviction(bool getContentType, C { var options = new TestParams() { - CacheOptions = new HybridCacheAdvancedOptions(null) - { - AsyncCacheOptions = new AsyncCacheOptions() - { - MaxQueuedBytes = 0, - FailRequestsOnEnqueueLockTimeout = true, - WriteSynchronouslyWhenQueueFull = true, - }, - CleanupManagerOptions = new CleanupManagerOptions() - { - MaxCacheBytes = 100000000, - } - }, + CacheOptions = CreateHybridCacheAdvancedOptions(maxCacheBytes: 100000000), + RetrieveContentType = getContentType, FileCount = 500, FileSize = 81920, @@ -391,6 +274,9 @@ private static async Task TestRandomSynchronousNoEviction(bool getContentType, C private class TestParams { + internal int MaxQueuedBytes { get; set; } = 0; + internal bool WriteSynchronouslyWhenQueueFull { get; set; } = true; + internal bool FailRequestsOnEnqueueLockTimeout { get; set;} = true; internal int FileSize { get; set; } = 81920; internal int FileCount { get; set; } = 1000; @@ -404,7 +290,7 @@ private class TestParams internal bool RetrieveContentType { get; set; } internal bool DisplayLog { get; set; } - internal HybridCacheAdvancedOptions CacheOptions { get; set; } = new HybridCacheAdvancedOptions(null); + internal HybridCacheAdvancedOptions CacheOptions { get; set; } = new HybridCacheAdvancedOptions("",""); internal bool UseMetaStore { get; set; } public int Seed { get; set; } @@ -414,6 +300,11 @@ private class TestParams public int MaxLogEntries { get; set; } = 50; public bool Synchronous { get; set; } } + private static IReLogger CreateReLogger(ITestLoggerFactory loggerFactory) + { + + return new ReLoggerFactory(loggerFactory, new ReLogStore(new ReLogStoreOptions())).CreateReLogger("HybridCache"); + } private static async Task TestRandom(TestParams options, CancellationToken cancellationToken) { if (cancellationToken.IsCancellationRequested) @@ -431,13 +322,10 @@ private static async Task TestRandom(TestParams options, CancellationToken cance for (var reboot = 0; reboot < options.RebootCount; reboot++) { Console.WriteLine($"------------- Cache Reboot {reboot} ---------------"); - var loggerFactory = TestLoggerFactory.Create(); - - var logger = loggerFactory.CreateLogger(); - + var logger = CreateReLogger(loggerFactory); ICacheDatabase database = new MetaStore.MetaStore(options.MetaStoreOptions, options.CacheOptions, logger); - HybridCache cache = new HybridCache(options.CacheOptions,database, logger); + HybridCache cache = new HybridCache(database, options.CacheOptions,logger); try { Console.Write("Starting cache..."); @@ -446,7 +334,7 @@ private static async Task TestRandom(TestParams options, CancellationToken cance swStart.Stop(); Console.Write($"ready in {swStart.Elapsed}\r\n"); - await TestRandomInner(cache, options, loggerFactory, cancellationToken); + await TestRandomInner(cache, options, logger, loggerFactory, cancellationToken); if (options.DisplayLog) { @@ -493,30 +381,48 @@ private static async Task TestRandom(TestParams options, CancellationToken cance } } - private static async Task TestRandomInner(HybridCache cache, TestParams options, ITestLoggerFactory loggerFactory, CancellationToken cancellationToken) + private static async Task TestRandomInner(HybridCache cache, TestParams options, IReLogger logger, ITestLoggerFactory loggerFactory, CancellationToken cancellationToken) { - var data = new byte[options.FileSize]; - var dataSegment = new ArraySegment(data); - var contentType = "application/octet-stream"; + var blobAttrs = new BlobAttributes() + { + ContentType = "application/octet-stream", + EstimatedBlobByteCount = options.FileSize + }; + var blob = new MemoryBlob(data,blobAttrs, options.CreationTaskDelay,backingAllocationSize: options.FileSize); + + var latencyZone = new LatencyTrackingZone("TestBlobProvider", (int)(options.CreationTaskDelay + options.CreationThreadSleep).TotalMilliseconds, true); + var blobWrapper = new BlobWrapper(latencyZone, blob); - async Task DataProvider(CancellationToken token) + async ValueTask> BlobProvider(IRequestSnapshot snapshot, CancellationToken token) { if (options.CreationTaskDelay.Ticks > 0) { await Task.Delay(options.CreationTaskDelay, cancellationToken); } + if (options.CreationThreadSleep.Ticks > 0) { Thread.Sleep(options.CreationThreadSleep); } - return new StreamCacheInput(contentType, dataSegment).ToIStreamCacheInput(); + + return CodeResult.Ok(blobWrapper.ForkReference()); } + // TODO: TestSingleCacheSync is not correct at all + var harnessOptions = BlobCachingTestHarnessOptions.TestSingleCacheSync(cache, + BlobProvider, logger); + + var harness = new BlobCachingTestHarness(harnessOptions); + + + await harness.StartAsync(cancellationToken); + logger.LogInformation("Test harness started"); + + var random = new Random(options.Seed); - var tasks = new List>>(); - - + var tasks = new List>>(); + var swTotal = Stopwatch.StartNew(); for (var wave = 0; wave < options.RequestWaves; wave++) { @@ -527,27 +433,46 @@ async Task DataProvider(CancellationToken token) Console.Write("Wave {0}, {1} requests...", wave + 1, options.RequestCountPerWave); var sw = Stopwatch.StartNew(); var memoryStreamManager = - new RecyclableMemoryStreamManager(Math.Max(2, options.FileSize), 2, options.FileSize * 2 + 2); + new RecyclableMemoryStreamManager(new RecyclableMemoryStreamManager.Options(){ + BlockSize = Math.Max(2, options.FileSize), + LargeBufferMultiple = 2, + MaximumBufferSize =options.FileSize * 2 + 2 + + }); for (var ix = 0; ix < options.RequestCountPerWave; ix++) { - Func>> task = async () => + Func>> task = async () => { var whichFile = random.Next(options.FileCount); var key = BitConverter.GetBytes(whichFile); + var generatedFileName = "/" + Imazen.Common.Helpers.EncodingUtils.ToBase64U(key); var itemSw = Stopwatch.StartNew(); - var cacheResult = await cache.GetOrCreateBytes(key, DataProvider, cancellationToken, - options.RetrieveContentType); - if (cacheResult.Data != null) + + var cacheResult = await harness.RequestBlobWrapper(generatedFileName, cancellationToken: cancellationToken); + if (cacheResult.TryUnwrapError(out var err)) + { + itemSw.Stop(); + logger.LogError("Error {0} fetching {1}", err, generatedFileName); + itemSw.Stop(); + return new Tuple(itemSw.Elapsed, err); + } + else { - await using (cacheResult.Data) + using var blobWrapper = cacheResult.Unwrap(); + if (options.RetrieveContentType) { - await using var ms = memoryStreamManager.GetStream(); - await cacheResult.Data.CopyToAsync(ms, cancellationToken); + var _ = blobWrapper.Attributes.ContentType; } - } + + using var ms = memoryStreamManager.GetStream(); + using var consumable = await blobWrapper.GetConsumablePromise().IntoConsumableBlob(); + using var stream = consumable.BorrowStream(DisposalPromise.CallerDisposesStreamThenBlob); + await stream.CopyToAsync(ms, 81920, cancellationToken); + itemSw.Stop(); + return new Tuple(itemSw.Elapsed, HttpStatus.Ok); - itemSw.Stop(); - return new Tuple(itemSw.Elapsed, cacheResult.Status); + } + }; if (options.Synchronous) { @@ -556,14 +481,17 @@ async Task DataProvider(CancellationToken token) } else { - tasks.Add(Task.Run(task, cancellationToken)); + tasks.Add(Task.Run(task,cancellationToken)); } } - await Task.WhenAll(tasks); + if (!options.Synchronous) + { + await Task.WhenAll(tasks); + } sw.Stop(); var swAsync = Stopwatch.StartNew(); - await cache.AwaitEnqueuedTasks(); + await harness.AwaitEnqueuedTasks(); swAsync.Stop(); Console.WriteLine("completed in {0}, plus {1} for async tasks. ", sw.Elapsed, swAsync.Elapsed); PrintDiskUtilization(options); @@ -575,8 +503,8 @@ async Task DataProvider(CancellationToken token) Console.WriteLine(); // Accumulate results - var resultCounts = new Dictionary(); - var resultTimes = new Dictionary>(); + var resultCounts = new Dictionary(); + var resultTimes = new Dictionary>(); foreach (var t in tasks) { var key = t.Result.Item2;