Skip to content

Commit

Permalink
Improve KM webservice OpenAPI details (#923)
Browse files Browse the repository at this point in the history
Add name, description and summary to KM web service OpenAPI manifest.
  • Loading branch information
dluc authored Dec 2, 2024
1 parent fd33dc4 commit dc607aa
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 52 deletions.
3 changes: 1 addition & 2 deletions examples/001-dotnet-WebClient/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -256,8 +256,7 @@ private static async Task AskSimpleQuestionStreamingTheAnswer()
Console.WriteLine($"Expected result: formula explanation using the information loaded");

Console.Write("\nAnswer: ");
var answerStream = s_memory.AskStreamingAsync(question, minRelevance: 0.6,
options: new SearchOptions { Stream = true });
var answerStream = s_memory.AskStreamingAsync(question, options: new SearchOptions { Stream = true });

await foreach (var answer in answerStream)
{
Expand Down
3 changes: 1 addition & 2 deletions examples/002-dotnet-Serverless/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,7 @@ private static async Task AskSimpleQuestionStreamingTheAnswer()
Console.WriteLine($"Expected result: formula explanation using the information loaded");

Console.Write("\nAnswer: ");
var answerStream = s_memory.AskStreamingAsync(question, minRelevance: 0.6,
options: new SearchOptions { Stream = true });
var answerStream = s_memory.AskStreamingAsync(question, options: new SearchOptions { Stream = true });

await foreach (var answer in answerStream)
{
Expand Down
60 changes: 30 additions & 30 deletions service/Core/Pipeline/Queue/DevTools/SimpleQueues.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,17 +89,17 @@ public SimpleQueues(SimpleQueuesConfig config, ILoggerFactory? loggerFactory = n
switch (config.StorageType)
{
case FileSystemTypes.Disk:
this._log.LogTrace("Using {0} storage", nameof(DiskFileSystem));
this._log.LogTrace("Using {StorageType} storage", nameof(DiskFileSystem));
this._fileSystem = new DiskFileSystem(config.Directory, null, loggerFactory);
break;

case FileSystemTypes.Volatile:
this._log.LogTrace("Using {0} storage", nameof(VolatileFileSystem));
this._log.LogTrace("Using {StorageType} storage", nameof(VolatileFileSystem));
this._fileSystem = VolatileFileSystem.GetInstance(config.Directory, null, loggerFactory);
break;

default:
this._log.LogCritical("Unknown storage type {0}", config.StorageType);
this._log.LogCritical("Unknown storage type {StorageType}", config.StorageType);
throw new ArgumentException($"Unknown storage type {config.StorageType}");
}

Expand All @@ -124,15 +124,15 @@ public async Task<IQueue> ConnectToQueueAsync(string queueName, QueueOptions opt

if (!string.IsNullOrEmpty(this._queueName))
{
this._log.LogCritical("The client is already connected to queue {0}", this._queueName);
this._log.LogCritical("The client is already connected to queue {QueueName}", this._queueName);
throw new InvalidOperationException($"The queue is already connected to `{this._queueName}`");
}

this._queueName = queueName;
this._poisonQueueName = $"{queueName}{this._config.PoisonQueueSuffix}";
await this.CreateDirectoriesAsync(cancellationToken).ConfigureAwait(false);

this._log.LogTrace("Client connected to queue {0} and poison queue {1}", this._queueName, this._poisonQueueName);
this._log.LogTrace("Client connected to queue {QueueName} and poison queue {PoisonQueueName}", this._queueName, this._poisonQueueName);

if (options.DequeueEnabled)
{
Expand All @@ -144,11 +144,11 @@ public async Task<IQueue> ConnectToQueueAsync(string queueName, QueueOptions opt
this._dispatchTimer.Elapsed += this.DispatchMessage;
this._dispatchTimer.Start();

this._log.LogTrace("Queue {0}: polling and dispatching timers created", this._queueName);
this._log.LogTrace("Queue {QueueName}: polling and dispatching timers created", this._queueName);
}
else
{
this._log.LogTrace("Queue {0}: dequeue not enabled", this._queueName);
this._log.LogTrace("Queue {QueueName}: dequeue not enabled", this._queueName);
}

return this;
Expand All @@ -172,14 +172,14 @@ await this.StoreMessageAsync(
},
cancellationToken).ConfigureAwait(false);

this._log.LogInformation("Queue {0}: message {1} sent", this._queueName, messageId);
this._log.LogInformation("Queue {QueueName}: message {MessageId} sent", this._queueName, messageId);
}

/// <inheritdoc />
/// <see cref="DistributedPipelineOrchestrator.AddHandlerAsync"/> about the logic handling dequeued messages.
public void OnDequeue(Func<string, Task<ReturnType>> processMessageAction)
{
this._log.LogInformation("Queue {0}: subscribing...", this._queueName);
this._log.LogInformation("Queue {QueueName}: subscribing...", this._queueName);
this.Received += async (sender, args) =>
{
Message message = new();
Expand All @@ -190,34 +190,34 @@ public void OnDequeue(Func<string, Task<ReturnType>> processMessageAction)
ArgumentNullExceptionEx.ThrowIfNull(args.Message, nameof(args.Message), "The message received is NULL");
message = args.Message;

this._log.LogInformation("Queue {0}: message {0} received", this._queueName, message.Id);
this._log.LogInformation("Queue {QueueName}: message {MessageId} received", this._queueName, message.Id);

// Process message with the logic provided by the orchestrator
var returnType = await processMessageAction.Invoke(message.Content).ConfigureAwait(false);
switch (returnType)
{
case ReturnType.Success:
this._log.LogTrace("Message '{0}' successfully processed, deleting message", message.Id);
this._log.LogTrace("Message '{MessageId}' successfully processed, deleting message", message.Id);
await this.DeleteMessageAsync(message.Id, this._cancellation.Token).ConfigureAwait(false);
break;

case ReturnType.TransientError:
message.LastError = "Message handler returned false";
if (message.DequeueCount == this._maxAttempts)
{
this._log.LogError("Message '{0}' processing failed to process, max attempts reached, moving to poison queue. Message content: {1}", message.Id, message.Content);
this._log.LogError("Message '{MessageId}' processing failed to process, max attempts reached, moving to poison queue. Message content: {MessageContent}", message.Id, message.Content);
poison = true;
}
else
{
this._log.LogWarning("Message '{0}' failed to process, putting message back in the queue. Message content: {1}", message.Id, message.Content);
this._log.LogWarning("Message '{MessageId}' failed to process, putting message back in the queue. Message content: {MessageContent}", message.Id, message.Content);
retry = true;
}

break;

case ReturnType.FatalError:
this._log.LogError("Message '{0}' failed to process due to a non-recoverable error, moving to poison queue", message.Id);
this._log.LogError("Message '{MessageId}' failed to process due to a non-recoverable error, moving to poison queue", message.Id);
poison = true;
break;

Expand All @@ -228,7 +228,7 @@ public void OnDequeue(Func<string, Task<ReturnType>> processMessageAction)
catch (KernelMemoryException e) when (e.IsTransient.HasValue && !e.IsTransient.Value)
{
message.LastError = $"{e.GetType().FullName} [{e.InnerException?.GetType().FullName}]: {e.Message}";
this._log.LogError(e, "Message '{0}' failed to process due to a non-recoverable error, moving to poison queue.", message.Id);
this._log.LogError(e, "Message '{MessageId}' failed to process due to a non-recoverable error, moving to poison queue.", message.Id);
poison = true;
}
// Note: must catch all also because using a void event handler
Expand All @@ -237,12 +237,12 @@ public void OnDequeue(Func<string, Task<ReturnType>> processMessageAction)
message.LastError = $"{e.GetType().FullName}: {e.Message}";
if (message.DequeueCount == this._maxAttempts)
{
this._log.LogError(e, "Message '{0}' processing failed with exception, max attempts reached, moving to poison queue. Message content: {1}.", message.Id, message.Content);
this._log.LogError(e, "Message '{MessageId}' processing failed with exception, max attempts reached, moving to poison queue. Message content: {MessageContent}.", message.Id, message.Content);
poison = true;
}
else
{
this._log.LogWarning(e, "Message '{0}' processing failed with exception, putting message back in the queue. Message content: {1}.", message.Id, message.Content);
this._log.LogWarning(e, "Message '{MessageId}' processing failed with exception, putting message back in the queue. Message content: {MessageContent}.", message.Id, message.Content);
retry = true;
}
}
Expand Down Expand Up @@ -280,15 +280,15 @@ private void PopulateQueue(object? sender, ElapsedEventArgs elapsedEventArgs)
var messagesOnStorage = (await this._fileSystem.GetAllFileNamesAsync(this._queueName, "", this._cancellation.Token).ConfigureAwait(false)).ToList();
if (messagesOnStorage.Count == 0) { return; }

this._log.LogTrace("Queue {0}: {1} messages on storage, {2} ready to dispatch, max batch size {3}",
this._log.LogTrace("Queue {QueueName}: {MsgCountOnStorage} messages on storage, {MsgCountReady} ready to dispatch, max batch size {FetchBatchSize}",
this._queueName, messagesOnStorage.Count, this._queue.Count, this._config.FetchBatchSize);

foreach (var fileName in messagesOnStorage)
{
// Limit the number of messages loaded in memory
if (this._queue.Count >= this._config.FetchBatchSize)
{
this._log.LogTrace("Queue {0}: max batch size {1} reached", this._queueName, this._config.FetchBatchSize);
this._log.LogTrace("Queue {QueueName}: max batch size {FetchBatchSize} reached", this._queueName, this._config.FetchBatchSize);
return;
}

Expand All @@ -309,22 +309,22 @@ private void PopulateQueue(object? sender, ElapsedEventArgs elapsedEventArgs)

// Add to list of messages to be processed
this._queue.Enqueue(message);
this._log.LogTrace("Queue {0}: found message {1}", this._queueName, messageId);
this._log.LogTrace("Queue {QueueName}: found message {MessageId}", this._queueName, messageId);
}

if (this._log.IsEnabled(LogLevel.Trace))
{
if (!message.IsTimeToRun())
{
this._log.LogTrace("Queue {0}: skipping message {1} scheduled in the future", this._queueName, messageId);
this._log.LogTrace("Queue {QueueName}: skipping message {MessageId} scheduled in the future", this._queueName, messageId);
}
else if (message.IsLocked())
{
this._log.LogTrace("Queue {0}: skipping message {1} because it is locked", this._queueName, messageId);
this._log.LogTrace("Queue {QueueName}: skipping message {MessageId} because it is locked", this._queueName, messageId);
}
else if (this._queue.Any(x => x.Id == messageId))
{
this._log.LogTrace("Queue {0}: skipping message {1} because it is already loaded", this._queueName, messageId);
this._log.LogTrace("Queue {QueueName}: skipping message {MessageId} because it is already loaded", this._queueName, messageId);
}
}
}
Expand All @@ -336,7 +336,7 @@ private void PopulateQueue(object? sender, ElapsedEventArgs elapsedEventArgs)
}
catch (Exception e)
{
this._log.LogError(e, "Queue {0}: Unexpected error while polling.", this._queueName);
this._log.LogError(e, "Queue {QueueName}: Unexpected error while polling.", this._queueName);
}
finally
{
Expand All @@ -360,7 +360,7 @@ private void DispatchMessage(object? sender, ElapsedEventArgs e)

await s_lock.WaitAsync(this._cancellation.Token).ConfigureAwait(false);

this._log.LogTrace("Dispatching {0} messages", this._queue.Count);
this._log.LogTrace("Dispatching {MessageCount} messages", this._queue.Count);

while (this._queue.TryDequeue(out Message? message))
{
Expand All @@ -369,7 +369,7 @@ private void DispatchMessage(object? sender, ElapsedEventArgs e)
}
catch (Exception ex)
{
this._log.LogError(ex, "Queue {0}: Unexpected error while dispatching", this._queueName);
this._log.LogError(ex, "Queue {QueueName}: Unexpected error while dispatching", this._queueName);
}
finally
{
Expand All @@ -384,25 +384,25 @@ private void DispatchMessage(object? sender, ElapsedEventArgs e)

private async Task<Message> ReadMessageAsync(string id, CancellationToken cancellationToken = default)
{
this._log.LogTrace("Queue {0}: reading message {1}", this._queueName, id);
this._log.LogTrace("Queue {QueueName}: reading message {MessageId}", this._queueName, id);
var serializedMsg = await this._fileSystem.ReadFileAsTextAsync(
volume: this._queueName, relPath: "", fileName: $"{id}{FileExt}", cancellationToken: cancellationToken).ConfigureAwait(false);
return Deserialize(serializedMsg);
}

private async Task StoreMessageAsync(string queueName, Message message, CancellationToken cancellationToken = default)
{
this._log.LogTrace("Queue {0}: storing message {1}", this._queueName, message.Id);
this._log.LogTrace("Queue {QueueName}: storing message {MessageId}", this._queueName, message.Id);
await this._fileSystem.WriteFileAsync(queueName, "", $"{message.Id}{FileExt}", Serialize(message), cancellationToken).ConfigureAwait(false);
}

private async Task DeleteMessageAsync(string id, CancellationToken cancellationToken = default)
{
try
{
this._log.LogTrace("Queue {0}: deleting message {1}", this._queueName, id);
this._log.LogTrace("Queue {QueueName}: deleting message {MessageId}", this._queueName, id);
var fileName = $"{id}{FileExt}";
this._log.LogTrace("Deleting file from storage {0}", fileName);
this._log.LogTrace("Deleting file from storage {FileName}", fileName);
await this._fileSystem.DeleteFileAsync(this._queueName, "", fileName, cancellationToken).ConfigureAwait(false);
}
catch (DirectoryNotFoundException)
Expand Down
Loading

0 comments on commit dc607aa

Please sign in to comment.