diff --git a/backend/backend/settings/base.py b/backend/backend/settings/base.py index 104ffc9f6..76ef114a9 100644 --- a/backend/backend/settings/base.py +++ b/backend/backend/settings/base.py @@ -236,6 +236,7 @@ def get_required_setting( "workflow_manager.file_execution", "workflow_manager.endpoint_v2", "workflow_manager.workflow_v2", + "workflow_manager.execution", "tool_instance_v2", "pipeline_v2", "platform_settings_v2", diff --git a/backend/backend/urls_v2.py b/backend/backend/urls_v2.py index 0a8bd0f95..222679092 100644 --- a/backend/backend/urls_v2.py +++ b/backend/backend/urls_v2.py @@ -59,4 +59,5 @@ include("prompt_studio.prompt_studio_index_manager_v2.urls"), ), path("tags/", include("tags.urls")), + path("execution/", include("workflow_manager.execution.urls")), ] diff --git a/backend/pipeline_v2/execution_view.py b/backend/pipeline_v2/execution_view.py index fa514615a..56e8e06e5 100644 --- a/backend/pipeline_v2/execution_view.py +++ b/backend/pipeline_v2/execution_view.py @@ -1,7 +1,7 @@ from permissions.permission import IsOwner -from pipeline_v2.serializers.execute import DateRangeSerializer from rest_framework import viewsets from rest_framework.versioning import URLPathVersioning +from utils.date import DateRangeKeys, DateRangeSerializer from utils.pagination import CustomPagination from workflow_manager.workflow_v2.models.execution import WorkflowExecution from workflow_manager.workflow_v2.serializers import WorkflowExecutionSerializer @@ -14,8 +14,6 @@ class PipelineExecutionViewSet(viewsets.ModelViewSet): pagination_class = CustomPagination CREATED_AT_FIELD_DESC = "-created_at" - START_DATE_FIELD = "start_date" - END_DATE_FIELD = "end_date" def get_queryset(self): # Get the pipeline_id from the URL path @@ -25,8 +23,8 @@ def get_queryset(self): # Validate start_date and end_date parameters using DateRangeSerializer date_range_serializer = DateRangeSerializer(data=self.request.query_params) date_range_serializer.is_valid(raise_exception=True) - 
start_date = date_range_serializer.validated_data.get(self.START_DATE_FIELD) - end_date = date_range_serializer.validated_data.get(self.END_DATE_FIELD) + start_date = date_range_serializer.validated_data.get(DateRangeKeys.START_DATE) + end_date = date_range_serializer.validated_data.get(DateRangeKeys.END_DATE) if start_date and end_date: queryset = queryset.filter(created_at__range=(start_date, end_date)) diff --git a/backend/pipeline_v2/serializers/execute.py b/backend/pipeline_v2/serializers/execute.py index 4e5a7fd58..0f0457a37 100644 --- a/backend/pipeline_v2/serializers/execute.py +++ b/backend/pipeline_v2/serializers/execute.py @@ -17,8 +17,3 @@ def validate_pipeline_id(self, value: str) -> str: except Pipeline.DoesNotExist: raise serializers.ValidationError("Invalid pipeline ID") return value - - -class DateRangeSerializer(serializers.Serializer): - start_date = serializers.DateTimeField(required=False) - end_date = serializers.DateTimeField(required=False) diff --git a/backend/usage_v2/utils.py b/backend/usage_v2/utils.py index 7bdd6ce6f..0ad0d3443 100644 --- a/backend/usage_v2/utils.py +++ b/backend/usage_v2/utils.py @@ -116,7 +116,6 @@ def filter_date_range(cls, value: str) -> Optional[DateRange]: if not preset: return None start_date, end_date = preset.get_date_range() - print(start_date, end_date) return cls._validate_date_range(start_date=start_date, end_date=end_date) @classmethod diff --git a/backend/utils/date/__init__.py b/backend/utils/date/__init__.py new file mode 100644 index 000000000..faf94e125 --- /dev/null +++ b/backend/utils/date/__init__.py @@ -0,0 +1,2 @@ +from .constants import DateRangeKeys # noqa: F401 +from .serializer import DateRangeSerializer # noqa: F401 diff --git a/backend/utils/date/constants.py b/backend/utils/date/constants.py new file mode 100644 index 000000000..1d5a3bcdf --- /dev/null +++ b/backend/utils/date/constants.py @@ -0,0 +1,3 @@ +class DateRangeKeys: + START_DATE = "start_date" + END_DATE = "end_date" diff --git 
a/backend/utils/date/serializer.py b/backend/utils/date/serializer.py new file mode 100644 index 000000000..054d700bb --- /dev/null +++ b/backend/utils/date/serializer.py @@ -0,0 +1,6 @@ +from rest_framework import serializers + + +class DateRangeSerializer(serializers.Serializer): + start_date = serializers.DateTimeField(required=False) + end_date = serializers.DateTimeField(required=False) diff --git a/backend/workflow_manager/execution/__init__.py b/backend/workflow_manager/execution/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/backend/workflow_manager/execution/apps.py b/backend/workflow_manager/execution/apps.py new file mode 100644 index 000000000..eef172db9 --- /dev/null +++ b/backend/workflow_manager/execution/apps.py @@ -0,0 +1,6 @@ +from django.apps import AppConfig + + +class ExecutionConfig(AppConfig): + default_auto_field = "django.db.models.BigAutoField" + name = "workflow_manager.execution" diff --git a/backend/workflow_manager/execution/enum.py b/backend/workflow_manager/execution/enum.py new file mode 100644 index 000000000..27900508c --- /dev/null +++ b/backend/workflow_manager/execution/enum.py @@ -0,0 +1,8 @@ +from enum import Enum + + +class ExecutionEntity(Enum): + ETL = "ETL" + API = "API" + TASK = "TASK" + WORKFLOW = "WF" diff --git a/backend/workflow_manager/execution/serializer/__init__.py b/backend/workflow_manager/execution/serializer/__init__.py new file mode 100644 index 000000000..9b06d6d5b --- /dev/null +++ b/backend/workflow_manager/execution/serializer/__init__.py @@ -0,0 +1,2 @@ +from .execution import ExecutionSerializer # noqa: F401 +from .file_centric import FileCentricExecutionSerializer # noqa: F401 diff --git a/backend/workflow_manager/execution/serializer/execution.py b/backend/workflow_manager/execution/serializer/execution.py new file mode 100644 index 000000000..1dcf0758f --- /dev/null +++ b/backend/workflow_manager/execution/serializer/execution.py @@ -0,0 +1,22 @@ +from typing import 
Optional + +from rest_framework import serializers +from workflow_manager.workflow_v2.models import WorkflowExecution + + +# TODO: Optimize with select_related / prefetch_related to reduce DB queries +class ExecutionSerializer(serializers.ModelSerializer): + workflow_name = serializers.SerializerMethodField() + pipeline_name = serializers.SerializerMethodField() + + class Meta: + model = WorkflowExecution + exclude = ["task_id", "execution_log_id", "execution_type"] + + def get_workflow_name(self, obj: WorkflowExecution) -> Optional[str]: + """Fetch the workflow name using workflow_id""" + return obj.workflow_name + + def get_pipeline_name(self, obj: WorkflowExecution) -> Optional[str]: + """Fetch the pipeline or API deployment name""" + return obj.pipeline_name diff --git a/backend/workflow_manager/execution/serializer/file_centric.py b/backend/workflow_manager/execution/serializer/file_centric.py new file mode 100644 index 000000000..c55e554b1 --- /dev/null +++ b/backend/workflow_manager/execution/serializer/file_centric.py @@ -0,0 +1,22 @@ +from typing import Any, Optional + +from rest_framework import serializers +from workflow_manager.file_execution.models import ( + WorkflowFileExecution as FileExecution, +) + + +class FileCentricExecutionSerializer(serializers.ModelSerializer): + latest_log = serializers.SerializerMethodField() + + class Meta: + model = FileExecution + exclude = ["file_hash"] + + def get_latest_log(self, obj: FileExecution) -> Optional[dict[str, Any]]: + latest_log = ( + obj.execution_logs.exclude(data__log_level__in=["DEBUG", "WARN"]) + .order_by("-event_time") + .first() + ) + return latest_log.data if latest_log else None diff --git a/backend/workflow_manager/execution/urls.py b/backend/workflow_manager/execution/urls.py new file mode 100644 index 000000000..7445e4037 --- /dev/null +++ b/backend/workflow_manager/execution/urls.py @@ -0,0 +1,25 @@ +from django.urls import path +from rest_framework.urlpatterns import format_suffix_patterns +from 
workflow_manager.execution.views import ( + ExecutionViewSet, + FileCentricExecutionViewSet, +) +from workflow_manager.workflow_v2.execution_log_view import ( + WorkflowExecutionLogViewSet as ExecutionLogViewSet, +) + +execution_list = ExecutionViewSet.as_view( + { + "get": "list", + } +) +file_centric_list = FileCentricExecutionViewSet.as_view({"get": "list"}) +execution_log_list = ExecutionLogViewSet.as_view({"get": "list"}) + +urlpatterns = format_suffix_patterns( + [ + path("", execution_list, name="execution-list"), + path("<uuid:pk>/files/", file_centric_list, name="file-centric-execution-list"), + path("<uuid:pk>/logs/", execution_log_list, name="execution-log"), + ] +) diff --git a/backend/workflow_manager/execution/views/__init__.py b/backend/workflow_manager/execution/views/__init__.py new file mode 100644 index 000000000..7b694c9bf --- /dev/null +++ b/backend/workflow_manager/execution/views/__init__.py @@ -0,0 +1,2 @@ +from .execution import ExecutionViewSet # noqa: F401 +from .file_centric import FileCentricExecutionViewSet # noqa: F401 diff --git a/backend/workflow_manager/execution/views/execution.py b/backend/workflow_manager/execution/views/execution.py new file mode 100644 index 000000000..9ee97794a --- /dev/null +++ b/backend/workflow_manager/execution/views/execution.py @@ -0,0 +1,69 @@ +import logging +from typing import Optional + +from api_v2.models import APIDeployment +from django.db.models import Q, QuerySet +from pipeline_v2.models import Pipeline +from rest_framework import viewsets +from rest_framework.filters import OrderingFilter +from rest_framework.permissions import IsAuthenticated +from utils.date import DateRangeKeys, DateRangeSerializer +from utils.pagination import CustomPagination +from workflow_manager.execution.enum import ExecutionEntity +from workflow_manager.execution.serializer import ExecutionSerializer +from workflow_manager.workflow_v2.models import Workflow, WorkflowExecution + +logger = logging.getLogger(__name__) + + +class 
ExecutionViewSet(viewsets.ReadOnlyModelViewSet): + permission_classes = [IsAuthenticated] + serializer_class = ExecutionSerializer + pagination_class = CustomPagination + filter_backends = [OrderingFilter] + ordering_fields = ["created_at"] + ordering = ["-created_at"] + + def get_queryset(self) -> Optional[QuerySet]: + execution_entity = self.request.query_params.get("execution_entity") + + queryset = WorkflowExecution.objects.all() + + # Filter based on execution entity (query param is a string, compare to Enum values) + if execution_entity == ExecutionEntity.API.value: + queryset = queryset.filter( + pipeline_id__in=APIDeployment.objects.values_list("id", flat=True) + ) + elif execution_entity == ExecutionEntity.ETL.value: + queryset = queryset.filter( + pipeline_id__in=Pipeline.objects.filter( + pipeline_type=Pipeline.PipelineType.ETL + ).values_list("id", flat=True) + ) + elif execution_entity == ExecutionEntity.TASK.value: + queryset = queryset.filter( + pipeline_id__in=Pipeline.objects.filter( + pipeline_type=Pipeline.PipelineType.TASK + ).values_list("id", flat=True) + ) + elif execution_entity == ExecutionEntity.WORKFLOW.value: + queryset = queryset.filter( + pipeline_id=None, + workflow_id__in=Workflow.objects.values_list("id", flat=True), + ) + + # Parse and apply date filters + date_range_serializer = DateRangeSerializer(data=self.request.query_params) + date_range_serializer.is_valid(raise_exception=True) + + filters = Q() + if start_date := date_range_serializer.validated_data.get( + DateRangeKeys.START_DATE + ): + filters &= Q(created_at__gte=start_date) + if end_date := date_range_serializer.validated_data.get(DateRangeKeys.END_DATE): + filters &= Q(created_at__lte=end_date) + + queryset = queryset.filter(filters) + + return queryset diff --git a/backend/workflow_manager/execution/views/file_centric.py b/backend/workflow_manager/execution/views/file_centric.py new file mode 100644 index 000000000..d961ef069 --- /dev/null +++ b/backend/workflow_manager/execution/views/file_centric.py @@ -0,0 +1,25 @@ +import logging + 
+from rest_framework import viewsets +from rest_framework.filters import OrderingFilter +from rest_framework.permissions import IsAuthenticated +from utils.pagination import CustomPagination +from workflow_manager.execution.serializer import FileCentricExecutionSerializer +from workflow_manager.file_execution.models import ( + WorkflowFileExecution as FileExecution, +) + +logger = logging.getLogger(__name__) + + +class FileCentricExecutionViewSet(viewsets.ReadOnlyModelViewSet): + permission_classes = [IsAuthenticated] + serializer_class = FileCentricExecutionSerializer + pagination_class = CustomPagination + filter_backends = [OrderingFilter] + ordering_fields = ["created_at"] + ordering = ["created_at"] + + def get_queryset(self): + execution_id = self.kwargs.get("pk") + return FileExecution.objects.filter(workflow_execution_id=execution_id) diff --git a/backend/workflow_manager/workflow_v2/migrations/0006_workflowexecution_workflow_ex_workflo_5942c9_idx_and_more.py b/backend/workflow_manager/workflow_v2/migrations/0006_workflowexecution_workflow_ex_workflo_5942c9_idx_and_more.py new file mode 100644 index 000000000..637165519 --- /dev/null +++ b/backend/workflow_manager/workflow_v2/migrations/0006_workflowexecution_workflow_ex_workflo_5942c9_idx_and_more.py @@ -0,0 +1,27 @@ +# Generated by Django 4.2.1 on 2025-02-04 04:12 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("workflow_v2", "0005_workflowexecution_tags"), + ] + + operations = [ + migrations.AddIndex( + model_name="workflowexecution", + index=models.Index( + fields=["workflow_id", "-created_at"], + name="workflow_ex_workflo_5942c9_idx", + ), + ), + migrations.AddIndex( + model_name="workflowexecution", + index=models.Index( + fields=["pipeline_id", "-created_at"], + name="workflow_ex_pipelin_126dbf_idx", + ), + ), + ] diff --git a/backend/workflow_manager/workflow_v2/models/__init__.py 
b/backend/workflow_manager/workflow_v2/models/__init__.py index 938907762..817f43b21 100644 --- a/backend/workflow_manager/workflow_v2/models/__init__.py +++ b/backend/workflow_manager/workflow_v2/models/__init__.py @@ -1,4 +1,7 @@ -from .execution import WorkflowExecution # noqa: F401 +# isort:skip_file + +# Do not change the order of the imports below to avoid circular dependency issues +from .workflow import Workflow # noqa: F401 from .execution_log import ExecutionLog # noqa: F401 +from .execution import WorkflowExecution # noqa: F401 from .file_history import FileHistory # noqa: F401 -from .workflow import Workflow # noqa: F401 diff --git a/backend/workflow_manager/workflow_v2/models/execution.py b/backend/workflow_manager/workflow_v2/models/execution.py index 1f040d654..905214ced 100644 --- a/backend/workflow_manager/workflow_v2/models/execution.py +++ b/backend/workflow_manager/workflow_v2/models/execution.py @@ -1,8 +1,17 @@ +import logging import uuid +from typing import Optional +from api_v2.models import APIDeployment +from django.core.exceptions import ObjectDoesNotExist from django.db import models +from pipeline_v2.models import Pipeline from tags.models import Tag from utils.models.base_model import BaseModel +from workflow_manager.workflow_v2.models import Workflow + +logger = logging.getLogger(__name__) + EXECUTION_ERROR_LENGTH = 256 @@ -32,6 +41,7 @@ class Type(models.TextChoices): null=True, db_comment="task id of asynchronous execution", ) + # TODO: Make as foreign key to access the instance directly workflow_id = models.UUIDField( editable=False, db_comment="Id of workflow to be executed" ) @@ -68,12 +78,47 @@ class Meta: verbose_name = "Workflow Execution" verbose_name_plural = "Workflow Executions" db_table = "workflow_execution" + indexes = [ + models.Index(fields=["workflow_id", "-created_at"]), + models.Index(fields=["pipeline_id", "-created_at"]), + ] @property def tag_names(self) -> list[str]: """Return a list of tag names associated 
with the workflow execution.""" return list(self.tags.values_list("name", flat=True)) + @property + def workflow_name(self) -> Optional[str]: + """Obtains the workflow's name associated to this execution.""" + try: + return Workflow.objects.get(id=self.workflow_id).workflow_name + except ObjectDoesNotExist: + logger.warning( + f"Expected workflow ID '{self.workflow_id}' to exist but missing" + ) + return None + + @property + def pipeline_name(self) -> Optional[str]: + """Obtains the pipeline's name associated to this execution. + It could be ETL / TASK / API pipeline, None returned if there's no such pipeline + """ + if not self.pipeline_id: + return None + + try: + return APIDeployment.objects.get(id=self.pipeline_id).display_name + except ObjectDoesNotExist: + pass + + try: + return Pipeline.objects.get(id=self.pipeline_id).pipeline_name + except ObjectDoesNotExist: + pass + + return None + def __str__(self) -> str: return ( f"Workflow execution: {self.id} (" diff --git a/backend/workflow_manager/workflow_v2/workflow_helper.py b/backend/workflow_manager/workflow_v2/workflow_helper.py index 4478ad359..27fbeab29 100644 --- a/backend/workflow_manager/workflow_v2/workflow_helper.py +++ b/backend/workflow_manager/workflow_v2/workflow_helper.py @@ -258,6 +258,11 @@ def _process_file( except Exception as e: error = f"Error processing file '{os.path.basename(input_file)}'. {str(e)}" execution_service.publish_log(error, level=LogLevel.ERROR) + workflow_file_execution.update_status( + status=ExecutionStatus.ERROR, + execution_error=error, + ) + # Not propagating error here to continue execution for other files execution_service.publish_update_log( LogState.RUNNING, f"Processing output for {file_name}", @@ -368,6 +373,8 @@ def run_workflow( ) raise finally: + # TODO: Handle error gracefully during delete + # Mark status as an ERROR correctly destination.delete_execution_directory() @staticmethod