Skip to content

Commit

Permalink
Merge pull request #887 from Project-MONAI/AI-226
Browse files Browse the repository at this point in the history
Ai 226
  • Loading branch information
neildsouth authored Oct 11, 2023
2 parents f8e1f11 + a664dbf commit 94576ce
Show file tree
Hide file tree
Showing 18 changed files with 292 additions and 67 deletions.
48 changes: 43 additions & 5 deletions src/TaskManager/Plug-ins/Argo/ArgoClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@
using System.Text;
using Argo;
using Ardalis.GuardClauses;

using Microsoft.Extensions.Logging;
using Monai.Deploy.WorkflowManager.TaskManager.Argo.Logging;
using System.Net;
using Monai.Deploy.WorkflowManager.TaskManager.Argo.Exceptions;

namespace Monai.Deploy.WorkflowManager.TaskManager.Argo
{
public class ArgoClient : BaseArgoClient, IArgoClient
{
public ArgoClient(HttpClient httpClient) : base(httpClient) { }
public ArgoClient(HttpClient httpClient, ILoggerFactory logger) : base(httpClient, logger) { }

public async Task<Workflow> Argo_CreateWorkflowAsync(string argoNamespace, WorkflowCreateRequest body, CancellationToken cancellationToken)
{
Expand Down Expand Up @@ -77,7 +80,23 @@ public async Task<Workflow> Argo_StopWorkflowAsync(string argoNamespace, string

const string method = "PUT";
var content = new StringContent(Newtonsoft.Json.JsonConvert.SerializeObject(body));
return await SendRequest<Workflow>(content, urlBuilder, method, new CancellationToken()).ConfigureAwait(false);
try
{
return await SendRequest<Workflow>(content, urlBuilder, method, new CancellationToken()).ConfigureAwait(false);
}
catch (ApiException<Error> ex)
{
if (ex.StatusCode == (int)HttpStatusCode.NotFound)
{
throw new ArgoWorkflowNotFoundException(body.Name, ex);
}
throw;
}
catch (Exception)
{
throw;
}


}

Expand All @@ -92,7 +111,22 @@ public async Task<Workflow> Argo_TerminateWorkflowAsync(string argoNamespace, st

const string method = "PUT";
var content = new StringContent(Newtonsoft.Json.JsonConvert.SerializeObject(body));
return await SendRequest<Workflow>(content, urlBuilder, method, new CancellationToken()).ConfigureAwait(false);
try
{
return await SendRequest<Workflow>(content, urlBuilder, method, new CancellationToken()).ConfigureAwait(false);
}
catch (ApiException<Error> ex)
{
if (ex.StatusCode == (int)HttpStatusCode.NotFound)
{
throw new ArgoWorkflowNotFoundException(body.Name, ex);
}
throw;
}
catch (Exception)
{
throw;
}
}

public async Task<WorkflowTemplate?> Argo_GetWorkflowTemplateAsync(string argoNamespace, string name, string? getOptionsResourceVersion)
Expand Down Expand Up @@ -231,9 +265,11 @@ public class BaseArgoClient

protected readonly HttpClient HttpClient;

public BaseArgoClient(HttpClient httpClient)
protected readonly ILogger Logger;
public BaseArgoClient(HttpClient httpClient, ILoggerFactory loggerFactory)
{
HttpClient = httpClient;
Logger = loggerFactory.CreateLogger("BaseArgoClient");
}

protected async Task<T> SendRequest<T>(StringContent stringContent, StringBuilder urlBuilder, string method, CancellationToken cancellationToken)
Expand All @@ -250,6 +286,8 @@ protected async Task<T> SendRequest<T>(StringContent stringContent, StringBuilde
request.RequestUri = new Uri(urlBuilder.ToString(), UriKind.RelativeOrAbsolute);

HttpResponseMessage? response = null;
var logStringContent = stringContent == null ? string.Empty : await stringContent.ReadAsStringAsync();
Logger.CallingArgoHttpInfo(request.RequestUri.ToString(), method, logStringContent);
response = await HttpClient.SendAsync(request, HttpCompletionOption.ResponseContentRead, cancellationToken).ConfigureAwait(false);

try
Expand Down
29 changes: 20 additions & 9 deletions src/TaskManager/Plug-ins/Argo/ArgoPlugin.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
using Monai.Deploy.WorkflowManager.TaskManager.API.Models;
using Monai.Deploy.WorkflowManager.TaskManager.Argo.Logging;
using Newtonsoft.Json;
using Monai.Deploy.WorkflowManager.TaskManager.Argo.Exceptions;

[assembly: PlugIn()]
namespace Monai.Deploy.WorkflowManager.TaskManager.Argo
Expand Down Expand Up @@ -902,18 +903,28 @@ private async ValueTask DisposeAsyncCore()
public override async Task HandleTimeout(string identity)
{
var client = _argoProvider.CreateClient(_baseUrl, _apiToken, _allowInsecure);

await client.Argo_StopWorkflowAsync(_namespace, identity, new WorkflowStopRequest
try
{
Namespace = _namespace,
Name = identity,
});
await client.Argo_StopWorkflowAsync(_namespace, identity, new WorkflowStopRequest
{
Namespace = _namespace,
Name = identity,
});

await client.Argo_TerminateWorkflowAsync(_namespace, identity, new WorkflowTerminateRequest
await client.Argo_TerminateWorkflowAsync(_namespace, identity, new WorkflowTerminateRequest
{
Name = identity,
Namespace = _namespace
});
}
catch (ArgoWorkflowNotFoundException ex)
{
Name = identity,
Namespace = _namespace
});
_logger.ExecptionStoppingArgoWorkflow(identity, ex);
}
catch (Exception)
{
throw;
}
}

public async Task<WorkflowTemplate> CreateArgoTemplate(string template)
Expand Down
7 changes: 4 additions & 3 deletions src/TaskManager/Plug-ins/Argo/ArgoProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ public class ArgoProvider : IArgoProvider
{
private readonly ILogger<ArgoProvider> _logger;
private readonly IHttpClientFactory _httpClientFactory;

public ArgoProvider(ILogger<ArgoProvider> logger, IHttpClientFactory httpClientFactory)
private readonly ILoggerFactory _logFactory;
public ArgoProvider(ILogger<ArgoProvider> logger, IHttpClientFactory httpClientFactory, ILoggerFactory logFactory)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_httpClientFactory = httpClientFactory ?? throw new ArgumentNullException(nameof(httpClientFactory));
_logFactory = logFactory;
}

public IArgoClient CreateClient(string baseUrl, string? apiToken, bool allowInsecure = true)
Expand All @@ -50,7 +51,7 @@ public IArgoClient CreateClient(string baseUrl, string? apiToken, bool allowInse
{
httpClient.SetBearerToken(apiToken);
}
return new ArgoClient(httpClient) { BaseUrl = baseUrl };
return new ArgoClient(httpClient, _logFactory) { BaseUrl = baseUrl };
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2022 MONAI Consortium
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System.Runtime.Serialization;

namespace Monai.Deploy.WorkflowManager.TaskManager.Argo.Exceptions
{
[Serializable]
public class ArgoWorkflowNotFoundException : Exception
{
public ArgoWorkflowNotFoundException(string argoWorkflowName)
: base($"Argo workflow '{argoWorkflowName}' not found.")
{
}

public ArgoWorkflowNotFoundException(string? message, Exception? innerException) : base(message, innerException)
{
}

protected ArgoWorkflowNotFoundException(SerializationInfo info, StreamingContext context) : base(info, context)
{
}

public ArgoWorkflowNotFoundException()
{
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

using System.Runtime.Serialization;

namespace Monai.Deploy.WorkflowManager.TaskManager.Argo
namespace Monai.Deploy.WorkflowManager.TaskManager.Argo.Exceptions
{
[Serializable]
public class ArtifactMappingNotFoundException : Exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

using System.Runtime.Serialization;

namespace Monai.Deploy.WorkflowManager.TaskManager.Argo
namespace Monai.Deploy.WorkflowManager.TaskManager.Argo.Exceptions
{
[Serializable]
public class TemplateNotFoundException : Exception
Expand Down
6 changes: 6 additions & 0 deletions src/TaskManager/Plug-ins/Argo/Logging/Log.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,5 +80,11 @@ public static partial class Log
[LoggerMessage(EventId = 1019, Level = LogLevel.Error, Message = "Error deleting Template in Argo.")]
public static partial void ErrorDeletingWorkflowTemplate(this ILogger logger, Exception ex);

[LoggerMessage(EventId = 1020, Level = LogLevel.Trace, Message = "Calling argo at url {url} : {method} : {stringContent}")]
public static partial void CallingArgoHttpInfo(this ILogger logger, string url, string method, string stringContent);

[LoggerMessage(EventId = 1021, Level = LogLevel.Debug, Message = "Exception stopping argo workflow {workflowId}, does it exist?")]
public static partial void ExecptionStoppingArgoWorkflow(this ILogger logger, string workflowId, Exception ex);

}
}
5 changes: 4 additions & 1 deletion src/TaskManager/TaskManager/Logging/Log.cs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public static partial class Log
[LoggerMessage(EventId = 109, Level = LogLevel.Warning, Message = "Unable to query for job status, no activate executor associated with execution ID={executionId}.")]
public static partial void NoActiveExecutorWithTheId(this ILogger logger, string executionId);

[LoggerMessage(EventId = 110, Level = LogLevel.Error, Message = "Unsupported type of task runner: '{assemblyName}'.")]
[LoggerMessage(EventId = 110, Level = LogLevel.Error, Message = "Exception initialising task runner: '{assemblyName}'.")]
public static partial void UnsupportedRunner(this ILogger logger, string assemblyName, Exception ex);

[LoggerMessage(EventId = 111, Level = LogLevel.Debug, Message = "Sending acknowledgment message for {eventType}.")]
Expand Down Expand Up @@ -122,5 +122,8 @@ public static partial class Log

[LoggerMessage(EventId = 120, Level = LogLevel.Error, Message = "Recovering connection to storage service: {reason}.")]
public static partial void MessagingServiceErrorRecover(this ILogger logger, string reason);

[LoggerMessage(EventId = 121, Level = LogLevel.Error, Message = "Exception handling task : '{assemblyName}' timeout.")]
public static partial void ExectionTimingOutTask(this ILogger logger, string assemblyName, Exception ex);
}
}
21 changes: 17 additions & 4 deletions src/TaskManager/TaskManager/TaskManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ private async Task TaskDispatchEventReceivedCallback(MessageReceivedEventArgs ar

private async Task TaskCancelationEventCallback(MessageReceivedEventArgs args)
{
// Cancelation just stops running tasks and does Not set any status
await TaskCallBackGeneric<TaskCancellationEvent>(args, HandleCancellationTask);
}

Expand Down Expand Up @@ -240,6 +241,7 @@ private async Task HandleCancellationTask(JsonMessage<TaskCancellationEvent> mes
}

var pluginAssembly = string.Empty;
ITaskPlugin? taskRunner = null;
try
{
var taskExecution = await _taskDispatchEventService.GetByTaskExecutionIdAsync(message.Body.ExecutionId).ConfigureAwait(false);
Expand All @@ -250,17 +252,28 @@ private async Task HandleCancellationTask(JsonMessage<TaskCancellationEvent> mes
throw new InvalidOperationException("Task Event data not found.");
}

var taskRunner = typeof(ITaskPlugin).CreateInstance<ITaskPlugin>(serviceProvider: _scope.ServiceProvider, typeString: pluginAssembly, _serviceScopeFactory, taskExecEvent);
await taskRunner.HandleTimeout(message.Body.Identity);

AcknowledgeMessage(message);
taskRunner = typeof(ITaskPlugin).CreateInstance<ITaskPlugin>(serviceProvider: _scope.ServiceProvider, typeString: pluginAssembly, _serviceScopeFactory, taskExecEvent);
}
catch (Exception ex)
{
_logger.UnsupportedRunner(pluginAssembly, ex);
await HandleMessageException(message, message.Body.WorkflowInstanceId, message.Body.TaskId, message.Body.ExecutionId, false).ConfigureAwait(false);
return;
}

try
{
await taskRunner.HandleTimeout(message.Body.Identity);
}
catch (Exception ex)
{
// Ignoring exception here as we've asked for the task to be stopped.
_logger.ExectionTimingOutTask(pluginAssembly, ex);
}
finally
{
AcknowledgeMessage(message);
}
}

private async Task HandleTaskCallback(JsonMessage<TaskCallbackEvent> message)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ private static async Task EnsureIndex(IMongoCollection<WorkflowInstance> workflo
Name = "TasksIndex"
};
var model = new CreateIndexModel<WorkflowInstance>(
Builders<WorkflowInstance>.IndexKeys.Ascending(s => s.Tasks),
Builders<WorkflowInstance>.IndexKeys.Ascending($"{nameof(WorkflowInstance.Tasks)}.{nameof(Task.Status)}"),
options
);

Expand Down
2 changes: 1 addition & 1 deletion src/WorkflowManager/Logging/Log.200000.Workflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public static partial class Log
[LoggerMessage(EventId = 200012, Level = LogLevel.Error, Message = "The following task: {taskId} in workflow {workflowInstanceId} is currently timed out and not processing anymore updates, timed out at {timedOut}.")]
public static partial void TaskTimedOut(this ILogger logger, string taskId, string workflowInstanceId, DateTime timedOut);

[LoggerMessage(EventId = 200013, Level = LogLevel.Critical, Message = "Workflow `{workflowId}` not found.")]
[LoggerMessage(EventId = 200013, Level = LogLevel.Critical, Message = "Workflow `{workflowId}` not found or is deleted.")]
public static partial void WorkflowNotFound(this ILogger logger, string workflowId);

[LoggerMessage(EventId = 200014, Level = LogLevel.Error, Message = "The task execution status for task {taskId} cannot be updated from {oldStatus} to {newStatus}. Payload: {payloadId}")]
Expand Down
13 changes: 0 additions & 13 deletions src/WorkflowManager/MonaiBackgroundService/Worker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,19 +93,6 @@ private async Task PublishCancellationEvent(TaskExecution task, string correlati
{
_logger.TimingOutTaskCancellationEvent(identity, task.WorkflowInstanceId);

var updateEvent = EventMapper.GenerateTaskUpdateEvent(new GenerateTaskUpdateEventParams
{
CorrelationId = correlationId,
ExecutionId = task.ExecutionId,
WorkflowInstanceId = workflowInstanceId,
TaskId = task.TaskId,
TaskExecutionStatus = TaskExecutionStatus.Failed,
FailureReason = FailureReason.TimedOut,
Stats = task.ExecutionStats
});

updateEvent.Validate();

var cancellationEvent = EventMapper.GenerateTaskCancellationEvent(
identity,
task.ExecutionId,
Expand Down
Loading

0 comments on commit 94576ce

Please sign in to comment.