Skip to content

Commit

Permalink
Fix #115 Concurrency issue when adding documents. (#116)
Browse files Browse the repository at this point in the history
* Fix #115 Concurrency issue when adding documents.

include date on lock

Tweaked logic

* Fixed cache counts in tests due to locking

* Address pr feedback
  • Loading branch information
niemyjski authored Jan 6, 2025
1 parent a4354cd commit 5ad1e75
Show file tree
Hide file tree
Showing 8 changed files with 123 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,3 @@ await repository.AddAsync(new GameReview
return services;
}
}

Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
Expand Down Expand Up @@ -30,6 +31,7 @@ public class DailyIndex : VersionedIndex
private TimeSpan? _maxIndexAge;
protected readonly Func<object, DateTime> _getDocumentDateUtc;
protected readonly string[] _defaultIndexes;
private readonly ConcurrentDictionary<DateTime, object> _ensuredDates = new();

public DailyIndex(IElasticConfiguration configuration, string name, int version = 1, Func<object, DateTime> getDocumentDateUtc = null)
: base(configuration, name, version)
Expand All @@ -42,7 +44,7 @@ public DailyIndex(IElasticConfiguration configuration, string name, int version
HasMultipleIndexes = true;

if (_getDocumentDateUtc != null)
_getDocumentDateUtc = (document) =>
_getDocumentDateUtc = document =>
{
var date = getDocumentDateUtc(document);
return date != DateTime.MinValue ? date : DefaultDocumentDateFunc(document);
Expand Down Expand Up @@ -131,7 +133,6 @@ protected override DateTime GetIndexDate(string index)
return DateTime.MaxValue;
}

private readonly Dictionary<DateTime, object> _ensuredDates = new();
protected async Task EnsureDateIndexAsync(DateTime utcDate)
{
utcDate = utcDate.Date;
Expand Down Expand Up @@ -165,10 +166,10 @@ await CreateIndexAsync(index, descriptor =>
foreach (var a in Aliases.Where(a => ShouldCreateAlias(utcDate, a)))
aliasesDescriptor.Alias(a.Name);

_ensuredDates[utcDate] = null;
return ConfigureIndex(descriptor).Aliases(a => aliasesDescriptor);
}).AnyContext();

_ensuredDates[utcDate] = null;
await _aliasCache.SetAsync(unversionedIndexAlias, unversionedIndexAlias, expires).AnyContext();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using Foundatio.Repositories.Elasticsearch.Tests.Repositories;
using Foundatio.Repositories.Elasticsearch.Tests.Repositories.Models;
using Foundatio.Repositories.Models;
using Foundatio.Utility;
using Microsoft.Extensions.Time.Testing;
using Xunit;
using Xunit.Abstractions;

namespace Foundatio.Repositories.Elasticsearch.Tests;

public sealed class DailyRepositoryTests : ElasticRepositoryTestBase
{
private readonly IFileAccessHistoryRepository _fileAccessHistoryRepository;

public DailyRepositoryTests(ITestOutputHelper output) : base(output)
{
_fileAccessHistoryRepository = new FileAccessHistoryRepository(_configuration.DailyFileAccessHistory);
}

public override async Task InitializeAsync()
{
await base.InitializeAsync();
await RemoveDataAsync();
}

[Fact]
public async Task AddAsyncWithCustomDateIndex()
{
var utcNow = new DateTime(2023, 1, 1, 0, 0, 0, DateTimeKind.Utc);
var history = await _fileAccessHistoryRepository.AddAsync(new FileAccessHistory { Path = "path1", AccessedDateUtc = utcNow }, o => o.ImmediateConsistency());
Assert.NotNull(history?.Id);

var result = await _fileAccessHistoryRepository.FindOneAsync(f => f.Id(history.Id));
Assert.Equal("file-access-history-daily-v1-2023.01.01", result.Data.GetString("index"));
}

[Fact]
public async Task AddAsyncWithCurrentDateViaDocumentsAdding()
{
_configuration.TimeProvider = new FakeTimeProvider(new DateTimeOffset(2023, 02, 1, 0, 0, 0, TimeSpan.Zero));

try
{
// NOTE: This has to be async handler as there is no way to remove a sync handler.
_fileAccessHistoryRepository.DocumentsAdding.AddHandler(OnDocumentsAdding);

var history = await _fileAccessHistoryRepository.AddAsync(new FileAccessHistory { Path = "path2" }, o => o.ImmediateConsistency());
Assert.NotNull(history?.Id);

var result = await _fileAccessHistoryRepository.FindOneAsync(f => f.Id(history.Id));
Assert.Equal("file-access-history-daily-v1-2023.02.01", result.Data.GetString("index"));
}
finally
{
_fileAccessHistoryRepository.DocumentsAdding.RemoveHandler(OnDocumentsAdding);
}
}

private Task OnDocumentsAdding(object sender, DocumentsEventArgs<FileAccessHistory> arg)
{
foreach (var document in arg.Documents)
{
if (document.AccessedDateUtc == DateTime.MinValue || document.AccessedDateUtc > _configuration.TimeProvider.GetUtcNow().UtcDateTime)
document.AccessedDateUtc = _configuration.TimeProvider.GetUtcNow().UtcDateTime;
}

return Task.CompletedTask;
}

[Fact]
public async Task CanAddAsync()
{
var history = await _fileAccessHistoryRepository.AddAsync(new FileAccessHistory { AccessedDateUtc = DateTime.UtcNow });
Assert.NotNull(history?.Id);
}

[Fact]
public Task AddAsyncConcurrentUpdates()
{
return Parallel.ForEachAsync(Enumerable.Range(0, 50), async (i, _) =>
{
var history = await _fileAccessHistoryRepository.AddAsync(new FileAccessHistory { AccessedDateUtc = DateTime.UtcNow });
Assert.NotNull(history?.Id);
});
}
}
10 changes: 4 additions & 6 deletions tests/Foundatio.Repositories.Elasticsearch.Tests/IndexTests.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
Expand Down Expand Up @@ -120,10 +120,8 @@ public async Task GetByDateBasedIndexAsync()

await _configuration.DailyLogEvents.ConfigureAsync();


// TODO: Fix this once https://github.com/elastic/elasticsearch-net/issues/3829 is fixed in beta2
//var indexes = await _client.GetIndicesPointingToAliasAsync(_configuration.DailyLogEvents.Name);
//Assert.Empty(indexes);
var indexes = await _client.GetIndicesPointingToAliasAsync(_configuration.DailyLogEvents.Name);
Assert.Empty(indexes);

var alias = await _client.Indices.GetAliasAsync(_configuration.DailyLogEvents.Name);
_logger.LogRequest(alias);
Expand All @@ -142,7 +140,7 @@ public async Task GetByDateBasedIndexAsync()
Assert.True(alias.IsValid);
Assert.Equal(2, alias.Indices.Count);

var indexes = await _client.GetIndicesPointingToAliasAsync(_configuration.DailyLogEvents.Name);
indexes = await _client.GetIndicesPointingToAliasAsync(_configuration.DailyLogEvents.Name);
Assert.Equal(2, indexes.Count);

await repository.RemoveAllAsync(o => o.ImmediateConsistency());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public sealed class MonthlyRepositoryTests : ElasticRepositoryTestBase

public MonthlyRepositoryTests(ITestOutputHelper output) : base(output)
{
_fileAccessHistoryRepository = new FileAccessHistoryRepository(_configuration);
_fileAccessHistoryRepository = new FileAccessHistoryRepository(_configuration.MonthlyFileAccessHistory);
}

public override async Task InitializeAsync()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using Foundatio.Repositories.Elasticsearch.Configuration;
using Foundatio.Repositories.Elasticsearch.Tests.Repositories.Models;
using Nest;

namespace Foundatio.Repositories.Elasticsearch.Tests.Repositories.Configuration.Indexes;

public sealed class DailyFileAccessHistoryIndex : DailyIndex<FileAccessHistory>
{
public DailyFileAccessHistoryIndex(IElasticConfiguration configuration) : base(configuration, "file-access-history-daily", 1, d => ((FileAccessHistory)d).AccessedDateUtc)
{
}

public override CreateIndexDescriptor ConfigureIndex(CreateIndexDescriptor idx)
{
return base.ConfigureIndex(idx.Settings(s => s.NumberOfReplicas(0).NumberOfShards(1)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public MyAppElasticConfiguration(IQueue<WorkItemData> workItemQueue, ICacheClien
AddIndex(DailyLogEvents = new DailyLogEventIndex(this));
AddIndex(MonthlyLogEvents = new MonthlyLogEventIndex(this));
AddIndex(ParentChild = new ParentChildIndex(this));
AddIndex(DailyFileAccessHistory = new DailyFileAccessHistoryIndex(this));
AddIndex(MonthlyFileAccessHistory = new MonthlyFileAccessHistoryIndex(this));
AddCustomFieldIndex(replicas: 0);
}
Expand Down Expand Up @@ -95,5 +96,6 @@ protected override void ConfigureSettings(ConnectionSettings settings)
public DailyLogEventIndex DailyLogEvents { get; }
public MonthlyLogEventIndex MonthlyLogEvents { get; }
public ParentChildIndex ParentChild { get; }
public DailyFileAccessHistoryIndex DailyFileAccessHistory { get; }
public MonthlyFileAccessHistoryIndex MonthlyFileAccessHistory { get; }
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using Foundatio.Repositories.Elasticsearch.Tests.Repositories.Configuration;
using Foundatio.Repositories.Elasticsearch.Configuration;
using Foundatio.Repositories.Elasticsearch.Tests.Repositories.Models;

namespace Foundatio.Repositories.Elasticsearch.Tests.Repositories;
Expand All @@ -7,7 +7,11 @@ public interface IFileAccessHistoryRepository : ISearchableRepository<FileAccess

public class FileAccessHistoryRepository : ElasticRepositoryBase<FileAccessHistory>, IFileAccessHistoryRepository
{
public FileAccessHistoryRepository(MyAppElasticConfiguration elasticConfiguration) : base(elasticConfiguration.MonthlyFileAccessHistory)
public FileAccessHistoryRepository(DailyIndex<FileAccessHistory> dailyIndex) : base(dailyIndex)
{
}

public FileAccessHistoryRepository(MonthlyIndex<FileAccessHistory> monthlyIndex) : base(monthlyIndex)
{
}
}

0 comments on commit 5ad1e75

Please sign in to comment.