Skip to content

Commit

Permalink
Adds proper logging
Browse files Browse the repository at this point in the history
  • Loading branch information
Kali-Sh committed May 14, 2024
1 parent 326b811 commit 32fa309
Showing 1 changed file with 12 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ public class CassandraProjectionStore : IProjectionStore
private readonly VersionedProjectionsNaming naming;
private readonly ILogger<CassandraProjectionStore> logger;

public static EventId CronusProjectionEventLoadError = new EventId(74300, "CronusProjectionEventLoadError");
private static readonly Action<ILogger, string, string, Exception> LogError = LoggerMessage.Define<string, string>(LogLevel.Error, CronusProjectionEventLoadError, "Failed to load event data. Handler -> {cronus_projection_type} Projection id -> {cronus_projection_id}");

private Task<ISession> GetSessionAsync() => cassandraProvider.GetSessionAsync(); // In order to keep only 1 session alive (https://docs.datastax.com/en/developer/csharp-driver/3.16/faq/)

public CassandraProjectionStore(ICassandraProvider cassandraProvider, ISerializer serializer, VersionedProjectionsNaming naming, ILogger<CassandraProjectionStore> logger)
Expand Down Expand Up @@ -73,7 +76,7 @@ public async IAsyncEnumerable<ProjectionCommit> LoadAsync(ProjectionVersion vers
}
else
{
logger.Error(() => $"Failed to load event `data`");
LogError(logger, version.ProjectionName, Convert.ToBase64String(projectionId.RawId), null);
}
}
}
Expand Down Expand Up @@ -135,15 +138,15 @@ async Task EnumerateWithPagingAsync(ProjectionsOperator @operator, ProjectionQue
result = await EnumerateWithPagingInternalAsync(options).ConfigureAwait(false);

var stream = new ProjectionStream(options.Version, options.Id, result.Events);
await @operator.OnProjectionStreamLoadedAsync(stream);
await @operator.OnProjectionStreamLoadedAsync(stream).ConfigureAwait(false);
}
else if (@operator.OnProjectionStreamLoadedWithPagingAsync is not null)
{
result = await EnumerateWithPagingInternalAsync(options).ConfigureAwait(false);

var pagedStream = new ProjectionStream(options.Version, options.Id, result.Events);
var pagedOptions = new PagingOptions(options.PagingOptions.Take, result.NewPagingToken, options.PagingOptions.Order);
await @operator.OnProjectionStreamLoadedWithPagingAsync(pagedStream, pagedOptions);
await @operator.OnProjectionStreamLoadedWithPagingAsync(pagedStream, pagedOptions).ConfigureAwait(false);
}
}

Expand Down Expand Up @@ -175,7 +178,7 @@ async IAsyncEnumerable<IEvent> LoadAsOfDateInternalAsync(ProjectionQueryOptions
}
else
{
logger.Error(() => $"Failed to load event `data`");
LogError(logger, options.Version.ProjectionName, Convert.ToBase64String(options.Id.RawId), null);
}
}
pagingInfo = PagingInfo.From(rows);
Expand All @@ -187,15 +190,16 @@ async Task<PagingProjectionsResult> EnumerateWithPagingInternalAsync(ProjectionQ
PreparedStatement preparedStatement;
PagingProjectionsResult pagingResult = new PagingProjectionsResult();

ProjectionQueryOptions projectionQueryOptions = new ProjectionQueryOptions();
string columnFamily = naming.GetColumnFamily(options.Version);
ISession session = await GetSessionAsync().ConfigureAwait(false);
if (options.PagingOptions.Order.Equals(Order.Descending))
{
preparedStatement = await GetDescendingPreparedStatementAsync(columnFamily, session);
preparedStatement = await GetDescendingPreparedStatementAsync(columnFamily, session).ConfigureAwait(false);
}
else
{
preparedStatement = await GetPreparedStatementToGetProjectionAsync(columnFamily, session);
preparedStatement = await GetPreparedStatementToGetProjectionAsync(columnFamily, session).ConfigureAwait(false);
}

IStatement boundStatement = preparedStatement.Bind(options.Id.RawId).SetPageSize(options.BatchSize).SetAutoPage(false);
Expand All @@ -215,10 +219,10 @@ async Task<PagingProjectionsResult> EnumerateWithPagingInternalAsync(ProjectionQ
}
else
{
logger.Error(() => $"Failed to load event `data`");
LogError(logger, options.Version.ProjectionName, Convert.ToBase64String(options.Id.RawId), null);
}
}
pagingResult.NewPagingToken = result.PagingState; // fix me later
pagingResult.NewPagingToken = result.PagingState;
return pagingResult;
}

Expand Down

0 comments on commit 32fa309

Please sign in to comment.